DEV Community

Cover image for Data-aware Scheduling in Airflow: A Practical Guide with DAG Factory
Cícero Joasyo Mateus de Moura for AWS Community Builders

Posted on • Edited on

Data-aware Scheduling in Airflow: A Practical Guide with DAG Factory

I recently contribute to an open-source project called DAG Factory, library for building Airflow DAGs declaratively using YAML files, eliminating the need for Python coding.

My contribution was to add support for the Data-aware Scheduling (Datasets) functionality of Airflow, which was introduced starting from version 2.4 (at the time of writing this article, Airflow is at version 2.6.3).

The aim here is to talk about the Datasets functionality in Airflow, introduce the DAG Factory library, and create a practical example using both.

You can access the repository with the code used in this article here.

What are Datasets in Airflow?

Data-aware Scheduling allows creating DAGs linked to files, whether local or remote, to trigger data processing based on the modification of one or multiple files, known as datasets.

Datasets help resolve the problem of data dependency between DAGs, which occurs when one DAG needs to consume data from another for analysis or further processing. They enable a more intelligent and visible scheduling with an explicit dependency between DAGs.

Basically, there are two fundamental concepts in Airflow's Datasets:

  • DAG Producer: this DAG creates or updates one or more datasets, accomplished through tasks using a parameter called outlets, to specify a particular dataset.

  • DAG Consumer: this DAG consumes one or more datasets and will be scheduled and triggered as soon as all datasets are successfully created or updated by the DAG Producer. The scheduling is done using the schedule directly in the DAG configuration.

Currently, there are two ways to schedule DAGs in Airflow: either by a recurrent schedule (cron, timedelta, timetable, etc.) or through one or multiple datasets. It's important to note that we cannot use both scheduling methods in a single DAG, only one in each DAG.

DAG Factory: Building DAGs with YAML

DAG Factory is a community library that allows configuring Airflow to generate DAGs from one or multiple YAML files.

The library aims to facilitate the creation and configuration of new DAGs by using declarative parameters in YAML. It allows default customizations and is open-source, making it easy to create and customize new functionalities.

The community around this library is highly engaged, making it worth exploring =)

Practical Use of Datasets

In this article, we'll work with the following scenario:

We need to build a pipeline that downloads data from an API and saves the results to Amazon S3. After successfully extracting and saving the data, we need to process it. Hence, we'll have another pipeline that will be triggered based on the create or update of the data.

Image description

The infrastructure to run Airflow and reproduce the example in this article can be found here.

The first step is to build the pipeline that extracts and saves the data to S3.

Producer DAG for Data

This pipeline consists of two tasks to extract data from the public PokeAPI and another two tasks to save the data to S3.

The tasks that extract data from the API using the SimpleHttpOperator, and the tasks that save the data to S3 use the S3CreateObjectOperator.

Since we'll be using YAML to build our DAGs, the following code constructs this first DAG with all its tasks.



download_data_api_dataset_producer_dag:
  description: "Example DAG producer custom config datasets"
  schedule_interval: "0 5 * * *"
  task_groups:
    extract_data:
      tooltip: "this is a task group"
    save_data:
      tooltip: "this is a task group"
  tasks:
    start_process:
      operator: airflow.operators.dummy.DummyOperator
    get_items_data:
      operator: airflow.providers.http.operators.http.SimpleHttpOperator
      method: "GET"
      http_conn_id: "poke_api"
      endpoint: "item/1"
      task_group_name: extract_data
      dependencies: [start_process]
    save_items_data:
      operator: airflow.providers.amazon.aws.operators.s3.S3CreateObjectOperator
      aws_conn_id: aws_default
      s3_bucket: cjmm-datalake-raw
      s3_key: "poke_api/item/data_{{ ts }}.json"
      data: "{{ ti.xcom_pull(task_ids='get_items_data') }}"
      dependencies: [get_items_data]
      task_group_name: save_data
      outlets:
        file: /opt/airflow/dags/dags_config/datasets_config.yml
        datasets: ['dataset_poke_items']
    get_items_attribute_data:
      operator: airflow.providers.http.operators.http.SimpleHttpOperator
      method: "GET"
      http_conn_id: "poke_api"
      endpoint: "item-attribute/1"
      dependencies: [start_process]
      task_group_name: extract_data
    save_items_attribute_data:
        operator: airflow.providers.amazon.aws.operators.s3.S3CreateObjectOperator
        aws_conn_id: aws_default
        s3_bucket: cjmm-datalake-raw
        s3_key: "poke_api/items_attribute/data_{{ ts }}.json"
        data: "{{ ti.xcom_pull(task_ids='get_items_attribute_data') }}"
        dependencies: [get_items_attribute_data]
        task_group_name: save_data
        outlets:
          file: /opt/airflow/dags/dags_config/datasets_config.yml
          datasets: ['dataset_poke_items_attribute']


