DEV Community

Pizofreude

Study Notes 2.2.3: Setting Up an ETL Pipeline with Kestra and Postgres

Introduction

This study note covers the key steps to set up an ETL (Extract, Transform, Load) pipeline using Kestra with Postgres for New York taxi data. The dataset includes information for both yellow and green taxis from 2019 to July 2021. The workflow is designed to automate the monthly data processing.


1. Overview of Kestra ETL Workflow

The pipeline follows a structured ETL approach:

  • Extract: Retrieve raw data from GitHub.
  • Transform: Process and clean the data.
  • Load: Store the processed data into a Postgres database.

The workflow is designed with dynamic parameters to allow users to process specific months and taxi types efficiently.


2. Setting Up Input Parameters

To make the workflow flexible, input parameters are introduced:

  • Taxi Type: Users can choose between yellow or green taxis.
  • Year and Month: Users can specify a particular year and month to process data accordingly.

This ensures that the workflow can dynamically adjust filenames and database table names based on the selected parameters.
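
As a rough illustration of how the inputs can drive filenames and table names, here is a minimal Python sketch. The `RunParams` class and the filename pattern are assumptions made for this example, not part of the original workflow; the table names follow the `green_taxi` / `staging_green_taxi` naming used in the SQL examples in these notes.

```python
from dataclasses import dataclass

VALID_TAXI_TYPES = ("yellow", "green")


@dataclass(frozen=True)
class RunParams:
    """Hypothetical container for the three workflow inputs."""
    taxi: str   # "yellow" or "green"
    year: int   # e.g. 2019
    month: int  # 1-12

    def __post_init__(self):
        # Reject inputs the workflow does not support.
        if self.taxi not in VALID_TAXI_TYPES:
            raise ValueError(f"unknown taxi type: {self.taxi!r}")
        if not 1 <= self.month <= 12:
            raise ValueError(f"month out of range: {self.month}")

    @property
    def filename(self) -> str:
        # Assumed naming pattern; adjust it to match the real source files.
        return f"{self.taxi}_tripdata_{self.year}-{self.month:02d}.csv"

    @property
    def table(self) -> str:
        # Final table, e.g. green_taxi.
        return f"{self.taxi}_taxi"

    @property
    def staging_table(self) -> str:
        # Staging table, e.g. staging_green_taxi.
        return f"staging_{self.taxi}_taxi"
```

With this sketch, `RunParams("green", 2019, 1).filename` evaluates to `green_tripdata_2019-01.csv`, and an unsupported taxi type fails early with a `ValueError` instead of producing a bad filename.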


3. Extracting Data from GitHub

  • The workflow uses a shell command to download and unzip data files directly from GitHub.
  • Example of a command used:

    # Download the archive, then unpack it into /tmp/ (-o overwrites files left by earlier runs)
    wget -O data.zip "<GitHub_URL>" && unzip -o data.zip -d /tmp/
    
    
  • The extracted files are stored in a temporary directory for further processing.
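
For readers who prefer to see the same download-and-unpack step outside the shell, the following is a small Python sketch using only the standard library. The function names are hypothetical, and the workflow itself uses the shell command shown above; this is only an equivalent under the assumption that the source file is a zip archive.

```python
import urllib.request
import zipfile
from pathlib import Path


def extract_archive(zip_path, dest_dir):
    """Unpack a local zip archive into dest_dir and return the extracted paths."""
    dest = Path(dest_dir)
    dest.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path) as archive:
        archive.extractall(dest)
        return [dest / name for name in archive.namelist()]


def download_and_extract(url, dest_dir):
    """Download url to dest_dir/data.zip, then unpack it in the same directory."""
    dest = Path(dest_dir)
    dest.mkdir(parents=True, exist_ok=True)
    zip_path = dest / "data.zip"
    urllib.request.urlretrieve(url, zip_path)  # network call
    return extract_archive(zip_path, dest)
```

Splitting the extraction into its own function makes it possible to test the unpacking logic on a local archive without touching the network.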


4. Setting Up the Postgres Database

4.1 Creating the Postgres Instance

  • The Postgres database runs in a Docker container.
  • Kestra is given the connection details (such as host, port, database name, username, and password) so that its tasks can query the database.

4.2 Creating Database Tables

  • SQL queries are written to create tables for yellow and green taxi data.
  • A unique ID is generated for each row by MD5-hashing a combination of its columns. Because the hash column is declared UNIQUE, the same trip cannot be stored twice.
  • Example of SQL query:

    CREATE TABLE IF NOT EXISTS green_taxi (
        id SERIAL PRIMARY KEY,
        pickup_datetime TIMESTAMP,
        dropoff_datetime TIMESTAMP,
        passenger_count INT,
        trip_distance FLOAT,
        fare_amount FLOAT,
        hash_id TEXT UNIQUE
    );
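
To make the hashing idea concrete, here is a minimal Python sketch of a deterministic row ID. The `row_hash` helper is hypothetical, and its output is not guaranteed to match what Postgres computes for the same row, since Postgres formats timestamps and numbers as text in its own way. The point is only that identical inputs always produce the same 32-character ID.

```python
import hashlib


def row_hash(*fields):
    """Return the MD5 hex digest of the fields concatenated as text.

    None is treated as an empty string, which is also how the Postgres
    CONCAT function handles NULL arguments.
    """
    joined = "".join("" if f is None else str(f) for f in fields)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()


# The same trip always hashes to the same ID.
a = row_hash("2019-01-01 00:15:00", "2019-01-01 00:32:10", 1)
b = row_hash("2019-01-01 00:15:00", "2019-01-01 00:32:10", 1)
same_id = a == b  # True

# Caveat: without a separator, different field splits can collide.
collides = row_hash("ab", "c") == row_hash("a", "bc")  # True
```

The collision in the last line is a property of plain concatenation: if that matters for your data, joining the fields with a delimiter that cannot appear in the values avoids it.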
    
    

5. Transforming and Loading Data

  • The extracted data is processed to ensure it matches the expected schema.
  • The workflow differentiates between yellow and green taxis, processing them separately.
  • Data is inserted into Postgres tables, ensuring no duplicate records are added.
  • Example of an insert query:

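    -- Copy staged rows into the final table, deriving hash_id from the trip's fields.
    INSERT INTO green_taxi (pickup_datetime, dropoff_datetime, passenger_count, trip_distance, fare_amount, hash_id)
    SELECT pickup_datetime, dropoff_datetime, passenger_count, trip_distance, fare_amount,
           MD5(CONCAT(pickup_datetime, dropoff_datetime, passenger_count)) AS hash_id
    FROM staging_green_taxi
    -- Skip rows whose hash_id already exists instead of failing on the UNIQUE constraint.
    ON CONFLICT (hash_id) DO NOTHING;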
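
The effect of a hash-based unique key on repeated loads can be tried out locally. The sketch below uses Python's built-in `sqlite3` module as a stand-in for Postgres, which is an assumption made purely so the example runs without a database server. SQLite's `INSERT OR IGNORE` plays the role that `ON CONFLICT (hash_id) DO NOTHING` plays in Postgres, and the table is reduced to a few columns.

```python
import hashlib
import sqlite3

conn = sqlite3.connect(":memory:")  # in-memory stand-in for Postgres
conn.execute(
    "CREATE TABLE green_taxi ("
    " pickup_datetime TEXT, dropoff_datetime TEXT,"
    " passenger_count INTEGER, hash_id TEXT UNIQUE)"
)


def load(rows):
    """Insert rows, skipping any whose hash_id is already present.

    Returns the total number of rows in the table after the load.
    """
    records = [
        (p, d, n, hashlib.md5(f"{p}{d}{n}".encode()).hexdigest())
        for p, d, n in rows
    ]
    with conn:  # commit on success, roll back on error
        conn.executemany(
            "INSERT OR IGNORE INTO green_taxi VALUES (?, ?, ?, ?)", records
        )
    return conn.execute("SELECT COUNT(*) FROM green_taxi").fetchone()[0]


batch = [
    ("2019-01-01 00:15:00", "2019-01-01 00:32:10", 1),
    ("2019-01-01 00:20:00", "2019-01-01 00:41:30", 2),
]
first = load(batch)   # 2 rows in the table
second = load(batch)  # still 2: re-running the same batch adds nothing
```

Running the same batch twice leaves the row count unchanged, which is the behaviour you want when a monthly run is retried.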
    
    

6. Optimizing the Pipeline

6.1 Purging Execution Output Files

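  • To manage storage efficiently, a purge task removes the execution's output files (such as the downloaded and extracted data) once they are no longer needed.
  • This keeps storage from filling up with copies of data that has already been loaded into Postgres.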

6.2 Handling Large Datasets

  • The yellow taxi files are significantly larger than the green ones, so processing them locally can be slow, and a cloud setup may be needed for acceptable performance.
  • Using cloud storage (e.g., AWS S3, GCP Storage) is recommended for handling large-scale data efficiently.

7. Final Adjustments and Testing

  • The workflow is tested for different months and taxi types to ensure accuracy.
  • The database is checked to confirm successful data insertion.
  • Adjustments are made to improve performance and flexibility of the pipeline.
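
One way to test systematically is to enumerate every taxi type and month the dataset covers and run the workflow for each. The following Python sketch only builds that list; how each entry is submitted to Kestra is left out. The names `months` and `run_matrix` are hypothetical, and the date range is taken from the introduction (2019 to July 2021).

```python
from itertools import product

TAXI_TYPES = ("yellow", "green")
FIRST = (2019, 1)  # first month in the dataset
LAST = (2021, 7)   # last month in the dataset


def months(first=FIRST, last=LAST):
    """Yield (year, month) pairs from first to last, inclusive."""
    year, month = first
    while (year, month) <= last:
        yield year, month
        # Roll over to January of the next year after December.
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)


def run_matrix():
    """Every (taxi, year, month) combination the workflow should handle."""
    return [(taxi, y, m) for taxi, (y, m) in product(TAXI_TYPES, months())]


runs = run_matrix()
total = len(runs)  # 31 months x 2 taxi types = 62
```

After each run, comparing the row count in the target table against the number of rows in the source file is a simple check that the load succeeded.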

Conclusion

This study note outlines how to set up an ETL pipeline with Kestra and Postgres for processing New York taxi data. Input parameters select the taxi type and month, a shell command fetches the data from GitHub, SQL loads it into Postgres without creating duplicates, and a purge task keeps storage use in check. The same workflow handles both yellow and green taxi data.
