SAFEHR-data/datafaker, forked from alan-turing-institute/sqlsynthgen
Duckdb #75 (merged)
Changes from all commits (27 commits):
0fe4e31  Refactoring query construction
4d8e9c2  Removed _get_row_partition
0230da2  A bit more refactoring
6d26ee5  Fixed #73: Grouped generators query results overlap
c0125c5  FKs to concept table named, initial implementation
4275014  Many extra comments output with partitions
a0d4b16  Named column fetched from config.yaml
dd1e852  configure-tables allows the setting of naming columns
1377fc0  Experimental DuckDB output support
fae9305  Parquet output for dump-data
9dd4073  DuckDB with files df.py fixed
d70f39c  Fixed test_dump
a968e5d  dump-data tests now run with DuckDB and PostgreSQL
29db13a  pre-commit hooks pass
1896db0  A couple more tests for DuckDB
954962c  More parquet tests, some bugs fixed
d8e40bf  Added --parquet-dir to make-tables
eb91e17  Fixed for opiates example
57350f7  Merge branch 'main' of github:SAFEHR-data/datafaker into duckdb
7277ccb  Updated DuckDB documentation
d2a17b1  Dockerfile needs build tools for DuckDB support
86839b0  Response to Stef's comments
4071ea0  Black reformat
1392372  A little comment
958bb95  Tests for parquet2orm
61a1546  Made parquet2orm tests a bit looser
56e71db  pre-commit clean
The first changed file (`@@ -1,38 +1,186 @@`) is the data-dumping module; its new contents:

```python
"""Data dumping functions."""
import csv
import io
from typing import TYPE_CHECKING
from abc import ABC, abstractmethod
from pathlib import Path

import pandas as pd
import sqlalchemy
from sqlalchemy.schema import MetaData

from datafaker.utils import create_db_engine, get_sync_engine, logger

if TYPE_CHECKING:
    from _csv import Writer


def _make_csv_writer(file: io.TextIOBase) -> "Writer":
    """Make the standard CSV file writer."""
    return csv.writer(file, quoting=csv.QUOTE_MINIMAL)


def dump_db_tables(
    metadata: MetaData,
    dsn: str,
    schema: str | None,
    table_name: str,
    file: io.TextIOBase,
) -> None:
    """Output the table as CSV."""
    if table_name not in metadata.tables:
        logger.error("%s is not a table described in the ORM file", table_name)
        return
    table = metadata.tables[table_name]
    csv_out = _make_csv_writer(file)
    csv_out.writerow(table.columns.keys())
    engine = get_sync_engine(create_db_engine(dsn, schema_name=schema))
    with engine.connect() as connection:
        result = connection.execute(sqlalchemy.select(table))
        for row in result:
            csv_out.writerow(row)


class TableWriter(ABC):
    """Writes a table out to a file."""

    EXTENSION = ".csv"

    def __init__(self, metadata: MetaData, dsn: str, schema: str | None) -> None:
        """
        Initialize the TableWriter.

        :param metadata: The metadata for our database.
        :param dsn: The connection string for our database.
        :param schema: The schema name for our database, or None for the default.
        """
        self._metadata = metadata
        self._dsn = dsn
        self._schema = schema

    def connect(self) -> sqlalchemy.engine.Connection:
        """Connect to the database."""
        engine = get_sync_engine(create_db_engine(self._dsn, schema_name=self._schema))
        return engine.connect()

    @abstractmethod
    def write_file(self, table: sqlalchemy.Table, filepath: Path) -> bool:
        """
        Write the given table into the given file.

        :param table: The table to output.
        :param filepath: The path of the file to write to.
        :return: ``True`` on success, otherwise ``False``.
        """

    def write(self, table: sqlalchemy.Table, directory: Path) -> bool:
        """
        Write the table into a directory with a filename based on the table's name.

        :param table: The table to write out.
        :param directory: The directory to write the table into.
        :return: ``True`` on success, otherwise ``False``.
        """
        tn = table.name
        # DuckDB tables derived from files have confusing suffixes
        # that we should probably remove.
        tn = tn.removesuffix(".csv")
        tn = tn.removesuffix(".parquet")
        return self.write_file(table, directory / f"{tn}{self.EXTENSION}")


class ParquetTableWriter(TableWriter):
    """Writes the table to a Parquet file."""

    EXTENSION = ".parquet"

    def write_file(self, table: sqlalchemy.Table, filepath: Path) -> bool:
        """
        Write the given table into the given file.

        :param table: The table to output.
        :param filepath: The path of the file to write to.
        :return: ``True`` on success, otherwise ``False``.
        """
        with self.connect() as connection:
            dates = [
                str(name)
                for name, col in table.columns.items()
                if isinstance(
                    col.type,
                    (
                        sqlalchemy.types.DATE,
                        sqlalchemy.types.DATETIME,
                        sqlalchemy.types.TIMESTAMP,
                    ),
                )
            ]
            df = pd.read_sql(
                sql=f"SELECT * FROM {table.name}",
                con=connection,
                columns=[str(col.name) for col in table.columns.values()],
                parse_dates=dates,
            )
            df.to_parquet(filepath)
            return True


class DuckDbParquetTableWriter(ParquetTableWriter):
    """
    Writes the table to a Parquet file using DuckDB SQL.

    The Pandas method used by ParquetTableWriter currently
    does not work with DuckDB.
    """

    def write_file(self, table: sqlalchemy.Table, filepath: Path) -> bool:
        """
        Write the given table into the given file.

        :param table: The table to output.
        :param filepath: The path of the file to write to.
        :return: ``True`` on success, otherwise ``False``.
        """
        with self.connect() as connection:
            result = connection.execute(
                sqlalchemy.text(
                    # We need the double quotes to get DuckDB to read the table, not the file.
                    f"COPY \"{table.name}\" TO '{filepath}' (FORMAT PARQUET)"
                )
            )
            return result is not None


def get_parquet_table_writer(
    metadata: MetaData, dsn: str, schema: str | None
) -> TableWriter:
    """
    Get a ``TableWriter`` that writes Parquet files.

    :param metadata: The database metadata containing the tables to be dumped to files.
    :param dsn: The database connection string.
    :param schema: The schema name, if required.
    :return: A ``TableWriter`` that writes Parquet files.
    """
    if dsn.startswith("duckdb:"):
        return DuckDbParquetTableWriter(metadata, dsn, schema)
    return ParquetTableWriter(metadata, dsn, schema)


class TableWriterIO(TableWriter):
    """Writes the table to an output stream."""

    @abstractmethod
    def write_io(self, table: sqlalchemy.Table, out: io.TextIOBase) -> bool:
        """
        Write the given table to the given output stream.

        :param table: The table to output.
        :param out: The output stream to write to.
        :return: ``True`` on success, otherwise ``False``.
        """

    def write_file(self, table: sqlalchemy.Table, filepath: Path) -> bool:
        """
        Write the given table into the given file.

        :param table: The table to output.
        :param filepath: The path of the file to write to.
        :return: ``True`` on success, otherwise ``False``.
        """
        with open(filepath, "wt", newline="", encoding="utf-8") as out:
            return self.write_io(table, out)


class CsvTableWriter(TableWriterIO):
    """Writes the table to a CSV file."""

    def write_io(self, table: sqlalchemy.Table, out: io.TextIOBase) -> bool:
        """
        Write the given table to the given output stream as CSV.

        :param table: The table to output.
        :param out: The output stream to write to.
        :return: ``True`` on success, otherwise ``False``.
        """
        if table.name not in self._metadata.tables:
            logger.error("%s is not a table described in the ORM file", table.name)
            return False
        table = self._metadata.tables[table.name]
        csv_out = csv.writer(out, quoting=csv.QUOTE_MINIMAL)
        csv_out.writerow(table.columns.keys())
        with self.connect() as connection:
            result = connection.execute(sqlalchemy.select(table))
            for row in result:
                csv_out.writerow(row)
        return True
```
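For orientation, here is a minimal sketch of how these writers might be driven by calling code. Only `get_parquet_table_writer`, `TableWriter.write`, and `logger` come from this PR; the module path, DSN, and output directory are assumptions for illustration:

```python
# Hypothetical driver for the writers above; the DSN, directory name, and
# metadata setup are illustrative assumptions, not part of this PR.
from pathlib import Path

from sqlalchemy.schema import MetaData

from datafaker.dump import get_parquet_table_writer  # module path assumed
from datafaker.utils import logger

metadata = MetaData()
# In real use, the metadata would be loaded from the project's ORM file.

writer = get_parquet_table_writer(metadata, "duckdb:///example.duckdb", schema=None)
out_dir = Path("parquet-out")
out_dir.mkdir(exist_ok=True)
for table in metadata.tables.values():
    # write() strips any ".csv"/".parquet" suffix from the table name and
    # appends the writer's EXTENSION (".parquet" here).
    if not writer.write(table, out_dir):
        logger.error("failed to write table %s", table.name)
```

Note that the DSN prefix alone selects the implementation: `get_parquet_table_writer` returns the DuckDB-specific writer for any `duckdb:` DSN, since the pandas `read_sql` path does not currently work there and DuckDB's own `COPY ... TO ... (FORMAT PARQUET)` is used instead.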
The second changed file has two hunks. The first replaces a simple `None` check in the histogram bucket computation with a `try`/`except`:

```diff
@@ -291,9 +291,18 @@ def __init__(
         )
         self.buckets: Sequence[int] = [0] * 10
         for rb in raw_buckets:
-            if rb.b is not None:
-                bucket = min(9, max(0, int(rb.b) + 1))
-                self.buckets[bucket] += rb.f / count
+            try:
+                x = float(rb.b)
+                if x.is_integer():
+                    bucket = min(9, max(0, int(x) + 1))
+                    self.buckets[bucket] += rb.f / count
+            except TypeError:
+                # We get a type error if there are no rows returned at all
+                # because rb.b is None in this case.
+                # We could just test for None explicitly, but this way
+                # catches errors if SQLAlchemy returns something that
+                # isn't a number for some other unknown reason.
+                pass
         self.mean = mean
         self.stddev = stddev
```

A reviewer commented on lines 299 to 305:

> Ooh when are we expecting this to happen, and if so do we want to log it?

The second hunk renames the unused `engine` parameter of `ConstantGeneratorFactory.get_generators` to `_engine`:

```diff
@@ -406,7 +415,7 @@ class ConstantGeneratorFactory(GeneratorFactory):
     """Just the null generator."""

     def get_generators(
-        self, columns: list[Column], engine: Engine
+        self, columns: list[Column], _engine: Engine
     ) -> Sequence[Generator]:
         """Get the generators appropriate for these columns."""
         if len(columns) != 1:
```
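On the reviewer's question above: `float(None)` raises `TypeError`, so the `except` branch fires whenever the statistics query returns no rows at all, which leaves `rb.b` as `None`. If logging is wanted for the genuinely surprising cases, one option is to stay silent for the expected `None` and warn on anything else. This is a sketch against the hunk above, not code from the PR, and it assumes a `logger` is in scope:

```python
try:
    x = float(rb.b)
    if x.is_integer():
        bucket = min(9, max(0, int(x) + 1))
        self.buckets[bucket] += rb.f / count
except TypeError:
    if rb.b is not None:
        # rb.b is None when the query returned no rows, which is expected;
        # any other non-numeric value is surprising, so make it visible.
        logger.warning("unexpected non-numeric bucket value: %r", rb.b)
```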
Two more review comments on the thread:

> ooh this is fun

> Pretty nasty actually. But yes, fun that this hook exists!