Introduction
In the realm of data engineering, few challenges are as exciting and impactful as harnessing the power of weather and air quality data. The AtmoFlow project embarks on a thrilling journey to create a robust, scalable data pipeline that leverages the capabilities of Google Cloud Platform (GCP) to process, analyze, and visualize this critical environmental data. Join us as we dive deep into the technical intricacies and architectural decisions that bring AtmoFlow to life.
The project code is available here on GitHub
GCP Architecture Overview
At the heart of AtmoFlow lies a carefully crafted GCP architecture that enables seamless data ingestion, processing, and storage. The key components include:
- Cloud Functions: Serverless functions that handle data collection from APIs and historical sources.
- Pub/Sub: A reliable messaging system for ingesting streaming data.
- Cloud Storage: Scalable object storage for raw data files.
- Dataproc: Managed Hadoop and Spark clusters for distributed data processing.
- BigQuery: A serverless, highly-scalable data warehouse for storing processed data.
These GCP services work in harmony to create a robust foundation for AtmoFlow's data engineering pipeline.
Project Architecture
AtmoFlow's architecture is designed to handle both batch and streaming data efficiently. Here's a high-level overview:
1. Data Ingestion:
- Cloud Functions collect data from weather and air quality APIs and historical files.
- Streaming data is published to Pub/Sub topics.
- Batch data is stored in Cloud Storage.
2. Data Processing:
- Dataproc clusters are created on demand using the DataprocCreateClusterOperator in the Cloud Composer DAG.
- PySpark jobs are submitted to the cluster using the DataprocSubmitJobOperator.
- The PySpark code (dataproc_stream_batch_pyspark.py) handles the core data processing logic.
3. Data Storage:
- Processed data is stored in BigQuery tables, partitioned and clustered for optimal querying.
- Fact and dimension tables are created to enable efficient analysis.
4. Orchestration:
- Cloud Composer, built on Apache Airflow, orchestrates the entire pipeline.
- The DAG (cloud_composer.py) defines the tasks and their dependencies.
5. Visualization:
- Looker is integrated with BigQuery to create interactive dashboards.
Data Sources and Ingestion
AtmoFlow combines data from multiple sources to create a comprehensive dataset:
- Weather data is fetched from the Open-Meteo API using the openmeteo-requests library.
- Air quality data is fetched from the Open-Meteo Air Quality API.
- Historical data files are stored in Cloud Storage.
Batch Data Collection
def fetch_air_quality_data(start_date, end_date):
    """Fetch historical air quality data"""
    air_quality_params = {
        "latitude": LATITUDE,
        "longitude": LONGITUDE,
        "start_date": start_date.strftime("%Y-%m-%d"),
        "end_date": end_date.strftime("%Y-%m-%d"),
        "hourly": ["pm10", "pm2_5", "carbon_monoxide", "nitrogen_dioxide",
                   "sulphur_dioxide", "ozone", "aerosol_optical_depth",
                   "dust", "uv_index", "us_aqi"]
    }
    # API request and response handling
    # Returns: DataFrame with historical air quality metrics
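The request and response handling is elided in the excerpt above. As a rough sketch only, assuming the JSON form of an Open-Meteo response (an "hourly" object holding a "time" array plus one array per requested variable), the column arrays could be reshaped into one record per hour as shown here. The helper name hourly_to_records and the sample payload are hypothetical; the project itself goes through the openmeteo-requests client, whose response objects look different.

```python
def hourly_to_records(payload):
    """Turn column-oriented hourly arrays into one dict per timestamp."""
    hourly = payload["hourly"]
    times = hourly["time"]
    # Every key other than "time" is treated as a measured variable
    variables = [name for name in hourly if name != "time"]
    records = []
    for i, ts in enumerate(times):
        record = {"timestamp": ts}
        for name in variables:
            record[name] = hourly[name][i]
        records.append(record)
    return records


# Hypothetical sample payload, shaped like a JSON API response
sample = {
    "hourly": {
        "time": ["2024-01-01T00:00", "2024-01-01T01:00"],
        "pm10": [12.5, 14.0],
        "pm2_5": [8.1, None],  # missing readings arrive as null
    }
}
records = hourly_to_records(sample)
```

Keeping missing readings as None at this stage leaves the decision about how to treat them to the validation step later in the pipeline.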
Streaming Data Collection
def publish_to_topic(topic_path, data):
    """Publish data to a Pub/Sub topic with dead letter handling"""
    try:
        json_string = json.dumps(data)
        data_bytes = json_string.encode('utf-8')
        future = publisher.publish(topic_path, data=data_bytes)
        # Block until Pub/Sub acknowledges the message
        future.result()
    except Exception as e:
        # Dead letter queue handling
        error_data = {
            "original_data": data,
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }
        dead_letter_future = publisher.publish(
            DEAD_LETTER_TOPIC,
            data=json.dumps(error_data).encode('utf-8')
        )
        # Wait here too, so the dead-letter message is not lost
        # if the function instance exits right after returning
        dead_letter_future.result()
The batch_cloud_function.py and streaming_cloud_function.py scripts handle the data ingestion process. They use the requests-cache library to optimize API requests and implement retry logic for improved reliability.
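The post does not show how the retry logic is written. One common shape is exponential backoff, sketched below in plain Python. The helper call_with_retry, its parameters, and the flaky_request stand-in are all assumptions made for illustration, not code from the project.

```python
import time


def call_with_retry(func, max_attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call func(), retrying on any exception with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: let the caller see the error
            # Wait 1x, 2x, 4x, ... the base delay between attempts
            sleep(base_delay * 2 ** (attempt - 1))


# Hypothetical stand-in for an API call that fails twice, then succeeds
calls = {"count": 0}


def flaky_request():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return {"status": "ok"}


delays = []  # record the waits instead of actually sleeping
result = call_with_retry(flaky_request, sleep=delays.append)
```

Passing the sleep function in as a parameter keeps the helper easy to test, since the waits can be recorded rather than performed.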
Data Processing with Dataproc and PySpark
The heart of AtmoFlow's data processing lies in the dataproc_stream_batch_pyspark.py script, which runs on Dataproc clusters. The script performs the following key tasks:
- Reads historical data from Cloud Storage using the defined schemas.
- Processes streaming data from Pub/Sub topics.
- Merges batch and streaming data using sliding windows and watermarks.
- Validates data quality using configurable thresholds and required columns.
- Creates dimension tables (e.g., time, location, weather condition, air quality status).
- Creates fact tables with derived metrics and aggregations.
- Writes processed data to BigQuery tables with partitioning and clustering.
Data Quality Monitoring
def validate_data(df, table_name, threshold=10.0, required_columns=None):
    """Enhanced data validation with configurable thresholds"""
    # Note: `sum` and `col` below come from pyspark.sql.functions,
    # not from the Python builtins
    total_rows = df.count()
    # Validate required columns
    if required_columns:
        missing_columns = [c for c in required_columns
                           if c not in df.columns]
        if missing_columns:
            raise DataQualityError(
                f"Missing columns in {table_name}: {missing_columns}"
            )
    # Check for null values
    null_counts = df.select([
        sum(col(c).isNull().cast("int")).alias(c)
        for c in df.columns
    ]).collect()[0]
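The excerpt above ends once the per-column null counts are collected. The threshold comparison that presumably follows can be illustrated without Spark. This is a minimal sketch on plain dictionaries; the function names and sample rows are hypothetical, and the threshold is read as a percentage, matching the default of 10.0.

```python
def null_percentages(rows, columns):
    """Percentage of rows in which each column is missing (None)."""
    total = len(rows)
    if total == 0:
        # Avoid dividing by zero on an empty batch
        return {c: 0.0 for c in columns}
    return {
        c: 100.0 * sum(1 for r in rows if r.get(c) is None) / total
        for c in columns
    }


def columns_over_threshold(rows, columns, threshold=10.0):
    """Names of columns whose null percentage exceeds the threshold."""
    percentages = null_percentages(rows, columns)
    return sorted(c for c, pct in percentages.items() if pct > threshold)


# Hypothetical sample: one of four pm10 readings is missing (25%)
rows = [
    {"pm10": 12.0, "ozone": 40.0},
    {"pm10": None, "ozone": 42.0},
    {"pm10": 15.0, "ozone": 41.0},
    {"pm10": 11.0, "ozone": 39.0},
]
failing = columns_over_threshold(rows, ["pm10", "ozone"])
```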
Merging Stream and Batch Data
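def merge_batch_and_stream(batch_df, stream_df,
                           window_duration="1 hour",
                           watermark_delay="10 minutes"):
    """Merge batch and streaming data with deduplication"""
    stream_with_watermark = stream_df.withWatermark(
        "timestamp", watermark_delay
    )
    # Create time windows
    windowed_batch = batch_df.withColumn(
        "window",
        window(col("timestamp"), window_duration)
    )
    # Window the stream the same way (this step is missing from the
    # excerpt and is reconstructed here by analogy with the batch side)
    windowed_stream = stream_with_watermark.withColumn(
        "window",
        window(col("timestamp"), window_duration)
    )
    # Combine and deduplicate
    merged_df = windowed_batch.unionByName(
        windowed_stream,
        allowMissingColumns=True
    )
    return merged_df.dropDuplicates(["timestamp"])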
The script leverages PySpark's DataFrame API and SQL functions extensively to perform complex transformations and aggregations efficiently.
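To make the windowing and watermark ideas concrete, here is a simplified plain-Python sketch. It assumes fixed (tumbling) one-hour windows, which is what a window() call with only a duration produces, and it models the watermark per event. Spark itself advances the watermark per micro-batch and only uses it in stateful operations, so this illustrates the idea rather than reproducing Spark's behavior. The function names are hypothetical.

```python
from datetime import datetime, timedelta


def window_start(ts, window=timedelta(hours=1)):
    """Start of the fixed-size window that contains ts."""
    epoch = datetime(1970, 1, 1)
    return ts - (ts - epoch) % window


def accept_events(event_times, delay=timedelta(minutes=10)):
    """Keep events no older than (latest event time seen so far - delay)."""
    max_seen = None
    accepted = []
    for ts in event_times:
        if max_seen is None or ts > max_seen:
            max_seen = ts
        if ts >= max_seen - delay:
            accepted.append(ts)
    return accepted


events = [
    datetime(2024, 1, 1, 10, 5),
    datetime(2024, 1, 1, 10, 20),
    datetime(2024, 1, 1, 10, 8),   # 12 minutes behind the latest: dropped
    datetime(2024, 1, 1, 10, 15),  # 5 minutes behind the latest: kept
]
accepted = accept_events(events)
first_window = window_start(events[1])
```

With a 10-minute delay, the 10:08 reading arrives after the threshold has already moved to 10:10, so it is discarded, while the 10:15 reading still falls inside the allowed lateness.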
Dimensional Modeling and Data Warehouse
AtmoFlow employs dimensional modeling techniques to create a structured and optimized data warehouse in BigQuery. The key components include:
- Fact Tables:
  - AirQualityFact: Stores air quality measurements with time, location, and status dimensions.
  - WeatherFact: Stores weather measurements with time, location, condition, severity, and season dimensions.
- Dimension Tables:
  - TimeDim: Represents time hierarchy with various time attributes.
  - LocationDim: Represents location hierarchy with geographic attributes.
  - AirQualityStatusDim: Stores air quality status classifications and descriptions.
  - WeatherConditionDim: Stores weather condition classifications and descriptions.
  - SeasonDim: Represents seasonal characteristics.
  - SeverityDim: Represents severity levels and impact information.
The dimension tables are designed to provide rich context and enable efficient querying and analysis of the fact data.
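As an illustration of what a row in a time dimension might contain, the sketch below derives a few attributes from a timestamp. The column names, the hour-level time_key format, and the Northern Hemisphere meteorological seasons are all assumptions; the post does not list the actual TimeDim schema.

```python
from datetime import datetime

DAY_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday",
             "Friday", "Saturday", "Sunday"]


def season_for_month(month):
    """Meteorological season, assuming the Northern Hemisphere."""
    if month in (12, 1, 2):
        return "winter"
    if month in (3, 4, 5):
        return "spring"
    if month in (6, 7, 8):
        return "summer"
    return "autumn"


def build_time_dim_row(ts):
    """Derive hypothetical time-dimension attributes from a timestamp."""
    return {
        "time_key": int(ts.strftime("%Y%m%d%H")),  # one key per hour
        "date": ts.date().isoformat(),
        "year": ts.year,
        "month": ts.month,
        "day": ts.day,
        "hour": ts.hour,
        "day_of_week": DAY_NAMES[ts.weekday()],
        "is_weekend": ts.weekday() >= 5,
        "season": season_for_month(ts.month),
    }


row = build_time_dim_row(datetime(2024, 7, 6, 14))
```

A fact row would then carry only the time_key, and queries that group by season or weekday join to the dimension instead of recomputing those attributes.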
Data Harmonization
To prepare the data for machine learning applications, AtmoFlow creates a harmonized dataset that combines weather and air quality features. The create_harmonized_data function in the PySpark script joins the relevant data sources and performs feature engineering tasks, such as:
- Adding derived features like air density and dew point.
- Normalizing and scaling numerical features.
- Encoding categorical variables.
def create_harmonized_data(weather_df, air_quality_df):
    """Create harmonized dataset combining weather and air quality metrics."""
    try:
        # Add location key to both datasets
        # Note: built-in hash() of a str differs between Python processes
        # unless PYTHONHASHSEED is fixed, so this key is only stable within a run
        location_key = hash(f"{config['LATITUDE']}_{config['LONGITUDE']}")
        # Select and rename weather features
        weather_features = (weather_df
            .withColumn("location_key", lit(location_key))
            .select(
                ...
            ))
        # Select air quality features
        air_quality_features = (air_quality_df
            .withColumn("location_key", lit(location_key))
            .select(
                ...
            ))
        # Combine features and add derived metrics
        harmonized_df = (weather_features
            .join(air_quality_features, ["timestamp", "location_key"], "outer")
            .withColumn("air_density",  # Calculate air density
                col("surface_pressure") / (287.05 * (col("temperature_2m") + 273.15)))
            .withColumn("dew_point",  # Calculate dew point
                col("temperature_2m") - ((100 - col("relative_humidity_2m")) / 5)))
        return harmonized_df
    except Exception:
        # Error handling is elided in this excerpt; re-raise so failures surface
        raise
The resulting harmonized dataset is stored in BigQuery and can be easily consumed by machine learning pipelines.
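The two derived features are easy to check by hand. The sketch below works through them in plain Python, using the ideal gas law for dry air (density = pressure / (R * T), with R = 287.05 J/(kg·K)) and the rule-of-thumb dew point approximation, which is reasonably accurate only above roughly 50% relative humidity. One assumption is worth flagging: the gas law needs pressure in pascals, so if the surface_pressure column holds hPa (which appears to be Open-Meteo's default unit), it has to be multiplied by 100 first. The function names are hypothetical.

```python
GAS_CONSTANT_DRY_AIR = 287.05  # J/(kg*K)


def air_density(pressure_hpa, temperature_c):
    """Dry-air density in kg/m^3 from pressure in hPa and temperature in Celsius."""
    pressure_pa = pressure_hpa * 100.0  # hPa -> Pa
    temperature_k = temperature_c + 273.15
    return pressure_pa / (GAS_CONSTANT_DRY_AIR * temperature_k)


def dew_point_approx(temperature_c, relative_humidity):
    """Rule-of-thumb dew point in Celsius: T - (100 - RH) / 5."""
    return temperature_c - (100.0 - relative_humidity) / 5.0


# Standard sea-level conditions should give about 1.225 kg/m^3
rho = air_density(1013.25, 15.0)
# 20 degrees C at 60% humidity gives a dew point of 12 degrees C
td = dew_point_approx(20.0, 60.0)
```

Checking the result against the familiar sea-level value of about 1.225 kg/m^3 is a quick way to catch a unit mismatch: without the hPa conversion the density comes out 100 times too small.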
Orchestration with Cloud Composer
Cloud Composer, based on Apache Airflow, serves as the orchestration layer for AtmoFlow. The cloud_composer.py script defines the DAG that manages the entire pipeline. The key tasks include:
- Triggering the batch and streaming data collection Cloud Functions.
- Creating a Dataproc cluster with the specified configuration.
- Submitting the PySpark job to the Dataproc cluster.
- Deleting the Dataproc cluster after the job completion.
Cloud Composer allows for easy scheduling, monitoring, and management of the pipeline, ensuring smooth execution and enabling data freshness.
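The ordering that the DAG enforces can be modeled with the standard library alone. The sketch below is not Airflow code: the task names are hypothetical, and the dependency shape (both collection functions before cluster creation, then job submission, then cluster deletion) is inferred from the task list above rather than taken from cloud_composer.py.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks that must finish before it starts
dependencies = {
    "trigger_batch_function": set(),
    "trigger_streaming_function": set(),
    "create_dataproc_cluster": {
        "trigger_batch_function",
        "trigger_streaming_function",
    },
    "submit_pyspark_job": {"create_dataproc_cluster"},
    "delete_dataproc_cluster": {"submit_pyspark_job"},
}

# A valid execution order that respects every dependency
order = list(TopologicalSorter(dependencies).static_order())
```

The two collection tasks have no dependency on each other, so a scheduler is free to run them in parallel. Deleting the cluster as the final step is what keeps an on-demand cluster from accruing cost after the job finishes.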
Data Visualization and Dashboarding
To unlock the full potential of the processed data, AtmoFlow integrates with Looker for data visualization and dashboarding.
Looker connects directly to the BigQuery tables, allowing users to create interactive and insightful visualizations. Key metrics and trends related to weather patterns, air quality levels, and their impact on various factors can be easily explored and analyzed.
This is the link to the Looker Dashboard.
Challenges and Lessons Learned
Building a robust data pipeline like AtmoFlow comes with its fair share of challenges. Some key lessons learned include:
Complex Data Processing Architecture:
- Challenge: The system needed to handle both historical weather data (batch) and real-time updates (streaming) while maintaining data consistency and accuracy.
- Solution: Built a dual-pipeline system using PySpark that processes both batch and streaming data. Implemented time-based windows to organize data arrival and created a deduplication system to ensure data integrity.
def merge_batch_and_stream(batch_df, stream_df):
    # Add time tracking for late data
    stream_with_watermark = stream_df.withWatermark("timestamp", "10 minutes")
    # Combine streams and remove duplicates
    merged_df = batch_df.unionByName(stream_with_watermark)
    deduplicated_df = merged_df.dropDuplicates(["timestamp"])
    return deduplicated_df
Data Quality Management:
- Challenge: Weather and air quality data frequently contained missing values, incorrect readings, or arrived late. The system required robust validation to ensure data reliability.
- Solution: Implemented configurable validation rules, built comprehensive error tracking, and developed automatic data cleaning procedures with alert systems for quality issues.
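def validate_data(df, table_name, threshold=10.0):
    # Monitor data quality with null checks
    # (`sum` and `col` come from pyspark.sql.functions)
    total_rows = df.count()
    null_counts = df.select([
        sum(col(c).isNull().cast("int")).alias(c)
        for c in df.columns
    ]).collect()[0]
    # Raise alerts for quality issues
    for column in df.columns:
        # sum() yields None on an empty DataFrame, hence the `or 0`
        null_percentage = 100.0 * (null_counts[column] or 0) / max(total_rows, 1)
        if null_percentage > threshold:
            raise DataQualityError(
                f"Missing value threshold exceeded in {table_name}"
            )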
BigQuery Optimization:
- Challenge: Growing data volumes led to increased query latency and costs. The system needed optimization for both performance and cost-effectiveness.
- Solution: Implemented strategic partitioning and clustering in BigQuery tables. Designed efficient table structures and query patterns to minimize resource usage.
CREATE OR REPLACE TABLE weather_air_quality.WeatherFact
PARTITION BY DATE(timestamp)
CLUSTER BY location_key, severity_key
AS
SELECT * FROM staging_weather_data;
Complex Data Harmonization:
- Challenge: The project required combining weather and air quality data from different formats into a unified, analyzable dataset.
- Solution: Developed a standardized data format and built a harmonization system that combines multiple data sources while adding derived metrics for enhanced analysis.
def create_harmonized_data(weather_df, air_quality_df):
    # Combine different data sources
    harmonized_df = weather_df.join(
        air_quality_df,
        ["timestamp", "location_key"]
    )
    # Add calculated metrics
    harmonized_df = harmonized_df.withColumn(
        "air_quality_index",
        calculate_aqi(col("pm25"), col("pm10"))
    )
    return harmonized_df
Conclusion
AtmoFlow showcases the power and potential of data engineering in the cloud, specifically leveraging GCP's robust suite of tools and services. By harnessing the capabilities of Cloud Functions, Pub/Sub, Dataproc, BigQuery, and Cloud Composer, AtmoFlow creates a seamless end-to-end pipeline for processing and analyzing weather and air quality data.
Happy data engineering!