We have a self-built Python pipeline, and we could set up a cron job to run the script daily or whenever we want. Back when I started engineering pipelines at work I was using Django/Celery/Redis, but at home I was using cron. I had some visibility into the jobs, but it was limited (mostly to logs), so I decided I needed a better tool; that's where Airflow came in.
Each workflow is captured in a Directed Acyclic Graph (DAG) of tasks that run in the order you lay out, with no cycles. Some people have called Airflow "cron on steroids," but it's really much more than that. You'll see why in a moment, but it's true that at its core, Airflow is a tool for scheduling workflows on a cron-based schedule (there are aliases such as @daily, meaning every day at midnight, for 0 0 * * * and @hourly for 0 * * * *).
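To make the alias-to-cron mapping concrete, here is a small sketch in plain Python. The `PRESETS` dict and the `to_cron` helper are names made up for illustration and are not part of Airflow's API; the expressions are the standard cron equivalents of these presets.

```python
# Hypothetical lookup table: schedule presets and the cron expressions they stand for.
PRESETS = {
    "@hourly": "0 * * * *",   # minute 0 of every hour
    "@daily": "0 0 * * *",    # midnight every day
    "@weekly": "0 0 * * 0",   # midnight on Sunday
    "@monthly": "0 0 1 * *",  # midnight on the 1st of each month
    "@yearly": "0 0 1 1 *",   # midnight on January 1st
}


def to_cron(schedule: str) -> str:
    """Return the cron expression for a preset, or the input unchanged."""
    return PRESETS.get(schedule, schedule)


print(to_cron("@daily"))      # 0 0 * * *
print(to_cron("15 8 * * *"))  # 15 8 * * *
```

Either form can be passed as the schedule of a DAG, so the alias is purely a readability choice.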
The classic, verbose way to set up a DAG is like this:
# From the Airflow tutorial
import pendulum

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.python import PythonOperator

with DAG(
    "tutorial_dag",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={"retries": 2},
    description="DAG tutorial",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    ...  # tasks are defined here, inside the DAG context
This will create a new workflow with the ID "tutorial_dag" in the UI. The tooltip and the description at the top of the DAG view will say "DAG tutorial." If a task fails, it'll be retried twice before showing up as failed (something cron doesn't do!). The schedule parameter (called schedule_interval before Airflow 2.4) is set to None, so this workflow won't run on any schedule and can only be triggered manually. You could change that parameter to @hourly if you wanted it to run every hour, or to 15 8 * * * if you wanted it to run every day at 08:15 UTC. The start_date is important because your workflow won't run before that date (set to 2021-01-01 here), and if you set catchup=True, Airflow will create a run for every scheduled interval between your start_date and now; so be careful! tags are something you can filter on in the UI to find your DAGs quicker.
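To get a feel for why catchup deserves care, here is a rough sketch in plain Python of how many runs a daily schedule would accumulate. It is a simplified model and not Airflow's scheduler code: `catchup_intervals` is a hypothetical helper, the "now" value is a pretend date, and it assumes a fixed daily step and that a run is created once its interval has fully elapsed.

```python
from datetime import datetime, timedelta, timezone


def catchup_intervals(start_date, now, step):
    """List the (start, end) intervals that have fully elapsed by `now`."""
    intervals = []
    start = start_date
    while start + step <= now:
        intervals.append((start, start + step))
        start += step
    return intervals


start_date = datetime(2021, 1, 1, tzinfo=timezone.utc)
now = datetime(2021, 1, 5, tzinfo=timezone.utc)  # pretend "today" for the example

runs = catchup_intervals(start_date, now, timedelta(days=1))
print(len(runs))  # 4
```

With a start_date of 2021-01-01 and a real "now" several years later, the same arithmetic yields well over a thousand daily runs, which is why catchup=False is a sensible setting while experimenting.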
That's it! Of course, there are lots more knobs you can turn, but when you're just getting started you really just need dag_id, start_date, and schedule_interval.
How do you pass in a task? That's also super easy. Using the simple scraping script I wrote first, you would just do the following:
from airflow import DAG
from airflow.decorators import task
from bs4 import BeautifulSoup
import pendulum

# Assumption: `args` is not shown in the original; a minimal default_args dict
args = {"retries": 2}

with DAG(
    dag_id='Functional_Sprott_Scraper',
    schedule_interval='5 20 * * 1-6',
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    default_args=args,
    render_template_as_native_obj=True,
    tags=['price', 'scraper']
) as dag:

    def web_call(url):  # Extract
        import requests
        r = requests.get(url)
        if r.status_code == 200:
            soup: BeautifulSoup = BeautifulSoup(r.content, "html.parser")
            return soup
        else:
            # Fail the task instead of handing a status code to the parser
            raise RuntimeError(f"Request failed with status {r.status_code}")

    def get_fund_values(soup, index, class_name):  # Transform
        fund_values = soup.find_all('div', class_=class_name)
        value = fund_values[index].contents
        return str(value[0]).strip().replace('$US', '').replace(',', '')

    def write_json(data, filename='data.json'):  # Load
        import json
        with open(filename, 'w') as f:
            json.dump(data, f, indent=4)

    @task()
    def execute_scraper():
        soup = web_call(
            url='https://sprott.com/investment-strategies/physical-commodity-funds/uranium/')
        data = {}
        data['shareprice'] = get_fund_values(soup, 4, 'fundHeader_value')
        data['u3o8_stock'] = get_fund_values(soup, 6, 'fundHeader_value')
        write_json(data)

    # Calling the decorated function is what registers the task with the DAG
    execute_scraper()
Here is our DAG!
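The Transform and Load helpers above can also be tried without Airflow or network access. The sketch below pulls the string cleanup out into a hypothetical `clean_value` function and feeds it invented sample strings (they are not real fund data), then writes the result with the same `write_json` logic to a temporary file.

```python
import json
import os
import tempfile


def clean_value(raw: str) -> str:
    """Apply the same cleanup as get_fund_values: trim, drop '$US' and commas."""
    return raw.strip().replace("$US", "").replace(",", "")


def write_json(data, filename="data.json"):
    """Write the dict to disk as indented JSON."""
    with open(filename, "w") as f:
        json.dump(data, f, indent=4)


# Invented sample strings, shaped like the text the scraper would find.
sample = {
    "shareprice": clean_value(" $US17.25 "),
    "u3o8_stock": clean_value("61,000,000"),
}

path = os.path.join(tempfile.gettempdir(), "sprott_sample.json")
write_json(sample, path)

with open(path) as f:
    loaded = json.load(f)

print(loaded)  # {'shareprice': '17.25', 'u3o8_stock': '61000000'}
```

Keeping the helpers as plain functions like this is what makes them easy to check before they are wrapped in a task.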
The @task decorator is short for this:
def execute_scraper():
    soup = web_call(
        url='https://sprott.com/investment-strategies/physical-commodity-funds/uranium/')
    data = {}
    data['shareprice'] = get_fund_values(soup, 4, 'fundHeader_value')
    data['u3o8_stock'] = get_fund_values(soup, 6, 'fundHeader_value')
    write_json(data)

scrape_task = PythonOperator(task_id='scrape_task', python_callable=execute_scraper)
Most people nowadays just write Python scripts to execute in Airflow, so the @task decorator was added as shorthand for the most basic operator, the PythonOperator.
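If the relationship between a decorator and an operator feels abstract, the toy model below may help. It is not Airflow's implementation: `ToyPythonOperator` and `toy_task` are hypothetical names, and the real decorator does more (for example, it creates the task when the decorated function is called inside a DAG). The sketch only shows the core idea that the decorator builds the wrapper object for you and derives the task ID from the function name.

```python
class ToyPythonOperator:
    """Hypothetical stand-in for an operator: a named wrapper around a callable."""

    def __init__(self, task_id, python_callable):
        self.task_id = task_id
        self.python_callable = python_callable

    def execute(self):
        # Running the task just means calling the wrapped function.
        return self.python_callable()


def toy_task(func):
    """Hypothetical decorator: build the operator from the function itself."""
    return ToyPythonOperator(task_id=func.__name__, python_callable=func)


# Explicit style: define the function, then wrap it by hand.
def say_hello():
    return "hello"


explicit = ToyPythonOperator(task_id="say_hello", python_callable=say_hello)


# Decorator style: the wrapping happens at definition time.
@toy_task
def say_hello_decorated():
    return "hello"


print(explicit.task_id, explicit.execute())
print(say_hello_decorated.task_id, say_hello_decorated.execute())
```

Both objects end up with a task ID and a callable; the decorator simply saves the extra line and keeps the two names from drifting apart.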
I'm going to detour and talk about Operators next time since they define your tasks. There are also things called hooks and sensors I'll briefly go into.
As always, the code is on GitHub here.