From b9c0c724886e3e39306d44f3d7510fd5a4542c50 Mon Sep 17 00:00:00 2001 From: Ilyas Gasanov Date: Mon, 24 Feb 2025 13:33:41 +0300 Subject: [PATCH] [DOP-22133] Implement increment for transfers with file sources --- .env.docker | 8 + .env.local | 7 + docker-compose.test.yml | 49 +++- docs/changelog/next_release/209.feature.rst | 1 + poetry.lock | 259 +++++++++++++----- pyproject.toml | 2 + syncmaster/dto/transfers.py | 18 +- syncmaster/dto/transfers_strategy.py | 33 +++ syncmaster/schemas/v1/transfers/__init__.py | 14 +- syncmaster/schemas/v1/transfers/file/base.py | 3 + syncmaster/worker/controller.py | 51 +++- syncmaster/worker/handlers/base.py | 4 + syncmaster/worker/handlers/db/base.py | 5 + syncmaster/worker/handlers/db/clickhouse.py | 11 +- syncmaster/worker/handlers/file/base.py | 59 ---- syncmaster/worker/handlers/file/ftp.py | 4 +- syncmaster/worker/handlers/file/ftps.py | 4 +- syncmaster/worker/handlers/file/hdfs.py | 4 +- .../file/{protocol.py => local_df.py} | 17 +- syncmaster/worker/handlers/file/remote_df.py | 62 +++++ syncmaster/worker/handlers/file/s3.py | 4 +- syncmaster/worker/handlers/file/samba.py | 4 +- syncmaster/worker/handlers/file/sftp.py | 4 +- syncmaster/worker/handlers/file/webdav.py | 4 +- syncmaster/worker/settings/__init__.py | 2 + syncmaster/worker/settings/hwm_store.py | 36 +++ syncmaster/worker/transfer.py | 3 +- .../{file.csv => document.csv} | 0 .../{file.txt => document.txt} | 0 .../{large_file.csv => large_document.csv} | 0 .../{small_file.csv => small_document.csv} | 0 .../csv/for_file_filtering/unknown_name.csv | 1 - .../connection_fixtures/__init__.py | 6 + .../file_storage_fixtures.py | 5 + .../connection_fixtures/filters_fixtures.py | 2 +- .../connection_fixtures/strategy_fixtures.py | 24 ++ .../test_run_transfer/test_ftp.py | 117 +++++--- .../test_run_transfer/test_ftps.py | 106 ++++--- .../test_run_transfer/test_samba.py | 106 ++++--- .../test_run_transfer/test_sftp.py | 109 +++++--- .../test_run_transfer/test_webdav.py | 109 +++++--- .../test_transfers/test_create_transfer.py | 44 ++- .../test_create_transfer.py | 27 +- .../test_file_transfers/test_read_transfer.py | 4 +- .../test_update_transfer.py | 6 +- .../transfer_fixtures/transfers_fixture.py | 4 +- tests/utils.py | 75 +++++ 47 files changed, 1067 insertions(+), 350 deletions(-) create mode 100644 docs/changelog/next_release/209.feature.rst create mode 100644 syncmaster/dto/transfers_strategy.py rename syncmaster/worker/handlers/file/{protocol.py => local_df.py} (85%) create mode 100644 syncmaster/worker/handlers/file/remote_df.py create mode 100644 syncmaster/worker/settings/hwm_store.py rename tests/resources/file_connection/csv/for_file_filtering/{file.csv => document.csv} (100%) rename tests/resources/file_connection/csv/for_file_filtering/{file.txt => document.txt} (100%) rename tests/resources/file_connection/csv/for_file_filtering/{large_file.csv => large_document.csv} (100%) rename tests/resources/file_connection/csv/for_file_filtering/{small_file.csv => small_document.csv} (100%) create mode 100644 tests/test_integration/test_run_transfer/connection_fixtures/strategy_fixtures.py diff --git a/.env.docker b/.env.docker index 03b6e33b..eda90682 100644 --- a/.env.docker +++ b/.env.docker @@ -38,6 +38,14 @@ SYNCMASTER__AUTH__ACCESS_TOKEN__SECRET_KEY=generate_another_random_string # Scheduler options SYNCMASTER__SCHEDULER__TRANSFER_FETCHING_TIMEOUT_SECONDS=200 +# HWM Store +SYNCMASTER__HWM_STORE__TYPE=horizon +SYNCMASTER__HWM_STORE__URL=http://horizon:8000 +SYNCMASTER__HWM_STORE__NAMESPACE=syncmaster_namespace +SYNCMASTER__HWM_STORE__USER=admin +SYNCMASTER__HWM_STORE__PASSWORD=123UsedForTestOnly@! + +# Tests-only TEST_S3_HOST_FOR_CONFTEST=test-s3 TEST_S3_PORT_FOR_CONFTEST=9000 TEST_S3_HOST_FOR_WORKER=test-s3 diff --git a/.env.local b/.env.local index 563dff86..704bb8d0 100644 --- a/.env.local +++ b/.env.local @@ -38,6 +38,13 @@ export SYNCMASTER__AUTH__ACCESS_TOKEN__SECRET_KEY=generate_another_random_string # Scheduler options export SYNCMASTER__SCHEDULER__TRANSFER_FETCHING_TIMEOUT_SECONDS=200 +# HWM Store +export SYNCMASTER__HWM_STORE__TYPE=horizon +export SYNCMASTER__HWM_STORE__URL=http://localhost:8020 +export SYNCMASTER__HWM_STORE__NAMESPACE=syncmaster_namespace +export SYNCMASTER__HWM_STORE__USER=admin +export SYNCMASTER__HWM_STORE__PASSWORD=123UsedForTestOnly@! + # Tests-only export TEST_S3_HOST_FOR_CONFTEST=localhost export TEST_S3_PORT_FOR_CONFTEST=9010 diff --git a/docker-compose.test.yml b/docker-compose.test.yml index 129a4078..d04459a2 100644 --- a/docker-compose.test.yml +++ b/docker-compose.test.yml @@ -129,7 +129,51 @@ services: condition: service_completed_successfully rabbitmq: condition: service_healthy - profiles: [worker, scheduler, s3, oracle, hdfs, hive, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all] + profiles: [worker, scheduler, s3, hdfs, hive, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all] + + horizon: + image: mtsrus/horizon-backend:develop + restart: unless-stopped + env_file: .env.docker + environment: + # list here usernames which should be assigned SUPERADMIN role on application start + HORIZON__ENTRYPOINT__ADMIN_USERS: admin + HORIZON__DATABASE__URL: postgresql+asyncpg://horizon:changeme@horizon-db:5432/horizon + HORIZON__AUTH__ACCESS_TOKEN__SECRET_KEY: generate_another_random_string + HORIZON__AUTH__PROVIDER: horizon.backend.providers.auth.dummy.DummyAuthProvider + HORIZON__SERVER__LOGGING__PRESET: colored + # PROMETHEUS_MULTIPROC_DIR is required for multiple workers, see: + # https://prometheus.github.io/client_python/multiprocess/ + PROMETHEUS_MULTIPROC_DIR: /tmp/prometheus-metrics + # tmpfs dir is cleaned up each container restart + tmpfs: + - /tmp/prometheus-metrics + ports: + - 8020:8000 + depends_on: + horizon-db: + condition: service_healthy + profiles: [horizon, s3, hdfs, hive, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all] + + horizon-db: + image: postgres + restart: unless-stopped + environment: + TZ: UTC + POSTGRES_DB: horizon + POSTGRES_USER: horizon + POSTGRES_PASSWORD: changeme + ports: + - 5434:5432 + volumes: + - horizon_test_data:/var/lib/postgresql/data + healthcheck: + test: pg_isready + start_period: 5s + interval: 30s + timeout: 5s + retries: 3 + profiles: [horizon, s3, hdfs, hive, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all] test-postgres: image: postgres @@ -147,7 +191,7 @@ services: interval: 30s timeout: 5s retries: 3 - profiles: [s3, oracle, clickhouse, mysql, mssql, hdfs, hive, sftp, ftp, ftps, samba, webdav, all] + profiles: [s3, hdfs, hive, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all] test-s3: image: bitnami/minio:latest @@ -366,4 +410,5 @@ services: volumes: postgres_test_data: rabbitmq_test_data: + horizon_test_data: keycloak_data: diff --git a/docs/changelog/next_release/209.feature.rst b/docs/changelog/next_release/209.feature.rst new file mode 100644 index 00000000..d18475c9 --- /dev/null +++ b/docs/changelog/next_release/209.feature.rst @@ -0,0 +1 @@ +Implement increment for transfers with file sources \ No newline at end of file diff --git a/poetry.lock b/poetry.lock index 68b82a41..04d699e2 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.0.0 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. [[package]] name = "aiofiles" @@ -44,7 +44,7 @@ SQLAlchemy = ">=1.3.0" typing-extensions = ">=4" [package.extras] -tz = ["backports.zoneinfo", "tzdata"] +tz = ["backports.zoneinfo ; python_version < \"3.9\"", "tzdata"] [[package]] name = "amqp" @@ -93,7 +93,7 @@ typing_extensions = {version = ">=4.5", markers = "python_version < \"3.13\""} [package.extras] doc = ["Sphinx (>=7.4,<8.0)", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphinx_rtd_theme"] -test = ["anyio[trio]", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "trustme", "truststore (>=0.9.1)", "uvloop (>=0.21)"] +test = ["anyio[trio]", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "trustme", "truststore (>=0.9.1) ; python_version >= \"3.10\"", "uvloop (>=0.21) ; platform_python_implementation == \"CPython\" and platform_system != \"Windows\" and python_version < \"3.14\""] trio = ["trio (>=0.26.1)"] [[package]] @@ -120,7 +120,7 @@ mongodb = ["pymongo (>=3.0)"] redis = ["redis (>=3.0)"] rethinkdb = ["rethinkdb (>=2.4.0)"] sqlalchemy = ["sqlalchemy (>=1.4)"] -test = ["APScheduler[etcd,mongodb,redis,rethinkdb,sqlalchemy,tornado,zookeeper]", "PySide6", "anyio (>=4.5.2)", "gevent", "pytest", "pytz", "twisted"] +test = ["APScheduler[etcd,mongodb,redis,rethinkdb,sqlalchemy,tornado,zookeeper]", "PySide6 ; platform_python_implementation == \"CPython\" and python_version < \"3.14\"", "anyio (>=4.5.2)", "gevent ; python_version < \"3.14\"", "pytest", "pytz", "twisted ; python_version < \"3.14\""] tornado = ["tornado (>=4.3)"] twisted = ["twisted"] zookeeper = ["kazoo"] @@ -279,8 +279,24 @@ files = [ [package.extras] docs = ["Sphinx (>=8.1.3,<8.2.0)", "sphinx-rtd-theme (>=1.2.2)"] -gssauth = ["gssapi", "sspilib"] -test = ["distro (>=1.9.0,<1.10.0)", "flake8 (>=6.1,<7.0)", "flake8-pyi (>=24.1.0,<24.2.0)", "gssapi", "k5test", "mypy (>=1.8.0,<1.9.0)", "sspilib", "uvloop (>=0.15.3)"] +gssauth = ["gssapi ; platform_system != \"Windows\"", "sspilib ; platform_system == \"Windows\""] +test = ["distro (>=1.9.0,<1.10.0)", "flake8 (>=6.1,<7.0)", "flake8-pyi (>=24.1.0,<24.2.0)", "gssapi ; platform_system == \"Linux\"", "k5test ; platform_system == \"Linux\"", "mypy (>=1.8.0,<1.9.0)", "sspilib ; platform_system == \"Windows\"", "uvloop (>=0.15.3) ; platform_system != \"Windows\" and python_version < \"3.14.0\""] + +[[package]] +name = "authlib" +version = "1.4.1" +description = "The ultimate Python library in building OAuth and OpenID Connect servers and clients." +optional = true +python-versions = ">=3.9" +groups = ["main"] +markers = "extra == \"worker\"" +files = [ + {file = "Authlib-1.4.1-py2.py3-none-any.whl", hash = "sha256:edc29c3f6a3e72cd9e9f45fff67fc663a2c364022eb0371c003f22d5405915c1"}, + {file = "authlib-1.4.1.tar.gz", hash = "sha256:30ead9ea4993cdbab821dc6e01e818362f92da290c04c7f6a1940f86507a790d"}, +] + +[package.dependencies] +cryptography = "*" [[package]] name = "autodoc-pydantic" @@ -304,7 +320,7 @@ erdantic = ["erdantic (<2.0)"] linting = ["ruff (>=0.4.0,<0.5.0)"] security = ["pip-audit (>=2.7.2,<3.0.0)"] test = ["coverage (>=7,<8)", "defusedxml (>=0.7.1)", "pytest (>=8.0.0,<9.0.0)", "pytest-sugar (>=1.0.0,<2.0.0)"] -type-checking = ["mypy (>=1.9,<2.0)", "types-docutils (>=0.20,<0.21)", "typing-extensions (>=4.11,<5.0)"] +type-checking = ["mypy (>=1.9,<2.0)", "types-docutils (>=0.20,<0.21)", "typing-extensions (>=4.11,<5.0) ; python_version <= \"3.9\""] [[package]] name = "babel" @@ -319,7 +335,7 @@ files = [ ] [package.extras] -dev = ["backports.zoneinfo", "freezegun (>=1.0,<2.0)", "jinja2 (>=3.0)", "pytest (>=6.0)", "pytest-cov", "pytz", "setuptools", "tzdata"] +dev = ["backports.zoneinfo ; python_version < \"3.9\"", "freezegun (>=1.0,<2.0)", "jinja2 (>=3.0)", "pytest (>=6.0)", "pytest-cov", "pytz", "setuptools", "tzdata ; sys_platform == \"win32\""] [[package]] name = "bcrypt" @@ -482,32 +498,32 @@ vine = ">=5.1.0,<6.0" arangodb = ["pyArango (>=2.0.2)"] auth = ["cryptography (==42.0.5)"] azureblockblob = ["azure-storage-blob (>=12.15.0)"] -brotli = ["brotli (>=1.0.0)", "brotlipy (>=0.7.0)"] +brotli = ["brotli (>=1.0.0) ; platform_python_implementation == \"CPython\"", "brotlipy (>=0.7.0) ; platform_python_implementation == \"PyPy\""] cassandra = ["cassandra-driver (>=3.25.0,<4)"] consul = ["python-consul2 (==0.1.5)"] cosmosdbsql = ["pydocumentdb (==2.3.5)"] -couchbase = ["couchbase (>=3.0.0)"] +couchbase = ["couchbase (>=3.0.0) ; platform_python_implementation != \"PyPy\" and (platform_system != \"Windows\" or python_version < \"3.10\")"] couchdb = ["pycouchdb (==1.14.2)"] django = ["Django (>=2.2.28)"] dynamodb = ["boto3 (>=1.26.143)"] elasticsearch = ["elastic-transport (<=8.13.0)", "elasticsearch (<=8.13.0)"] -eventlet = ["eventlet (>=0.32.0)"] +eventlet = ["eventlet (>=0.32.0) ; python_version < \"3.10\""] gcs = ["google-cloud-storage (>=2.10.0)"] gevent = ["gevent (>=1.5.0)"] -librabbitmq = ["librabbitmq (>=2.0.0)"] -memcache = ["pylibmc (==1.6.3)"] +librabbitmq = ["librabbitmq (>=2.0.0) ; python_version < \"3.11\""] +memcache = ["pylibmc (==1.6.3) ; platform_system != \"Windows\""] mongodb = ["pymongo[srv] (>=4.0.2)"] msgpack = ["msgpack (==1.0.8)"] pymemcache = ["python-memcached (>=1.61)"] -pyro = ["pyro4 (==4.82)"] +pyro = ["pyro4 (==4.82) ; python_version < \"3.11\""] pytest = ["pytest-celery[all] (>=1.0.0)"] redis = ["redis (>=4.5.2,!=4.5.5,<6.0.0)"] s3 = ["boto3 (>=1.26.143)"] slmq = ["softlayer-messaging (>=1.0.3)"] -solar = ["ephem (==4.1.5)"] +solar = ["ephem (==4.1.5) ; platform_python_implementation != \"PyPy\""] sqlalchemy = ["sqlalchemy (>=1.4.48,<2.1)"] -sqs = ["boto3 (>=1.26.143)", "kombu[sqs] (>=5.3.4)", "pycurl (>=7.43.0.5)", "urllib3 (>=1.26.16)"] -tblib = ["tblib (>=1.3.0)", "tblib (>=1.5.0)"] +sqs = ["boto3 (>=1.26.143)", "kombu[sqs] (>=5.3.4)", "pycurl (>=7.43.0.5) ; sys_platform != \"win32\" and platform_python_implementation == \"CPython\"", "urllib3 (>=1.26.16)"] +tblib = ["tblib (>=1.3.0) ; python_version < \"3.8.0\"", "tblib (>=1.5.0) ; python_version >= \"3.8.0\""] yaml = ["PyYAML (>=3.10)"] zookeeper = ["kazoo (>=1.3.1)"] zstd = ["zstandard (==0.22.0)"] @@ -600,7 +616,7 @@ files = [ {file = "cffi-1.17.1-cp39-cp39-win_amd64.whl", hash = "sha256:d016c76bdd850f3c626af19b0542c9677ba156e4ee4fccfdd7848803533ef662"}, {file = "cffi-1.17.1.tar.gz", hash = "sha256:1c39c6016c32bc48dd54561950ebd6836e1670f2ae46128f67cf49e789c52824"}, ] -markers = {main = "extra == \"server\" and platform_python_implementation != \"PyPy\""} +markers = {main = "(extra == \"worker\" or extra == \"server\") and platform_python_implementation != \"PyPy\""} [package.dependencies] pycparser = "*" @@ -718,7 +734,7 @@ files = [ {file = "charset_normalizer-3.4.1-py3-none-any.whl", hash = "sha256:d98b1668f06378c6dbefec3b92299716b931cd4e6061f3c875a71ced1780ab85"}, {file = "charset_normalizer-3.4.1.tar.gz", hash = "sha256:44251f18cd68a75b56585dd00dae26183e102cd5e0f9f1466e6df5da2ed64ea3"}, ] -markers = {main = "extra == \"server\""} +markers = {main = "extra == \"server\" or extra == \"worker\""} [[package]] name = "click" @@ -802,7 +818,7 @@ files = [ {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"}, {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, ] -markers = {main = "extra == \"server\" and platform_system == \"Windows\" or extra == \"worker\" and platform_system == \"Windows\" or extra == \"scheduler\" and platform_system == \"Windows\"", dev = "platform_system == \"Windows\"", docs = "platform_system == \"Windows\" or sys_platform == \"win32\"", test = "sys_platform == \"win32\" or platform_system == \"Windows\""} +markers = {main = "(extra == \"server\" or extra == \"worker\" or extra == \"scheduler\") and platform_system == \"Windows\"", dev = "platform_system == \"Windows\"", docs = "platform_system == \"Windows\" or sys_platform == \"win32\"", test = "sys_platform == \"win32\" or platform_system == \"Windows\""} [[package]] name = "coloredlogs" @@ -897,7 +913,7 @@ files = [ ] [package.extras] -toml = ["tomli"] +toml = ["tomli ; python_full_version <= \"3.11.0a6\""] [[package]] name = "cryptography" @@ -937,21 +953,48 @@ files = [ {file = "cryptography-44.0.0-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:62901fb618f74d7d81bf408c8719e9ec14d863086efe4185afd07c352aee1d2c"}, {file = "cryptography-44.0.0.tar.gz", hash = "sha256:cd4e834f340b4293430701e772ec543b0fbe6c2dea510a5286fe0acabe153a02"}, ] -markers = {main = "extra == \"server\""} +markers = {main = "extra == \"worker\" or extra == \"server\""} [package.dependencies] cffi = {version = ">=1.12", markers = "platform_python_implementation != \"PyPy\""} [package.extras] -docs = ["sphinx (>=5.3.0)", "sphinx-rtd-theme (>=3.0.0)"] +docs = ["sphinx (>=5.3.0)", "sphinx-rtd-theme (>=3.0.0) ; python_version >= \"3.8\""] docstest = ["pyenchant (>=3)", "readme-renderer (>=30.0)", "sphinxcontrib-spelling (>=7.3.1)"] -nox = ["nox (>=2024.4.15)", "nox[uv] (>=2024.3.2)"] -pep8test = ["check-sdist", "click (>=8.0.1)", "mypy (>=1.4)", "ruff (>=0.3.6)"] +nox = ["nox (>=2024.4.15)", "nox[uv] (>=2024.3.2) ; python_version >= \"3.8\""] +pep8test = ["check-sdist ; python_version >= \"3.8\"", "click (>=8.0.1)", "mypy (>=1.4)", "ruff (>=0.3.6)"] sdist = ["build (>=1.0.0)"] ssh = ["bcrypt (>=3.1.5)"] test = ["certifi (>=2024)", "cryptography-vectors (==44.0.0)", "pretend (>=0.7)", "pytest (>=7.4.0)", "pytest-benchmark (>=4.0)", "pytest-cov (>=2.10.1)", "pytest-xdist (>=3.5.0)"] test-randomorder = ["pytest-randomly"] +[[package]] +name = "data-horizon" +version = "1.0.2" +description = "Horizon REST API + client" +optional = true +python-versions = "<4.0,>=3.7" +groups = ["main"] +markers = "extra == \"worker\"" +files = [ + {file = "data_horizon-1.0.2-py3-none-any.whl", hash = "sha256:17cb862b2d3f03856b4b8236a42ebb487d5aa969b5e8344bbecb5f87378b704e"}, + {file = "data_horizon-1.0.2.tar.gz", hash = "sha256:339bc291668ac2e511ce1b03f8b0852d4ea3bdbee190e9749c8164fb2eb18635"}, +] + +[package.dependencies] +authlib = {version = ">=1.2.1", optional = true, markers = "python_version >= \"3.8\" and extra == \"client-sync\""} +pydantic = {version = "<3", markers = "python_version >= \"3.8\""} +python-jose = {version = "*", extras = ["cryptography"]} +requests = {version = "*", optional = true, markers = "extra == \"client-sync\""} +typing-extensions = {version = ">=4.0.0", markers = "python_version >= \"3.8\""} +urllib3 = {version = ">=1.26.0", optional = true, markers = "extra == \"client-sync\""} + +[package.extras] +backend = ["alembic (>=1.12.1) ; python_version >= \"3.8\"", "alembic (>=1.12.1,<2.0.0) ; python_version < \"3.8\"", "asgi-correlation-id (>=4.2.0) ; python_version >= \"3.8\"", "asgi-correlation-id (>=4.2.0,<5.0.0) ; python_version < \"3.8\"", "coloredlogs", "devtools", "fastapi (>=0.103.0) ; python_version >= \"3.8\"", "fastapi (>=0.103.2,<0.104.0) ; python_version < \"3.8\"", "importlib-resources (>=5.12.0) ; python_version >= \"3.8\"", "importlib-resources (>=5.12.0,<6.0.0) ; python_version < \"3.8\"", "passlib", "pydantic-settings (>=2.0.3) ; python_version >= \"3.8\"", "pydantic-settings (>=2.0.3,<3.0.0) ; python_version < \"3.8\"", "python-json-logger", "python-multipart", "pyyaml", "sqlalchemy (>=2.0)", "sqlalchemy-utils", "starlette (>=0.27.0) ; python_version >= \"3.8\"", "starlette (>=0.27.0,<0.28.0) ; python_version < \"3.8\"", "starlette-exporter (>=0.17.1) ; python_version >= \"3.8\"", "starlette-exporter (>=0.17.1,<0.18.0) ; python_version < \"3.8\"", "uuid6 (>=2022.10.25) ; python_version >= \"3.8\"", "uuid6 (>=2022.10.25,<2023.0.0) ; python_version < \"3.8\"", "uvicorn (>=0.22.0) ; python_version >= \"3.8\"", "uvicorn (>=0.22.0,<0.23.0) ; python_version < \"3.8\""] +client-sync = ["authlib (>=1.2.1) ; python_version >= \"3.8\"", "authlib (>=1.2.1,<2.0.0) ; python_version < \"3.8\"", "requests", "urllib3 (>=1.26.0)"] +ldap = ["argon2-cffi", "bonsai (>=1.5.3)"] +postgres = ["asyncpg (>=0.28.0) ; python_version >= \"3.8\"", "asyncpg (>=0.28.0,<0.29.0) ; python_version < \"3.8\""] + [[package]] name = "deprecation" version = "2.1.0" @@ -1003,6 +1046,26 @@ files = [ {file = "docutils-0.21.2.tar.gz", hash = "sha256:3a6b18732edf182daa3cd12775bbb338cf5691468f91eeeb109deff6ebfa986f"}, ] +[[package]] +name = "ecdsa" +version = "0.19.0" +description = "ECDSA cryptographic signature library (pure python)" +optional = true +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,>=2.6" +groups = ["main"] +markers = "extra == \"worker\"" +files = [ + {file = "ecdsa-0.19.0-py2.py3-none-any.whl", hash = "sha256:2cea9b88407fdac7bbeca0833b189e4c9c53f2ef1e1eaa29f6224dbc809b707a"}, + {file = "ecdsa-0.19.0.tar.gz", hash = "sha256:60eaad1199659900dd0af521ed462b793bbdf867432b3948e87416ae4caf6bf8"}, +] + +[package.dependencies] +six = ">=1.9.0" + +[package.extras] +gmpy = ["gmpy"] +gmpy2 = ["gmpy2"] + [[package]] name = "etl-entities" version = "2.5.0" @@ -1089,7 +1152,7 @@ files = [ [package.extras] docs = ["furo (>=2024.8.6)", "sphinx (>=8.1.3)", "sphinx-autodoc-typehints (>=3)"] testing = ["covdefaults (>=2.3)", "coverage (>=7.6.10)", "diff-cover (>=9.2.1)", "pytest (>=8.3.4)", "pytest-asyncio (>=0.25.2)", "pytest-cov (>=6)", "pytest-mock (>=3.14)", "pytest-timeout (>=2.3.1)", "virtualenv (>=20.28.1)"] -typing = ["typing-extensions (>=4.12.2)"] +typing = ["typing-extensions (>=4.12.2) ; python_version < \"3.11\""] [[package]] name = "flake8" @@ -1259,11 +1322,11 @@ greenlet = {version = ">=3.1.1", markers = "platform_python_implementation == \" "zope.interface" = "*" [package.extras] -dnspython = ["dnspython (>=1.16.0,<2.0)", "idna"] +dnspython = ["dnspython (>=1.16.0,<2.0) ; python_version < \"3.10\"", "idna ; python_version < \"3.10\""] docs = ["furo", "repoze.sphinx.autointerface", "sphinx", "sphinxcontrib-programoutput", "zope.schema"] -monitor = ["psutil (>=5.7.0)"] -recommended = ["cffi (>=1.17.1)", "dnspython (>=1.16.0,<2.0)", "idna", "psutil (>=5.7.0)"] -test = ["cffi (>=1.17.1)", "coverage (>=5.0)", "dnspython (>=1.16.0,<2.0)", "idna", "objgraph", "psutil (>=5.7.0)", "requests"] +monitor = ["psutil (>=5.7.0) ; sys_platform != \"win32\" or platform_python_implementation == \"CPython\""] +recommended = ["cffi (>=1.17.1) ; platform_python_implementation == \"CPython\"", "dnspython (>=1.16.0,<2.0) ; python_version < \"3.10\"", "idna ; python_version < \"3.10\"", "psutil (>=5.7.0) ; sys_platform != \"win32\" or platform_python_implementation == \"CPython\""] +test = ["cffi (>=1.17.1) ; platform_python_implementation == \"CPython\"", "coverage (>=5.0) ; sys_platform != \"win32\"", "dnspython (>=1.16.0,<2.0) ; python_version < \"3.10\"", "idna ; python_version < \"3.10\"", "objgraph", "psutil (>=5.7.0) ; sys_platform != \"win32\" or platform_python_implementation == \"CPython\"", "requests"] [[package]] name = "greenlet" @@ -1347,7 +1410,7 @@ files = [ {file = "greenlet-3.1.1-cp39-cp39-win_amd64.whl", hash = "sha256:3319aa75e0e0639bc15ff54ca327e8dc7a6fe404003496e3c6925cd3142e0e22"}, {file = "greenlet-3.1.1.tar.gz", hash = "sha256:4ce3ac6cdb6adf7946475d7ef31777c26d94bccc377e070a7986bd2d5c515467"}, ] -markers = {main = "extra == \"server\" and python_version < \"3.14\" and (platform_machine == \"aarch64\" or platform_machine == \"ppc64le\" or platform_machine == \"x86_64\" or platform_machine == \"amd64\" or platform_machine == \"AMD64\" or platform_machine == \"win32\" or platform_machine == \"WIN32\") or extra == \"worker\" and python_version < \"3.14\" and (platform_machine == \"aarch64\" or platform_machine == \"ppc64le\" or platform_machine == \"x86_64\" or platform_machine == \"amd64\" or platform_machine == \"AMD64\" or platform_machine == \"win32\" or platform_machine == \"WIN32\") or extra == \"scheduler\" and python_version < \"3.14\" and (platform_machine == \"aarch64\" or platform_machine == \"ppc64le\" or platform_machine == \"x86_64\" or platform_machine == \"amd64\" or platform_machine == \"AMD64\" or platform_machine == \"win32\" or platform_machine == \"WIN32\")", dev = "python_version < \"3.14\" and (platform_machine == \"aarch64\" or platform_machine == \"ppc64le\" or platform_machine == \"x86_64\" or platform_machine == \"amd64\" or platform_machine == \"AMD64\" or platform_machine == \"win32\" or platform_machine == \"WIN32\")", test = "platform_python_implementation == \"CPython\""} +markers = {main = "python_version < \"3.14\" and (platform_machine == \"aarch64\" or platform_machine == \"ppc64le\" or platform_machine == \"x86_64\" or platform_machine == \"amd64\" or platform_machine == \"AMD64\" or platform_machine == \"win32\" or platform_machine == \"WIN32\") and (extra == \"server\" or extra == \"worker\" or extra == \"scheduler\")", dev = "python_version < \"3.14\" and (platform_machine == \"aarch64\" or platform_machine == \"ppc64le\" or platform_machine == \"x86_64\" or platform_machine == \"amd64\" or platform_machine == \"AMD64\" or platform_machine == \"win32\" or platform_machine == \"WIN32\")", test = "platform_python_implementation == \"CPython\""} [package.extras] docs = ["Sphinx", "furo"] @@ -1386,6 +1449,23 @@ avro = ["fastavro (>=0.21.19)"] dataframe = ["fastavro (>=0.21.19)", "pandas (>=0.14.1)"] kerberos = ["requests-kerberos (>=0.7.0)"] +[[package]] +name = "horizon-hwm-store" +version = "1.1.0" +description = "onETL Plugin for Horizon store" +optional = true +python-versions = ">=3.7" +groups = ["main"] +markers = "extra == \"worker\"" +files = [ + {file = "horizon_hwm_store-1.1.0-py3-none-any.whl", hash = "sha256:6a61cb7a8a6b2672ff50b64e1ce32d17345277b59c6b12cf520b81ecd439cb22"}, + {file = "horizon_hwm_store-1.1.0.tar.gz", hash = "sha256:ff82a4c1135637b563f9c7bca79b24e9ddc96519c8de04f3ff6d7ffd853586ea"}, +] + +[package.dependencies] +data-horizon = {version = ">=1.0.0,<1.1", extras = ["client-sync"]} +etl-entities = ">=2.1.0,<2.6.0" + [[package]] name = "httpcore" version = "1.0.7" @@ -1428,7 +1508,7 @@ idna = "*" sniffio = "*" [package.extras] -brotli = ["brotli", "brotlicffi"] +brotli = ["brotli ; platform_python_implementation == \"CPython\"", "brotlicffi ; platform_python_implementation != \"CPython\""] cli = ["click (==8.*)", "pygments (==2.*)", "rich (>=10,<14)"] http2 = ["h2 (>=3,<5)"] socks = ["socksio (==1.*)"] @@ -1525,12 +1605,12 @@ markers = {main = "extra == \"worker\""} zipp = ">=3.20" [package.extras] -check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1)"] +check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\""] cover = ["pytest-cov"] doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] enabler = ["pytest-enabler (>=2.2)"] perf = ["ipython"] -test = ["flufl.flake8", "importlib_resources (>=1.3)", "jaraco.test (>=5.4)", "packaging", "pyfakefs", "pytest (>=6,!=8.1.*)", "pytest-perf (>=0.9.2)"] +test = ["flufl.flake8", "importlib_resources (>=1.3) ; python_version < \"3.9\"", "jaraco.test (>=5.4)", "packaging", "pyfakefs", "pytest (>=6,!=8.1.*)", "pytest-perf (>=0.9.2)"] type = ["pytest-mypy"] [[package]] @@ -1583,11 +1663,11 @@ description = "A very fast and expressive template engine." optional = false python-versions = ">=3.7" groups = ["main", "docs"] +markers = "extra == \"server\" or extra == \"worker\"" files = [ {file = "jinja2-3.1.5-py3-none-any.whl", hash = "sha256:aba0f4dc9ed8013c424088f68a5c226f7d6097ed89b246d7749c2ec4175c6adb"}, {file = "jinja2-3.1.5.tar.gz", hash = "sha256:8fefff8dc3034e27bb80d67c671eb8a9bc424c0ef4c0826edbff304cceff43bb"}, ] -markers = {main = "extra == \"server\" or extra == \"worker\""} [package.dependencies] MarkupSafe = ">=2.0" @@ -1635,7 +1715,7 @@ azureservicebus = ["azure-servicebus (>=7.10.0)"] azurestoragequeues = ["azure-identity (>=1.12.0)", "azure-storage-queue (>=12.6.0)"] confluentkafka = ["confluent-kafka (>=2.2.0)"] consul = ["python-consul2 (==0.1.5)"] -librabbitmq = ["librabbitmq (>=2.0.0)"] +librabbitmq = ["librabbitmq (>=2.0.0) ; python_version < \"3.11\""] mongodb = ["pymongo (>=4.1.1)"] msgpack = ["msgpack (==1.1.0)"] pyro = ["pyro4 (==4.82)"] @@ -1643,7 +1723,7 @@ qpid = ["qpid-python (>=0.26)", "qpid-tools (>=0.26)"] redis = ["redis (>=4.5.2,!=4.5.5,!=5.0.2)"] slmq = ["softlayer-messaging (>=1.0.3)"] sqlalchemy = ["sqlalchemy (>=1.4.48,<2.1)"] -sqs = ["boto3 (>=1.26.143)", "pycurl (>=7.43.0.5)", "urllib3 (>=1.26.16)"] +sqs = ["boto3 (>=1.26.143)", "pycurl (>=7.43.0.5) ; sys_platform != \"win32\" and platform_python_implementation == \"CPython\"", "urllib3 (>=1.26.16)"] yaml = ["PyYAML (>=3.10)"] zookeeper = ["kazoo (>=2.8.0)"] @@ -1830,6 +1910,7 @@ description = "Safely add untrusted strings to HTML/XML markup." optional = false python-versions = ">=3.9" groups = ["main", "docs"] +markers = "extra == \"server\" or extra == \"worker\"" files = [ {file = "MarkupSafe-3.0.2-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:7e94c425039cde14257288fd61dcfb01963e658efbc0ff54f5306b06054700f8"}, {file = "MarkupSafe-3.0.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:9e2d922824181480953426608b81967de705c3cef4d1af983af849d7bd619158"}, @@ -1893,7 +1974,6 @@ files = [ {file = "MarkupSafe-3.0.2-cp39-cp39-win_amd64.whl", hash = "sha256:6e296a513ca3d94054c2c881cc913116e90fd030ad1c656b3869762b754f5f8a"}, {file = "markupsafe-3.0.2.tar.gz", hash = "sha256:ee55d3edf80167e48ea11a923c7386f4669df67d7994554387f84e7d8b0a2bf0"}, ] -markers = {main = "extra == \"server\" or extra == \"worker\""} [[package]] name = "mccabe" @@ -2085,7 +2165,7 @@ sphinx = ">=6" tabulate = ">=0.8.10" [package.extras] -developer = ["pre-commit (>=3.3)", "tomli"] +developer = ["pre-commit (>=3.3)", "tomli ; python_version < \"3.11\""] doc = ["intersphinx-registry", "matplotlib (>=3.5)", "numpy (>=1.22)", "pydata-sphinx-theme (>=0.13.3)", "sphinx (>=7)"] test = ["matplotlib", "pytest", "pytest-cov"] @@ -2201,8 +2281,8 @@ cryptography = ">=3.3" pynacl = ">=1.5" [package.extras] -all = ["gssapi (>=1.4.1)", "invoke (>=2.0)", "pyasn1 (>=0.1.7)", "pywin32 (>=2.1.8)"] -gssapi = ["gssapi (>=1.4.1)", "pyasn1 (>=0.1.7)", "pywin32 (>=2.1.8)"] +all = ["gssapi (>=1.4.1) ; platform_system != \"Windows\"", "invoke (>=2.0)", "pyasn1 (>=0.1.7)", "pywin32 (>=2.1.8) ; platform_system == \"Windows\""] +gssapi = ["gssapi (>=1.4.1) ; platform_system != \"Windows\"", "pyasn1 (>=0.1.7)", "pywin32 (>=2.1.8) ; platform_system == \"Windows\""] invoke = ["invoke (>=2.0)"] [[package]] @@ -2353,8 +2433,8 @@ typing-extensions = {version = ">=4.6", markers = "python_version < \"3.13\""} tzdata = {version = "*", markers = "sys_platform == \"win32\""} [package.extras] -binary = ["psycopg-binary (==3.2.5)"] -c = ["psycopg-c (==3.2.5)"] +binary = ["psycopg-binary (==3.2.5) ; implementation_name != \"pypy\""] +c = ["psycopg-c (==3.2.5) ; implementation_name != \"pypy\""] dev = ["ast-comments (>=1.1.2)", "black (>=24.1.0)", "codespell (>=2.2)", "dnspython (>=2.1)", "flake8 (>=4.0)", "isort-psycopg", "isort[colors] (>=6.0)", "mypy (>=1.14)", "pre-commit (>=4.0.1)", "types-setuptools (>=57.4)", "wheel (>=0.37)"] docs = ["Sphinx (>=5.0)", "furo (==2022.6.21)", "sphinx-autobuild (>=2021.3.14)", "sphinx-autodoc-typehints (>=1.12)"] pool = ["psycopg-pool"] @@ -2367,7 +2447,7 @@ description = "PostgreSQL database adapter for Python -- C optimisation distribu optional = true python-versions = ">=3.8" groups = ["main"] -markers = "extra == \"server\" and implementation_name != \"pypy\" or extra == \"worker\" and implementation_name != \"pypy\"" +markers = "(extra == \"server\" or extra == \"worker\") and implementation_name != \"pypy\"" files = [ {file = "psycopg_binary-3.2.5-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:a82211a43372cba9b1555a110e84e679deec2dc9463ae4c736977dad99dca5ed"}, {file = "psycopg_binary-3.2.5-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:e7d215a43343d91ba08301865f059d9518818d66a222a85fb425e4156716f5a6"}, @@ -2455,11 +2535,12 @@ version = "0.6.1" description = "Pure-Python implementation of ASN.1 types and DER/BER/CER codecs (X.208)" optional = false python-versions = ">=3.8" -groups = ["test"] +groups = ["main", "test"] files = [ {file = "pyasn1-0.6.1-py3-none-any.whl", hash = "sha256:0d632f46f2ba09143da3a8afe9e33fb6f92fa2320ab7e886e2d0f7672af84629"}, {file = "pyasn1-0.6.1.tar.gz", hash = "sha256:6f580d2bdd84365380830acf45550f2511469f673cb4a5ae3857a3170128b034"}, ] +markers = {main = "extra == \"worker\""} [[package]] name = "pycodestyle" @@ -2484,7 +2565,7 @@ files = [ {file = "pycparser-2.22-py3-none-any.whl", hash = "sha256:c3702b6d3dd8c7abc1afa565d7e63d53a1d0bd86cdc24edd75470f4de499cfcc"}, {file = "pycparser-2.22.tar.gz", hash = "sha256:491c8be9c040f5390f5bf44a5b07752bd07f56edf992381b05c701439eec10f6"}, ] -markers = {main = "extra == \"server\" and platform_python_implementation != \"PyPy\""} +markers = {main = "(extra == \"worker\" or extra == \"server\") and platform_python_implementation != \"PyPy\""} [[package]] name = "pycryptodome" @@ -2547,7 +2628,7 @@ typing-extensions = ">=4.12.2" [package.extras] email = ["email-validator (>=2.0.0)"] -timezone = ["tzdata"] +timezone = ["tzdata ; python_version >= \"3.9\" and platform_system == \"Windows\""] [[package]] name = "pydantic-core" @@ -2669,11 +2750,11 @@ description = "Settings management using Pydantic" optional = false python-versions = ">=3.8" groups = ["main", "docs"] +markers = "extra == \"server\" or extra == \"worker\" or extra == \"scheduler\"" files = [ {file = "pydantic_settings-2.8.0-py3-none-any.whl", hash = "sha256:c782c7dc3fb40e97b238e713c25d26f64314aece2e91abcff592fcac15f71820"}, {file = "pydantic_settings-2.8.0.tar.gz", hash = "sha256:88e2ca28f6e68ea102c99c3c401d6c9078e68a5df600e97b43891c34e089500a"}, ] -markers = {main = "extra == \"server\" or extra == \"worker\" or extra == \"scheduler\""} [package.dependencies] pydantic = ">=2.7.0" @@ -2764,7 +2845,7 @@ description = "A python implementation of GNU readline." optional = true python-versions = ">=3.8" groups = ["main"] -markers = "extra == \"server\" and sys_platform == \"win32\" or extra == \"worker\" and sys_platform == \"win32\" or extra == \"scheduler\" and sys_platform == \"win32\"" +markers = "sys_platform == \"win32\" and (extra == \"server\" or extra == \"worker\" or extra == \"scheduler\")" files = [ {file = "pyreadline3-3.5.4-py3-none-any.whl", hash = "sha256:eaf8e6cc3c49bcccf145fc6067ba8643d1df34d604a1ec0eccbf7a18e6d3fae6"}, {file = "pyreadline3-3.5.4.tar.gz", hash = "sha256:8d57d53039a1c75adba8e50dd3d992b28143480816187ea5efbd5c78e6c885b7"}, @@ -2936,15 +3017,39 @@ description = "Read key-value pairs from a .env file and set them as environment optional = false python-versions = ">=3.8" groups = ["main", "docs"] +markers = "extra == \"server\" or extra == \"worker\" or extra == \"scheduler\"" files = [ {file = "python-dotenv-1.0.1.tar.gz", hash = "sha256:e324ee90a023d808f1959c46bcbc04446a10ced277783dc6ee09987c37ec10ca"}, {file = "python_dotenv-1.0.1-py3-none-any.whl", hash = "sha256:f7b63ef50f1b690dddf550d03497b66d609393b40b564ed0d674909a68ebf16a"}, ] -markers = {main = "extra == \"server\" or extra == \"worker\" or extra == \"scheduler\""} [package.extras] cli = ["click (>=5.0)"] +[[package]] +name = "python-jose" +version = "3.3.0" +description = "JOSE implementation in Python" +optional = true +python-versions = "*" +groups = ["main"] +markers = "extra == \"worker\"" +files = [ + {file = "python-jose-3.3.0.tar.gz", hash = "sha256:55779b5e6ad599c6336191246e95eb2293a9ddebd555f796a65f838f07e5d78a"}, + {file = "python_jose-3.3.0-py2.py3-none-any.whl", hash = "sha256:9b1376b023f8b298536eedd47ae1089bcdb848f1535ab30555cd92002d78923a"}, +] + +[package.dependencies] +cryptography = {version = ">=3.4.0", optional = true, markers = "extra == \"cryptography\""} +ecdsa = "!=0.15" +pyasn1 = "*" +rsa = "*" + +[package.extras] +cryptography = ["cryptography (>=3.4.0)"] +pycrypto = ["pyasn1", "pycrypto (>=2.6.0,<2.7.0)"] +pycryptodome = ["pyasn1", "pycryptodome (>=3.3.1,<4.0.0)"] + [[package]] name = "python-json-logger" version = "3.2.1" @@ -2959,7 +3064,7 @@ files = [ ] [package.extras] -dev = ["backports.zoneinfo", "black", "build", "freezegun", "mdx_truly_sane_lists", "mike", "mkdocs", "mkdocs-awesome-pages-plugin", "mkdocs-gen-files", "mkdocs-literate-nav", "mkdocs-material (>=8.5)", "mkdocstrings[python]", "msgspec", "msgspec-python313-pre", "mypy", "orjson", "pylint", "pytest", "tzdata", "validate-pyproject[all]"] +dev = ["backports.zoneinfo ; python_version < \"3.9\"", "black", "build", "freezegun", "mdx_truly_sane_lists", "mike", "mkdocs", "mkdocs-awesome-pages-plugin", "mkdocs-gen-files", "mkdocs-literate-nav", "mkdocs-material (>=8.5)", "mkdocstrings[python]", "msgspec ; implementation_name != \"pypy\" and python_version < \"3.13\"", "msgspec-python313-pre ; implementation_name != \"pypy\" and python_version == \"3.13\"", "mypy", "orjson ; implementation_name != \"pypy\"", "pylint", "pytest", "tzdata", "validate-pyproject[all]"] [[package]] name = "python-keycloak" @@ -3058,7 +3163,7 @@ files = [ {file = "PyYAML-6.0.2-cp39-cp39-win_amd64.whl", hash = "sha256:39693e1f8320ae4f43943590b49779ffb98acb81f788220ea932a6b6c51004d8"}, {file = "pyyaml-6.0.2.tar.gz", hash = "sha256:d584d9ec91ad65861cc08d42e834324ef890a082e591037abe114850ff7bbc3e"}, ] -markers = {main = "extra == \"server\" or extra == \"worker\" or extra == \"scheduler\""} +markers = {main = "extra == \"worker\" or extra == \"server\" or extra == \"scheduler\""} [[package]] name = "requests" @@ -3071,7 +3176,7 @@ files = [ {file = "requests-2.32.3-py3-none-any.whl", hash = "sha256:70761cfe03c773ceb22aa2f671b4757976145175cdfca038c02654d061d6dcc6"}, {file = "requests-2.32.3.tar.gz", hash = "sha256:55365417734eb18255590a9ff9eb97e9e1da868d4ccd6402399eaf68af20a760"}, ] -markers = {main = "extra == \"server\""} +markers = {main = "extra == \"server\" or extra == \"worker\""} [package.dependencies] certifi = ">=2017.4.17" @@ -3117,7 +3222,23 @@ requests = ">=2.30.0,<3.0" urllib3 = ">=1.25.10,<3.0" [package.extras] -tests = ["coverage (>=6.0.0)", "flake8", "mypy", "pytest (>=7.0.0)", "pytest-asyncio", "pytest-cov", "pytest-httpserver", "tomli", "tomli-w", "types-PyYAML", "types-requests"] +tests = ["coverage (>=6.0.0)", "flake8", "mypy", "pytest (>=7.0.0)", "pytest-asyncio", "pytest-cov", "pytest-httpserver", "tomli ; python_version < \"3.11\"", "tomli-w", "types-PyYAML", "types-requests"] + +[[package]] +name = "rsa" +version = "4.9" +description = "Pure-Python RSA implementation" +optional = true +python-versions = ">=3.6,<4" +groups = ["main"] +markers = "extra == \"worker\"" +files = [ + {file = "rsa-4.9-py3-none-any.whl", hash = "sha256:90260d9058e514786967344d0ef75fa8727eed8a7d2e43ce9f4bcf1b536174f7"}, + {file = "rsa-4.9.tar.gz", hash = "sha256:e38464a49c6c85d7f1351b0126661487a7e0a14a50f1675ec50eb34d4f20ef21"}, +] + +[package.dependencies] +pyasn1 = ">=0.1.3" [[package]] name = "setuptools" @@ -3132,13 +3253,13 @@ files = [ ] [package.extras] -check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1)", "ruff (>=0.8.0)"] -core = ["importlib_metadata (>=6)", "jaraco.collections", "jaraco.functools (>=4)", "jaraco.text (>=3.7)", "more_itertools", "more_itertools (>=8.8)", "packaging", "packaging (>=24.2)", "platformdirs (>=4.2.2)", "tomli (>=2.0.1)", "wheel (>=0.43.0)"] +check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\"", "ruff (>=0.8.0) ; sys_platform != \"cygwin\""] +core = ["importlib_metadata (>=6) ; python_version < \"3.10\"", "jaraco.collections", "jaraco.functools (>=4)", "jaraco.text (>=3.7)", "more_itertools", "more_itertools (>=8.8)", "packaging", "packaging (>=24.2)", "platformdirs (>=4.2.2)", "tomli (>=2.0.1) ; python_version < \"3.11\"", "wheel (>=0.43.0)"] cover = ["pytest-cov"] doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier", "towncrier (<24.7)"] enabler = ["pytest-enabler (>=2.2)"] -test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.7.2)", "jaraco.test (>=5.5)", "packaging (>=24.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-home (>=0.5)", "pytest-perf", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel (>=0.44.0)"] -type = ["importlib_metadata (>=7.0.2)", "jaraco.develop (>=7.21)", "mypy (==1.14.*)", "pytest-mypy"] +test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21) ; python_version >= \"3.9\" and sys_platform != \"cygwin\"", "jaraco.envs (>=2.2)", "jaraco.path (>=3.7.2)", "jaraco.test (>=5.5)", "packaging (>=24.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-home (>=0.5)", "pytest-perf ; sys_platform != \"cygwin\"", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel (>=0.44.0)"] +type = ["importlib_metadata (>=7.0.2) ; python_version < \"3.10\"", "jaraco.develop (>=7.21) ; sys_platform != \"cygwin\"", "mypy (==1.14.*)", "pytest-mypy"] [[package]] name = "six" @@ -3602,8 +3723,8 @@ intervals = ["intervals (>=0.7.1)"] password = ["passlib (>=1.6,<2.0)"] pendulum = ["pendulum (>=2.0.5)"] phone = ["phonenumbers (>=5.9.2)"] -test = ["Jinja2 (>=2.3)", "Pygments (>=1.2)", "backports.zoneinfo", "docutils (>=0.10)", "flake8 (>=2.4.0)", "flexmock (>=0.9.7)", "isort (>=4.2.2)", "pg8000 (>=1.12.4)", "psycopg (>=3.1.8)", "psycopg2 (>=2.5.1)", "psycopg2cffi (>=2.8.1)", "pymysql", "pyodbc", "pytest (==7.4.4)", "python-dateutil (>=2.6)", "pytz (>=2014.2)"] -test-all = ["Babel (>=1.3)", "Jinja2 (>=2.3)", "Pygments (>=1.2)", "arrow (>=0.3.4)", "backports.zoneinfo", "colour (>=0.0.4)", "cryptography (>=0.6)", "docutils (>=0.10)", "flake8 (>=2.4.0)", "flexmock (>=0.9.7)", "furl (>=0.4.1)", "intervals (>=0.7.1)", "isort (>=4.2.2)", "passlib (>=1.6,<2.0)", "pendulum (>=2.0.5)", "pg8000 (>=1.12.4)", "phonenumbers (>=5.9.2)", "psycopg (>=3.1.8)", "psycopg2 (>=2.5.1)", "psycopg2cffi (>=2.8.1)", "pymysql", "pyodbc", "pytest (==7.4.4)", "python-dateutil", "python-dateutil (>=2.6)", "pytz (>=2014.2)"] +test = ["Jinja2 (>=2.3)", "Pygments (>=1.2)", "backports.zoneinfo ; python_version < \"3.9\"", "docutils (>=0.10)", "flake8 (>=2.4.0)", "flexmock (>=0.9.7)", "isort (>=4.2.2)", "pg8000 (>=1.12.4)", "psycopg (>=3.1.8)", "psycopg2 (>=2.5.1)", "psycopg2cffi (>=2.8.1)", "pymysql", "pyodbc", "pytest (==7.4.4)", "python-dateutil (>=2.6)", "pytz (>=2014.2)"] +test-all = ["Babel (>=1.3)", "Jinja2 (>=2.3)", "Pygments (>=1.2)", "arrow (>=0.3.4)", "backports.zoneinfo ; python_version < \"3.9\"", "colour (>=0.0.4)", "cryptography (>=0.6)", "docutils (>=0.10)", "flake8 (>=2.4.0)", "flexmock (>=0.9.7)", "furl (>=0.4.1)", "intervals (>=0.7.1)", "isort (>=4.2.2)", "passlib (>=1.6,<2.0)", "pendulum (>=2.0.5)", "pg8000 (>=1.12.4)", "phonenumbers (>=5.9.2)", "psycopg (>=3.1.8)", "psycopg2 (>=2.5.1)", "psycopg2cffi (>=2.8.1)", "pymysql", "pyodbc", "pytest (==7.4.4)", "python-dateutil", "python-dateutil (>=2.6)", "pytz (>=2014.2)"] timezone = ["python-dateutil"] url = ["furl (>=0.4.1)"] @@ -3735,7 +3856,7 @@ files = [ {file = "tzdata-2025.1-py2.py3-none-any.whl", hash = "sha256:7e127113816800496f027041c570f50bcd464a020098a3b6b199517772303639"}, {file = "tzdata-2025.1.tar.gz", hash = "sha256:24894909e88cdb28bd1636c6887801df64cb485bd593f2fd83ef29075a81d694"}, ] -markers = {main = "extra == \"server\" or extra == \"worker\" or extra == \"scheduler\""} +markers = {main = "extra == \"scheduler\" or extra == \"server\" or extra == \"worker\""} [[package]] name = "tzlocal" @@ -3767,10 +3888,10 @@ files = [ {file = "urllib3-2.3.0-py3-none-any.whl", hash = "sha256:1cee9ad369867bfdbbb48b7dd50374c0967a0bb7710050facf0dd6911440e3df"}, {file = "urllib3-2.3.0.tar.gz", hash = "sha256:f8c5449b3cf0861679ce7e0503c7b44b5ec981bec0d1d3795a07f1ba96f0204d"}, ] -markers = {main = "extra == \"server\""} +markers = {main = "extra == \"server\" or extra == \"worker\""} [package.extras] -brotli = ["brotli (>=1.0.9)", "brotlicffi (>=0.8.0)"] +brotli = ["brotli (>=1.0.9) ; platform_python_implementation == \"CPython\"", "brotlicffi (>=0.8.0) ; platform_python_implementation != \"CPython\""] h2 = ["h2 (>=4,<5)"] socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"] zstd = ["zstandard (>=0.18.0)"] @@ -3806,7 +3927,7 @@ click = ">=7.0" h11 = ">=0.8" [package.extras] -standard = ["colorama (>=0.4)", "httptools (>=0.6.3)", "python-dotenv (>=0.13)", "pyyaml (>=5.1)", "uvloop (>=0.14.0,!=0.15.0,!=0.15.1)", "watchfiles (>=0.13)", "websockets (>=10.4)"] +standard = ["colorama (>=0.4) ; sys_platform == \"win32\"", "httptools (>=0.6.3)", "python-dotenv (>=0.13)", "pyyaml (>=5.1)", "uvloop (>=0.14.0,!=0.15.0,!=0.15.1) ; sys_platform != \"win32\" and sys_platform != \"cygwin\" and platform_python_implementation != \"PyPy\"", "watchfiles (>=0.13)", "websockets (>=10.4)"] [[package]] name = "vine" @@ -3840,7 +3961,7 @@ platformdirs = ">=3.9.1,<5" [package.extras] docs = ["furo (>=2023.7.26)", "proselint (>=0.13)", "sphinx (>=7.1.2,!=7.3)", "sphinx-argparse (>=0.4)", "sphinxcontrib-towncrier (>=0.2.1a0)", "towncrier (>=23.6)"] -test = ["covdefaults (>=2.3)", "coverage (>=7.2.7)", "coverage-enable-subprocess (>=1)", "flaky (>=3.7)", "packaging (>=23.1)", "pytest (>=7.4)", "pytest-env (>=0.8.2)", "pytest-freezer (>=0.4.8)", "pytest-mock (>=3.11.1)", "pytest-randomly (>=3.12)", "pytest-timeout (>=2.1)", "setuptools (>=68)", "time-machine (>=2.10)"] +test = ["covdefaults (>=2.3)", "coverage (>=7.2.7)", "coverage-enable-subprocess (>=1)", "flaky (>=3.7)", "packaging (>=23.1)", "pytest (>=7.4)", "pytest-env (>=0.8.2)", "pytest-freezer (>=0.4.8) ; platform_python_implementation == \"PyPy\" or platform_python_implementation == \"CPython\" and sys_platform == \"win32\" and python_version >= \"3.13\"", "pytest-mock (>=3.11.1)", "pytest-randomly (>=3.12)", "pytest-timeout (>=2.1)", "setuptools (>=68)", "time-machine (>=2.10) ; platform_python_implementation == \"CPython\""] [[package]] name = "wcwidth" @@ -3885,11 +4006,11 @@ files = [ markers = {main = "extra == \"worker\""} [package.extras] -check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1)"] +check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\""] cover = ["pytest-cov"] doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] enabler = ["pytest-enabler (>=2.2)"] -test = ["big-O", "importlib-resources", "jaraco.functools", "jaraco.itertools", "jaraco.test", "more-itertools", "pytest (>=6,!=8.1.*)", "pytest-ignore-flaky"] +test = ["big-O", "importlib-resources ; python_version < \"3.9\"", "jaraco.functools", "jaraco.itertools", "jaraco.test", "more-itertools", "pytest (>=6,!=8.1.*)", "pytest-ignore-flaky"] type = ["pytest-mypy"] [[package]] @@ -3969,9 +4090,9 @@ testing = ["coverage[toml]", "zope.event", "zope.testing"] [extras] scheduler = ["apscheduler", "asyncpg", "celery", "coloredlogs", "pydantic-settings", "python-json-logger", "pyyaml", "sqlalchemy", "sqlalchemy-utils"] server = ["alembic", "asgi-correlation-id", "asyncpg", "celery", "coloredlogs", "fastapi", "itsdangerous", "jinja2", "psycopg", "pydantic-settings", "pyjwt", "python-json-logger", "python-keycloak", "python-multipart", "pyyaml", "sqlalchemy", "sqlalchemy-utils", "starlette-exporter", "uuid6", "uvicorn"] -worker = ["asgi-correlation-id", "celery", "coloredlogs", "jinja2", "onetl", "psycopg", "pydantic-settings", "python-json-logger", "pyyaml", "sqlalchemy", "sqlalchemy-utils", "uuid6"] +worker = ["asgi-correlation-id", "celery", "coloredlogs", "horizon-hwm-store", "jinja2", "onetl", "psycopg", "pydantic-settings", "python-json-logger", "pyyaml", "sqlalchemy", "sqlalchemy-utils", "uuid6"] [metadata] lock-version = "2.1" python-versions = "^3.11" -content-hash = "264fff2f44fdf31849c06d28353464a54a8dc305a04a3f2f90b8a6fc2792dd71" +content-hash = "49fad46823b2a1875af497d0945398371a029a14352757164946d9295c4d7373" diff --git a/pyproject.toml b/pyproject.toml index de8ba437..fcff321f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -71,6 +71,7 @@ apscheduler = { version = "^3.10.4", optional = true } starlette-exporter = {version = "^0.23.0", optional = true} itsdangerous = {version = "*", optional = true} python-keycloak = {version = ">=4.7,<6.0", optional = true} +horizon-hwm-store = {version = ">=1.1.0", optional = true } [tool.poetry.extras] server = [ @@ -109,6 +110,7 @@ worker = [ "coloredlogs", "python-json-logger", "pyyaml", + "horizon-hwm-store", ] scheduler = [ diff --git a/syncmaster/dto/transfers.py b/syncmaster/dto/transfers.py index 5652806f..849c1cd4 100644 --- a/syncmaster/dto/transfers.py +++ b/syncmaster/dto/transfers.py @@ -6,6 +6,8 @@ from onetl.file.format import CSV, JSON, ORC, XML, Excel, JSONLine, Parquet +from syncmaster.dto.transfers_strategy import FullStrategy, IncrementalStrategy + @dataclass class TransferDTO: @@ -14,14 +16,24 @@ class TransferDTO: @dataclass class DBTransferDTO(TransferDTO): + id: int table_name: str + strategy: FullStrategy | IncrementalStrategy transformations: list[dict] | None = None + options: dict | None = None + + def __post_init__(self): + if self.options is None: + self.options = {} + self.options.setdefault("if_exists", "replace_entire_table") @dataclass class FileTransferDTO(TransferDTO): + id: int directory_path: str file_format: CSV | JSONLine | JSON | Excel | XML | ORC | Parquet + strategy: FullStrategy | IncrementalStrategy options: dict file_name_template: str | None = None df_schema: dict | None = None @@ -43,7 +55,7 @@ def __post_init__(self): if isinstance(self.df_schema, str): self.df_schema = json.loads(self.df_schema) - self.options.setdefault("if_exists", "replace_entire_directory") # TODO: use "append" for incremental strategy + self.options.setdefault("if_exists", "replace_overlapping_partitions") def _get_file_format(self, file_format: dict) -> CSV | JSONLine | JSON | Excel | XML | ORC | Parquet: file_type = file_format.pop("type", None) @@ -87,6 +99,10 @@ class MySQLTransferDTO(DBTransferDTO): class HiveTransferDTO(DBTransferDTO): type: ClassVar[str] = "hive" + def __post_init__(self): + super().__post_init__() + self.options.setdefault("if_exists", "replace_overlapping_partitions") + @dataclass class S3TransferDTO(FileTransferDTO): diff --git a/syncmaster/dto/transfers_strategy.py b/syncmaster/dto/transfers_strategy.py new file mode 100644 index 00000000..39e202d8 --- /dev/null +++ b/syncmaster/dto/transfers_strategy.py @@ -0,0 +1,33 @@ +# SPDX-FileCopyrightText: 2023-2024 MTS PJSC +# SPDX-License-Identifier: Apache-2.0 +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass +class Strategy: + type: str + + @classmethod + def from_dict(cls, data: dict) -> Strategy: + strategy_classes = { + "full": FullStrategy, + "incremental": IncrementalStrategy, + } + + strategy_type = data.get("type") + if strategy_type not in strategy_classes: + raise ValueError(f"Unknown strategy type: {strategy_type}") + + return strategy_classes[strategy_type](**data) + + +@dataclass +class FullStrategy(Strategy): + pass + + +@dataclass +class IncrementalStrategy(Strategy): + increment_by: str diff --git a/syncmaster/schemas/v1/transfers/__init__.py b/syncmaster/schemas/v1/transfers/__init__.py index 5675539a..bb4b29d8 100644 --- a/syncmaster/schemas/v1/transfers/__init__.py +++ b/syncmaster/schemas/v1/transfers/__init__.py @@ -255,8 +255,18 @@ def validate_increment_by(cls, values): source_type = values.source_params.type increment_by = values.strategy_params.increment_by - if source_type in FILE_CONNECTION_TYPES and increment_by != "modified_since": - raise ValueError("Field 'increment_by' must be equal to 'modified_since' for file source types") + if source_type in FILE_CONNECTION_TYPES and increment_by not in ("file_modified_since", "file_name"): + raise ValueError( + "Field 'increment_by' must be equal to 'file_modified_since' or 'file_name' for file source types", + ) + + return values + + @model_validator(mode="after") + def validate_strategy(cls, values): + + if values.source_params.type in ("s3", "hdfs") and isinstance(values.strategy_params, IncrementalStrategy): + raise ValueError("S3 and HDFS sources do not support incremental strategy for now") return values diff --git a/syncmaster/schemas/v1/transfers/file/base.py b/syncmaster/schemas/v1/transfers/file/base.py index e3a47212..b9c3641c 100644 --- a/syncmaster/schemas/v1/transfers/file/base.py +++ b/syncmaster/schemas/v1/transfers/file/base.py @@ -94,6 +94,9 @@ def validate_file_name_template(cls, value: str) -> str: if missing_keys: raise ValueError(f"Missing required placeholders: {', '.join(missing_keys)}") + if "{run_id}" not in value and "{run_created_at}" not in value: + raise ValueError("At least one of placeholders must be present: {run_id} or {run_created_at}") + try: value.format(index="", extension="", run_created_at="", run_id="") except KeyError as e: diff --git a/syncmaster/worker/controller.py b/syncmaster/worker/controller.py index 9eea3110..f5c0e553 100644 --- a/syncmaster/worker/controller.py +++ b/syncmaster/worker/controller.py @@ -4,6 +4,10 @@ from tempfile import TemporaryDirectory from typing import Any +from horizon.client.auth import LoginPassword +from horizon_hwm_store import HorizonHWMStore +from onetl.strategy import IncrementalStrategy + from syncmaster.db.models import Connection, Run from syncmaster.dto.connections import ( ClickhouseConnectionDTO, @@ -36,6 +40,7 @@ SFTPTransferDTO, WebDAVTransferDTO, ) +from syncmaster.dto.transfers_strategy import Strategy from syncmaster.exceptions.connection import ConnectionTypeNotRecognizedError from syncmaster.worker.handlers.base import Handler from syncmaster.worker.handlers.db.clickhouse import ClickhouseHandler @@ -139,11 +144,13 @@ class TransferController: + settings: WorkerAppSettings source_handler: Handler target_handler: Handler def __init__( self, + settings: WorkerAppSettings, run: Run, source_connection: Connection, source_auth_data: dict, @@ -152,11 +159,14 @@ def __init__( ): self.temp_dir = TemporaryDirectory(prefix=f"syncmaster_{run.id}_") + self.settings = settings self.run = run self.source_handler = self.get_handler( connection_data=source_connection.data, run_data={"id": run.id, "created_at": run.created_at}, + transfer_id=run.transfer.id, transfer_params=run.transfer.source_params, + strategy_params=run.transfer.strategy_params, transformations=run.transfer.transformations, connection_auth_data=source_auth_data, temp_dir=TemporaryDirectory(dir=self.temp_dir.name, prefix="downloaded_"), @@ -164,15 +174,17 @@ def __init__( self.target_handler = self.get_handler( connection_data=target_connection.data, run_data={"id": run.id, "created_at": run.created_at}, + transfer_id=run.transfer.id, transfer_params=run.transfer.target_params, + strategy_params=run.transfer.strategy_params, transformations=run.transfer.transformations, connection_auth_data=target_auth_data, temp_dir=TemporaryDirectory(dir=self.temp_dir.name, prefix="written_"), ) - def perform_transfer(self, settings: WorkerAppSettings) -> None: + def perform_transfer(self) -> None: try: - spark = settings.worker.CREATE_SPARK_SESSION_FUNCTION( + spark = self.settings.worker.CREATE_SPARK_SESSION_FUNCTION( run=self.run, source=self.source_handler.connection_dto, target=self.target_handler.connection_dto, @@ -182,6 +194,9 @@ def perform_transfer(self, settings: WorkerAppSettings) -> None: self.source_handler.connect(spark) self.target_handler.connect(spark) + if self.source_handler.transfer_dto.strategy.type == "incremental": + return self._perform_incremental_transfer() + df = self.source_handler.read() self.target_handler.write(df) finally: @@ -192,7 +207,9 @@ def get_handler( connection_data: dict[str, Any], connection_auth_data: dict, run_data: dict[str, Any], + transfer_id: int, transfer_params: dict[str, Any], + strategy_params: dict[str, Any], transformations: list[dict], temp_dir: TemporaryDirectory, ) -> Handler: @@ -207,7 +224,35 @@ def get_handler( return handler( connection_dto=connection_dto(**connection_data), - transfer_dto=transfer_dto(**transfer_params, transformations=transformations), + transfer_dto=transfer_dto( + id=transfer_id, + strategy=Strategy.from_dict(strategy_params), + transformations=transformations, + **transfer_params, + ), run_dto=run_dto(**run_data), temp_dir=temp_dir, ) + + def _perform_incremental_transfer(self) -> None: + with HorizonHWMStore( + api_url=self.settings.hwm_store.url, + auth=LoginPassword(login=self.settings.hwm_store.user, password=self.settings.hwm_store.password), + namespace=self.settings.hwm_store.namespace, + ).force_create_namespace() as hwm_store: + + with IncrementalStrategy(): + hwm_name = "_".join( + [ + str(self.source_handler.transfer_dto.id), + self.source_handler.connection_dto.type, + self.source_handler.transfer_dto.directory_path, + ], + ) + hwm = hwm_store.get_hwm(hwm_name) + + self.source_handler.hwm = hwm + self.target_handler.hwm = hwm + + df = self.source_handler.read() + self.target_handler.write(df) diff --git a/syncmaster/worker/handlers/base.py b/syncmaster/worker/handlers/base.py index e1012518..9868ac8a 100644 --- a/syncmaster/worker/handlers/base.py +++ b/syncmaster/worker/handlers/base.py @@ -7,6 +7,8 @@ from tempfile import TemporaryDirectory from typing import TYPE_CHECKING +from etl_entities.hwm import HWM + from syncmaster.dto.connections import ConnectionDTO from syncmaster.dto.runs import RunDTO from syncmaster.dto.transfers import TransferDTO @@ -17,6 +19,8 @@ class Handler(ABC): + hwm: HWM | None = None + def __init__( self, connection_dto: ConnectionDTO, diff --git a/syncmaster/worker/handlers/db/base.py b/syncmaster/worker/handlers/db/base.py index 4a713450..f54a261a 100644 --- a/syncmaster/worker/handlers/db/base.py +++ b/syncmaster/worker/handlers/db/base.py @@ -19,6 +19,7 @@ class DBHandler(Handler): connection: BaseDBConnection transfer_dto: DBTransferDTO + _operators = { "is_null": "IS NULL", "is_not_null": "IS NOT NULL", @@ -44,9 +45,13 @@ def read(self) -> DataFrame: return reader.run() def write(self, df: DataFrame) -> None: + if self.transfer_dto.strategy.type == "incremental" and self.hwm and self.hwm.value: + self.transfer_dto.options["if_exists"] = "append" + writer = DBWriter( connection=self.connection, table=self.transfer_dto.table_name, + options=self.transfer_dto.options, ) return writer.run(df=self._normalize_column_names(df)) diff --git a/syncmaster/worker/handlers/db/clickhouse.py b/syncmaster/worker/handlers/db/clickhouse.py index 01d23c6b..bfb92bf5 100644 --- a/syncmaster/worker/handlers/db/clickhouse.py +++ b/syncmaster/worker/handlers/db/clickhouse.py @@ -48,14 +48,15 @@ def write(self, df: DataFrame) -> None: ) quoted_sort_column = f'"{sort_column}"' + self.transfer_dto.options["createTableOptions"] = f"ENGINE = MergeTree() ORDER BY {quoted_sort_column}" + + if self.transfer_dto.strategy.type == "incremental" and self.hwm and self.hwm.value: + self.transfer_dto.options["if_exists"] = "append" + writer = DBWriter( connection=self.connection, table=self.transfer_dto.table_name, - options=( - Clickhouse.WriteOptions(createTableOptions=f"ENGINE = MergeTree() ORDER BY {quoted_sort_column}") - if self.transfer_dto.type == "clickhouse" - else None - ), + options=self.transfer_dto.options, ) return writer.run(df=normalized_df) diff --git a/syncmaster/worker/handlers/file/base.py b/syncmaster/worker/handlers/file/base.py index 1d0f9a39..8d1f253d 100644 --- a/syncmaster/worker/handlers/file/base.py +++ b/syncmaster/worker/handlers/file/base.py @@ -4,30 +4,17 @@ from __future__ import annotations import os -from typing import TYPE_CHECKING from onetl.base.base_file_df_connection import BaseFileDFConnection -from onetl.file import FileDFReader, FileDFWriter, FileMover -from onetl.file.filter import Glob from syncmaster.dto.connections import ConnectionDTO from syncmaster.dto.transfers import FileTransferDTO from syncmaster.worker.handlers.base import Handler -if TYPE_CHECKING: - from pyspark.sql.dataframe import DataFrame - COLUMN_FORMATS = ("parquet", "orc") class FileHandler(Handler): - """ - TODO: FileHandler is actually handler for FileDFWriter with remote FS (direct write). - FileProtocolHandler is handler for FileDFWriter with local FS (write via upload). - Maybe we should keep here only common methods, - like file name generator and split other ones to classes where the method is really used. - """ - df_connection: BaseFileDFConnection connection_dto: ConnectionDTO transfer_dto: FileTransferDTO @@ -64,52 +51,6 @@ class FileHandler(Handler): "orc": "orc", } - def read(self) -> DataFrame: - from pyspark.sql.types import StructType - - reader = FileDFReader( - connection=self.df_connection, - format=self.transfer_dto.file_format, - source_path=self.transfer_dto.directory_path, - df_schema=StructType.fromJson(self.transfer_dto.df_schema) if self.transfer_dto.df_schema else None, - options=self.transfer_dto.options, - ) - df = reader.run() - - rows_filter_expression = self._get_rows_filter_expression() - if rows_filter_expression: - df = df.where(rows_filter_expression) - - columns_filter_expressions = self._get_columns_filter_expressions() - if columns_filter_expressions: - df = df.selectExpr(*columns_filter_expressions) - - return df - - def write(self, df: DataFrame) -> None: - tmp_path = os.path.join(self.transfer_dto.directory_path, ".tmp", str(self.run_dto.id)) - try: - writer = FileDFWriter( - connection=self.df_connection, - format=self.transfer_dto.file_format, - target_path=tmp_path, - options=self.transfer_dto.options, - ) - writer.run(df=df) - - self._rename_files(tmp_path) - - mover = FileMover( - connection=self.file_connection, - source_path=tmp_path, - target_path=self.transfer_dto.directory_path, - # ignore .crc and other metadata files - filters=[Glob(f"*.{self._get_file_extension()}")], - ) - mover.run() - finally: - self.file_connection.remove_dir(tmp_path, recursive=True) - def _rename_files(self, tmp_path: str) -> None: files = self.file_connection.list_dir(tmp_path) diff --git a/syncmaster/worker/handlers/file/ftp.py b/syncmaster/worker/handlers/file/ftp.py index 7059d7e3..f1c27232 100644 --- a/syncmaster/worker/handlers/file/ftp.py +++ b/syncmaster/worker/handlers/file/ftp.py @@ -8,13 +8,13 @@ from onetl.connection import FTP, SparkLocalFS from syncmaster.dto.connections import FTPConnectionDTO -from syncmaster.worker.handlers.file.protocol import FileProtocolHandler +from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler if TYPE_CHECKING: from pyspark.sql import SparkSession -class FTPHandler(FileProtocolHandler): +class FTPHandler(LocalDFFileHandler): connection_dto: FTPConnectionDTO def connect(self, spark: SparkSession) -> None: diff --git a/syncmaster/worker/handlers/file/ftps.py b/syncmaster/worker/handlers/file/ftps.py index 6082bae2..ca8bd048 100644 --- a/syncmaster/worker/handlers/file/ftps.py +++ b/syncmaster/worker/handlers/file/ftps.py @@ -8,13 +8,13 @@ from onetl.connection import FTPS, SparkLocalFS from syncmaster.dto.connections import FTPSConnectionDTO -from syncmaster.worker.handlers.file.protocol import FileProtocolHandler +from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler if TYPE_CHECKING: from pyspark.sql import SparkSession -class FTPSHandler(FileProtocolHandler): +class FTPSHandler(LocalDFFileHandler): connection_dto: FTPSConnectionDTO def connect(self, spark: SparkSession) -> None: diff --git a/syncmaster/worker/handlers/file/hdfs.py b/syncmaster/worker/handlers/file/hdfs.py index d6838605..9384a004 100644 --- a/syncmaster/worker/handlers/file/hdfs.py +++ b/syncmaster/worker/handlers/file/hdfs.py @@ -8,13 +8,13 @@ from onetl.connection import HDFS, SparkHDFS from syncmaster.dto.connections import HDFSConnectionDTO -from syncmaster.worker.handlers.file.base import FileHandler +from syncmaster.worker.handlers.file.remote_df import RemoteDFFileHandler if TYPE_CHECKING: from pyspark.sql import SparkSession -class HDFSHandler(FileHandler): +class HDFSHandler(RemoteDFFileHandler): connection_dto: HDFSConnectionDTO def connect(self, spark: SparkSession): diff --git a/syncmaster/worker/handlers/file/protocol.py b/syncmaster/worker/handlers/file/local_df.py similarity index 85% rename from syncmaster/worker/handlers/file/protocol.py rename to syncmaster/worker/handlers/file/local_df.py index 67d0ba08..81e1f78a 100644 --- a/syncmaster/worker/handlers/file/protocol.py +++ b/syncmaster/worker/handlers/file/local_df.py @@ -6,6 +6,7 @@ import os from typing import TYPE_CHECKING +from etl_entities.hwm import FileListHWM, FileModifiedTimeHWM from onetl.file import FileDFReader, FileDFWriter, FileDownloader, FileUploader from onetl.file.filter import FileSizeRange, Glob, Regexp @@ -15,17 +16,27 @@ from pyspark.sql import DataFrame -class FileProtocolHandler(FileHandler): +class LocalDFFileHandler(FileHandler): def read(self) -> DataFrame: from pyspark.sql.types import StructType + downloader_params = {} + if self.transfer_dto.strategy.type == "incremental": + hwm_name = f"{self.transfer_dto.id}_{self.connection_dto.type}_{self.transfer_dto.directory_path}" + if self.transfer_dto.strategy.increment_by == "file_modified_since": + downloader_params["hwm"] = FileModifiedTimeHWM(name=hwm_name) + elif self.transfer_dto.strategy.increment_by == "file_name": + downloader_params["hwm"] = FileListHWM(name=hwm_name) + downloader = FileDownloader( connection=self.file_connection, source_path=self.transfer_dto.directory_path, local_path=self.temp_dir.name, filters=self._get_file_metadata_filters(), + **downloader_params, ) + downloader.run() reader = FileDFReader( @@ -47,11 +58,12 @@ def read(self) -> DataFrame: return df def write(self, df: DataFrame) -> None: + writer = FileDFWriter( connection=self.local_df_connection, format=self.transfer_dto.file_format, target_path=self.temp_dir.name, - options={"if_exists": "replace_entire_directory"}, + options=self.transfer_dto.options, ) writer.run(df=df) @@ -66,7 +78,6 @@ def write(self, df: DataFrame) -> None: connection=self.file_connection, local_path=self.temp_dir.name, target_path=self.transfer_dto.directory_path, - options=self.transfer_dto.options, ) uploader.run() diff --git a/syncmaster/worker/handlers/file/remote_df.py b/syncmaster/worker/handlers/file/remote_df.py new file mode 100644 index 00000000..57dd9a6b --- /dev/null +++ b/syncmaster/worker/handlers/file/remote_df.py @@ -0,0 +1,62 @@ +# SPDX-FileCopyrightText: 2023-2024 MTS PJSC +# SPDX-License-Identifier: Apache-2.0 +from __future__ import annotations + +import os +from typing import TYPE_CHECKING + +from onetl.file import FileDFReader, FileDFWriter, FileMover +from onetl.file.filter import Glob + +from syncmaster.worker.handlers.file.base import FileHandler + +if TYPE_CHECKING: + from pyspark.sql import DataFrame + + +class RemoteDFFileHandler(FileHandler): + + def read(self) -> DataFrame: + from pyspark.sql.types import StructType + + reader = FileDFReader( + connection=self.df_connection, + format=self.transfer_dto.file_format, + source_path=self.transfer_dto.directory_path, + df_schema=StructType.fromJson(self.transfer_dto.df_schema) if self.transfer_dto.df_schema else None, + options=self.transfer_dto.options, + ) + df = reader.run() + + rows_filter_expression = self._get_rows_filter_expression() + if rows_filter_expression: + df = df.where(rows_filter_expression) + + columns_filter_expressions = self._get_columns_filter_expressions() + if columns_filter_expressions: + df = df.selectExpr(*columns_filter_expressions) + + return df + + def write(self, df: DataFrame) -> None: + tmp_path = os.path.join(self.transfer_dto.directory_path, ".tmp", str(self.run_dto.id)) + try: + writer = FileDFWriter( + connection=self.df_connection, + format=self.transfer_dto.file_format, + target_path=tmp_path, + ) + writer.run(df=df) + + self._rename_files(tmp_path) + + mover = FileMover( + connection=self.file_connection, + source_path=tmp_path, + target_path=self.transfer_dto.directory_path, + # ignore .crc and other metadata files + filters=[Glob(f"*.{self._get_file_extension()}")], + ) + mover.run() + finally: + self.file_connection.remove_dir(tmp_path, recursive=True) diff --git a/syncmaster/worker/handlers/file/s3.py b/syncmaster/worker/handlers/file/s3.py index 074ca754..c20ccda0 100644 --- a/syncmaster/worker/handlers/file/s3.py +++ b/syncmaster/worker/handlers/file/s3.py @@ -9,13 +9,13 @@ from onetl.file import FileDFReader from syncmaster.dto.connections import S3ConnectionDTO -from syncmaster.worker.handlers.file.base import FileHandler +from syncmaster.worker.handlers.file.remote_df import RemoteDFFileHandler if TYPE_CHECKING: from pyspark.sql import DataFrame, SparkSession -class S3Handler(FileHandler): +class S3Handler(RemoteDFFileHandler): connection_dto: S3ConnectionDTO def connect(self, spark: SparkSession): diff --git a/syncmaster/worker/handlers/file/samba.py b/syncmaster/worker/handlers/file/samba.py index e2a31b2c..38de3286 100644 --- a/syncmaster/worker/handlers/file/samba.py +++ b/syncmaster/worker/handlers/file/samba.py @@ -8,13 +8,13 @@ from onetl.connection import Samba, SparkLocalFS from syncmaster.dto.connections import SambaConnectionDTO -from syncmaster.worker.handlers.file.protocol import FileProtocolHandler +from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler if TYPE_CHECKING: from pyspark.sql import SparkSession -class SambaHandler(FileProtocolHandler): +class SambaHandler(LocalDFFileHandler): connection_dto: SambaConnectionDTO def connect(self, spark: SparkSession) -> None: diff --git a/syncmaster/worker/handlers/file/sftp.py b/syncmaster/worker/handlers/file/sftp.py index e1fe68e7..f465a51e 100644 --- a/syncmaster/worker/handlers/file/sftp.py +++ b/syncmaster/worker/handlers/file/sftp.py @@ -8,13 +8,13 @@ from onetl.connection import SFTP, SparkLocalFS from syncmaster.dto.connections import SFTPConnectionDTO -from syncmaster.worker.handlers.file.protocol import FileProtocolHandler +from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler if TYPE_CHECKING: from pyspark.sql import SparkSession -class SFTPHandler(FileProtocolHandler): +class SFTPHandler(LocalDFFileHandler): connection_dto: SFTPConnectionDTO def connect(self, spark: SparkSession) -> None: diff --git a/syncmaster/worker/handlers/file/webdav.py b/syncmaster/worker/handlers/file/webdav.py index 0738cbeb..cb6da80a 100644 --- a/syncmaster/worker/handlers/file/webdav.py +++ b/syncmaster/worker/handlers/file/webdav.py @@ -8,13 +8,13 @@ from onetl.connection import SparkLocalFS, WebDAV from syncmaster.dto.connections import WebDAVConnectionDTO -from syncmaster.worker.handlers.file.protocol import FileProtocolHandler +from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler if TYPE_CHECKING: from pyspark.sql import SparkSession -class WebDAVHandler(FileProtocolHandler): +class WebDAVHandler(LocalDFFileHandler): connection_dto: WebDAVConnectionDTO def connect(self, spark: SparkSession) -> None: diff --git a/syncmaster/worker/settings/__init__.py b/syncmaster/worker/settings/__init__.py index ba51081c..95cc29ca 100644 --- a/syncmaster/worker/settings/__init__.py +++ b/syncmaster/worker/settings/__init__.py @@ -10,6 +10,7 @@ LoggingSettings, RabbitMQSettings, ) +from syncmaster.worker.settings.hwm_store import HWMStoreSettings class WorkerSettings(BaseSettings): @@ -68,6 +69,7 @@ class WorkerAppSettings(BaseSettings): default_factory=CredentialsEncryptionSettings, description="Settings for encrypting credential data", ) + hwm_store: HWMStoreSettings = Field(default_factory=HWMStoreSettings, description="HWM Store settings") class Config: env_prefix = "SYNCMASTER__" diff --git a/syncmaster/worker/settings/hwm_store.py b/syncmaster/worker/settings/hwm_store.py new file mode 100644 index 00000000..ee7312ce --- /dev/null +++ b/syncmaster/worker/settings/hwm_store.py @@ -0,0 +1,36 @@ +# SPDX-FileCopyrightText: 2023-2024 MTS PJSC +# SPDX-License-Identifier: Apache-2.0 +from typing import Literal + +from pydantic import BaseModel, Field + + +class HWMStoreSettings(BaseModel): + """HWM Store settings. + + HWM Store is used for incremental strategy. See `etl-entities documentation `_. + + Examples + -------- + + .. code-block:: bash + + # Set the HWM Store connection URL + SYNCMASTER__HWM_STORE__URL=http://horizon:8000 + """ + + type: Literal["horizon"] = Field( + description=("HWM Store type"), + ) + url: str = Field( + description=("HWM Store URL"), + ) + user: str = Field( + description=("HWM Store user"), + ) + password: str = Field( + description=("HWM Store password"), + ) + namespace: str = Field( + description=("HWM Store namespace"), + ) diff --git a/syncmaster/worker/transfer.py b/syncmaster/worker/transfer.py index c2677c8b..2c72f97a 100644 --- a/syncmaster/worker/transfer.py +++ b/syncmaster/worker/transfer.py @@ -61,13 +61,14 @@ def run_transfer(session: Session, run_id: int, settings: WorkerAppSettings): try: controller = TransferController( + settings=WORKER_SETTINGS, run=run, source_connection=run.transfer.source_connection, target_connection=run.transfer.target_connection, source_auth_data=source_auth_data, target_auth_data=target_auth_data, ) - controller.perform_transfer(WORKER_SETTINGS) + controller.perform_transfer() except Exception: run.status = Status.FAILED logger.exception("Run %r was failed", run.id) diff --git a/tests/resources/file_connection/csv/for_file_filtering/file.csv b/tests/resources/file_connection/csv/for_file_filtering/document.csv similarity index 100% rename from tests/resources/file_connection/csv/for_file_filtering/file.csv rename to tests/resources/file_connection/csv/for_file_filtering/document.csv diff --git a/tests/resources/file_connection/csv/for_file_filtering/file.txt b/tests/resources/file_connection/csv/for_file_filtering/document.txt similarity index 100% rename from tests/resources/file_connection/csv/for_file_filtering/file.txt rename to tests/resources/file_connection/csv/for_file_filtering/document.txt diff --git a/tests/resources/file_connection/csv/for_file_filtering/large_file.csv b/tests/resources/file_connection/csv/for_file_filtering/large_document.csv similarity index 100% rename from tests/resources/file_connection/csv/for_file_filtering/large_file.csv rename to tests/resources/file_connection/csv/for_file_filtering/large_document.csv diff --git a/tests/resources/file_connection/csv/for_file_filtering/small_file.csv b/tests/resources/file_connection/csv/for_file_filtering/small_document.csv similarity index 100% rename from tests/resources/file_connection/csv/for_file_filtering/small_file.csv rename to tests/resources/file_connection/csv/for_file_filtering/small_document.csv diff --git a/tests/resources/file_connection/csv/for_file_filtering/unknown_name.csv b/tests/resources/file_connection/csv/for_file_filtering/unknown_name.csv index 68eb6998..bb2ba5c7 100644 --- a/tests/resources/file_connection/csv/for_file_filtering/unknown_name.csv +++ b/tests/resources/file_connection/csv/for_file_filtering/unknown_name.csv @@ -25,4 +25,3 @@ ID,PHONE_NUMBER,REGION,NUMBER,BIRTH_DATE,REGISTERED_AT,ACCOUNT_BALANCE 1794,001-933-371-2198,с. Приморско-Ахтарск,24,2013-05-19,1998-10-29T17:15:46.207609+00:00,8753.7965 5096,001-499-346-5978x39210,ст. Лотошино,25,1995-01-18,1993-03-29T00:11:40.463076+00:00,4226.1755 5605,8 (413) 673-89-27,Wolfeville,26,1990-03-05,1994-10-20T17:38:21.039941+00:00,5931.4869 -9845,+7 366 768 7792,East Zachary,27,2017-05-12,2001-08-28T13:15:21.965741+00:00,8716.9687 diff --git a/tests/test_integration/test_run_transfer/connection_fixtures/__init__.py b/tests/test_integration/test_run_transfer/connection_fixtures/__init__.py index 61f9d710..7534c855 100644 --- a/tests/test_integration/test_run_transfer/connection_fixtures/__init__.py +++ b/tests/test_integration/test_run_transfer/connection_fixtures/__init__.py @@ -14,6 +14,7 @@ init_df_with_mixed_column_naming, ) from tests.test_integration.test_run_transfer.connection_fixtures.file_storage_fixtures import ( + file_format_flavor, resource_path, source_file_format, target_file_format, @@ -118,6 +119,11 @@ from tests.test_integration.test_run_transfer.connection_fixtures.spark_fixtures import ( spark, ) +from tests.test_integration.test_run_transfer.connection_fixtures.strategy_fixtures import ( + full_strategy, + incremental_strategy_by_file_modified_since, + incremental_strategy_by_file_name, +) from tests.test_integration.test_run_transfer.connection_fixtures.webdav_fixtures import ( prepare_webdav, webdav_connection, diff --git a/tests/test_integration/test_run_transfer/connection_fixtures/file_storage_fixtures.py b/tests/test_integration/test_run_transfer/connection_fixtures/file_storage_fixtures.py index a9957b38..3c40b269 100644 --- a/tests/test_integration/test_run_transfer/connection_fixtures/file_storage_fixtures.py +++ b/tests/test_integration/test_run_transfer/connection_fixtures/file_storage_fixtures.py @@ -12,6 +12,11 @@ def resource_path(): return path +@pytest.fixture(params=[""]) +def file_format_flavor(request: FixtureRequest): + return request.param + + @pytest.fixture() def source_file_format(request: FixtureRequest): name, params = request.param diff --git a/tests/test_integration/test_run_transfer/connection_fixtures/filters_fixtures.py b/tests/test_integration/test_run_transfer/connection_fixtures/filters_fixtures.py index 27a59823..e62f8d35 100644 --- a/tests/test_integration/test_run_transfer/connection_fixtures/filters_fixtures.py +++ b/tests/test_integration/test_run_transfer/connection_fixtures/filters_fixtures.py @@ -119,7 +119,7 @@ def file_metadata_filter_transformations(): }, { "type": "name_regexp", - "value": r"\bfile\b", + "value": r"\bdoc", }, { "type": "file_size_min", diff --git a/tests/test_integration/test_run_transfer/connection_fixtures/strategy_fixtures.py b/tests/test_integration/test_run_transfer/connection_fixtures/strategy_fixtures.py new file mode 100644 index 00000000..5feff4d5 --- /dev/null +++ b/tests/test_integration/test_run_transfer/connection_fixtures/strategy_fixtures.py @@ -0,0 +1,24 @@ +import pytest + + +@pytest.fixture +def full_strategy(): + return { + "type": "full", + } + + +@pytest.fixture +def incremental_strategy_by_file_modified_since(): + return { + "type": "incremental", + "increment_by": "file_modified_since", + } + + +@pytest.fixture +def incremental_strategy_by_file_name(): + return { + "type": "incremental", + "increment_by": "file_name", + } diff --git a/tests/test_integration/test_run_transfer/test_ftp.py b/tests/test_integration/test_run_transfer/test_ftp.py index 8e222a65..0705a3ee 100644 --- a/tests/test_integration/test_run_transfer/test_ftp.py +++ b/tests/test_integration/test_run_transfer/test_ftp.py @@ -9,24 +9,24 @@ from onetl.db import DBReader from onetl.file import FileDFReader, FileDownloader from pyspark.sql import DataFrame -from pyspark.sql.functions import col, date_format, date_trunc, to_timestamp -from pytest import FixtureRequest +from pyspark.sql.functions import col, date_format, to_timestamp +from pytest_lazy_fixtures import lf from sqlalchemy.ext.asyncio import AsyncSession from syncmaster.db.models import Connection, Group, Queue, Status from syncmaster.db.models.transfer import Transfer from tests.mocks import MockUser from tests.test_unit.utils import create_transfer -from tests.utils import get_run_on_end +from tests.utils import ( + add_increment_to_files_and_upload, + get_run_on_end, + prepare_dataframes_for_comparison, + run_transfer_and_verify, +) pytestmark = [pytest.mark.asyncio, pytest.mark.worker] -@pytest.fixture(params=[""]) -def file_format_flavor(request: FixtureRequest): - return request.param - - @pytest_asyncio.fixture async def ftp_to_postgres( session: AsyncSession, @@ -38,6 +38,7 @@ async def ftp_to_postgres( prepare_ftp, source_file_format, file_format_flavor: str, + strategy: dict, ): format_name, file_format = source_file_format format_name_in_path = "xlsx" if format_name == "excel" else format_name @@ -63,6 +64,7 @@ async def ftp_to_postgres( "type": "postgres", "table_name": "public.target_table", }, + strategy_params=strategy, queue_id=queue.id, ) yield result @@ -109,81 +111,106 @@ async def postgres_to_ftp( @pytest.mark.parametrize( - "source_file_format, file_format_flavor", + "source_file_format, file_format_flavor, strategy", [ pytest.param( ("csv", {}), "with_header", + lf("full_strategy"), id="csv", ), pytest.param( ("json", {}), "without_compression", + lf("full_strategy"), id="json", ), pytest.param( ("jsonline", {}), "without_compression", + lf("full_strategy"), id="jsonline", ), pytest.param( ("excel", {}), "with_header", + lf("full_strategy"), id="excel", ), pytest.param( ("orc", {}), "without_compression", + lf("full_strategy"), id="orc", ), pytest.param( ("parquet", {}), "without_compression", + lf("full_strategy"), id="parquet", ), pytest.param( ("xml", {}), "without_compression", + lf("full_strategy"), id="xml", ), ], indirect=["source_file_format", "file_format_flavor"], ) -async def test_run_transfer_ftp_to_postgres( +async def test_run_transfer_ftp_to_postgres_with_full_strategy( prepare_postgres, group_owner: MockUser, init_df: DataFrame, client: AsyncClient, ftp_to_postgres: Transfer, - source_file_format, - file_format_flavor, + source_file_format: tuple[str, dict], + file_format_flavor: str, + strategy: dict, ): - # Arrange postgres, _ = prepare_postgres file_format, _ = source_file_format - # Act - result = await client.post( - "v1/runs", - headers={"Authorization": f"Bearer {group_owner.token}"}, - json={"transfer_id": ftp_to_postgres.id}, - ) - # Assert - assert result.status_code == 200 + await run_transfer_and_verify(client, group_owner, ftp_to_postgres.id) - run_data = await get_run_on_end( - client=client, - run_id=result.json()["id"], - token=group_owner.token, + reader = DBReader( + connection=postgres, + table="public.target_table", ) - source_auth_data = run_data["transfer_dump"]["source_connection"]["auth_data"] - target_auth_data = run_data["transfer_dump"]["target_connection"]["auth_data"] + df = reader.run() - assert run_data["status"] == Status.FINISHED.value - assert source_auth_data["user"] - assert "password" not in source_auth_data - assert target_auth_data["user"] - assert "password" not in target_auth_data + df, init_df = prepare_dataframes_for_comparison(df, init_df, file_format) + assert df.sort("id").collect() == init_df.sort("id").collect() + + +@pytest.mark.parametrize( + "source_file_format, file_format_flavor, strategy", + [ + pytest.param( + ("csv", {}), + "with_header", + lf("incremental_strategy_by_file_name"), + id="csv", + ), + ], + indirect=["source_file_format", "file_format_flavor"], +) +async def test_run_transfer_ftp_to_postgres_with_incremental_strategy( + prepare_postgres, + group_owner: MockUser, + init_df: DataFrame, + client: AsyncClient, + ftp_to_postgres: Transfer, + ftp_file_connection: FTP, + source_file_format: tuple[str, dict], + file_format_flavor: str, + strategy: dict, + tmp_path: Path, +): + postgres, _ = prepare_postgres + file_format, _ = source_file_format + + await run_transfer_and_verify(client, group_owner, ftp_to_postgres.id) reader = DBReader( connection=postgres, @@ -191,15 +218,27 @@ async def test_run_transfer_ftp_to_postgres( ) df = reader.run() - # as Excel does not support datetime values with precision greater than milliseconds - if file_format == "excel": - df = df.withColumn("REGISTERED_AT", date_trunc("second", col("REGISTERED_AT"))) - init_df = init_df.withColumn("REGISTERED_AT", date_trunc("second", col("REGISTERED_AT"))) + df, init_df = prepare_dataframes_for_comparison(df, init_df, file_format) + assert df.sort("id").collect() == init_df.sort("id").collect() + df_count = df.count() - for field in init_df.schema: - df = df.withColumn(field.name, df[field.name].cast(field.dataType)) + add_increment_to_files_and_upload( + file_connection=ftp_file_connection, + remote_path=f"/data/file_df_connection/{file_format}/{file_format_flavor}", + tmp_path=tmp_path, + ) - assert df.sort("id").collect() == init_df.sort("id").collect() + await run_transfer_and_verify(client, group_owner, ftp_to_postgres.id) + + reader = DBReader( + connection=postgres, + table="public.target_table", + ) + df_with_increment = reader.run() + + df_with_increment, init_df = prepare_dataframes_for_comparison(df_with_increment, init_df, file_format) + assert df_with_increment.count() > df_count + assert df_with_increment.sort("id").collect() == init_df.union(init_df).sort("id").collect() @pytest.mark.parametrize( diff --git a/tests/test_integration/test_run_transfer/test_ftps.py b/tests/test_integration/test_run_transfer/test_ftps.py index 119f50bb..7d7b4f2a 100644 --- a/tests/test_integration/test_run_transfer/test_ftps.py +++ b/tests/test_integration/test_run_transfer/test_ftps.py @@ -9,23 +9,23 @@ from onetl.db import DBReader from onetl.file import FileDFReader, FileDownloader from pyspark.sql import DataFrame -from pytest import FixtureRequest +from pytest_lazy_fixtures import lf from sqlalchemy.ext.asyncio import AsyncSession from syncmaster.db.models import Connection, Group, Queue, Status from syncmaster.db.models.transfer import Transfer from tests.mocks import MockUser from tests.test_unit.utils import create_transfer -from tests.utils import get_run_on_end +from tests.utils import ( + add_increment_to_files_and_upload, + get_run_on_end, + prepare_dataframes_for_comparison, + run_transfer_and_verify, +) pytestmark = [pytest.mark.asyncio, pytest.mark.worker] -@pytest.fixture(params=[""]) -def file_format_flavor(request: FixtureRequest): - return request.param - - @pytest_asyncio.fixture async def ftps_to_postgres( session: AsyncSession, @@ -37,6 +37,7 @@ async def ftps_to_postgres( prepare_ftps, source_file_format, file_format_flavor: str, + strategy: dict, ): format_name, file_format = source_file_format format_name_in_path = "xlsx" if format_name == "excel" else format_name @@ -62,6 +63,7 @@ async def ftps_to_postgres( "type": "postgres", "table_name": "public.target_table", }, + strategy_params=strategy, queue_id=queue.id, ) yield result @@ -108,51 +110,70 @@ async def postgres_to_ftps( @pytest.mark.parametrize( - "source_file_format, file_format_flavor", + "source_file_format, file_format_flavor, strategy", [ pytest.param( ("csv", {}), "with_header", + lf("full_strategy"), id="csv", ), ], indirect=["source_file_format", "file_format_flavor"], ) -async def test_run_transfer_ftps_to_postgres( +async def test_run_transfer_ftps_to_postgres_with_full_strategy( prepare_postgres, group_owner: MockUser, init_df: DataFrame, client: AsyncClient, ftps_to_postgres: Transfer, - source_file_format, - file_format_flavor, + source_file_format: tuple[str, dict], + file_format_flavor: str, + strategy: dict, ): - # Arrange postgres, _ = prepare_postgres file_format, _ = source_file_format - # Act - result = await client.post( - "v1/runs", - headers={"Authorization": f"Bearer {group_owner.token}"}, - json={"transfer_id": ftps_to_postgres.id}, - ) - # Assert - assert result.status_code == 200 + await run_transfer_and_verify(client, group_owner, ftps_to_postgres.id) - run_data = await get_run_on_end( - client=client, - run_id=result.json()["id"], - token=group_owner.token, + reader = DBReader( + connection=postgres, + table="public.target_table", ) - source_auth_data = run_data["transfer_dump"]["source_connection"]["auth_data"] - target_auth_data = run_data["transfer_dump"]["target_connection"]["auth_data"] + df = reader.run() - assert run_data["status"] == Status.FINISHED.value - assert source_auth_data["user"] - assert "password" not in source_auth_data - assert target_auth_data["user"] - assert "password" not in target_auth_data + df, init_df = prepare_dataframes_for_comparison(df, init_df, file_format) + assert df.sort("id").collect() == init_df.sort("id").collect() + + +@pytest.mark.parametrize( + "source_file_format, file_format_flavor, strategy", + [ + pytest.param( + ("csv", {}), + "with_header", + lf("incremental_strategy_by_file_name"), + id="csv", + ), + ], + indirect=["source_file_format", "file_format_flavor"], +) +async def test_run_transfer_ftp_to_postgres_with_incremental_strategy( + prepare_postgres, + group_owner: MockUser, + init_df: DataFrame, + client: AsyncClient, + ftps_to_postgres: Transfer, + ftps_file_connection: FTPS, + source_file_format: tuple[str, dict], + file_format_flavor: str, + strategy: dict, + tmp_path: Path, +): + postgres, _ = prepare_postgres + file_format, _ = source_file_format + + await run_transfer_and_verify(client, group_owner, ftps_to_postgres.id) reader = DBReader( connection=postgres, @@ -160,10 +181,27 @@ async def test_run_transfer_ftps_to_postgres( ) df = reader.run() - for field in init_df.schema: - df = df.withColumn(field.name, df[field.name].cast(field.dataType)) - + df, init_df = prepare_dataframes_for_comparison(df, init_df, file_format) assert df.sort("id").collect() == init_df.sort("id").collect() + df_count = df.count() + + add_increment_to_files_and_upload( + file_connection=ftps_file_connection, + remote_path=f"/data/file_df_connection/{file_format}/{file_format_flavor}", + tmp_path=tmp_path, + ) + + await run_transfer_and_verify(client, group_owner, ftps_to_postgres.id) + + reader = DBReader( + connection=postgres, + table="public.target_table", + ) + df_with_increment = reader.run() + + df_with_increment, init_df = prepare_dataframes_for_comparison(df_with_increment, init_df, file_format) + assert df_with_increment.count() > df_count + assert df_with_increment.sort("id").collect() == init_df.union(init_df).sort("id").collect() @pytest.mark.parametrize( diff --git a/tests/test_integration/test_run_transfer/test_samba.py b/tests/test_integration/test_run_transfer/test_samba.py index e6312360..df9d355b 100644 --- a/tests/test_integration/test_run_transfer/test_samba.py +++ b/tests/test_integration/test_run_transfer/test_samba.py @@ -9,23 +9,23 @@ from onetl.db import DBReader from onetl.file import FileDFReader, FileDownloader from pyspark.sql import DataFrame -from pytest import FixtureRequest +from pytest_lazy_fixtures import lf from sqlalchemy.ext.asyncio import AsyncSession from syncmaster.db.models import Connection, Group, Queue, Status from syncmaster.db.models.transfer import Transfer from tests.mocks import MockUser from tests.test_unit.utils import create_transfer -from tests.utils import get_run_on_end +from tests.utils import ( + add_increment_to_files_and_upload, + get_run_on_end, + prepare_dataframes_for_comparison, + run_transfer_and_verify, +) pytestmark = [pytest.mark.asyncio, pytest.mark.worker] -@pytest.fixture(params=[""]) -def file_format_flavor(request: FixtureRequest): - return request.param - - @pytest_asyncio.fixture async def samba_to_postgres( session: AsyncSession, @@ -37,6 +37,7 @@ async def samba_to_postgres( prepare_samba, source_file_format, file_format_flavor: str, + strategy: dict, ): format_name, file_format = source_file_format format_name_in_path = "xlsx" if format_name == "excel" else format_name @@ -62,6 +63,7 @@ async def samba_to_postgres( "type": "postgres", "table_name": "public.target_table", }, + strategy_params=strategy, queue_id=queue.id, ) yield result @@ -108,51 +110,70 @@ async def postgres_to_samba( @pytest.mark.parametrize( - "source_file_format, file_format_flavor", + "source_file_format, file_format_flavor, strategy", [ pytest.param( ("csv", {}), "with_header", + lf("full_strategy"), id="csv", ), ], indirect=["source_file_format", "file_format_flavor"], ) -async def test_run_transfer_samba_to_postgres( +async def test_run_transfer_samba_to_postgres_with_full_strategy( prepare_postgres, group_owner: MockUser, init_df: DataFrame, client: AsyncClient, samba_to_postgres: Transfer, - source_file_format, - file_format_flavor, + source_file_format: tuple[str, dict], + file_format_flavor: str, + strategy: dict, ): - # Arrange postgres, _ = prepare_postgres file_format, _ = source_file_format - # Act - result = await client.post( - "v1/runs", - headers={"Authorization": f"Bearer {group_owner.token}"}, - json={"transfer_id": samba_to_postgres.id}, - ) - # Assert - assert result.status_code == 200 + await run_transfer_and_verify(client, group_owner, samba_to_postgres.id) - run_data = await get_run_on_end( - client=client, - run_id=result.json()["id"], - token=group_owner.token, + reader = DBReader( + connection=postgres, + table="public.target_table", ) - source_auth_data = run_data["transfer_dump"]["source_connection"]["auth_data"] - target_auth_data = run_data["transfer_dump"]["target_connection"]["auth_data"] + df = reader.run() - assert run_data["status"] == Status.FINISHED.value - assert source_auth_data["user"] - assert "password" not in source_auth_data - assert target_auth_data["user"] - assert "password" not in target_auth_data + df, init_df = prepare_dataframes_for_comparison(df, init_df, file_format) + assert df.sort("id").collect() == init_df.sort("id").collect() + + +@pytest.mark.parametrize( + "source_file_format, file_format_flavor, strategy", + [ + pytest.param( + ("csv", {}), + "with_header", + lf("incremental_strategy_by_file_name"), + id="csv", + ), + ], + indirect=["source_file_format", "file_format_flavor"], +) +async def test_run_transfer_samba_to_postgres_with_incremental_strategy( + prepare_postgres, + group_owner: MockUser, + init_df: DataFrame, + client: AsyncClient, + samba_to_postgres: Transfer, + samba_file_connection: Samba, + source_file_format: tuple[str, dict], + file_format_flavor: str, + strategy: dict, + tmp_path: Path, +): + postgres, _ = prepare_postgres + file_format, _ = source_file_format + + await run_transfer_and_verify(client, group_owner, samba_to_postgres.id) reader = DBReader( connection=postgres, @@ -160,10 +181,27 @@ async def test_run_transfer_samba_to_postgres( ) df = reader.run() - for field in init_df.schema: - df = df.withColumn(field.name, df[field.name].cast(field.dataType)) - + df, init_df = prepare_dataframes_for_comparison(df, init_df, file_format) assert df.sort("id").collect() == init_df.sort("id").collect() + df_count = df.count() + + add_increment_to_files_and_upload( + file_connection=samba_file_connection, + remote_path=f"/data/file_df_connection/{file_format}/{file_format_flavor}", + tmp_path=tmp_path, + ) + + await run_transfer_and_verify(client, group_owner, samba_to_postgres.id) + + reader = DBReader( + connection=postgres, + table="public.target_table", + ) + df_with_increment = reader.run() + + df_with_increment, init_df = prepare_dataframes_for_comparison(df_with_increment, init_df, file_format) + assert df_with_increment.count() > df_count + assert df_with_increment.sort("id").collect() == init_df.union(init_df).sort("id").collect() @pytest.mark.parametrize( diff --git a/tests/test_integration/test_run_transfer/test_sftp.py b/tests/test_integration/test_run_transfer/test_sftp.py index 8c08cc2a..c7dcb3a5 100644 --- a/tests/test_integration/test_run_transfer/test_sftp.py +++ b/tests/test_integration/test_run_transfer/test_sftp.py @@ -9,7 +9,6 @@ from onetl.db import DBReader from onetl.file import FileDFReader, FileDownloader from pyspark.sql import DataFrame -from pytest import FixtureRequest from pytest_lazy_fixtures import lf from sqlalchemy.ext.asyncio import AsyncSession @@ -17,16 +16,16 @@ from syncmaster.db.models.transfer import Transfer from tests.mocks import MockUser from tests.test_unit.utils import create_transfer -from tests.utils import get_run_on_end +from tests.utils import ( + add_increment_to_files_and_upload, + get_run_on_end, + prepare_dataframes_for_comparison, + run_transfer_and_verify, +) pytestmark = [pytest.mark.asyncio, pytest.mark.worker] -@pytest.fixture(params=[""]) -def file_format_flavor(request: FixtureRequest): - return request.param - - @pytest_asyncio.fixture async def sftp_to_postgres( session: AsyncSession, @@ -38,6 +37,7 @@ async def sftp_to_postgres( prepare_sftp, source_file_format, file_format_flavor: str, + strategy: dict, transformations: list[dict], ): format_name, file_format = source_file_format @@ -64,6 +64,7 @@ async def sftp_to_postgres( "type": "postgres", "table_name": "public.target_table", }, + strategy_params=strategy, transformations=transformations, queue_id=queue.id, ) @@ -111,53 +112,74 @@ async def postgres_to_sftp( @pytest.mark.parametrize( - "source_file_format, file_format_flavor, transformations", + "source_file_format, file_format_flavor, strategy, transformations", [ pytest.param( ("csv", {}), "for_file_filtering", + lf("full_strategy"), lf("file_metadata_filter_transformations"), id="csv", ), ], indirect=["source_file_format", "file_format_flavor"], ) -async def test_run_transfer_sftp_to_postgres( +async def test_run_transfer_sftp_to_postgres_with_full_strategy( prepare_postgres, group_owner: MockUser, init_df: DataFrame, client: AsyncClient, sftp_to_postgres: Transfer, - source_file_format, - file_format_flavor, - transformations, + source_file_format: tuple[str, dict], + file_format_flavor: str, + strategy: dict, + transformations: list[dict], ): - # Arrange postgres, _ = prepare_postgres file_format, _ = source_file_format - # Act - result = await client.post( - "v1/runs", - headers={"Authorization": f"Bearer {group_owner.token}"}, - json={"transfer_id": sftp_to_postgres.id}, - ) - # Assert - assert result.status_code == 200 + await run_transfer_and_verify(client, group_owner, sftp_to_postgres.id) - run_data = await get_run_on_end( - client=client, - run_id=result.json()["id"], - token=group_owner.token, + reader = DBReader( + connection=postgres, + table="public.target_table", ) - source_auth_data = run_data["transfer_dump"]["source_connection"]["auth_data"] - target_auth_data = run_data["transfer_dump"]["target_connection"]["auth_data"] + df = reader.run() - assert run_data["status"] == Status.FINISHED.value - assert source_auth_data["user"] - assert "password" not in source_auth_data - assert target_auth_data["user"] - assert "password" not in target_auth_data + df, init_df = prepare_dataframes_for_comparison(df, init_df, file_format) + assert df.sort("id").collect() == init_df.sort("id").collect() + + +@pytest.mark.parametrize( + "source_file_format, file_format_flavor, strategy, transformations", + [ + pytest.param( + ("csv", {}), + "for_file_filtering", + lf("incremental_strategy_by_file_modified_since"), + lf("file_metadata_filter_transformations"), + id="csv", + ), + ], + indirect=["source_file_format", "file_format_flavor"], +) +async def test_run_transfer_sftp_to_postgres_with_incremental_strategy( + prepare_postgres, + group_owner: MockUser, + init_df: DataFrame, + client: AsyncClient, + sftp_to_postgres: Transfer, + sftp_file_connection: SFTP, + source_file_format: tuple[str, dict], + file_format_flavor: str, + strategy: dict, + transformations: list[dict], + tmp_path: Path, +): + postgres, _ = prepare_postgres + file_format, _ = source_file_format + + await run_transfer_and_verify(client, group_owner, sftp_to_postgres.id) reader = DBReader( connection=postgres, @@ -165,10 +187,27 @@ async def test_run_transfer_sftp_to_postgres( ) df = reader.run() - for field in init_df.schema: - df = df.withColumn(field.name, df[field.name].cast(field.dataType)) - + df, init_df = prepare_dataframes_for_comparison(df, init_df, file_format) assert df.sort("id").collect() == init_df.sort("id").collect() + df_count = df.count() + + add_increment_to_files_and_upload( + file_connection=sftp_file_connection, + remote_path=f"/config/data/file_connection/{file_format}/{file_format_flavor}", + tmp_path=tmp_path, + ) + + await run_transfer_and_verify(client, group_owner, sftp_to_postgres.id) + + reader = DBReader( + connection=postgres, + table="public.target_table", + ) + df_with_increment = reader.run() + + df_with_increment, init_df = prepare_dataframes_for_comparison(df_with_increment, init_df, file_format) + assert df_with_increment.count() > df_count + assert df_with_increment.sort("id").collect() == init_df.union(init_df).sort("id").collect() @pytest.mark.parametrize( diff --git a/tests/test_integration/test_run_transfer/test_webdav.py b/tests/test_integration/test_run_transfer/test_webdav.py index 5adb448a..9aade053 100644 --- a/tests/test_integration/test_run_transfer/test_webdav.py +++ b/tests/test_integration/test_run_transfer/test_webdav.py @@ -9,23 +9,23 @@ from onetl.db import DBReader from onetl.file import FileDFReader, FileDownloader from pyspark.sql import DataFrame -from pytest import FixtureRequest +from pytest_lazy_fixtures import lf from sqlalchemy.ext.asyncio import AsyncSession from syncmaster.db.models import Connection, Group, Queue, Status from syncmaster.db.models.transfer import Transfer from tests.mocks import MockUser from tests.test_unit.utils import create_transfer -from tests.utils import get_run_on_end +from tests.utils import ( + add_increment_to_files_and_upload, + get_run_on_end, + prepare_dataframes_for_comparison, + run_transfer_and_verify, +) pytestmark = [pytest.mark.asyncio, pytest.mark.worker] -@pytest.fixture(params=[""]) -def file_format_flavor(request: FixtureRequest): - return request.param - - @pytest_asyncio.fixture async def webdav_to_postgres( session: AsyncSession, @@ -62,6 +62,10 @@ async def webdav_to_postgres( "type": "postgres", "table_name": "public.target_table", }, + strategy_params={ + "type": "incremental", + "increment_by": "file_modified_since", + }, queue_id=queue.id, ) yield result @@ -108,50 +112,70 @@ async def postgres_to_webdav( @pytest.mark.parametrize( - "source_file_format, file_format_flavor", + "source_file_format, file_format_flavor, strategy", [ pytest.param( ("csv", {}), "with_header", + lf("full_strategy"), id="csv", ), ], indirect=["source_file_format", "file_format_flavor"], ) -async def test_run_transfer_webdav_to_postgres( +async def test_run_transfer_webdav_to_postgres_with_full_strategy( prepare_postgres, group_owner: MockUser, init_df: DataFrame, client: AsyncClient, webdav_to_postgres: Transfer, - source_file_format, - file_format_flavor, + source_file_format: tuple[str, dict], + file_format_flavor: str, + strategy: dict, ): - # Arrange postgres, _ = prepare_postgres + file_format, _ = source_file_format - # Act - result = await client.post( - "v1/runs", - headers={"Authorization": f"Bearer {group_owner.token}"}, - json={"transfer_id": webdav_to_postgres.id}, - ) - # Assert - assert result.status_code == 200 + await run_transfer_and_verify(client, group_owner, webdav_to_postgres.id) - run_data = await get_run_on_end( - client=client, - run_id=result.json()["id"], - token=group_owner.token, + reader = DBReader( + connection=postgres, + table="public.target_table", ) - source_auth_data = run_data["transfer_dump"]["source_connection"]["auth_data"] - target_auth_data = run_data["transfer_dump"]["target_connection"]["auth_data"] + df = reader.run() + + df, init_df = prepare_dataframes_for_comparison(df, init_df, file_format) + assert df.sort("id").collect() == init_df.sort("id").collect() - assert run_data["status"] == Status.FINISHED.value - assert source_auth_data["user"] - assert "password" not in source_auth_data - assert target_auth_data["user"] - assert "password" not in target_auth_data + +@pytest.mark.parametrize( + "source_file_format, file_format_flavor, strategy", + [ + pytest.param( + ("csv", {}), + "with_header", + lf("incremental_strategy_by_file_modified_since"), + id="csv", + ), + ], + indirect=["source_file_format", "file_format_flavor"], +) +async def test_run_transfer_webdav_to_postgres_with_incremental_strategy( + prepare_postgres, + group_owner: MockUser, + init_df: DataFrame, + client: AsyncClient, + webdav_to_postgres: Transfer, + webdav_file_connection: WebDAV, + source_file_format: tuple[str, dict], + file_format_flavor: str, + strategy: dict, + tmp_path: Path, +): + postgres, _ = prepare_postgres + file_format, _ = source_file_format + + await run_transfer_and_verify(client, group_owner, webdav_to_postgres.id) reader = DBReader( connection=postgres, @@ -159,10 +183,27 @@ async def test_run_transfer_webdav_to_postgres( ) df = reader.run() - for field in init_df.schema: - df = df.withColumn(field.name, df[field.name].cast(field.dataType)) - + df, init_df = prepare_dataframes_for_comparison(df, init_df, file_format) assert df.sort("id").collect() == init_df.sort("id").collect() + df_count = df.count() + + add_increment_to_files_and_upload( + file_connection=webdav_file_connection, + remote_path=f"/data/file_df_connection/{file_format}/{file_format_flavor}", + tmp_path=tmp_path, + ) + + await run_transfer_and_verify(client, group_owner, webdav_to_postgres.id) + + reader = DBReader( + connection=postgres, + table="public.target_table", + ) + df_with_increment = reader.run() + + df_with_increment, init_df = prepare_dataframes_for_comparison(df_with_increment, init_df, file_format) + assert df_with_increment.count() > df_count + assert df_with_increment.sort("id").collect() == init_df.union(init_df).sort("id").collect() @pytest.mark.parametrize( diff --git a/tests/test_unit/test_transfers/test_create_transfer.py b/tests/test_unit/test_transfers/test_create_transfer.py index c3f5002a..8c227ebf 100644 --- a/tests/test_unit/test_transfers/test_create_transfer.py +++ b/tests/test_unit/test_transfers/test_create_transfer.py @@ -48,7 +48,7 @@ async def test_developer_plus_can_create_transfer( "type": "csv", }, }, - "strategy_params": {"type": "incremental", "increment_by": "modified_since"}, + "strategy_params": {"type": "incremental", "increment_by": "file_modified_since"}, "transformations": [ { "type": "dataframe_rows_filter", @@ -449,7 +449,7 @@ async def test_superuser_can_create_transfer( { "location": ["body"], "message": ( - "Value error, Field 'increment_by' must be equal to 'modified_since' for file source types" + "Value error, Field 'increment_by' must be equal to 'file_modified_since' or 'file_name' for file source types" ), "code": "value_error", "context": {}, @@ -458,6 +458,35 @@ async def test_superuser_can_create_transfer( }, }, ), + ( + { + "source_params": { + "type": "s3", + "directory_path": "/source_path", + "file_format": { + "type": "csv", + }, + }, + "strategy_params": { + "type": "incremental", + "increment_by": "file_modified_since", + }, + }, + { + "error": { + "code": "invalid_request", + "message": "Invalid request", + "details": [ + { + "location": ["body"], + "message": ("Value error, S3 and HDFS sources do not support incremental strategy for now"), + "code": "value_error", + "context": {}, + }, + ], + }, + }, + ), ( { "source_params": { @@ -778,8 +807,15 @@ async def test_check_fields_validation_on_create_transfer( # Assert assert result.status_code == 422 - if (new_data == {"schedule": None}) or ( - "strategy_params" in new_data and new_data["strategy_params"].get("increment_by") == "unknown" + if ( + (new_data == {"schedule": None}) + or ("strategy_params" in new_data and new_data["strategy_params"].get("increment_by") == "unknown") + or ( + "strategy_params" in new_data + and new_data["strategy_params"].get("type") == "incremental" + and "source_params" in new_data + and new_data["source_params"].get("type") == "s3" + ) ): error_json["error"]["details"][0]["input"] = transfer_data diff --git a/tests/test_unit/test_transfers/test_file_transfers/test_create_transfer.py b/tests/test_unit/test_transfers/test_file_transfers/test_create_transfer.py index 0d5433a7..8286719d 100644 --- a/tests/test_unit/test_transfers/test_file_transfers/test_create_transfer.py +++ b/tests/test_unit/test_transfers/test_file_transfers/test_create_transfer.py @@ -62,7 +62,7 @@ }, }, { - "file_name_template": "{index}.{extension}", + "file_name_template": "{run_id}-{index}.{extension}", }, ), ( @@ -251,7 +251,7 @@ async def test_developer_plus_can_create_s3_transfer( }, }, { - "file_name_template": "{index}.{extension}", + "file_name_template": "{run_id}-{index}.{extension}", }, ), ( @@ -589,6 +589,29 @@ async def test_cannot_create_file_transfer_with_relative_path( ], id="prohibited_symbol", ), + pytest.param( + { + "type": "s3", + "directory_path": "/some/path", + "file_format": { + "type": "excel", + "include_header": True, + }, + }, + { + "file_name_template": "{index}.{extension}", + }, + [ + { + "context": {}, + "input": "{index}.{extension}", + "location": ["body", "target_params", "s3", "file_name_template"], + "message": "Value error, At least one of placeholders must be present: {run_id} or {run_created_at}", + "code": "value_error", + }, + ], + id="missing_placeholders", + ), ], ) async def test_file_name_template_validation( diff --git a/tests/test_unit/test_transfers/test_file_transfers/test_read_transfer.py b/tests/test_unit/test_transfers/test_file_transfers/test_read_transfer.py index 19cbda32..2b4353be 100644 --- a/tests/test_unit/test_transfers/test_file_transfers/test_read_transfer.py +++ b/tests/test_unit/test_transfers/test_file_transfers/test_read_transfer.py @@ -58,7 +58,7 @@ "options": {}, }, "target_params": { - "file_name_template": "{index}.{extension}", + "file_name_template": "{run_id}-{index}.{extension}", }, }, { @@ -106,7 +106,7 @@ }, "strategy_params": { "type": "incremental", - "increment_by": "modified_since", + "increment_by": "file_modified_since", }, }, ], diff --git a/tests/test_unit/test_transfers/test_file_transfers/test_update_transfer.py b/tests/test_unit/test_transfers/test_file_transfers/test_update_transfer.py index bc84db46..7907983b 100644 --- a/tests/test_unit/test_transfers/test_file_transfers/test_update_transfer.py +++ b/tests/test_unit/test_transfers/test_file_transfers/test_update_transfer.py @@ -41,7 +41,7 @@ "options": {}, }, "target_params": { - "file_name_template": "{index}.{extension}", + "file_name_template": "{run_id}-{index}.{extension}", }, }, { @@ -156,12 +156,12 @@ async def test_developer_plus_can_update_s3_transfer( "type": "ftp", "directory_path": "/some/new/test/directory", "file_format": create_transfer_data["source_and_target_params"]["file_format"], - "file_name_template": "{index}.{extension}", + "file_name_template": "{run_id}--{index}.{extension}", "options": {"some": "option"}, }, "strategy_params": { "type": "incremental", - "increment_by": "modified_since", + "increment_by": "file_modified_since", }, "transformations": [ { diff --git a/tests/test_unit/test_transfers/transfer_fixtures/transfers_fixture.py b/tests/test_unit/test_transfers/transfer_fixtures/transfers_fixture.py index 05fd0456..50331e41 100644 --- a/tests/test_unit/test_transfers/transfer_fixtures/transfers_fixture.py +++ b/tests/test_unit/test_transfers/transfer_fixtures/transfers_fixture.py @@ -71,14 +71,14 @@ async def group_transfers( source_params["directory_path"] = "/path/to/source" target_params.update(common_params) target_params["directory_path"] = "/path/to/target" - target_params["file_name_template"] = "{run_created_at}_{index}.{extension}" + target_params["file_name_template"] = "{run_created_at}-{index}.{extension}" elif connection_type == ConnectionType.HDFS: common_params = {"options": {}} source_params.update(common_params) source_params["directory_path"] = "/path/to/source" target_params.update(common_params) target_params["directory_path"] = "/path/to/target" - target_params["file_name_template"] = "{run_created_at}_{index}.{extension}" + target_params["file_name_template"] = "{run_created_at}-{index}.{extension}" elif connection_type in [ ConnectionType.HIVE, ConnectionType.POSTGRES, diff --git a/tests/utils.py b/tests/utils.py index 7f496306..e61b9228 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1,6 +1,7 @@ import asyncio import logging from datetime import datetime +from pathlib import Path from typing import Any from alembic.autogenerate import compare_metadata @@ -9,6 +10,10 @@ from alembic.runtime.migration import MigrationContext from alembic.script import ScriptDirectory from httpx import AsyncClient +from onetl.connection import FileConnection +from onetl.file import FileDownloader, FileUploader +from pyspark.sql import DataFrame +from pyspark.sql.functions import col, date_trunc from sqlalchemy import Connection as AlchConnection from sqlalchemy import MetaData, pool, text from sqlalchemy.ext.asyncio import ( @@ -19,6 +24,7 @@ from syncmaster.db.models import Status from syncmaster.server.settings import ServerAppSettings as Settings +from tests.mocks import MockUser logger = logging.getLogger(__name__) @@ -118,3 +124,72 @@ async def get_run_on_end( raise TimeoutError() await asyncio.sleep(1) + + +def verify_transfer_auth_data(run_data: dict[str, Any]) -> None: + source_auth_data = run_data["transfer_dump"]["source_connection"]["auth_data"] + target_auth_data = run_data["transfer_dump"]["target_connection"]["auth_data"] + + assert source_auth_data["user"] + assert "password" not in source_auth_data + assert target_auth_data["user"] + assert "password" not in target_auth_data + + +async def run_transfer_and_verify(client: AsyncClient, user: MockUser, transfer_id: int) -> dict[str, Any]: + result = await client.post( + "v1/runs", + headers={"Authorization": f"Bearer {user.token}"}, + json={"transfer_id": transfer_id}, + ) + assert result.status_code == 200 + + run_data = await get_run_on_end( + client=client, + run_id=result.json()["id"], + token=user.token, + ) + assert run_data["status"] == Status.FINISHED.value + verify_transfer_auth_data(run_data) + + return run_data + + +def prepare_dataframes_for_comparison( + df: DataFrame, + init_df: DataFrame, + file_format: str, +) -> tuple[DataFrame, DataFrame]: + # as Excel does not support datetime values with precision greater than milliseconds + if file_format == "excel": + df = df.withColumn("REGISTERED_AT", date_trunc("second", col("REGISTERED_AT"))) + init_df = init_df.withColumn("REGISTERED_AT", date_trunc("second", col("REGISTERED_AT"))) + + for field in init_df.schema: + df = df.withColumn(field.name, df[field.name].cast(field.dataType)) + + return df, init_df + + +def add_increment_to_files_and_upload(file_connection: FileConnection, remote_path: str, tmp_path: Path) -> None: + downloader = FileDownloader( + connection=file_connection, + source_path=remote_path, + local_path=tmp_path, + ) + downloader.run() + + for file in tmp_path.iterdir(): + if file.is_file(): + # do not use file.suffix field, as extensions may include compression + stem, suffix = file.name.split(".", 1) + new_name = f"{stem}_increment.{suffix}" + new_path = file.with_name(new_name) + file.rename(new_path) + + uploader = FileUploader( + connection=file_connection, + local_path=tmp_path, + target_path=remote_path, + ) + uploader.run()