UPDATE: AWS now supports Delta Lake on Glue natively.
The purpose of this blog post is to demonstrate how you can use the Spark SQL Engine to do UPSERTS, DELETES, and INSERTS. Basically, updates.
Earlier this month, I made a blog post about doing this via PySpark. Check it out below:
UPSERTS and DELETES using AWS Glue and Delta Lake — Kyle Escosia for AWS Community ASEAN, Jul 21 '21
But what if we want to make it simpler and more familiar?
This month, AWS released Glue version 3.0! AWS Glue 3.0 introduces a performance-optimized Apache Spark 3.1 runtime for batch and stream processing. The new engine speeds up data ingestion, processing, and integration, allowing you to hydrate your data lake and extract insights from data more quickly.
But, what's the big deal with this?
Well, aside from a lot of general performance improvements to the Spark engine, it can now also support the latest versions of Delta Lake. The most notable addition is support for SQL INSERT, DELETE, UPDATE, and MERGE.
If you don't know what Delta Lake is, you can check out my blog post that I referenced above to have a general idea of what it is.
Let's proceed with the demo!
✅ Architecture Diagram
This is basically a simple process flow of what we'll be doing. We take a sample csv file, load it into an S3 bucket, then process it using Glue. (OPTIONAL) Then you can connect it to your favorite BI tool (I'll leave that up to you) and start visualizing your updated data.
❗ Pre-requisites
But, before we get to that, we need to do some pre-work.
- Download the Delta Lake package here - it's a bit hard to spot, but look for Files in the table and click on the jar
- An AWS Account - ❗ Glue ETL is not included in the free tier
- Download the sample data here - you can use your own, but I'll be using this one
- The code can be found in my GitHub Repository
✅ Format to Delta Table
First things first, we need to convert each of our datasets into Delta format. Below is the code for doing this.
# Import the packages
from delta import *
from pyspark.sql.session import SparkSession
# Initialize Spark Session along with configs for Delta Lake
spark = SparkSession \
.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Read Source
inputDF = spark.read.format("csv").option("header", "true").load('s3://delta-lake-aws-glue-demo/raw/')
# Write data as a DELTA TABLE
inputDF.write.format("delta").mode("overwrite").save("s3a://delta-lake-aws-glue-demo/current/")
# Read Source
updatesDF = spark.read.format("csv").option("header", "true").load('s3://delta-lake-aws-glue-demo/updates/')
# Write data as a DELTA TABLE
updatesDF.write.format("delta").mode("overwrite").save("s3a://delta-lake-aws-glue-demo/updates_delta/")
# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-aws-glue-demo/current/")
deltaTable.generate("symlink_format_manifest")
### OPTIONAL: UNCOMMENT IF YOU ALSO WANT TO VIEW THE UPDATES DATA IN ATHENA
###
# Generate MANIFEST file for Updates
# updatesDeltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-aws-glue-demo/updates_delta/")
# updatesDeltaTable.generate("symlink_format_manifest")
This code converts our dataset into Delta format. This is done for both our source data and the updates. After generating the SYMLINK MANIFEST file, we can view the table via Athena. The SQL code is also included in the repository.
🔀 Upserts
Upsert is defined as an operation that inserts rows into a database table if they do not already exist, or updates them if they do.
In this example, we'll be updating the values for a couple of rows on ship_mode, customer_name, sales, and profit. I just did a random character spam and didn't think it through 😅.
# Import as always
from delta import *
from pyspark.sql.session import SparkSession
# Initialize Spark Session along with configs for Delta Lake
spark = SparkSession \
.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
updateDF = spark.sql("""
MERGE INTO delta.`s3a://delta-lake-aws-glue-demo/current/` as superstore
USING delta.`s3a://delta-lake-aws-glue-demo/updates_delta/` as updates
ON superstore.row_id = updates.row_id
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
""")
# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-aws-glue-demo/current/")
deltaTable.generate("symlink_format_manifest")
### OPTIONAL
## SQL-BASED GENERATION OF SYMLINK
# spark.sql("""
# GENERATE symlink_format_manifest
# FOR TABLE delta.`s3a://delta-lake-aws-glue-demo/current/`
# """)
The SQL code above merges the updates table into the current table based on row_id. If a row_id is matched, it does an UPDATE SET * (update all columns); if not, it does an INSERT * (insert the whole row).
If you want to check out the full operation semantics of MERGE, you can read through this.
After which, we update the MANIFEST file again. Note that the MANIFEST file can be set to regenerate automatically by running the query below.
ALTER TABLE delta.`<path-to-delta-table>`
SET TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true)
More information can be found here
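If you'd rather keep everything inside the Glue job, here's a minimal sketch of running the same statement through spark.sql (this assumes the same Spark session and demo path used in the snippets above):
# Enable automatic manifest regeneration for the current Delta table
spark.sql("""
ALTER TABLE delta.`s3a://delta-lake-aws-glue-demo/current/`
SET TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true)
""")
With this property set, Delta Lake keeps the manifest up to date on subsequent writes, so the explicit generate("symlink_format_manifest") calls become optional.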
You should now see your updated table in Athena.
❌ Deletes
Deletes via Delta Lakes are very straightforward.
from delta import *
from pyspark.sql.session import SparkSession
spark = SparkSession \
.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
deleteDF = spark.sql("""
DELETE
FROM delta.`s3a://delta-lake-aws-glue-demo/current/` as superstore
WHERE CAST(superstore.row_id as integer) <= 20
""")
# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(
spark, "s3a://delta-lake-aws-glue-demo/current/")
deltaTable.generate("symlink_format_manifest")
### OPTIONAL
## SQL-BASED GENERATION OF SYMLINK MANIFEST
# spark.sql("""
# GENERATE symlink_format_manifest
# FOR TABLE delta.`s3a://delta-lake-aws-glue-demo/current/`
# """)
This operation does a simple delete based on the row_id. We can verify the result in Athena:
SELECT *
FROM "default"."superstore"
-- Need to CAST because row_id is currently a STRING
ORDER BY CAST(row_id as integer);
⤴ Inserts
Like Deletes, Inserts are also very straightforward.
from delta import *
from pyspark.sql.session import SparkSession
spark = SparkSession \
.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
insertDF = spark.sql("""
INSERT INTO delta.`s3a://delta-lake-aws-glue-demo/current/`
SELECT *
FROM delta.`s3a://delta-lake-aws-glue-demo/updates_delta/`
WHERE CAST(row_id as integer) <= 20
""")
# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(
spark, "s3a://delta-lake-aws-glue-demo/current/")
deltaTable.generate("symlink_format_manifest")
### OPTIONAL
## SQL-BASED GENERATION OF SYMLINK MANIFEST
# spark.sql("""
# GENERATE symlink_format_manifest
# FOR TABLE delta.`s3a://delta-lake-aws-glue-demo/current/`
# """)
❗ Partitioned Data
We've done Upsert, Delete, and Insert operations on a simple dataset. But that rarely happens in real life. So what if we spice things up and do it on partitioned data?
I went ahead and did some partitioning via Spark and created a partitioned version of this dataset, using the order_date as the partition key; in S3, each partition shows up as a date_part=<date>/ prefix under the table path.
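Here's a minimal sketch of how that partitioned write might look. The to_date() derivation of date_part and the overwriteSchema option are assumptions on my part (the exact transformation depends on how your order_date is formatted), so treat this as a starting point rather than the exact code:
from delta import *
from pyspark.sql import functions as F
from pyspark.sql.session import SparkSession
# Initialize Spark Session along with configs for Delta Lake
spark = SparkSession \
    .builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
# Read the raw CSV as before
inputDF = spark.read.format("csv").option("header", "true").load("s3://delta-lake-aws-glue-demo/raw/")
# Derive the partition column from order_date
# (pass a format string to to_date() if your dates aren't yyyy-MM-dd)
partitionedDF = inputDF.withColumn("date_part", F.to_date("order_date"))
# Write the Delta table partitioned by date_part
# overwriteSchema is needed if a non-partitioned table already exists at this path
partitionedDF.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("date_part") \
    .save("s3a://delta-lake-aws-glue-demo/current/")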
❗ What do you think?
The answer is: YES! You can also do this on partitioned data.
The concept of Delta Lake is based on log history. Delta Lake will generate delta logs for each committed transaction. Delta logs will have delta files stored as JSON, which contain information about the operations that occurred, details about the latest snapshot of the file, and statistics about the data. Delta files are JSON files with sequentially increasing names, and together they make up the log of all changes that have occurred to a table.
- from Data Floq
We can see this in the example below, which shows the contents of the current/date_part=2014-08-27/ prefix after rows were deleted from that partition.
From the examples above, we can see that our code wrote a new Parquet file during the delete, excluding the rows filtered out by our delete operation. The JSON log file then maps the table to the newly generated Parquet file.
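If you'd rather not dig through S3 to see these commits, here's a small sketch using DeltaTable.history(), which returns one row per transaction (the earlier WRITE, MERGE, and DELETE operations should all show up); it assumes the same session configuration and demo path as the snippets above:
from delta import *
from pyspark.sql.session import SparkSession
# Initialize Spark Session along with configs for Delta Lake
spark = SparkSession \
    .builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
# Each committed transaction becomes one entry in the delta log
deltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-aws-glue-demo/current/")
deltaTable.history() \
    .select("version", "timestamp", "operation", "operationParameters") \
    .show(truncate=False)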
Additionally, in Athena, if your table is partitioned, you need to specify it when creating the schema:
CREATE EXTERNAL TABLE IF NOT EXISTS superstore (
row_id STRING,
order_id STRING,
order_date STRING,
ship_date STRING,
ship_mode STRING,
customer_id STRING,
customer_name STRING,
segment STRING,
country STRING,
city STRING,
state STRING,
postal_code STRING,
region STRING,
product_id STRING,
category STRING,
sub_category STRING,
product_name STRING,
sales STRING,
quantity STRING,
discount STRING,
profit STRING
)
-- Add PARTITIONED BY option
PARTITIONED BY (date_part STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://delta-lake-aws-glue-demo/current/_symlink_format_manifest/'
Then run MSCK REPAIR TABLE <table> to add the partitions.
If you don't do these steps, you'll get an error.
✅ Conclusion
That's it! It's a great time to be a SQL Developer! Thank you for reading through! I hope you learned something new from this post.
Have you tried Delta Lake? What tips, tricks, and best practices can you share with the community? I would love to hear your thoughts in the comments below!
Happy coding!
Top comments (8)
Thank you for the article. We have the need to do fast UPSERTs in an ETL pipeline just like this article. I am using Glue 2.0 with Hudi in a PoC that seems to be giving us the performance we need. Delta was on my radar and when I saw the Glue 3.0 announcement making a lot of improvements for Delta but no mention of Hudi it makes me think we should have looked at Delta first. Do you have any experience with Hudi to compare with your Delta experience in this article?
I actually want to try out Hudi because I'm still evaluating whether to use Delta Lake over it for our future workloads. I'm in the same boat as you: I was reluctant to try out Delta Lake since AWS Glue only supported Spark 2.4, but yeah, Glue 3.0 came, and with it, support for the latest Delta Lake package.
Others think that Delta Lake is too "databricks-y", if that's a word lol, not sure what they meant by that (perhaps the runtime?). But so far, I haven't encountered any problems with it because AWS supports Delta Lake as much as it does Hudi.
Thanks much for this nice article.
I was just wondering whether you could actually test the performance of such a setup when querying from Athena.
Indeed, a typical optimization technique for Athena is to have files which are big enough (~100 MB). So what would be the impact of instead having many small Parquet files within a given partition, each containing a wave of updates?
Glad you liked it! Interesting. I haven't done an extensive test yet, but yeah, I get your point: one impact would be the overhead cost of querying because you have a lot of partitions. Well, you aren't going to query all the partitions anyway if you want to update; the Glue Job will do that for you. So the one you'll see in Athena will always be the latest one. Having said that, you can always control the number of files that are stored in a partition using coalesce() or repartition() in Spark.
Hi Kyle, thanks a lot for your article, it's very useful information so that data engineers can understand how to use Delta Lake with AWS Glue, like the upsert scenario. I think your post is useful for the Thai developer community, and I have already translated your post into Thai, just wanted to let you know, and all credit to you. :)
dev.to/chatchaikomrangded/sql-base...
Cool! Thank you! Glad I could help! Thanks for letting me know.
This is so awesome! Tried it for the first time on our own data and it looks very promising. May I know if you have written separate Glue job scripts for Updates/Inserts/Deletes, or is it just one Glue job that does all operations?
Current Situation:
We had 3~5 Business Units prior to 2019, and each business unit used to have its own warehouse tools and technologies. For example, one business unit built its warehouse entirely with SQL Server CDC, stored procedures, SSIS, SSRS, etc. This was done as very complex stored procedures with lots of generated surrogate keys, following a star schema. The jobs for this business unit use CDC and have an SLA of 5 minutes. Users still want more and more fresh data.
Another Business Unit used SnapLogic for ETL, with Redshift as the target data store.
Another Business Unit used custom Python code to merge the data and write to SQL Server.
Now in 2022, these Business Units have merged, and I have been tasked with building a common data ingestion framework for all of them using lake house architecture/concepts. I have come up with a draft architecture following prescriptive methodology from AWS; below is the tool set selected, as we are an AWS shop:
Stream Ingestion: Kinesis Firehose
Batch Ingestion: AWS Glue
Jobs Orchestrator : MWAA ( Managed Airflow )
Lake House Data Store: S3
Target Analytics Store: Redshift
Presentation: QuickSight and Tableau
The jobs run on various cadences, from 5 minutes to daily, depending on each business unit's requirements. I have proposed 3 AWS storage layers: raw/modified/processed. I'm so confused about how to partition these layers, but to the best of my knowledge, I have proposed the below:
raw --> raw-bucketname/source_system_name/tablename/extract_date=
Modified--> modified-bucketname/source_system_name/tablename ( if the table is large or have lot of data to query based on a date then choose date partition)
processed --> processed-bucketname/tablename/ ( partition should be based on analytical queries)
Questions that I have are:
Hi nk793,
Yes, jobs are different for each process. You want to be as idempotent as possible. Just remember to tag your resources so you don't get lost in the jungle of jobs lol.
Prefixes/Partitioning should be okay, but you might want to split the date further for throughput purposes (more prefixes = more throughput). In case of a full refresh, you don't have a choice: you'll start with your earliest date and apply UPSERTS or changes as you go through the dates. What would be a scenario where you'll query the RAW layer? Usually DS accesses the Analytics/Curated/Processed layer, sometimes the staging layer. More info on storage layers here.
Good thing that crawlers now support Delta files; when I was writing this article, they didn't support it yet. I suggest you create crawlers for each layer so the crawlers are not dependent on each other. Ideally, it should be 1 database per source system so you'll be able to distinguish them from each other. You can just put a _dev, _raw, or _curated in the prefix if you want. Up to you.
This should come from the business. If the trigger is every day @9am, you can schedule that; if not, you can schedule it based on an event. Crawlers can be run if there are additional partitions.
Depends on how complex your processing is and how optimized your queries and code are. Auto Scaling in Glue is also in preview; perhaps have a go at that one. Also check out the different worker types in Glue.
Glue has Glue Studio, a drag-and-drop tool, if you have trouble writing your own code. If you're talking about automating the same set of Glue scripts and creating a Glue job, you can look at Infrastructure as Code (IaC) frameworks such as AWS CDK, CloudFormation, or Terraform.
Hope this was helpful for you.