- Overview
- Architecture Summary
- Dataset Description
- Step 1: Create S3 Structure
- Step 2: AWS Glue Catalog and Crawler
- Step 3: Create Athena Database
- Step 4: Create Raw Table
- Step 5: Convert & Aggregate Data into Parquet
- Step 6: Verify in Athena
- Step 7: Glue Crawler for Processed Data
- Step 8: Streaming and Lambda Components
- Step 9: SageMaker Integration
- Folder Guides
- Bibliography
This repository documents an AWS-based analytics pipeline for processing Bitcoin (BTC/USD) pricing data. The workflow converts a raw CSV stored in Amazon S3 into partitioned Parquet datasets optimized for Athena queries and QuickSight visualization, and it also supports near-real-time (mini-batch) ingestion.
Data Flow:
```
S3 (raw btcusd.csv / stream/)
        ↓
AWS Glue Crawler → Glue Data Catalog
        ↓
AWS Athena (SQL Transformations)
        ↓
S3 (processed_partitioned/year=YYYY/) → SageMaker (Training + Inference)
        ↓
QuickSight (Dashboards)
```
Each CSV file represents historical BTC/USD price ticks.
| Column | Type | Description |
|---|---|---|
| time | BIGINT | Unix timestamp in milliseconds |
| open | DOUBLE | Opening price |
| close | DOUBLE | Closing price |
| high | DOUBLE | Highest price |
| low | DOUBLE | Lowest price |
| volume | DOUBLE | Traded volume |
Each CSV file represents a minute-level BTC/USD price tick written by the streaming Lambda.
| Column | Type | Description |
|---|---|---|
| epoch_ms | BIGINT | Unix timestamp in milliseconds |
| iso_ts | STRING | ISO 8601 UTC timestamp |
| price_usd | DOUBLE | Current BTC price in USD |
| source | STRING | Data source (e.g. coingecko) |
```
s3://<your-bucket-name>/raw/
s3://<your-bucket-name>/processed_partitioned/
s3://<your-bucket-name>/quality_reports/
s3://<your-bucket-name>/models/volatility_model/
```
Upload your BTC CSV files into the raw/ folder for batch ingestion.
For streaming ingestion, Lambda automatically writes CSVs into raw/stream/ every minute.
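If you prefer to script the batch upload, a minimal sketch with boto3 might look like this (the bucket name and local file name are placeholders):

```python
import boto3

s3 = boto3.client("s3")

BUCKET = "<your-bucket-name>"   # placeholder: replace with your bucket
LOCAL_FILE = "btcusd.csv"       # placeholder: local historical CSV

# Upload the historical CSV into the raw/ prefix for batch ingestion
s3.upload_file(LOCAL_FILE, BUCKET, "raw/btcusd.csv")
print(f"Uploaded {LOCAL_FILE} to s3://{BUCKET}/raw/btcusd.csv")
```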
- Open AWS Glue → Crawlers → Create crawler
- Name: `<your-bucket-name>`
- Source type: S3
- Include path: `s3://<your-bucket-name>/raw/`
- Choose IAM Role: `AWSGlueServiceRoleDefault`
- Output Database: `your-database-name`
- Run crawler (the type should be On demand).
This automatically creates the table parquet_raw in the Glue Data Catalog.
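The same crawler can also be created and started from code. A sketch using boto3's Glue client, reusing the names from the steps above:

```python
import boto3

glue = boto3.client("glue")

glue.create_crawler(
    Name="<your-bucket-name>",                      # crawler name from the steps above
    Role="AWSGlueServiceRoleDefault",               # IAM role with Glue + S3 access
    DatabaseName="your-database-name",              # Glue Data Catalog database
    Targets={"S3Targets": [{"Path": "s3://<your-bucket-name>/raw/"}]},
)

# On-demand run: trigger it once and let it populate the catalog
glue.start_crawler(Name="<your-bucket-name>")
```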
```sql
CREATE DATABASE IF NOT EXISTS <yourdbname>
COMMENT 'BTC/USD historical analytics database'
LOCATION 's3://<your-bucket-name>/';
```

Create the raw table over the CSV files:

```sql
CREATE EXTERNAL TABLE IF NOT EXISTS <yourdbname>.parquet_raw (
  time bigint,
  open double,
  close double,
  high double,
  low double,
  volume double
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  "separatorChar" = ",",
  "escapeChar" = "\\",
  "quoteChar" = "\""
)
LOCATION 's3://<your-bucket-name>/raw/'
TBLPROPERTIES ('skip.header.line.count'='1');
```

Create the partitioned table for the daily aggregates:

```sql
CREATE EXTERNAL TABLE IF NOT EXISTS <yourdbname>.processed_partitioned (
  day date,
  avg_price_usd double,
  min_price_usd double,
  max_price_usd double
)
PARTITIONED BY (year int)
STORED AS PARQUET
LOCATION 's3://<your-bucket-name>/processed_partitioned/'
TBLPROPERTIES ('parquet.compression'='SNAPPY');
```

Then aggregate from raw to daily summary:

```sql
INSERT INTO <yourdbname>.processed_partitioned
SELECT
  date(from_unixtime(time/1000)) AS day,
  avg(close)                     AS avg_price_usd,
  min(low)                       AS min_price_usd,
  max(high)                      AS max_price_usd,
  year(from_unixtime(time/1000)) AS year
FROM <yourdbname>.parquet_raw
GROUP BY 1, 5
ORDER BY 1;
```

Repair partitions:

```sql
MSCK REPAIR TABLE <yourdbname>.processed_partitioned;
```

Verify in Athena:

```sql
SELECT * FROM <yourdbname>.processed_partitioned LIMIT 10;
```

Expected columns:

```
day | avg_price_usd | min_price_usd | max_price_usd | year
```
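The statements above can also be submitted from code. A minimal sketch using boto3's Athena client (the `athena_results/` output prefix is an assumption; any S3 location Athena can write to works):

```python
import time
import boto3

athena = boto3.client("athena")

QUERY = "SELECT * FROM <yourdbname>.processed_partitioned LIMIT 10"
OUTPUT = "s3://<your-bucket-name>/athena_results/"   # assumed results prefix

# Submit the query and poll until it finishes
qid = athena.start_query_execution(
    QueryString=QUERY,
    QueryExecutionContext={"Database": "<yourdbname>"},
    ResultConfiguration={"OutputLocation": OUTPUT},
)["QueryExecutionId"]

while True:
    state = athena.get_query_execution(QueryExecutionId=qid)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(1)

if state == "SUCCEEDED":
    rows = athena.get_query_results(QueryExecutionId=qid)["ResultSet"]["Rows"]
    for row in rows:
        print([col.get("VarCharValue") for col in row["Data"]])
```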
Create another crawler btcprice-processed-crawler:
- Source: `s3://<your-bucket-name>/processed_partitioned/`
- Database: `<yourdbname>`
- Schedule: daily (optional)
Run crawler → updates partition metadata automatically.
- File: `stream/stream.py`
- Trigger: Runs every minute
- Purpose: Writes BTC/USD minute-level CSV data to S3 (`raw/stream/`); see the sketch below
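A condensed sketch of what such a handler might look like (the CoinGecko endpoint URL and the object key naming are assumptions; `stream/stream.py` is the authoritative implementation):

```python
import csv
import io
import json
import time
import urllib.request

import boto3

s3 = boto3.client("s3")
BUCKET = "<your-bucket-name>"   # placeholder
# Assumed endpoint: CoinGecko simple-price API for BTC in USD
PRICE_URL = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd"

def lambda_handler(event, context):
    # Fetch the current BTC/USD price
    with urllib.request.urlopen(PRICE_URL, timeout=10) as resp:
        price = json.load(resp)["bitcoin"]["usd"]

    epoch_ms = int(time.time() * 1000)
    iso_ts = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(epoch_ms / 1000))

    # One CSV row per invocation, matching the streaming schema above
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(["epoch_ms", "iso_ts", "price_usd", "source"])
    writer.writerow([epoch_ms, iso_ts, price, "coingecko"])

    key = f"raw/stream/btc_{epoch_ms}.csv"   # hypothetical key naming
    s3.put_object(Bucket=BUCKET, Key=key, Body=buf.getvalue().encode("utf-8"))
    return {"written": key, "price_usd": price}
```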
- File: `stream/parquet_convert.py`
- Trigger: Daily scheduled Lambda
- Purpose: Converts daily mini-batch CSVs into partitioned Parquet files (see the sketch below)
- Features:
  - Handles duplicates and missing columns
  - Detects missing raw days and reprocesses
  - Writes output to `processed_partitioned/`
  - Repairs Athena partitions automatically
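A condensed sketch of the conversion logic, here using awswrangler (the real `stream/parquet_convert.py` also scopes the read to missing days and handles missing columns and other edge cases):

```python
import awswrangler as wr
import pandas as pd

BUCKET = "<your-bucket-name>"   # placeholder
DATABASE = "<yourdbname>"       # placeholder

# Read the mini-batch CSVs written by the streaming Lambda
# (the real job restricts this to unprocessed days)
df = wr.s3.read_csv(f"s3://{BUCKET}/raw/stream/")

# Basic cleanup: drop duplicate ticks and rows missing a price
df = df.drop_duplicates(subset=["epoch_ms"]).dropna(subset=["price_usd"])

# Aggregate to the daily summary used by processed_partitioned
df["day"] = pd.to_datetime(df["epoch_ms"], unit="ms").dt.date
daily = df.groupby("day", as_index=False).agg(
    avg_price_usd=("price_usd", "mean"),
    min_price_usd=("price_usd", "min"),
    max_price_usd=("price_usd", "max"),
)
daily["year"] = pd.to_datetime(daily["day"]).dt.year

# Append partitioned Parquet output and refresh Athena's partition metadata
wr.s3.to_parquet(
    df=daily,
    path=f"s3://{BUCKET}/processed_partitioned/",
    dataset=True,
    partition_cols=["year"],
    mode="append",
)
wr.athena.repair_table(table="processed_partitioned", database=DATABASE)
```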
- File: `quality_assurance/script.py`
- Type: Glue ETL Job
- Trigger: Runs after daily parquet conversion
- Purpose: Validates data quality for the latest raw stream (see the sketch below)
- Checks schema, nulls, negatives, duplicates
- Writes daily validation reports to `s3://<your-bucket-name>/quality_reports/`
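A sketch of the kinds of checks such a Glue job can run, written with PySpark (the report layout and object key naming are assumptions; `quality_assurance/script.py` is authoritative):

```python
import json
from datetime import datetime, timezone

import boto3
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

BUCKET = "<your-bucket-name>"   # placeholder

# Latest raw stream data written by the minute-level Lambda
df = spark.read.option("header", True).csv(f"s3://{BUCKET}/raw/stream/")

expected = {"epoch_ms", "iso_ts", "price_usd", "source"}
report = {
    "run_ts": datetime.now(timezone.utc).isoformat(),
    "row_count": df.count(),
    "missing_columns": sorted(expected - set(df.columns)),
    "null_prices": df.filter(F.col("price_usd").isNull()).count(),
    "negative_prices": df.filter(F.col("price_usd").cast("double") < 0).count(),
    "duplicate_ticks": df.count() - df.dropDuplicates(["epoch_ms"]).count(),
}

# Write the daily validation report to quality_reports/
key = f"quality_reports/report_{datetime.now(timezone.utc):%Y-%m-%d}.json"  # hypothetical key
boto3.client("s3").put_object(Bucket=BUCKET, Key=key, Body=json.dumps(report).encode("utf-8"))
print(json.dumps(report, indent=2))
```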
SageMaker provides a continuous ML pipeline for volatility forecasting.
- Loads processed data from S3 (`processed_partitioned/`)
- Cleans and merges multi-year data
- Trains a Random Forest model predicting daily volatility (see the sketch below)
- Saves artifacts to S3 (`models/volatility_model/`):
  - `volatility_model.joblib`
  - `metrics.json`
  - `training_manifest.jsonl.gz`
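A rough sketch of the training step (the volatility target definition and feature set here are illustrative assumptions, not necessarily what the repository's training job uses; reading from S3 assumes s3fs is installed):

```python
import json

import joblib
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split

# Load the merged multi-year daily summaries from the partitioned dataset
df = pd.read_parquet("s3://<your-bucket-name>/processed_partitioned/")

# Illustrative volatility target: daily high-low range relative to the average price
df["volatility"] = (df["max_price_usd"] - df["min_price_usd"]) / df["avg_price_usd"]

features = ["avg_price_usd", "min_price_usd", "max_price_usd", "year"]
X_train, X_test, y_train, y_test = train_test_split(
    df[features], df["volatility"], test_size=0.2, shuffle=False
)

model = RandomForestRegressor(n_estimators=200, random_state=42)
model.fit(X_train, y_train)

pred = model.predict(X_test)
metrics = {
    "mae": mean_absolute_error(y_test, pred),
    "rmse": mean_squared_error(y_test, pred) ** 0.5,
    "r2": r2_score(y_test, pred),
}

# Artifacts mirrored to s3://<your-bucket-name>/models/volatility_model/
joblib.dump(model, "volatility_model.joblib")
with open("metrics.json", "w") as f:
    json.dump(metrics, f, indent=2)
```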
Reference: AWS SageMaker Processing Jobs
- Triggered by new data via S3 → EventBridge → Lambda
- Launches a new Processing Job (`training_job.py`); a trimmed sketch follows below
- Produces versioned model artifacts under `models/volatility_model/`
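A trimmed sketch of a Lambda that launches the Processing Job via boto3 (the role ARN, image URI, instance type, and code location are placeholders; the actual wiring lives in the ml/ folder):

```python
import time

import boto3

sm = boto3.client("sagemaker")

def lambda_handler(event, context):
    job_name = f"btc-volatility-train-{int(time.time())}"   # unique, timestamped job name
    sm.create_processing_job(
        ProcessingJobName=job_name,
        RoleArn="arn:aws:iam::<account-id>:role/<sagemaker-role>",   # placeholder
        AppSpecification={
            "ImageUri": "<sklearn-processing-image-uri>",            # placeholder image
            "ContainerEntrypoint": ["python3", "/opt/ml/processing/input/code/training_job.py"],
        },
        ProcessingResources={
            "ClusterConfig": {
                "InstanceCount": 1,
                "InstanceType": "ml.m5.large",
                "VolumeSizeInGB": 30,
            }
        },
        ProcessingInputs=[{
            "InputName": "code",
            "S3Input": {
                "S3Uri": "s3://<your-bucket-name>/code/training_job.py",  # placeholder code location
                "LocalPath": "/opt/ml/processing/input/code",
                "S3DataType": "S3Prefix",
                "S3InputMode": "File",
            },
        }],
        ProcessingOutputConfig={
            "Outputs": [{
                "OutputName": "model",
                "S3Output": {
                    "S3Uri": "s3://<your-bucket-name>/models/volatility_model/",
                    "LocalPath": "/opt/ml/processing/output",
                    "S3UploadMode": "EndOfJob",
                },
            }]
        },
    )
    return {"started": job_name}
```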
Reference: Automating SageMaker with EventBridge
- Creates a new model from the latest S3 artifact
- Builds a new endpoint configuration
- Updates the endpoint to serve the latest model
- Waits until `InService`
- Optionally cleans up older models/configs (a boto3 sketch follows below)
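Sketched with boto3, the rolling update could look like this (the role ARN, image URI, artifact key, and instance type are placeholders):

```python
import time

import boto3

sm = boto3.client("sagemaker")

ENDPOINT = "btc-volatility-endpoint"
version = time.strftime("%Y%m%d-%H%M%S")
model_name = f"btc-volatility-{version}"
config_name = f"btc-volatility-cfg-{version}"

# 1. Register a model pointing at the newest artifact in S3
sm.create_model(
    ModelName=model_name,
    ExecutionRoleArn="arn:aws:iam::<account-id>:role/<sagemaker-role>",   # placeholder
    PrimaryContainer={
        "Image": "<sklearn-inference-image-uri>",                         # placeholder image
        "ModelDataUrl": "s3://<your-bucket-name>/models/volatility_model/model.tar.gz",  # placeholder artifact
    },
)

# 2. Build a fresh endpoint configuration for that model
sm.create_endpoint_config(
    EndpointConfigName=config_name,
    ProductionVariants=[{
        "VariantName": "AllTraffic",
        "ModelName": model_name,
        "InstanceType": "ml.m5.large",
        "InitialInstanceCount": 1,
    }],
)

# 3. Point the existing endpoint at the new configuration and wait for InService
sm.update_endpoint(EndpointName=ENDPOINT, EndpointConfigName=config_name)
sm.get_waiter("endpoint_in_service").wait(EndpointName=ENDPOINT)
print(f"{ENDPOINT} now serving {model_name}")
```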
Trigger options:
- SageMaker job completion via EventBridge
- S3 upload to `models/volatility_model/`
Reference: SageMaker Real-Time Endpoints
```python
import boto3, json

runtime = boto3.client("sagemaker-runtime")

payload = {"avg_price_usd": [29500], "min_price_usd": [28900], "max_price_usd": [30000], "year": [2025]}

response = runtime.invoke_endpoint(
    EndpointName="btc-volatility-endpoint",
    ContentType="application/json",
    Body=json.dumps(payload)
)
print(response["Body"].read().decode())
```

- Logs MAE, RMSE, R² and model metadata
- Metrics stored at `s3://<your-bucket-name>/models/volatility_model/metrics.json` (see the sketch below)
- Viewable in QuickSight or CloudWatch
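The stored metrics can be pulled back for quick checks; a small sketch assuming the metrics.json path above:

```python
import json

import boto3

s3 = boto3.client("s3")

obj = s3.get_object(
    Bucket="<your-bucket-name>",                       # placeholder
    Key="models/volatility_model/metrics.json",
)
metrics = json.loads(obj["Body"].read())

# Expecting keys such as MAE, RMSE, and R² written by the training job
for name, value in metrics.items():
    print(f"{name}: {value}")
```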
- `ml/` — ML pipeline and endpoint: ml/README.md
- `athena/` — SQL setup and helpers: athena/README.md
- `stream/` — ingestion and parquet conversion: stream/README.md
- `quality_assurance/` — data checks: quality_assurance/README.md
- `dataset/` — sample data: dataset/README.md
- `quicksight/` — dashboard assets: quicksight/README.md