🧬 Public Health Data Pipeline (Lightweight Data Lakehouse)

Python Azure Streamlit Polars Pydantic GitHub Actions

An automated, serverless data pipeline designed to ingest, validate, and aggregate data. This project implements a "Human-in-the-Loop" architecture where invalid data is automatically quarantined, fixed via a UI, and re-injected into the pipeline without code changes.

Note to Viewer: The "Real App" runs locally and is not deployed. This live demo uses Interface Abstraction to simulate Azure Blob Storage in memory, ensuring no connection to real cloud infrastructure.


๐Ÿ—๏ธ Architecture

Pipeline Workflow

flowchart TB
    subgraph USER["👤 User Interface (Streamlit)"]
        UPLOAD["📤 Upload CSV"]
        FIX["🛠️ Fix Quarantine"]
        DELETE_UPLOAD["🗑️ Upload Deletion Request"]
        TRIGGER_PIPE["▶️ Trigger Pipeline"]
        TRIGGER_DEL["▶️ Trigger Deletion"]
        MONITOR["📊 Auto-Monitor<br/>(Fragments @ 15s)"]
    end

    subgraph GITHUB["🐙 GitHub Actions"]
        DISPATCH_PIPE["workflow_dispatch"]
        DISPATCH_DEL["workflow_dispatch"]
        WEEKLY["weekly_pipeline.yaml<br/>(Cron: Weekly)"]
        DELWF["delete_records.yaml<br/>(Manual Only)"]
    end

    subgraph PIPELINE["⚙️ ETL Pipeline"]
        VALIDATE["Pydantic Validation"]
        ROUTE{{"Route Data"}}
        UPSERT["Upsert Valid Records"]
        QUARANTINE_OP["Quarantine Invalid"]
        DELETE_OP["Remove Records"]
    end

    subgraph AZURE["☁️ Azure Blob Storage"]
        LANDING[("📂 landing-zone<br/>(Raw CSVs)")]
        QUAR[("🚨 quarantine<br/>(Failed Validation)")]
        DATA[("📊 data<br/>(Partitioned Parquet)")]
        LOGS[("📋 logs<br/>(Execution History)")]
        DELREQ[("🗑️ deletion-requests<br/>(Pending Deletions)")]
    end

    subgraph OUTPUT["📈 Outputs"]
        REPORT["final_cdc_export.csv"]
        AUDIT["Audit Trail"]
    end

    %% User actions
    UPLOAD --> LANDING
    FIX --> LANDING
    DELETE_UPLOAD --> DELREQ
    TRIGGER_PIPE --> DISPATCH_PIPE
    TRIGGER_DEL --> DISPATCH_DEL

    %% GitHub triggers
    DISPATCH_PIPE --> WEEKLY
    DISPATCH_DEL --> DELWF
    
    %% Pipeline flows
    WEEKLY --> VALIDATE
    LANDING --> VALIDATE
    VALIDATE --> ROUTE
    ROUTE -->|"✅ Valid"| UPSERT
    ROUTE -->|"❌ Invalid"| QUARANTINE_OP
    UPSERT --> DATA
    QUARANTINE_OP --> QUAR
    UPSERT --> LOGS
    
    %% Deletion flow
    DELWF --> DELETE_OP
    DELREQ --> DELETE_OP
    DELETE_OP --> DATA
    DELETE_OP --> LOGS
    
    %% Outputs
    DATA --> REPORT
    LOGS --> AUDIT

    %% Monitoring loop
    WEEKLY -.->|"Status API"| MONITOR
    DELWF -.->|"Status API"| MONITOR
    MONITOR -.->|"Poll"| GITHUB

    %% Quarantine loop
    QUAR -.->|"Human Review"| FIX

    %% Styling
    classDef userStyle fill:#e1f5fe,stroke:#01579b
    classDef githubStyle fill:#f3e5f5,stroke:#4a148c
    classDef pipelineStyle fill:#fff3e0,stroke:#e65100
    classDef azureStyle fill:#e3f2fd,stroke:#0d47a1
    classDef outputStyle fill:#e8f5e9,stroke:#1b5e20

    class UPLOAD,FIX,DELETE_UPLOAD,TRIGGER_PIPE,TRIGGER_DEL,MONITOR userStyle
    class DISPATCH_PIPE,DISPATCH_DEL,WEEKLY,DELWF githubStyle
    class VALIDATE,ROUTE,UPSERT,QUARANTINE_OP,DELETE_OP pipelineStyle
    class LANDING,QUAR,DATA,LOGS,DELREQ azureStyle
    class REPORT,AUDIT outputStyle

Core Data Flow

  1. Landing Zone: User uploads raw CSVs via the Streamlit Admin Console → landing-zone container.
  2. Automated Processing (GitHub Actions): weekly_pipeline.yaml triggers on schedule or manual dispatch.
    • Validation: Pydantic enforces a strict schema (sample_id, test_date, result, viral_load); see the schema sketch after this list.
    • Routing: Valid data → data container (partitioned Parquet by week). Invalid data → quarantine container (CSV).
    • Logging: Emoji-rich processing logs with detailed metrics saved to the logs container as execution_TIMESTAMP.csv.
  3. Quarantine Resolution: Admins review errors in the UI, fix the data (e.g., "Positive" → "POS"), and re-upload to landing-zone for automatic reprocessing.
  4. Deletion Workflow: Two-step process for permanent record removal:
    • Upload a deletion request CSV (sample_id + test_date) → deletion-requests container
    • Trigger the delete_records.yaml GitHub Action → removes records from partitioned data
    • Logs deleted sample IDs to the logs container as deletion_TIMESTAMP.csv
  5. Reporting: Aggregated clean data exported to final_cdc_export.csv with a complete audit trail.
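
To make the validation step concrete, here is a minimal sketch of the kind of schema models.py could define. The four field names come from this README; the exact types, the optional viral_load, and the allowed result codes are assumptions for illustration.

# Hypothetical schema sketch (Pydantic v2). Field names follow the README;
# types and allowed result codes are assumptions.
from datetime import date
from typing import Optional

from pydantic import BaseModel, field_validator


class TestRecord(BaseModel):
    sample_id: str
    test_date: date
    result: str                          # e.g. "POS" / "NEG" (assumed codes)
    viral_load: Optional[float] = None

    @field_validator("result")
    @classmethod
    def result_must_be_a_code(cls, value: str) -> str:
        # Free-text values such as "Positive" fail here and end up in quarantine.
        allowed = {"POS", "NEG"}
        if value not in allowed:
            raise ValueError(f"result must be one of {allowed}, got {value!r}")
        return value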

Storage Containers

  • landing-zone: Raw CSV uploads from partners
  • quarantine: Invalid records awaiting manual review
  • data: Validated records in partitioned Parquet format (year=YYYY/week=WW/)
  • logs: Processing and deletion execution logs (CSV with processing_details)
  • deletion-requests: Pending deletion requests (CSV with sample_id and test_date)
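
The data container's year=YYYY/week=WW/ layout is also what the upsert step has to respect. Below is a hedged Polars sketch of how a partitioned write with a sample_id upsert could look; the function and file names are illustrative, not the actual process_data_cloud.py implementation.

# Illustrative sketch: write validated records as week-partitioned Parquet and
# upsert on sample_id. Assumes test_date is already a Date column.
from pathlib import Path

import polars as pl


def upsert_partitioned(valid: pl.DataFrame, data_root: Path) -> None:
    valid = valid.with_columns(
        pl.col("test_date").dt.year().alias("year"),
        pl.col("test_date").dt.week().alias("week"),
    )
    for (year, week), part in valid.partition_by(["year", "week"], as_dict=True).items():
        out_dir = data_root / f"year={year}" / f"week={week:02d}"
        out_dir.mkdir(parents=True, exist_ok=True)
        target = out_dir / "data.parquet"
        incoming = part.drop(["year", "week"])
        if target.exists():
            existing = pl.read_parquet(target)
            # Upsert: rows whose sample_id appears in the new batch are replaced.
            kept = existing.filter(~pl.col("sample_id").is_in(incoming["sample_id"]))
            incoming = pl.concat([kept, incoming.select(kept.columns)])
        incoming.write_parquet(target)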

📂 Repository Structure

.
├── .github/workflows/
│   ├── weekly_pipeline.yaml      # Scheduled pipeline automation (production)
│   └── delete_records.yaml       # Manual deletion workflow trigger
├── admin_tools/
│   ├── demo_app.py               # 🎮 THE DEMO APP (Public Portfolio Frontend)
│   ├── web_uploader.py           # 🔒 THE REAL APP (Local Production Admin Console)
│   ├── mock_azure.py             # Cloud Emulation Logic for Demo
│   ├── fetch_errors.py           # Utility: Download quarantine files
│   ├── reingest_fixed_data.py    # Utility: Re-upload fixed data
│   ├── generate_and_upload_mock_data.py  # Utility: Generate test data
│   └── test_connection.py        # Utility: Test Azure connection
├── pipeline/
│   ├── process_data_cloud.py     # Core ETL Logic (Polars + Pydantic)
│   ├── export_report.py          # Generate final CDC aggregate report
│   └── delete_records.py         # Process deletion requests from CSV
├── models.py                     # Pydantic Schema Definitions
├── pyproject.toml                # Project dependencies (uv)
└── README.md

🌟 Key Features

1. "Self-Healing" Data Quality

Most pipelines crash on bad data. This one side-steps it.

  • Polars + Pydantic: Used for fast validation and data integrity.
  • Quarantine: Invalid files move to quarantine/ and wait for human review.
  • Human-in-the-Loop: The Admin Console provides an Excel-like editor to fix the typo and retry.
  • Detailed Logging: Every pipeline run saves comprehensive logs with emoji-rich processing details for easy debugging.
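
As a rough illustration of that quarantine routing (not the actual process_data_cloud.py code), the sketch below validates each row against a Pydantic schema, keeps the good rows, and collects the bad rows together with their error message for the quarantine container. The function name and the validation_error column are assumptions.

# Illustrative routing sketch: split a raw CSV into valid and quarantined rows.
import polars as pl
from pydantic import BaseModel, ValidationError


def route_rows(raw: pl.DataFrame, schema: type[BaseModel]) -> tuple[pl.DataFrame, pl.DataFrame]:
    valid_rows, invalid_rows = [], []
    for row in raw.iter_rows(named=True):
        try:
            valid_rows.append(schema(**row).model_dump())
        except ValidationError as exc:
            # Keep the original row plus the reason it failed, for human review.
            invalid_rows.append({**row, "validation_error": str(exc)})
    return pl.DataFrame(valid_rows), pl.DataFrame(invalid_rows)


# Example usage (file names are placeholders):
# valid_df, quarantined_df = route_rows(pl.read_csv("landing.csv"), TestRecord)
# quarantined_df.write_csv("quarantine_batch.csv")  # uploaded to the quarantine container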

2. Data Deletion Workflow

A dedicated workflow for data corrections:

  • Two-Step Process: Upload deletion requests (CSV with sample_id and test_date), then trigger workflow.
  • Partition-Aware: Automatically calculates which partitions to check based on test dates.
  • Audit Trail: Logs which sample IDs were deleted from which partitions with full timestamp tracking.
  • GitHub Actions Integration: Secure, authenticated deletion via automated workflow.
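
A hedged sketch of the partition-aware deletion idea follows: each request's test_date identifies the year=/week= partition to open, matching sample_ids are dropped, and the partition is rewritten. Paths, helper names, and the return value are assumptions rather than the actual delete_records.py code.

# Illustrative deletion sketch: remove requested sample_ids from their partitions.
from pathlib import Path

import polars as pl


def delete_requested(requests_csv: Path, data_root: Path) -> list[str]:
    requests = pl.read_csv(requests_csv, try_parse_dates=True).with_columns(
        pl.col("test_date").dt.year().alias("year"),
        pl.col("test_date").dt.week().alias("week"),
    )
    deleted: list[str] = []
    for (year, week), batch in requests.partition_by(["year", "week"], as_dict=True).items():
        target = data_root / f"year={year}" / f"week={week:02d}" / "data.parquet"
        if not target.exists():
            continue  # nothing stored for this week
        existing = pl.read_parquet(target)
        mask = pl.col("sample_id").is_in(batch["sample_id"])
        deleted.extend(existing.filter(mask)["sample_id"].to_list())
        existing.filter(~mask).write_parquet(target)
    return deleted  # the caller logs these as deletion_TIMESTAMP.csv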

3. Real-Time Workflow Monitoring with Streamlit Fragments

The Admin Console provides live status updates without full page reloads:

  • @st.fragment(run_every="15s"): Uses Streamlit's fragment feature to auto-poll the GitHub Actions API every 15 seconds while a workflow is running.
  • Smart Workflow Detection: Compares UTC timestamps to distinguish between old runs and newly-triggered workflows, preventing false "success" messages from stale data.
  • Session State Persistence: Stores workflow results in st.session_state so status messages persist across page interactions.
  • Auto-Stop Monitoring: Automatically stops polling when workflow completes (success, failure, or cancelled) and displays final status.
  • Seamless UX: Users can trigger a pipeline, navigate to other tabs, and return to see live progress or final results.
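
The sketch below shows roughly how such a monitor can be wired up, assuming GITHUB_TOKEN is available in the environment and that the trigger button stores monitoring and triggered_at_utc in session state. The endpoint and response fields come from the public GitHub Actions REST API; everything else is illustrative rather than the actual web_uploader.py code.

import os

import requests
import streamlit as st

RUNS_URL = "https://api.github.com/repos/{owner}/{repo}/actions/workflows/{wf}/runs"


@st.fragment(run_every="15s")          # re-runs only this fragment, not the whole page
def monitor_workflow(owner: str, repo: str, wf: str = "weekly_pipeline.yaml") -> None:
    if not st.session_state.get("monitoring", False):
        return                          # nothing was triggered; stay quiet
    resp = requests.get(
        RUNS_URL.format(owner=owner, repo=repo, wf=wf),
        headers={"Authorization": f"Bearer {os.environ['GITHUB_TOKEN']}"},
        params={"per_page": 1},
        timeout=10,
    )
    run = resp.json()["workflow_runs"][0]
    # Stale-data guard: ignore runs created before the trigger button was clicked
    # (both values are ISO-8601 UTC strings, so string comparison is chronological).
    if run["created_at"] < st.session_state["triggered_at_utc"]:
        st.info("Waiting for the new run to appear...")
        return
    if run["status"] == "completed":
        st.session_state["monitoring"] = False      # auto-stop polling
        st.session_state["last_conclusion"] = run["conclusion"]
        st.success(f"Workflow finished: {run['conclusion']}")
    else:
        st.info(f"Workflow status: {run['status']}")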

4. Cloud Abstraction (Security Highlight)

To share this project publicly without exposing Azure credentials, I implemented a Mock Object Pattern:

  • Interface Abstraction: The classes in mock_azure.py mirror the official Azure SDK methods (upload_blob, download_blob, list_blobs).
  • Safety: The Demo App injects these mock clients instead of real Azure clients, ensuring the full UI workflow runs safely in memory with no connection to real cloud infrastructure.
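
A minimal sketch of that mock-object idea, assuming the app only ever calls the three methods named above; this is illustrative, not the actual mock_azure.py.

from io import BytesIO


class MockContainerClient:
    """In-memory stand-in that mirrors the ContainerClient calls the UI uses."""

    def __init__(self, name: str):
        self.name = name
        self._blobs: dict[str, bytes] = {}

    def upload_blob(self, name: str, data, overwrite: bool = False) -> None:
        if name in self._blobs and not overwrite:
            raise ValueError(f"Blob {name!r} already exists")
        self._blobs[name] = data if isinstance(data, bytes) else data.read()

    def download_blob(self, name: str) -> BytesIO:
        # The real SDK returns a StorageStreamDownloader; a BytesIO is enough
        # for callers that just .read() the payload.
        return BytesIO(self._blobs[name])

    def list_blobs(self, name_starts_with: str = "") -> list[str]:
        # The real SDK yields BlobProperties objects; names suffice for the demo.
        return [n for n in self._blobs if n.startswith(name_starts_with)]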

🖥️ Admin Console Pages

The Streamlit Admin Console provides a complete self-service interface:

  • 🏠 Start Here: Landing page with workflow diagrams and navigation guidance
  • 📤 Upload New Data: Drag-and-drop CSV upload to the landing zone with file preview
  • 🛠️ Fix Quarantine: Excel-like data editor to review and correct validation errors
  • 🗑️ Delete Records: Upload deletion requests and trigger the deletion workflow
  • ⚙️ Data Ingestion: Trigger the pipeline, monitor progress, view execution history
  • 📊 Final Report: View/download the aggregated CDC export with all valid records
  • ℹ️ About: Project context, technical implementation, and author info

🚀 How to Run

Option A: The Portfolio Demo (Browser)

Simply visit the Live App. No setup required.

Option B: The Real Production App (Local)

Note: Requires active Azure Credentials in .env

  1. Install dependencies:
    uv sync
  2. Run the Admin Console:
    uv run streamlit run admin_tools/web_uploader.py
  3. Trigger the Pipeline: Click the "Trigger Weekly Pipeline" button in the sidebar (requires GitHub Token) to process the files you upload.

🚀 Deployment Prerequisites (Required for Real App & Pipeline)

To run the full production pipeline, you must establish a secure connection between Azure Storage and GitHub Actions.

1. Azure Storage Setup 🟦

Create a Storage Account and the following private containers, which serve as the structure for your Data Lake:

  • landing-zone: Receives raw uploaded CSVs from the Admin Console.
  • quarantine: Holds CSV files that failed Pydantic validation (for human review).
  • data: Stores the finalized, cleaned data in partitioned Parquet files.
  • logs: Stores execution and deletion logs as CSV files for the audit trail.
  • deletion-requests: Holds pending deletion request CSVs before processing.
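
If you want to script the container creation (the storage account itself still has to exist), a hedged sketch using azure-storage-blob and azure-identity is shown below. It assumes the AZURE_STORAGE_ACCOUNT variable described in section 4 is set and that you are signed in with credentials DefaultAzureCredential can pick up.

import os

from azure.core.exceptions import ResourceExistsError
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient

CONTAINERS = ["landing-zone", "quarantine", "data", "logs", "deletion-requests"]

account = os.environ["AZURE_STORAGE_ACCOUNT"]
service = BlobServiceClient(
    account_url=f"https://{account}.blob.core.windows.net",
    credential=DefaultAzureCredential(),
)

for name in CONTAINERS:
    try:
        service.create_container(name)   # containers stay private unless public access is enabled
        print(f"Created container: {name}")
    except ResourceExistsError:
        print(f"Container already exists: {name}")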

2. Generating Secrets (Service Principal) 🔑

The GitHub Action needs a Service Principal (SP) to act as the "robot" with specific access rights. This is the most secure way to grant CI/CD access to your cloud resources.

Run the Azure CLI command below to generate the necessary credentials:

az ad sp create-for-rbac \
--name "GitHubPipelineRobot" \
--role "Storage Blob Data Contributor" \
--scopes /subscriptions/YOUR_SUBSCRIPTION_ID/resourceGroups/YOUR_RESOURCE_GROUP

This command will output three values that you must save:

  • appId (This is your AZURE_CLIENT_ID)
  • password (This is your AZURE_CLIENT_SECRET)
  • tenant (This is your AZURE_TENANT_ID)

3. Adding Secrets to GitHub Actions 🐙

Navigate to your repository settings on GitHub (Settings → Secrets and variables → Actions). Add the following repository secrets based on the values you generated above:

  • AZURE_CLIENT_ID: The appId value from the CLI. Used by the GitHub Actions workflows.
  • AZURE_CLIENT_SECRET: The password value from the CLI. Used by the GitHub Actions workflows.
  • AZURE_TENANT_ID: The tenant value from the CLI. Used by the GitHub Actions workflows.
  • AZURE_STORAGE_ACCOUNT: Your storage account name (e.g., PHData01). Used by the GitHub Actions workflows.

4. Configuring Local Environment (.env) 🔧

For the Streamlit apps to authenticate with Azure and trigger/monitor GitHub Actions, create a .env file in the project root with these variables:

# Azure Authentication (same values as GitHub secrets)
AZURE_CLIENT_ID=<your_appId>
AZURE_CLIENT_SECRET=<your_password>
AZURE_TENANT_ID=<your_tenant>
AZURE_STORAGE_ACCOUNT=<your_storage_account_name>

# GitHub API Access (for triggering workflows from Streamlit)
GITHUB_TOKEN=<your_personal_access_token>
REPO_OWNER=<your_github_username>
REPO_NAME=<your_repository_name>

Note: The GITHUB_TOKEN must be a Personal Access Token (PAT) with workflow scope to trigger Actions and read run status.

  • GITHUB_TOKEN: A fine-grained PAT with actions:write scope. Used by the workflow trigger buttons.
  • REPO_OWNER: Your GitHub username (e.g., thedbcooper). Used by the workflow trigger buttons.
  • REPO_NAME: Your repository name (e.g., pipeline-proof-concept). Used by the workflow trigger buttons.
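
For reference, this is roughly how a workflow trigger button can use those three variables to start a run through the GitHub REST API. The workflow_dispatch endpoint and the 204 response are documented GitHub behavior; the ref value and helper name are assumptions, not the app's actual code.

import os

import requests


def trigger_workflow(workflow_file: str = "weekly_pipeline.yaml", ref: str = "main") -> bool:
    owner, repo = os.environ["REPO_OWNER"], os.environ["REPO_NAME"]
    url = f"https://api.github.com/repos/{owner}/{repo}/actions/workflows/{workflow_file}/dispatches"
    resp = requests.post(
        url,
        headers={
            "Authorization": f"Bearer {os.environ['GITHUB_TOKEN']}",
            "Accept": "application/vnd.github+json",
        },
        json={"ref": ref},   # branch the workflow should run on
        timeout=10,
    )
    return resp.status_code == 204   # GitHub answers 204 No Content on success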

👨‍💻 Created by Daniel Cooper

LinkedIn GitHub ORCID

Epidemiologist & Analytics Engineer