Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion back/scripts/workflow/data_warehouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,43 @@ def _send_to_postgres(self):
f"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name= '{table_name}')"
)
table_exists = conn.execute(table_exists_query).scalar()

if table_exists:
conn.execute(text(f"TRUNCATE {table_name}"))

add_missing_columns_to_sql_table(conn, table_name, df)
df.write_database(table_name, conn, if_table_exists=if_table_exists)

def add_missing_columns_to_sql_table(conn, table_name: str, df: pl.DataFrame):
"""Ajoute les colonnes manquantes dans la table SQL à partir du DataFrame Polars."""

schema = df.schema
columns_sql = conn.execute(
text(f"""
SELECT column_name FROM information_schema.columns
WHERE table_name = '{table_name}'
""")
).fetchall()
existing_cols = {col[0] for col in columns_sql}

missing_cols = set(schema.keys()) - existing_cols
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tu savais que dict.keys() possède une compatibilité partielle avec les set ?
missing_cols = schema.keys() - existing_cols serait suffisant ici.

if not missing_cols:
return

# Mapping Polars -> SQL
type_mapping = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Il y a un bout de code qui a l'air de faire l'inverse dans Historisateur.convert_types_for_sql.
Ce dictionnaire pourrait être utile ailleurs dans le code, je le sortirai de la méthode pour en créer une constante.

pl.Int64: "BIGINT",
pl.Int32: "INTEGER",
pl.Float64: "DOUBLE PRECISION",
pl.Float32: "REAL",
pl.Boolean: "BOOLEAN",
pl.Utf8: "TEXT",
pl.Date: "DATE",
pl.Datetime: "TIMESTAMP",
}

for col in missing_cols:
pl_type = schema[col]
sql_type = type_mapping.get(pl_type, "TEXT")
sql = text(f'ALTER TABLE "{table_name}" ADD COLUMN "{col}" {sql_type};')
conn.execute(sql)
conn.commit()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

C'est bon sur le principe, idéalement il faudrait réduire les appels à la db en concaténant les requêtes pour n'en faire qu'une seule.

Peut-être quelque chose comme ça ?

        add_columns = [… for col in missing_cols]
        sql = text(f'ALTER TABLE "{table_name}" {", ".join(add_columns)};)
        conn.execute(sql)
        conn.commit()