An automated, serverless data pipeline designed to ingest, validate, and aggregate data. This project implements a "Human-in-the-Loop" architecture where invalid data is automatically quarantined, fixed via a UI, and re-injected into the pipeline without code changes.
🎮 Live Portfolio Demo
Note to Viewer: The "Real App" runs locally and is not deployed. This live demo uses Interface Abstraction to simulate Azure Blob Storage in memory, ensuring no connection to real cloud infrastructure.
flowchart TB
subgraph USER["👤 User Interface (Streamlit)"]
UPLOAD["📤 Upload CSV"]
FIX["🛠️ Fix Quarantine"]
DELETE_UPLOAD["🗑️ Upload Deletion Request"]
TRIGGER_PIPE["▶️ Trigger Pipeline"]
TRIGGER_DEL["▶️ Trigger Deletion"]
MONITOR["📊 Auto-Monitor<br/>(Fragments @ 15s)"]
end
subgraph GITHUB["🐙 GitHub Actions"]
DISPATCH_PIPE["workflow_dispatch"]
DISPATCH_DEL["workflow_dispatch"]
WEEKLY["weekly_pipeline.yaml<br/>(Cron: Weekly)"]
DELWF["delete_records.yaml<br/>(Manual Only)"]
end
subgraph PIPELINE["⚙️ ETL Pipeline"]
VALIDATE["Pydantic Validation"]
ROUTE{{"Route Data"}}
UPSERT["Upsert Valid Records"]
QUARANTINE_OP["Quarantine Invalid"]
DELETE_OP["Remove Records"]
end
subgraph AZURE["☁️ Azure Blob Storage"]
LANDING[("📁 landing-zone<br/>(Raw CSVs)")]
QUAR[("🚨 quarantine<br/>(Failed Validation)")]
DATA[("📊 data<br/>(Partitioned Parquet)")]
LOGS[("📋 logs<br/>(Execution History)")]
DELREQ[("🗑️ deletion-requests<br/>(Pending Deletions)")]
end
subgraph OUTPUT["📈 Outputs"]
REPORT["final_cdc_export.csv"]
AUDIT["Audit Trail"]
end
%% User actions
UPLOAD --> LANDING
FIX --> LANDING
DELETE_UPLOAD --> DELREQ
TRIGGER_PIPE --> DISPATCH_PIPE
TRIGGER_DEL --> DISPATCH_DEL
%% GitHub triggers
DISPATCH_PIPE --> WEEKLY
DISPATCH_DEL --> DELWF
%% Pipeline flows
WEEKLY --> VALIDATE
LANDING --> VALIDATE
VALIDATE --> ROUTE
ROUTE -->|"✅ Valid"| UPSERT
ROUTE -->|"❌ Invalid"| QUARANTINE_OP
UPSERT --> DATA
QUARANTINE_OP --> QUAR
UPSERT --> LOGS
%% Deletion flow
DELWF --> DELETE_OP
DELREQ --> DELETE_OP
DELETE_OP --> DATA
DELETE_OP --> LOGS
%% Outputs
DATA --> REPORT
LOGS --> AUDIT
%% Monitoring loop
WEEKLY -.->|"Status API"| MONITOR
DELWF -.->|"Status API"| MONITOR
MONITOR -.->|"Poll"| GITHUB
%% Quarantine loop
QUAR -.->|"Human Review"| FIX
%% Styling
classDef userStyle fill:#e1f5fe,stroke:#01579b
classDef githubStyle fill:#f3e5f5,stroke:#4a148c
classDef pipelineStyle fill:#fff3e0,stroke:#e65100
classDef azureStyle fill:#e3f2fd,stroke:#0d47a1
classDef outputStyle fill:#e8f5e9,stroke:#1b5e20
class UPLOAD,FIX,DELETE_UPLOAD,TRIGGER_PIPE,TRIGGER_DEL,MONITOR userStyle
class DISPATCH_PIPE,DISPATCH_DEL,WEEKLY,DELWF githubStyle
class VALIDATE,ROUTE,UPSERT,QUARANTINE_OP,DELETE_OP pipelineStyle
class LANDING,QUAR,DATA,LOGS,DELREQ azureStyle
class REPORT,AUDIT outputStyle
- Landing Zone: User uploads raw CSVs via the Streamlit Admin Console → `landing-zone` container.
- Automated Processing (GitHub Actions): `weekly_pipeline.yaml` triggers on a schedule or via manual dispatch.
- Validation: Pydantic enforces a strict schema (`sample_id`, `test_date`, `result`, `viral_load`); see the model sketch after this list.
- Routing: Valid data → `data` container (partitioned Parquet by week). Invalid data → `quarantine` container (CSV).
- Logging: Emoji-rich processing logs with detailed metrics saved to the `logs` container as `execution_TIMESTAMP.csv`.
- Quarantine Resolution: Admins review errors in the UI, fix the data (e.g., "Positive" → "POS"), and re-upload to `landing-zone` for automatic reprocessing.
- Deletion Workflow: Two-step process for permanent record removal:
  1. Upload a deletion request CSV (`sample_id` + `test_date`) → `deletion-requests` container.
  2. Trigger the `delete_records.yaml` GitHub Action → removes matching records from the partitioned data.
  3. Deleted sample IDs are logged to the `logs` container as `deletion_TIMESTAMP.csv`.
- Reporting: Aggregated clean data exported to `final_cdc_export.csv` with a complete audit trail.
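The model sketch referenced above: a minimal illustration of the kind of Pydantic schema behind the validation step. The production model lives in `models.py`; the field names come from the list, but the allowed result codes and types are assumptions.

```python
from datetime import date
from typing import Optional

from pydantic import BaseModel, field_validator


class TestRecord(BaseModel):
    """Illustrative schema only; the real model is defined in models.py."""

    sample_id: str
    test_date: date
    result: str
    viral_load: Optional[float] = None  # assumed optional and numeric

    @field_validator("result")
    @classmethod
    def check_result_code(cls, value: str) -> str:
        allowed = {"POS", "NEG"}  # hypothetical code list ("Positive" would fail here)
        code = value.strip().upper()
        if code not in allowed:
            raise ValueError(f"result must be one of {sorted(allowed)}, got {value!r}")
        return code
```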
- `landing-zone`: Raw CSV uploads from partners
- `quarantine`: Invalid records awaiting manual review
- `data`: Validated records in partitioned Parquet format (`year=YYYY/week=WW/`); see the layout sketch after this list
- `logs`: Processing and deletion execution logs (CSV with `processing_details`)
- `deletion-requests`: Pending deletion requests (CSV with `sample_id` and `test_date`)
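The layout sketch referenced above: a hedged illustration of how the `year=YYYY/week=WW/` paths can be produced with Polars. The real pipeline writes these paths as blob names in the `data` container; the column names follow the schema, the rest is assumption.

```python
from pathlib import Path

import polars as pl

df = pl.DataFrame({
    "sample_id": ["S-001", "S-002"],
    "test_date": ["2024-03-04", "2024-03-12"],
    "result": ["POS", "NEG"],
}).with_columns(pl.col("test_date").str.to_date())

# Derive partition keys from each record's test date (ISO week).
df = df.with_columns(
    pl.col("test_date").dt.year().alias("year"),
    pl.col("test_date").dt.week().alias("week"),
)

# Write one Parquet file per year/week partition: data/year=YYYY/week=WW/records.parquet
for (year, week), part in df.partition_by(["year", "week"], as_dict=True).items():
    path = Path(f"data/year={year}/week={week:02d}/records.parquet")
    path.parent.mkdir(parents=True, exist_ok=True)
    part.write_parquet(path)
```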
.
├── .github/workflows/
│   ├── weekly_pipeline.yaml               # Scheduled pipeline automation (production)
│   └── delete_records.yaml                # Manual deletion workflow trigger
├── admin_tools/
│   ├── demo_app.py                        # 🎮 THE DEMO APP (Public Portfolio Frontend)
│   ├── web_uploader.py                    # 🔒 THE REAL APP (Local Production Admin Console)
│   ├── mock_azure.py                      # Cloud Emulation Logic for Demo
│   ├── fetch_errors.py                    # Utility: Download quarantine files
│   ├── reingest_fixed_data.py             # Utility: Re-upload fixed data
│   ├── generate_and_upload_mock_data.py   # Utility: Generate test data
│   └── test_connection.py                 # Utility: Test Azure connection
├── pipeline/
│   ├── process_data_cloud.py              # Core ETL Logic (Polars + Pydantic)
│   ├── export_report.py                   # Generate final CDC aggregate report
│   └── delete_records.py                  # Process deletion requests from CSV
├── models.py                              # Pydantic Schema Definitions
├── pyproject.toml                         # Project dependencies (uv)
└── README.md
Most pipelines crash on bad data. This one side-steps it.
- Polars + Pydantic: Polars handles fast CSV/Parquet processing; Pydantic enforces schema-level data integrity.
- Quarantine: Invalid files move to `quarantine/` and wait for human review (see the routing sketch after this list).
- Human-in-the-Loop: The Admin Console provides an Excel-like editor to fix the offending values and retry.
- Detailed Logging: Every pipeline run saves comprehensive logs with emoji-rich processing details for easy debugging.
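The routing sketch referenced above: a condensed, hedged illustration of the validate-and-route step. The real logic lives in `pipeline/process_data_cloud.py` and uses the model from `models.py`; the inline model and the `error` column here are stand-ins.

```python
import polars as pl
from pydantic import BaseModel, ValidationError


class TestRecord(BaseModel):  # stand-in for the real schema in models.py
    sample_id: str
    test_date: str
    result: str
    viral_load: float | None = None


def route(df: pl.DataFrame) -> tuple[pl.DataFrame, pl.DataFrame]:
    """Split a raw frame into (valid, quarantine) frames, keeping the failure reason."""
    good, bad = [], []
    for row in df.iter_rows(named=True):
        try:
            TestRecord(**row)  # Pydantic raises on any schema violation
            good.append(row)
        except ValidationError as exc:
            bad.append({**row, "error": str(exc)})
    return pl.DataFrame(good), pl.DataFrame(bad)

# valid rows are upserted into `data`; bad rows land in `quarantine` as CSV
```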
A dedicated workflow for data corrections:
- Two-Step Process: Upload deletion requests (CSV with sample_id and test_date), then trigger workflow.
- Partition-Aware: Automatically calculates which partitions to check based on test dates (see the sketch after this list).
- Audit Trail: Logs which sample IDs were deleted from which partitions with full timestamp tracking.
- GitHub Actions Integration: Secure, authenticated deletion via automated workflow.
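The sketch referenced above shows the partition math behind that lookup. This is an assumption about the approach; the actual code is in `pipeline/delete_records.py`, and whether it uses the ISO year or the calendar year is not confirmed here.

```python
from datetime import date


def partition_prefix(test_date: date) -> str:
    """Map a requested test_date to its year=YYYY/week=WW/ partition prefix."""
    iso = test_date.isocalendar()
    return f"year={iso.year}/week={iso.week:02d}/"


# A deletion request for a sample tested on 2024-03-04 only needs to open
# the partition below instead of scanning the whole data container.
print(partition_prefix(date(2024, 3, 4)))  # -> year=2024/week=10/
```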
The Admin Console provides live status updates without full page reloads:
- `@st.fragment(run_every="15s")`: Uses Streamlit's fragment feature to auto-poll the GitHub Actions API every 15 seconds while a workflow is running (see the sketch after this list).
- Smart Workflow Detection: Compares UTC timestamps to distinguish old runs from newly triggered workflows, preventing false "success" messages from stale data.
- Session State Persistence: Stores workflow results in `st.session_state` so status messages persist across page interactions.
- Auto-Stop Monitoring: Automatically stops polling when the workflow completes (success, failure, or cancelled) and displays the final status.
- Seamless UX: Users can trigger a pipeline, navigate to other tabs, and return to see live progress or final results.
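The monitoring sketch referenced above, trimmed down from the real Admin Console: the timestamp comparison and UI details are omitted, and the session-state key is illustrative.

```python
import os

import requests
import streamlit as st


@st.fragment(run_every="15s")  # only this fragment re-runs every 15 seconds
def monitor_runs(workflow_file: str = "weekly_pipeline.yaml") -> None:
    owner, repo = os.environ["REPO_OWNER"], os.environ["REPO_NAME"]
    url = f"https://api.github.com/repos/{owner}/{repo}/actions/workflows/{workflow_file}/runs"
    resp = requests.get(
        url,
        headers={"Authorization": f"Bearer {os.environ['GITHUB_TOKEN']}"},
        params={"per_page": 1},
        timeout=10,
    )
    runs = resp.json().get("workflow_runs", [])
    if not runs:
        st.info("No runs found yet.")
        return
    run = runs[0]
    st.session_state["last_run"] = run  # persists across page interactions
    if run["status"] == "completed":
        # the real console also stops polling at this point; omitted in this sketch
        st.success(f"Workflow finished: {run['conclusion']}")
    else:
        st.info(f"Workflow is {run['status']}...")
```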
To share this project publicly without exposing Azure credentials, I implemented a Mock Object Pattern:
- Interface Abstraction: The client classes in `mock_azure.py` mirror the official Azure SDK methods (`upload_blob`, `download_blob`, `list_blobs`); a sketch follows this list.
- Safety: The Demo App injects these mock clients instead of real Azure clients, so the full UI workflow runs entirely in memory with no cloud connection.
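The sketch referenced above, reduced to its core. The actual `mock_azure.py` covers more of the SDK surface; method names follow Azure's `ContainerClient`, and the downloader shim is illustrative.

```python
class MockContainerClient:
    """In-memory stand-in for azure.storage.blob.ContainerClient."""

    def __init__(self, name: str):
        self.name = name
        self._blobs: dict[str, bytes] = {}

    def upload_blob(self, name: str, data, overwrite: bool = False) -> None:
        if name in self._blobs and not overwrite:
            raise ValueError(f"Blob {name!r} already exists")
        self._blobs[name] = data if isinstance(data, bytes) else data.read()

    def download_blob(self, name: str) -> "_MockDownloader":
        return _MockDownloader(self._blobs[name])

    def list_blobs(self):
        # The real SDK yields BlobProperties objects; names are enough for the demo.
        yield from self._blobs


class _MockDownloader:
    """Mimics the readall() interface of StorageStreamDownloader."""

    def __init__(self, payload: bytes):
        self._payload = payload

    def readall(self) -> bytes:
        return self._payload
```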
The Streamlit Admin Console provides a complete self-service interface:
| Page | Purpose |
|---|---|
| 🏠 Start Here | Landing page with workflow diagrams and navigation guidance |
| 📤 Upload New Data | Drag-and-drop CSV upload to landing zone with file preview |
| 🛠️ Fix Quarantine | Excel-like data editor to review and correct validation errors |
| 🗑️ Delete Records | Upload deletion requests and trigger deletion workflow |
| ⚙️ Data Ingestion | Trigger pipeline, monitor progress, view execution history |
| 📊 Final Report | View/download the aggregated CDC export with all valid records |
| ℹ️ About | Project context, technical implementation, and author info |
Simply visit the Live App. No setup required.
Note: Requires active Azure credentials in `.env`.
- Install dependencies: `uv sync`
- Run the Admin Console: `uv run streamlit run admin_tools/web_uploader.py`
- Trigger the Pipeline: Click the "Trigger Weekly Pipeline" button in the sidebar (requires GitHub Token) to process the files you upload.
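Under the hood, the trigger button calls the GitHub REST API's `workflow_dispatch` endpoint. A minimal sketch follows; the branch name `main` and the helper name are assumptions, and the environment variables match the `.env` section below.

```python
import os

import requests


def trigger_pipeline(workflow_file: str = "weekly_pipeline.yaml", ref: str = "main") -> None:
    """Dispatch a GitHub Actions workflow run via the REST API."""
    owner, repo = os.environ["REPO_OWNER"], os.environ["REPO_NAME"]
    url = f"https://api.github.com/repos/{owner}/{repo}/actions/workflows/{workflow_file}/dispatches"
    resp = requests.post(
        url,
        headers={
            "Authorization": f"Bearer {os.environ['GITHUB_TOKEN']}",
            "Accept": "application/vnd.github+json",
        },
        json={"ref": ref},
        timeout=10,
    )
    resp.raise_for_status()  # GitHub returns 204 No Content on success
```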
To run the full production pipeline, you must establish a secure connection between Azure Storage and GitHub Actions.
Create a Storage Account and the following private containers, which serve as the structure for your Data Lake:
| Container Name | Purpose |
|---|---|
| landing-zone | Receives raw uploaded CSVs from the Admin Console. |
| quarantine | Holds CSV files that failed Pydantic validation (for human review). |
| data | Stores the finalized, cleaned data in partitioned parquet files. |
| logs | Stores execution and deletion logs as CSV files for audit trail. |
| deletion-requests | Holds pending deletion request CSVs before processing. |
The GitHub Action needs a Service Principal (SP) to act as the "robot" with specific access rights. This is the most secure way to grant CI/CD access to your cloud resources.
Run the Azure CLI command below to generate the necessary credentials:
az ad sp create-for-rbac \
--name "GitHubPipelineRobot" \
--role "Storage Blob Data Contributor" \
--scopes /subscriptions/YOUR_SUBSCRIPTION_ID/resourceGroups/YOUR_RESOURCE_GROUP

This command will output three values that you must save:
- `appId` (This is your AZURE_CLIENT_ID)
- `password` (This is your AZURE_CLIENT_SECRET)
- `tenant` (This is your AZURE_TENANT_ID)
Navigate to your repository settings on GitHub (Settings → Secrets and variables → Actions). Add the following repository secrets based on the values you generated above:
| Secret Name | Source / Value | Used By |
|---|---|---|
| `AZURE_CLIENT_ID` | The `appId` value from the CLI. | GitHub Actions workflows |
| `AZURE_CLIENT_SECRET` | The `password` value from the CLI. | GitHub Actions workflows |
| `AZURE_TENANT_ID` | The `tenant` value from the CLI. | GitHub Actions workflows |
| `AZURE_STORAGE_ACCOUNT` | Your storage account name (e.g., `PHData01`). | GitHub Actions workflows |
For the Streamlit apps to authenticate with Azure and trigger/monitor GitHub Actions, create a .env file in the project root with these variables:
# Azure Authentication (same values as GitHub secrets)
AZURE_CLIENT_ID=<your_appId>
AZURE_CLIENT_SECRET=<your_password>
AZURE_TENANT_ID=<your_tenant>
AZURE_STORAGE_ACCOUNT=<your_storage_account_name>
# GitHub API Access (for triggering workflows from Streamlit)
GITHUB_TOKEN=<your_personal_access_token>
REPO_OWNER=<your_github_username>
REPO_NAME=<your_repository_name>

Note: The `GITHUB_TOKEN` must be a Personal Access Token (PAT) with workflow scope to trigger Actions and read run status.
| Secret Name | Source / Value | Used By |
|---|---|---|
| `GITHUB_TOKEN` | A Fine-Grained PAT with `actions:write` scope. | Workflow Trigger Buttons |
| `REPO_OWNER` | Your GitHub username (e.g., `thedbcooper`). | Workflow Trigger Buttons |
| `REPO_NAME` | Your repository name (e.g., `pipeline-proof-concept`). | Workflow Trigger Buttons |
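Putting the variables together, a sketch of how the Admin Console might construct its Azure client from them. This is hedged: the repo's actual client setup may differ, and the `python-dotenv` usage here is an assumption.

```python
import os

from azure.identity import ClientSecretCredential
from azure.storage.blob import BlobServiceClient
from dotenv import load_dotenv

load_dotenv()  # reads the .env file described above

credential = ClientSecretCredential(
    tenant_id=os.environ["AZURE_TENANT_ID"],
    client_id=os.environ["AZURE_CLIENT_ID"],
    client_secret=os.environ["AZURE_CLIENT_SECRET"],
)
account_url = f"https://{os.environ['AZURE_STORAGE_ACCOUNT']}.blob.core.windows.net"
blob_service = BlobServiceClient(account_url=account_url, credential=credential)

# Example: list what is currently waiting in the landing zone.
landing = blob_service.get_container_client("landing-zone")
for blob in landing.list_blobs():
    print(blob.name)
```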
Epidemiologist & Analytics Engineer