Skip to content

Commit

Permalink
correctly closing psycop2 connections
Browse files Browse the repository at this point in the history
  • Loading branch information
enricofer committed Sep 1, 2024
1 parent 7aa7460 commit 305303b
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 147 deletions.
196 changes: 106 additions & 90 deletions webapp/djakart/kart_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@


def get_pg_versions_connection():
return psycopg2.connect(
connection = psycopg2.connect(
database=os.environ.get("VERSION_DB", "blabla"),
user=os.environ.get("POSTGRES_USER", "blabla"),
password=os.environ.get("POSTGRES_PASSWORD", "blabla"),
host=os.environ.get("POSTGRES_SERVER", "blabla"),
port=os.environ.get("POSTGRES_PORT", "blabla")
)
return connection


class KartException(Exception):
Expand Down Expand Up @@ -116,74 +117,84 @@ def crea_fdw(nuova_versione):
GRANT ALL ON TABLE "{schema}".pua TO postgres;
"""

cursor = get_pg_versions_connection().cursor()
with get_pg_versions_connection() as conn:

common_params = {
"admin": KART_SU,
"user": KART_PGUSER,
"crs": SRID
}
with conn.cursor() as cursor:

try:
print (crea_fdw_template.format(**common_params, schema=nuova_versione))
cursor.execute(crea_fdw_template.format(**common_params, schema=nuova_versione))
return True
except Exception as e:
print(e)
cursor.execute(crea_fdw_template.format(**common_params, schema=nuova_versione+"_pub"))
return None
common_params = {
"admin": KART_SU,
"user": KART_PGUSER,
"crs": SRID
}

try:
print (crea_fdw_template.format(**common_params, schema=nuova_versione))
cursor.execute(crea_fdw_template.format(**common_params, schema=nuova_versione))
return True
except Exception as e:
print(e)
cursor.execute(crea_fdw_template.format(**common_params, schema=nuova_versione+"_pub"))
return None


def crea_pg_schema(nuova_versione, readonly=False):

if readonly:
schema_owner = PG_SU
else:
schema_owner = KART_SU
with get_pg_versions_connection() as conn:
if readonly:
schema_owner = PG_SU
else:
schema_owner = KART_SU

crea_schema_template = '''
CREATE SCHEMA IF NOT EXISTS "{schema}" AUTHORIZATION "{owner}";
'''
crea_schema_template = '''
CREATE SCHEMA IF NOT EXISTS "{schema}" AUTHORIZATION "{owner}";
GRANT USAGE ON SCHEMA "{schema}" TO "{user}";
GRANT USAGE ON SCHEMA "{schema}" TO "{admin}";
GRANT SELECT ON ALL TABLES IN SCHEMA "{schema}" TO "{user}";
GRANT SELECT ON ALL TABLES IN SCHEMA "{schema}" TO "{admin}";
'''

cursor = get_pg_versions_connection().cursor()
cursor.execute(crea_schema_template.format(schema=nuova_versione, owner=schema_owner))
with conn.cursor() as curs:
curs.execute(crea_schema_template.format(schema=nuova_versione, owner=schema_owner, admin=KART_SU, user=KART_PGUSER))
print ("crea_schema_template", crea_schema_template.format(schema=nuova_versione, owner=schema_owner, admin=KART_SU, user=KART_PGUSER))


def grant_select_schema(versione, schema_user=KART_PGUSER, schema_admin=KART_SU):

grant_select_template = '''
GRANT USAGE ON SCHEMA "{schema}" TO "{user}";
GRANT ALL ON SCHEMA "{schema}" TO "{admin}";
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA "{schema}" TO "{admin}";
GRANT SELECT ON ALL TABLES IN SCHEMA "{schema}" TO "{user}";
GRANT SELECT ON ALL SEQUENCES IN SCHEMA "{schema}" TO "{user}";
GRANT ALL ON ALL TABLES IN SCHEMA "{schema}" TO "{admin}";
'''

check_schema_template = "SELECT schema_name FROM information_schema.schemata where schema_name = '%s';"

cursor = get_pg_versions_connection().cursor()
with get_pg_versions_connection() as conn:

cursor.execute( check_schema_template % versione)
schema_nobase = cursor.fetchall()
with conn.cursor() as cursor:

cursor.execute( check_schema_template % (versione+'_pub'))
schema_base = cursor.fetchall()
cursor.execute(check_schema_template % versione)
schema_nobase = cursor.fetchall()

cursor.execute(check_schema_template % (versione+'_pub'))
schema_base = cursor.fetchall()

if schema_nobase:
cursor.execute(grant_select_template.format(
schema=versione,
admin=schema_admin,
user=schema_user
))
return True
elif schema_base:
print("SCHEMA BASE")
cursor.execute(grant_select_template.format(
schema=versione+"_pub",
admin=schema_admin,
user=schema_user
))
return None
if len(schema_nobase) > 0:
cursor.execute(grant_select_template.format(
schema=versione,
admin=schema_admin,
user=schema_user
))
return True
elif len(schema_base) > 0:
cursor.execute(grant_select_template.format(
schema=versione+"_pub",
admin=schema_admin,
user=schema_user
))
return None


def elimina_pg_schema(versione):
Expand All @@ -192,14 +203,15 @@ def elimina_pg_schema(versione):
canc_schema_template = '''
DROP SCHEMA IF EXISTS "{schema}" CASCADE
'''
with get_pg_versions_connection() as conn:

cursor = get_pg_versions_connection().cursor()
try:
cursor.execute(canc_schema_template.format(schema=versione, owner=schema_owner))
return True
except Exception as e:
print(e)
return None
with conn.cursor() as cursor:
try:
cursor.execute(canc_schema_template.format(schema=versione, owner=schema_owner))
return True
except Exception as e:
print(e)
return None

def crea_nuovo_repository(repo_name,bare=True,readonly_workingcopy=None):
repo_path = os.path.join(settings.KART_REPO,repo_name)
Expand Down Expand Up @@ -245,7 +257,6 @@ def crea_nuova_versione(nuova_versione,base,tipo="pg"):
new_branch = executeCmd(["--repo",nuova_versione_path,"checkout", "-b", nuova_versione])
new_wc = create_workingcopy(nuova_versione,uri,force=True)
grant_select_schema(nuova_versione)
grant_select_schema(nuova_versione)
#crea_fdw(nuova_versione)
serial_pk_setup(nuova_versione)

Expand Down Expand Up @@ -290,12 +301,12 @@ def merge_versione(versione, abort=False, confirm=False):
master_versione = os.path.split(master_path)[-1]
push_cmd = executeCmd(["--repo", versione_path, "push", "origin", versione])
#chkout_cmd = executeKart(["--repo", master_path, "checkout", "main"])
merge_cmd = executeCmd(["--repo",master_path,"merge","-m","merge "+versione, versione])
merge_cmd = executeCmd(["--repo",master_path,"merge","--ff", versione])
if ("Nothing to commit, working copy clean" in status_versione(master_path)) or ("No working copy" in status_versione(master_path)):
#non dovrebbe cancellare branch con conflitti in fase di merge
delete_branch_cmd = executeCmd(["--repo",master_path,"branch","-d",versione])
grant_select_schema(master_versione)
grant_select_schema(versione)
#grant_select_schema(master_versione)
#grant_select_schema(versione)


def clone_versione(versione, target):
Expand Down Expand Up @@ -370,7 +381,7 @@ def importa_dataset(versione,ds_path,max_extent=None):
print ("MAXEXTENT", versione, max_extent)

importa_cmd = executeCmd(["--repo", versione_path, "import", "--replace-existing", "-a", ds_path])
grant_select_schema(versione)
#grant_select_schema(versione)

return max_extent

Expand Down Expand Up @@ -517,10 +528,10 @@ def list_versioned_tables(versione):
return [ds for ds in ds_list if ds != ""]

def prevent_conflicts_on_ids(versione):
cursor = get_pg_versions_connection().cursor()
for table in list_versioned_tables(versione):
stepoverseq = ""

with get_pg_versions_connection() as conn:
with conn.cursor() as cursor:
for table in list_versioned_tables(versione):
stepoverseq = ""

def geo_tables(versione):
versione_path = os.path.join(settings.KART_REPO,versione)
Expand Down Expand Up @@ -587,46 +598,51 @@ def recover_uncommitted_nulls(versione):
sql = ""
for corr in corrs:
sql += 'UPDATE "{schema}"."{table}" SET "{field}" = {value};\n'.format(**corr)
cursor = get_pg_versions_connection().cursor()
cursor.execute(sql)

with get_pg_versions_connection() as conn:
with conn.cursor() as cursor:
cursor.execute(sql)


def get_schemas():
sql = """ SELECT nspname FROM pg_catalog.pg_namespace;"""
cursor = get_pg_versions_connection().cursor()
cursor.execute(sql)
return [row[0] for row in cursor.fetchall()]
with get_pg_versions_connection() as conn:
with conn.cursor() as cursor:
cursor.execute(sql)
return [row[0] for row in cursor.fetchall()]


def get_sequences(schema):
sql = """SELECT sequence_name FROM information_schema.sequences WHERE sequence_schema = '{schema}' """.format(schema=schema)
cursor = get_pg_versions_connection().cursor()
cursor.execute(sql)
return [row[0] for row in cursor.fetchall()]
with get_pg_versions_connection() as conn:
with conn.cursor() as cursor:
cursor.execute(sql)
return [row[0] for row in cursor.fetchall()]


def serial_pk_setup(versione, aumento=100):
cursor = get_pg_versions_connection().cursor()
base_path = get_remote(versione)
base = os.path.split(base_path)[-1]
if base:
schemas = get_schemas()
#if not base in schemas:
# base = base + "_pub"
# if not base in schemas:
# raise KartException
sequences = get_sequences(base.replace("_pub",""))
for tab in list_versioned_tables(base):
if not (tab + "_auto_pk_seq") in sequences:
continue
#sql = """SELECT MAX(auto_pk) from "{schema}"."{table}";""".format(schema=versione,table=tab)
sql = """SELECT last_value FROM "{schema}"."{table}_auto_pk_seq";""".format(schema=base,table=tab)
cursor.execute(sql)
min_pk = cursor.fetchone()[0]
min_pk += 100
sql = """ALTER SEQUENCE "{schema}"."{table}_auto_pk_seq" RESTART WITH {val}""".format(schema=versione, table=tab, val=min_pk)
cursor.execute(sql)
sql = """ALTER SEQUENCE "{schema}"."{table}_auto_pk_seq" RESTART WITH {val}""".format(schema=base, table=tab, val=min_pk)
cursor.execute(sql)
with get_pg_versions_connection() as conn:
with conn.cursor() as cursor:
base_path = get_remote(versione)
base = os.path.split(base_path)[-1]
if base:
schemas = get_schemas()
#if not base in schemas:
# base = base + "_pub"
# if not base in schemas:
# raise KartException
sequences = get_sequences(base.replace("_pub",""))
for tab in list_versioned_tables(base):
if not (tab + "_auto_pk_seq") in sequences:
continue
#sql = """SELECT MAX(auto_pk) from "{schema}"."{table}";""".format(schema=versione,table=tab)
sql = """SELECT last_value FROM "{schema}"."{table}_auto_pk_seq";""".format(schema=base,table=tab)
cursor.execute(sql)
min_pk = cursor.fetchone()[0]
min_pk += 100
sql = """ALTER SEQUENCE "{schema}"."{table}_auto_pk_seq" RESTART WITH {val}""".format(schema=versione, table=tab, val=min_pk)
cursor.execute(sql)
sql = """ALTER SEQUENCE "{schema}"."{table}_auto_pk_seq" RESTART WITH {val}""".format(schema=base, table=tab, val=min_pk)
cursor.execute(sql)


Loading

0 comments on commit 305303b

Please sign in to comment.