Jonathan Bhaskar

Declarative Data Pipelines: Moving from Code to Configuration

In data teams today, writing Python code to create data pipelines has become second nature, especially for analytics workloads. Creating new DAGs, tasks, and operators in Airflow - now the industry standard for data orchestration - is just part of our daily routine. But what if there's a simpler, more accessible approach to building these pipelines?

Before we explore this alternative, let's examine how another domain - DevOps - evolved its approach to building and shipping software, and see what lessons we can apply to data orchestration.

Evolution of Infrastructure Management

Let's examine a simple but common task: deploying a web application with a database. The way teams handled this task evolved dramatically over time.

Timeline showing evolution from Shell scripts (2000) to Infrastructure as code (2016)

Shell scripts (2000s)

In the early 2000s, deploying applications meant writing detailed shell scripts that listed every command needed. These scripts were brittle, hard to maintain, and required deep system knowledge:

#!/bin/bash

# Install dependencies
apt-get update
apt-get install -y nginx mysql-server

# Configure MySQL
mysql -u root -e "CREATE DATABASE myapp;"
mysql -u root -e "CREATE USER 'myapp'@'localhost';"
mysql -u root -e "GRANT ALL PRIVILEGES ON myapp.* TO 'myapp'@'localhost';"

# Deploy application
cd /var/www/html
tar -xzf myapp.tar.gz
chown -R www-data:www-data /var/www/html

# Start services
service mysql start
service nginx start

Configuration Management (2010s)

By 2010, tools like Puppet introduced a paradigm shift. Instead of listing commands, teams defined their desired system state in a declarative format. The tool would figure out how to achieve that state:

package { ['nginx', 'mysql-server']:
  ensure => installed,
}

service { 'nginx':
  ensure  => running,
  enable  => true,
  require => Package['nginx'],
}

service { 'mysql':
  ensure  => running,
  enable  => true,
  require => Package['mysql-server'],
}

exec { 'create-db':
  command  => 'mysql -u root -e "CREATE DATABASE myapp;"',
  unless   => 'mysql -u root -e "SHOW DATABASES;" | grep myapp',
  path     => ['/usr/bin', '/bin'],
  provider => shell,
  require  => Service['mysql'],
}

file { '/var/www/html/myapp':
  ensure  => directory,
  source  => 'puppet:///modules/myapp/files',
  recurse => true,
  owner   => 'www-data',
  group   => 'www-data',
}

Infrastructure as Code (2016+)

Cloud platforms like AWS took this declarative approach even further. With CloudFormation, engineers simply specified the resources they needed, and AWS handled all implementation details:

AWSTemplateFormatVersion: '2010-09-09'
Resources:
  WebServer:
    Type: AWS::EC2::Instance
    Properties:
      InstanceType: t2.micro
      ImageId: ami-0c55b159cbfafe1f0
      UserData: 
        Fn::Base64: !Sub |
          #!/bin/bash
          yum update -y
          yum install -y nginx

  Database:
    Type: AWS::RDS::DBInstance
    Properties:
      Engine: mysql
      DBName: myapp
      MasterUsername: admin
      MasterUserPassword: password
      DBInstanceClass: db.t2.micro
      AllocatedStorage: '20'

  SecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Allow web traffic
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 80
          ToPort: 80
          CidrIp: 0.0.0.0/0

Across these three stages, the underlying shift is the same: instead of writing out the steps to execute (imperative), engineers specify the end state they want and let the system work out how to get there (declarative).
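To make the distinction concrete, here is a minimal Python sketch of the declarative idea: the user supplies only the desired state, and a small planner derives the commands. The `plan` function and the `installed`/`absent` values are illustrative assumptions loosely modelled on the Puppet example above, not how Puppet or CloudFormation are implemented.

```python
def plan(desired: dict[str, str], current: set[str]) -> list[str]:
    """Derive the actions needed to move `current` towards `desired`.

    `desired` maps a package name to "installed" or "absent" (hypothetical
    vocabulary); `current` is the set of packages already on the machine.
    """
    actions = []
    for name, ensure in sorted(desired.items()):
        if ensure == "installed" and name not in current:
            actions.append(f"install {name}")
        elif ensure == "absent" and name in current:
            actions.append(f"remove {name}")
    return actions


desired_state = {"nginx": "installed", "mysql-server": "installed"}

# Only the missing package is acted on
print(plan(desired_state, {"nginx"}))

# Once the machine matches the desired state, there is nothing left to do
print(plan(desired_state, {"nginx", "mysql-server"}))
```

Running the planner a second time against a converged machine produces no actions, which is the property that made declarative tools safer to re-run than shell scripts.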

Flow chart comparing Shell script vs Cloud Formation approaches

Evolution of Infrastructure Teams

This shift from imperative to declarative approaches fundamentally changed how infrastructure teams operated. Let's look at what this evolution meant in practice.

In the early days, deploying infrastructure involved a lot of back-and-forth between developers and system administrators:

Diagram showing complex Developer-Sysadmin interactions in 2000

With declarative infrastructure, the interaction changed dramatically:

Diagram showing simplified Developer-Infrastructure Engineer interactions in 2016

The key difference was that teams now had a common language - YAML configurations - that both developers and infrastructure engineers could understand and work with effectively. This shift to configuration-driven workflows revolutionized how infrastructure teams operated.

Current state of data pipelines

Data teams today face similar challenges to what infrastructure teams encountered in the 2000s. To understand these parallels, let's examine how data pipelines are typically built, using a common scenario: moving data from S3 to GCS, then loading it into BigQuery - a pattern you might use when you have transactional database backups in S3 but your analytics stack runs on BigQuery.

import tempfile
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)

# Placeholder defaults; adjust for your environment
default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
}


def transfer_s3_to_gcs(
    s3_bucket,
    s3_key,
    gcs_bucket,
    gcs_object_name,
    aws_conn_id='aws_default',
    gcp_conn_id='google_cloud_default'
):
    # Initialize hooks
    s3_hook = S3Hook(aws_conn_id=aws_conn_id)
    gcs_hook = GCSHook(gcp_conn_id=gcp_conn_id)

    # Create a temporary directory that is cleaned up afterwards
    with tempfile.TemporaryDirectory() as temp_dir:
        # Download file from S3; local_path is a directory and the
        # hook returns the path of the downloaded file
        local_file = s3_hook.download_file(
            key=s3_key,
            bucket_name=s3_bucket,
            local_path=temp_dir
        )

        # Upload file to GCS
        gcs_hook.upload(
            bucket_name=gcs_bucket,
            object_name=gcs_object_name,
            filename=local_file
        )

# Create DAG
dag = DAG(
    's3_to_gcs_transfer',
    default_args=default_args,
    description='Transfer files from S3 to GCS',
    schedule_interval='@daily',
    catchup=False
)

# Define the transfer task
transfer_task = PythonOperator(
    task_id='transfer_s3_to_gcs',
    python_callable=transfer_s3_to_gcs,
    op_kwargs={
        's3_bucket': 'your-s3-bucket-name',
        's3_key': 'path/to/your/file.csv',
        'gcs_bucket': 'your-gcs-bucket-name',
        'gcs_object_name': 'path/to/destination/file.csv'
    },
    dag=dag
)

# Load from GCS to BigQuery
load_to_bq = GCSToBigQueryOperator(
    task_id='load_to_bigquery',
    bucket='your-gcs-bucket-name',
    source_objects=['path/to/destination/file.csv'],
    destination_project_dataset_table='your-project:dataset.table',
    dag=dag
)

# Run the load only after the transfer has finished
transfer_task >> load_to_bq

Compared with the DevOps workflows above, this code is imperative in nature: we tell the orchestrator how to perform each step in the data pipeline, rather than declaring what needs to be done and letting it figure out the how.

Data team interactions

Unsurprisingly, the data team workflows within companies mirror the interactions between developers and sysadmins in the 2000s. When a data scientist or analyst needs a new pipeline, they have to coordinate with data engineers who are often juggling multiple priorities:

Sequence diagram showing data team communications

Reusability

Another significant downside of the current imperative approach in Airflow is the difficulty of code reuse. Looking at Airflow's abstraction layers, we can see why:

Airflow architecture diagram showing layers and components

While Airflow provides hooks and operators that can be reused across DAGs, the tasks themselves must be written directly in the DAG file along with the business logic. Even if we need to create a new DAG that shares most of the same tasks but adds just one additional step, we have to rewrite all the task definitions in the new DAG file.

Modified Airflow diagram showing code duplication issue

The tasks have to be defined again in this DAG, even though the transfer_s3_to_gcs and load_to_bigquery tasks are identical to those in the previous DAG.

Tight coupling

The real challenge becomes apparent when we need to update implementation details. Consider a scenario where we need to make our transfer process more scalable. The current approach, which works for smaller files, will fail once a file no longer fits in the Airflow worker's local storage or memory. Even worse, it could stall or crash the worker, disrupting other DAGs running on it.

Instead, to ensure the workload scales independently, we create an instance and use the instance's STARTUP_SCRIPT to transfer the files.

with DAG('s3_to_gcs_transfer',
         default_args=default_args,
         schedule_interval='@daily',
         catchup=False) as dag:

    # Create and start instance
    # Use the STARTUP_SCRIPT to transfer files in the instance
    create_instance = ComputeEngineInsertInstanceOperator(
        task_id='create_transfer_instance',
        project_id='your-project-id', 
        zone='us-central1-a',
        body={
            'name': 'transfer-instance-{{ ds_nodash }}',
            'machineType': 'n1-standard-2',
            'metadata': {
                'items': [
                    {'key': 'startup-script', 'value': STARTUP_SCRIPT},
                    {'key': 's3_bucket', 'value': 'your-s3-bucket'},
                    {'key': 's3_key', 'value': 'path/to/your/file'},
                    {'key': 'gcs_bucket', 'value': 'your-gcs-bucket'},
                    {'key': 'gcs_path', 'value': 'destination-path'}
                ]
            }
        }
    )

    # Delete instance
    delete_instance = ComputeEngineDeleteInstanceOperator(
        task_id='delete_transfer_instance',
        project_id='your-project-id',
        zone='us-central1-a',
        resource_id='transfer-instance-{{ ds_nodash }}',
        trigger_rule='all_done'
    )

    # Load from GCS to BigQuery
    load_to_bq = GCSToBigQueryOperator(
        task_id='load_to_bigquery',
        bucket='your-gcs-bucket',
        source_objects=['path/to/destination/file.csv'],
        destination_project_dataset_table='project:dataset.table'
    )

    create_instance >> delete_instance
    create_instance >> load_to_bq

When we need to update the DAGs to use this scalable approach, there isn't a clean way to do it. Every DAG using the transfer logic needs to be updated, tested, and redeployed individually. The refactoring effort multiplies with the number of files that need updating.

Diagram illustrating pipeline update propagation challenges

This limitation stems from Airflow's fundamental design pattern where business logic (the nodes and structure of the DAG) is tightly coupled with implementation logic (the actual task code). This coupling not only makes maintenance difficult but also creates a barrier between data engineers and other team members who could potentially define and modify pipelines themselves.

Declarative data pipelines

A declarative data platform could address these challenges by completely separating the technical implementation details (how to move data from S3 to GCS) from the pipeline business logic (which pipeline needs to move data from S3 to GCS). This separation would allow data engineers to focus on building robust, reusable components while enabling analysts and scientists to define pipelines without deep technical knowledge.

Architecture diagram of declarative pipeline approach

Config-based workflows

In summary, we can identify three key requirements for building an effective declarative data platform:

  1. Separation of Concerns: Pipeline definitions (what) should be completely separate from task implementations (how)

  2. Reusability: Tasks should be reusable across different pipelines without code duplication

  3. Simplified Interface: Teams should be able to define pipelines using a simple, declarative syntax

Here's how such a platform might work. Instead of writing Python code, you'd define your pipeline in YAML:

tasks:
  - name: daily_transfer
    task: s3_to_gcs
    params:
      s3_bucket: your-s3-bucket-name
      s3_key: path/to/your/file.csv
      gcs_bucket: your-gcs-bucket-name
      gcs_object_name: path/to/destination/file.csv

  - name: daily_load_to_bq
    task: gcs_to_bq
    depends_on: daily_transfer
    params:
      gcs_bucket: your-gcs-bucket-name
      gcs_object_name: path/to/destination/file.csv
      bq_dataset: your-bq-dataset-name
      bq_table: your-bq-table-name

The actual implementation of these tasks would be written in Python, but crucially, they would be independent of any specific pipeline. This separation allows data engineers to optimize and maintain task implementations without affecting pipeline definitions.
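As a rough illustration of that separation, the sketch below keeps task implementations in a registry and treats the pipeline as plain data. Everything here is hypothetical: the `task` decorator, the `TASKS` registry, `run_pipeline`, and the assumption that `depends_on` holds a list of step names. The pipeline is written as a Python dict standing in for the parsed YAML, and the task bodies are placeholders that only describe what a real implementation would do.

```python
from graphlib import TopologicalSorter
from typing import Callable

# Registry of reusable task implementations, keyed by the name used in config
TASKS: dict[str, Callable[..., dict]] = {}


def task(name: str):
    """Register a function as a task implementation under `name`."""
    def decorator(fn):
        TASKS[name] = fn
        return fn
    return decorator


@task("s3_to_gcs")
def s3_to_gcs(s3_bucket, s3_key, gcs_bucket, gcs_object_name) -> dict:
    # Placeholder: a real implementation would copy the object here
    return {"copied_to": f"gs://{gcs_bucket}/{gcs_object_name}"}


@task("gcs_to_bq")
def gcs_to_bq(gcs_bucket, gcs_object_name, bq_dataset, bq_table) -> dict:
    # Placeholder: a real implementation would start a BigQuery load job
    return {"loaded_into": f"{bq_dataset}.{bq_table}"}


# Stand-in for the parsed YAML; steps are deliberately listed out of order
PIPELINE = {
    "tasks": [
        {
            "name": "daily_load_to_bq",
            "task": "gcs_to_bq",
            "depends_on": ["daily_transfer"],
            "params": {
                "gcs_bucket": "your-gcs-bucket-name",
                "gcs_object_name": "path/to/destination/file.csv",
                "bq_dataset": "your-bq-dataset-name",
                "bq_table": "your-bq-table-name",
            },
        },
        {
            "name": "daily_transfer",
            "task": "s3_to_gcs",
            "params": {
                "s3_bucket": "your-s3-bucket-name",
                "s3_key": "path/to/your/file.csv",
                "gcs_bucket": "your-gcs-bucket-name",
                "gcs_object_name": "path/to/destination/file.csv",
            },
        },
    ]
}


def run_pipeline(pipeline: dict) -> list[str]:
    """Run each step after its dependencies and return the execution order."""
    steps = {step["name"]: step for step in pipeline["tasks"]}
    # Map each step to the set of steps that must run before it
    graph = {name: set(step.get("depends_on", [])) for name, step in steps.items()}
    order = list(TopologicalSorter(graph).static_order())
    for name in order:
        step = steps[name]
        TASKS[step["task"]](**step["params"])
    return order


print(run_pipeline(PIPELINE))
```

Swapping the body of `s3_to_gcs` for a more scalable implementation would not require touching `PIPELINE`, which is the decoupling the requirements above ask for.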

Benefits

This approach also offers other key advantages.

Reduced cognitive load

Declarative systems mirror how humans naturally think about problem-solving. When designing a new pipeline, we typically think top-down about what tasks need to be included. However, imperative systems like Airflow require bottom-up thinking, where you must detail every implementation step before building the pipeline.

Comparison of top-down vs bottom-up pipeline design approaches

Improved collaboration

Since creating and updating tasks are decoupled from business logic, teams can work more efficiently:

  • Data analysts can create and modify pipelines without deep technical knowledge

  • Data engineers can optimize task implementations without understanding pipeline-specific logic

  • Changes can be reviewed and deployed independently

Workflow diagram between Data Scientists and Engineers

UI and no-code tools

While this article proposes a YAML-based approach, visual workflow tools like AWS Glue offer similar declarative functionality. However, these UI-based tools come with some non-obvious limitations:

Limited DevOps Integration

Core software engineering practices like code review, testing, and automated deployments become difficult or impossible with UI-based tools.

Reduced Flexibility

While UI tools excel at common use cases, they often struggle with custom requirements. Teams frequently end up modifying their requirements to fit the tool's capabilities rather than the other way around.

AI & LLMs

The rise of Large Language Models (LLMs) presents interesting opportunities for declarative data platforms. LLMs excel at breaking down high-level instructions into logical steps but can struggle with generating efficient, maintainable implementation code.

A declarative platform could leverage these strengths by using LLMs to:

  • Help generate pipeline configurations from natural language descriptions

  • Suggest task combinations for common data patterns

  • Assist with pipeline documentation and metadata

This aligns well with LLMs' capabilities while avoiding their limitations around generating complex implementation code.

Implementation

With these challenges in mind, I built dagster-odp as a proof of concept for bringing declarative pipelines to Dagster. Let's examine how this implementation addresses the problems we've discussed, starting with why Dagster was chosen as the foundation.

Why not Airflow

Two fundamental aspects of Airflow's design made it unsuitable for our needs:

  1. Task definition model: In Airflow, tasks can only exist within DAGs - they aren't independent, reusable components. This conflicts with our core requirement of having tasks be first-class citizens that can be reused across different pipelines. The tight coupling between tasks and DAGs makes it impossible to build a truly modular system.

  2. Dynamic DAG Creation: Our platform needs to create DAGs dynamically from YAML configurations. However, Airflow expects DAGs to be defined in Python files. Creating DAGs dynamically would mean generating Python code at runtime and writing this code to DAG files - an approach that's both complex and prone to errors.

Why Dagster?

Dagster's architecture aligns perfectly with our declarative approach, offering two key constructs that make it ideal for our platform:

Asset-Based Architecture

Instead of focusing on tasks like Airflow, Dagster uses assets as its primary abstraction. An asset represents a concrete piece of data - like a table, file, or model. Creating a new version of assets through a pipeline run is called asset materialization.

This approach means the code is directly linked to the data object it creates, enabling rich metadata when a new version of an asset is created. For instance, when running a pipeline in Dagster, your code can push detailed information about the data asset it produced to the UI. The metadata could include the asset's size, schema, quality metrics, and lineage. This tight coupling between code and data makes it easier to build observable, maintainable data pipelines.

Dagster UI vs Airflow UI comparison screenshot

Dynamic Definition Support

Dagster's component model is built around Python decorators rather than class inheritance, making it ideal for dynamic creation. For example:

from dagster import asset

@asset
def dynamic_asset(context):
    # Asset logic here
    pass

This design means we can programmatically create assets from YAML configurations without generating code files - a critical requirement for our platform.
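The reason decorators help is that a decorator is an ordinary function call, so it can be applied inside a loop over config entries instead of being written above a hand-coded function. The sketch below shows the pattern using only the standard library. `register_asset`, `REGISTRY`, and `build_asset` are hypothetical stand-ins, not the Dagster API and not dagster-odp's internals; with Dagster the same shape would presumably use `asset(name=...)` in place of the stand-in.

```python
from typing import Callable

# Collected definitions, keyed by asset name
REGISTRY: dict[str, Callable[[], str]] = {}


def register_asset(name: str):
    """Stand-in for a decorator such as Dagster's @asset (hypothetical)."""
    def decorator(fn):
        REGISTRY[name] = fn
        return fn
    return decorator


def build_asset(spec: dict) -> Callable[[], str]:
    """Create and register one asset function from a config entry.

    Defining the function inside this factory binds `spec` per call, which
    avoids the late-binding closure pitfall of defining it directly in a loop.
    """
    def _asset() -> str:
        return f"{spec['task_type']} -> {spec['asset_key']}"

    # Apply the decorator by hand: decorator(arguments)(function)
    return register_asset(spec["asset_key"])(_asset)


# Stand-in for entries parsed from a YAML file
CONFIG = [
    {"asset_key": "raw_data_gcs", "task_type": "s3_file_to_gcs"},
    {"asset_key": "raw_data_bq", "task_type": "gcs_file_to_bq"},
]

for spec in CONFIG:
    build_asset(spec)

print(sorted(REGISTRY))
print(REGISTRY["raw_data_gcs"]())
```

No Python source is generated or written to disk at any point; the definitions exist only as objects created while the config is read.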

Additional Dagster features

Beyond its core architecture, Dagster provides several features that make it an ideal foundation for our declarative platform:

  1. Local Development: Dagster prioritizes developer experience by allowing local execution without complex setup. This means you can develop and test pipelines on your local machine with the same code that will run in production, leading to faster iteration cycles and simpler debugging.

  2. DBT Integration: Dagster's DBT integration automatically discovers DBT models in your repository. This deep integration means you can treat individual DBT models as Dagster assets, run specific model selections, and maintain a complete picture of your data lineage across both DBT and non-DBT transformations.

  3. Partitions: Dagster's partitioning system lets you process data in logical chunks, whether time-based (like daily runs) or categorical (like per-country processing). This is particularly powerful for backfills and incremental processing, as you can easily rerun specific partitions without touching others.

  4. Asset checks: Data quality monitoring is built into Dagster's core abstractions through asset checks. Rather than treating quality checks as an afterthought, Dagster allows you to define checks that run automatically after asset materialization, enabling you to catch data issues before they propagate downstream.

dagster-odp

To demonstrate how dagster-odp brings these concepts together, we'll implement the same S3 to BigQuery pipeline we discussed earlier, but using a declarative approach. The complete implementation consists of three main components: resource configuration, task definition, and workflow configuration.

Resource definition

First, we define our resource configuration. Resources in dagster-odp represent reusable connections and services, similar to Airflow hooks. While dagster-odp ships with pre-built GCP resources, we'll also create a custom S3 resource:

resources:
  - resource_kind: gcs
    params:
      project: my-gcp-project

  - resource_kind: s3

  - resource_kind: bigquery
    params:
      project: my-gcp-project
      location: us-east1

The S3 resource implementation shows how to create a custom resource:

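from typing import Any

import boto3
from dagster import ConfigurableResource, EnvVar

# odp_resource is provided by dagster-odp (its import is not shown here)


@odp_resource("s3")
class S3Resource(ConfigurableResource):
    """A resource that provides access to Amazon S3."""

    def get_client(self) -> Any:
        # Credentials are read from environment variables at call time
        return boto3.client(
            "s3",
            aws_access_key_id=EnvVar("AWS_ACCESS_KEY_ID").get_value(),
            aws_secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY").get_value(),
        )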

We use the odp_resource decorator to define the resource class. This allows it to be discovered by the framework and used in the YAML file. We define one method, get_client, which we will use in our task definition.

Task definition

With our resources defined, we can create the S3 to GCS transfer task by implementing a BaseTask subclass.

from dagster_odp.tasks.manager import BaseTask, odp_task

@odp_task(
    "s3_file_to_gcs",
    required_resources=["gcs", "s3"]
)
class S3ToGCSTransfer(BaseTask):
    """
    A task that transfers a file from Amazon S3 to GCP Storage
    """
    s3_bucket: str
    s3_key: str
    destination_file_uri: str

    def run(self) -> dict:
        s3_client = self._resources["s3"].get_client()
        gcs_client = self._resources["gcs"]

        # Download file from S3 (reads the whole object into memory)
        s3_response = s3_client.get_object(Bucket=self.s3_bucket,
                                           Key=self.s3_key)
        file_content = s3_response["Body"].read()

        # Strip the "gs://" prefix, then split into bucket and object name
        bucket_name, _, blob_name = self.destination_file_uri[5:].partition("/")

        # Upload to GCS
        bucket = gcs_client.bucket(bucket_name)
        blob = bucket.blob(blob_name)
        blob.upload_from_string(file_content)

        # Returned values become metadata that downstream assets can reference
        return {
            "s3_bucket": self.s3_bucket,
            "s3_key": self.s3_key,
            "destination_file_uri": self.destination_file_uri,
            "size": len(file_content),
        }

The task implementation returns metadata that will be visible in the Dagster UI after each run.
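One detail worth noting: the `[5:]` slicing in `run` assumes the URI always starts with `gs://`. A slightly more defensive variant could validate the URI before using it. The helper below is a hypothetical sketch (`parse_gcs_uri` is not part of dagster-odp) built on the standard library's `urlparse`.

```python
from urllib.parse import urlparse


def parse_gcs_uri(uri: str) -> tuple[str, str]:
    """Split a gs://bucket/object URI into (bucket_name, blob_name).

    Note: object names containing '?' or '#' would be treated as query or
    fragment by urlparse and need separate handling.
    """
    parsed = urlparse(uri)
    if parsed.scheme != "gs" or not parsed.netloc:
        raise ValueError(f"Expected a gs://bucket/object URI, got: {uri!r}")
    blob_name = parsed.path.lstrip("/")
    if not blob_name:
        raise ValueError(f"URI has no object name: {uri!r}")
    return parsed.netloc, blob_name


bucket_name, blob_name = parse_gcs_uri("gs://my-gcs-bucket/data/daily_export.csv")
print(bucket_name, blob_name)
```

Failing early with a clear message is arguably more useful in a config-driven setup, since the person editing the YAML may never see the task's Python code.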

Workflow definition

Finally, we tie everything together in our workflow configuration. This YAML file defines our assets, their dependencies, and scheduling:

assets:
  - asset_key: raw_data_gcs
    task_type: s3_file_to_gcs
    params:
      s3_bucket: my-source-bucket
      s3_key: data/daily_export.csv
      destination_file_uri: gs://my-gcs-bucket/data/daily_export.csv

  - asset_key: raw_data_bq
    task_type: gcs_file_to_bq
    depends_on: [raw_data_gcs]
    params:
      source_file_uri: "{{raw_data_gcs.destination_file_uri}}"
      destination_table_id: "my-project.my_dataset.daily_data"
      job_config_params:
        source_format: CSV
        write_disposition: WRITE_TRUNCATE
        autodetect: true

jobs:
  - job_id: daily_data_transfer
    description: "Transfer data from S3 to BigQuery daily"
    asset_selection: [raw_data_gcs, raw_data_bq]
    triggers:
      - trigger_id: daily_schedule
        trigger_type: schedule
        params:
          schedule_kind: cron
          schedule_params:
            cron_schedule: "@daily"

These components are converted by dagster-odp into Dagster constructs like assets, jobs and schedules.

dagster-odp system conversion flow diagram

dagster-odp Features

The example above demonstrates some key features of the framework:

  1. Asset dependencies through the depends_on field

  2. Metadata sharing between assets using handlebars syntax ({{raw_data_gcs.destination_file_uri}})

  3. Built-in scheduling with cron support

  4. Pre-built tasks for common operations like gcs_file_to_bq
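To give a feel for how the metadata sharing in item 2 could work, here is a minimal sketch that substitutes `{{asset.field}}` placeholders with values returned by upstream tasks. This is an assumption about the general mechanism, not dagster-odp's actual implementation; `resolve`, `PLACEHOLDER`, and the `materialized` dictionary are hypothetical names.

```python
import re

# Matches placeholders of the form {{asset_key.field}}
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\.(\w+)\s*\}\}")


def resolve(value: str, materialized: dict[str, dict]) -> str:
    """Replace each placeholder with metadata from an upstream asset.

    `materialized` maps an asset key to the metadata dict its task returned.
    """
    def lookup(match: re.Match) -> str:
        asset_key, field = match.group(1), match.group(2)
        try:
            return str(materialized[asset_key][field])
        except KeyError:
            raise KeyError(
                f"No metadata '{field}' recorded for asset '{asset_key}'"
            ) from None

    return PLACEHOLDER.sub(lookup, value)


# Metadata as it might be returned by the upstream transfer task
upstream = {
    "raw_data_gcs": {
        "destination_file_uri": "gs://my-gcs-bucket/data/daily_export.csv",
        "size": 1024,
    }
}

print(resolve("{{raw_data_gcs.destination_file_uri}}", upstream))
```

The lookup only succeeds if the upstream task returns a key with exactly the name used in the placeholder, so parameter names in the YAML and in the task's return value need to stay in sync.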

The framework also provides several other capabilities:

  • Integration with DLT for data ingestion

  • Soda integration for data quality monitoring

  • DBT support with dependency management and variable passing

For a hands-on introduction to dagster-odp, check out the Getting Started guide. The guide walks through setting up your first project and building a simple pipeline using the concepts we've discussed.

The Move Towards Declarative Pipelines

dagster-odp isn't the only declarative data platform. Several platforms are exploring similar approaches, recognizing the need to make data pipelines more maintainable and accessible:

  1. Data-aware scheduling: Airflow's data-aware scheduling (v2.4+) introduces a concept similar to Dagster's assets, allowing DAGs to be dynamically chained together based on datasets. However, since DAGs must still be defined in Python files, creating these jobs dynamically from YAML remains challenging.

  2. DLT+: dltHub, the company behind the open source data ingestion library dlt, recently launched DLT+, offering end-to-end workflow definitions via YAML files. It's built specifically for data scientists, focusing on single-machine computation for both ingestion and processing tasks. To learn more, get in touch with them.

  3. Dagster: The Dagster team is working on their own declarative feature that will be deeply integrated into their orchestration product. To learn more or collaborate, jump into the Dagster Slack.

Conclusion

The evolution from imperative to declarative approaches in data engineering mirrors what we've seen in other domains like DevOps. As data pipelines become more complex, the need for maintainable, configurable, and collaborative solutions grows stronger. A declarative framework addresses these challenges by separating pipeline definitions from implementation details, enabling teams to work more efficiently while maintaining the flexibility needed for complex data workflows.

Whether you're building new pipelines or maintaining existing ones, considering a declarative approach could help make your data infrastructure more maintainable and your teams more productive.

Top comments (4)

Andrey Rusev

Good work! :)

From what I've seen in other companies - usually the ones with high demand for data engineering always end up in developing something along the same lines (declarative pipelines). So, I think you are on to something here!

I also like your comparison to infra - many companies treat data engineering as software development, thinking, for example, that DEs release software. Where I believe DEs should release data (or data assets) - every day, every hour, whatever. And I do often advocate for a much more operational approach to data engineering.

Simply put - the company will always care more about do I have the new data now? than did they implement that step in their DAG? Anyway, that's a bit off topic...

Jonathan Bhaskar

Thank you for reading. The declarative pattern is definitely gaining popularity in enterprise solutions - I've implemented one in production myself. Like you mentioned, the goal of DE teams is to deliver data assets, not just the software that produces them. Any approach that simplifies asset production while maintaining flexibility and customization is valuable.

Olga Braginskaya

Really interesting read! I love the idea of making data pipelines more reusable and less tied to code. Declarative approaches definitely have their advantages. But I always feel like YAML is a bit of a love-hate thing—it makes things look clean at first, but debugging and handling more complex logic can get tricky fast.

Jonathan Bhaskar

Thanks for reading. I agree that the YAML approach does come with tradeoffs. While proper validation helps catch errors early and assists in debugging, implementing complex logic usually leads to extensive parameterization which can become unwieldy.