Sparkify, a music streaming startup, requested that their data warehouse ETL pipelines be automated and monitored.

Using Apache Airflow, an Extract, Transform and Load (ETL) pipeline extracts JSON data files from Amazon Simple Storage Service (S3), processes them, and loads them into a star schema relational database on an Amazon Redshift data warehouse.

Data quality tests run after the ETL steps have executed to catch any discrepancies in the datasets.
Data for song and user activities reside in S3 as JSON files:
- Song data: s3://udacity-dend/song_data
- Log data: s3://udacity-dend/log_data
- Log data json path: s3://udacity-dend/log_json_path.json
- A subset of real data from the Million Song Dataset.
- Each file is in JSON format and contains metadata about a song and the artist of that song.
- Files are partitioned by the first three letters of each song's track ID.
- For example, here are the file paths to two files in the song dataset:
```
song_data/A/B/C/TRABCEI128F424C983.json
song_data/A/A/B/TRAABJL12903CDCF1A.json
```
- Here is an example of a single song file, TRAABJL12903CDCF1A.json:
```json
{
    "num_songs": 1,
    "artist_id": "ARJIE2Y1187B994AB7",
    "artist_latitude": null,
    "artist_longitude": null,
    "artist_location": "",
    "artist_name": "Line Renaud",
    "song_id": "SOUPIRU12A6D4FA1E1",
    "title": "Der Kleine Dompfaff",
    "duration": 152.92036,
    "year": 0
}
```
- Log files are in JSON format, generated by an event simulator based on the songs in the song dataset.
- These simulate activity logs from a music streaming app based on specified configurations.
- Log files are partitioned by year and month.
- For example, here are the file paths to two files in the log dataset:
```
log_data/2018/11/2018-11-12-events.json
log_data/2018/11/2018-11-13-events.json
```
- Here is an example of a single record from a log file, 2018-11-12-events.json:
```json
{
    "artist": null,
    "auth": "Logged In",
    "firstName": "Celeste",
    "gender": "F",
    "itemInSession": 0,
    "lastName": "Williams",
    "length": null,
    "level": "free",
    "location": "Klamath Falls, OR",
    "method": "GET",
    "page": "Home",
    "registration": 1541077528796.0,
    "sessionId": 438,
    "song": null,
    "status": 200,
    "ts": 1541990217796,
    "userAgent": "\"Mozilla\/5.0 (Windows NT 6.1; WOW64) AppleWebKit\/537.36 (KHTML, like Gecko) Chrome\/37.0.2062.103 Safari\/537.36\"",
    "userId": "53"
}
```
Staging table for the song data:
| Column | Type |
|---|---|
| num_songs | varchar |
| artist_id | varchar |
| latitude | float |
| longitude | float |
| location | varchar |
| name | varchar |
| song_id | varchar |
| title | varchar |
| duration | float |
| year | int |
Staging table for the log data:
| Column | Type |
|---|---|
| artist | varchar |
| auth | varchar |
| first_name | varchar |
| gender | char(1) |
| item_in_session | int |
| last_name | varchar |
| length | float |
| level | varchar |
| location | varchar |
| method | varchar |
| page | varchar |
| registration | varchar |
| session_id | int |
| song | varchar |
| status | int |
| ts | timestamp |
| user_agent | varchar |
| user_id | int |
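Laid out as DDL, the two staging tables above might be created as in the following sketch. The table names `staging_songs` and `staging_events` are assumptions for illustration, not taken from the project files:

```sql
-- Sketch of staging DDL; table names are illustrative assumptions.
CREATE TABLE IF NOT EXISTS staging_songs (
    num_songs        varchar,
    artist_id        varchar,
    latitude         float,
    longitude        float,
    location         varchar,
    name             varchar,
    song_id          varchar,
    title            varchar,
    duration         float,
    year             int
);

CREATE TABLE IF NOT EXISTS staging_events (
    artist           varchar,
    auth             varchar,
    first_name       varchar,
    gender           char(1),
    item_in_session  int,
    last_name        varchar,
    length           float,
    level            varchar,
    location         varchar,
    method           varchar,
    page             varchar,
    registration     varchar,
    session_id       int,
    song             varchar,
    status           int,
    ts               timestamp,
    user_agent       varchar,
    user_id          int
);
```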
- Star schema containing one fact table (songplays) and four dimension tables (users, songs, artists and time).

songplays fact table: records in the log data associated with song plays, i.e. records with the page NextSong.
| Column | Type | Nullable |
|---|---|---|
| songplay_id (PK) | int IDENTITY | NOT NULL |
| start_time | timestamp | NOT NULL |
| user_id | int | NOT NULL |
| level | varchar | |
| song_id | varchar | |
| artist_id | varchar | |
| session_id | int | |
| location | varchar | |
| user_agent | varchar | |
- Distribution style: KEY on start_time
- Sort key: start_time
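Only NextSong events from the log data become songplay records. A minimal Python sketch of that filter (the sample records here are illustrative, shortened from the log format above):

```python
# Only events whose "page" field is "NextSong" represent actual song
# plays; other pages (Home, Login, ...) are dropped before loading
# the songplays fact table.
events = [
    {"page": "NextSong", "userId": "53", "sessionId": 438, "level": "free"},
    {"page": "Home",     "userId": "53", "sessionId": 438, "level": "free"},
]

songplays = [e for e in events if e["page"] == "NextSong"]
print(len(songplays))  # 1
```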
Users in the app:
| Column | Type | Nullable |
|---|---|---|
| user_id (PK) | int | NOT NULL |
| first_name | varchar | |
| last_name | varchar | |
| gender | varchar | |
| level | varchar | |
- Sort key: user_id
Songs in the music database:
| Column | Type | Nullable |
|---|---|---|
| song_id (PK) | varchar | NOT NULL |
| title | varchar | NOT NULL |
| artist_id | varchar | NOT NULL |
| year | int | |
| duration | numeric | NOT NULL |
- Sort key: song_id
Artists in the music database:
| Column | Type | Nullable |
|---|---|---|
| artist_id (PK) | varchar | NOT NULL |
| name | varchar | |
| location | varchar | |
| latitude | float | |
| longitude | float | |
- Sort key: artist_id
Timestamps of records in songplays, broken down into specific units:
| Column | Type | Nullable |
|---|---|---|
| start_time (PK) | timestamp | NOT NULL |
| hour | int | |
| day | int | |
| week | int | |
| month | int | |
| year | int | |
| weekday | int | |
- Distribution style: KEY on start_time
- Sort key: start_time
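Every column of the time table can be derived from the epoch-millisecond `ts` field in the log data. A sketch in Python, applied to the `ts` value from the sample log record shown earlier:

```python
from datetime import datetime, timezone

def expand_timestamp(ts_ms):
    """Break an epoch-milliseconds `ts` value from the log data into
    the units stored in the time dimension table."""
    dt = datetime.fromtimestamp(ts_ms / 1000.0, tz=timezone.utc)
    return {
        "start_time": dt,
        "hour": dt.hour,
        "day": dt.day,
        "week": dt.isocalendar()[1],  # ISO week number
        "month": dt.month,
        "year": dt.year,
        "weekday": dt.weekday(),      # Monday == 0
    }

row = expand_timestamp(1541990217796)  # ts from the sample log record
```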
```
├── create_tables.sql
├── dags
│   ├── create_tables_dag.py
│   └── process_data_dag.py
└── plugins
    ├── operators
    │   ├── create_table.py
    │   ├── stage_redshift.py
    │   ├── load_fact.py
    │   ├── load_dimension.py
    │   └── data_quality.py
    └── helpers
        └── sql_queries.py
```
- create_tables.sql contains the SQL statements to create the tables in Redshift.
- create_tables_dag.py creates the tables in Redshift.
- process_data_dag.py performs the main tasks: staging the data, filling the data warehouse, and running checks on the data as the final step.
- create_table.py creates the tables.
- stage_redshift.py loads the JSON-formatted files from S3 into Amazon Redshift.
- load_fact.py takes as input a SQL statement, a target database and a target table that will contain the results of the transformation for the fact table.
- load_dimension.py takes as input a SQL statement, a target database and a target table that will contain the results of the transformation for a dimension table.
- data_quality.py runs checks on the data itself.
- sql_queries.py contains all the SQL transformation queries.
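A minimal sketch of the kind of check data_quality.py performs: verify each target table actually received rows. The `run_query` callable here stands in for the database hook the real operator would use and is an assumption for illustration:

```python
def check_has_rows(run_query, tables):
    """Fail if any of the given tables is empty.

    `run_query` is any callable that executes a SQL string and returns
    a list of result rows (a stand-in for an Airflow database hook).
    """
    empty = []
    for table in tables:
        records = run_query(f"SELECT COUNT(*) FROM {table}")
        if not records or not records[0] or records[0][0] < 1:
            empty.append(table)
    if empty:
        raise ValueError(f"Data quality check failed, empty tables: {empty}")
    return True
```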
- Launch a Redshift cluster.
- Launch Airflow.
- Set up the Airflow connections to AWS and Redshift.
- Run create_tables_dag.
- Run process_data_dag.


