Streams are a built-in feature in Node.js and represent an asynchronous flow of data. Streams are also a way to handle reading and/or writing files. A Node.js stream can help process files larger than the free memory of your computer, since it processes the data in small chunks.
This is the fourth article of a series about streams in Node.js. It explains what pipeline does in Node.js, and how to connect streams using pipeline.
Streams in Node.js
- What is a Stream in Node.js?
- Connect streams with the pipe method
- Handle stream errors
- Connect streams with the pipeline method (this article)
How to connect streams with pipeline
The pipeline is a module method to pipe between streams and generators. It forwards errors, cleans up, and provides a callback when the pipeline is complete. The pipeline method was added in Node.js v10 to improve the experience of piping streams.
It takes any number of streams as arguments, and a callback function as its last argument. If an error occurs anywhere in the pipeline, the pipeline ends and the callback is invoked with the error. The callback is also invoked when the pipeline finishes successfully, so we have a reliable way to know when the pipeline has completed.
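To make the call shape concrete, here is a minimal sketch before we build the full example: a source stream, a destination stream, and the callback as the last argument. Note that Readable.from requires Node.js 12.3 or newer, and shape-demo.txt is just a placeholder file name for this illustration.
const { Readable, pipeline } = require('stream');
const fs = require('fs');
// Source stream, destination stream, and the completion callback last.
pipeline(
  Readable.from(['hello pipeline\n']),
  fs.createWriteStream('shape-demo.txt'),
  err => {
    if (err) {
      console.log('Pipeline failed with an error:', err);
    } else {
      console.log('Pipeline ended successfully');
    }
  },
);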
Let's look at a code example. First we are going to create a sample file, then we are going to create a pipeline with readable, PassThrough and writable streams.
Create a file.
touch create-sample.js
Add code to create a sample file with lorem ipsum.
const fs = require('fs');
fs.writeFileSync(
  'input.txt',
  "Lorem Ipsum is simply dummy text of the printing and typesetting industry. Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, when an unknown printer took a galley of type and scrambled it to make a type specimen book. It has survived not only five centuries, but also the leap into electronic typesetting, remaining essentially unchanged. It was popularised in the 1960s with the release of Letraset sheets containing Lorem Ipsum passages, and more recently with desktop publishing software like Aldus PageMaker including versions of Lorem Ipsum.",
  { encoding: 'utf8' },
);
Run the script with node create-sample.js to generate input.txt. Then create another file.
touch streams-pipeline.js
Add sample code.
const { PassThrough, pipeline } = require('stream');
const fs = require('fs');
const input = fs.createReadStream('input.txt');
const out = fs.createWriteStream('output.txt');
const passThrough = new PassThrough();
console.log('Starting pipeline...');
pipeline(input, passThrough, out, err => {
  if (err) {
    console.log('Pipeline failed with an error:', err);
  } else {
    console.log('Pipeline ended successfully');
  }
});
Run the code with node streams-pipeline.js from the terminal. The code will log Starting pipeline... when the pipeline starts and Pipeline ended successfully when the pipeline is done.
Now let's emit an error and see if the error handling is triggered. Add this line at the end of the code and run it again.
passThrough.emit('error', new Error('Oh no!'));
The code will log Starting pipeline... when the pipeline starts. Then the error is emitted by passThrough, the pipeline ends with an error, and the code logs Pipeline failed with an error: Error: Oh no!.
One of the big benefits of pipeline is that the streams get destroyed when an error occurs, and internal resources are released (the memory used by the streams is freed up). This cleanup step prevents memory leaks, which can occur when a stream has ended but has not released the memory it was using. When using the pipe method, you are responsible for destroying the streams yourself when an error occurs.
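To illustrate the difference, here is a rough, standalone sketch of what that manual handling can look like with pipe; treat it as an illustration rather than a complete error-handling recipe.
const fs = require('fs');
const input = fs.createReadStream('input.txt');
const out = fs.createWriteStream('output.txt');
// pipe does not forward errors or destroy the other stream for us,
// so each stream needs its own error handler and manual cleanup.
input.pipe(out);
input.on('error', err => {
  console.log('Read stream failed:', err);
  out.destroy();
});
out.on('error', err => {
  console.log('Write stream failed:', err);
  input.destroy();
});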
Using pipeline simplifies error handling and stream cleanup. The method makes combining streams more readable and maintainable.
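Before moving on, it is worth mentioning that pipeline also has a promise-based variant. The sketch below assumes Node.js 15 or newer, where the stream/promises module is available; on older versions you can get a similar effect with util.promisify(pipeline).
const { PassThrough } = require('stream');
const { pipeline } = require('stream/promises');
const fs = require('fs');
async function run() {
  // Same streams as before, but awaiting the pipeline instead of passing a callback.
  await pipeline(
    fs.createReadStream('input.txt'),
    new PassThrough(),
    fs.createWriteStream('output.txt'),
  );
  console.log('Pipeline ended successfully');
}
run().catch(err => console.log('Pipeline failed with an error:', err));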
Transform stream with pipeline
Let's make a more powerful stream and create our own transform stream to alter data as it is streamed from the source to the destination.
Let's implement a simple transform with the pipeline method, which transforms all strings that pass through to upper case. For input and output we are going to use process.stdin and process.stdout.
Create a file.
touch transform-it.js
Copy code.
const { Transform, pipeline } = require('stream');
const upperCaseTransform = new Transform({
  transform: function(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});
pipeline(process.stdin, upperCaseTransform, process.stdout, err => {
  if (err) {
    console.log('Pipeline encountered an error:', err);
  } else {
    console.log('Pipeline ended');
  }
});
Run the file with node transform-it.js and type your name in lower case. You will see that it gets transformed to upper case. You can exit the stream with ctrl+c.
What happened in the code? We created a Transform stream using the constructor from the stream module. We are required to implement a transform method on our transform stream. This transform function receives a chunk of data passing through the transform stream, the encoding of the chunk, and a callback function, which we can use to return the transformed data or an error. We are also converting the chunk data to a string, because by default the data chunk will be a Buffer.
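To show the error side of that callback, here is a small sketch of a transform that rejects some input. The digit check is just a made-up rule for illustration.
const { Transform, pipeline } = require('stream');
const noDigitsTransform = new Transform({
  transform: function(chunk, encoding, callback) {
    const text = chunk.toString();
    if (/\d/.test(text)) {
      // Passing an Error as the first argument fails the whole pipeline.
      callback(new Error('Digits are not allowed'));
    } else {
      callback(null, text);
    }
  },
});
pipeline(process.stdin, noDigitsTransform, process.stdout, err => {
  if (err) {
    console.log('Pipeline encountered an error:', err);
  } else {
    console.log('Pipeline ended');
  }
});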
Transform streams can be very powerful for creating pipelines to alter or process streaming data, and they are much more composable than listening to stream events like .on('data') and altering the data there.
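For example, the same upperCaseTransform can be dropped into the file pipeline from earlier without changing it. The output file name output-uppercase.txt below is just an example.
const { Transform, pipeline } = require('stream');
const fs = require('fs');
const upperCaseTransform = new Transform({
  transform: function(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});
// The transform does not care whether the data comes from stdin or a file.
pipeline(
  fs.createReadStream('input.txt'),
  upperCaseTransform,
  fs.createWriteStream('output-uppercase.txt'),
  err => {
    if (err) {
      console.log('Pipeline failed with an error:', err);
    } else {
      console.log('Pipeline ended successfully');
    }
  },
);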
TL;DR
- Using pipeline simplifies error handling and stream cleanup.
- The pipeline method makes combining streams more readable and maintainable.
- One of the big benefits of pipeline is that the streams get destroyed when an error occurs, and internal resources are released (the memory used by the streams is freed up).
Thanks for reading and if you have any questions, use the comment function or send me a message @mariokandut.
If you want to know more about Node, have a look at these Node Tutorials.