Study Notes 5.3.3: Preparing Yellow and Green Taxi Data
1. Overview and Context
This video demonstrates how to prepare taxi data for further processing. The main tasks include:
- Downloading Data: Automating the download of monthly data files for Yellow and Green taxi trips (from 2020 and 2021).
- File Organization and Compression: Storing the downloaded CSV files in a structured directory format and compressing them.
- Data Conversion: Using Apache Spark to read the CSV files, define an explicit schema, and convert the data into Parquet format for efficient storage and querying.
- Error Handling and Logging: Incorporating basic error handling in scripts to ensure robust execution.
This workflow is typical in data engineering pipelines where raw data ingestion is followed by cleaning, transformation, and conversion into optimized storage formats.
2. Environment Setup
Tools and Platforms
- Google Cloud Virtual Machine (VM): The example uses a VM on Google Cloud.
- Bash Shell: For writing scripts that automate data download.
- wget: A command-line tool to download files from the web.
- gzip: A utility to compress the downloaded CSV files (Spark can read gzip-compressed files directly).
- Apache Spark & PySpark: For processing data and converting CSV files to Parquet.
- Jupyter Notebook: To run Spark code interactively.
Directory Structure
- Code Directory: All scripts and notebooks are organized in a project directory.
- Data Organization: Data is saved under a structured hierarchy, e.g., data/raw/<taxi_type>/<year>/<month>/ for CSV files and data/pq/<taxi_type>/<year>/<month>/ for Parquet files.
3. Downloading Data Using Bash Scripting
Key Concepts:
- Parameterization: The script accepts parameters such as the taxi type (e.g., yellow, green) and the year.
- Looping Over Months: A for loop is used to iterate over all 12 months.
- Formatting Month Numbers: Since the dataset URL requires months in a two-digit format (e.g., "01", "02"), the printf command is used:
f_month=$(printf "%02d" $month)
- Building URLs: The URL is constructed dynamically by concatenating the URL prefix, taxi type, year, and formatted month.
- Downloading and Compressing:
  - wget: Downloads the CSV file; the -O option specifies the output file.
  - gzip: Compresses the CSV file immediately after download.
- Error Handling: Using set -e ensures that if a command fails (for example, a download that returns a 404 error), the script stops executing.
Sample Bash Script Outline:
#!/bin/bash
set -e # Exit if any command fails
taxi_type=$1 # e.g., yellow or green
year=$2 # e.g., 2021
url_prefix="https://nyc-taxi-data.s3.amazonaws.com/trip data"
for month in {1..12}; do
# Format month with a leading zero if necessary
f_month=$(printf "%02d" $month)
# Construct URL and local file paths
url="${url_prefix}/${taxi_type}_tripdata_${year}-${f_month}.csv"
local_prefix="data/raw/${taxi_type}/${year}/${f_month}"
local_file="${taxi_type}_tripdata_${year}_${f_month}.csv"
# Create local directory if it doesn't exist
mkdir -p ${local_prefix}
# Download the file and compress it immediately
echo "Downloading ${url} to ${local_prefix}/${local_file}"
wget -O "${local_prefix}/${local_file}" "${url}"
    gzip "${local_prefix}/${local_file}"
done
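To run it, make the script executable and pass the taxi type and year as arguments, e.g. `chmod +x download_data.sh && ./download_data.sh green 2021` (the filename download_data.sh is just an example), then repeat for yellow and for 2020.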
Additional Considerations:
- Logging: Use echo statements to log progress.
- Handling 404 Errors: Because of set -e, the script stops at the first non-existent file (e.g., for months with no data).
4. Converting CSV to Parquet with Apache Spark
Why Parquet?
- Built-In Schema: Parquet files store schema information along with the data, reducing surprises when merging datasets.
- Efficient Storage & Query Performance: Columnar storage and compression improve performance for analytical queries.
Steps in the Spark Notebook:
- Starting Spark and Jupyter Notebook:
  Ensure your Spark environment is set up correctly with the appropriate environment variables (e.g., PYTHONPATH).
- Reading CSV Data:
  - Using Pandas for Schema Inference:
    A small sample of the CSV is read with Pandas to infer the data types:
```python
import pandas as pd

# Read a small sample of the gzip-compressed CSV to inspect the inferred dtypes
# (pandas detects gzip compression from the .csv.gz extension)
green_sample = pd.read_csv(
    "data/raw/green/2021/01/green_tripdata_2021_01.csv.gz",
    nrows=1000,
)
print(green_sample.dtypes)
```
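One convenient shortcut (a sketch, not covered in the notes above) is to let Spark infer a starting StructType from the pandas sample and then adjust it by hand:

```python
# Sketch: derive a starting schema from the pandas sample above.
# Assumes an active SparkSession named `spark` and the `green_sample` DataFrame.
inferred_schema = spark.createDataFrame(green_sample).schema
print(inferred_schema)
# Datetime columns usually come back as StringType here, so override them
# with TimestampType in the explicit schema defined below.
```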
- **Defining the Schema for Spark:**
The inferred schema is then adjusted and defined explicitly using Spark SQL types:
```python
from pyspark.sql import types as T
green_schema = T.StructType([
T.StructField("pickup_datetime", T.TimestampType(), True),
T.StructField("dropoff_datetime", T.TimestampType(), True),
T.StructField("passenger_count", T.IntegerType(), True),
# ... define other fields accordingly ...
])
```
- Reading Data with the Defined Schema:
```python
# Spark reads the gzip-compressed CSVs transparently based on the .gz extension
green_df = spark.read.csv(
    "data/raw/green/2021/01/",
    header=True,
    schema=green_schema,
)
green_df.show(5)
```
- Repartitioning Data:
  - A gzip-compressed CSV is not splittable, so Spark reads each file as a single partition. Repartitioning ensures you leverage multiple cores for parallel processing:
```python
green_df = green_df.repartition(4)
```
- Writing Data to Parquet:
  - Convert the DataFrame to Parquet format, preserving the defined schema:
```python
output_path = "data/pq/green/2021/01/"
green_df.write.parquet(output_path)
```
- Looping Over Months:
  - A Python loop can process each month the same way:
```python
for month in range(1, 13):
    f_month = f"{month:02d}"  # Format month as two digits
    input_path = f"data/raw/green/2021/{f_month}/"
    output_path = f"data/pq/green/2021/{f_month}/"
    print(f"Processing month: {f_month}")

    df = spark.read.csv(input_path, header=True, schema=green_schema) \
        .repartition(4)
    df.write.parquet(output_path)
```
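The same conversion applies to the yellow data; a minimal sketch, assuming a yellow_schema StructType has been defined the same way as green_schema above:

```python
# Sketch: convert the yellow taxi CSVs for 2021 (assumes yellow_schema is defined)
for month in range(1, 13):
    f_month = f"{month:02d}"
    input_path = f"data/raw/yellow/2021/{f_month}/"
    output_path = f"data/pq/yellow/2021/{f_month}/"
    print(f"Processing yellow, month: {f_month}")

    df = spark.read.csv(input_path, header=True, schema=yellow_schema) \
        .repartition(4)
    df.write.parquet(output_path)
```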
Additional Spark Considerations:
- Monitoring Spark UI: Use Spark’s web UI to monitor job progress, particularly when repartitioning large files.
- Handling Large Files: Efficient partitioning is key to utilizing available cores and ensuring fast processing.
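To see how this plays out in practice, you can check the partition count before and after repartitioning; a small sketch, assuming the green_df DataFrame from the reading step:

```python
# A single gzipped CSV is not splittable, so this is often 1 before repartitioning
print(green_df.rdd.getNumPartitions())

green_df = green_df.repartition(4)
print(green_df.rdd.getNumPartitions())  # now 4; the shuffle runs when the data is written
```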
5. Key Takeaways for Data Engineers
For Beginners:
- Script Automation: Learn basic bash scripting for repetitive tasks like data downloads.
- Schema Importance: Always define and enforce data schemas to avoid type mismatches and data quality issues.
- Command-Line Tools: Familiarize yourself with utilities such as wget, printf, and mkdir for automation.
- Data Inspection: Use tools like zcat, head, and tree to verify data integrity and directory structures.
For Professional Data Engineers:
- Robust Pipeline Design: Incorporate error handling (using set -e) and logging to build reliable data ingestion pipelines.
- Efficient Data Formats: Transitioning to Parquet not only improves query performance but also ensures consistency through built-in schemas.
- Scalability and Parallelism: Leverage Spark’s partitioning features to handle large-scale datasets efficiently.
- Interoperability: Use tools like Pandas to quickly prototype and infer schemas, then enforce them in Spark for production-grade pipelines.
6. Additional Resources
- Apache Spark Documentation: Explore the Spark SQL Guide for more on DataFrame operations and schema definitions.
- NYC Taxi Data: Visit the NYC Taxi & Limousine Commission website for more details on the dataset.
- Bash Scripting Tutorials: There are many free resources online that cover basic and advanced bash scripting techniques.
- Parquet File Format: Learn about the benefits and internals of the Parquet format at the Apache Parquet website.
7. Summary
This study guide covers:
- Automated Data Download: Using a bash script to fetch monthly taxi data files, with proper formatting, error handling, and compression.
- Data Conversion Pipeline: Reading the compressed CSV files, inferring and defining a consistent schema, and converting the data to Parquet format using Apache Spark.
- Best Practices: Emphasizing the importance of schema enforcement, proper partitioning, logging, and the benefits of using efficient file formats like Parquet.
Study Notes 5.3.4 - SQL with Spark
1. Overview
This lesson demonstrates how to use Spark SQL to process and query taxi trip data. In the example, the prepared datasets—green and yellow taxi trip data (for 2020 and 2021)—are first loaded from Parquet files. The session covers:
- Data Loading: Reading data from Parquet files using Spark.
- Data Harmonization: Ensuring both datasets have a common set of columns by selecting intersections and renaming mismatched column names.
- Data Combination: Using DataFrame operations (such as union) to merge datasets.
- Executing SQL Queries: Registering a DataFrame as a temporary table to run SQL queries.
- Result Output: Writing query results back to disk, simulating a data lake scenario.
This workflow is critical in data engineering pipelines when working with heterogeneous data sources that need to be unified before analysis.
2. Setting Up Spark SQL
Creating a Spark Session
- Start a New Notebook: Open a new Jupyter notebook (or your preferred Spark environment).
- Import Required Modules:
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
```
- Create a Spark Session:
```python
spark = SparkSession.builder \
    .appName("SparkSQLDemo") \
    .getOrCreate()
```
Spark UI: Once the session is created, the Spark web UI (the application UI, on port 4040 by default) becomes available to monitor job execution and task progress.
3. Loading Data from Parquet Files
Reading the Taxi Data
- Loading Data with Wildcards:
  Use wildcards in the file path to read multiple files, accounting for the nested directory structure (year/month). For example:
```python
green_df = spark.read.parquet("data/pq/green/2020/*/*")
yellow_df = spark.read.parquet("data/pq/yellow/2020/*/*")
```
- Schema Inspection:
  Use printSchema() to verify data types and ensure consistency:
```python
green_df.printSchema()
yellow_df.printSchema()
```
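Since both 2020 and 2021 were prepared in section 5.3.3, a wildcard over the year directory can load everything at once; a small sketch, assuming the data/pq/<taxi_type>/<year>/<month>/ layout from that section:

```python
# Sketch: glob over both year and month directories to read all available data
green_df = spark.read.parquet("data/pq/green/*/*")
yellow_df = spark.read.parquet("data/pq/yellow/*/*")
```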
Additional Context
- Why Parquet? Parquet is a columnar storage format that efficiently compresses data, preserves schema information, and enables faster query execution.
4. Data Harmonization and Preparation
Aligning Column Names
- Issue:
  The green and yellow taxi datasets contain similar data but use different column names (e.g., the green dataset has lpep_pickup_datetime while the yellow one has tpep_pickup_datetime).
- Solution:
  Use the DataFrame API to rename columns so that both datasets share the same naming conventions. For example:
```python
green_df = green_df \
    .withColumnRenamed("lpep_pickup_datetime", "pickup_datetime") \
    .withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime")

yellow_df = yellow_df \
    .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime") \
    .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")
```
Selecting Common Columns
- Determine Intersection:
  Identify the columns present in both DataFrames (using Python's set operations) and preserve the order from one of the datasets:
```python
yellow_columns = set(yellow_df.columns)
common_columns = [col for col in green_df.columns if col in yellow_columns]
```
- Apply Column Selection:
  Use select() to pick only the common columns from both DataFrames.
Adding a Service Type Indicator
- Purpose: When merging datasets, it’s useful to know which record came from which taxi service.
- Implementation:
  Add a new column using the Spark SQL lit() function:
```python
green_df = green_df.select(*common_columns) \
    .withColumn("service_type", F.lit("green"))
yellow_df = yellow_df.select(*common_columns) \
    .withColumn("service_type", F.lit("yellow"))
```
5. Combining Datasets
Union of DataFrames
- Combine the Data:
  Merge the two datasets using the union operation (unionAll is a deprecated alias):
```python
trips_df = green_df.union(yellow_df)
```
- Verification:
  You can group by the service_type column and count records to verify the union:
```python
trips_df.groupBy("service_type").count().show()
```
6. Querying Data Using Spark SQL
Registering a Temporary Table
- Step:
  Before you can run SQL queries, register the DataFrame as a temporary view:
```python
trips_df.createOrReplaceTempView("trips_data")
```
- Running a Query:
  Execute SQL queries against this view. For instance, aggregate by zone, month, and service type, deriving the zone and month from the pickup location and pickup timestamp:
```python
query = """
SELECT
    PULocationID AS revenue_zone,
    date_trunc('month', pickup_datetime) AS revenue_month,
    service_type,
    COUNT(*) AS trip_count,
    AVG(fare_amount) AS avg_fare
FROM trips_data
GROUP BY 1, 2, 3
"""

result_df = spark.sql(query)
result_df.show()
```
Discussion Points
- Lazy Evaluation: Spark builds a logical plan for the query and only executes it when an action (like show() or write()) is called.
- SQL vs. DataFrame API: Spark SQL offers a familiar SQL syntax while leveraging Spark's distributed processing. It can be beneficial when dealing with complex transformations and aggregations; a rough DataFrame-API equivalent of the query above is sketched below for comparison.
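A minimal sketch of that comparison, assuming the trips_df DataFrame from the union step and the same derived columns used in the SQL query above:

```python
from pyspark.sql import functions as F

# Sketch: roughly the same aggregation expressed with the DataFrame API
result_df_api = (
    trips_df
    .withColumn("revenue_month", F.date_trunc("month", F.col("pickup_datetime")))
    .groupBy("PULocationID", "revenue_month", "service_type")
    .agg(
        F.count("*").alias("trip_count"),
        F.avg("fare_amount").alias("avg_fare"),
    )
)
result_df_api.show(5)
```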
7. Writing the Results to Disk
Saving Query Results
- Writing to Parquet:
  Store the query results in Parquet format to ensure efficient storage and schema preservation:
```python
result_df.write.mode("overwrite").parquet("data/report/revenue")
```
- Reducing File Count:
  If many small files are created, use coalesce() to reduce the number of partitions:
```python
result_df.coalesce(1).write.mode("overwrite").parquet("data/report/revenue")
```
Data Lake Concept:
The result is written back to a storage layer (data lake) that can later be queried directly by Spark or other tools like Presto or Hive.
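As a quick check (a sketch, not part of the original walkthrough), the written report can be read straight back from that path with Spark:

```python
report_df = spark.read.parquet("data/report/revenue")
report_df.show(5)
```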
8. Advanced Considerations and Best Practices
For Beginners
- Understand the Data Pipeline: Learn the importance of data ingestion, cleaning, and transformation. This example illustrates how to prepare raw data for analysis.
- Practice with Spark SQL: Familiarize yourself with creating Spark sessions, reading/writing data, and executing SQL queries within Spark.
For Professional Data Engineers
- Schema Enforcement: Explicitly defining schemas avoids data inconsistencies and type mismatches when combining data from different sources.
- Efficient Data Processing: Use partitioning and proper file formats (like Parquet) to optimize both storage and query performance.
- Monitoring and Debugging: Leverage Spark’s UI to monitor job progress and troubleshoot stages that involve heavy transformations, such as unions and aggregations.
- Integration with Data Lakes: Writing results directly to a data lake allows seamless integration with downstream analytics and BI tools without the need for an intermediate data warehouse.
Additional Context
- Spark SQL Execution: Internally, Spark SQL breaks down queries into stages and tasks. Intermediate results are often written to temporary storage. Understanding this can help in tuning performance.
- When to Use Spark SQL: If you already have a Spark cluster and data stored in a data lake, using Spark SQL for queries is a powerful way to leverage distributed computing for analytics, especially when traditional SQL engines (like Hive or Presto) are not available.
9. Conclusion
In this session, you learned how to:
- Load multiple Parquet files from a data lake.
- Harmonize two datasets by selecting common columns and renaming mismatched ones.
- Combine datasets using union operations while tagging each record with its source.
- Register a DataFrame as a temporary SQL table and execute SQL queries.
- Write the processed results back to disk, optimizing the output by coalescing partitions.