Apache Iceberg is already a popular lakehouse format supported by many query engines. But how should we make a technical selection among all those engines?
In the data warehouse domain, the most commonly used standard is TPC-DS, which defines several common scenarios and provides a set of standardized queries. Generally speaking, TPC-DS is the gold standard for benchmarking performance.
Although TPC-DS is quite popular and there are plenty of connectors for loading its test data into various databases (even Trino, a pure compute engine, provides a dedicated TPC-DS catalog), there is currently no such ready-made connector for the lakehouse.
Therefore, in this article we will describe how to load the TPC-DS test data into an Iceberg lakehouse.
Experiment environment setup
How to build the TPC-DS toolkit is not the focus of this article, so I'll start by assuming that dsdgen is already installed.
First, let's generate a test data package.
dsdgen -SCALE 1 -DIR /home/ec2-user/sample
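Before moving on, it doesn't hurt to confirm that the .dat files were actually generated. The snippet below is just an optional check, assuming the same output directory as above:
import os

# Optional check: list the pipe-delimited .dat files that dsdgen produced
sample_dir = "/home/ec2-user/sample"
for name in sorted(f for f in os.listdir(sample_dir) if f.endswith(".dat")):
    size_mb = os.path.getsize(os.path.join(sample_dir, name)) / 1024 / 1024
    print(f"{name}: {size_mb:.1f} MB")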
Once we have the test data, we need to build the Iceberg environment and import the data.
Although I've provided some Iceberg playgrounds before, this time I'd like to use Tabular's experiment environment. The main reason is that it also includes a Spark notebook, which helps a lot.
https://gist.github.com/wirelessr/37b19323664cff6f9af42bd814f05a5d#file-docker-compose-yaml
One small modification we made was to mount the test data folder sample into the spark-iceberg container.
volumes:
- ./warehouse:/home/iceberg/warehouse
- ./notebooks:/home/iceberg/notebooks/notebooks
- ./sample:/home/iceberg/sample
Import test data
After we have created the environment, we need to write the data into Iceberg.
First, we need to create the table schemas.
In general, the tpcds.sql of TPC-DS can be used directly, but there are a few things that need to be modified for the Iceberg experiment:
- Add the catalog and database to the original table name, i.e. <catalog>.<db>.<table>.
- The primary key line should be pulled out and a PARTITIONED BY clause declared instead. It's better to have it in buckets, so I've split it into 16 buckets, but it can be tweaked.
- Now that the primary key has been removed, remember to remove the comma from the previous line as well.
Here is the table income_band for illustration.
create table demo.test.income_band -- rename table
(
ib_income_band_sk integer not null,
ib_lower_bound integer ,
ib_upper_bound integer -- remove comma: ,
-- remove line: primary key (ib_income_band_sk)
)
PARTITIONED BY (bucket(16, ib_income_band_sk)) -- add line
;
I have written a Python script to handle this.
https://gist.github.com/wirelessr/37b19323664cff6f9af42bd814f05a5d#file-proc_ddl-py
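For reference, here is a minimal sketch of the kind of transformation that script performs. It is not the gist itself, just an illustration; the demo.test prefix, the input file name tpcds.sql, and bucketing on the first primary-key column are assumptions that mirror the steps above.
import re

CATALOG_DB = "demo.test"  # assumed catalog and database prefix

def convert(ddl: str) -> str:
    # Prefix every table name with the catalog and database
    ddl = re.sub(r"create table (\w+)", rf"create table {CATALOG_DB}.\1", ddl, flags=re.IGNORECASE)
    statements = []
    for stmt in ddl.split(";"):
        if not stmt.strip():
            continue
        pk_match = re.search(r"primary key\s*\(([^)]+)\)", stmt, flags=re.IGNORECASE)
        if pk_match:
            pk = pk_match.group(1).split(",")[0].strip()  # bucket on the first primary-key column
            # Drop the primary key line and the now-dangling comma on the previous column line
            lines = [l for l in stmt.splitlines() if "primary key" not in l.lower()]
            for i in range(len(lines) - 1, -1, -1):
                if lines[i].rstrip().endswith(","):
                    lines[i] = lines[i].rstrip()[:-1]
                    break
            stmt = "\n".join(lines) + f"\nPARTITIONED BY (bucket(16, {pk}))"
        statements.append(stmt.strip() + ";")
    return "\n".join(statements)

with open("tpcds.sql") as f:
    print(convert(f.read()))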
When we are done with all the DDLs, we just need to open the Spark SQL shell built into the experiment environment and paste those DDLs in directly.
docker exec -it spark-iceberg spark-sql
After the tables are defined, it's time to start importing the data into Iceberg, which we typically do using the shortcuts provided by Spark SQL.
INSERT INTO demo.test.income_band
SELECT * FROM csv.`file:///home/iceberg/sample/income_band.dat`;
But there is a problem: the column types will not match and the insert causes an error (Spark SQL treats the INT columns in the csv as STRING). So we can use PySpark instead to save some time.
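To see the mismatch for yourself, you can read one of the files without schema inference; every column comes back as a string. This is only an illustration and assumes nothing beyond the mounted sample folder:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Without inferSchema, Spark treats every CSV column as a string, which is why the
# plain INSERT ... SELECT shown above fails against the typed Iceberg columns
df = spark.read.csv("file:///home/iceberg/sample/income_band.dat", sep="|")
df.printSchema()  # every column is reported as string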
There is also a ready-made notebook available in the experiment environment.
docker exec -it spark-iceberg pyspark-notebook
Therefore, we just need to open the notebook and start importing the CSV files.
from pyspark.sql import SparkSession
dat = [
'call_center.dat',
'catalog_page.dat',
'catalog_returns.dat',
'catalog_sales.dat',
'customer.dat',
'customer_address.dat',
'customer_demographics.dat',
'date_dim.dat',
# 'dbgen_version.dat',
'household_demographics.dat',
'income_band.dat',
'inventory.dat',
'item.dat',
'promotion.dat',
'reason.dat',
'ship_mode.dat',
'store.dat',
'store_returns.dat',
'store_sales.dat',
'time_dim.dat',
'warehouse.dat',
'web_page.dat',
'web_returns.dat',
'web_sales.dat',
'web_site.dat'
]
spark = SparkSession.builder.appName("Import CSV").getOrCreate()
for f in dat:
    table = f[:-4]  # strip the .dat extension
    df = spark.read.csv(f"file:///home/iceberg/sample/{table}.dat", header=False, inferSchema=True, sep="|")
    df = df.drop(df.columns[-1])  # drop the last empty column
    df.write.mode("append").insertInto(f"demo.test.{table}")
It's pretty easy to fill each table with PySpark's inferSchema feature. One thing to note is that I intentionally cut off the last column of the CSV.
df = df.drop(df.columns[-1])
The reason is that the CSV files generated by TPC-DS have a separator at the end of each line, which would otherwise be wrongly recognized as one more column.
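As a toy illustration (the values here are made up, only the trailing pipe matters):
# A trailing '|' makes the parser see one extra, empty field at the end of every row
line = "1|0|10000|"
print(line.split("|"))  # ['1', '0', '10000', ''] -> the empty string becomes a spurious column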
At this point, we have written all the data into the lakehouse, and we can use various query engines to benchmark the performance of the queries predefined by TPC-DS.
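Before benchmarking, a quick sanity check in the same notebook can confirm that every table actually received rows. This reuses the dat list and the Spark session from the import step above:
# Print the row count of each imported table as a sanity check
for f in dat:
    table = f"demo.test.{f[:-4]}"
    print(table, spark.table(table).count())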
Wrap up
TPC-DS is still the standard for benchmarking a lakehouse, but there are far fewer resources on how to run it against one.
This article provides a quick overview of how to import the TPC-DS data into a lakehouse, and following the steps above should be enough to reproduce it.
The only thing left to do is to actually run the queries, but that is the easier part, so I won't dive into it.