Skip to content

Commit 8439265

Browse files
committed
Modify Data Pipeline to perform Global Energy Transition Analysis
- Implement Global Energy Transition Analysis Pipeline in pipeline.py, replacing sample data generation with real-time data fetching from World Bank API. - Enhance data processing with energy transition metrics and regional analysis. - Add interactive visualizations using Plotly and a web dashboard with Dash. - Update README.md to reflect new features and usage instructions. Modify requirements.txt to include necessary libraries for data fetching and visualization.
1 parent 5c4ccc4 commit 8439265

File tree

3 files changed

+234
-44
lines changed

3 files changed

+234
-44
lines changed

README.md

Lines changed: 48 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,16 @@
1-
# Simple Data Pipeline with Prefect
1+
# Global Energy Transition Analysis Pipeline with Prefect
22

3-
This project demonstrates a simple data pipeline using Prefect. The pipeline generates sample data, processes it by adding derived columns, and saves the results to a CSV file.
3+
This project implements a data pipeline that analyzes global energy consumption and renewable energy adoption data from the World Bank. The pipeline tracks energy transition progress across countries and regions, providing insights into the shift towards sustainable energy sources.
4+
5+
## Features
6+
7+
- Real-time data fetching from World Bank API
8+
- Multi-indicator energy analysis
9+
- Regional trend analysis
10+
- Interactive visualizations using Plotly
11+
- Web dashboard using Dash
12+
- Automated report generation
13+
- Prefect workflow management
414

515
## Setup
616

