From c4fb10c2d3985204662079454161e00af1c0beec Mon Sep 17 00:00:00 2001 From: chloebarre Date: Tue, 15 Jul 2025 22:32:52 +0200 Subject: [PATCH 1/4] pb de nouvelle colonne --- back/scripts/workflow/data_warehouse.py | 38 ++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/back/scripts/workflow/data_warehouse.py b/back/scripts/workflow/data_warehouse.py index bdb23daa0..67ab67bb7 100644 --- a/back/scripts/workflow/data_warehouse.py +++ b/back/scripts/workflow/data_warehouse.py @@ -59,7 +59,43 @@ def _send_to_postgres(self): f"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name= '{table_name}')" ) table_exists = conn.execute(table_exists_query).scalar() + if table_exists: conn.execute(text(f"TRUNCATE {table_name}")) - + add_missing_columns_to_sql_table(conn, table_name, df) df.write_database(table_name, conn, if_table_exists=if_table_exists) + + def add_missing_columns_to_sql_table(conn, table_name: str, df: pl.DataFrame): + """Ajoute les colonnes manquantes dans la table SQL à partir du DataFrame Polars.""" + + schema = df.schema + columns_sql = conn.execute( + text(f""" + SELECT column_name FROM information_schema.columns + WHERE table_name = '{table_name}' + """) + ).fetchall() + existing_cols = {col[0] for col in columns_sql} + + missing_cols = set(schema.keys()) - existing_cols + if not missing_cols: + return + + # Mapping Polars -> SQL + type_mapping = { + pl.Int64: "BIGINT", + pl.Int32: "INTEGER", + pl.Float64: "DOUBLE PRECISION", + pl.Float32: "REAL", + pl.Boolean: "BOOLEAN", + pl.Utf8: "TEXT", + pl.Date: "DATE", + pl.Datetime: "TIMESTAMP", + } + + for col in missing_cols: + pl_type = schema[col] + sql_type = type_mapping.get(pl_type, "TEXT") + sql = text(f'ALTER TABLE "{table_name}" ADD COLUMN "{col}" {sql_type};') + conn.execute(sql) + conn.commit() From 570750760b375f65011b07b2191a0988c08e305c Mon Sep 17 00:00:00 2001 From: chloebarre Date: Thu, 24 Jul 2025 12:48:24 +0200 Subject: [PATCH 2/4] add_missing_columns_to_sql_table and chunksize --- back/scripts/workflow/data_warehouse.py | 27 +++++++++++++++---------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/back/scripts/workflow/data_warehouse.py b/back/scripts/workflow/data_warehouse.py index 67ab67bb7..7f105cee3 100644 --- a/back/scripts/workflow/data_warehouse.py +++ b/back/scripts/workflow/data_warehouse.py @@ -1,5 +1,6 @@ from pathlib import Path +import pandas as pd import polars as pl from sqlalchemy import text @@ -19,16 +20,17 @@ def __init__(self, config: dict): self._config = config self.warehouse_folder = Path(self._config["warehouse"]["data_folder"]) self.warehouse_folder.mkdir(exist_ok=True, parents=True) + self.chunksize = 10000 self.send_to_db = { - "collectivites": CommunitiesEnricher.get_output_path(config), - "marches_publics": MarchesPublicsEnricher.get_output_path(config), - "subventions": SubventionsEnricher.get_output_path(config), - "comptes_collectivites": FinancialEnricher.get_output_path(config), - "elus": ElectedOfficialsEnricher.get_output_path(config), - "declarations_interet": DeclaInteretWorkflow.get_output_path(config), - "communities_contacts": CommunitiesContact.get_output_path(config), - "bareme": BaremeEnricher.get_output_path(config), + "collectivites_test": CommunitiesEnricher.get_output_path(config), + "marches_publics_test": MarchesPublicsEnricher.get_output_path(config), + "subventions_test": SubventionsEnricher.get_output_path(config), + "comptes_collectivites_test": FinancialEnricher.get_output_path(config), + "elus_test": ElectedOfficialsEnricher.get_output_path(config), + "declarations_interet_test": DeclaInteretWorkflow.get_output_path(config), + "communities_contacts_test": CommunitiesContact.get_output_path(config), + "bareme_test": BaremeEnricher.get_output_path(config), } def run(self) -> None: @@ -50,7 +52,7 @@ def _send_to_postgres(self): # or keep the same schema. if_table_exists = "replace" if self._config["workflow"]["replace_tables"] else "append" - with connector.engine.connect() as conn: + with connector.engine.begin() as conn: for table_name, filename in self.send_to_db.items(): df = pl.read_parquet(filename) @@ -62,9 +64,12 @@ def _send_to_postgres(self): if table_exists: conn.execute(text(f"TRUNCATE {table_name}")) - add_missing_columns_to_sql_table(conn, table_name, df) - df.write_database(table_name, conn, if_table_exists=if_table_exists) + self.add_missing_columns_to_sql_table(conn, table_name, df) + df.to_pandas().to_sql( + table_name, conn, if_exists=if_table_exists, chunksize=self.chunksize + ) + @staticmethod def add_missing_columns_to_sql_table(conn, table_name: str, df: pl.DataFrame): """Ajoute les colonnes manquantes dans la table SQL à partir du DataFrame Polars.""" From 4de4a8abfed98e96b1aeb0d91b5956334ae0b4c4 Mon Sep 17 00:00:00 2001 From: chloebarre Date: Thu, 24 Jul 2025 12:56:10 +0200 Subject: [PATCH 3/4] correction des commentaires --- back/scripts/workflow/data_warehouse.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/back/scripts/workflow/data_warehouse.py b/back/scripts/workflow/data_warehouse.py index 7f105cee3..3f0e09ded 100644 --- a/back/scripts/workflow/data_warehouse.py +++ b/back/scripts/workflow/data_warehouse.py @@ -82,7 +82,7 @@ def add_missing_columns_to_sql_table(conn, table_name: str, df: pl.DataFrame): ).fetchall() existing_cols = {col[0] for col in columns_sql} - missing_cols = set(schema.keys()) - existing_cols + missing_cols = schema.keys() - existing_cols if not missing_cols: return @@ -98,9 +98,13 @@ def add_missing_columns_to_sql_table(conn, table_name: str, df: pl.DataFrame): pl.Datetime: "TIMESTAMP", } - for col in missing_cols: - pl_type = schema[col] - sql_type = type_mapping.get(pl_type, "TEXT") - sql = text(f'ALTER TABLE "{table_name}" ADD COLUMN "{col}" {sql_type};') - conn.execute(sql) + if missing_cols: + add_columns = [] + for col in missing_cols: + pl_type = schema[col] + sql_type = type_mapping.get(pl_type, "TEXT") + add_columns.append(f'ADD COLUMN "{col}" {sql_type}') + + alter_query = f'ALTER TABLE "{table_name}" {", ".join(add_columns)};' + conn.execute(text(alter_query)) conn.commit() From 2a409ba15757d07c379e285a5be7144135510cac Mon Sep 17 00:00:00 2001 From: chloebarre Date: Thu, 24 Jul 2025 13:11:21 +0200 Subject: [PATCH 4/4] enleve test --- back/scripts/workflow/data_warehouse.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/back/scripts/workflow/data_warehouse.py b/back/scripts/workflow/data_warehouse.py index a3931aa86..04f0a6762 100644 --- a/back/scripts/workflow/data_warehouse.py +++ b/back/scripts/workflow/data_warehouse.py @@ -23,14 +23,14 @@ def __init__(self, config: dict): self.chunksize = 10000 self.send_to_db = { - "collectivites_test": CommunitiesEnricher.get_output_path(config), - "marches_publics_test": MarchesPublicsEnricher.get_output_path(config), - "subventions_test": SubventionsEnricher.get_output_path(config), - "comptes_collectivites_test": FinancialEnricher.get_output_path(config), - "elus_test": ElectedOfficialsEnricher.get_output_path(config), - "declarations_interet_test": DeclaInteretWorkflow.get_output_path(config), - "communities_contacts_test": CommunitiesContact.get_output_path(config), - "bareme_test": BaremeEnricher.get_output_path(config), + "collectivites": CommunitiesEnricher.get_output_path(config), + "marches_publics": MarchesPublicsEnricher.get_output_path(config), + "subventions": SubventionsEnricher.get_output_path(config), + "comptes_collectivites": FinancialEnricher.get_output_path(config), + "elus": ElectedOfficialsEnricher.get_output_path(config), + "declarations_interet": DeclaInteretWorkflow.get_output_path(config), + "communities_contacts": CommunitiesContact.get_output_path(config), + "bareme": BaremeEnricher.get_output_path(config), } def run(self) -> None: