Ingesting data files into Apache Kafka is a very common task. Among the many file formats available, CSV is probably the most popular for moving data between systems. This is due to its simplicity and to the fact that it can be used to export or import data from one (small) database to another.
A CSV file is nothing more than a text file (with a .csv extension). Each line of the file represents a data record and each record consists of one or more fields, separated by a comma (or another separator).
Here is an excerpt from an example file:
40;War;02:38;1983;U2;Rock
Acrobat;Achtung Baby;04:30;1991;U2;Rock
Sunday Bloody Sunday;War;04:39;1983;U2;Rock
With or Without You;The Joshua Tree;04:55;1987; U2;Rock
In addition, a CSV file can contain a header line to indicate the name of each field:
title;album;duration;release;artist;type
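To make the record and field structure concrete, here is a minimal Python sketch that reads the excerpt above with the standard csv module. It is only an illustration of the file format and is not part of Kafka or the connector; the names SAMPLE and read_records are made up for this example.

```python
import csv
import io

# Sample data copied from the excerpt above, with the header line first.
SAMPLE = """title;album;duration;release;artist;type
40;War;02:38;1983;U2;Rock
Acrobat;Achtung Baby;04:30;1991;U2;Rock
Sunday Bloody Sunday;War;04:39;1983;U2;Rock
"""


def read_records(text, delimiter=";"):
    """Return one dict per data line, keyed by the names in the header line."""
    reader = csv.DictReader(io.StringIO(text), delimiter=delimiter)
    return [dict(row) for row in reader]


records = read_records(SAMPLE)
print(records[0]["title"], records[0]["release"])
```

Note that every value, including the release year, comes back as a string. The file itself carries no type information.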
In this article, we will see how to load such a file into Apache Kafka. Of course, we are not going to reinvent the wheel. Many open-source tools already exist to do this.
We will use the Kafka Connect framework, which is part of the Apache Kafka project. Kafka Connect has been designed to move data in and out of Kafka using connectors.
Kafka Connect File Pulse connector
The Kafka Connect File Pulse connector makes it easy to parse, transform, and stream data files into Kafka. It supports several file formats, but we will focus on CSV.
For a broad overview of FilePulse, I suggest you read this article :
For more information, you can check out the documentation here.
How to use the connector
The easiest and fastest way to get started with the Connect FilePulse connector is to use the Docker image available on Docker Hub.
$ docker pull streamthoughts/kafka-connect-file-pulse:1.6.3
You can download the docker-compose.yml file available on the GitHub project repository to quickly start a Confluent Platform with Kafka Connect and the FilePulse connector pre-installed.
$ wget https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/v1.6.3/docker-compose.yml
$ docker-compose up -d
Once all Docker containers are started, you can check that the connector is installed on the Kafka Connect worker accessible at http://localhost:8083.
$ curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep FilePulse
"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"
You can also install the connector from either the GitHub Releases page or Confluent Hub.
Ingesting data
Let's create a first connector instance:
$ curl \
-i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-filepulse-00/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/tmp/kafka-connect/examples/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.csv$",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
"offset.strategy":"name",
"skip.headers": "1",
"topic":"musics-filepulse-csv-00",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"tasks.max": 1
}'
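The same REST call can be prepared from a script instead of curl. The sketch below uses only the Python standard library and builds the PUT request without sending it, so it can be inspected offline. The helper name config_request is hypothetical, and the configuration shown is deliberately abbreviated to three of the properties from the command above.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # Kafka Connect worker address used in this article


def config_request(name, config, base_url=CONNECT_URL):
    """Build (but do not send) the PUT request that creates or updates a connector."""
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="PUT",
    )


# Abbreviated configuration: a real request needs the full set of properties.
config = {
    "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
    "topic": "musics-filepulse-csv-00",
    "tasks.max": 1,
}
request = config_request("source-csv-filepulse-00", config)
print(request.get_method(), request.full_url)

# To submit it against a running worker:
#   urllib.request.urlopen(request)
```

Keeping the configuration in a dictionary also makes it easier to derive variants of the connector by copying the dictionary and adding properties.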
You can check the connector is properly started by executing:
$ curl -s localhost:8083/connectors/source-csv-filepulse-00/status|jq '.connector.state'
"RUNNING"
Right now, our connector is not processing any data at all. Let's copy a CSV file into the input directory /tmp/kafka-connect/examples/:
# Download the CSV file
$ export GITHUB_REPO_MASTER=https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/master
$ curl -sSL $GITHUB_REPO_MASTER/examples/quickstart-musics-dataset.csv -o musics-dataset.csv
# Copy the CSV file into the Docker container
$ docker exec -it connect mkdir -p /tmp/kafka-connect/examples
$ docker cp musics-dataset.csv connect:/tmp/kafka-connect/examples/musics-dataset-00.csv
Let's check whether we have some data in the output topic musics-filepulse-csv-00:
$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t musics-filepulse-csv-00 \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
{
"message": {
"string": "Zoo Station;Achtung Baby;04:36;1991;U2;Rock"
},
"headers": {
"array": [
{
"string": "title;album;duration;release;artist;type"
}
]
}
}
Note: In the example above, we are using kafkacat to consume messages. The option -o-1 is used to consume only the latest message.
As we can see, our topic contains one Avro message for each line of the CSV file. This is because we have configured our connector to use the RowFileInputReader (see the task.reader.class configuration).
Each record contains two fields:
message: This field is of type string and represents a single line of the input file.
headers: This field is of type array and contains the first header line of the input file. This field is automatically added by the RowFileInputReader because we have configured it with skip.headers=1.
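The shape of these records can be pictured with a short Python sketch. This is a simplified model of the behaviour described above and not the connector's implementation; the function name read_rows is hypothetical.

```python
def read_rows(lines, skip_headers=1):
    """Yield one record per data line; the skipped header lines ride along in `headers`."""
    lines = [line.rstrip("\n") for line in lines]
    headers = lines[:skip_headers]
    for line in lines[skip_headers:]:
        yield {"message": line, "headers": headers}


sample = [
    "title;album;duration;release;artist;type\n",
    "Zoo Station;Achtung Baby;04:36;1991;U2;Rock\n",
]
records = list(read_rows(sample))
print(records[0])
```

At this stage each line is still one opaque string. Splitting it into named fields is a separate step.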
Headers
The FilePulse connector will add headers to each record containing metadata about the source file from which the data was extracted. This can be useful for debugging but also for data lineage.
$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t musics-filepulse-csv-00 \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .headers
[
"connect.file.name",
"musics-dataset-00.csv",
"connect.file.path",
"/tmp/kafka-connect/examples",
"connect.file.hash",
"1466679696",
"connect.file.size",
"6588",
"connect.file.lastModified",
"1597337097000",
"connect.hostname",
"57a2fb6213f9"
]
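In this output, kafkacat prints the headers as a flat list in which names and values alternate. If you post-process that JSON in a script, a small helper such as the following (the name headers_to_dict is made up for this example) turns it into a dictionary:

```python
def headers_to_dict(flat):
    """Pair up the alternating name/value entries printed by `jq .headers`."""
    if len(flat) % 2 != 0:
        raise ValueError("expected an even number of entries (name, value, ...)")
    return dict(zip(flat[0::2], flat[1::2]))


# A subset of the output shown above.
flat_headers = [
    "connect.file.name", "musics-dataset-00.csv",
    "connect.file.path", "/tmp/kafka-connect/examples",
    "connect.file.size", "6588",
]
metadata = headers_to_dict(flat_headers)
print(metadata["connect.file.name"])
```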
Parsing data
The FilePulse connector allows us to define complex pipelines to parse, transform, and enrich data through the use of processing filters.
You can use the built-in DelimitedRowFilter to parse each line. Also, because the first line of the CSV file is a header, you can set the property extractColumnName to name the record's fields based on the headers field.
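Conceptually, this parsing step splits the message field on the separator and pairs each value with the column name found at the same position in the headers field. The Python sketch below illustrates that idea under simplifying assumptions: it does not handle quoted fields, and parse_line is a hypothetical helper, not the filter's actual code.

```python
def parse_line(record, separator=";", trim=True):
    """Split `message` on the separator and name the columns after the header line."""
    names = record["headers"][0].split(separator)
    values = record["message"].split(separator)
    if trim:
        # Remove surrounding whitespace from each value.
        values = [value.strip() for value in values]
    if len(names) != len(values):
        raise ValueError(f"expected {len(names)} columns, got {len(values)}")
    return dict(zip(names, values))


record = {
    "message": "With or Without You;The Joshua Tree;04:55;1987; U2;Rock",
    "headers": ["title;album;duration;release;artist;type"],
}
parsed = parse_line(record)
print(parsed["artist"])
```

The sample line has a stray space before U2, which is why trimming the columns matters before comparing values later in the pipeline.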
Let's create a new connector with this new configuration:
$ curl \
-i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-filepulse-01/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/tmp/kafka-connect/examples/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.csv$",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
"offset.strategy":"name",
"skip.headers": "1",
"topic":"musics-filepulse-csv-01",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"tasks.max": 1,
"filters":"ParseLine",
"filters.ParseLine.extractColumnName": "headers",
"filters.ParseLine.trimColumn": "true",
"filters.ParseLine.separator": ";",
"filters.ParseLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter"
}'
Then, let's consume the topic musics-filepulse-csv-01:
$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t musics-filepulse-csv-01 \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
{
"title": {
"string": "Zoo Station"
},
"album": {
"string": "Achtung Baby"
},
"duration": {
"string": "04:36"
},
"release": {
"string": "1991"
},
"artist": {
"string": "U2"
},
"type": {
"string": "Rock"
}
}
Filtering data
Sometimes you may want to keep only the lines in a file that have a field with a particular value. For this article, let's imagine we need to keep only the songs by the band AC/DC.
To do this, we will extend our filter chain to use the DropFilter and create a new connector (so that our file can be reprocessed):
$ curl \
-i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-filepulse-02/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/tmp/kafka-connect/examples/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.csv$",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
"offset.strategy":"name",
"skip.headers": "1",
"topic":"musics-filepulse-csv-02",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"tasks.max": 1,
"filters":"ParseLine,KeepACDC",
"filters.ParseLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter",
"filters.ParseLine.extractColumnName": "headers",
"filters.ParseLine.trimColumn": "true",
"filters.ParseLine.separator": ";",
"filters.KeepACDC.type":"io.streamthoughts.kafka.connect.filepulse.filter.DropFilter",
"filters.KeepACDC.if":"{{ equals($value.artist, '\''AC/DC'\'') }}",
"filters.KeepACDC.invert":"true"
}'
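The property if is set using a Simple Connect Expression Language (SCEL) expression. SCEL is a basic expression language provided by the Connect FilePulse connector to access and manipulate record fields. Because invert is set to true, the filter drops every record that does not match the condition, which leaves only the AC/DC songs.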
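The interplay between the condition and the invert flag is easy to get backwards, so here is a small Python model of the logic. It is a sketch of the semantics as used in the KeepACDC configuration above, not the DropFilter source code, and drop_filter is a hypothetical name.

```python
def drop_filter(records, condition, invert=False):
    """Drop records matching `condition`; with invert=True, drop the ones that do not match."""
    for record in records:
        matches = condition(record)
        should_drop = (not matches) if invert else matches
        if not should_drop:
            yield record


records = [
    {"artist": "U2", "release": "1991"},
    {"artist": "AC/DC", "release": "1980"},
]


def is_acdc(record):
    return record["artist"] == "AC/DC"


kept = list(drop_filter(records, is_acdc, invert=True))
print(kept)
```

Without invert, the same condition would have the opposite effect and remove the AC/DC records instead.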
Finally, you can check that you get the expected result by executing:
$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t musics-filepulse-csv-02 \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault.artist
{
"string": "AC/DC"
}
Changing the field types
You have probably noticed that at no time have we defined the type of our fields. By default, the connector assumes that all fields are of type string, and maybe you are happy with that. But in most cases, you will want to convert a few fields. For example, we can convert the field release to be of type integer.
To do this, you can use the AppendFilter with a Simple Connect Expression Language expression.
Let's create our final configuration:
$ curl \
-i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-filepulse-03/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/tmp/kafka-connect/examples/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.csv$",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
"offset.strategy":"name",
"skip.headers": "1",
"topic":"musics-filepulse-csv-03",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"tasks.max": 1,
"filters":"ParseLine,KeepACDC,ReleaseToInt",
"filters.ParseLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter",
"filters.ParseLine.extractColumnName": "headers",
"filters.ParseLine.trimColumn": "true",
"filters.ParseLine.separator": ";",
"filters.KeepACDC.type":"io.streamthoughts.kafka.connect.filepulse.filter.DropFilter",
"filters.KeepACDC.if":"{{ equals($value.artist, '\''AC/DC'\'') }}",
"filters.KeepACDC.invert":"true",
"filters.ReleaseToInt.type": "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
"filters.ReleaseToInt.field": "$value.release",
"filters.ReleaseToInt.value": "{{ converts($value.release, '\''INTEGER'\'') }}",
"filters.ReleaseToInt.overwrite": "true"
}'
Finally, you can check the converted field:
$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t musics-filepulse-csv-03 \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault.release
{
"int": 1980
}
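In plain terms, the ReleaseToInt step replaces the string value of release with its integer equivalent and leaves the other fields untouched. A minimal Python sketch of that transformation, with the hypothetical helper convert_field standing in for the filter:

```python
def convert_field(record, field, to_type=int):
    """Return a copy of `record` where `field` has been converted with `to_type`."""
    updated = dict(record)
    updated[field] = to_type(record[field])
    return updated


song = {"artist": "AC/DC", "release": "1980"}
converted = convert_field(song, "release")
print(converted["release"])
```

A value that cannot be parsed as an integer raises a ValueError in this sketch. How the connector itself reacts to such a value is not covered in this article.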
The AppendFilter is a very handy filter that allows us to quickly modify a record. For example, we could also use it to set the key of each record to the album name in uppercase by adding the following configuration (and adding SetKey to the filters list):
"filters.SetKey.type": "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
"filters.SetKey.field": "$key",
"filters.SetKey.value": "{{ uppercase($value.album)}}"
Note: In this article, we have used the filter chain mechanism provided by the Connect FilePulse connector. It is also possible to perform the same type conversion with a Kafka Connect Single Message Transformation (SMT), using org.apache.kafka.connect.transforms.Cast.
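As a rough illustration of the SMT alternative, the properties below cast the release field of the record value to a 32-bit integer with the Cast transformation. They are written as a Python dictionary so that they can be merged into a connector configuration before it is sent. The alias CastRelease and the helper with_transforms are made up for this example, and this combination has not been tested against the FilePulse connector in this article.

```python
# Properties for the Kafka Connect Cast SMT, applied to the record value.
cast_smt = {
    "transforms": "CastRelease",
    "transforms.CastRelease.type": "org.apache.kafka.connect.transforms.Cast$Value",
    "transforms.CastRelease.spec": "release:int32",
}


def with_transforms(base_config, extra):
    """Return a new configuration containing `base_config` plus the SMT properties."""
    merged = dict(base_config)
    merged.update(extra)
    return merged


base = {"topic": "musics-filepulse-csv-03", "tasks.max": 1}
final_config = with_transforms(base, cast_smt)
print(sorted(final_config))
```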
Conclusion
We have seen in this article that it is fairly easy to load a CSV file into Kafka without writing a single line of code using Kafka Connect. Also, the Connect FilePulse connector is a powerful solution that allows you to easily manipulate your data before sending it into Apache Kafka. Please do not hesitate to share this article. If you like this project, add a star to the GitHub repository to support us. Thank you.
Top comments (2)
Nice article Florian! I'm currently working on a project whereby customers send us csv files and then we send a subset of these files in various formats to our vendors. How can I combine the file metadata (i.e. file name, last modified date) with the file in the csv so that I can insert each row into a db. Assume I have a file name foo.csv with 3 records:
first_name,last_name
Biff,Tannen
Florian,Hussonnois
I'm looking to insert the following into a db:
foo.csv,,Biff,Tannen
foo.csv,,Florian,Hussonnois
in the form of
[
{file_name: "foo.csv", file_date: , first_name: "Biff", last_name: "Tannen"},
{file_name: "foo.csv", file_date: , first_name: "Florian", last_name: "Hussonnois"}
]
How might I configure the connector to do this?
What configuration would you tell a source connector to process an Apache log file line by line as new lines are added to the log file, with the fields separated by spaces?
I do not know if it is possible with this connector, I do not see any example with the class io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader