In a recent project I needed to collect the output from a process that was occurring on the server and output it to the user on the frontend. Ideally that would occur in nearly real time.
The process I needed to collect output from is a third-party library that writes everything to STDOUT or STDERR. The easiest way for me to accomplish that without shenanigans was to create a module that runs the library, fork the module as a child process, and pipe the output to a stream handler. In addition to showing output to the user, I needed to collect all of it for saving to AWS S3, so after forking the process I attached a data event listener to STDOUT and STDERR of the child process and wrote all the output to a file write stream.
const fs = require('fs');
const { fork } = require('child_process');
// ...
const outputFileStream = fs.createWriteStream(tmpFilePath);
// ...
const forkedProcess = fork('path/to/my/module.js', {
  // Pipe stdin, stdout and stderr to the parent; fork() also needs an 'ipc' channel
  stdio: ['pipe', 'pipe', 'pipe', 'ipc'],
});
// ...
forkedProcess.stdout.on('data', (data) => {
  outputFileStream.write(data.toString());
});
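The snippet above only shows the STDOUT listener. A minimal sketch of wiring both streams into one sink might look like the following. `captureOutput` is a hypothetical helper name, and ending the sink on the child's `close` event is an assumption here, not something the original code shows.

```javascript
// Hypothetical helper: forward everything a child process prints to one sink.
// `child` is anything shaped like a ChildProcess (stdout, stderr, 'close' event);
// `sink` is anything with write() and end(), such as an fs write stream.
function captureOutput(child, sink) {
  for (const stream of [child.stdout, child.stderr]) {
    stream.on('data', (chunk) => {
      sink.write(chunk.toString());
    });
  }
  // 'close' fires after the process has ended and its stdio streams are closed,
  // so no output should arrive after the sink is ended.
  child.on('close', () => {
    sink.end();
  });
}

// Usage sketch (paths are placeholders):
// const { fork } = require('child_process');
// const fs = require('fs');
// const child = fork('path/to/my/module.js', {
//   stdio: ['pipe', 'pipe', 'pipe', 'ipc'],
// });
// captureOutput(child, fs.createWriteStream(tmpFilePath));
```

Because the helper only relies on `on`, `write`, and `end`, it can be exercised with plain event emitters instead of a real child process.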
Once I had this working I needed to get the output to the frontend. Seeing as I didn't want to be reading from the output file I settled on using Redis. I hadn't used Redis's different data types much at this point, but it turned out that the List type worked well for me. I began calling LPUSH as I received data to load it into Redis.
const fs = require('fs');
const redis = require('redis').createClient();
// ...
const outputFileStream = fs.createWriteStream(tmpFilePath);
// ...
forkedProcess.stdout.on('data', (data) => {
  outputFileStream.write(data.toString());
  redis.lpush(redisKey, data.toString());
});
I'd read a bit about Server-Sent Events (SSEs) and played around with them, but I had never had an opportunity to use them in production. Since I only needed information to flow one way, I figured they would be the perfect solution. Once you know the expected data format, they are very easy to implement from scratch in Express. The tricky part for me was making sure the Last-Event-ID value restarted things in the correct place in the event of a disconnect.
async function sseHandler(req, res, next) {
  // I was handling normal text responses and SSE responses with the same endpoint
  // this ensures SSE only starts if that is what the request wants
  if ((req.header('accept') || '').includes('text/event-stream')) {
    // You need to send an initial response but keep the connection alive
    res.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive',
    });
    // I was using a polyfill that had specific requirements for IE.
    // It requires 2kB of padding for IE
    res.write(`:${Array(2049).join(' ')}\n`);
    // There is an explanation of the flushes later
    res.flush();
    const intervalId = setInterval(() => {
      // Load redis info
      // ...
      // Each field line must end with a newline; a blank line ends the event
      res.write('id: LAST_REDIS_LIST_INDEX\n');
      res.write('data: ....\n');
      res.write('data: JSON_OBJECT_WITH_DATA\n');
      res.write('data: ....\n\n');
      res.flush();
    }, 500);
    // When the connection closes stop the interval
    res.on('close', () => {
      clearInterval(intervalId);
    });
  }
}
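The handler above leaves out how the Redis data is loaded and how Last-Event-ID picks the restart point. One way it could work, as a rough sketch: assume the event id is the number of list entries the client has already received. Since LPUSH puts the newest entry at index 0, the unseen entries are always at the front of the list. `parseLastEventId` and `unseenRange` are hypothetical helpers, not part of the original code.

```javascript
// Hypothetical helper: turn the Last-Event-ID header into a count of entries
// the client has already seen. Anything missing or malformed restarts from 0.
function parseLastEventId(headerValue) {
  const parsed = Number.parseInt(headerValue, 10);
  return Number.isInteger(parsed) && parsed >= 0 ? parsed : 0;
}

// Hypothetical helper: given the current list length (LLEN) and the number of
// entries already seen, return the inclusive LRANGE bounds of the unseen
// entries, or null when there is nothing new.
function unseenRange(listLength, seenCount) {
  const unseen = listLength - seenCount;
  if (unseen <= 0) {
    return null;
  }
  // With LPUSH the newest entries sit at the head, so unseen = 0..unseen-1.
  return { start: 0, stop: unseen - 1 };
}

// Usage sketch inside the interval callback (client calls depend on the
// version of the redis package in use, so they are left as comments):
// const seen = parseLastEventId(req.header('Last-Event-ID'));
// const range = unseenRange(currentLength, seen);   // currentLength from LLEN
// if (range) { /* LRANGE redisKey range.start range.stop, then reverse() */ }
```

The LRANGE result comes back newest first, so it needs reversing before it is sent if the client expects lines in the order they were printed.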
I started by sending random data every second just to make sure I had the basics of the SSEs working, then I went back through and added the logic to pull things from Redis. The real application also has a heartbeat event that it sends every 15 seconds to ensure the polyfill keeps the connection open.
I send back a JSON string with two properties. The first is a boolean flag indicating if the connection should be kept open; the second is an array of strings. Each item in the array is a single line of output from the background process. I included the boolean flag because if I closed the connection from the server side the client kept reopening the connection. This allowed me to instruct the client to close the connection when there would be no new information coming through.
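To make the payload concrete, here is a small sketch of both ends. The property names `keepOpen` and `lines` are placeholders, since the real names are not given, and `formatSseEvent` and `handleMessage` are hypothetical helpers.

```javascript
// Server side: build one SSE event. JSON.stringify escapes newlines inside
// strings, so the payload always fits on a single "data:" line.
function formatSseEvent(id, payload) {
  return `id: ${id}\ndata: ${JSON.stringify(payload)}\n\n`;
}

// Client side: parse the payload and close the EventSource when the server
// says nothing more is coming, so the browser does not reconnect.
function handleMessage(source, event) {
  const { keepOpen, lines } = JSON.parse(event.data);
  if (!keepOpen) {
    source.close();
  }
  return lines;
}

// Usage sketch in the browser (the URL is a placeholder):
// const source = new EventSource('/path/to/sse/endpoint');
// source.onmessage = (event) => {
//   handleMessage(source, event).forEach((line) => console.log(line));
// };
```

Closing from the client matters because an EventSource treats a dropped connection as something to retry, which matches the reconnect behaviour described above.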
If you look at the snippet above you'll see a few res.flush(); lines. It took me longer than it should have to remember this, but if you have a compression middleware enabled for your Express application you may need those. In my case the compression package was in use, and it is what adds res.flush(). With the compression package, data is collected in a buffer, compressed, and then released all at once (in most cases). So the output from my SSE was being trapped in the buffer and not released until the connection ended, at which point it all arrived at once. If you are working with SSEs and all your data arrives at one time instead of in chunks like it should, go ahead and look in your app for compression and caching libraries.
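Since res.flush() only exists when the compression middleware has added it, calling it unconditionally would throw in an app that runs without compression. A defensive sketch, with `safeFlush` and `writeSseChunk` as hypothetical helper names:

```javascript
// Hypothetical helper: flush only when the response object has a flush()
// method, which is the case when the compression middleware is installed.
function safeFlush(res) {
  if (typeof res.flush === 'function') {
    res.flush();
    return true;
  }
  return false;
}

// Hypothetical helper: write a chunk of SSE output and push it out right away.
// Returns whether an explicit flush happened.
function writeSseChunk(res, chunk) {
  res.write(chunk);
  return safeFlush(res);
}

// Usage sketch inside the SSE handler:
// writeSseChunk(res, 'data: {"some":"json"}\n\n');
```

An alternative worth checking is the compression package's `filter` option, which can skip compression for event-stream requests altogether.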
That's all there really is to the general solution.