This repository contains the final project for the "Large-Scale Data Management" course at the National Technical University of Athens (NTUA). The project focuses on the end-to-end analysis of large-scale datasets related to crime, demographics, and geospatial information in Los Angeles County. The implementation utilizes a distributed environment based on Kubernetes, Apache Spark, and HDFS to handle data preprocessing, complex querying, and performance optimization.
- Data Preprocessing: Implementing an ETL pipeline to ingest raw `.csv` and `.txt` datasets, perform data cleaning and transformation, and store them in the efficient Parquet columnar format on HDFS (a sketch follows this list).
- Advanced Querying with Spark APIs: Developing solutions for complex analytical queries using RDDs, DataFrames, and Spark SQL to compare their performance and expressiveness.
- Performance & Scalability Analysis: Conducting horizontal and vertical scaling experiments to analyze the impact of cluster configurations (executors, cores, memory) on job performance for computationally intensive tasks (a configuration sketch also follows this list).
- Join Strategy Optimization: Analyzing Spark's physical execution plans generated by the Catalyst Optimizer to understand and verify the selection of optimal join strategies, such as Broadcast Hash Join.
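As a minimal sketch of the ETL step, the snippet below reads a raw CSV from HDFS, applies illustrative cleaning, and writes Parquet. The paths, the `DATE OCC`/`LAT`/`LON` column names, and the date format are hypothetical placeholders, not the project's actual schema.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("etl-to-parquet").getOrCreate()

# Read a raw CSV dataset from HDFS (path and options are illustrative).
crimes = (spark.read
          .option("header", True)
          .option("inferSchema", True)
          .csv("hdfs://hdfs-namenode:9000/user/<your-username>/data/crime_data.csv"))

# Example cleaning steps: parse a date column and drop rows missing coordinates.
# "DATE OCC", "LAT", and "LON" are assumed column names for illustration.
crimes_clean = (crimes
                .withColumn("date_occ", F.to_date(F.col("DATE OCC"), "MM/dd/yyyy"))
                .dropna(subset=["LAT", "LON"]))

# Persist in the columnar Parquet format for efficient downstream queries.
crimes_clean.write.mode("overwrite").parquet(
    "hdfs://hdfs-namenode:9000/user/<your-username>/parquet/crime_data")
```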
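For the scaling experiments, resource settings can be varied per run. The sketch below shows one way to do this from the session builder; the specific values are hypothetical and not the project's measured configurations. The same settings can equivalently be passed as `--conf` flags to `spark-submit`.

```python
from pyspark.sql import SparkSession

# Hypothetical resource configuration for a single scaling-experiment run.
spark = (SparkSession.builder
         .appName("scaling-experiment")
         .config("spark.executor.instances", "4")  # horizontal scaling
         .config("spark.executor.cores", "2")      # vertical scaling (CPU)
         .config("spark.executor.memory", "2g")    # vertical scaling (memory)
         .getOrCreate())
```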
The analysis integrates the following public datasets:
- Los Angeles Crime Data (2010-Present): Detailed records of reported crimes.
- 2010 Census Populations by Zip Code: Demographic data including population counts for each zip code.
- Median Household Income by Zip Code (2015): Socioeconomic data for income-related analysis.
- LA Police Stations: Geospatial data containing the coordinates of all 21 LAPD stations.
- MO Codes (Modus Operandi): Descriptive codes used to classify criminal methods.
The project implements the following analytical queries:
- Aggravated Assault Victim Demographics: Classification and ranking of victim age groups involved in "aggravated assault" incidents.
- Top 3 Police Precincts by Closed-Case Rate: Annual identification of top-performing police precincts based on their rate of closed cases.
- Income Per Capita by Zip Code: Calculation of the average income per person for each zip code by joining the census population and household income datasets (see the sketch after this list).
- Geospatial Analysis of Gun-Related Crimes: A multi-step query to compute the number of gun-related crimes and their average distance (using the Haversine formula, sketched after this list) from the nearest police station. This query involved a cross join and served as the benchmark for the scaling experiments.
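One plausible formulation of the income-per-capita join is sketched below: approximate the total income of a zip code as median household income times the number of households, then divide by population. The column names (`zip_code`, `median_income`, `total_households`, `total_population`), the Parquet paths, and the formula itself are assumptions for illustration.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("income-per-capita").getOrCreate()

# Paths and column names below are illustrative assumptions.
census = spark.read.parquet("hdfs://hdfs-namenode:9000/user/<your-username>/parquet/census_2010")
income = spark.read.parquet("hdfs://hdfs-namenode:9000/user/<your-username>/parquet/income_2015")

# Approximate total income per zip code as median household income
# multiplied by household count, then divide by total population.
per_capita = (census.join(income, on="zip_code")
              .withColumn("income_per_capita",
                          F.col("median_income") * F.col("total_households")
                          / F.col("total_population"))
              .select("zip_code", "income_per_capita"))
per_capita.show(5)
```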
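A minimal Haversine sketch is shown below, built from native column expressions so Catalyst can optimize it (no Python UDF). The ~6371 km Earth radius, the demo coordinates, and the column names are illustrative; this is one possible implementation of the formula, not necessarily the project's exact code.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("haversine-demo").getOrCreate()

def haversine_km(lat1, lon1, lat2, lon2):
    # Haversine: d = 2R * asin(sqrt(sin^2(dlat/2)
    #                + cos(lat1) * cos(lat2) * sin^2(dlon/2)))
    dlat = F.radians(lat2 - lat1)
    dlon = F.radians(lon2 - lon1)
    a = (F.sin(dlat / 2) ** 2
         + F.cos(F.radians(lat1)) * F.cos(F.radians(lat2)) * F.sin(dlon / 2) ** 2)
    return 2 * 6371.0 * F.asin(F.sqrt(a))  # Earth radius ~6371 km

# Tiny demo with illustrative coordinates near downtown Los Angeles.
df = spark.createDataFrame([(34.0536, -118.2430, 34.0522, -118.2437)],
                           ["lat1", "lon1", "lat2", "lon2"])
df.select(haversine_km(F.col("lat1"), F.col("lon1"),
                       F.col("lat2"), F.col("lon2")).alias("dist_km")).show()
```

In the actual query, this distance would be computed over the cross join of crime records and station coordinates, keeping the minimum distance per crime before aggregating.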
All jobs were executed on CSLab's Kubernetes cluster. Applications were launched with `spark-submit` in cluster mode, configured with the master endpoint, container image, and event logging.
Example `spark-submit` command:

```bash
spark-submit \
  --master k8s://https://termi7.cslab.ece.ntua.gr:6443 \
  --deploy-mode cluster \
  --name MySparkQuery \
  --conf spark.kubernetes.namespace=<your-username>-priv \
  --conf spark.kubernetes.container.image=apache/spark \
  --conf spark.eventLog.enabled=true \
  --conf spark.eventLog.dir=hdfs://hdfs-namenode:9000/user/<your-username>/logs \
  hdfs://hdfs-namenode:9000/user/<your-username>/path/to/your_script.py
```

A local Spark History Server was configured using docker-compose to analyze the performance and execution details of completed jobs. The server was set up to read event logs directly from the project's HDFS directory, which was critical for comparing job durations, inspecting execution DAGs, analyzing data shuffles, and verifying the Catalyst Optimizer's join strategies (sketches of the compose file and of plan inspection follow the list below).
- HDFS Event Log Directory: `hdfs://hdfs-namenode:9000/user/<your-username>/logs/`
- Local Web UI Access: `http://localhost:18080`
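A minimal docker-compose sketch for such a History Server is shown below. The image tag, the Spark install path inside the container, and the assumption that `hdfs-namenode` is resolvable from the container (e.g., via a shared Docker network) are all illustrative; this is not the project's actual compose file.

```yaml
# docker-compose.yml (illustrative sketch, not the project's actual file)
services:
  spark-history-server:
    image: apache/spark            # same image family as the jobs; tag assumed
    command: /opt/spark/sbin/start-history-server.sh
    environment:
      SPARK_NO_DAEMONIZE: "true"   # keep the server in the foreground for Docker
      SPARK_HISTORY_OPTS: >-
        -Dspark.history.fs.logDirectory=hdfs://hdfs-namenode:9000/user/<your-username>/logs
    ports:
      - "18080:18080"              # Web UI at http://localhost:18080
```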
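Join-strategy selection can also be checked directly from the driver with `explain()`. The sketch below is illustrative: the Parquet paths and the `precinct` join key are hypothetical, and the `broadcast` hint is optional, since Catalyst normally chooses a Broadcast Hash Join on its own for tables under the broadcast threshold.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("join-plan-check").getOrCreate()

# Hypothetical inputs: a large crimes table and the small 21-row stations table.
crimes = spark.read.parquet("hdfs://hdfs-namenode:9000/user/<your-username>/parquet/crime_data")
stations = spark.read.parquet("hdfs://hdfs-namenode:9000/user/<your-username>/parquet/police_stations")

# Hint that the small table should be broadcast to every executor.
joined = crimes.join(F.broadcast(stations), on="precinct")

# Print the physical plan; look for "BroadcastHashJoin" in the output.
joined.explain()
```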
The project's technology stack:
- Processing Engine: Apache Spark
- Distributed Storage: Hadoop Distributed File System (HDFS)
- Orchestration: Kubernetes
- Programming Language: Python (PySpark)
- Monitoring: Spark History Server, Docker