Enter fullscreen mode Exit fullscreen mode

A highlight is the configuration of the Datasets, done through the outlets tag added to the tasks save_items_data and save_items_attribute_data.



outlets:
   file: /opt/airflow/dags/dags_config/datasets_config.yml
   datasets: ['dataset_poke_items_attribute']


Enter fullscreen mode Exit fullscreen mode

In this configuration, we specify the path of the file, where all Datasets are centrally declared for reuse, and the names of the datasets contained in the file for use.

Below is the datasets_config.yml file used in this example, containing the Dataset's name (used only in Airflow) and the URI, which is the path where the current file is stored, in this case, Amazon S3.



datasets:
  - name: dataset_poke_items_attribute
    uri: s3://cjmm-datalake-raw/poke_api/items_attribute/*.json
  - name: dataset_poke_items
    uri: s3://cjmm-datalake-raw/poke_api/items/*.json


Enter fullscreen mode Exit fullscreen mode

The resulting DAG visualization in Airflow will look like this:

Image description

Consumer DAG for Data

Now, let's build the DAG that consumes the data, which performs the processing and handling of the datasets.

The DAG is scheduled based on datasets, not on an execution time, so it will only be triggered when all the datasets it depends on are updated.

Currently, we cannot use two types of scheduling simultaneously; it's either through a schedule interval or datasets.

In this example, we'll only build a DAG with PythonOperator, simulating the consumption and processing of the produced data.

Below is the configuration file for the consumer DAG:



process_data_api_dataset_consumer_dag:
  description: "Example DAG consumer custom config datasets"
  schedule:
    file: /opt/airflow/dags/dags_config/datasets_config.yml
    datasets: ['dataset_poke_items', 'dataset_poke_items_attribute']
  tasks:
    start_process:
      operator: airflow.operators.dummy.DummyOperator
    process_data:
      operator: airflow.operators.python_operator.PythonOperator
      python_callable_name: process_data_function
      python_callable_file: /opt/airflow/dags/process_data.py
      task_group_name: etl_data
      provide_context: true
      dependencies: [start_process]


Enter fullscreen mode Exit fullscreen mode

A highlight is the configuration of the schedule based on datasets, which is similar to the configuration of the outlets in the producer DAG:



schedule:
   file: /opt/airflow/dags/dags_config/datasets_config.yml
   datasets: ['dataset_poke_items', 'dataset_poke_items_attribute']


Enter fullscreen mode Exit fullscreen mode

The resulting DAG visualization in Airflow will be as follows:

Image description

Overview of DAGs with Datasets

When we have DAGs using Airflow's datasets, we can observe some interesting points:

  • The consumer DAG in the list of all DAGs is flagged to indicate scheduling based on datasets.

Image description

  • There is a specific visualization in the Airflow menu called Datasets, you can check the configured datasets, the dependencies between DAGs, and the log of dataset creation, update, and consumption.

Image description

  • The DAG Dependencies visualization shows the relationships between the DAGs, providing a helpful overview of the processing mesh and data dependencies.

Image description

Important Points about Datasets

The functionality of Datasets in Airflow is still recent, and there are many improvements in the community backlog. However, I would like to highlight some points at this moment:

  • Currently, Airflow's dataset functionality does not directly inspect the physical file itself. Instead, it schedules the consumer pipeline directly through the database, almost like an implicit DAG Trigger.

  • Considering the previous point, it's better to use a Sensor if you genuinely need to "see and access" the data when triggering the DAG Consumer.

  • The official documentation does not recommend using regular expressions in the URI of datasets. However, in my tests, I didn't encounter any issues with this, as the functionality doesn't yet look directly at the physical file.

  • Since the DAG Consumer doesn't have a specific schedule, it's challenging to measure if it was triggered at a planned time, making it difficult to define an SLA. A more refined monitoring approach is needed to avoid missing critical scheduling.

Conclusion

By using the DAG Factory library, we simplify the process of creating and configuring new DAGs, leveraging the extensibility provided by the library's open-source code.

Airflow's Datasets enable more efficient scheduling by triggering DAGs only when necessary data is available, avoiding unnecessary and delayed executions.

I hope this article has been useful in understanding Airflow's Datasets functionality and how to apply it to your projects. With this approach, you can build more robust and efficient pipelines, fully utilizing Airflow's potential.

Follow me:
LinkedIn: https://www.linkedin.com/in/cicero-moura/
Github: https://github.com/cicerojmm

Top comments (0)