Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added TA10-BadDataBusters-ETL-Pipeline.zip
Binary file not shown.
49 changes: 49 additions & 0 deletions etl_pipeline.asl.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
{
"Comment": "ETL Data Pipeline",
"StartAt": "Extract",
"States": {
"Extract": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:484907485342:function:etl-pipeline-ExtractFunction-i4HDbxpUI79s",
"Next": "Transform",
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"ResultPath": "$.error",
"Next": "LogError"
}
]
},
"Transform": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:484907485342:function:etl-pipeline-TransformFunction-uU83pxSwfLeo",
"Next": "Load",
"ResultPath": "$.transformed_data",
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"ResultPath": "$.error",
"Next": "LogError"
}
]
},
"Load": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:484907485342:function:etl-pipeline-LoadFunction-mcIkgdZb2Ayp",
"End": true,
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"ResultPath": "$.error",
"Next": "LogError"
}
]
},
"LogError": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:484907485342:function:etl-pipeline-LogErrorFunction-BNvChurMM8Am",
"End": true
}

}
}
94 changes: 94 additions & 0 deletions extract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import boto3
import csv
import json

# -- Overview --
# This Lambda function is the first stage of a serverless ETL (Extract, Transform, Load) pipeline.
# Its job is to extract and clean messy startup funding data from an S3 bucket and prepare it for further processing.
# Think of it as the data janitor that makes sense of the chaos — fixing dates, removing unnecessary commas,
# and ensuring every field is ready for prime-time analysis.

# The dataset we’re working with is Indian Startup Funding Data, containing information about when startups raised money,
# how much they raised, who invested, and what industries they belong to.
# This function does the Extract part of ETL, with a sprinkle of Transform for data cleaning.

# The ETL data pipeline, defined in etl_pipeline.asl.json, orchestrates a serverless workflow using
# AWS Step Functions to manage and process data efficiently. This ensures a seamless and fault-tolerant data workflow,
# integrating all stages of the ETL process.



#-----------------------------------------------------------------------------------------------------------------------------------------------------#

# S3 client, our pipeline's delivery executive.
# It’ll fetch files from Amazon S3 (think of it as our data warehouse in the cloud) and return cleaned, shiny data later.
# Created once at module scope so warm Lambda invocations reuse the same client/connection.
s3 = boto3.client('s3')


def lambda_handler(event, context):
    """Extract stage of the ETL pipeline.

    Reads the raw startup-funding CSV from the source S3 bucket, cleans each
    row (trims whitespace, normalizes the funding amount via parse_amount),
    and writes the cleaned records as a JSON array to intermediate storage.

    Returns:
        dict: {"bucket": ..., "key": ...} — where the Transform stage should
        read the cleaned data from.

    Raises:
        Exception: re-raised after logging so the Lambda invocation fails and
        the state machine's Catch (States.ALL -> LogError in
        etl_pipeline.asl.json) can route the failure. Returning an error dict
        here would look like a successful output to Step Functions and flow
        into Transform as data.
    """
    # Destination for the cleaned data: intermediate storage between stages.
    bucket_name = 'etl-intermediate-storage'

    # The cleaned data's file name in the intermediate bucket.
    file_key = 'extracted-data.json'

    # Raw input: a CSV of Indian startup funding rounds.
    source_file = 'startup_funding.csv'

    try:
        # 1: Fetch the raw CSV from the source bucket.
        response = s3.get_object(Bucket='etl-indian-startup-funding', Key=source_file)

        # Decode the whole object and split it into lines for the CSV reader.
        lines = response['Body'].read().decode('utf-8').splitlines()

        # 2: Clean each row. The key names below match the raw CSV headers,
        # including the dataset's own typo "InvestmentnType" — do not "fix"
        # that string, or the lookup will miss the column.
        reader = csv.DictReader(lines)
        cleaned_data = [
            {
                "Date": row.get("Date dd/mm/yyyy", "").strip(),        # funding date, dd/mm/yyyy as text
                "Startup": row.get("Startup Name", "").strip(),        # startup name
                "Industry": row.get("Industry Vertical", "").strip(),  # industry vertical
                "SubVertical": row.get("SubVertical", "").strip(),     # niche within the vertical
                "City": row.get("City Location", "").strip(),          # city the startup is based in
                "Investors": row.get("Investors Name", "").strip(),    # investor names
                "InvestmentType": row.get("InvestmentnType", "").strip(),  # round type (source column is misspelled)
                "AmountUSD": parse_amount(row.get("Amount in USD", "")),   # amount as an int (commas stripped)
            }
            for row in reader
        ]

        # 3: Persist the cleaned records for the Transform stage.
        s3.put_object(
            Bucket=bucket_name,
            Key=file_key,
            Body=json.dumps(cleaned_data),
            ContentType='application/json'
        )

        # Tell the state machine where the cleaned data lives.
        return {"bucket": bucket_name, "key": file_key}

    except Exception as e:
        # Surface the failure in CloudWatch, then re-raise so the state
        # machine's Catch routes to LogError. Previously this returned
        # {"error": ...}, which Step Functions treated as success.
        print(f"Extract failed: {e}")
        raise


