Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 61 additions & 2 deletions airflow/dags/dag_sec_json_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ def schema_def_json(**kwargs):
year = kwargs['dag_run'].conf.get('year')
quarter = kwargs['dag_run'].conf.get('quarter').split('Q')[1]
return f"""
CREATE OR REPLACE TABLE SEC_JSON_{year}_{quarter} (
CREATE OR REPLACE TABLE SEC_JSON_{year}_Q{quarter} (
json_data VARIANT
);
"""
Expand All @@ -478,9 +478,68 @@ def copyinto_json(**kwargs):
year = kwargs['dag_run'].conf.get('year')
quarter = kwargs['dag_run'].conf.get('quarter').split('Q')[1]
return f"""
COPY INTO SEC_JSON_{year}_{quarter}
COPY INTO SEC_JSON_{year}_Q{quarter}
FROM @sec_json_stage/data/{year}/{quarter}/json/
FILE_FORMAT = (TYPE = 'JSON');
CREATE OR REPLACE VIEW JSON_BS_{year}_Q{quarter} AS
SELECT
t.json_data:year::NUMBER AS year,
t.json_data:quarter::STRING AS quarter,
t.json_data:country::STRING AS country,
t.json_data:city::STRING AS city,
t.json_data:name::STRING AS company_name,
t.json_data:symbol::STRING AS symbol,

bs.value:concept::STRING AS concept,
bs.value:info::STRING AS info,
bs.value:label::STRING AS label,
bs.value:unit::STRING AS unit,
bs.value:value::NUMBER AS value
FROM SEC_JSON_{year}_Q{quarter} t,
LATERAL FLATTEN(input => t.json_data:data.bs) bs;

CREATE OR REPLACE VIEW JSON_CF_{year}_Q{quarter} AS
SELECT
t.json_data:year::NUMBER AS year,
t.json_data:quarter::STRING AS quarter,
t.json_data:country::STRING AS country,
t.json_data:city::STRING AS city,
t.json_data:name::STRING AS company_name,
t.json_data:symbol::STRING AS symbol,

cf.value:concept::STRING AS concept,
cf.value:info::STRING AS info,
cf.value:label::STRING AS label,
cf.value:unit::STRING AS unit,
cf.value:value::NUMBER AS value
FROM SEC_JSON_{year}_Q{quarter} t,
LATERAL FLATTEN(input => t.json_data:data.cf) cf;

CREATE OR REPLACE VIEW JSON_IS_{year}_Q{quarter} AS
SELECT
t.json_data:year::NUMBER AS year,
t.json_data:quarter::STRING AS quarter,
t.json_data:country::STRING AS country,
t.json_data:city::STRING AS city,
t.json_data:name::STRING AS company_name,
t.json_data:symbol::STRING AS symbol,

ic.value:concept::STRING AS concept,
ic.value:info::STRING AS info,
ic.value:label::STRING AS label,
ic.value:unit::STRING AS unit,
ic.value:value::NUMBER AS value
FROM SEC_JSON_{year}_Q{quarter} t,
LATERAL FLATTEN(input => t.json_data:data.ic) ic;

CREATE OR REPLACE VIEW JSON_FV_{year}_Q{quarter} AS
SELECT *, 'Balance Sheet' AS section_type FROM JSON_BS_{year}_Q{quarter}
UNION ALL
SELECT *, 'Cash Flow' AS section_type FROM JSON_CF_{year}_Q{quarter}
UNION ALL
SELECT *, 'Income Statement' AS section_type FROM JSON_IS_{year}_Q{quarter};


"""
generate_copy_sql_json = PythonOperator(
task_id='generate_copy_sql_json',
Expand Down
5 changes: 2 additions & 3 deletions dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,16 @@ FROM --platform=linux/amd64 python:3.12.8-slim
WORKDIR /app

# Copy requirements first for layer caching
COPY requirements.txt .
COPY requirement.txt .
COPY .env /app/.env

# Install dependencies
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install --no-cache-dir -r requirement.txt

# Copy the application code
COPY ./backend /app/backend

# COPY ./frontend /app/frontend
COPY ./features /app/features
COPY ./services /app/services

# Set environment variables
Expand Down
41 changes: 5 additions & 36 deletions frontend/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,41 +6,10 @@
load_dotenv()

# Airflow API endpoint
# AIRFLOW_API_URL = "https://ebaeb7d6-905a-429f-8719-9ff6a3c16313.c67.us-east-1.airflow.amazonaws.com"
AIRFLOW_API_URL = "http://localhost:8080"

<<<<<<< HEAD
st.title("SEC Data - Bridge")


# # Input fields for year and quarter
year = st.selectbox("Select Year",("2024","2023","2022","2021","2020","2019","2018","2017"))
quarter = st.selectbox("Select Quarter", ("1","2","3","4"))

if st.button("Fetch Data"):
# Payload for triggering the DAG
payload = {
"conf": {
"year": year,
"quarter": quarter
}
}
dag_id = "sec_data_to_s3_scraper"
AIRFLOW_API_URL = f"http://localhost:8080/api/v1/dags/{dag_id}/dagRuns"

# Trigger the DAG via Airflow REST API
response = requests.post(
AIRFLOW_API_URL,
json=payload,
auth=(f"{AIRFLOW_USER}", f"{AIRFLOW_PASSCODE}")
)

if response.status_code == 200:
st.success("DAG triggered successfully!")
else:
st.error(f"Failed to trigger DAG: {response.text}")
=======
QUERY_API_URL = "http://localhost:8000"
>>>>>>> origin/main
QUERY_API_URL = "https://fastapi-service-7ss2sa6dka-uc.a.run.app"

def populate_airflow_page():
# Display the airflow page
Expand Down Expand Up @@ -118,9 +87,9 @@ def populate_query_page():
st.session_state.flag = True
else:
st.info(f"No data available for **{source}**, Year: **{year}**, Quarter: **{quarter}**. Trigger the Airflow DAG to fetch data.")
st.write("Query Results:")
st.dataframe(query_executed)
st.success(f"Query executed successfully.")
# st.write("Query Results:")
# st.dataframe(query_executed)
# st.success(f"Query executed successfully.")
# Show query input only if data is available
if st.session_state.flag:
# Text area for query input (persistent using session state)
Expand Down
11 changes: 5 additions & 6 deletions requirement.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ numpy
pandas
fastapi
uvicorn
bs4
requests
boto3
datetime
pydantic
pathlib
logging
apache-airflow
python-dotenv
sqlalchemy
snowflake-sqlalchemy
apache-airflow-providers-snowflake
dbt-core
dbt-snowflake
apache-airflow-providers-amazon
logging