Streams are a built-in feature in Node.js and represent an asynchronous flow of data. Streams are also a way to handle reading and/or writing files. A Node.js stream can process files larger than the free memory of your computer, since it processes the data in small chunks.
This is the fifth article of a series about streams in Node.js. This article is about how to perform ETL operations (Extract, Transform, Load) on CSV data using streams.
Streams in Node.js
- What is a Stream in Node.js?
- Connect streams with the pipe method
- Handle stream errors
- Connect streams with the pipeline method
- Use streams to extract, transform and load data (this article)
Overview
When working with flat data, we can use the fs module and streams to process the data memory-efficiently. Instead of reading all the data into memory, we can read it in small chunks with the help of streams to avoid overconsuming memory.
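As a minimal sketch of this idea (the file name sample.txt is a placeholder), a readable stream emits the file contents chunk by chunk instead of as one big buffer:

const fs = require('fs');

// read the file in chunks instead of loading it into memory at once
const readStream = fs.createReadStream('sample.txt');

let bytes = 0;
readStream.on('data', chunk => {
  bytes += chunk.length; // each chunk is a Buffer, by default up to 64 KiB
});
readStream.on('end', () => console.log(`Read ${bytes} bytes in chunks.`));
readStream.on('error', err => console.error(err));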
In this article, we are going to create sample data in a CSV file, extract the data, transform it, and load it.
A Comma-Separated Values (CSV) file is a delimited text file that uses a comma to separate values. We will transform the CSV data to JSON, or better to ndjson, which is basically a file of JSON records separated by newlines, with the file extension .ndjson. You are probably asking yourself: why are we not just using JSON? The main reason is fault tolerance. If one single invalid record is written to a JSON file, the entire file is corrupted. The main difference between JSON and ndjson is that in an ndjson file each line must contain a single JSON record. Hence, every line of an ndjson file is valid JSON, but the ndjson file as a whole is not a valid JSON document. The ndjson format works well with streaming data and large data sets, where each record is processed individually.
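A minimal illustration with two records: as a JSON array, the whole document has to parse at once; as ndjson, each line stands on its own:

// JSON: one document, all or nothing
[
  {"id":"100","firstName":"Jobi"},
  {"id":"101","firstName":"Dacia"}
]

// ndjson: one record per line, each processed individually
{"id":"100","firstName":"Jobi"}
{"id":"101","firstName":"Dacia"}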
We are going to:
- Create CSV sample data
- Initialize project for NPM
- Create a CSV parser
- Add Transform stream
- Run & Done
1. Create CSV data
Let's create some sample CSV data. You can use the sample data below, or create your own data with FakerJS and convert it to CSV.
id,firstName,lastName,email,email2,randomized
100,Jobi,Taam,Jobi.Taam@yopmail.com,Jobi.Taam@gmail.com,Z lsmDLjL
101,Dacia,Elephus,Dacia.Elephus@yopmail.com,Dacia.Elephus@gmail.com,Za jfPaJof
102,Arlina,Bibi,Arlina.Bibi@yopmail.com,Arlina.Bibi@gmail.com,zmzlfER
103,Lindie,Torray,Lindie.Torray@yopmail.com,Lindie.Torray@gmail.com,ibVggFEh
104,Modestia,Leonard,Modestia.Leonard@yopmail.com,Modestia.Leonard@gmail.com," Tit KCrdh"
105,Karlee,Cornelia,Karlee.Cornelia@yopmail.com,Karlee.Cornelia@gmail.com,PkQCUXzq
106,Netty,Travax,Netty.Travax@yopmail.com,Netty.Travax@gmail.com,psJKWDBrXm
107,Dede,Romelda,Dede.Romelda@yopmail.com,Dede.Romelda@gmail.com,heUrfT
108,Sissy,Crudden,Sissy.Crudden@yopmail.com,Sissy.Crudden@gmail.com,cDJxC
109,Sherrie,Sekofski,Sherrie.Sekofski@yopmail.com,Sherrie.Sekofski@gmail.com,dvYHUJ
110,Sarette,Maryanne,Sarette.Maryanne@yopmail.com,Sarette.Maryanne@gmail.com,rskGIJNF
111,Selia,Waite,Selia.Waite@yopmail.com,Selia.Waite@gmail.com,DOPBe
112,Karly,Tjon,Karly.Tjon@yopmail.com,Karly.Tjon@gmail.com,zzef nCMVL
113,Sherrie,Berriman,Sherrie.Berriman@yopmail.com,Sherrie.Berriman@gmail.com,rQqmjw
114,Nadine,Greenwald,Nadine.Greenwald@yopmail.com,Nadine.Greenwald@gmail.com,JZsmKafeIf
115,Antonietta,Gino,Antonietta.Gino@yopmail.com,Antonietta.Gino@gmail.com,IyuCBqwlj
116,June,Dorothy,June.Dorothy@yopmail.com,June.Dorothy@gmail.com,vyCTyOjt
117,Belva,Merriott,Belva.Merriott@yopmail.com,Belva.Merriott@gmail.com,MwwiGEjDfR
118,Robinia,Hollingsworth,Robinia.Hollingsworth@yopmail.com,Robinia.Hollingsworth@gmail.com,wCaIu
119,Dorthy,Pozzy,Dorthy.Pozzy@yopmail.com,Dorthy.Pozzy@gmail.com,fmWOUCIM
120,Barbi,Buffum,Barbi.Buffum@yopmail.com,Barbi.Buffum@gmail.com,VOZEKSqrZa
121,Priscilla,Hourigan,Priscilla.Hourigan@yopmail.com,Priscilla.Hourigan@gmail.com,XouVGeWwJ
122,Tarra,Hunfredo,Tarra.Hunfredo@yopmail.com,Tarra.Hunfredo@gmail.com,NVzIduxd
123,Madalyn,Westphal,Madalyn.Westphal@yopmail.com,Madalyn.Westphal@gmail.com,XIDAOx
124,Ruthe,McAdams,Ruthe.McAdams@yopmail.com,Ruthe.McAdams@gmail.com,iwVelLKZH
125,Maryellen,Brotherson,Maryellen.Brotherson@yopmail.com,Maryellen.Brotherson@gmail.com,nfoiVBjjqw
126,Shirlee,Mike,Shirlee.Mike@yopmail.com,Shirlee.Mike@gmail.com,MnTkBSFDfo
127,Orsola,Giule,Orsola.Giule@yopmail.com,Orsola.Giule@gmail.com,VPrfEYJi
128,Linzy,Bennie,Linzy.Bennie@yopmail.com,Linzy.Bennie@gmail.com,ZHctp
129,Vanessa,Cohdwell,Vanessa.Cohdwell@yopmail.com,Vanessa.Cohdwell@gmail.com,RvUcbJihHf
130,Jaclyn,Salvidor,Jaclyn.Salvidor@yopmail.com,Jaclyn.Salvidor@gmail.com,gbbIxz
131,Mildrid,Pettiford,Mildrid.Pettiford@yopmail.com,Mildrid.Pettiford@gmail.com,snyeV
132,Carol-Jean,Eliathas,Carol-Jean.Eliathas@yopmail.com,Carol-Jean.Eliathas@gmail.com,EAAjYHiij
133,Susette,Ogren,Susette.Ogren@yopmail.com,Susette.Ogren@gmail.com," BhYgr"
134,Farrah,Suanne,Farrah.Suanne@yopmail.com,Farrah.Suanne@gmail.com,hYZbZIc
135,Cissiee,Idelia,Cissiee.Idelia@yopmail.com,Cissiee.Idelia@gmail.com,PNuxbvjx
136,Alleen,Clara,Alleen.Clara@yopmail.com,Alleen.Clara@gmail.com,YkonJWtV
137,Merry,Letsou,Merry.Letsou@yopmail.com,Merry.Letsou@gmail.com,sLfCumcwco
138,Fanny,Clywd,Fanny.Clywd@yopmail.com,Fanny.Clywd@gmail.com,Go kx
139,Trixi,Pascia,Trixi.Pascia@yopmail.com,Trixi.Pascia@gmail.com,lipLcqRAHr
140,Sandie,Quinn,Sandie.Quinn@yopmail.com,Sandie.Quinn@gmail.com,KrGazhI
141,Dania,Wenda,Dania.Wenda@yopmail.com,Dania.Wenda@gmail.com,CXzs kDv
142,Kellen,Vivle,Kellen.Vivle@yopmail.com,Kellen.Vivle@gmail.com,RrKPYqq
143,Jany,Whittaker,Jany.Whittaker@yopmail.com,Jany.Whittaker@gmail.com,XAIufn
144,Lusa,Fillbert,Lusa.Fillbert@yopmail.com,Lusa.Fillbert@gmail.com,FBFQnPm
145,Farrah,Edee,Farrah.Edee@yopmail.com,Farrah.Edee@gmail.com,TrCwKb
146,Felice,Peonir,Felice.Peonir@yopmail.com,Felice.Peonir@gmail.com,YtVZywf
147,Starla,Juan,Starla.Juan@yopmail.com,Starla.Juan@gmail.com,aUTvjVNyw
148,Briney,Elvyn,Briney.Elvyn@yopmail.com,Briney.Elvyn@gmail.com,tCEvgeUbwF
149,Marcelline,Ricarda,Marcelline.Ricarda@yopmail.com,Marcelline.Ricarda@gmail.com,sDwIlLckbd
150,Mureil,Rubie,Mureil.Rubie@yopmail.com,Mureil.Rubie@gmail.com,HbcfbKd
151,Nollie,Dudley,Nollie.Dudley@yopmail.com,Nollie.Dudley@gmail.com,EzjjrNwVUm
152,Yolane,Melony,Yolane.Melony@yopmail.com,Yolane.Melony@gmail.com,wfqSgpgL
153,Brena,Reidar,Brena.Reidar@yopmail.com,Brena.Reidar@gmail.com,iTlvaS
154,Glenda,Sabella,Glenda.Sabella@yopmail.com,Glenda.Sabella@gmail.com,zzaWxeI
155,Paola,Virgin,Paola.Virgin@yopmail.com,Paola.Virgin@gmail.com,gJO hXTWZl
156,Aryn,Erich,Aryn.Erich@yopmail.com,Aryn.Erich@gmail.com,qUoLwH
157,Tiffie,Borrell,Tiffie.Borrell@yopmail.com,Tiffie.Borrell@gmail.com,cIYuVMHwF
158,Anestassia,Daniele,Anestassia.Daniele@yopmail.com,Anestassia.Daniele@gmail.com,JsDbQbc
159,Ira,Glovsky,Ira.Glovsky@yopmail.com,Ira.Glovsky@gmail.com,zKITnYXyhC
160,Sara-Ann,Dannye,Sara-Ann.Dannye@yopmail.com,Sara-Ann.Dannye@gmail.com,wPClmU
161,Modestia,Zina,Modestia.Zina@yopmail.com,Modestia.Zina@gmail.com,YRwcMqPK
162,Kelly,Poll,Kelly.Poll@yopmail.com,Kelly.Poll@gmail.com,zgklmO
163,Ernesta,Swanhildas,Ernesta.Swanhildas@yopmail.com,Ernesta.Swanhildas@gmail.com,tWafP
164,Giustina,Erminia,Giustina.Erminia@yopmail.com,Giustina.Erminia@gmail.com,XgOKKAps
165,Jerry,Kravits,Jerry.Kravits@yopmail.com,Jerry.Kravits@gmail.com,olzBzS
166,Magdalena,Khorma,Magdalena.Khorma@yopmail.com,Magdalena.Khorma@gmail.com,BBKPB
167,Lory,Pacorro,Lory.Pacorro@yopmail.com,Lory.Pacorro@gmail.com,YmWQB
168,Carilyn,Ethban,Carilyn.Ethban@yopmail.com,Carilyn.Ethban@gmail.com,KUXenrJh
169,Tierney,Swigart,Tierney.Swigart@yopmail.com,Tierney.Swigart@gmail.com,iQCQJ
170,Beverley,Stacy,Beverley.Stacy@yopmail.com,Beverley.Stacy@gmail.com,NMrS Zpa f
171,Ida,Dex,Ida.Dex@yopmail.com,Ida.Dex@gmail.com,hiIgOCxNg
172,Sam,Hieronymus,Sam.Hieronymus@yopmail.com,Sam.Hieronymus@gmail.com,dLSkVe
173,Lonnie,Colyer,Lonnie.Colyer@yopmail.com,Lonnie.Colyer@gmail.com,ZeDosRy
174,Rori,Ethban,Rori.Ethban@yopmail.com,Rori.Ethban@gmail.com,SXFZQmX
175,Lelah,Niles,Lelah.Niles@yopmail.com,Lelah.Niles@gmail.com,NwxvCXeszl
176,Kathi,Hepsibah,Kathi.Hepsibah@yopmail.com,Kathi.Hepsibah@gmail.com,SOcAOSn
177,Dominga,Cyrie,Dominga.Cyrie@yopmail.com,Dominga.Cyrie@gmail.com,IkjDyuqK
178,Pearline,Bakerman,Pearline.Bakerman@yopmail.com,Pearline.Bakerman@gmail.com,vHVCkQ
179,Selma,Gillan,Selma.Gillan@yopmail.com,Selma.Gillan@gmail.com,hSZgpBNsw
180,Bernardine,Muriel,Bernardine.Muriel@yopmail.com,Bernardine.Muriel@gmail.com,AnSDTDa U
181,Ermengarde,Hollingsworth,Ermengarde.Hollingsworth@yopmail.com,Ermengarde.Hollingsworth@gmail.com,IYQZ Nmv
182,Marguerite,Newell,Marguerite.Newell@yopmail.com,Marguerite.Newell@gmail.com,kSaD uaHH
183,Albertina,Nisbet,Albertina.Nisbet@yopmail.com,Albertina.Nisbet@gmail.com,Y jHyluB
184,Chere,Torray,Chere.Torray@yopmail.com,Chere.Torray@gmail.com,loElYdo
185,Vevay,O'Neill,Vevay.O'Neill@yopmail.com,Vevay.O'Neill@gmail.com,uLZSdatVn
186,Ann-Marie,Gladstone,Ann-Marie.Gladstone@yopmail.com,Ann-Marie.Gladstone@gmail.com,fwKlEksI
187,Donnie,Lymann,Donnie.Lymann@yopmail.com,Donnie.Lymann@gmail.com,deBrqXyyjf
188,Myriam,Posner,Myriam.Posner@yopmail.com,Myriam.Posner@gmail.com,gEMZo
189,Dale,Pitt,Dale.Pitt@yopmail.com,Dale.Pitt@gmail.com,OeMdG
190,Cindelyn,Thornburg,Cindelyn.Thornburg@yopmail.com,Cindelyn.Thornburg@gmail.com,kvhFmKGoMZ
191,Maisey,Hertzfeld,Maisey.Hertzfeld@yopmail.com,Maisey.Hertzfeld@gmail.com,OajjJ
192,Corina,Heisel,Corina.Heisel@yopmail.com,Corina.Heisel@gmail.com,luoDJeHo
193,Susette,Marcellus,Susette.Marcellus@yopmail.com,Susette.Marcellus@gmail.com,AXHtR AyV
194,Lanae,Sekofski,Lanae.Sekofski@yopmail.com,Lanae.Sekofski@gmail.com,FgToedU
195,Linet,Beebe,Linet.Beebe@yopmail.com,Linet.Beebe@gmail.com,DYGfRP
196,Emilia,Screens,Emilia.Screens@yopmail.com,Emilia.Screens@gmail.com,LXUcleSs
197,Tierney,Avi,Tierney.Avi@yopmail.com,Tierney.Avi@gmail.com,VegzbHH
198,Pollyanna,Thar,Pollyanna.Thar@yopmail.com,Pollyanna.Thar@gmail.com,GjYeEGK
199,Darci,Elephus,Darci.Elephus@yopmail.com,Darci.Elephus@gmail.com,DaQNdN
Create a project folder:
mkdir node-streams-etl
Create a CSV file in the folder:
cd node-streams-etl
touch sample-data.csv
Copy all sample data into the CSV file and save it. You can use copy+paste, or write the file with fs.writeFile in the Node.js REPL or with the -p flag in the terminal.
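For example, in the Node.js REPL (started with node in the project folder), core modules such as fs are available without require. The template literal below is truncated for this sketch; paste the full sample data in its place:

fs.writeFileSync('sample-data.csv', `id,firstName,lastName,email,email2,randomized
100,Jobi,Taam,Jobi.Taam@yopmail.com,Jobi.Taam@gmail.com,Z lsmDLjL`);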
2. Initialize project for NPM
We are going to use npm packages, hence we have to initialize the project to get a package.json:
npm init -y
Let's add a main file for the code.
touch index.js
First, we are going to create a readable stream to read the CSV data from sample-data.csv, and a writable stream, which will be the destination. For now, we just copy the sample data. To connect the read stream and the write stream, we are going to use the pipeline method; error handling is much easier with it than with the pipe method. Check out the article How to Connect streams with the pipeline method.
const fs = require('fs');
const { pipeline } = require('stream');

const inputStream = fs.createReadStream('sample-data.csv');
const outputStream = fs.createWriteStream('sample-data.ndjson');

pipeline(inputStream, outputStream, err => {
  if (err) {
    console.log('Pipeline encountered an error.', err);
  } else {
    console.log('Pipeline completed successfully.');
  }
});
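Running node index.js at this point just copies the raw CSV bytes into sample-data.ndjson; the file only contains actual ndjson once the parser and the transform stream are added in the next steps.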
3. Create a CSV parser
We have to convert the CSV data to JSON and, as so often, there is a package for the problem. In this use-case, it is csvtojson. The module parses the header row to get the keys and then parses each data row into a JSON object.
Let's install it.
npm install csvtojson
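To get a feeling for what the parser emits, here is a minimal sketch (the two-row CSV string is made up for this example):

const csv = require('csvtojson');

// parse a tiny CSV string; each row becomes an object keyed by the header row
csv()
  .fromString('id,firstName\n100,Jobi\n101,Dacia')
  .then(rows => console.log(rows));
// [ { id: '100', firstName: 'Jobi' }, { id: '101', firstName: 'Dacia' } ]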
After the successful installation we can require the module and add it to the pipeline, right after the inputStream. The data will flow from the CSV file into the CSV parser and then into the output file.
We are going to use the pipeline method, since it has been the preferred way to connect streams and pipe data between them since Node.js v10. It also helps to clean up streams on completion or failure: when an error occurs, the streams involved are destroyed to avoid memory leaks.
const fs = require('fs');
const { pipeline } = require('stream');
const csv = require('csvtojson');

const inputStream = fs.createReadStream('sample-data.csv');
const outputStream = fs.createWriteStream('sample-data.ndjson');
const csvParser = csv();

pipeline(inputStream, csvParser, outputStream, err => {
  if (err) {
    console.log('Pipeline encountered an error.', err);
  } else {
    console.log('Pipeline completed successfully.');
  }
});
4. Add Transform stream
The data is now emitted to the outputStream as ndjson, with each data row being a valid JSON record. Now we want to transform the data. Our sample data has the keys id, firstName, lastName, email, email2 and randomized. We want to get rid of the randomized property and the personal email in each entry, and rename email2 to emailBusiness. Since we are using csvtojson, we could utilize its built-in subscribe method, which handles each record right after it has been parsed (see the sketch below). Instead, we are going to create a Transform stream.
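For comparison, here is a hedged sketch of the subscribe variant, which we will not use here. According to the csvtojson docs, records mutated in the hook are passed on downstream:

const csv = require('csvtojson');

// csvtojson calls this hook for every parsed record; mutations flow downstream
const csvParser = csv().subscribe(record => {
  record.emailBusiness = record.email2;
  delete record.email2;
  delete record.email;
  delete record.randomized;
});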
Transform streams must implement a transform method, which receives a chunk of data as the first argument, the encoding type of the chunk as the second, and a callback function as the third. The Transform class has to be required from the stream module.
const { Transform } = require('stream');

const transformStream = new Transform({
  transform(chunk, encoding, cb) {
    try {
      // parse the JSON record emitted by the CSV parser
      const person = JSON.parse(chunk);
      // keep id, firstName and lastName, rename email2 to emailBusiness,
      // and drop the email and randomized properties
      const transformed = {
        id: person.id,
        firstName: person.firstName,
        lastName: person.lastName,
        emailBusiness: person.email2,
      };
      cb(null, JSON.stringify(transformed) + '\n');
    } catch (err) {
      cb(err);
    }
  },
});
Now let's add the transformStream to the pipeline.
pipeline(
  inputStream,
  csvParser,
  transformStream,
  outputStream,
  err => {
    if (err) {
      console.log('Pipeline encountered an error.', err);
    } else {
      console.log('Pipeline completed successfully.');
    }
  },
);
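As a side note, since Node.js v15 there is also a promise-based pipeline in the stream/promises module. A minimal sketch of the same flow with async/await:

const { pipeline } = require('stream/promises');

// the promise-based pipeline rejects on error instead of using a callback
async function run() {
  await pipeline(inputStream, csvParser, transformStream, outputStream);
  console.log('Pipeline completed successfully.');
}

run().catch(err => console.log('Pipeline encountered an error.', err));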
5. Run & Done
Run the application with node index.js and the data in the ndjson file should look like this:
{"id":"100","firstName":"Jobi","lastName":"Taam","emailBusiness":"Jobi.Taam@gmail.com"}
{"id":"101","firstName":"Dacia","lastName":"Elephus","emailBusiness":"Dacia.Elephus@gmail.com"}
{"id":"102","firstName":"Arlina","lastName":"Bibi","emailBusiness":"Arlina.Bibi@gmail.com"}
Error handling always has to be done when working with streams. Since we are using the pipeline method, which already handles errors for all of the connected streams, the sample project is done.
Congratulations. 🚀✨
TL;DR
- The Newline-delimited JSON (ndjson) format works well with streaming data and large sets of data, where each record is processed individually, and it helps to reduce errors.
- Using pipeline simplifies error handling and stream cleanup, and it makes combining streams more readable and maintainable.
Thanks for reading and if you have any questions, use the comment function or send me a message @mariokandut.
If you want to know more about Node, have a look at these Node Tutorials.
References (and big thanks): HeyNode, Node.js - Streams, MDN - Streams, Format and MIME Type, ndjson, csvtojson