
Employment ELT Framework

A Production-Grade, Metadata-Driven ETL Pipeline for SQL Server to Snowflake Migration



📋 Table of Contents

  • 🎯 Overview
  • 🏗️ Architecture
  • ⭐ Key Features
  • 🧩 Pipeline Components
  • 🗄️ Database Schema
  • 🔧 Installation
  • ⚙️ Configuration
  • 🚀 Usage
  • 🔄 Data Flow
  • ✅ Validation & Logging
  • 🛡️ Error Handling
  • 📌 Best Practices
  • 🔧 Troubleshooting
  • 🚀 Future Enhancements
  • 📊 Performance Metrics
  • 📝 License
  • 👤 Author
  • 🙏 Acknowledgments
  • 📞 Support

🎯 Overview

The Employment ELT Framework is an enterprise-grade data migration solution designed to extract data from SQL Server databases, load it into Snowflake, and provide comprehensive validation and audit logging. Built with a metadata-driven architecture, it enables seamless table additions without code modifications.

Why This Framework?

  • Zero Data Loss - Dual-phase validation ensures every row is accounted for
  • Full Audit Trail - Complete logging in SQL Server with extraction and load counts
  • Metadata-Driven - Add tables via database configuration, not code changes
  • Production Ready - Error handling, rollback support, and archiving built-in
  • CYA Architecture - Proves data integrity for compliance and auditing

🏗️ Architecture

┌─────────────────────────────────────────────────────────────────┐
│                    EMPLOYMENT ELT FRAMEWORK                      │
└─────────────────────────────────────────────────────────────────┘

┌──────────────┐      ┌──────────────┐      ┌──────────────┐
│              │      │              │      │              │
│ SQL Server   │─────▶│  Python ETL  │─────▶│  Snowflake   │
│  (Source)    │      │   Pipeline   │      │  (Target)    │
│              │      │              │      │              │
└──────────────┘      └──────────────┘      └──────────────┘
       │                     │                      │
       │                     │                      │
       ▼                     ▼                      ▼
┌──────────────┐      ┌──────────────┐      ┌──────────────┐
│ ELT_CONTROL  │      │  CSV Files   │      │   LANDING    │
│ ELT_CONTROL  │      │  (output/)   │      │   Schema     │
│     _LOG     │      │              │      │              │
└──────────────┘      └──────────────┘      └──────────────┘
                            │
                            ▼
                      ┌──────────────┐
                      │   Archive    │
                      │  (output/    │
                      │   archive/)  │
                      └──────────────┘

Three-Phase Process

  1. EXTRACT (extractor.py) - SQL Server → CSV files + Log Rows_Extracted
  2. LOAD (loader.py) - CSV files → Snowflake + Initial logging
  3. VALIDATE (validator.py) - Verify counts + Update Rows_Loaded + Archive

⭐ Key Features

🔍 Dual-Phase Validation

  • Phase 1: Logs rows extracted from SQL Server
  • Phase 2: Validates and logs rows actually loaded in Snowflake
  • Result: Guarantees no silent data loss

📊 Comprehensive Logging

  • ELT_CONTROL_LOG tracks every migration attempt
  • Captures extraction count, load count, timestamps, and errors
  • ELT_CONTROL maintains last successful run and row counts
  • Full audit trail for compliance and debugging

🎯 Metadata-Driven Design

  • All table configurations stored in ELT_CONTROL table
  • Add new tables by inserting database records
  • No code modifications required for new tables
  • DDL stored as metadata for table creation

🗄️ Smart Archiving

  • CSV files archived only after successful validation
  • Timestamped archives prevent overwrites
  • Failed migrations leave CSVs for debugging
  • Automatic cleanup of output directory

🛡️ Enterprise Error Handling

  • Granular error logging per table
  • Rollback support for failed operations
  • Continues processing on individual table failures
  • Detailed error messages for troubleshooting

🔄 Resumable Operations

  • Failed tables can be reprocessed independently
  • Metadata tracks migration status
  • No need to rerun entire pipeline

🧩 Pipeline Components

1. extractor.py

Purpose: Extract data from SQL Server to CSV files

What It Does:

  • Connects to SQL Server using metadata from ELT_CONTROL
  • Executes SQL queries for tables marked to_migrate = 1
  • Writes data to CSV files in output/ directory
  • Logs Rows_Extracted to ELT_CONTROL_LOG
  • Handles errors gracefully and continues with remaining tables

Key Functions:

get_sql_connection()          # SQL Server connection
get_tables_to_migrate(conn)   # Fetch metadata
extract_table()               # Extract single table
log_migration_result()        # Log to ELT_CONTROL_LOG

Output:

  • CSV files: output/TABLE_NAME.csv
  • Console: Detailed extraction report
  • Database: ELT_CONTROL_LOG entries with Rows_Extracted
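
For orientation, here is a minimal sketch of the per-table extraction step using pyodbc. Function and parameter names mirror the list above but are illustrative, not the exact implementation.

import csv
import os

def extract_table(conn, table_name, sql_query, output_dir="output"):
    """Run the metadata-supplied query and write the result set to CSV."""
    os.makedirs(output_dir, exist_ok=True)
    cursor = conn.cursor()                  # conn from get_sql_connection()
    cursor.execute(sql_query)
    columns = [col[0] for col in cursor.description]
    rows = cursor.fetchall()
    csv_path = os.path.join(output_dir, f"{table_name}.csv")
    with open(csv_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(columns)            # header row; the loader skips it
        writer.writerows(rows)
    return len(rows)                        # logged as Rows_Extracted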

2. loader.py

Purpose: Load CSV files into Snowflake

What It Does:

  • Connects to Snowflake and SQL Server
  • Creates Snowflake tables using DDL from ELT_CONTROL.Create_Table
  • Stages CSV files using Snowflake's PUT command
  • Loads data using COPY INTO with error handling
  • Updates ELT_CONTROL_LOG with initial load results
  • Updates ELT_CONTROL.Last_Row_Count and Last_Successful_Run

Key Functions:

get_snowflake_connection()    # Snowflake connection
get_tables_to_load()          # Fetch tables with CSVs
create_snowflake_table()      # Execute DDL
load_csv_to_snowflake()       # Stage and COPY INTO
update_load_log()             # Log results

Snowflake Process:

-- 1. Create stage
CREATE STAGE IF NOT EXISTS ELT_STAGE;

-- 2. Upload CSV
PUT 'file://output/TABLE_NAME.csv' @ELT_STAGE/TABLE_NAME/;

-- 3. Load data
COPY INTO "LANDING"."TABLE_NAME"
FROM @ELT_STAGE/TABLE_NAME/
FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1)
ON_ERROR = 'CONTINUE';

Output:

  • Snowflake tables in LANDING schema
  • Console: Load statistics per table
  • Database: Updated ELT_CONTROL_LOG with load counts
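
A minimal sketch of the stage-and-copy step with snowflake-connector-python, assuming sf_conn comes from get_snowflake_connection(); the actual loader.py may structure this differently.

def load_csv_to_snowflake(sf_conn, table_name, csv_path):
    """Stage one CSV and COPY it into the LANDING schema."""
    cur = sf_conn.cursor()
    cur.execute("CREATE STAGE IF NOT EXISTS ELT_STAGE")
    # PUT requires a file:// URI; forward slashes also work on Windows
    cur.execute(f"PUT 'file://{csv_path}' @ELT_STAGE/{table_name}/ OVERWRITE = TRUE")
    cur.execute(
        f'COPY INTO "LANDING"."{table_name}" '
        f"FROM @ELT_STAGE/{table_name}/ "
        "FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1) "
        "ON_ERROR = 'CONTINUE'"
    )
    return cur.fetchall()   # per-file load results, including rows loaded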

3. validator.py

Purpose: Validate data integrity and update audit logs

What It Does:

  • Reads CSV row counts from output/ directory
  • Queries Snowflake for actual loaded row counts
  • Compares CSV vs Snowflake counts
  • Updates SQL Server ELT_CONTROL_LOG.Rows_Loaded ⭐ NEW!
  • Archives CSV files to output/archive/ (with timestamps)
  • Cleans up output directory
  • Generates comprehensive validation report

Key Functions:

get_csv_row_counts()          # Count rows in CSVs
get_snowflake_counts()        # Query Snowflake counts
compare_counts()              # Validate integrity
update_sql_server_log()       # Update Rows_Loaded ⭐ NEW!
archive_csv_files()           # Move to archive
generate_report()             # Final report

Validation Logic:

if csv_count == snowflake_count:
    # ✅ Update Rows_Loaded in SQL Server
    # ✅ Archive CSV files
    # ✅ Report success
else:
    # ❌ Skip archiving
    # ❌ Leave CSVs for debugging
    # ❌ Report mismatch
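
A sketch of the two counting helpers behind that comparison, assuming plain CSVs with a single header row (real signatures may differ):

import csv
import glob
import os

def get_csv_row_counts(output_dir="output"):
    """Count data rows per CSV, excluding the header."""
    counts = {}
    for path in glob.glob(os.path.join(output_dir, "*.csv")):
        table = os.path.splitext(os.path.basename(path))[0]
        with open(path, newline="", encoding="utf-8") as f:
            counts[table] = sum(1 for _ in csv.reader(f)) - 1
    return counts

def get_snowflake_counts(sf_conn, tables):
    """Ask Snowflake how many rows actually landed."""
    cur = sf_conn.cursor()
    return {
        t: cur.execute(f'SELECT COUNT(*) FROM "LANDING"."{t}"').fetchone()[0]
        for t in tables
    }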

Output:

  • Archived CSVs: output/archive/TABLE_NAME_YYYYMMDD_HHMMSS.csv
  • Console: Detailed validation report
  • Database: Rows_Loaded and End_Time_UTC updated in SQL Server
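
The timestamped archive step could be as simple as this sketch (paths follow the layout above; the helper name is taken from the function list, but the body is assumed):

import os
import shutil
from datetime import datetime

def archive_csv_files(output_dir="output"):
    """Move validated CSVs to output/archive/ with a timestamp suffix."""
    archive_dir = os.path.join(output_dir, "archive")
    os.makedirs(archive_dir, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    for name in os.listdir(output_dir):
        if name.lower().endswith(".csv"):
            base, _ = os.path.splitext(name)
            shutil.move(
                os.path.join(output_dir, name),
                os.path.join(archive_dir, f"{base}_{stamp}.csv"),
            )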

Command Line Options:

python validator.py                # Full validation
python validator.py update-log     # Update SQL logs only
python validator.py status         # Check archive status
python validator.py log-status     # Check SQL log status
python validator.py compare        # Compare Extracted vs Loaded
python validator.py clean          # Clean up output directory

4. update_sql_log.py (Standalone Utility)

Purpose: Update SQL Server logs without CSV files

What It Does:

  • Gets table list from ELT_CONTROL (not CSV files)
  • Queries Snowflake for current row counts
  • Updates ELT_CONTROL_LOG.Rows_Loaded
  • Shows before/after comparison

Use Case: When CSVs are already archived and you need to update logs

Usage:

python update_sql_log.py                              # All tables
python update_sql_log.py --tables TABLE1 TABLE2       # Specific tables
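
The --tables switch suggests argument handling along these lines; this is an assumption about the script's internals, not a copy of them.

import argparse

parser = argparse.ArgumentParser(
    description="Refresh ELT_CONTROL_LOG.Rows_Loaded from live Snowflake counts"
)
parser.add_argument(
    "--tables", nargs="+", default=None,
    help="Optional table names; omit to update every table"
)
args = parser.parse_args()   # args.tables is None => process all tables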

🗄️ Database Schema

ELT_CONTROL Table

Purpose: Metadata repository for all table migrations

CREATE TABLE [dbo].[ELT_CONTROL] (
    [ELT_CONTROL_ID]      INT IDENTITY(1,1) PRIMARY KEY,
    [Database_Name]       VARCHAR(128) NOT NULL,
    [Source_Schema]       VARCHAR(128) NOT NULL,
    [Table_Name]          VARCHAR(128) NOT NULL,
    [Primary_Key]         VARCHAR(256) NULL,
    [SQL_Query]           NVARCHAR(MAX) NULL,        -- SELECT statement
    [Create_Table]        NVARCHAR(MAX) NULL,        -- Snowflake DDL
    [WHERE_CLAUSE]        NVARCHAR(500) NULL,
    [Load_Type]           VARCHAR(20) NOT NULL,      -- FULL, INCREMENTAL
    [Make_Table]          BIT NOT NULL DEFAULT(0),
    [Alter_Table]         BIT NOT NULL DEFAULT(0),
    [to_migrate]          BIT NOT NULL DEFAULT(1),   -- Flag for migration
    [Target_Database]     VARCHAR(128) NULL,         -- EMPLOYMENT_DW
    [Target_Schema]       VARCHAR(128) NULL,         -- LANDING
    [Target_Table]        VARCHAR(128) NULL,
    [Sequence_Number]     INT NOT NULL DEFAULT(1),   -- Execution order
    [Is_Active]           BIT NOT NULL DEFAULT(1),
    [Last_Load_Date]      DATETIME NULL,
    [Last_Successful_Run] DATETIME NULL,
    [Last_Row_Count]      INT NULL,
    [Error_Message]       NVARCHAR(MAX) NULL,
    [UTC_Created_Date]    DATETIME NOT NULL DEFAULT(GETUTCDATE()),
    [UTC_Updated_Date]    DATETIME NULL,
    [Created_By]          VARCHAR(128) NOT NULL DEFAULT(SUSER_SNAME()),
    [Updated_By]          VARCHAR(128) NULL
);

Key Columns:

  • SQL_Query - Query to extract data
  • Create_Table - Snowflake DDL for table creation
  • to_migrate - Flag to include in migration (1 = yes, 0 = no)
  • Target_Schema - Must be set (e.g., 'LANDING')
  • Sequence_Number - Controls execution order
  • Last_Row_Count - Updated by validator with Snowflake count
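
These columns drive the whole pipeline. A sketch of the metadata fetch, with the query shape inferred from the column descriptions above:

def get_tables_to_migrate(conn):
    """Return active, flagged tables in execution order."""
    cursor = conn.cursor()
    cursor.execute(
        """
        SELECT ELT_CONTROL_ID, Table_Name, SQL_Query, Create_Table,
               Target_Database, Target_Schema, Target_Table
        FROM dbo.ELT_CONTROL
        WHERE to_migrate = 1 AND Is_Active = 1
        ORDER BY Sequence_Number
        """
    )
    return cursor.fetchall()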

ELT_CONTROL_LOG Table

Purpose: Audit log for all migration attempts

CREATE TABLE [dbo].[ELT_CONTROL_LOG] (
    [ELT_CONTROL_LOG_ID]  INT IDENTITY(1,1) PRIMARY KEY,
    [ELT_CONTROL_ID]      INT NOT NULL,
    [Start_Time_UTC]      DATETIME2(3) NOT NULL DEFAULT(SYSUTCDATETIME()),
    [End_Time_UTC]        DATETIME2(3) NULL,
    [Status]              VARCHAR(20) NOT NULL,      -- SUCCESS, FAILED
    [Rows_Extracted]      BIGINT NULL,               -- From extractor.py
    [Rows_Loaded]         BIGINT NULL,               -- From validator.py ⭐
    [Error_Message]       NVARCHAR(MAX) NULL,
    [Created_By]          VARCHAR(128) NOT NULL DEFAULT(SUSER_SNAME()),
    [Created_Date_UTC]    DATETIME2(3) NOT NULL DEFAULT(SYSUTCDATETIME()),
    
    CONSTRAINT [FK_ELT_CONTROL_LOG_ELT_CONTROL] 
        FOREIGN KEY([ELT_CONTROL_ID]) 
        REFERENCES [dbo].[ELT_CONTROL]([ELT_CONTROL_ID])
);

Logging Flow:

  1. Extraction (extractor.py):

    INSERT INTO ELT_CONTROL_LOG (
        ELT_CONTROL_ID, Status, Rows_Extracted
    ) VALUES (1, 'SUCCESS', 116);
  2. Load (loader.py):

    UPDATE ELT_CONTROL_LOG 
    SET Rows_Loaded = 116
    WHERE ELT_CONTROL_LOG_ID = (most recent);
  3. Validation (validator.py): ⭐ NEW!

    UPDATE ELT_CONTROL_LOG 
    SET Rows_Loaded = 116,        -- Verified from Snowflake
        End_Time_UTC = SYSUTCDATETIME()
    WHERE ELT_CONTROL_LOG_ID = (most recent);

Result: Rows_Extracted = Rows_Loaded = 116. PROOF OF INTEGRITY.
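
In Python, the Phase-1 insert might look like this parameterized pyodbc call; the real log_migration_result() may differ in detail.

def log_migration_result(conn, elt_control_id, status, rows_extracted, error=None):
    """Insert one audit row; parameter markers avoid SQL injection."""
    cursor = conn.cursor()
    cursor.execute(
        """
        INSERT INTO dbo.ELT_CONTROL_LOG
            (ELT_CONTROL_ID, Status, Rows_Extracted, Error_Message)
        VALUES (?, ?, ?, ?)
        """,
        elt_control_id, status, rows_extracted, error,
    )
    conn.commit()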


🔧 Installation

Prerequisites

  • Python 3.13+
  • SQL Server 2019+
  • Snowflake Enterprise Account
  • ODBC Driver 17 for SQL Server

Python Dependencies

# Create virtual environment
python -m venv venv

# Activate (Windows)
venv\Scripts\activate

# Activate (Linux/Mac)
source venv/bin/activate

# Install packages
pip install pyodbc
pip install snowflake-connector-python
pip install python-dotenv
pip install pandas

# Or install everything from the repo's pinned list
pip install -r requirements.txt

Database Setup

  1. Create ELT_CONTROL and ELT_CONTROL_LOG tables in SQL Server
  2. Populate ELT_CONTROL with table metadata
  3. Create LANDING schema in Snowflake:
    CREATE DATABASE IF NOT EXISTS EMPLOYMENT_DW;
    CREATE SCHEMA IF NOT EXISTS EMPLOYMENT_DW.LANDING;

⚙️ Configuration

.env File

Create a .env file in the project root:

# SQL Server Configuration
SQL_SERVER=your_server_name
SQL_DATABASE=Employment
SQL_TRUSTED_CONNECTION=yes
# SQL_USERNAME=your_username     # If not using trusted connection
# SQL_PASSWORD=your_password     # If not using trusted connection

# Snowflake Configuration
SNOWFLAKE_ACCOUNT=your_account
SNOWFLAKE_USER=your_username
SNOWFLAKE_PASSWORD=your_password
SNOWFLAKE_WAREHOUSE=COMPUTE_WH
SNOWFLAKE_DATABASE=EMPLOYMENT_DW
SNOWFLAKE_SCHEMA=LANDING
SNOWFLAKE_ROLE=ACCOUNTADMIN
SNOWFLAKE_AUTHENTICATOR=snowflake
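
The scripts presumably read these variables with python-dotenv; a minimal sketch of the two connection helpers under that assumption:

import os
import pyodbc
import snowflake.connector
from dotenv import load_dotenv

load_dotenv()   # reads .env from the project root

def get_sql_connection():
    # Trusted connection shown; swap in UID/PWD if SQL auth is used
    return pyodbc.connect(
        "DRIVER={ODBC Driver 17 for SQL Server};"
        f"SERVER={os.getenv('SQL_SERVER')};"
        f"DATABASE={os.getenv('SQL_DATABASE')};"
        "Trusted_Connection=yes;"
    )

def get_snowflake_connection():
    return snowflake.connector.connect(
        account=os.getenv("SNOWFLAKE_ACCOUNT"),
        user=os.getenv("SNOWFLAKE_USER"),
        password=os.getenv("SNOWFLAKE_PASSWORD"),
        warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
        database=os.getenv("SNOWFLAKE_DATABASE"),
        schema=os.getenv("SNOWFLAKE_SCHEMA"),
        role=os.getenv("SNOWFLAKE_ROLE"),
    )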

Directory Structure

Employment_ELT_Framework/
│
├── .env                              # Environment variables (gitignored)
├── .env.example                      # Template for credentials
├── .gitignore                        # Git ignore rules
├── readme.md                         # This file
├── requirements.txt                  # Python dependencies
│
├── .venv/                            # Virtual environment (gitignored)
│
├── Documentation/                    # Additional documentation
│   ├── SETUP.md                      # Setup guide
│   ├── ARCHITECTURE.md               # Architecture details
│   └── TROUBLESHOOTING.md            # Common issues
│
├── Python/                           # ETL Scripts
│   ├── extractor.py                  # Extract from SQL Server
│   ├── loader.py                     # Load to Snowflake
│   ├── validator.py                  # Validate + Log + Archive
│   ├── update_sql_log.py             # Standalone SQL log updater
│   ├── run_elt.py                    # Pipeline orchestrator
│   ├── debug_validator.py            # Debug utility
│   ├── __pycache__/                  # Python cache (gitignored)
│   └── output/                       # Working directory
│       ├── .gitkeep                  # Preserves folder in Git
│       ├── *.csv                     # Extracted CSV files (gitignored)
│       └── archive/
│           ├── .gitkeep              # Preserves folder in Git
│           └── *.csv                 # Archived CSVs (gitignored)
│
└── SQL/                              # Database Setup Scripts
    ├── SQL_README.md                 # SQL setup documentation
    │
    ├── 01_Schema/                    # Table definitions
    │   ├── dbo.ELT_CONTROL.Table.sql
    │   └── dbo.ELT_CONTROL_LOG.Table.sql
    │
    ├── 02_Stored_Procedures/         # Stored procedures
    │   ├── dbo.Generate_ELT_SQL.StoredProcedure.sql
    │   ├── dbo.Mark_Table_Migrated.StoredProcedure.sql
    │   └── dbo.Reset_ELT_Control.StoredProcedure.sql
    │
    ├── 03_Sample_Data/               # Usage examples
    │   ├── Generate_ELT_Metadata.sql
    │   └── Generate_ELT_Metadata_Examples.sql
    │
    └── Snowflake/                    # Snowflake setup
        └── 01_Snowflake_Minimal_Setup.sql

Key Notes:

  • .env contains actual credentials (never commit!)
  • .env.example is the template (safe to commit)
  • validator.py is the enhanced version (renamed for simplicity)
  • SQL scripts organized by deployment order (01, 02, 03)
  • Generate_ELT_SQL stored procedure automates metadata creation

🚀 Usage

Full Pipeline Execution

# 1. Extract data from SQL Server
python extractor.py

# 2. Load data to Snowflake
python loader.py

# 3. Validate, update logs, and archive
python validator.py
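
The repository also ships run_elt.py as the pipeline orchestrator. A plausible minimal implementation that chains the three phases and stops on the first failure (a sketch, not the actual script):

import subprocess
import sys

for script in ("extractor.py", "loader.py", "validator.py"):
    print(f"=== Running {script} ===")
    if subprocess.run([sys.executable, script]).returncode != 0:
        sys.exit(f"{script} failed; aborting pipeline")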

Individual Steps

Extract Only:

python extractor.py

Load Only (requires CSV files):

python loader.py

Validate Only:

python validator.py

Update SQL Logs Only (without CSV files):

python update_sql_log.py

Validation Commands

# Full validation with archiving
python validator.py

# Update SQL Server logs only
python validator.py update-log

# Check archive status
python validator.py status

# Check SQL Server log status
python validator.py log-status

# Compare Rows_Extracted vs Rows_Loaded
python validator.py compare

# Clean up output directory
python validator.py clean

🔄 Data Flow

Complete Migration Flow

┌─────────────────────────────────────────────────────────────────┐
│ PHASE 1: EXTRACTION                                              │
└─────────────────────────────────────────────────────────────────┘

1. Read ELT_CONTROL (to_migrate = 1)
2. Execute SQL_Query for each table
3. Write data to CSV files (output/)
4. Log to ELT_CONTROL_LOG:
   - Rows_Extracted = 116
   - Status = 'SUCCESS'

┌─────────────────────────────────────────────────────────────────┐
│ PHASE 2: LOAD                                                    │
└─────────────────────────────────────────────────────────────────┘

1. Read ELT_CONTROL for tables with CSV files
2. Create Snowflake tables (using Create_Table DDL)
3. Stage CSV files (PUT command)
4. Load data (COPY INTO)
5. Update ELT_CONTROL_LOG:
   - Rows_Loaded = 116 (initial)

┌─────────────────────────────────────────────────────────────────┐
│ PHASE 3: VALIDATION ⭐ THE MAGIC HAPPENS HERE                    │
└─────────────────────────────────────────────────────────────────┘

1. Count rows in CSV files (116)
2. Query Snowflake for actual counts (116)
3. Compare:
   - CSV: 116
   - Snowflake: 116
   - Match: ✅ YES

4. Update SQL Server ELT_CONTROL_LOG: ⭐ NEW!
   - Rows_Loaded = 116 (verified from Snowflake)
   - End_Time_UTC = current timestamp

5. Archive CSV files:
   - Move to output/archive/
   - Add timestamp to filename
   - Clean up output directory

6. Generate report:
   ✅ All counts match
   ✅ Logs updated
   ✅ Files archived

Validation Before/After

BEFORE Validation:

ELT_CONTROL_LOG:
┌────────────┬────────────────┬─────────────┬────────┐
│ Table_Name │ Rows_Extracted │ Rows_Loaded │ Status │
├────────────┼────────────────┼─────────────┼────────┤
│ TBLROLES   │            116 │        NULL │ ???    │
└────────────┴────────────────┴─────────────┴────────┘

AFTER Validation:

ELT_CONTROL_LOG:
┌────────────┬────────────────┬─────────────┬────────┐
│ Table_Name │ Rows_Extracted │ Rows_Loaded │ Status │
├────────────┼────────────────┼─────────────┼────────┤
│ TBLROLES   │            116 │         116 │ ✅ MATCH│
└────────────┴────────────────┴─────────────┴────────┘

CYA PROOF: Rows_Extracted = Rows_Loaded 🎯


✅ Validation & Logging

Why Dual-Phase Validation?

The Problem: How do you PROVE data didn't get lost in migration?

The Solution: Two-phase logging

  1. Phase 1 - Log what you EXTRACTED (extractor.py)
  2. Phase 2 - Log what ACTUALLY LOADED (validator.py)

Validation Report Example

============================================================
🔍 CSV vs SNOWFLAKE COMPARISON
============================================================

Table                  CSV Rows  Snowflake Rows  Status
------------------------------------------------------------
TBLEMPLOYEEEDUCATION          3               3  ✅ MATCH
TBLKEYWORDS                 116             116  ✅ MATCH
TBLRESUMEBULLETPOINTS       164             164  ✅ MATCH
...

============================================================
📝 UPDATING SQL SERVER ELT_CONTROL_LOG
============================================================
  ✅ TBLEMPLOYEEEDUCATION       - Updated with 3 rows
  ✅ TBLKEYWORDS                - Updated with 116 rows
  ✅ TBLRESUMEBULLETPOINTS      - Updated with 164 rows

📊 SQL Server Update Summary:
   ✅ Successfully updated: 12
   ❌ Failed: 0

============================================================
📋 FINAL VALIDATION REPORT
============================================================
📅 Validation Date: 2026-01-29 21:36:18
🎯 Target Schema: EMPLOYMENT_DW.LANDING

📊 DATA VALIDATION:
   ✅ CSV Total Rows: 489
   ✅ Snowflake Total Rows: 489
   🎯 Row Counts: ✅ PERFECT MATCH (489 rows)

📝 SQL SERVER LOG UPDATE:
   ✅ ELT_CONTROL_LOG updated successfully

🗄️  ARCHIVING STATUS:
   ✅ CSV files archived to: output\archive

🏁 OVERALL STATUS:
   🎉 COMPLETE SUCCESS: Data validated, logged, and archived!

Audit Queries

Find any data discrepancies:

SELECT 
    c.Table_Name,
    l.Rows_Extracted,
    l.Rows_Loaded,
    l.Rows_Extracted - l.Rows_Loaded AS Discrepancy
FROM ELT_CONTROL_LOG l
JOIN ELT_CONTROL c ON l.ELT_CONTROL_ID = c.ELT_CONTROL_ID
WHERE l.Rows_Extracted <> l.Rows_Loaded
   OR l.Rows_Loaded IS NULL;   -- not yet validated
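
The same check can gate an automated run; a sketch with pyodbc (the helper name is hypothetical):

def assert_no_discrepancies(conn):
    """Raise if any audit row shows lost or unvalidated data."""
    cursor = conn.cursor()
    cursor.execute(
        """
        SELECT c.Table_Name, l.Rows_Extracted, l.Rows_Loaded
        FROM dbo.ELT_CONTROL_LOG l
        JOIN dbo.ELT_CONTROL c ON l.ELT_CONTROL_ID = c.ELT_CONTROL_ID
        WHERE l.Rows_Extracted <> l.Rows_Loaded OR l.Rows_Loaded IS NULL
        """
    )
    bad = cursor.fetchall()
    if bad:
        raise RuntimeError(f"Data discrepancies found: {bad}")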

Performance analysis:

SELECT 
    c.Table_Name,
    l.Rows_Extracted,
    DATEDIFF(SECOND, l.Start_Time_UTC, l.End_Time_UTC) AS Duration_Seconds,
    l.Rows_Extracted / NULLIF(DATEDIFF(SECOND, l.Start_Time_UTC, l.End_Time_UTC), 0) AS Rows_Per_Second
FROM ELT_CONTROL_LOG l
JOIN ELT_CONTROL c ON l.ELT_CONTROL_ID = c.ELT_CONTROL_ID
ORDER BY Duration_Seconds DESC;

Migration history:

SELECT 
    c.Table_Name,
    l.Created_Date_UTC,
    l.Status,
    l.Rows_Extracted,
    l.Rows_Loaded
FROM ELT_CONTROL_LOG l
JOIN ELT_CONTROL c ON l.ELT_CONTROL_ID = c.ELT_CONTROL_ID
ORDER BY l.Created_Date_UTC DESC;

🛡️ Error Handling

Granular Error Management

Each component handles errors independently:

Extractor:

  • Logs errors per table
  • Continues with remaining tables
  • Records error messages in ELT_CONTROL_LOG

Loader:

  • Validates Target_Schema before loading
  • Handles Snowflake connection errors
  • Continues on table-level failures

Validator:

  • Skips archiving on validation failure
  • Leaves CSV files for debugging
  • Updates logs only on successful validation

Error Scenarios

Scenario 1: Extraction Fails

Table: TBLKEYWORDS
Status: FAILED
Rows_Extracted: NULL
Error: Timeout expired

Action: Fix query, rerun extractor for failed table

Scenario 2: Load Fails

Table: TBLKEYWORDS
Status: FAILED
Error: Invalid CSV format

Action: Fix CSV, rerun loader

Scenario 3: Validation Mismatch

CSV Rows: 116
Snowflake Rows: 115
Status: ❌ MISMATCH

Action:

  • CSV NOT archived (preserved for debugging)
  • Investigate missing row in Snowflake
  • Fix and rerun loader + validator

📌 Best Practices

Adding New Tables

  1. Insert into ELT_CONTROL:
INSERT INTO dbo.ELT_CONTROL (
    Database_Name,
    Source_Schema,
    Table_Name,
    SQL_Query,
    Create_Table,
    Load_Type,
    to_migrate,
    Target_Database,
    Target_Schema,
    Target_Table,
    Sequence_Number,
    Is_Active
) VALUES (
    'Employment',
    'dbo',
    'NEWTABLE',
    'SELECT * FROM dbo.NEWTABLE',
    'CREATE TABLE "LANDING"."NEWTABLE" (ID INT, NAME VARCHAR)',
    'FULL',
    1,                    -- Include in migration
    'EMPLOYMENT_DW',
    'LANDING',            -- ⚠️ REQUIRED!
    'NEWTABLE',
    100,                  -- Execution order
    1                     -- Active
);
  2. Run the pipeline - no code changes needed!

Incremental Loads (Future)

For tables that need incremental updates:

UPDATE ELT_CONTROL
SET SQL_Query = 'SELECT * FROM dbo.NEWTABLE WHERE ModifiedDate > ?',
    Load_Type = 'INCREMENTAL'
WHERE Table_Name = 'NEWTABLE';
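
Under that scheme, extraction would bind ELT_CONTROL.Last_Load_Date to the ? placeholder; a hypothetical sketch:

def extract_incremental(conn, sql_query, last_load_date):
    """Pull only rows modified since the stored watermark."""
    cursor = conn.cursor()
    cursor.execute(sql_query, last_load_date)   # fills the ? placeholder
    return cursor.fetchall()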

Monitoring

Daily validation query:

-- Check today's migrations
SELECT 
    c.Table_Name,
    l.Status,
    l.Rows_Extracted,
    l.Rows_Loaded,
    CASE 
        WHEN l.Rows_Loaded IS NULL THEN '⚠️ Not Validated'
        WHEN l.Rows_Extracted = l.Rows_Loaded THEN '✅ Valid'
        ELSE '❌ Mismatch'
    END AS Validation
FROM ELT_CONTROL_LOG l
JOIN ELT_CONTROL c ON l.ELT_CONTROL_ID = c.ELT_CONTROL_ID
WHERE CAST(l.Created_Date_UTC AS DATE) = CAST(GETUTCDATE() AS DATE);

🔧 Troubleshooting

Common Issues

1. "No CSV files found"

Cause: CSVs already archived, or the extractor didn't run.

Solution:

# Check archive
dir output\archive\*.csv

# Re-extract if needed
python extractor.py

2. "Object does not exist in Snowflake"

Cause: CSV filenames include timestamps.

Solution: Remove the timestamps from the CSV filenames:

Get-ChildItem output\*.csv | ForEach-Object {
    $newName = $_.Name -replace '_\d{8}_\d{6}', ''
    Rename-Item $_.FullName -NewName $newName
}

3. "Target_Schema is NULL"

Cause: Target_Schema is missing in ELT_CONTROL.

Solution:

UPDATE dbo.ELT_CONTROL
SET Target_Schema = 'LANDING'
WHERE Target_Schema IS NULL;

4. "Module has no attribute 'main'"

Cause: Stale Python import cache, or the file was not saved.

Solution:

# Clear the import cache (cmd; use Remove-Item -Recurse in PowerShell)
rmdir /s /q __pycache__

# Ensure the file is saved, then run it directly (not via import)
python validator.py

5. Rows_Loaded still NULL after validation

Cause: The validator didn't run, or validation failed.

Solution:

# Check if CSVs are in output/ (not archive/)
dir output\*.csv

# Run standalone updater
python update_sql_log.py

🚀 Future Enhancements

Planned Features

  1. Incremental Loads

    • Watermark-based extraction
    • Merge/Upsert logic in Snowflake
    • Change Data Capture (CDC) support
  2. Parallel Processing

    • Multi-threaded extraction
    • Concurrent Snowflake loads
    • Configurable parallelism
  3. Data Quality Checks

    • Column-level validation
    • Data type verification
    • NULL value checks
    • Custom business rules
  4. Scheduling & Orchestration

    • Apache Airflow DAGs
    • Windows Task Scheduler integration
    • Cron job support
  5. Notifications

    • Email alerts on failures
    • Slack/Teams integration
    • Dashboard for monitoring
  6. Advanced Archiving

    • Configurable retention periods
    • Automatic cleanup of old archives
    • Compression options
  7. Performance Optimizations

    • Parquet file format support
    • Snowflake COPY optimization
    • Batch size tuning

📊 Performance Metrics

Current Throughput

Based on POC with 12 tables (489 total rows):

  • Extraction: ~489 rows in <10 seconds
  • Load: ~489 rows in <30 seconds
  • Validation: ~489 rows in <10 seconds
  • Total Pipeline: ~60 seconds

Scalability

Expected performance (production):

  • 1 million rows: ~5-10 minutes
  • 10 million rows: ~30-60 minutes
  • With parallelization: 50-70% reduction

📝 License

Proprietary Software License Agreement

Copyright © 2026 Stephen Stephan. All Rights Reserved.


1. Grant of License

This Employment ELT Framework (the "Software") is proprietary and confidential. The copyright holder, Stephen Stephan ("Licensor"), retains all rights, title, and interest in and to the Software, including all intellectual property rights therein.

2. Permitted Use

Personal Use: You may use the Software for personal, educational, or portfolio demonstration purposes without charge.

Professional/Commercial Use: Any professional, commercial, or business use of the Software, including but not limited to:

  • Use within a corporate or organizational environment
  • Integration into commercial products or services
  • Use for client projects or consulting work
  • Deployment in production systems for business purposes
  • Any use that generates revenue or business value

REQUIRES PRIOR WRITTEN PERMISSION from the Licensor.

3. Permission Request Process

To request permission for professional or commercial use:

  1. Contact: Submit a written request to Stephen Stephan
  2. Information Required:
    • Intended use case and scope
    • Organization/company name
    • Estimated duration of use
    • Number of users/deployments
    • Description of business application
  3. Review: Licensor will review within 15 business days
  4. Terms: If approved, specific terms and licensing fees (if applicable) will be negotiated

4. Restrictions

You may NOT, without express written permission:

  • ❌ Use the Software for commercial purposes
  • ❌ Distribute, sublicense, or sell the Software or derivatives
  • ❌ Remove or modify copyright notices or this license
  • ❌ Claim authorship or ownership of the Software
  • ❌ Use the Software in any manner that competes with the Licensor's business interests
  • ❌ Reverse engineer, decompile, or disassemble the Software (except as permitted by law)
  • ❌ Create derivative works for commercial purposes without written consent

5. Attribution

If you reference, demonstrate, or discuss this Software in any public forum, presentation, or documentation, you must:

  • ✅ Credit "Stephen Stephan" as the original author
  • ✅ Include a link to the original repository (if publicly available)
  • ✅ Clearly indicate any modifications made to the original Software

6. Warranty Disclaimer

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NONINFRINGEMENT. IN NO EVENT SHALL THE LICENSOR BE LIABLE FOR ANY CLAIM, DAMAGES, OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT, OR OTHERWISE, ARISING FROM, OUT OF, OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

7. Limitation of Liability

IN NO EVENT SHALL THE LICENSOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

8. Termination

This license is effective until terminated. Your rights under this license will terminate automatically without notice if you fail to comply with any term of this license. Upon termination, you must destroy all copies of the Software in your possession.

9. Governing Law

This license shall be governed by and construed in accordance with the laws of the jurisdiction in which the Licensor resides, without regard to its conflict of law provisions.

10. Entire Agreement

This license constitutes the entire agreement between you and the Licensor regarding the Software and supersedes all prior or contemporaneous understandings regarding such subject matter.


TL;DR (Summary - Not Legally Binding)

  • Personal/Portfolio Use: FREE - Go ahead!
  • Learning/Education: FREE - Study and learn from it
  • Code Review/Interview: FREE - Show it off!
  • ⚠️ Professional/Work Use: REQUIRES PERMISSION - Contact Stephen first
  • ⚠️ Commercial Products: REQUIRES PERMISSION - Let's talk licensing
  • Redistribution: NOT ALLOWED without permission
  • 📧 Questions?: Reach out for clarification

By using this Software, you acknowledge that you have read this license agreement, understand it, and agree to be bound by its terms and conditions.


For licensing inquiries or permission requests, please contact Stephen Stephan.


👤 Author

Stephen Stephan


🙏 Acknowledgments

Built with:

  • Python 3.13
  • SQL Server 2019
  • Snowflake Enterprise
  • Love for clean architecture and bulletproof data pipelines

📞 Support

For issues or questions:

  1. Check the Troubleshooting section
  2. Review SQL Server ELT_CONTROL_LOG for error messages
  3. Enable verbose logging in Python scripts
  4. Check archived CSV files for data inspection

Remember: This framework proves data integrity through dual-phase validation. Always verify Rows_Extracted = Rows_Loaded in your audit logs! 🎯


Last Updated: 2026-01-29
