Rahul Reddy Talatala

AtmoFlow: Breathing Life into Data - Real Time Weather and Air Quality Insights

Introduction

In the realm of data engineering, few challenges are as exciting and impactful as harnessing the power of weather and air quality data. The AtmoFlow project embarks on a thrilling journey to create a robust, scalable data pipeline that leverages the capabilities of Google Cloud Platform (GCP) to process, analyze, and visualize this critical environmental data. Join us as we dive deep into the technical intricacies and architectural decisions that bring AtmoFlow to life.

The project code is available on GitHub.

GCP Architecture Overview

At the heart of AtmoFlow lies a carefully crafted GCP architecture that enables seamless data ingestion, processing, and storage. The key components include:

  • Cloud Functions: Serverless functions that handle data collection from APIs and historical sources.
  • Pub/Sub: A reliable messaging system for ingesting streaming data.
  • Cloud Storage: Scalable object storage for storing raw data files.
  • Dataproc: Managed Hadoop and Spark clusters for distributed data processing.
  • BigQuery: A serverless, highly scalable data warehouse for storing processed data.

These GCP services work in harmony to create a robust foundation for AtmoFlow's data engineering pipeline.

Project Architecture

AtmoFlow's architecture is designed to handle both batch and streaming data efficiently. Here's a high-level overview:

(AtmoFlow GCP architecture diagram)

1. Data Ingestion:

  • Cloud Functions collect data from weather and air quality APIs and historical files.
  • Streaming data is published to Pub/Sub topics.
  • Batch data is stored in Cloud Storage.

2. Data Processing:

  • Dataproc clusters are created on-demand using the DataprocCreateClusterOperator in the Cloud Composer DAG.
  • PySpark jobs are submitted to the cluster using the DataprocSubmitJobOperator.
  • The PySpark code (dataproc_stream_batch_pyspark.py) handles the core data processing logic.

3. Data Storage:

  • Processed data is stored in BigQuery tables, partitioned and clustered for optimal querying.
  • Fact and dimension tables are created to enable efficient analysis.

4. Orchestration:

  • Cloud Composer, built on Apache Airflow, orchestrates the entire pipeline.
  • The DAG (cloud_composer.py) defines the tasks and their dependencies.

5. Visualization:

  • Looker is integrated with BigQuery to create interactive dashboards.

Data Sources and Ingestion

AtmoFlow combines data from multiple sources to create a comprehensive dataset:

  • Weather data is fetched from the Open-Meteo API using the openmeteo-requests library.
  • Air quality data is fetched from the Open-Meteo Air Quality API.
  • Historical data files are stored in Cloud Storage.

Batch Data Collection

def fetch_air_quality_data(start_date, end_date):
    """Fetch historical air quality data"""
    air_quality_params = {
        "latitude": LATITUDE,
        "longitude": LONGITUDE,
        "start_date": start_date.strftime("%Y-%m-%d"),
        "end_date": end_date.strftime("%Y-%m-%d"),
        "hourly": ["pm10", "pm2_5", "carbon_monoxide", "nitrogen_dioxide",
                  "sulphur_dioxide", "ozone", "aerosol_optical_depth",
                  "dust", "uv_index", "us_aqi"]
    }
    # API request and response handling
    # Returns: DataFrame with historical air quality metrics
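For context, here is a rough sketch of what the elided request/response handling typically looks like with the openmeteo-requests client — an illustrative pattern rather than the exact repo code, and the endpoint URL is an assumption:

import openmeteo_requests
import pandas as pd

# Illustrative only: a bare client; the actual scripts wire in caching and
# retries (see the session setup sketch further below).
openmeteo = openmeteo_requests.Client()
AIR_QUALITY_URL = "https://air-quality-api.open-meteo.com/v1/air-quality"  # assumed endpoint

response = openmeteo.weather_api(AIR_QUALITY_URL, params=air_quality_params)[0]
hourly = response.Hourly()

# Build an hourly timestamp range from the response metadata, then pull each
# variable in the same order as the "hourly" parameter list.
data = {"timestamp": pd.date_range(
    start=pd.to_datetime(hourly.Time(), unit="s", utc=True),
    end=pd.to_datetime(hourly.TimeEnd(), unit="s", utc=True),
    freq=pd.Timedelta(seconds=hourly.Interval()),
    inclusive="left")}
for i, name in enumerate(air_quality_params["hourly"]):
    data[name] = hourly.Variables(i).ValuesAsNumpy()
air_quality_df = pd.DataFrame(data)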

Streaming Data Collection

def publish_to_topic(topic_path, data):
    """Publish data to a Pub/Sub topic with dead letter handling"""
    try:
        json_string = json.dumps(data)
        data_bytes = json_string.encode('utf-8')
        future = publisher.publish(topic_path, data=data_bytes)
        future.result()
    except Exception as e:
        # Dead letter queue handling
        error_data = {
            "original_data": data,
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }
        dead_letter_future = publisher.publish(
            DEAD_LETTER_TOPIC, 
            data=json.dumps(error_data).encode('utf-8')
        )

The batch_cloud_function.py and streaming_cloud_function.py scripts handle the data ingestion process. They use the requests-cache library to optimize API requests and implement retry logic for improved reliability.
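For reference, the caching and retry wiring usually looks something like this — a minimal sketch following the Open-Meteo client documentation, with the cache path, expiry, and retry counts as assumptions:

import openmeteo_requests
import requests_cache
from retry_requests import retry

# Cache API responses for an hour and retry transient failures with backoff,
# then hand the prepared session to the Open-Meteo client.
cache_session = requests_cache.CachedSession(".openmeteo_cache", expire_after=3600)
retry_session = retry(cache_session, retries=5, backoff_factor=0.2)
openmeteo = openmeteo_requests.Client(session=retry_session)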

Data Processing with Dataproc and PySpark

The heart of AtmoFlow's data processing lies in the dataproc_stream_batch_pyspark.py script, which runs on Dataproc clusters. The script performs the following key tasks:

  1. Reads historical data from Cloud Storage using the defined schemas.
  2. Processes streaming data from Pub/Sub topics.
  3. Merges batch and streaming data using sliding windows and watermarks.
  4. Validates data quality using configurable thresholds and required columns.
  5. Creates dimension tables (e.g., time, location, weather condition, air quality status).
  6. Creates fact tables with derived metrics and aggregations.
  7. Writes processed data to BigQuery tables with partitioning and clustering (see the write sketch just below).
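To illustrate step 7, here is a minimal sketch of a partitioned, clustered BigQuery write via the spark-bigquery connector — the table name, staging bucket, and clustering keys are assumptions, not the repo's exact configuration:

def write_fact_table(df, table="weather_air_quality.WeatherFact",
                     temp_bucket="atmoflow-temp-bucket"):
    """Write a processed DataFrame to BigQuery, day-partitioned on `timestamp`
    and clustered on dimension keys (the partitioning options take effect when
    the connector creates the table)."""
    (df.write
        .format("bigquery")
        .option("table", table)
        .option("temporaryGcsBucket", temp_bucket)  # assumed staging bucket
        .option("partitionField", "timestamp")
        .option("partitionType", "DAY")
        .option("clusteredFields", "location_key,severity_key")
        .mode("append")
        .save())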

Data Quality Monitoring

def validate_data(df, table_name, threshold=10.0, required_columns=None):
    """Enhanced data validation with configurable thresholds"""
    total_rows = df.count()

    # Validate required columns
    if required_columns:
        missing_columns = [col for col in required_columns 
                         if col not in df.columns]
        if missing_columns:
            raise DataQualityError(
                f"Missing columns in {table_name}: {missing_columns}"
            )

    # Check for null values
    null_counts = df.select([
        sum(col(c).isNull().cast("int")).alias(c) 
        for c in df.columns
    ]).collect()[0]

Merging Stream and Batch Data

def merge_batch_and_stream(batch_df, stream_df, 
                          window_duration="1 hour",
                          watermark_delay="10 minutes"):
    """Merge batch and streaming data with deduplication"""
    stream_with_watermark = stream_df.withWatermark(
        "timestamp", watermark_delay
    )

    # Create time windows for both sources
    windowed_batch = batch_df.withColumn(
        "window",
        window(col("timestamp"), window_duration)
    )
    windowed_stream = stream_with_watermark.withColumn(
        "window",
        window(col("timestamp"), window_duration)
    )

    # Combine and deduplicate
    merged_df = windowed_batch.unionByName(
        windowed_stream,
        allowMissingColumns=True
    )

The script leverages PySpark's DataFrame API and SQL functions extensively to perform complex transformations and aggregations efficiently.
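As a small example of the kind of aggregation this enables, a roll-up over the merged data might look like the following — column names here are illustrative rather than the exact ones used in the script:

from pyspark.sql import functions as F

# Illustrative hourly roll-up over the merged batch + streaming DataFrame.
hourly_summary = (merged_df
    .groupBy("window", "location_key")
    .agg(
        F.avg("temperature_2m").alias("avg_temperature_2m"),
        F.avg("pm2_5").alias("avg_pm2_5"),
        F.max("us_aqi").alias("max_us_aqi"),
    ))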

Dimensional Modeling and Data Warehouse

Facts and Dimensions

AtmoFlow employs dimensional modeling techniques to create a structured and optimized data warehouse in BigQuery. The key components include:

  • Fact Tables:

    • AirQualityFact: Stores air quality measurements with time, location, and status dimensions.
    • WeatherFact: Stores weather measurements with time, location, condition, severity, and season dimensions.
  • Dimension Tables:

    • TimeDim: Represents time hierarchy with various time attributes.
    • LocationDim: Represents location hierarchy with geographic attributes.
    • AirQualityStatusDim: Stores air quality status classifications and descriptions.
    • WeatherConditionDim: Stores weather condition classifications and descriptions.
    • SeasonDim: Represents seasonal characteristics.
    • SeverityDim: Represents severity levels and impact information.

The dimension tables are designed to provide rich context and enable efficient querying and analysis of the fact data.
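As a sketch of how a dimension such as TimeDim can be derived from the processed timestamps (the attribute list here is illustrative; the repo's columns may differ):

from pyspark.sql import functions as F

# Illustrative TimeDim build: one row per distinct timestamp, with a surrogate
# key and the usual calendar attributes.
time_dim = (processed_df
    .select("timestamp").distinct()
    .withColumn("time_key", F.hash(F.col("timestamp")))
    .withColumn("hour", F.hour("timestamp"))
    .withColumn("day", F.dayofmonth("timestamp"))
    .withColumn("month", F.month("timestamp"))
    .withColumn("year", F.year("timestamp"))
    .withColumn("is_weekend", F.dayofweek("timestamp").isin(1, 7)))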

Data Harmonization

To prepare the data for machine learning applications, AtmoFlow creates a harmonized dataset that combines weather and air quality features. The create_harmonized_data function in the PySpark script joins the relevant data sources and performs feature engineering tasks, such as:

  • Adding derived features like air density and dew point.
  • Normalizing and scaling numerical features.
  • Encoding categorical variables.
def create_harmonized_data(weather_df, air_quality_df):
    """Create harmonized dataset combining weather and air quality metrics."""
    try:
        # Add location key to both datasets
        location_key = hash(f"{config['LATITUDE']}_{config['LONGITUDE']}")

        # Select and rename weather features
        weather_features = (weather_df
            .withColumn("location_key", lit(location_key))
            .select(
                ...
            ))

        # Select air quality features
        air_quality_features = (air_quality_df
            .withColumn("location_key", lit(location_key))
            .select(
               ...
            ))

        # Combine features and add derived metrics
        harmonized_df = (weather_features
            .join(air_quality_features, ["timestamp", "location_key"], "outer")
            .withColumn("air_density",  # Calculate air density
                col("surface_pressure") / (287.05 * (col("temperature_2m") + 273.15)))
            .withColumn("dew_point",  # Calculate dew point
                col("temperature_2m") - ((100 - col("relative_humidity_2m")) / 5)))

        return harmonized_df

    except Exception:
        # Propagate failures so the orchestrating task fails loudly
        raise

The resulting harmonized dataset is stored in BigQuery and can be easily consumed by machine learning pipelines.
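To give a sense of that downstream consumption, here is a minimal Spark ML sketch with illustrative column names — it is not part of the AtmoFlow codebase:

from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler, StringIndexer, VectorAssembler

# Illustrative feature preparation on the harmonized dataset: encode one
# categorical column, assemble the numeric features, then standardize them.
indexer = StringIndexer(inputCol="aqi_category", outputCol="aqi_category_idx",
                        handleInvalid="keep")   # "aqi_category" is hypothetical
assembler = VectorAssembler(
    inputCols=["temperature_2m", "relative_humidity_2m", "pm2_5", "pm10",
               "air_density", "dew_point"],
    outputCol="features_raw", handleInvalid="skip")
scaler = StandardScaler(inputCol="features_raw", outputCol="features",
                        withMean=True, withStd=True)

feature_pipeline = Pipeline(stages=[indexer, assembler, scaler])
ml_ready_df = feature_pipeline.fit(harmonized_df).transform(harmonized_df)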

Orchestration with Cloud Composer

Cloud Composer, based on Apache Airflow, serves as the orchestration layer for AtmoFlow. The cloud_composer.py script defines the DAG that manages the entire pipeline. The key tasks include:

  1. Triggering the batch and streaming data collection Cloud Functions.
  2. Creating a Dataproc cluster with the specified configuration.
  3. Submitting the PySpark job to the Dataproc cluster.
  4. Deleting the Dataproc cluster after the job completion.

Cloud Composer DAG

Cloud Composer makes it easy to schedule, monitor, and manage the pipeline, ensuring smooth execution and keeping the data fresh. A trimmed sketch of how these tasks can be wired together follows below.
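The sketch is illustrative — the project ID, region, cluster name, code bucket, and schedule are assumptions, and the Cloud Function trigger tasks are omitted for brevity:

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)
from airflow.utils.trigger_rule import TriggerRule

PROJECT_ID = "atmoflow-project"      # assumed
REGION = "us-central1"               # assumed
CLUSTER_NAME = "atmoflow-dataproc"   # assumed

with DAG("atmoflow_pipeline", start_date=datetime(2024, 1, 1),
         schedule_interval="@hourly", catchup=False) as dag:

    create_cluster = DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        project_id=PROJECT_ID, region=REGION, cluster_name=CLUSTER_NAME,
        cluster_config={"master_config": {"num_instances": 1},
                        "worker_config": {"num_instances": 2}},
    )

    submit_pyspark = DataprocSubmitJobOperator(
        task_id="submit_pyspark_job",
        project_id=PROJECT_ID, region=REGION,
        job={"placement": {"cluster_name": CLUSTER_NAME},
             "pyspark_job": {"main_python_file_uri":
                 "gs://atmoflow-code/dataproc_stream_batch_pyspark.py"}},  # assumed bucket
    )

    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        project_id=PROJECT_ID, region=REGION, cluster_name=CLUSTER_NAME,
        trigger_rule=TriggerRule.ALL_DONE,  # tear down even if the job fails
    )

    create_cluster >> submit_pyspark >> delete_cluster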

Data Visualization and Dashboarding

To unlock the full potential of the processed data, AtmoFlow integrates with Looker for data visualization and dashboarding.

Looker Studio Dashboard UI

Looker connects directly to the BigQuery tables, allowing users to create interactive and insightful visualizations. Key metrics and trends related to weather patterns, air quality levels, and their impact on various factors can be easily explored and analyzed.

This is the link to the Looker Dashboard.

Challenges and Lessons Learned

Building a robust data pipeline like AtmoFlow comes with its fair share of challenges. Some key lessons learned include:

Complex Data Processing Architecture:

  • Challenge: The system needed to handle both historical weather data (batch) and real-time updates (streaming) while maintaining data consistency and accuracy.
  • Solution: Built a dual-pipeline system using PySpark that processes both batch and streaming data. Implemented time-based windows to organize data arrival and created a deduplication system to ensure data integrity.
def merge_batch_and_stream(batch_df, stream_df):
    # Add time tracking for late data
    stream_with_watermark = stream_df.withWatermark("timestamp", "10 minutes")

    # Combine streams and remove duplicates
    merged_df = batch_df.unionByName(stream_with_watermark)
    deduplicated_df = merged_df.dropDuplicates(["timestamp"])
    return deduplicated_df

Data Quality Management:

  • Challenge: Weather and air quality data frequently contained missing values, incorrect readings, or arrived late. The system required robust validation to ensure data reliability.
  • Solution: Implemented configurable validation rules, built comprehensive error tracking, and developed automatic data cleaning procedures with alert systems for quality issues.
def validate_data(df, table_name, threshold=10.0):
    # Monitor data quality with null checks
    total_rows = df.count()
    null_counts = df.select([
        sum(col(c).isNull().cast("int")).alias(c) 
        for c in df.columns
    ]).collect()[0]

    # Raise alerts when the share of missing values exceeds the threshold
    for column, null_count in null_counts.asDict().items():
        null_percentage = (null_count / total_rows) * 100 if total_rows else 0
        if null_percentage > threshold:
            raise DataQualityError(
                f"Missing value threshold exceeded in {table_name}.{column}"
            )

BigQuery Optimization:

  • Challenge: Growing data volumes led to increased query latency and costs. The system needed optimization for both performance and cost-effectiveness.
  • Solution: Implemented strategic partitioning and clustering in BigQuery tables. Designed efficient table structures and query patterns to minimize resource usage.
CREATE OR REPLACE TABLE weather_air_quality.WeatherFact
PARTITION BY DATE(timestamp)
CLUSTER BY location_key, severity_key
AS
SELECT * FROM staging_weather_data;

Complex Data Harmonization:

  • Challenge: The project required combining weather and air quality data from different formats into a unified, analyzable dataset.
  • Solution: Developed a standardized data format and built a harmonization system that combines multiple data sources while adding derived metrics for enhanced analysis.
def create_harmonized_data(weather_df, air_quality_df):
    # Combine different data sources
    harmonized_df = weather_df.join(
        air_quality_df,
        ["timestamp", "location_key"]
    )

    # Add calculated metrics via a helper (not shown here)
    harmonized_df = harmonized_df.withColumn(
        "air_quality_index",
        calculate_aqi(col("pm25"), col("pm10"))
    )
    return harmonized_df

Conclusion

AtmoFlow showcases the power and potential of data engineering in the cloud, specifically leveraging GCP's robust suite of tools and services. By harnessing the capabilities of Cloud Functions, Pub/Sub, Dataproc, BigQuery, and Cloud Composer, AtmoFlow creates a seamless end-to-end pipeline for processing and analyzing weather and air quality data.

Happy data engineering! 🚀

Built with ❤️ by Rahul
LinkedIn | GitHub | Portfolio
