In data teams today, writing Python code to create data pipelines has become second nature, especially for analytics workloads. Creating new DAGs, tasks, and operators in Airflow - now the industry standard for data orchestration - is just part of our daily routine. But what if there's a simpler, more accessible approach to building these pipelines?
Before we explore this alternative, let's examine how another domain - DevOps - evolved its approach to building and shipping software, and see what lessons we can apply to data orchestration.
Evolution of Infrastructure Management
Let's examine a simple but common task: deploying a web application with a database. The way teams handled this task evolved dramatically over time.
Shell scripts (2000s)
In the early 2000s, deploying applications meant writing detailed shell scripts that listed every command needed. These scripts were brittle, hard to maintain, and required deep system knowledge:
#!/bin/bash
# Install dependencies
apt-get update
apt-get install -y nginx mysql-server
# Configure MySQL
mysql -u root -e "CREATE DATABASE myapp;"
mysql -u root -e "CREATE USER 'myapp'@'localhost';"
mysql -u root -e "GRANT ALL PRIVILEGES ON myapp.*;"
# Deploy application
cd /var/www/html
tar -xzf myapp.tar.gz
chown -R www-data:www-data /var/www/html
# Start services
service mysql start
service nginx start
Configuration Management (2010s)
By 2010, tools like Puppet introduced a paradigm shift. Instead of listing commands, teams defined their desired system state in a declarative format. The tool would figure out how to achieve that state:
package { ['nginx', 'mysql-server']:
ensure => installed,
}
service { 'nginx':
ensure => running,
enable => true,
require => Package['nginx'],
}
service { 'mysql':
ensure => running,
enable => true,
require => Package['mysql-server'],
}
exec { 'create-db':
command => 'mysql -u root -e "CREATE DATABASE myapp;"',
unless => 'mysql -u root -e "SHOW DATABASES;" | grep myapp',
require => Service['mysql'],
}
file { '/var/www/html/myapp':
ensure => directory,
source => 'puppet:///modules/myapp/files',
recurse => true,
owner => 'www-data',
group => 'www-data',
}
Infrastructure as Code (2016+)
Cloud platforms like AWS took this declarative approach even further. With CloudFormation, engineers simply specified the resources they needed, and AWS handled all implementation details:
AWSTemplateFormatVersion: '2010-09-09'
Resources:
WebServer:
Type: AWS::EC2::Instance
Properties:
InstanceType: t2.micro
ImageId: ami-0c55b159cbfafe1f0
UserData:
Fn::Base64: !Sub |
#!/bin/bash
yum update -y
yum install -y nginx
Database:
Type: AWS::RDS::DBInstance
Properties:
Engine: mysql
DBName: myapp
MasterUsername: admin
MasterUserPassword: password
DBInstanceClass: db.t2.micro
SecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupDescription: Allow web traffic
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 80
ToPort: 80
CidrIp: 0.0.0.0/0
The difference over the years is clear: instead of writing out every step that needed to be executed (imperative), engineers moved to specifying what they wanted and letting the system figure out how to get there (declarative).
Evolution of Infrastructure Teams
This shift from imperative to declarative approaches fundamentally changed how infrastructure teams operated. Let's look at what this evolution meant in practice.
In the early days, deploying infrastructure involved a lot of back-and-forth between developers and system administrators:
With declarative infrastructure, the interaction changed dramatically:
The key difference was that teams now had a common language - declarative configuration files - that both developers and infrastructure engineers could understand and work with effectively. This shift to configuration-driven workflows revolutionized how infrastructure teams operated.
Current state of data pipelines
Data teams today face similar challenges to what infrastructure teams encountered in the 2000s. To understand these parallels, let's examine how data pipelines are typically built, using a common scenario: moving data from S3 to GCS, then loading it into BigQuery - a pattern you might use when you have transactional database backups in S3 but your analytics stack runs on BigQuery.
import tempfile

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

default_args = {'owner': 'data-team', 'retries': 1}  # minimal defaults for the example

def transfer_s3_to_gcs(
s3_bucket,
s3_key,
gcs_bucket,
gcs_object_name,
aws_conn_id='aws_default',
gcp_conn_id='google_cloud_default'
):
# Initialize hooks
s3_hook = S3Hook(aws_conn_id=aws_conn_id)
gcs_hook = GCSHook(gcp_conn_id=gcp_conn_id)
# Create a temporary file
with tempfile.NamedTemporaryFile() as temp_file:
# Download file from S3
s3_hook.download_file(
key=s3_key,
bucket_name=s3_bucket,
local_path=temp_file.name
)
# Upload file to GCS
gcs_hook.upload(
bucket_name=gcs_bucket,
object_name=gcs_object_name,
filename=temp_file.name
)
# Create DAG
dag = DAG(
's3_to_gcs_transfer',
default_args=default_args,
description='Transfer files from S3 to GCS',
schedule_interval='@daily',
catchup=False
)
# Define the transfer task
transfer_task = PythonOperator(
task_id='transfer_s3_to_gcs',
python_callable=transfer_s3_to_gcs,
op_kwargs={
's3_bucket': 'your-s3-bucket-name',
's3_key': 'path/to/your/file.csv',
'gcs_bucket': 'your-gcs-bucket-name',
'gcs_object_name': 'path/to/destination/file.csv'
},
dag=dag
)
# Load from GCS to BigQuery
load_to_bq = GCSToBigQueryOperator(
    task_id='load_to_bigquery',
    bucket='your-gcs-bucket',
    source_objects=['path/to/destination/file.csv'],
    destination_project_dataset_table='your-project:dataset.table',
    dag=dag
)
# Set task dependencies
transfer_task >> load_to_bq
Comparing this to the DevOps workflows we saw earlier, the code is imperative in nature. We tell the orchestrator how to perform each step in the data pipeline, instead of declaring what needs to be done and letting it figure out the how.
Data team interactions
Unsurprisingly, the data team workflows within companies mirror the interactions between developers and sysadmins in the 2000s. When a data scientist or analyst needs a new pipeline, they have to coordinate with data engineers who are often juggling multiple priorities:
Reusability
Another significant downside of the current imperative approach in Airflow is the difficulty of code reuse. Looking at Airflow's abstraction layers, we can see why:
While Airflow provides hooks and operators that can be reused across DAGs, the tasks themselves must be written directly in the DAG file along with the business logic. Even if we need to create a new DAG that shares most of the same tasks but adds just one additional step, we have to rewrite all the task definitions in the new DAG file.
The tasks have to be defined again in this DAG, even though the transfer_s3_to_gcs task and the load_to_bigquery task are the same as in the previous DAG.
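To make the duplication concrete, a second DAG that adds just one reporting step might look roughly like the sketch below. The shared-module import, the extra BigQuery step, and its query are assumptions for illustration, not part of the original pipeline.
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

from my_company.transfers import transfer_s3_to_gcs  # hypothetical shared module

default_args = {'owner': 'data-team'}

with DAG('s3_to_gcs_transfer_with_report',
         default_args=default_args,
         schedule_interval='@daily',
         catchup=False) as dag:

    # Identical to the task in the first DAG, but it has to be declared again
    transfer_task = PythonOperator(
        task_id='transfer_s3_to_gcs',
        python_callable=transfer_s3_to_gcs,
        op_kwargs={
            's3_bucket': 'your-s3-bucket-name',
            's3_key': 'path/to/your/file.csv',
            'gcs_bucket': 'your-gcs-bucket-name',
            'gcs_object_name': 'path/to/destination/file.csv',
        },
    )

    # Also identical to the first DAG
    load_to_bq = GCSToBigQueryOperator(
        task_id='load_to_bigquery',
        bucket='your-gcs-bucket',
        source_objects=['path/to/destination/file.csv'],
        destination_project_dataset_table='your-project:dataset.table',
    )

    # The only genuinely new step in this pipeline
    build_report = BigQueryInsertJobOperator(
        task_id='build_daily_report',
        configuration={'query': {'query': 'SELECT COUNT(*) FROM dataset.table',
                                 'useLegacySql': False}},
    )

    transfer_task >> load_to_bq >> build_report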
Tight coupling
The real challenge becomes apparent when we need to update implementation details. Consider a scenario where we need to make our transfer process more scalable. The current approach, which might work for smaller files, will fail for file sizes exceeding the Airflow worker's memory. Even worse, it could stall or crash the worker, disrupting other executing DAGs.
Instead, to ensure the workload scales independently, we create a Compute Engine instance and use the instance's STARTUP_SCRIPT to transfer the files:
from airflow import DAG
from airflow.providers.google.cloud.operators.compute import (
    ComputeEngineDeleteInstanceOperator,
    ComputeEngineInsertInstanceOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

with DAG('s3_to_gcs_transfer',
default_args=default_args,
schedule_interval='@daily',
catchup=False) as dag:
# Create and start instance
# Use the STARTUP_SCRIPT to transfer files in the instance
create_instance = ComputeEngineInsertInstanceOperator(
task_id='create_transfer_instance',
project_id='your-project-id',
zone='us-central1-a',
body={
'name': 'transfer-instance-{{ ds_nodash }}',
'machineType': 'n1-standard-2',
'metadata': {
'items': [
{'key': 'startup-script', 'value': STARTUP_SCRIPT},
{'key': 's3_bucket', 'value': 'your-s3-bucket'},
{'key': 's3_key', 'value': 'path/to/your/file'},
{'key': 'gcs_bucket', 'value': 'your-gcs-bucket'},
{'key': 'gcs_path', 'value': 'destination-path'}
]
}
}
)
# Delete instance
delete_instance = ComputeEngineDeleteInstanceOperator(
task_id='delete_transfer_instance',
project_id='your-project-id',
zone='us-central1-a',
resource_id='transfer-instance-{{ ds_nodash }}',
trigger_rule='all_done'
)
# Load from GCS to BigQuery
load_to_bq = GCSToBigQueryOperator(
task_id='load_to_bigquery',
bucket='your-gcs-bucket',
source_objects=['path/to/destination/file.csv'],
destination_project_dataset_table='project:dataset.table'
)
    create_instance >> load_to_bq >> delete_instance
When we need to update the DAGs to use this scalable approach, there isn't a clean way to do it. Every DAG using the transfer logic needs to be updated, tested, and redeployed individually. The refactoring effort multiplies with the number of files that need updating.
This limitation stems from Airflow's fundamental design pattern where business logic (the nodes and structure of the DAG) is tightly coupled with implementation logic (the actual task code). This coupling not only makes maintenance difficult but also creates a barrier between data engineers and other team members who could potentially define and modify pipelines themselves.
Declarative data pipelines
A declarative data platform could address these challenges by completely separating the technical implementation details (how to move data from S3 to GCS) from the pipeline business logic (which pipeline needs to move data from S3 to GCS). This separation would allow data engineers to focus on building robust, reusable components while enabling analysts and scientists to define pipelines without deep technical knowledge.
Config-based workflows
In summary, we can identify three key requirements for building an effective declarative data platform:
Separation of Concerns: Pipeline definitions (what) should be completely separate from task implementations (how)
Reusability: Tasks should be reusable across different pipelines without code duplication
Simplified Interface: Teams should be able to define pipelines using a simple, declarative syntax
Here's how such a platform might work. Instead of writing Python code, you'd define your pipeline in YAML:
tasks:
  - name: daily_transfer
    task: s3_to_gcs
    params:
      s3_bucket: your-s3-bucket-name
      s3_key: path/to/your/file.csv
      gcs_bucket: your-gcs-bucket-name
      gcs_object_name: path/to/destination/file.csv
  - name: daily_load_to_bq
    task: gcs_to_bq
    depends_on: daily_transfer
    params:
      gcs_bucket: your-gcs-bucket-name
      gcs_object_name: path/to/destination/file.csv
      bq_dataset: your-bq-dataset-name
      bq_table: your-bq-table-name
The actual implementation of these tasks would be written in Python, but crucially, they would be independent of any specific pipeline. This separation allows data engineers to optimize and maintain task implementations without affecting pipeline definitions.
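As a rough sketch of what that separation could look like in Python, a platform might keep a registry of pipeline-agnostic task functions and resolve YAML definitions against it. The names here (TASK_REGISTRY, register_task, build_pipeline) are assumptions for illustration, not a real framework.
from typing import Any, Callable, Dict, List, Tuple

import yaml

TASK_REGISTRY: Dict[str, Callable] = {}

def register_task(name: str) -> Callable:
    """Make a task implementation available to any pipeline by name."""
    def wrapper(fn: Callable) -> Callable:
        TASK_REGISTRY[name] = fn
        return fn
    return wrapper

@register_task("s3_to_gcs")
def s3_to_gcs(s3_bucket: str, s3_key: str, gcs_bucket: str, gcs_object_name: str) -> None:
    ...  # implementation owned by data engineers, independent of any pipeline

def build_pipeline(config_path: str) -> List[Tuple[str, Callable, Dict[str, Any]]]:
    """Resolve a YAML pipeline definition into concrete task invocations."""
    with open(config_path) as f:
        config = yaml.safe_load(f)
    return [
        (task["name"], TASK_REGISTRY[task["task"]], task.get("params", {}))
        for task in config["tasks"]
    ]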
Benefits
This approach also offers other key advantages.
Reduced cognitive load
Declarative systems mirror how humans naturally think about problem-solving. When designing a new pipeline, we typically think top-down about what tasks need to be included. However, imperative systems like Airflow require bottom-up thinking, where you must detail every implementation step before building the pipeline.
Improved collaboration
Since creating and updating tasks are decoupled from business logic, teams can work more efficiently:
Data analysts can create and modify pipelines without deep technical knowledge
Data engineers can optimize task implementations without understanding pipeline-specific logic
Changes can be reviewed and deployed independently
UI and no-code tools
While this article proposes a YAML-based approach, visual workflow tools like AWS Glue offer similar declarative functionality. However, these UI-based tools come with some non-obvious limitations:
Limited DevOps Integration
Core software engineering practices like code review, testing, and automated deployments become difficult or impossible with UI-based tools.
Reduced Flexibility
While UI tools excel at common use cases, they often struggle with custom requirements. Teams frequently end up modifying their requirements to fit the tool's capabilities rather than the other way around.
AI & LLMs
The rise of Large Language Models (LLMs) presents interesting opportunities for declarative data platforms. LLMs excel at breaking down high-level instructions into logical steps but can struggle with generating efficient, maintainable implementation code.
A declarative platform could leverage these strengths by using LLMs to:
Help generate pipeline configurations from natural language descriptions
Suggest task combinations for common data patterns
Assist with pipeline documentation and metadata
This aligns well with LLMs' capabilities while avoiding their limitations around generating complex implementation code.
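For example, an LLM could be asked to draft a pipeline in the same YAML schema shown earlier, leaving the task implementations untouched. A hypothetical exchange might look like this, with placeholder values that a human would still review:
# Prompt: "Every morning, load the latest export from GCS into BigQuery."
tasks:
  - name: morning_load_to_bq
    task: gcs_to_bq
    params:
      gcs_bucket: your-gcs-bucket-name
      gcs_object_name: exports/latest.csv
      bq_dataset: your-bq-dataset-name
      bq_table: your-bq-table-name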
Implementation
With these challenges in mind, I built dagster-odp as a proof of concept for bringing declarative pipelines to Dagster. Let's examine how this implementation addresses the problems we've discussed, starting with why Dagster was chosen as the foundation.
Why not Airflow?
Two fundamental aspects of Airflow's design made it unsuitable for our needs:
Task definition model: In Airflow, tasks can only exist within DAGs - they aren't independent, reusable components. This conflicts with our core requirement of having tasks be first-class citizens that can be reused across different pipelines. The tight coupling between tasks and DAGs makes it impossible to build a truly modular system.
Dynamic DAG Creation: Our platform needs to create DAGs dynamically from YAML configurations. However, Airflow expects DAGs to be defined in Python files. Creating DAGs dynamically would mean generating Python code at runtime and writing this code to DAG files - an approach that's both complex and prone to errors.
Why Dagster?
Dagster's architecture aligns perfectly with our declarative approach, offering two key constructs that make it ideal for our platform:
Asset-Based Architecture
Instead of focusing on tasks like Airflow, Dagster uses assets as its primary abstraction. An asset represents a concrete piece of data - like a table, file, or model. Creating a new version of an asset through a pipeline run is called asset materialization.
This approach means the code is directly linked to the data object it creates, enabling rich metadata when a new version of an asset is created. For instance, when running a pipeline in Dagster, your code can push detailed information about the data asset it produced to the UI. The metadata could include the asset's size, schema, quality metrics, and lineage. This tight coupling between code and data makes it easier to build observable, maintainable data pipelines.
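For example, here is a minimal sketch of attaching metadata to a materialization with Dagster's built-in APIs; the asset name, row count, and path are made up for illustration:
from dagster import MaterializeResult, MetadataValue, asset

@asset
def daily_orders() -> MaterializeResult:
    row_count = 10_000  # in a real asset this would be measured from the data produced
    return MaterializeResult(
        metadata={
            "row_count": row_count,
            "storage_path": MetadataValue.path("/data/daily_orders.parquet"),
        }
    )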
Dynamic Definition Support
Dagster's component model is built around Python decorators rather than class inheritance, making it ideal for dynamic creation. For example:
@asset
def dynamic_asset(context):
# Asset logic here
pass
This design means we can programmatically create assets from YAML configurations without generating code files - a critical requirement for our platform.
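A rough sketch of the idea, assuming a config shape that mirrors the YAML shown later and omitting the actual task dispatch:
from typing import Any, Dict, List

from dagster import AssetsDefinition, asset

def build_asset(config: Dict[str, Any]) -> AssetsDefinition:
    def _compute(context) -> None:
        context.log.info(
            f"materializing {config['asset_key']} with params {config.get('params', {})}"
        )
        # a real implementation would dispatch to the configured task's run() here

    # Calling the decorator as a function lets us set names and deps dynamically
    return asset(
        name=config["asset_key"],
        deps=config.get("depends_on", []),
    )(_compute)

configs: List[Dict[str, Any]] = [
    {"asset_key": "raw_data_gcs", "params": {"s3_bucket": "my-source-bucket"}},
    {"asset_key": "raw_data_bq", "depends_on": ["raw_data_gcs"]},
]
assets = [build_asset(c) for c in configs]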
Additional Dagster features
Beyond its core architecture, Dagster provides several features that make it an ideal foundation for our declarative platform:
Local Development: Dagster prioritizes developer experience by allowing local execution without complex setup. This means you can develop and test pipelines on your local machine with the same code that will run in production, leading to faster iteration cycles and simpler debugging.
DBT Integration: Dagster's DBT integration automatically discovers DBT models in your repository. This deep integration means you can treat individual DBT models as Dagster assets, run specific model selections, and maintain a complete picture of your data lineage across both DBT and non-DBT transformations.
Partitions: Dagster's partitioning system lets you process data in logical chunks, whether time-based (like daily runs) or categorical (like per-country processing). This is particularly powerful for backfills and incremental processing, as you can easily rerun specific partitions without touching others.
Asset checks: Data quality monitoring is built into Dagster's core abstractions through asset checks. Rather than treating quality checks as an afterthought, Dagster allows you to define checks that run automatically after asset materialization, enabling you to catch data issues before they propagate downstream. A minimal sketch of the last two features appears after this list.
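Here is that sketch, combining a daily-partitioned asset with an asset check; the asset name, check name, and row count are made up:
from dagster import (
    AssetCheckResult,
    DailyPartitionsDefinition,
    asset,
    asset_check,
)

daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")

@asset(partitions_def=daily_partitions)
def daily_export(context) -> None:
    # context.partition_key is the specific day being processed or backfilled
    context.log.info(f"processing partition {context.partition_key}")

@asset_check(asset=daily_export)
def daily_export_not_empty() -> AssetCheckResult:
    row_count = 42  # a real check would measure this from the materialized data
    return AssetCheckResult(passed=row_count > 0, metadata={"row_count": row_count})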
dagster-odp
To demonstrate how dagster-odp brings these concepts together, we'll implement the same S3 to BigQuery pipeline we discussed earlier, but using a declarative approach. The complete implementation consists of three main components: resource configuration, task definition, and workflow configuration.
Resource definition
First, we define our resource configuration. Resources in dagster-odp represent reusable connections and services, similar to Airflow hooks. While dagster-odp ships with pre-built GCP resources, we'll also create a custom S3 resource:
resources:
- resource_kind: gcs
params:
project: my-gcp-project
- resource_kind: s3
- resource_kind: bigquery
params:
project: my-gcp-project
location: us-east1
The S3 resource implementation shows how to create a custom resource:
@odp_resource("s3")
class S3Resource(ConfigurableResource):
"""A resource that provides access to Amazon S3."""
def get_client(self) -> Any:
return boto3.client(
"s3",
aws_access_key_id=EnvVar(
"AWS_ACCESS_KEY_ID").get_value(),
aws_secret_access_key=EnvVar(
"AWS_SECRET_ACCESS_KEY").get_value(),
)
We use the odp_resource decorator to define the resource class. This allows it to be discovered by the framework and used in the YAML file. We define one method, get_client, which we will use in our task definition.
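Because the resource is a plain ConfigurableResource, it can also be instantiated directly for a quick local sanity check. This assumes AWS credentials are exported in your environment and that the class above lives in a module of your choosing (the import path here is hypothetical):
from my_project.resources import S3Resource  # hypothetical module path for the class above

s3 = S3Resource()
client = s3.get_client()
print([bucket["Name"] for bucket in client.list_buckets()["Buckets"]])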
Task definition
With our resources defined, we can create the S3 to GCS transfer task by implementing a BaseTask subclass.
from dagster_odp.tasks.manager import BaseTask, odp_task
@odp_task(
"s3_file_to_gcs",
required_resources=["gcs", "s3"]
)
class S3ToGCSTransfer(BaseTask):
"""
A task that transfers a file from Amazon S3 to GCP Storage
"""
s3_bucket: str
s3_key: str
destination_uri: str
def run(self) -> dict:
s3_client = self._resources["s3"].get_client()
gcs_client = self._resources["gcs"]
# Download file from S3
s3_response = s3_client.get_object(Bucket=self.s3_bucket,
Key=self.s3_key)
file_content = s3_response["Body"].read()
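        # destination_uri looks like gs://bucket-name/path/to/object, so strip the
        # "gs://" prefix (5 characters) and split into bucket and blob path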
bucket_name = self.destination_uri[5:].split("/")[0]
blob_name = "/".join(self.destination_uri[5:].split("/")[1:])
# Upload to GCS
bucket = gcs_client.bucket(bucket_name)
blob = bucket.blob(blob_name)
blob.upload_from_string(file_content)
return {
"s3_bucket": self.s3_bucket,
"s3_key": self.s3_key,
"destination_uri": self.destination_uri,
"size": len(file_content),
}
The task implementation returns metadata that will be visible in the Dagster UI after each run.
Workflow definition
Finally, we tie everything together in our workflow configuration. This YAML file defines our assets, their dependencies, and scheduling:
assets:
  - asset_key: raw_data_gcs
    task_type: s3_file_to_gcs
    params:
      s3_bucket: my-source-bucket
      s3_key: data/daily_export.csv
      destination_uri: gs://my-gcs-bucket/data/daily_export.csv
  - asset_key: raw_data_bq
    task_type: gcs_file_to_bq
    depends_on: [raw_data_gcs]
    params:
      source_file_uri: "{{raw_data_gcs.destination_uri}}"
      destination_table_id: "my-project.my_dataset.daily_data"
      job_config_params:
        source_format: CSV
        write_disposition: WRITE_TRUNCATE
        autodetect: true
jobs:
- job_id: daily_data_transfer
description: "Transfer data from S3 to BigQuery daily"
asset_selection: [raw_data_gcs, raw_data_bq]
triggers:
- trigger_id: daily_schedule
trigger_type: schedule
params:
schedule_kind: cron
schedule_params:
cron_schedule: "@daily"
These components are converted by dagster-odp into Dagster constructs like assets, jobs, and schedules.
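That translation is internal to the framework, but conceptually it maps YAML entries onto standard Dagster APIs. Here is a rough sketch of the idea for the jobs and triggers above; this is not dagster-odp's actual code, and the asset-building step is omitted:
import yaml
from dagster import Definitions, ScheduleDefinition, define_asset_job

with open("workflow.yaml") as f:
    workflow = yaml.safe_load(f)

jobs, schedules = [], []
for job_cfg in workflow["jobs"]:
    job = define_asset_job(name=job_cfg["job_id"], selection=job_cfg["asset_selection"])
    jobs.append(job)
    for trigger in job_cfg.get("triggers", []):
        if trigger["trigger_type"] == "schedule":
            schedules.append(
                ScheduleDefinition(
                    job=job,
                    cron_schedule=trigger["params"]["schedule_params"]["cron_schedule"],
                )
            )

# assets would be built from the `assets` section in a similar fashion
defs = Definitions(assets=[], jobs=jobs, schedules=schedules)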
dagster-odp Features
The example above demonstrates some key features of the framework:
Asset dependencies through the depends_on field
Metadata sharing between assets using handlebars syntax ({{raw_data_gcs.destination_uri}})
Built-in scheduling with cron support
Pre-built tasks for common operations like gcs_file_to_bq
The framework also provides several other capabilities:
Integration with DLT for data ingestion
Soda integration for data quality monitoring
DBT support with dependency management and variable passing
For a hands-on introduction to dagster-odp, check out the Getting Started guide. The guide walks through setting up your first project and building a simple pipeline using the concepts we've discussed.
The Move Towards Declarative Pipelines
dagster-odp isn't the only declarative data platform. Several other projects are exploring similar approaches, recognizing the need to make data pipelines more maintainable and accessible:
Data-aware scheduling: Airflow's data-aware scheduling (v2.4+) introduces a concept similar to Dagster's assets, allowing DAGs to be dynamically chained together based on datasets (a short sketch follows this list). However, since DAGs must still be defined in Python files, creating these jobs dynamically from YAML remains challenging.
DLT+: dltHub, the company behind the open-source data ingestion tool dlt, recently launched DLT+, offering end-to-end workflow definitions via YAML files. It's built specifically for data scientists, focusing on single-machine computation for both ingestion and processing tasks. To learn more, get in touch with them.
Dagster: The Dagster team is working on their own declarative feature that will be deeply integrated into their orchestration product. To learn more or collaborate, jump into the Dagster Slack.
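To make the first point above concrete, here is a rough sketch of Airflow's dataset-based scheduling (Airflow 2.4+); the bucket path and DAG names are made up:
from datetime import datetime

from airflow.datasets import Dataset
from airflow.decorators import dag, task

raw_export = Dataset("s3://my-bucket/exports/daily.csv")

@dag(start_date=datetime(2024, 1, 1), schedule="@daily", catchup=False)
def producer():
    @task(outlets=[raw_export])
    def update_export():
        ...  # writing the file emits a dataset event

    update_export()

@dag(start_date=datetime(2024, 1, 1), schedule=[raw_export], catchup=False)
def consumer():
    @task
    def load_downstream():
        ...  # runs whenever raw_export is updated

    load_downstream()

producer()
consumer()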
Conclusion
The evolution from imperative to declarative approaches in data engineering mirrors what we've seen in other domains like DevOps. As data pipelines become more complex, the need for maintainable, configurable, and collaborative solutions grows stronger. A declarative framework addresses these challenges by separating pipeline definitions from implementation details, enabling teams to work more efficiently while maintaining the flexibility needed for complex data workflows.
Whether you're building new pipelines or maintaining existing ones, considering a declarative approach could help make your data infrastructure more maintainable and your teams more productive.
Top comments (4)
Good work! :)
From what I've seen in other companies - the ones with high demand for data engineering usually end up developing something along the same lines (declarative pipelines). So, I think you are on to something here!
I also like your comparison to infra - many companies treat data engineering as software development, thinking, for example, that DEs release software, whereas I believe DEs should release data (or data assets) - every day, every hour, whatever. And I do often advocate for a much more operational approach to data engineering.
Simply put - the company will always care more about "do I have the new data now?" than "did they implement that step in their DAG?" Anyway, that's a bit off topic...
Thank you for reading. The declarative pattern is definitely gaining popularity in enterprise solutions - I've implemented one in production myself. Like you mentioned, the goal of DE teams is to deliver data assets, not just the software that produces them. Any approach that simplifies asset production while maintaining flexibility and customization is valuable.
Really interesting read! I love the idea of making data pipelines more reusable and less tied to code. Declarative approaches definitely have their advantages. But I always feel like YAML is a bit of a love-hate thing—it makes things look clean at first, but debugging and handling more complex logic can get tricky fast.
Thanks for reading. I agree that the YAML approach does come with tradeoffs. While proper validation helps catch errors early and assists in debugging, implementing complex logic usually leads to extensive parameterization which can become unwieldy.