This project was submitted as the first project of the Data Engineering Nanodegree. It has three key components:
- Data modeling with Postgres
- Star schema database design
- ETL pipeline using Python
A startup called Sparkify wants to analyze the data they've been collecting on songs and user activity on their new music streaming app. The analytics team is particularly interested in understanding what songs users are listening to. Currently, they don't have an easy way to query their data, which resides in a directory of JSON logs on user activity on the app, as well as a directory with JSON metadata on the songs in their app.
They'd like a data engineer to create a Postgres database with tables designed to optimize queries on song play analysis, and have brought you onto the project. Your role is to create a database schema and ETL pipeline for this analysis. You'll be able to test your database and ETL pipeline by running queries given to you by Sparkify's analytics team and comparing your results with their expected results.
In this project, you'll apply what you've learned about data modeling with Postgres and build an ETL pipeline using Python. To complete the project, you will define fact and dimension tables for a star schema with a particular analytic focus, and write an ETL pipeline that transfers data from files in two local directories into these tables in Postgres using Python and SQL.
Two datasets are available to you: the song dataset and the log dataset.
- Song dataset: Each file is in JSON format and contains metadata about a song and the artist of that song. The files are partitioned by the first three letters of each song's track ID. Below is an example of what a single song file, TRAABJL12903CDCF1A.json, looks like:
{"num_songs": 1, "artist_id": "ARJIE2Y1187B994AB7", "artist_latitude": null, "artist_longitude": null, "artist_location": "", "artist_name": "Line Renaud", "song_id": "SOUPIRU12A6D4FA1E1", "title": "Der Kleine Dompfaff", "duration": 152.92036, "year": 0}
- Log dataset: The second dataset consists of log files in JSON format generated by an event simulator based on the songs in the dataset above. These simulate activity logs from a music streaming app based on specified configurations. Below is what a typical row of log data looks like (a sketch of loading one of these files follows the example):
{"artist":"Slipknot","auth":"Logged In","firstName":"Aiden","gender":"M","itemInSession":0,"lastName":"Ramirez","length":192.57424,"level":"paid","location":"New York-Newark-Jersey City, NY-NJ-PA","method":"PUT","page":"NextSong","registration":1540283578796.0,"sessionId":19,"song":"Opium Of The People (Album Version)","status":200,"ts":1541639510796,"userAgent":"\"Mozilla\/5.0 (Windows NT 6.1) AppleWebKit\/537.36 (KHTML, like Gecko) Chrome\/36.0.1985.143 Safari\/537.36\"","userId":"20"}
The schema used for this exercise is a star schema: there is one main fact table containing the measures associated with each event (user song plays), and four dimension tables, each with a primary key that is referenced from the fact table. A sample fact-table definition is sketched after the listing below.
songplays - records in log data associated with song plays i.e. records with page NextSong
- songplay_id (INT) PRIMARY KEY
- start_time (DATE) NOT NULL
- user_id (INT) NOT NULL
- level (TEXT)
- song_id (TEXT)
- artist_id (TEXT)
- session_id (INT)
- location (TEXT)
- user_agent (TEXT)
users - users in the app
- user_id (INT) PRIMARY KEY
- first_name (TEXT) NOT NULL
- last_name (TEXT) NOT NULL
- gender (TEXT)
- level (TEXT)
songs - songs in music database
- song_id (TEXT) PRIMARY KEY
- title (TEXT) NOT NULL
- artist_id (TEXT) NOT NULL
- year (INT)
- duration (FLOAT) NOT NULL
artists - artists in music database
- artist_id (TEXT) PRIMARY KEY
- name (TEXT) NOT NULL
- location (TEXT)
- latitude (FLOAT)
- longitude (FLOAT)
time - timestamps of records in songplays broken down into specific units
- start_time (DATE) PRIMARY KEY
- hour (INT)
- day (INT)
- week (INT)
- month (INT)
- year (INT)
- weekday (TEXT)
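As an illustration of how the songplays fact table above might be declared in sql_queries.py (a sketch only; the exact constraints in the project file may differ, and the serial type is an assumption so that Postgres generates the id):

songplay_table_create = ("""
CREATE TABLE IF NOT EXISTS songplays
(songplay_id serial PRIMARY KEY,
 start_time date NOT NULL,
 user_id int NOT NULL,
 level text,
 song_id text,
 artist_id text,
 session_id int,
 location text,
 user_agent text)
""")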
- data: The folder in the repository that contains all the data.
- sql_queries.py: All the SQL queries are written in this file.
- create_tables.py: Drops and creates the tables using the queries written in sql_queries.py. The tables need to be reset every time the ETL script (etl.ipynb or etl.py) is rerun.
- test.ipynb: A Python notebook that displays the first few rows of each table to check that the data has been inserted into the tables.
- etl.ipynb: Reads and processes a single file from song_data and log_data and loads the data into your tables.
- etl.py: Reads and processes all files from song_data and log_data and loads them into your tables.
- README.md: The current file, which discusses the project.
python sql_queries.py
The DROP, CREATE, and INSERT query statements are written first in the Python file sql_queries.py. Below is an example in which the users table is created with the SQL statement "CREATE TABLE IF NOT EXISTS name_of_the_table". The table's columns are then listed with their respective data types, such as user_id, which is an integer and is also declared as the PRIMARY KEY.
user_table_create = ("""
CREATE TABLE IF NOT EXISTS users
(user_id int PRIMARY KEY,
first_name text NOT NULL,
last_name text NOT NULL,
gender text,
level text)
""")
Following the same procedure, the other tables of the star schema (listed in the Database Schema section) are also created. Continuing with the example, data is inserted into the users table as follows:
user_table_insert = ("""
INSERT INTO users
(user_id, first_name, last_name, gender, level)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (user_id) DO UPDATE SET
level = EXCLUDED.level
""")
Here the %s placeholders in VALUES stand in for the data, which consists of user_id, first_name, last_name, gender, and level, respectively.
The same procedure is used for all the other tables in the schema. Finally, create-table and drop-table lists are defined, containing the queries for every table in the schema (a sample drop statement is sketched after the lists below):
create_table_queries = [user_table_create, artist_table_create, song_table_create, time_table_create, songplay_table_create]
drop_table_queries = [user_table_drop, artist_table_drop, song_table_drop, time_table_drop, songplay_table_drop]
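The drop statements referenced in drop_table_queries follow the same naming pattern; for example (a sketch, the exact statements live in sql_queries.py):

# Drop statements mirror the create statements, one per table.
user_table_drop = "DROP TABLE IF EXISTS users"
songplay_table_drop = "DROP TABLE IF EXISTS songplays"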
python create_tables.py
Three functions are written in this Python file:
- create_database()
- drop_tables(cur, conn)
- create_tables(cur, conn)
The function create_database creates a connection to the database and returns the connection and cursor objects.
The functions drop_tables and create_tables drop and create the tables in the database, taking the cur and conn objects as input; a sketch of these functions follows the code below.
These functions are called in the main function below for execution of their respective tasks:
def main():
    """
    Description: This is the main function, which implements the following:
        - Drops (if it exists) and creates the sparkify database.
        - Establishes a connection with the sparkify database and gets
          a cursor to it.
        - Drops all the tables.
        - Creates all the tables needed.
        - Finally, closes the connection.

    Arguments:
        None

    Returns:
        None
    """
    cur, conn = create_database()
    drop_tables(cur, conn)
    create_tables(cur, conn)
    conn.close()


if __name__ == "__main__":
    main()
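For reference, here is a minimal sketch of what create_database, drop_tables, and create_tables could look like; the connection parameters are assumptions, and the project's actual implementation may differ in details such as error handling:

import psycopg2
from sql_queries import create_table_queries, drop_table_queries


def create_database():
    """Recreate the sparkifydb database and return a cursor and connection to it."""
    # Host, user, and password here are assumptions; adjust them to your setup.
    conn = psycopg2.connect("host=127.0.0.1 dbname=studentdb user=student password=student")
    conn.set_session(autocommit=True)
    cur = conn.cursor()

    cur.execute("DROP DATABASE IF EXISTS sparkifydb")
    cur.execute("CREATE DATABASE sparkifydb WITH ENCODING 'utf8' TEMPLATE template0")
    conn.close()

    conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=student password=student")
    cur = conn.cursor()
    return cur, conn


def drop_tables(cur, conn):
    """Drop each table using the queries in drop_table_queries."""
    for query in drop_table_queries:
        cur.execute(query)
        conn.commit()


def create_tables(cur, conn):
    """Create each table using the queries in create_table_queries."""
    for query in create_table_queries:
        cur.execute(query)
        conn.commit()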
python etl.py
Note: Before running (or rerunning) etl.py and/or etl.ipynb, the database and tables need to be created (reset) each time by running
python create_tables.py
Some of the key components in etl.ipynb and etl.py are:
- A connection is made to the sparkify database.
- A get_files function is written, which provides the path to each data file.
- The song data is processed first by loading it into a pandas DataFrame with the read_json function: df = pd.read_json(filepath, lines=True)
- The columns are extracted from that DataFrame by unpacking its single record: artist_id, artist_latitude, artist_location, artist_longitude, artist_name, duration, num_songs, song_id, title, year = df.values[0]
- The relevant values are then put into a list: song_data = [song_id, title, artist_id, year, duration]
- The record is then inserted into the songs table and committed: cur.execute(song_table_insert, song_data); conn.commit() (these steps are put together in the sketch below)
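Put together, those steps might look like the following sketch (the file path and connection parameters are placeholders; the actual code lives in etl.py):

import pandas as pd
import psycopg2
from sql_queries import song_table_insert

# Connection parameters are placeholders; adjust them to your setup.
conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=student password=student")
cur = conn.cursor()

# Read a single song file (the path is illustrative).
df = pd.read_json('data/song_data/A/A/B/TRAABJL12903CDCF1A.json', lines=True)

# Unpack the single record in the file.
artist_id, artist_latitude, artist_location, artist_longitude, \
    artist_name, duration, num_songs, song_id, title, year = df.values[0]

# Keep only the columns the songs table needs, casting numpy scalars
# to plain Python types so psycopg2 can adapt them.
song_data = [song_id, title, artist_id, int(year), float(duration)]

# Insert the record and commit.
cur.execute(song_table_insert, song_data)
conn.commit()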
A similar procedure is followed for the other tables in the schema; the details can be seen in etl.ipynb and etl.py.
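As one example of that similar procedure, the time table could be populated from the log data along the following lines. This is a sketch only: time_table_insert is an assumed name following the pattern of user_table_insert, and it is assumed to handle conflicts on the start_time primary key (e.g. with ON CONFLICT DO NOTHING); the path and connection parameters are placeholders.

import pandas as pd
import psycopg2
from sql_queries import time_table_insert  # assumed name, mirroring user_table_insert

# Connection parameters and the file path below are placeholders.
conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=student password=student")
cur = conn.cursor()

# Read one log file and keep only the song-play events.
df = pd.read_json('data/log_data/2018/11/2018-11-01-events.json', lines=True)
df = df[df['page'] == 'NextSong']

# 'ts' is a Unix timestamp in milliseconds; convert it to datetimes.
t = pd.to_datetime(df['ts'], unit='ms')

# Insert one row per timestamp, broken down into the units of the time table.
# Values are passed as native Python types so psycopg2 can adapt them.
for ts in t:
    time_data = (ts.to_pydatetime(), ts.hour, ts.day, ts.isocalendar()[1],
                 ts.month, ts.year, ts.day_name())
    cur.execute(time_table_insert, time_data)
conn.commit()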
Recall that the difference between etl.ipynb and etl.py is that etl.ipynb reads, processes, and loads a single file from song_data and log_data, whereas etl.py processes and loads all of the data into the database.