Study Note 2.2.6: ETL Pipelines in Kestra on Google Cloud Platform

Overview

This session demonstrates how to take an existing ETL pipeline and move it to Google Cloud using Google Cloud Storage (GCS) and BigQuery. The goal is to extract data from a CSV file, upload it to GCS, and then use BigQuery to create tables and process the data. This makes it possible to handle larger datasets efficiently with cloud resources.

Steps to Set Up on Google Cloud

  1. Google Cloud Setup:
    • Set up Google Cloud Service Accounts.
    • Obtain the project ID, location (region), and bucket name.
  2. Key Value Store in Kestra:
    • Utilize the key value store in Kestra to manage environment variables.
    • Store sensitive information such as service accounts and project IDs securely (see the sketch after this list).
  3. Creating a Google Cloud Project:
    • Create a new project in Google Cloud.
    • Generate a service account with the required roles (e.g., Storage Admin, BigQuery Admin).
  4. Service Account Key:
    • Create a JSON key for the service account and download it.
    • Add the JSON key to Kestra’s key value store.
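
As a rough sketch of how the key value store ties in (the flow ID, namespace, and KV key names below are illustrative, not prescribed by the session): a small one-off flow can seed the store, and later flows read the values back with the kv() function instead of hard-coding credentials.

    # Illustrative one-off flow that seeds the Kestra key value store
    id: gcp_kv
    namespace: zoomcamp
    tasks:
      - id: gcp_project_id
        type: io.kestra.plugin.core.kv.Set
        key: GCP_PROJECT_ID
        value: my-project-id       # replace with your actual project ID
      - id: gcp_bucket_name
        type: io.kestra.plugin.core.kv.Set
        key: GCP_BUCKET_NAME
        value: my-bucket-name      # replace with your actual bucket name

Other flows then reference these values with expressions such as "{{ kv('GCP_PROJECT_ID') }}", and the service account JSON key can be stored the same way so credentials never appear in the flow files themselves.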

Workflow in Kestra

  1. Uploading CSV to GCS:
    • Extract data from a CSV file.
    • Upload the CSV file to GCS.
  2. Creating BigQuery Tables:
    • Create a main table in BigQuery using the schema from the CSV file.
    • Create a staging table to temporarily store data before merging it into the main table.
  3. Processing Data in BigQuery:
    • Load data from the CSV file into the staging table.
    • Add a unique row ID and the source file name to each record, then merge the data from the staging table into the main table.
  4. Handling Different Data Sets:
    • Use conditional logic in Kestra to handle different data sets (e.g., green and yellow taxi data).
    • Run tasks based on the selected data set and schema (see the flow sketch after this list).
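
A hedged sketch of what the conditional branching could look like (input and task IDs are placeholders; the real flow from the session differs in detail):

    # Illustrative skeleton: a SELECT input chooses the dataset, and If tasks
    # branch to the matching BigQuery tasks for that schema
    inputs:
      - id: taxi
        type: SELECT
        values: [green, yellow]
        defaults: green

    tasks:
      - id: if_green
        type: io.kestra.plugin.core.flow.If
        condition: "{{ inputs.taxi == 'green' }}"
        then:
          - id: bq_green_table
            type: io.kestra.plugin.gcp.bigquery.Query
            sql: "CREATE TABLE IF NOT EXISTS ..."   # green-specific schema

      - id: if_yellow
        type: io.kestra.plugin.core.flow.If
        condition: "{{ inputs.taxi == 'yellow' }}"
        then:
          - id: bq_yellow_table
            type: io.kestra.plugin.gcp.bigquery.Query
            sql: "CREATE TABLE IF NOT EXISTS ..."   # yellow-specific schema

Each branch can then create its own main and staging tables and run the load and merge steps against the correct schema.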

Practical Examples

  1. Upload CSV to GCS:

    # Sample Kestra task to upload the extracted CSV to GCS
    # (task, input, and KV names here are placeholders for your own flow)
    - id: upload_to_gcs
      type: io.kestra.plugin.gcp.gcs.Upload
      from: "{{ outputs.extract.outputFiles['data.csv'] }}"
      to: "gs://{{ kv('GCP_BUCKET_NAME') }}/data/{{ inputs.year }}/{{ inputs.month }}/{{ inputs.taxi_type }}.csv"
    
    
  2. Create Main Table in BigQuery:

    CREATE TABLE `project_id.dataset.main_table` (
        unique_id STRING,
        file_name STRING,
        column1 TYPE,
        column2 TYPE,
        ...
    )
    
    
  3. Merge Staging Table Data:

    MERGE INTO `project_id.dataset.main_table` AS main
    USING `project_id.dataset.staging_table` AS staging
    ON main.unique_id = staging.unique_id
    WHEN MATCHED THEN
        UPDATE SET main.column1 = staging.column1, ...
    WHEN NOT MATCHED THEN
        INSERT (unique_id, file_name, column1, ...)
        VALUES (staging.unique_id, staging.file_name, staging.column1, ...)
    
    
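In the flow itself, these statements are not run by hand; they are wrapped in Kestra's BigQuery query task. A minimal sketch, assuming the project ID and service account JSON are read from the key value store under the (illustrative) keys GCP_PROJECT_ID and GCP_CREDS:

    # Illustrative wrapper: executing the MERGE above from a Kestra task
    - id: bq_merge
      type: io.kestra.plugin.gcp.bigquery.Query
      projectId: "{{ kv('GCP_PROJECT_ID') }}"
      serviceAccount: "{{ kv('GCP_CREDS') }}"
      sql: |
        MERGE INTO `{{ kv('GCP_PROJECT_ID') }}.dataset.main_table` AS main
        USING `{{ kv('GCP_PROJECT_ID') }}.dataset.staging_table` AS staging
        ON main.unique_id = staging.unique_id
        WHEN MATCHED THEN
          UPDATE SET main.column1 = staging.column1
        WHEN NOT MATCHED THEN
          INSERT (unique_id, file_name, column1)
          VALUES (staging.unique_id, staging.file_name, staging.column1)

The CREATE TABLE statements run the same way, just with a different sql body.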

Key Points

  • Google Cloud Storage (GCS): Used to store unstructured data like CSV files.
  • BigQuery: A data warehouse for storing and processing structured data.
  • Kestra: An orchestration tool used to manage workflows and automate data pipelines.
  • Conditional Logic: Allows handling different data sets with varying schemas efficiently.
  • Key Value Store: Manages environment variables and sensitive data securely.

Future Steps

  • Scheduling and Backfills:
    • Implement schedules to automatically run workflows as new data arrives (see the trigger sketch below).
    • Use backfills to process historical data and ensure the pipeline works for past data.
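
A minimal sketch of what a monthly schedule might look like in the flow (the cron expression and trigger ID are illustrative):

    # Illustrative trigger: run the flow monthly; backfills for past months
    # can then be launched from the Kestra UI against this trigger
    triggers:
      - id: monthly_schedule
        type: io.kestra.plugin.core.trigger.Schedule
        cron: "0 9 1 * *"       # 09:00 on the first day of every month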

Conclusion

This session provides a comprehensive guide on setting up and running ETL pipelines on Google Cloud using Kestra. By leveraging GCS and BigQuery, we can handle larger datasets efficiently and automate data processing workflows. The use of conditional logic and key value stores in Kestra ensures flexibility and security in managing different data sets and sensitive information.
