A powerful, production-ready data synchronization tool for modern data engineering workflows. SyncTool provides intelligent, incremental data synchronization between databases with advanced features like automatic DDL management, distributed scheduling, and pipeline-based processing.
- Multiple Sync Strategies: Full, Delta (timestamp-based), and Hash-based synchronization
- Intelligent Strategy Selection: Automatically selects the best sync strategy based on time gap and historical patterns
- Resumable Syncs: Automatically resume from last successful partition on failure - no data loss or duplication
- Distributed Architecture: Scale horizontally with Redis-backed job queue and multiple workers
- Pipeline-Based Processing: Modular, memory-efficient streaming data processing with configurable stages
- Universal Schema Mapping: Automatic type conversion between different database systems
- PostgreSQL - Full support with DDL generation
- StarRocks - High-performance OLAP database
- MySQL - Traditional RDBMS support
- ClickHouse - Columnar database (planned)
- DuckDB - Embedded analytics (planned)
- S3/MinIO - Object storage for data lakes
- Automatic DDL Management: Generate and track DDL changes, with safe deployment workflows
- Build & Deployment System: Validate, build, and deploy configurations with change detection
- Data Enrichment: Join and enrich data from multiple sources with caching
- Partitioned Processing: Efficient handling of large datasets with automatic partitioning
- Execution Locking: Prevent concurrent runs and DDL conflicts with Redis-based distributed locks
- Retry Logic: Slot-aware retry with exponential backoff for failed jobs
- Monitoring & Observability: Built-in metrics, logging, and HTTP API for monitoring
- Scheduler & Cron: Schedule pipelines with cron expressions
- Configuration Management: Store configs in files or databases with versioning
- Architecture
- Installation
- Quick Start
- Configuration
- Sync Modes
- Usage
- CLI Commands
- HTTP API
- Examples
- Deployment
- Monitoring
- Troubleshooting
- Contributing
SyncTool uses a distributed, scalable architecture:
┌─────────────────┐      ┌────────────────┐      ┌─────────────────┐
│  Config Files   │─────▶│   Scheduler    │─────▶│   Redis Queue   │
│   (YAML/DB)     │      │ (Enhanced ARQ) │      │   (ARQ Jobs)    │
└─────────────────┘      └────────────────┘      └─────────────────┘
                                                          │
                         ┌────────────────┐               │
                         │    Workers     │◀──────────────┘
                         │   (Scalable)   │
                         └────────────────┘
                                 │
                         ┌───────▼────────┐
                         │    Pipeline    │
                         │   Processor    │
                         └────────────────┘
- Scheduler: Reads configurations, determines which strategies to run, enqueues jobs to Redis
- Workers: Pull jobs from Redis queue, execute pipelines, report back to scheduler
- Pipeline Processor: Executes sync jobs with configurable processing stages
- Config Manager: Manages pipeline configurations with validation and versioning
- State Manager: Tracks pipeline execution state and history
Data flows through modular, configurable stages:
ChangeDetection → DataFetch → Transform → Enrich → Batcher → Populate
  (Required)      (Optional)  (Optional) (Optional) (Optional) (Required)
Each stage can be independently enabled/disabled and configured for optimal performance.
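To make the stage model concrete, here is a minimal, illustrative Python sketch of the streaming idea: generator stages chained together, with optional stages simply left out of the chain. The function and field names are hypothetical, not SyncTool's internal API.

```python
# Illustrative sketch only: each stage is a generator, so rows stream through
# in batches without being fully materialized in memory.
def transform(rows):
    for row in rows:
        row["total_with_tax"] = round(row.get("total", 0.0) * 1.1, 2)
        yield row

def batcher(rows, size=1000):
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) >= size:
            yield batch
            batch = []
    if batch:
        yield batch

def run_pipeline(source_rows, stages):
    data = source_rows
    for stage in stages:        # disabled stages are simply omitted from this list
        data = stage(data)
    return data                 # iterator of batches, ready for the populate step

# Example: only the Transform and Batcher stages enabled
rows = ({"id": i, "total": 10.0 * i} for i in range(2500))
for batch in run_pipeline(rows, [transform, batcher]):
    print(f"would write {len(batch)} rows to the destination")
```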
- Python 3.8 or higher
- Redis 5.0 or higher
- PostgreSQL, MySQL, or other supported database
# Clone the repository
git clone https://github.com/yourusername/synctool.git
cd synctool
# Install dependencies
pip install -r requirements.txt
# Install in development mode
pip install -e .
# Start all services (scheduler, workers, and Redis)
docker-compose -f docker-compose-arq.yml up -d
# Scale workers as needed
docker-compose -f docker-compose-arq.yml up -d --scale worker=5
# Using Docker
docker run -d -p 6379:6379 redis:7-alpine
# Or use Docker Compose
docker-compose -f docker-compose-arq.yml up -d redis
Create examples/configs/datastores.yaml:
datastores:
source_db:
type: 'postgres'
connection:
host: 'localhost'
port: 5432
user: 'your_user'
password: 'your_password'
dbname: 'source_database'
schema: 'public'
dest_db:
type: 'postgres'
connection:
host: 'localhost'
port: 5432
user: 'your_user'
password: 'your_password'
dbname: 'dest_database'
schema: 'public'
Create examples/configs/pipelines/my_first_pipeline.yaml:
name: "users_sync"
description: "Sync users from source to destination"
partition_step: 10000
partition_column: "id"
source_provider:
data_backend:
type: "postgres"
datastore_name: "source_db"
table: "users"
schema: "public"
destination_provider:
data_backend:
type: "postgres"
datastore_name: "dest_db"
table: "users_copy"
schema: "public"
column_map:
- source: "id"
dest: "id"
roles: ["unique_column", "partition_column"]
data_type: "int"
- source: "name"
dest: "name"
data_type: "string"
- source: "email"
dest: "email"
data_type: "string"
- source: "updated_at"
dest: "updated_at"
data_type: "timestamp"
strategies:
- name: "hourly_delta"
type: "delta"
column: "updated_at"
enabled: true
cron: "0 * * * *" # Run every hour
page_size: 1000
python -m synctool.cli.arq_scheduler_cli start \
--config-dir ./examples/configs/pipelines
# Start a single worker
python -m synctool.worker.worker_cli start
# Or start multiple workers
python -m synctool.worker.worker_cli start --max-jobs 4 &
python -m synctool.worker.worker_cli start --max-jobs 4 &
python -m synctool.worker.worker_cli start --max-jobs 4 &
# Check pipeline status
python -m synctool.cli.arq_scheduler_cli status
# View pipeline history
python -m synctool.cli.arq_scheduler_cli history users_sync
# Check via HTTP API
curl http://localhost:8001/health
curl http://localhost:8001/api/pipelines/states | jq
Edit synctool/config/global_config.yaml:
redis:
url: "redis://localhost:6379"
db: 0
scheduler:
enabled: true
http_port: 8001
check_interval: 60
slot_definitions:
high_priority: 5
normal: 10
low_priority: 3
worker:
max_jobs: 4
health_check_interval: 30
storage:
state_dir: "./data/pipeline_states"
logs_dir: "./data/logs"
metrics_dir: "./data/metrics"
build_system:
user_config_dir: "./examples/configs/pipelines"
built_config_dir: "./examples/configs/built"
datastores_path: "./examples/configs/datastores.yaml"
auto_build_on_startup: true
ddl_check_on_build: "required" # Options: required, optional, skip
on_build_failure: "keep_old" # Options: keep_old, remove
A complete pipeline configuration example:
name: "advanced_sync_pipeline"
description: "Advanced pipeline with all features"
partition_step: 50000
partition_column: "id"
max_concurrent_partitions: 4
source_provider:
data_backend:
type: "postgres"
datastore_name: "source_db"
table: "orders"
alias: "o"
schema: "public"
join:
- table: "customers"
alias: "c"
on: "o.customer_id = c.id"
type: "left"
filters:
- "o.status = 'active'"
destination_provider:
data_backend:
type: "postgres"
datastore_name: "dest_db"
table: "orders_enriched"
schema: "analytics"
supports_update: true
column_map:
- source: "o.id"
dest: "order_id"
roles: ["unique_column", "partition_column"]
data_type: "int"
- source: "o.order_date"
dest: "order_date"
data_type: "timestamp"
- source: "o.total"
dest: "total_amount"
data_type: "decimal"
precision: 10
scale: 2
- source: "c.name"
dest: "customer_name"
data_type: "string"
- source: "o.updated_at"
dest: "updated_at"
data_type: "timestamp"
strategies:
- name: "delta_sync"
type: "delta"
column: "updated_at"
enabled: true
cron: "*/15 * * * *" # Every 15 minutes
page_size: 1000
use_sub_partition: true
sub_partition_step: 100
- name: "hash_sync"
type: "hash"
enabled: true
cron: "0 2 * * *" # Daily at 2 AM
page_size: 5000
prevent_update_unless_changed: true
- name: "full_sync"
type: "full"
enabled: true
cron: "0 0 * * 0" # Weekly on Sunday
page_size: 10000
enrichment:
enabled: true
cache_backend: "redis"
cache_config:
maxsize: 100000
ttl: 3600
dimensions:
- name: "customer_segment"
join_key: "customer_id"
source:
type: "postgres"
config:
datastore_name: "analytics_db"
table: "customer_segments"
fields:
- source: "segment"
dest: "customer_segment"
- source: "score"
dest: "customer_score"
transformations:
- columns: ["total_amount"]
transform: "lambda x: round(x['total_amount'] * 1.1, 2)"
dest: "total_with_tax"SyncTool supports two powerful sync modes: Row-Level Sync for individual records and Aggregate Sync for pre-aggregated data with intelligent change detection.
Traditional row-by-row synchronization where each record is processed individually. Ideal for transactional data that needs to be replicated as-is.
Configuration Example:
name: "users_row_sync"
sync_type: "row-level" # Default mode
source_provider:
data_backend:
type: "postgres"
datastore_name: "source_db"
table: "users"
schema: "public"
destination_provider:
data_backend:
type: "postgres"
datastore_name: "dest_db"
table: "users_copy"
schema: "public"
column_map:
- source: "id"
dest: "id"
roles: ["unique_column"]
data_type: "int"
- source: "name"
dest: "name"
data_type: "string"
- source: "updated_at"
dest: "updated_at"
data_type: "timestamp"
strategies:
- name: "delta_sync"
type: "delta"
primary_partitions:
- column: "updated_at"
step: 7200 # 2 hours in seconds
data_type: "datetime"
page_size: 1000
An approach for syncing pre-aggregated data (e.g., daily summaries, rollups, totals) with intelligent change detection: instead of recomputing entire aggregates, SyncTool identifies which source rows changed and updates only the affected aggregates.
Key Features:
- Delta Partitions: Identifies changed source records and maps them to aggregate keys
- Smart Re-aggregation: Only recomputes aggregates for rows that changed
- Multi-dimensional Partitioning: Handles complex aggregations across multiple dimensions
- Deduplication: Automatically deduplicates within aggregation windows for delta syncs
- Column-Level Expressions: Use SQL aggregation functions (`SUM`, `COUNT`, `MAX`, `AVG`, etc.)
How It Works:
1. Change Detection: Identifies source rows that changed (e.g., orders with `updated_at > last_sync`)
2. Delta Partitions: Extracts unique combinations of aggregate keys (e.g., `user_id`, `date`)
3. Re-aggregation: For each affected aggregate key, re-runs the aggregation query on ALL relevant source rows
4. Update Destination: Updates or inserts the re-computed aggregates
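As an illustration of steps 2-4, the Python sketch below builds the re-aggregation query for each affected key, using the orders/user_id/order_date columns from the example that follows. The helper functions here are hypothetical, not part of SyncTool.

```python
from datetime import datetime

def affected_aggregate_keys(changed_rows):
    """Step 2: distinct (user_id, order_date) pairs extracted from the changed source rows."""
    return {(r["user_id"], r["order_date"]) for r in changed_rows}

def reaggregation_query(user_id, order_date):
    """Step 3: re-run the aggregate over ALL source rows for one affected key.
    (Use bind parameters instead of string formatting in real code.)"""
    return (
        "SELECT user_id, CAST(order_date AS DATE) AS date, "
        "COUNT(1) AS num_orders, SUM(amount) AS amount, MAX(updated_at) AS last_updated_at "
        "FROM orders "
        f"WHERE user_id = {user_id} AND CAST(order_date AS DATE) = '{order_date}' "
        "GROUP BY user_id, CAST(order_date AS DATE)"
    )

# Step 1: rows whose updated_at falls inside the primary partition window
changed = [{"user_id": 101, "order_date": "2024-01-15", "updated_at": datetime(2024, 1, 15, 14, 0)}]

# Step 4: each query result would then be upserted into the aggregate table
for uid, day in affected_aggregate_keys(changed):
    print(reaggregation_query(uid, day))
```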
Configuration Example - Daily User Orders Aggregate:
name: "user_daily_orders_aggregate"
description: "Sync daily order aggregates per user with change detection"
sync_type: "aggregate" # Enable aggregate mode
enabled: true
max_concurrent_partitions: 4
strategies:
- name: "delta"
type: "delta"
enabled: true
cron: "*/15 * * * *" # Every 15 minutes
# Primary partition: Detect changes in this time range
primary_partitions:
- column: "updated_at"
step: 2 # Check last 2 hours
step_unit: "hour"
data_type: "datetime"
bounded: true # Only look at recent data
# Delta partitions: Extract affected aggregate keys from changed rows
delta_partitions:
- column: "user_id"
type: "value" # Extract distinct values
step: -1 # -1 means "extract all distinct values"
data_type: "integer"
- column: "order_date"
step: 1
step_unit: "day"
data_type: "datetime"
page_size: 1000
use_pagination: true
backends:
- name: "source_backend"
type: "postgres"
datastore_name: "local_postgres"
table: "orders"
schema: "public"
columns:
- name: "user_id"
data_type: "integer"
- name: "amount"
data_type: "decimal"
- name: "discount"
data_type: "decimal"
- name: "order_date"
data_type: "datetime"
- name: "updated_at"
data_type: "datetime"
- name: "checksum_int"
data_type: "bigint"
hash_key: true
virtual: true
- name: "destination_backend"
type: "postgres"
datastore_name: "local_postgres"
table: "user_daily_orders_aggregate"
schema: "analytics"
columns:
- name: "user_id"
data_type: "integer"
unique_column: true # Part of composite key
- name: "date"
data_type: "date"
unique_column: true # Part of composite key
- name: "amount"
data_type: "decimal"
- name: "discount"
data_type: "decimal"
- name: "num_orders"
data_type: "integer"
- name: "last_updated_at"
data_type: "datetime"
- name: "checksum_int"
data_type: "bigint"
hash_key: true
- name: "order_date"
expr: "date" # Virtual column mapping
data_type: "date"
virtual: true
stages:
- name: "partition_generator"
type: "partition"
enabled: true
source:
name: source_backend
destination:
name: destination_backend
- name: "change_detection"
type: "change_detection"
enabled: true
source:
name: source_backend
destination:
name: destination_backend
# Key stage: Defines aggregation logic
- name: "data_fetch"
type: "data_fetch"
enabled: true
source:
name: source_backend
columns:
- name: "user_id" # GROUP BY column
- name: "date"
expr: "CAST(order_date AS DATE)" # GROUP BY column
- name: "num_orders"
expr: "count(1)" # Aggregation
- name: "amount"
expr: "sum(amount)" # Aggregation
- name: "discount"
expr: "sum(discount)" # Aggregation
- name: "last_updated_at"
expr: "max(updated_at)" # Track latest update
group_by: ["user_id", "date"] # Defines aggregation granularity
destination:
name: destination_backend
# Important: Dedup for delta syncs (when multiple orders for same user/day change)
- name: "dedup"
type: "dedup"
enabled: true
applicable_on: "delta" # Only applies to delta strategy
config:
dedup_keys: ["user_id", "date"] # Ensure one aggregate per key
- name: "data_batcher"
type: "batcher"
enabled: true
config:
target_batch_size: 100
- name: "populate"
type: "populate"
enabled: true
destination:
name: destination_backend
Example Flow:
Imagine we have these orders:
-- Initial orders
order_id | user_id | amount | order_date | updated_at
---------|---------|--------|------------|------------
1 | 101 | 50.00 | 2024-01-15 | 2024-01-15 10:00
2 | 101 | 30.00 | 2024-01-15 | 2024-01-15 11:00
3        | 102     | 75.00  | 2024-01-15 | 2024-01-15 12:00
Aggregate table after initial sync:
user_id | date | num_orders | amount | last_updated_at
--------|------------|------------|--------|----------------
101 | 2024-01-15 | 2 | 80.00 | 2024-01-15 11:00
102     | 2024-01-15 | 1          | 75.00  | 2024-01-15 12:00
Now, order #2 is updated:
UPDATE orders SET amount = 40.00, updated_at = '2024-01-15 14:00' WHERE order_id = 2;
Delta sync detects the change:
1. Primary partition check: Find rows where `updated_at > last_sync` → finds order #2
2. Extract delta partitions: From the changed row, extract `user_id=101` and `date=2024-01-15`
3. Re-aggregate: Run the aggregation query for `user_id=101` and `date=2024-01-15`:
   SELECT user_id, CAST(order_date AS DATE) as date, COUNT(1) as num_orders, SUM(amount) as amount, MAX(updated_at) as last_updated_at
   FROM orders
   WHERE user_id = 101 AND CAST(order_date AS DATE) = '2024-01-15'
   GROUP BY user_id, date
4. Update aggregate: Result is `num_orders=2, amount=90.00` → updates the existing aggregate
Final aggregate table:
user_id | date | num_orders | amount | last_updated_at
--------|------------|------------|--------|----------------
101     | 2024-01-15 | 2          | 90.00  | 2024-01-15 14:00  ← Updated!
102     | 2024-01-15 | 1          | 75.00  | 2024-01-15 12:00  ← Unchanged
Advanced: Nested Multi-Dimensional Aggregates
For complex aggregations with multiple dimensions:
strategies:
- name: "hash"
type: "hash"
enabled: true
# Multiple primary partition dimensions
primary_partitions:
- column: "order_date"
step: 1
step_unit: "month"
data_type: "datetime"
- column: "user_id"
step: 50
data_type: "integer"
# Multiple secondary partition dimensions for finer granularity
secondary_partitions:
- column: "order_date"
step: 1
step_unit: "day"
data_type: "datetime"
- column: "user_id"
step: 5
data_type: "integer"
page_size: 1000
This creates a hierarchical partitioning:
- Month → Day (for dates)
- 50 users → 5 users (for user IDs)
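A rough Python sketch of how such a hierarchy could be expanded into work units (month → day windows crossed with 50 → 5 user-ID ranges). This is illustrative only, not SyncTool's partitioner.

```python
from datetime import date, timedelta

def day_windows(start, end):
    """Secondary date partitions: one window per day inside a monthly primary partition."""
    current = start
    while current < end:
        yield current, current + timedelta(days=1)
        current += timedelta(days=1)

def id_ranges(lo, hi, step):
    """Secondary user_id partitions: narrow ranges inside a wider primary range."""
    for start in range(lo, hi, step):
        yield start, min(start + step, hi)

# Primary partition: January 2024 x user_id 0-50; secondary: per day x 5 users
work_units = [
    (day, ids)
    for day in day_windows(date(2024, 1, 1), date(2024, 2, 1))
    for ids in id_ranges(0, 50, 5)
]
print(f"{len(work_units)} resumable work units")   # 31 days x 10 ID ranges = 310
```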
SyncTool tracks changes at the column level using hash-based detection, allowing for:
- Selective Updates: Only update rows where specific columns changed
- Hash Columns: Configure which columns contribute to the row hash
- Virtual Columns: Computed columns that don't exist in source (e.g., checksums)
- Prevent Unnecessary Updates: Skip updates when only non-tracked columns change
Configuration:
backends:
- name: "source_backend"
columns:
- name: "user_id"
data_type: "integer"
hash_column: true # Include in hash
- name: "name"
data_type: "string"
hash_column: true # Include in hash
- name: "email"
data_type: "string"
hash_column: true # Include in hash
- name: "last_login"
data_type: "timestamp"
hash_column: false # Don't include in hash (changes don't trigger update)
- name: "checksum"
data_type: "varchar"
hash_key: true # This column stores the hash
virtual: true # Computed, not in source table
Hash Algorithms:
SyncTool supports multiple hash algorithms:
# Global configuration
hash_algo: "md5_sum_hash" # Options: md5_sum_hash, hash_md5_hash
strategies:
- name: "hash_sync"
type: "hash"
# Hash sync compares checksums between source and destination
# Only rows with different hashes are synced
How Column-Level Detection Works:
1. Hash Computation: For each row, compute the hash from the columns marked `hash_column=true`
2. Comparison: Compare the source hash vs the destination hash
3. Change Detection:
   - New rows: Hash exists in source, not in destination → INSERT
   - Modified rows: Hash differs between source and destination → UPDATE
   - Deleted rows: Hash exists in destination, not in source → DELETE
   - Unchanged rows: Hash matches → SKIP
4. Selective Updates: Only columns where `hash_column=true` and the value changed trigger updates
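A hedged Python sketch of this flow, assuming MD5 over the concatenated `hash_column=true` values and a per-key checksum lookup on the destination; SyncTool's exact hashing scheme may differ.

```python
import hashlib

HASH_COLUMNS = ["user_id", "name", "email"]          # the columns marked hash_column: true

def row_hash(row):
    payload = "|".join(str(row.get(col, "")) for col in HASH_COLUMNS)
    return hashlib.md5(payload.encode("utf-8")).hexdigest()

def classify(source_rows, dest_hashes):
    """Compare per-key hashes and bucket rows into inserts / updates / deletes / skips."""
    inserts, updates, skipped, seen = [], [], 0, set()
    for row in source_rows:
        key, digest = row["user_id"], row_hash(row)
        seen.add(key)
        if key not in dest_hashes:
            inserts.append(row)              # new row                -> INSERT
        elif dest_hashes[key] != digest:
            updates.append(row)              # tracked column changed -> UPDATE
        else:
            skipped += 1                     # hash matches           -> SKIP
    deletes = [key for key in dest_hashes if key not in seen]   # gone from source -> DELETE
    return inserts, updates, deletes, skipped
```

Because untracked columns never feed the hash, changes to audit-only fields produce an identical checksum and the row is skipped.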
Benefits:
- Reduces unnecessary database writes
- Improves sync performance for large datasets
- Allows tracking specific columns (e.g., ignore audit columns like `last_accessed_at`)
- Supports compliance requirements (e.g., only sync when PII columns change)
Example: Ignore Audit Columns
columns:
# Business columns - tracked
- name: "product_id"
hash_column: true
- name: "name"
hash_column: true
- name: "price"
hash_column: true
- name: "category"
hash_column: true
# Audit columns - not tracked
- name: "created_at"
hash_column: false # Don't trigger sync on creation timestamp changes
- name: "created_by"
hash_column: false # Don't trigger sync on audit field changes
- name: "last_viewed_at"
hash_column: false # Don't trigger sync when users view products
- name: "view_count"
hash_column: false # Don't trigger sync on view counter increments
With this configuration:
- Changes to `name`, `price`, or `category` → Triggers sync
- Changes to `view_count` or `last_viewed_at` → Does NOT trigger sync
SyncTool syncs are fully resumable - if a sync job fails or is interrupted, it automatically resumes from the last successfully completed partition, ensuring no data loss or duplication.
How Resumability Works:
- Partition-Level Tracking: Data is divided into partitions (time ranges, ID ranges, etc.)
- Progress Persistence: Each completed partition is tracked in the pipeline state
- Automatic Resume: On restart, the sync skips already-completed partitions
- No Duplication: Idempotent operations ensure re-running the same partition doesn't create duplicates
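Conceptually, the resume loop looks like the sketch below: a simplified Python illustration that persists completed partition IDs to a JSON file and skips them on restart. The file path and state layout here are assumptions for the sketch, not SyncTool's exact state schema (shown later in this section).

```python
import json
from pathlib import Path

STATE_PATH = Path("data/pipeline_states/my_pipeline_state.json")   # hypothetical path

def load_completed():
    if STATE_PATH.exists():
        return set(json.loads(STATE_PATH.read_text()).get("completed_partition_ids", []))
    return set()

def mark_completed(completed, partition_id):
    completed.add(partition_id)
    STATE_PATH.parent.mkdir(parents=True, exist_ok=True)
    STATE_PATH.write_text(json.dumps({"completed_partition_ids": sorted(completed)}))

def run(partition_ids, process_partition):
    completed = load_completed()
    for pid in partition_ids:
        if pid in completed:
            continue                          # finished in a previous run -> skip
        process_partition(pid)                # idempotent sync of one partition
        mark_completed(completed, pid)        # persist progress before moving on
```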
State Management:
# Pipeline state files stored in data/pipeline_states/
data/pipeline_states/
├── my_pipeline_state.json     # Current sync state
├── my_pipeline_history.json   # Historical runs
└── my_pipeline_metrics.json   # Performance metrics
Example State File:
{
"pipeline_id": "users_sync",
"strategy": "delta",
"run_id": "20240115_140000_abc123",
"status": "running",
"started_at": "2024-01-15T14:00:00Z",
"last_completed_partition": {
"partition_id": "updated_at_2024-01-15T12:00:00_2024-01-15T14:00:00",
"completed_at": "2024-01-15T14:15:30Z"
},
"progress": {
"total_partitions": 24,
"completed_partitions": 15,
"failed_partitions": 0,
"rows_processed": 150000
}
}
Resume Scenario Example:
Initial Run:
├─ Partition 1 (IDs 1-10000)       ✓ Completed
├─ Partition 2 (IDs 10001-20000)   ✓ Completed
├─ Partition 3 (IDs 20001-30000)   ✓ Completed
├─ Partition 4 (IDs 30001-40000)   ✗ Failed (network error)
└─ Partition 5-10                  ⏸ Not started
Resume Run (automatic):
├─ Partition 1-3                   ✓ Skipped (already completed)
├─ Partition 4 (IDs 30001-40000)   🔄 Retry from here
├─ Partition 5 (IDs 40001-50000)   ▶️ Continue
└─ Partition 6-10                  ▶️ Complete remaining
Benefits:
- Fault Tolerance: Network failures, database timeouts, or worker crashes don't lose progress
- Cost Efficiency: Don't re-process successfully synced data
- Time Savings: Large syncs can be split across multiple runs without starting over
- Safe Retries: Failed partitions can be retried without affecting completed ones
Manual State Management:
# View current state
python -m synctool.cli.state_cli show my_pipeline
# View state for specific strategy
python -m synctool.cli.state_cli show my_pipeline --strategy delta
# Reset state (start fresh)
python -m synctool.cli.state_cli reset my_pipeline
# Reset only specific strategy
python -m synctool.cli.state_cli reset my_pipeline --strategy delta
Configuration Options:
strategies:
- name: "delta_sync"
type: "delta"
# Partition configuration affects resumability granularity
primary_partitions:
- column: "updated_at"
step: 7200 # 2-hour partitions = finer resume granularity
data_type: "datetime"
secondary_partitions:
- column: "id"
step: 10000 # 10K ID partitions = even finer granularity
data_type: "integer"
# Smaller partitions = more resume points, but more overhead
# Larger partitions = fewer resume points, better performance
Best Practices:
1. Partition Size: Balance between resume granularity and overhead
   - Too small: Excessive state tracking overhead
   - Too large: More data to re-process on failure
   - Sweet spot: 1000-10000 rows per partition
2. Monitoring: Watch for frequently failing partitions
   # Check failed partitions
   curl http://localhost:8001/api/pipelines/my_pipeline/failed-partitions
3. Cleanup: Old state files are retained for history
   # Cleanup old states (keeps last 30 days by default)
   python -m synctool.cli.state_cli cleanup --days 30
4. Concurrent Safety: Multiple workers can process different partitions of the same pipeline simultaneously without conflicts
Idempotency Guarantees:
SyncTool ensures safe re-execution through:
- Upsert Operations: INSERT ON CONFLICT UPDATE for databases that support it
- Delete-Then-Insert: For databases without native upsert support
- Hash-Based Detection: Only update rows that actually changed
- Transaction Boundaries: Each partition processed in a transaction
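For PostgreSQL, the upsert path can be pictured with asyncpg roughly as follows. The table and column names are taken from the quick-start example, and this is a sketch rather than SyncTool's internal code.

```python
import asyncio
import asyncpg

UPSERT = """
INSERT INTO users_copy (id, name, email, updated_at, checksum)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (id) DO UPDATE
SET name = EXCLUDED.name,
    email = EXCLUDED.email,
    updated_at = EXCLUDED.updated_at,
    checksum = EXCLUDED.checksum
WHERE users_copy.checksum IS DISTINCT FROM EXCLUDED.checksum
"""

async def upsert_batch(dsn, rows):
    """rows: iterable of (id, name, email, updated_at, checksum) tuples."""
    conn = await asyncpg.connect(dsn)
    try:
        async with conn.transaction():             # one transaction per partition/batch
            await conn.executemany(UPSERT, rows)   # safe to re-run: conflicts become updates
    finally:
        await conn.close()

# asyncio.run(upsert_batch("postgresql://user:pass@localhost/dest_database", batch))
```

The ON CONFLICT clause makes re-running a partition idempotent, and the final WHERE clause skips rows whose checksum has not changed.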
Example: Interrupted Sync Recovery
# Sync interrupted at partition 45 of 100
2024-01-15 14:30:15 | INFO | Processing partition 45/100
2024-01-15 14:30:18 | ERROR | Database connection timeout
2024-01-15 14:30:18 | INFO | Marking partition 44 as completed
2024-01-15 14:30:18 | INFO | Saving state to disk
# Automatic resume on next run
2024-01-15 14:35:00 | INFO | Loading previous state
2024-01-15 14:35:00 | INFO | Resuming from partition 45
2024-01-15 14:35:00 | INFO | Skipping 44 completed partitions
2024-01-15 14:35:01 | INFO | Processing partition 45/100
# Using CLI
synctool run --config ./examples/configs/pipelines/my_pipeline.yaml --strategy delta
# Or using Python
python -m synctool.main ./examples/configs/pipelines/my_pipeline.yaml --strategy delta
The recommended way to run SyncTool in production:
# Terminal 1: Start Scheduler
python -m synctool.cli.arq_scheduler_cli start \
--config-dir ./examples/configs/pipelines \
--redis-url redis://localhost:6379
# Terminal 2: Start Worker(s)
python -m synctool.worker.worker_cli start \
--redis-url redis://localhost:6379 \
--max-jobs 4
# Terminal 3: Monitor
python -m synctool.cli.arq_scheduler_cli status
# Start all services
docker-compose -f docker-compose-arq.yml up -d
# View logs
docker-compose -f docker-compose-arq.yml logs -f scheduler
docker-compose -f docker-compose-arq.yml logs -f worker
# Scale workers
docker-compose -f docker-compose-arq.yml up -d --scale worker=5
# Stop all services
docker-compose -f docker-compose-arq.yml down
import asyncio
from synctool.config.config_loader import ConfigLoader
from synctool.sync.sync_job_manager import SyncJobManager
async def run_sync():
# Load configuration
config = ConfigLoader.load_from_yaml("./my_pipeline.yaml")
# Validate
issues = ConfigLoader.validate_config(config)
if issues:
print(f"Configuration issues: {issues}")
return
# Create job manager
job_manager = SyncJobManager(max_concurrent_jobs=2)
# Run sync
result = await job_manager.run_sync_job(
config=config,
strategy_name="delta"
)
print(f"Sync completed: {result}")
# Run the sync
asyncio.run(run_sync())
# Start scheduler
synctool scheduler start --config-dir <path> [--redis-url <url>] [--http-port <port>]
# Check status
synctool scheduler status [--global-config <path>]
# View pipeline history
synctool scheduler history <pipeline_name> [--limit 50]
# List all jobs
synctool scheduler list-jobs --config-dir <path>
# Start worker
synctool worker start [--redis-url <url>] [--max-jobs 4]
# Check worker health
synctool worker health
# Check deployment status
synctool deploy status
# Deploy a specific config
synctool deploy config --config <pipeline_name> [--yes]
# Deploy all pending configs
synctool deploy all [--ddl-only] [--yes]
# Build configs without deploying
synctool build --config-dir <path>
# Validate a configuration file
synctool config validate <config_file>
# Show resolved configuration (with datastore substitution)
synctool config show <config_file>
# List all configurations
synctool config list --config-dir <path>
# Clear Redis queue
synctool redis clear-queue [--queue-name <name>]
# View queue status
synctool redis queue-status
# Reset pipeline state
synctool state reset <pipeline_name> [--strategy <strategy_name>]
# View pipeline state
synctool state show <pipeline_name> [--strategy <strategy_name>]
The scheduler exposes an HTTP API (default port: 8001) for monitoring and control.
# Health check
GET /health
# Scheduler status
GET /api/scheduler/status
# List all pipelines
GET /api/pipelines/states
# Get pipeline details
GET /api/pipelines/{pipeline_id}/state
# Get pipeline strategies
GET /api/pipelines/{pipeline_id}/strategies
# Get strategy history
GET /api/pipelines/{pipeline_id}/strategies/{strategy_name}/history?limit=50
# Get logs for a run
GET /api/pipelines/{pipeline_id}/runs/{run_id}/logs
# Get deployment status
GET /api/deploy/status
# Get pending configs
GET /api/deploy/pending
# Deploy a config
POST /api/deploy/config/{config_name}
{
"apply_ddl": true,
"if_not_exists": false
}
# Deploy all pending
POST /api/deploy/all
{
"ddl_only": false
}
# List all strategies across all pipelines
GET /api/strategies/all
# Get strategy state
GET /api/strategies/{pipeline_id}/{strategy_name}
# Get execution locks
GET /api/locks/pipelines
GET /api/locks/tables
import requests
# Check health
response = requests.get("http://localhost:8001/health")
print(response.json())
# Get all pipeline states
response = requests.get("http://localhost:8001/api/pipelines/states")
states = response.json()
print(f"Found {len(states)} pipelines")
# Get specific pipeline history
response = requests.get(
    "http://localhost:8001/api/pipelines/users_sync/history",
    params={"limit": 10}
)
history = response.json()
for run in history:
    print(f"Run {run['run_id']}: {run['status']} at {run['started_at']}")
The examples/ directory contains comprehensive examples:
- `example.py`: Basic sync job with in-code configuration
- `config_manager_usage.py`: Using the ConfigManager for storing configs
- `enhanced_scheduler_example.py`: Advanced scheduler usage with state management
- `datastore_usage_example.py`: Working with datastores
- `example_uid.py`: Using UUID and virtual columns
Located in examples/configs/:
- `pipeline_example.yaml`: Basic pipeline configuration
- `enhanced_pipeline_example.yaml`: Advanced pipeline with all features
- `postgres_to_s3_delta_sync.yaml`: Delta sync from PostgreSQL to S3
- `postgres_to_s3_hash_sync.yaml`: Hash-based sync to S3
- `scheduled_postgres_sync.yaml`: Scheduled sync with cron
- `uuid_virtual_example.yaml`: UUID and virtual column handling
Located in examples/configs/pipelines/:
- `postgres_postgres.yaml`: Simple PostgreSQL to PostgreSQL sync
- `postgres_postgres_join.yaml`: Sync with table joins
- `postgres_postgres_aggregate_*.yaml`: Syncs with aggregations
- `postgres_postgres_md5sum.yaml`: Hash-based sync with MD5
- `postgres_starrocks*.yaml`: PostgreSQL to StarRocks sync
# Run basic example
python examples/example.py
# Run config manager example
python examples/config_manager_usage.py
# Run scheduler example (requires Redis)
python examples/enhanced_scheduler_example.py
# Run a pipeline configuration
python -m synctool.main examples/configs/pipelines/postgres_postgres.yaml
1. Configuration Management
   - Use environment variables for secrets
   - Store configs in version control (exclude the `built/` directory)
   - Set up a proper Redis instance for production (not the Docker container used for local development)
2. Resource Planning
   - Calculate worker count based on pipeline concurrency needs
   - Size Redis appropriately for queue depth
   - Plan database connection pool sizes
3. Monitoring Setup
   - Configure log aggregation (e.g., ELK, Splunk)
   - Set up metrics collection (metrics stored in JSON files)
   - Create alerts for failed pipelines
4. Security
   - Use encrypted connections for databases
   - Secure Redis with password authentication
   - Use secrets management (e.g., Vault, AWS Secrets Manager)
Example Kubernetes manifests:
# scheduler-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: synctool-scheduler
spec:
replicas: 1
selector:
matchLabels:
app: synctool-scheduler
template:
metadata:
labels:
app: synctool-scheduler
spec:
containers:
- name: scheduler
image: synctool:latest
command:
- python
- -m
- synctool.cli.arq_scheduler_cli
- start
- --config-dir
- /configs
env:
- name: REDIS_URL
value: "redis://redis-service:6379"
volumeMounts:
- name: configs
mountPath: /configs
- name: data
mountPath: /app/data
volumes:
- name: configs
configMap:
name: synctool-configs
- name: data
persistentVolumeClaim:
claimName: synctool-data
---
# worker-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: synctool-worker
spec:
replicas: 3
selector:
matchLabels:
app: synctool-worker
template:
metadata:
labels:
app: synctool-worker
spec:
containers:
- name: worker
image: synctool:latest
command:
- python
- -m
- synctool.worker.worker_cli
- start
env:
- name: REDIS_URL
value: "redis://redis-service:6379"
volumeMounts:
- name: data
mountPath: /app/data
volumes:
- name: data
persistentVolumeClaim:
claimName: synctool-data
# Dockerfile
FROM python:3.10-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY . .
RUN pip install -e .
# Create data directories
RUN mkdir -p /app/data/logs /app/data/metrics /app/data/pipeline_states
# Non-root user
RUN useradd -m synctool && chown -R synctool:synctool /app
USER synctool
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8001/health || exit 1
CMD ["python", "-m", "synctool.cli.arq_scheduler_cli", "start"]
SyncTool tracks and stores metrics for each pipeline run:
- Execution Metrics: Start/end times, duration, status
- Data Metrics: Rows processed, inserted, updated, deleted
- Performance Metrics: Partition processing times, batch sizes
- Error Metrics: Failure counts, error messages
Metrics are stored in JSON format at data/metrics/<pipeline>/<run_id>.json:
{
"run_id": "20240101_120000_abc123",
"pipeline": "users_sync",
"strategy": "delta",
"status": "completed",
"started_at": "2024-01-01T12:00:00Z",
"completed_at": "2024-01-01T12:05:30Z",
"duration_seconds": 330,
"rows_processed": 15000,
"rows_inserted": 500,
"rows_updated": 200,
"rows_deleted": 10,
"partitions_processed": 15,
"errors": 0
}
Logs are stored in JSON Lines format at data/logs/<pipeline>/<run_id>.log:
{"timestamp": "2024-01-01T12:00:00Z", "level": "INFO", "message": "Starting sync", "run_id": "..."}
{"timestamp": "2024-01-01T12:00:05Z", "level": "INFO", "message": "Processing partition 1-10000", "partition": "1-10000"}
{"timestamp": "2024-01-01T12:00:15Z", "level": "INFO", "message": "Processed 5000 rows", "rows": 5000}-
Set up alerts for:
- Failed pipeline runs
- Long-running jobs
- High retry counts
- DDL lock contention
-
Track trends for:
- Data volume over time
- Execution duration
- Success/failure rates
- Worker utilization
-
Use the HTTP API to build dashboards showing:
- Current pipeline states
- Recent run history
- Active execution locks
- Queue depth
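As a starting point for such a dashboard, a small polling script against the endpoints documented above might look like this; the response shape is assumed here and should be adjusted to the actual payload.

```python
import time
import requests

BASE = "http://localhost:8001"

def poll(interval_seconds=30):
    while True:
        healthy = requests.get(f"{BASE}/health", timeout=5).ok
        # Assumes the states endpoint returns a list of pipeline state objects
        states = requests.get(f"{BASE}/api/pipelines/states", timeout=5).json()
        failing = [s for s in states if s.get("status") == "failed"]
        print(f"healthy={healthy} pipelines={len(states)} failing={len(failing)}")
        time.sleep(interval_seconds)
```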
# Check if pipeline is enabled
grep "enabled" examples/configs/built/my_pipeline.yaml
# Check scheduler logs
tail -f data/logs/scheduler.log
# Check Redis connection
redis-cli ping
# Verify config was built
ls -la examples/configs/built/my_pipeline*
# Check build report in scheduler logs
grep "BUILD REPORT" data/logs/scheduler.log
# Validate configuration manually
python -m synctool.cli.config_cli validate examples/configs/pipelines/my_pipeline.yaml
# Check DDL requirements
python -m synctool.cli.deploy_cli status
# Check worker logs
tail -f data/logs/worker.log
# Verify Redis queue
python -m synctool.cli.redis_cli queue-status
# Check for execution locks
curl http://localhost:8001/api/locks/pipelines
# Check metrics for slow partitions
cat data/metrics/my_pipeline/latest_run.json | jq '.partition_times'
# Reduce partition size
# Edit config: partition_step: 5000 # was 50000
# Increase worker count
docker-compose -f docker-compose-arq.yml up -d --scale worker=5
# Enable pagination for large datasets
# Edit config: page_size: 1000
# Reduce batch sizes in pipeline config
# Edit strategy config:
pipeline_config:
batch_size: 500 # was 5000
max_concurrent_batches: 5 # was 10
# Enable pagination
data_fetch:
enabled: true
use_pagination: true
page_size: 500
Enable detailed logging:
import logging
logging.basicConfig(level=logging.DEBUG)
logging.getLogger('synctool').setLevel(logging.DEBUG)
Or set environment variable:
export SYNCTOOL_LOG_LEVEL=DEBUG
python -m synctool.cli.arq_scheduler_cli start ...
1. Check existing documentation in the repository:
   - QUICKSTART_ARQ.md - Quick start guide
   - PIPELINE_ARCHITECTURE.md - Pipeline system details
   - BUILD_DEPLOYMENT_QUICKSTART.md - Build system guide
   - ENHANCED_SCHEDULER_GUIDE.md - Scheduler documentation
2. Enable debug logging and check the logs
3. Check metrics and state files in the data/ directory
4. Use the HTTP API to inspect current state
5. Open an issue on GitHub with logs and configuration
- Quick Start Guide - Get started in 5 minutes
- Pipeline Architecture - Deep dive into pipeline stages
- Build & Deployment - Config build system
- Scheduler Guide - Advanced scheduler features
- Retry Behavior - Understanding retry logic
- DDL Generation - DDL management
- Lock Management - Execution locks
- Precision & Scale - Numeric type handling
- UUID & Virtual Columns - Special column types
Contributions are welcome! Please:
1. Fork the repository
2. Create a feature branch (`git checkout -b feature/amazing-feature`)
3. Make your changes with tests
4. Run tests (`pytest`)
5. Commit your changes (`git commit -m 'Add amazing feature'`)
6. Push to the branch (`git push origin feature/amazing-feature`)
7. Open a Pull Request
# Clone repository
git clone https://github.com/yourusername/synctool.git
cd synctool
# Create virtual environment
python -m venv venv
source venv/bin/activate # or `venv\Scripts\activate` on Windows
# Install in development mode with all dependencies
pip install -e .
pip install -r requirements.txt
# Run tests
pytest
# Run specific test
pytest tests/test_sync_strategies.py
# Run with coverage
pytest --cov=synctool tests/
This project is licensed under the MIT License - see the LICENSE file for details.
- Built with ARQ for distributed task queue
- Uses asyncpg for PostgreSQL async operations
- Inspired by modern data engineering best practices
- Issues: GitHub Issues
- Discussions: GitHub Discussions
- Email: developers@synctool.example
Made with ❤️ for data engineers who value reliability and performance