
ChunTing Wu

Benchmarking Lakehouse Query Engines with TPC-DS

Apache Iceberg is already a popular lakehouse format supported by many query engines. So how should we go about making a technical selection among all of them?

In the data warehouse domain, the most commonly used benchmark is TPC-DS, which defines several common scenarios and provides a set of standardized queries. Generally speaking, TPC-DS is the gold standard for benchmarking query performance.

Although TPC-DS is quite popular and there are many connectors for dumping its test data into various databases (even Trino, a pure compute engine, provides a dedicated TPC-DS catalog), there is no such thing as a TPC-DS for the lakehouse at the moment.

In other words, the lakehouse has no ready-made connector for this purpose. Therefore, in this article we will describe how to dump the TPC-DS test data into an Iceberg lakehouse.

Experiment environment setup

How to build the TPC-DS toolkit is not the focus of this article, so I'll start by assuming that dsdgen is already installed.

First, let's generate a test dataset at scale factor 1 (roughly 1 GB).

dsdgen -SCALE 1 -DIR /home/ec2-user/sample
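
dsdgen writes one pipe-delimited .dat file per table into the target directory, so afterwards the folder should look roughly like this:

ls /home/ec2-user/sample
# call_center.dat  catalog_page.dat  ...  web_site.dat   (one file per TPC-DS table)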

Once we have the test data, we need to build the Iceberg environment and import the data.

Although I've provided some Iceberg playgrounds before, this time I'd like to use Tabular's experiment environment. The main reason is that it also includes a Spark notebook, which helps a lot.

https://gist.github.com/wirelessr/37b19323664cff6f9af42bd814f05a5d#file-docker-compose-yaml

One small modification I made was to mount the sample test data folder into the spark-iceberg container.

    volumes:
      - ./warehouse:/home/iceberg/warehouse
      - ./notebooks:/home/iceberg/notebooks/notebooks
      - ./sample:/home/iceberg/sample
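
With the compose file saved and the sample folder mounted, the stack can be started in the usual way. A minimal sketch, assuming the gist is saved as docker-compose.yaml in the current directory:

docker compose up -d
docker compose ps   # wait until the spark-iceberg container is up before continuing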

Import test data

After we have created the environment, we need to write the data into Iceberg.

First, we need to create the table schemas.

In general, the tpcds.sql that ships with TPC-DS can be used directly, but a few things need to be modified for the Iceberg environment.

  1. Add catalog and database to the original table name, i.e. <catalog>.<db>.<table>.
  2. The primary key line should be pulled out and declared as PARTITIONED BY instead. Bucketing works well here, so I've split it into 16 buckets, but this can be tweaked.
  3. Now that the primary key has been removed, remember to remove the comma from the previous line as well.

Here is the table income_band for illustration.

create table demo.test.income_band      -- rename table
(
    ib_income_band_sk  integer  not null,
    ib_lower_bound     integer          ,
    ib_upper_bound     integer          -- remove comma: ,
    -- remove line: primary key (ib_income_band_sk)
)
PARTITIONED BY (bucket(16, ib_income_band_sk)) -- add line
;

I have written a Python script to handle this.

https://gist.github.com/wirelessr/37b19323664cff6f9af42bd814f05a5d#file-proc_ddl-py
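
The linked gist contains the actual proc_ddl.py; as a rough sketch of the same idea (the regexes, the hard-coded demo.test prefix, and bucketing composite keys on their first column are simplifications of my own, not necessarily what the gist does), it could look like this:

import re
import sys

CATALOG_DB = "demo.test"   # assumed target <catalog>.<db>
BUCKETS = 16               # bucket count used in this article

def convert(ddl: str) -> str:
    out, part_key = [], None
    for line in ddl.splitlines():
        key = re.match(r"\s*primary key\s*\(([^)]+)\)\s*,?\s*$", line, re.IGNORECASE)
        if key:
            # for composite keys, bucket on the first column only (a simplification)
            part_key = key.group(1).split(",")[0].strip()
            # drop the trailing comma on the previous column definition
            out[-1] = re.sub(r",\s*$", "", out[-1])
            continue
        # prefix the table name with <catalog>.<db>
        line = re.sub(r"^create table\s+(\w+)",
                      rf"create table {CATALOG_DB}.\1", line, flags=re.IGNORECASE)
        # close the statement with a PARTITIONED BY clause instead of the primary key
        if line.strip() == ");" and part_key:
            out.append(")")
            out.append(f"PARTITIONED BY (bucket({BUCKETS}, {part_key}));")
            part_key = None
            continue
        out.append(line)
    return "\n".join(out)

if __name__ == "__main__":
    print(convert(sys.stdin.read()))

Running it as python convert_ddl.py < tpcds.sql (using whatever filename you save it under) produces DDLs in the form shown above.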

Once all the DDLs are ready, we just need to open the Spark SQL shell built into the experiment environment and paste them in directly.

docker exec -it spark-iceberg spark-sql

After the tables are defined, it's time to import the data into Iceberg, which we would typically do using the shortcut provided by Spark SQL.

INSERT INTO demo.test.income_band
SELECT * FROM csv.`file:///home/iceberg/sample/income_band.dat`;

But there is a problem: the column types will not match and the insert will fail (Spark SQL reads every CSV column as STRING, so the INT columns cause an error). Instead, we can use PySpark to save some time.

There is also a ready-made notebook available in the experiment environment.

docker exec -it spark-iceberg pyspark-notebook

So we just need to open the notebook and start importing the CSV files.

from pyspark.sql import SparkSession

dat = [
    'call_center.dat',
    'catalog_page.dat',
    'catalog_returns.dat',
    'catalog_sales.dat',
    'customer.dat',
    'customer_address.dat',
    'customer_demographics.dat',
    'date_dim.dat',
    # 'dbgen_version.dat',
    'household_demographics.dat',
    'income_band.dat',
    'inventory.dat',
    'item.dat',
    'promotion.dat',
    'reason.dat',
    'ship_mode.dat',
    'store.dat',
    'store_returns.dat',
    'store_sales.dat',
    'time_dim.dat',
    'warehouse.dat',
    'web_page.dat',
    'web_returns.dat',
    'web_sales.dat',
    'web_site.dat'
]

spark = SparkSession.builder.appName("Import CSV").getOrCreate()

for f in dat:
    table = f[:-4]  # strip the ".dat" extension to get the table name
    # the .dat files are pipe-delimited with no header row; inferSchema derives the column types
    df = spark.read.csv(f"file:///home/iceberg/sample/{f}", header=False, inferSchema=True, sep="|")
    df = df.drop(df.columns[-1]) # drop the last, always-empty column caused by the trailing separator

    df.write.mode("append").insertInto(f"demo.test.{table}")

It's pretty easy to fill each table thanks to PySpark's inferSchema feature. One thing to note is that I intentionally drop the last column of the CSV.

df = df.drop(df.columns[-1])

The reason is that the files generated by TPC-DS have a trailing separator at the end of each line, which would otherwise be recognized as one extra, always-empty column.
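
A quick way to see this, assuming the same notebook session as above:

df = spark.read.csv("file:///home/iceberg/sample/income_band.dat", header=False, inferSchema=True, sep="|")
df.printSchema()   # shows _c0, _c1, _c2 plus a trailing _c3 that is always null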

At this point, we have written all the data into the lakehouse, and we can now use various query engines to run the queries predefined by TPC-DS and benchmark their performance.
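
Before running the full query set, a quick sanity check (shown here in Spark SQL; any engine configured with the same Iceberg catalog would work) confirms the data landed:

-- inside: docker exec -it spark-iceberg spark-sql
SELECT COUNT(*) FROM demo.test.income_band;   -- a small dimension table
SELECT COUNT(*) FROM demo.test.store_sales;   -- typically the largest fact table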

Wrap up

TPC-DS is still the standard benchmark for the lakehouse, but there are far fewer resources on how to run it there.

This article provides a quick overview of how to import the TPC-DS data into a lakehouse, and following the steps above should be enough to reproduce the setup.

The only thing left to do is to actually run the queries, but that's the easier part, so I won't dive into it here.