def parse_amount(amount):
    """Convert a raw funding-amount string to an integer number of USD.

    Handles thousands separators ("1,000,000"), surrounding whitespace, and
    decimal values ("16,200,000.00" -> 16200000, truncated toward zero).
    Anything unparseable (e.g. "N/A", "undisclosed", empty string) becomes 0
    so downstream aggregation never crashes on dirty data.
    """
    if not amount:
        # Missing/empty field — treat as no disclosed amount.
        return 0
    cleaned = amount.replace(",", "").strip()
    try:
        return int(cleaned)
    except ValueError:
        try:
            # Some rows carry decimals (e.g. "16,200,000.00"); truncate to
            # whole dollars rather than discarding the value as before.
            return int(float(cleaned))
        except (ValueError, OverflowError):
            # Non-numeric junk ("N/A", "undisclosed", ...) defaults to 0.
            return 0
57 changes: 57 additions & 0 deletions load.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import boto3
import json
import logging

# -- Overview --
# The Load Stage is the final step in the ETL (Extract, Transform, Load) pipeline.
# This stage is responsible for saving the processed, aggregated data (generated during the Transform Stage)
# into an S3 bucket. The goal is to make the insights available for downstream use.


#-----------------------------------------------------------------------------------------------------------------------------------------------------#

# Logging system is like a CTO tracking production deployments.
# INFO logs are for bragging rights (successful operations), and ERROR logs are for... well, explaining what broke.
# Uses the root logger: AWS Lambda attaches a CloudWatch handler to it, so
# raising its level to INFO is all that's needed to see these records in CloudWatch Logs.
logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
    """Load stage: persist the aggregated funding report to the output bucket.

    Reads the Transform stage's result from event['transformed_data'] and
    writes it as JSON to the final S3 bucket.

    Returns:
        dict: {'message': 'Data loaded successfully'} on success.

    Raises:
        Exception: re-raised after logging so the Lambda invocation fails and
        the state machine's Catch (States.ALL -> LogError in
        etl_pipeline.asl.json) can route the failure; returning an error dict
        would be treated as a successful state output.
    """
    s3 = boto3.client('s3')

    # Final destination for the processed data.
    bucket_name = 'etl-indian-startup-output'  # output bucket for the report
    file_key = 'transformed-data.json'         # report file name (JSON)

    try:
        # 1: Pull the Transform stage's result from the state input.
        # NOTE(review): the Transform state uses ResultPath "$.transformed_data"
        # while the Transform Lambda already wraps its result in a
        # {"transformed_data": ...} dict, so this value may arrive
        # double-nested — confirm against etl_pipeline.asl.json.
        transformed_data = event.get('transformed_data', {})
        # Example: {"Bangalore": 3000000, "Mumbai": 2000000, "Delhi": 1500000}

        # 2: Upload the report as JSON to the destination bucket.
        s3.put_object(
            Bucket=bucket_name,
            Key=file_key,
            Body=json.dumps(transformed_data),
            ContentType='application/json'
        )

        # 3: Record the success for CloudWatch.
        logger.info(f"Data loaded successfully to {bucket_name}/{file_key}")

        return {'message': 'Data loaded successfully'}

    except Exception:
        # Log with traceback, then re-raise so the state machine's Catch
        # routes to LogError. Previously this returned {'error': ...}, which
        # Step Functions treated as success.
        logger.exception("Error uploading data to S3")
        raise
56 changes: 56 additions & 0 deletions transform.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import boto3
import json

# -- Overview --
# The Transform Stage is the second phase of the ETL (Extract, Transform, Load) pipeline.
# This stage processes cleaned data from the Extract Stage to generate meaningful insights.
# Specifically, it aggregates startup funding amounts by city, enabling data-driven analysis of
# which cities are attracting the most investment.


#-----------------------------------------------------------------------------------------------------------------------------------------------------#

# S3 client helps us fetch data from one bucket (Intermediate Storage) and send it wherever it’s needed next.
# Created once at module scope so warm Lambda invocations reuse the same client/connection.
s3 = boto3.client('s3')

def lambda_handler(event, context):
    """Transform stage: aggregate cleaned funding records by city.

    Fetches the cleaned JSON written by the Extract stage (location given by
    event['bucket'] / event['key']) and totals the 'AmountUSD' values per
    'City'.

    Returns:
        dict: {"transformed_data": {city: total_funding_usd, ...}}

    Raises:
        Exception: re-raised after logging so the Lambda invocation fails and
        the state machine's Catch (States.ALL -> LogError in
        etl_pipeline.asl.json) can route the failure instead of passing an
        error dict downstream as data.
    """
    try:
        # Location of the cleaned data, handed over by the Extract stage
        # through the Step Functions state input.
        bucket_name = event['bucket']
        file_key = event['key']

        # 1: Fetch the cleaned records from intermediate storage.
        response = s3.get_object(Bucket=bucket_name, Key=file_key)
        data = json.loads(response['Body'].read())

        # 2: Aggregate funding totals per city (pure computation, split out
        # into a helper so it's independently readable/testable).
        city_funding = _aggregate_by_city(data)
        # e.g. {"Bangalore": 3000000, "Mumbai": 2000000, "Delhi": 1500000}

        # 3: Hand the report to the Load stage.
        return {"transformed_data": city_funding}

    except Exception as e:
        # Surface the failure in CloudWatch, then re-raise so the state
        # machine's Catch routes to LogError. Previously this returned
        # {"error": ...}, which Step Functions treated as success.
        print(f"Transform failed: {e}")
        raise


def _aggregate_by_city(records):
    """Sum 'AmountUSD' per non-empty 'City' across the cleaned records."""
    city_funding = {}
    for record in records:
        city = record.get("City")
        amount = record.get("AmountUSD", 0) or 0  # treat missing/None amounts as 0
        if city:  # skip rows with no city name
            city_funding[city] = city_funding.get(city, 0) + amount
    return city_funding