Index
- Introduction
- Environment preparation and data download
- Comparing CSV to Apache Parquet
- Conclusion and final thoughts
Before we start just a word on why I wrote this...
This first part doesn't contain any technical content, so you can go directly to the Introduction if you want.
This post builds on one of my previous posts: https://dev.to/zompro/quick-pandas-and-dask-comparison-processing-large-csv-files-real-world-example-that-you-can-do-now-1n15
That post introduces Dask as an alternative to Pandas. This post presents Apache Parquet as an alternative to CSV files. Columnar formats can perform very well and Parquet, in particular, can save a lot of space thanks to its compression capabilities.
This post is an independent, standalone piece (so there is some repetition from the previous post).
1. Introduction
Before we start you must:
- Have Python 3.5 or newer installed, with venv available
- At the moment, a Linux-based system, or a little bit of knowledge to translate the commands (I don't have a Windows machine to hand, but give me a shout if you want me to translate the non-compatible commands)
- That's it!
Apache Parquet is a file format (more specifically, a data storage format) that focuses on handling data by columns (as opposed to rows, like in a CSV file). It isn't the only columnar format, but it is a well-known and popular one. In a nutshell, this format stores information about the data, which allows certain operations to be performed very quickly. It also relies heavily on compression, allowing for smaller file sizes.
If you come from a "normal" database or are used to working with DataFrames, it is natural to think in rows. You have a header row naming each column and then pick up the rows one by one. This structure represents very well the relationship that exists between the columns or fields. However, sometimes you want to run a query that heavily involves a particular column rather than each row.
Think, for example, of trying to find a particular date or a range of dates in a timestamp column. In a bad scenario, you need to scan every record in the column to find what you need. Formats such as CSV don't know anything about the information they contain. Parquet knows more about your columns, and the data is stored in a way that makes these types of queries perform much better.
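To make that concrete, here is a minimal sketch (with hypothetical file names) of what the difference looks like from pandas: with a CSV, the whole file still has to be parsed even if you only keep one column, while a Parquet reader can use the file's metadata to read just the column it needs.
import pandas as pd

# CSV: usecols limits what is kept, but the entire file still has to be parsed
dates_csv = pd.read_csv("example.csv", usecols=["date"])

# Parquet: the reader uses the stored metadata to read only the "date" column
dates_parquet = pd.read_parquet("example.parquet", columns=["date"])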
Enough writing! Let's get our hands dirty! By the way, the Wikipedia page on Apache Parquet is amazing in case you want to go deeper. It's a very interesting but massive subject: https://en.wikipedia.org/wiki/Apache_Parquet
2. Environment preparation and data download
This section is very similar to my previous post (https://dev.to/zompro/quick-pandas-and-dask-comparison-processing-large-csv-files-real-world-example-that-you-can-do-now-1n15). If you already followed that post, you just need to install pyarrow by activating your virtual environment and running pip install pyarrow
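In that case, the commands are just (pinning the same pyarrow version we install below):
source venv/bin/activate
pip install pyarrow==1.0.1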
If you are coming to this post first, carry on reading so you can get your environment and data ready.
We are going to create a virtual environment, install Dask, pyarrow and Jupyter Notebooks (this last one just to run our code).
We will now create the main folder, called parquetdask, and a virtual environment called venv inside it:
mkdir parquetdask
cd parquetdask
python3 -m venv venv
Now we will activate the virtual environment and install the packages we are going to need:
source venv/bin/activate
pip install "dask[complete]==2.27.0" pyarrow==1.0.1 jupyter==1.0.0
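If you want to double-check that everything installed correctly before moving on, a quick optional sanity check is:
python -c "import dask, pyarrow; print(dask.__version__, pyarrow.__version__)"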
Before we move on to our notebook, let's download the data we are going to use (this is the same data we used in the previous post so you don't have to download it again if you already have it). We will use the uk gov housing price paid data. Make sure you read the usage guidelines here https://www.gov.uk/government/statistical-data-sets/price-paid-data-downloads#using-or-publishing-our-price-paid-data
The copyright disclaimer:
Contains HM Land Registry data © Crown copyright and database right 2020. This data is licensed under the Open Government Licence v3.0.
We will download all the data ever recorded.
mkdir data
cd data
wget http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-complete.csv
cd ..
We created a data folder, went into it, downloaded the data and now we are back at the root of our directory.
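If you want to confirm the download looks sane before we start (the file is several gigabytes), you can take a quick peek on a unix-like system:
ls -lh data/pp-complete.csv
head -n 1 data/pp-complete.csv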
3. Comparing CSV to Apache Parquet
Start your notebook
jupyter notebook
Then create a new notebook and copy each of the following snippets into a separate cell.
First we import the libraries we are going to need and start Dask (you can read more about this in my previous post)
import os
import subprocess
import time

import dask
import dask.dataframe as dd
from dask.delayed import delayed
from dask.distributed import Client, progress
client = Client(threads_per_worker=4, n_workers=4)
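If you want to watch what Dask is doing while the next cells run, the client exposes a diagnostics dashboard; printing its address in a cell of its own is optional but handy:
print(client.dashboard_link)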
Let's define some variables we will need
all_data = "data/pp-complete.csv"
columns = ["transaction", "price", "date", "postcode", "prop_type", "old_new", "duration", "paon", "saon", "street", "locality", "city", "district", "county", "ppd_cat", "rec_status"]
Now, we will run some analysis using Dask directly on our CSV file.
start = time.time()
df = dd.read_csv(all_data, blocksize=32 * 1024 * 1024, header=0, names=columns)
df_by_county = df.groupby("county")["price"].sum().compute()
print(df_by_county)
end = time.time()
print("time elapsed {}".format(end-start))
The result I got was:
dask using a csv file - time elapsed 19.78 seconds
(remember from the previous post, pandas took over 50 secs)
Next, we will transform our CSV file to Parquet (it will take some time)
df.to_parquet(all_data + ".parquet", engine='pyarrow')
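A side note you can experiment with: pyarrow uses snappy compression by default when writing Parquet, and to_parquet also accepts a compression argument. For example, you could write a gzip-compressed copy to a separate (hypothetical) path and compare sizes and timings yourself:
# Sketch only: stronger compression, usually slower to write and read
df.to_parquet(all_data + ".gzip.parquet", engine='pyarrow', compression='gzip')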
Before we move on, let's have a look at the size of the two datasets (the Parquet one and the CSV one in our data folder). You can use the following commands on a unix-like system or just look at the sizes in your file browser (note that the Parquet "file" is actually a folder, not a single file).
print("{}G".format(round(os.path.getsize(all_data)/1000000000, 1)))
print(subprocess.check_output(['du','-sh', all_data + ".parquet"]).split()[0].decode('utf-8'))
The results I got were:
4.4G
2.2G
This is already showing the power of compression implemented in Parquet.
But what about performance? Let's re-run the same process, this time reading from the Parquet version:
start = time.time()
df = dd.read_parquet(all_data + ".parquet", engine='pyarrow')
df_by_county = df.groupby("county")["price"].sum().compute()
print(df_by_county)
end = time.time()
print("time elapsed {}".format(end-start))
The result I got was:
dask using a parquet file - time elapsed 13.65 seconds
That's approximately 30% faster than the CSV. Not bad at all, considering you also get the storage space improvement.
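One more thing worth trying before we wrap up: because Parquet is columnar, you can tell Dask to read only the columns the query actually needs. Here is a minimal sketch of the same aggregation with column pruning (timings will vary on your machine; I haven't included mine here):
start = time.time()
# Only the two columns used in the aggregation are read from disk
df_pruned = dd.read_parquet(all_data + ".parquet", columns=["county", "price"], engine='pyarrow')
print(df_pruned.groupby("county")["price"].sum().compute())
print("time elapsed {}".format(time.time() - start))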
4. Conclusion and final thoughts
This article is just a very brief introduction to Apache Parquet. There's a lot more to discover and analyse.
For example, you can explore the folder created as the Parquet "file". In there you can see that the original CSV data has been split into lots of smaller files, allowing for better parallel execution.
The size (and number) of these split files is itself something you can modify to test for performance improvements. There is a relationship between the number of files and the number of Dask workers/threads running that will determine the optimum performance.
You should also notice that there are metadata files. While these are not really human readable, they show why this format performed so well in what we did: the file "knows" more about the data.
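If you are curious, you can read that metadata back from the notebook with pyarrow itself; a minimal sketch, using the same path as above:
import pyarrow.parquet as pq

# Reads the dataset's metadata (not the data itself) and prints the stored schema
dataset = pq.ParquetDataset(all_data + ".parquet")
print(dataset.schema)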
Finally, we are using PyArrow to handle the files, but you could use Fastparquet as an alternative. Again, this is something else you can explore.
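If you want to try that swap, it should be as small as installing the library and changing the engine argument (I haven't benchmarked this combination here, so treat it as a starting point):
pip install fastparquet
and then, in the notebook:
df = dd.read_parquet(all_data + ".parquet", engine='fastparquet')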
I hope you found this post interesting. As I said, this is just an introduction and there is lots more to explore!
Top comments (4)
If you are worried about space, wouldn't you work from a gzipped CSV file? I wonder what the size of the CSV file is when gzipped?
I believe Dask doesn't support reading from zip. See here: github.com/dask/dask/issues/2554#i....
Looking around, it does seem to be able to read from a single gzip, but it doesn't seem to be straightforward. If you need to unzip the file first, then any gains on storage would be nullified.
I would be very interested to try it and compare size and performance if you know a way. Normally storage is cheap, at least a lot cheaper than other resources, so performance is in most cases the priority and the compression is a nice-to-have (it depends on the use case, of course).
Sadly I don't use Dask, but in the past I have used zcat on the Linux command line to stream a CSV to stdin for a script to process, without needing the whole of the data uncompressed in memory or on disk.
Cool, I can totally see a use case for that, streaming into something like Apache Kafka. I will prototype a couple of things and see if it can become another little article. Thanks!