Decisions about running a company are rarely made based on individual transactions. Instead, business intelligence is usually derived from aggregate data, e.g., daily sales by region/product/demographic. Individual transactions are frequently aggregated in advance to analyze this data in a timely manner and make slicing and dicing it a pleasant experience. In this blog post, we'll discuss building this kind of aggregation pipeline with Airflow and Athena.
We'll base this post on the TPC-H dataset, which is commonly used to benchmark data warehouses and describes a typical e-commerce application. I explained how to provision this dataset into your datalake in the past: Making the TPC-H dataset available in Athena using Airflow, so go check that out if you haven't already. I'll also assume that you're familiar with both Athena and Airflow.
The TPC-H dataset features the orders table, which stores individual transactions, the total order volume, and references to some dimensions, e.g., customer info. We can use these to determine the market segment, country, and region in which the sale occurred by joining the orders table with other dimensions. As an example, we can use the following query to compute the sales volume per market segment, nation, and region for the 1st of August 1998:
/*
Daily sales per market segment, nation, and region
*/
select c_mktsegment,
n_name,
n_nationkey,
r_name,
r_regionkey,
o_orderdate,
sum(o_totalprice) as total_revenue,
year(o_orderdate) as "year",
month(o_orderdate) as "month",
day_of_month(o_orderdate) as "day_of_month",
day_of_week(o_orderdate) as "day_of_week"
from customer c
join orders o on c.c_custkey = o.o_custkey
join nation n on c_nationkey = n.n_nationkey
join region r on r_regionkey = n.n_regionkey
where o_orderdate = date('1998-08-01')
group by c_mktsegment,
n_name,
n_nationkey,
r_name,
r_regionkey,
o_orderdate
;
As mentioned, this aggregates the day's transactions by market segment, nation, and region, which reduces the number of records our BI tool, e.g., Quicksight, has to process when creating interactive visualizations by several orders of magnitude.
Before we turn this individual statement into a data pipeline in form of an Airflow Directed Acyclic Graph (DAG), let's talk about some qualities we like to see in our data pipelines or ETL Jobs. Well-designed ETL jobs should include some form of error handling that tries to resolve commonly occurring problems on its own.
Additionally, it should support reprocessing existing data because errors may be discovered and corrected later, and these fixes should be incorporated into the output. Visibility into a running process and the overall status of our pipeline over time should also be built in.
We will tackle this using Airflow for orchestration and Athena to perform the actual data processing. The code I'm going to show now is available in the companion repo on Github. With regard to errors, the most common remediation step is to retry the operation, which Airflow can do for us, so it's just a matter of configuration.
with DAG(
dag_id="daily_sales_aggregation",
dag_display_name="Daily Sales Aggregation",
default_args={"retries": 1, "retry_delay": timedelta(minutes=5)},
# ...
) as dag:
# ...
The more specific error condition we need to consider is that there isn't a table when writing data for the first time. Since I like my DAGs to be self-contained, I will have the DAG respond to this error by creating the table and performing the insert operation again. Unfortunately, that's not straightforward. First, we add a failure callback, which serializes the error messages and stores it as an XCom, basically a key-value store that allows us to fetch the message later from another task.
def _add_exception_to_xcom(context):
context["task_instance"].xcom_push("insert_error", str(context["exception"]))
insert_into_aggregate_table = AthenaOperator(
task_id="insert_into_aggregate_table",
task_display_name="Insert into Daily Sales",
on_failure_callback=_add_exception_to_xcom,
**insert_params,
)
Then, a branch operator retrieves that error message and checks if it's the expected Table not found message. In that case, we move on to the create_aggregate_table
task. Otherwise, we raise an error because that could indicate some other unknown issue.
@task.branch(
task_id="handle_result",
task_display_name="Table doesn't exist?",
trigger_rule="all_failed",
)
def is_table_missing(**context):
error_message = context["task_instance"].xcom_pull(
key="insert_error", task_ids="insert_into_aggregate_table"
)
if fnmatch.fnmatch(error_message, "*Error: Table * not found in database*"):
LOGGER.info("Table doesn't exist, next step: create table.")
return "create_aggregate_table"
raise AirflowException(f"Unknown error during insert {error_message}")
handle_missing_table = is_table_missing()
insert_into_aggregate_table >> handle_missing_table
Of course, we could forego this complexity and ensure we create the table manually before we run the process for the first time. I'm not a huge fan of manual tasks, though, and this way, the DAG takes care of the whole table's lifecycle, aside from deleting it.
The other quality we were looking for is the ability to reprocess data, which means that we want to achieve a mild form of idempotency, i.e., the ability to re-run the pipeline with the same inputs and underlying data and get the same output. Given that we're in a data lake situation and Athena can't execute delete statements, we need to remove any underlying data before we insert data. Otherwise, we may end up with duplicate records when data is reprocessed.
For this to work, we need to ensure that each insert statement creates a new partition, i.e., we're going to partition our daily_sales
table by year, month, and day. This will, in turn, result in the data for each partition being stored in a predictable S3 location and allows us to use the S3DeleteObjectsOperator
to clean any objects from that prefix before trying to insert data. If there is no pre-existing data, this costs us one ListObjectsV2
API call per day, which is very cheap.
clean_s3_prefix = S3DeleteObjectsOperator(
task_id="clean_s3_prefix",
task_display_name="Delete Existing data from S3",
bucket="{{params.s3_bucket}}",
prefix="{{params.s3_prefix}}{{params.daily_sales_table_name}}/{{ macros.ds_format(ds, '%Y-%m-%d', 'year=%Y/month=%-m/day_of_month=%-d/') }}",
)
Assembling these building blocks into a DAG gives us the following pipeline. First, we delete data for the daily partition if there is any. Next, we try to insert data into the table. If the table doesn't exist, we create it and repeat the insert.
Aside from the initial run, only the first two tasks should ever be triggered, and usually, the first one won't do much. If we ignored all repeatability and error handling, we could just have one insert task. Making something production-ready and maintainable can add some complexity, which you won't have if you only consider the happy path.
Something to consider with the TPC-H dataset is that the data covers the years from 1992 to 1998, so without further modifications we can't really simulate newly incoming data. Instead, we can set a logical date when we run the DAG to run it for a historical date. You'll find the time frame hard-coded in the DAG parameters on GitHub.
To create our table, we can set the catchup
parameter to true, which will cause it to compute the daily sales for the whole time period. Since that may take a while, I've also created a second DAG that does monthly aggregation, where the catchup won't take as long.
We can also use the airflow dags backfill
command in order to trigger or re-run DAG runs for a specific time period. The following command triggers DAG runs for November 1994. By default, it won't re-run for dates that it has already processed, which we could change by adding the --reset-dagruns
parameter.
$ airflow dags backfill daily_sales_aggregation \
--start-date 1994-11-01 \
--end-date 1994-11-30
The calendar view lets us view the progress of our backfill operation.
I should mention one other detail here. I've made life simple for myself by using the {{ds}}
template variable for the day filter. This means Airflow will filter the data based on the day the DAG is running on (unless you overwrite the logical date). Depending on when your data is updated, you may have to do some date arithmetic here. If, for example, you have to process the data of the previous day, you'd have to replace {{ds}}
with {{ macros.ds_add(ds, -1) }}
. There are more considerations with regard to data consistency and scheduling, but this is already getting long, so I'll talk about it another day.
In this post, I've shared two DAGs with you that can be used to create daily or monthly data aggregation pipelines. I've also outlined some features of a production-ready data pipeline, which, on the surface, complicate things but make the pipeline more robust in the real world. I hope you learned something new.
We're happy to help you set up your data pipelines and BI solutions. Go check out our offerings in the data analytics and machine learning space!
— Maurice
Special thanks to my friend Peter for sharing his expertise and feedback!
Cover Photo by Isaac Smith on Unsplash (yes, I chose it because of the axes)
Top comments (0)