Data pipelines are everywhere in the enterprise, understandably: data is the lifeblood of a company, and without being able to get it to those who need it, work would grind to a halt. The classic paradigm for building data pipelines has historically been ETL (Extract-Transform-Load). The name says it all: you build a job which extracts data from one source, applies your desired reshaping/aggregation/fancification, and pushes it to a destination. But one of my favorite developments over the past decade or so is the ELT paradigm (Extract-Load-Transform), which defers the reshaping until your data has already made it to the destination -- giving you flexibility to adjust that transform as needed and slimming down the components in your pipeline.
Snowflake is a cloud data warehouse that's the target of many data pipelines, and it has three features I love for building pipelines where the transformation happens after the data has been loaded: Streams, User-Defined Functions (UDFs), and Tasks.
- Streams are like tables, except they only contain the changes made to their source table since the stream was last consumed. They can include all changes, or just inserts, depending on your needs. They work by storing an offset into Snowflake's internal CDC information, similar to a Kafka consumer offset, meaning streams don't actually store any data and can be re-created easily.
- UDFs are functions that you can write in a variety of languages (including #python). These have some language-specific particulars (for example, JavaScript UDFs take in all rows to the same execution instance, whereas Python UDFs can execute on one row or on batches of rows, exposed to the UDF as a pandas DataFrame) but overall are incredibly useful. They're also great for cases where you're working with rich JSON data that your team doesn't want to work with in plain SQL.
- Tasks are scheduled jobs that live right inside Snowflake, so you can run recurring SQL without needing to involve separate scheduling software.
In this tutorial, I'm going to show how you can build out the Transform step of an ELT pipeline entirely inside Snowflake. I won't go into how your data gets extracted from whatever source or loaded into Snowflake (but I really like Kafka).
Step 0: Setup
We'll use Snowflake's provided dataset just to easily generate data.
USE DATABASE WORKSHEET;
USE SCHEMA WORKSHEET.RNUBEL;
CREATE TABLE ORDERS
AS (SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS LIMIT 1);
Step 1: Define your stream
Now that we've got a table, let's create a stream on it.
CREATE OR REPLACE STREAM ORDERS_STREAM ON TABLE ORDERS APPEND_ONLY = true;
A couple notes:
- APPEND_ONLY = true is a flag indicating we only want to see new records (i.e., INSERTs). If you also need to account for updates or deletes, don't pass this flag, and be prepared to handle those other operations.
- When a stream is created, its offset is initially set to the tip of the table's internal changelog, so the stream contains no data if you query it right away. You can move this offset back with an AT or BEFORE clause -- see the docs for more information, and the sketch just after this list.
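For example, something like this (a sketch I haven't run here -- double-check the CREATE STREAM docs for the exact clause order and your table's retention settings) would start the stream an hour back in the table's change history:
CREATE OR REPLACE STREAM ORDERS_STREAM ON TABLE ORDERS
AT (OFFSET => -3600) // 3,600 seconds, i.e. one hour, in the past
APPEND_ONLY = true;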
We expect our stream to be empty, at the moment:
SELECT COUNT(*) FROM ORDERS_STREAM; // returns 0
Let's insert some data to see the stream in action.
INSERT INTO ORDERS
(SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS LIMIT 10);
SELECT COUNT(*) FROM ORDERS_STREAM; // returns 10
Note that you can query that count as many times as you'd like and you'll still get 10. So when does the offset advance and clear the stream? It's dangerously simple: whenever a statement that writes data (an INSERT, MERGE, CTAS, and so on) reads from the stream and its transaction commits. This will work in our favor later, but it can be surprising. For now, this throwaway create-and-drop will consume the stream and clear it:
CREATE TEMPORARY TABLE temp_delete_stuff AS (SELECT * FROM orders_stream);
DROP TABLE temp_delete_stuff;
SELECT COUNT(*) FROM ORDERS_STREAM; // returns 0
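As an aside, if you just want to know whether a stream currently has anything unconsumed in it, SYSTEM$STREAM_HAS_DATA will tell you without touching the offset:
SELECT SYSTEM$STREAM_HAS_DATA('ORDERS_STREAM'); // should return FALSE at this point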
Step 2: Define a UDF
Now, this step might be optional for you. Maybe your transform stage can all happen in SQL, and you can skip right to Step 3, but I think having access to Python opens up a lot of possibilities. So let's make a Python UDF that will transform a row from our source table into a fancy destination row. Actually, it won't be fancy, because this is a tutorial, but it will at least be generated by Python.
CREATE OR REPLACE FUNCTION transform_orders("row" OBJECT)
RETURNS TABLE (order_key TEXT, order_age_in_days INTEGER)
LANGUAGE PYTHON
HANDLER = 'OrderTransformer'
RUNTIME_VERSION='3.8'
AS
'
from datetime import date

class OrderTransformer:
    def __init__(self):
        pass

    def process(self, row):
        age = date.today() - date.fromisoformat(row["O_ORDERDATE"])
        return [(row["O_ORDERKEY"], age.days)]

    def end_partition(self):
        pass
'
;
Notes here:
- The input to this function is an OBJECT which we expect to hold the row as a dictionary. To get the row into this format, we'll use the Snowflake function object_construct(), but this is mostly just to demonstrate flexibility and might not be what you need -- you might be better off specifying explicit input columns (see the sketch after this list).
- This UDF returns a table, so it has to return a list of tuples. This isn't the only option; your UDF could return a static value that you then break out to rows later on. It all depends on what sort of transform you're doing.
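For instance, if you went the explicit-columns route, the UDTF might look something like this -- a sketch, with transform_orders_v2 as a made-up name; one nice side effect is that a DATE argument shows up in Python as a datetime.date, so there's no string parsing:
CREATE OR REPLACE FUNCTION transform_orders_v2(input_key TEXT, input_date DATE)
RETURNS TABLE (order_key TEXT, order_age_in_days INTEGER)
LANGUAGE PYTHON
HANDLER = 'OrderTransformer'
RUNTIME_VERSION='3.8'
AS
'
from datetime import date

class OrderTransformer:
    def process(self, input_key, input_date):
        # input_date is already a datetime.date, so we can subtract directly
        return [(input_key, (date.today() - input_date).days)]
'
;
You'd then call it as LATERAL transform_orders_v2(O_ORDERKEY::TEXT, O_ORDERDATE) and skip the object_construct() wrapper entirely.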
To test this out, run it on your full (mini) data set:
SELECT order_key, order_age_in_days
FROM
(SELECT object_construct(*) as_object FROM orders) orders,
LATERAL transform_orders(orders.as_object);
| ORDER_KEY | ORDER_AGE_IN_DAYS |
|-----------|-------------------|
| 4800004   | 9,071             |
| 4800005   | 10,334            |
| 4800006   | 10,586            |
| 4800007   | 10,637            |
| 3600001   | 9,932             |
Granted, we didn't need Python to do that, but it's still cool.
Step 3: Create a destination table
Simple enough:
CREATE TABLE ORDER_FACTS (
ORDER_KEY TEXT,
ORDER_AGE_IN_DAYS INTEGER
);
Step 4: Create a procedure to transform and save all new records
This is where things get fun. We're going to leverage Snowflake's MERGE statement, which lets us run a query, compare every returned row to the target table, and decide whether each row needs an update or an insert:
CREATE OR REPLACE PROCEDURE load_orders()
RETURNS INT
LANGUAGE SQL
AS $$
begin
  BEGIN TRANSACTION;

  MERGE INTO ORDER_FACTS dst USING (
    SELECT order_key,
           MAX(order_age_in_days) AS order_age_in_days
    FROM
      (SELECT object_construct(*) as_object FROM orders_stream) orders,
      LATERAL transform_orders(orders.as_object) output
    GROUP BY 1
  ) src
  ON src.order_key = dst.order_key
  WHEN MATCHED THEN
    UPDATE SET
      order_age_in_days = src.order_age_in_days
  WHEN NOT MATCHED THEN
    INSERT (order_key, order_age_in_days)
    VALUES (src.order_key, src.order_age_in_days);

  COMMIT;
  return 1;
end
$$
;
One key point here is that even though we're positive each order will show up in the stream only once, we're still using GROUP BY to ensure src contains exactly one row per order key going into the merge. Otherwise, we could potentially have multiple source rows matching the same target row in the MERGE logic, causing non-deterministic behavior.
Also, note that we wrap the operation in an explicit transaction. I don't know if the caller is necessarily going to have AUTOCOMMIT enabled when it gets called, and there's no reason to risk it.
Insert some sample data and test out your procedure:
INSERT INTO ORDERS
(SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS LIMIT 100);
CALL load_orders();
SELECT * FROM ORDER_FACTS;
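If everything worked, the MERGE read from ORDERS_STREAM inside a committed DML statement, which (as we saw back in Step 1) advances the stream's offset -- so the stream should be empty again:
SELECT COUNT(*) FROM ORDERS_STREAM; // returns 0 now that the MERGE has consumed the stream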
Step 5: Schedule the task
We probably don't want to log into Snowflake and call the procedure by hand all day long, so we'll leverage a Task to run it automatically. Suppose you want to refresh all new orders every hour:
CREATE OR REPLACE TASK orders_load_task
WAREHOUSE = 'YOUR_WAREHOUSE_NAME'
SCHEDULE = 'USING CRON 0 * * * * America/Chicago'
AS
CALL load_orders();
ALTER TASK orders_load_task RESUME;
When picking the right frequency, keep in mind that resuming the warehouse isn't free, which might make it worth running this less often. Balance your downstream users' freshness needs against the cost.
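One tweak worth considering on the cost front: tasks support a WHEN clause, and pairing it with SYSTEM$STREAM_HAS_DATA lets the task skip a scheduled run entirely (without resuming the warehouse) when the stream has nothing new. A sketch, adjusting the task definition above:
CREATE OR REPLACE TASK orders_load_task
WAREHOUSE = 'YOUR_WAREHOUSE_NAME'
SCHEDULE = 'USING CRON 0 * * * * America/Chicago'
WHEN SYSTEM$STREAM_HAS_DATA('ORDERS_STREAM')
AS
CALL load_orders();
ALTER TASK orders_load_task RESUME; // re-created tasks start out suspended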
Conclusion
That finishes our transform! I really like that we were able to do this entirely in Snowflake -- no Airflow required. Snowflake's task system isn't fully fleshed out, though, so at Enova we still supplement this process with conventional DAGs using SnowflakeOperators. I might write about that in a future post.
One thing you might be wondering about is what happens when your transform fails. Maybe data changed, or values are unexpectedly NULL, or some other edge case produces an exception in your UDF. If you aren't handling it, the MERGE statement will fail and cause the task itself to fail, stalling your pipeline. The good news in that case is that no data is lost, assuming you fix the bug and recover before your stream reaches its maximum retention period (perhaps 7 days, perhaps 30; check your Snowflake account details).
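One way to keep an eye on that deadline (assuming I'm remembering the output columns correctly) is SHOW STREAMS, which reports when each stream will go stale:
SHOW STREAMS; // check the STALE_AFTER column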
If you can't tolerate any downtime of that nature, you could look into employing a dead-letter queue pattern and, after rescuing the error, move failed rows to a separate table for later processing.
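A rough sketch of what that could look like, using a Snowflake Scripting exception handler -- ORDER_FACTS_DEAD_LETTERS and load_orders_with_dlq are hypothetical names, and you'd want to test the failure path carefully before trusting it:
CREATE TABLE IF NOT EXISTS ORDER_FACTS_DEAD_LETTERS (
  RAW_ROW OBJECT,
  FAILED_AT TIMESTAMP_LTZ,
  ERROR_MESSAGE TEXT
);
CREATE OR REPLACE PROCEDURE load_orders_with_dlq()
RETURNS INT
LANGUAGE SQL
AS $$
declare
  err TEXT;
begin
  BEGIN TRANSACTION;
  // ... the same MERGE as in load_orders() ...
  COMMIT;
  return 1;
exception
  when other then
    ROLLBACK;
    err := sqlerrm;
    // Reading the stream in this committed INSERT advances its offset, so the
    // pipeline keeps moving; the raw rows are parked for later inspection and replay.
    INSERT INTO ORDER_FACTS_DEAD_LETTERS
      SELECT object_construct(*), CURRENT_TIMESTAMP(), :err FROM orders_stream;
    return 0;
end
$$
;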