@@ -23,11 +33,16 @@ python pipeline.py
2333
```
2434

2535
The pipeline will:
26-
1. Generate sample time series data
27-
2. Process the data by adding:
28-
- 7-day rolling mean
29-
- Boolean flag indicating if value is above mean
30-
3. Save the processed data to a CSV file with timestamp
36+
1. Fetch the latest energy consumption data from World Bank
37+
2. Process and analyze energy transition metrics
38+
3. Generate interactive visualizations
39+
4. Create a web dashboard
40+
5. Generate a summary report
41+
42+
Outputs:
43+
- Interactive visualizations in the `output` directory
44+
- Summary report in `output/energy_report.txt`
45+
- Web dashboard available at http://localhost:8050
3146

3247
## Cloud Deployment
3348

@@ -40,6 +55,7 @@ The pipeline will:
4055
- Go to your GitHub repository settings
4156
- Navigate to Secrets and Variables > Actions
4257
- Add a new secret named `PREFECT_API_KEY` with your Prefect Cloud API key
58+
- Add a new secret named `PREFECT_WORKSPACE` with your workspace name
4359

4460
### GitHub Actions
4561

@@ -58,13 +74,30 @@ To run the pipeline manually:
5874
- `pipeline.py`: Main pipeline script containing the Prefect flow and tasks
5975
- `requirements.txt`: Project dependencies
6076
- `.github/workflows/pipeline.yml`: GitHub Actions workflow configuration
77+
- `output/`: Directory containing generated visualizations and reports
6178
- `README.md`: This file
6279

63-
## Features
64-
65-
- Uses Prefect's `@flow` and `@task` decorators for workflow management
66-
- Demonstrates basic data processing with pandas
67-
- Includes automatic file naming with timestamps
68-
- Provides clear logging of pipeline execution
69-
- Runs in Docker containers via Prefect Cloud
70-
- Automated execution through GitHub Actions
80+
## Data Analysis Features
81+
82+
1. Energy Consumption Analysis:
83+
- Per capita energy use
84+
- Electric power consumption
85+
- Fossil fuel dependency
86+
- Renewable energy adoption
87+
88+
2. Transition Metrics:
89+
- Energy transition score
90+
- Year-over-year renewable growth
91+
- Regional energy patterns
92+
- Country-level comparisons
93+
94+
3. Visualizations:
95+
- Interactive scatter plots for energy transition progress
96+
- Regional trend line charts
97+
- Web dashboard with multiple views
98+
99+
4. Reporting:
100+
- Key energy statistics
101+
- Top performing countries
102+
- Regional comparisons
103+
- Links to interactive visualizations

pipeline.py

Lines changed: 181 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,55 +2,208 @@
22
import pandas as pd
33
import numpy as np
44
from datetime import datetime
5+
import requests
6+
import plotly.express as px
7+
import plotly.graph_objects as go
8+
from dash import Dash, html, dcc
59
import os
10+
from dotenv import load_dotenv
11+
12+
# Load environment variables
13+
load_dotenv()
614

715
@task
8-
def generate_data():
9-
"""Generate sample data"""
10-
np.random.seed(42)
11-
data = {
12-
'date': pd.date_range(start='2024-01-01', periods=100),
13-
'value': np.random.normal(100, 15, 100)
16+
def fetch_energy_data():
17+
"""Fetch energy consumption and renewable energy data from World Bank API"""
18+
# World Bank API endpoints for energy indicators
19+
indicators = {
20+
'EG.USE.PCAP.KG.OE': 'Energy use per capita (kg of oil equivalent)',
21+
'EG.FEC.RNEW.ZS': 'Renewable energy consumption (% of total final energy consumption)',
22+
'EG.USE.COMM.FO.ZS': 'Fossil fuel energy consumption (% of total)',
23+
'EG.USE.ELEC.KH.PC': 'Electric power consumption (kWh per capita)'
1424
}
15-
return pd.DataFrame(data)
25+
26+
try:
27+
# Fetch data for each indicator
28+
dfs = []
29+
for indicator in indicators.keys():
30+
url = f"https://api.worldbank.org/v2/country/all/indicator/{indicator}?format=json&per_page=1000"
31+
response = requests.get(url)
32+
response.raise_for_status()
33+
34+
# Extract data from response
35+
data = response.json()[1]
36+
df = pd.DataFrame(data)
37+
df['indicator'] = indicators[indicator]
38+
dfs.append(df)
39+
40+
# Combine all indicators
41+
combined_df = pd.concat(dfs, ignore_index=True)
42+
43+
# Clean and process the data
44+
combined_df['date'] = pd.to_datetime(combined_df['date'])
45+
combined_df['value'] = pd.to_numeric(combined_df['value'], errors='coerce')
46+
47+
# Pivot the data to have indicators as columns
48+
df_pivot = combined_df.pivot_table(
49+
index=['country', 'date'],
50+
columns='indicator',
51+
values='value',
52+
aggfunc='first'
53+
).reset_index()
54+
55+
return df_pivot
56+
except Exception as e:
57+
print(f"Error fetching data: {e}")
58+
raise
1659

1760
@task
1861
def process_data(df):
19-
"""Process the data by adding derived columns"""
20-
df['rolling_mean'] = df['value'].rolling(window=7).mean()
21-
df['is_above_mean'] = df['value'] > df['value'].mean()
22-
return df
62+
"""Process and clean the energy data"""
63+
# Calculate year-over-year changes
64+
df['Renewable Growth'] = df.groupby('country')['Renewable energy consumption (% of total final energy consumption)'].pct_change()
65+
66+
# Calculate energy transition score (higher renewable %, lower fossil fuel %)
67+
df['Energy Transition Score'] = (
68+
df['Renewable energy consumption (% of total final energy consumption)'] -
69+
df['Fossil fuel energy consumption (% of total)']
70+
)
71+
72+
# Get latest data for each country
73+
latest_data = df.groupby('country').last().reset_index()
74+
75+
# Calculate regional averages
76+
df['Region'] = df['country'].map(lambda x: get_region(x))
77+
regional_avg = df.groupby(['Region', 'date']).mean().reset_index()
78+
79+
return df, latest_data, regional_avg
80+
81+
def get_region(country):
82+
"""Map countries to regions"""
83+
# This is a simplified mapping - you can expand this
84+
regions = {
85+
'United States': 'North America',
86+
'Canada': 'North America',
87+
'China': 'Asia',
88+
'India': 'Asia',
89+
'Germany': 'Europe',
90+
'France': 'Europe',
91+
'Brazil': 'South America',
92+
'South Africa': 'Africa',
93+
'Australia': 'Oceania'
94+
}
95+
return regions.get(country, 'Other')
2396

2497
@task
25-
def save_data(df):
26-
"""Save the processed data"""
27-
output_file = f"processed_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
28-
df.to_csv(output_file, index=False)
29-
return output_file
30-
31-
@flow(name="Simple Data Pipeline")
32-
def data_pipeline():
98+
def create_visualizations(df, latest_data, regional_avg):
99+
"""Create interactive visualizations"""
100+
# Create output directory if it doesn't exist
101+
os.makedirs('output', exist_ok=True)
102+
103+
# 1. Energy Transition Progress
104+
fig_transition = px.scatter(
105+
latest_data,
106+
x='Fossil fuel energy consumption (% of total)',
107+
y='Renewable energy consumption (% of total final energy consumption)',
108+
size='Energy use per capita (kg of oil equivalent)',
109+
color='Region',
110+
hover_data=['country'],
111+
title='Energy Transition Progress by Country',
112+
labels={
113+
'Fossil fuel energy consumption (% of total)': 'Fossil Fuel Consumption (%)',
114+
'Renewable energy consumption (% of total final energy consumption)': 'Renewable Energy (%)'
115+
}
116+
)
117+
fig_transition.write_html('output/energy_transition.html')
118+
119+
# 2. Regional Renewable Energy Trends
120+
fig_regional = px.line(
121+
regional_avg,
122+
x='date',
123+
y='Renewable energy consumption (% of total final energy consumption)',
124+
color='Region',
125+
title='Regional Renewable Energy Adoption Trends',
126+
labels={
127+
'Renewable energy consumption (% of total final energy consumption)': 'Renewable Energy (%)',
128+
'date': 'Year'
129+
}
130+
)
131+
fig_regional.write_html('output/regional_trends.html')
132+
133+
# 3. Create a dashboard
134+
app = Dash(__name__)
135+
136+
app.layout = html.Div([
137+
html.H1('Global Energy Transition Dashboard'),
138+
html.Div([
139+
dcc.Graph(figure=fig_transition),
140+
dcc.Graph(figure=fig_regional)
141+
])
142+
])
143+
144+
app.run_server(debug=False, port=8050)
145+
146+
return 'output/energy_transition.html', 'output/regional_trends.html'
147+
148+
@task
149+
def generate_report(latest_data, transition_file, regional_file):
150+
"""Generate a summary report"""
151+
report = f"""
152+
Global Energy Transition Analysis Report
153+
Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
154+
155+
Key Statistics:
156+
--------------
157+
Total Countries Analyzed: {len(latest_data)}
158+
Global Average Renewable Energy Share: {latest_data['Renewable energy consumption (% of total final energy consumption)'].mean():.2f}%
159+
160+
Top 5 Countries by Renewable Energy Share:
161+
----------------------------------------
162+
{latest_data.nlargest(5, 'Renewable energy consumption (% of total final energy consumption)')[['country', 'Renewable energy consumption (% of total final energy consumption)']].to_string()}
163+
164+
Top 5 Countries by Energy Transition Score:
165+
----------------------------------------
166+
{latest_data.nlargest(5, 'Energy Transition Score')[['country', 'Energy Transition Score']].to_string()}
167+
168+
Visualizations:
169+
--------------
170+
- Energy Transition Progress: {transition_file}
171+
- Regional Renewable Trends: {regional_file}
172+
"""
173+
174+
with open('output/energy_report.txt', 'w') as f:
175+
f.write(report)
176+
177+
return 'output/energy_report.txt'
178+
179+
@flow(name="Global Energy Transition Analysis Pipeline")
180+
def energy_pipeline():
33181
"""Main pipeline flow"""
34-
# Generate data
35-
raw_data = generate_data()
182+
# Fetch data
183+
raw_data = fetch_energy_data()
36184

37185
# Process data
38-
processed_data = process_data(raw_data)
186+
processed_data, latest_data, regional_avg = process_data(raw_data)
187+
188+
# Create visualizations
189+
transition_file, regional_file = create_visualizations(processed_data, latest_data, regional_avg)
39190

40-
# Save results
41-
output_file = save_data(processed_data)
191+
# Generate report
192+
report_file = generate_report(latest_data, transition_file, regional_file)
42193

43-
print(f"Pipeline completed successfully. Output saved to: {output_file}")
194+
print(f"Pipeline completed successfully. Report saved to: {report_file}")
195+
print(f"Visualizations saved in the 'output' directory")
196+
print("Dashboard available at http://localhost:8050")
44197

45198
def create_deployment():
46199
"""Create a deployment for the pipeline"""
47-
data_pipeline.serve(
48-
name="data-pipeline-deployment",
200+
energy_pipeline.serve(
201+
name="energy-analysis-deployment",
49202
work_queue_name="default",
50203
)
51204

52205
if __name__ == "__main__":
53206
if os.getenv("PREFECT_DEPLOYMENT"):
54207
create_deployment()
55208
else:
56-
data_pipeline()
209+
energy_pipeline()

requirements.txt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
11
prefect>=2.14.0
22
pandas>=2.0.0
3-
numpy>=1.24.0
3+
numpy>=1.24.0
4+
requests>=2.31.0
5+
plotly>=5.18.0
6+
dash>=2.14.0
7+
python-dotenv>=1.0.0

0 commit comments

Comments
 (0)