TL;DR
Need to parse a large file using AWS Lambda in Node and split it into individual files for later processing? Sample repo here:
drmikecrowe / serverless-s3-streaming-example
Serverless Project Streaming and Parsing S3 files
This repo illustrates how to stream a large file from S3 and split it into separate S3 files after removing prior files
Background
Let's face it, data is sometimes ugly. Sure, it's easy to get data from external systems. But how often is that external system giving the data to you in the right format?
Recently, I had to parse a large CSV file that had been uploaded to S3. This is an ideal fit for using AWS Lambda, and using serverless.com makes that process very smooth.
However, Lambda imposes memory limitations on processing. Fortunately, the AWS SDK supports the Node streaming interface. There's no need to read the whole file into memory: simply stream it and process it with the excellent Node CSV package.
Here's where this story gets interesting. What if the data is updated? To make matters worse, what if you have to replace the processed files with new files when an update comes in?
So, here are the challenges:
- Parse a large file without loading the whole file into memory
- Remove old data when new data arrives
- Wait for all these secondary streams to finish uploading to s3
So what is the difficulty here?
- Writing to S3 is slow. You must ensure you wait until the S3 upload is complete
- We can't start writing to S3 until all the old files are deleted.
- We don't know how many output files will be created, so we must wait until the input file has finished processing before we start waiting for the outputs to finish
Demo Repository
To simulate this scenario, I contrived the following:
- A school district central computer uploads all the grades for the district for a semester
- The data file has the following headers:
School,Semester,Grade,Subject,Class,Student Name,Score
- Process the uploaded file, splitting it into the following structure (a sketch of this mapping follows the list):
- Semester/School/Grade
- Create a file called Subject-Class.csv with all the grades for that class
- For this simulation, the central computer can update an entire Semester by uploading a new file. This could be configured differently depending on the application: for instance, if the central computer could upload the grades for a specific Semester + School, we could update this line with the revised criteria so that only that block of data is cleared
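For reference, the splitting rule above boils down to a simple key-mapping function. The sketch below assumes a record shape matching the CSV headers; IGradeRecord and outputKeyFor are hypothetical names, not taken from the repo:

// Hypothetical record shape matching the CSV headers above
interface IGradeRecord {
    School: string;
    Semester: string;
    Grade: string;
    Subject: string;
    Class: string;
    "Student Name": string;
    Score: string;
}

// Map a parsed record to its output key: Semester/School/Grade/Subject-Class.csv
function outputKeyFor(record: IGradeRecord): string {
    return `${record.Semester}/${record.School}/${record.Grade}/${record.Subject}-${record.Class}.csv`;
}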
Here's the general outline of the demo program flow (a rough code sketch follows the list):
- Open the S3 file as a Stream (readStream)
- Create a csvStream from the input readStream
- Pipe readStream to csvStream
- While we have New Lines:
  - Is this line for a new school (i.e. new CSV file)?
    - Start a PassThru stream (passThruStream)
    - Does this line start a new Semester (top-level folder we're replacing) in S3?
      - Start deleting the S3 folder
    - Are all files deleted?
      - Use s3.upload with Body = passThruStream to upload the file
  - Write the New Line to the passThruStream
- Loop thru all passThruStream streams and close/end them
- Wait for all passThruStream streams to finish writing to S3
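To make that loop concrete, here is a minimal sketch of the read side, assuming the AWS SDK v2 and the csv-parse module (the repo's own openReadStream differs in its details; handleRecord is a hypothetical callback standing in for the per-line logic above):

import { S3 } from "aws-sdk";
import { parse } from "csv-parse"; // csv-parse v5 import style; older versions export the function directly

const s3 = new S3();

// Stream the uploaded file from S3 and hand each parsed record to handleRecord.
// Resolves once every line has been read.
function openReadStream(bucket: string, key: string, handleRecord: (record: any) => void): Promise<void> {
    return new Promise((resolve, reject) => {
        const readStream = s3.getObject({ Bucket: bucket, Key: key }).createReadStream();
        const csvStream = parse({ columns: true }); // one object per CSV line, keyed by the header row
        readStream.on("error", reject);
        readStream.pipe(csvStream);
        csvStream
            .on("data", (record) => handleRecord(record)) // "While we have New Lines"
            .on("end", () => resolve())
            .on("error", reject);
    });
}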
Key Concepts
Don't Call Promise.all() Too Early
First, the main processing loop must wait for all lines to be processed before starting the Promise.all() that waits for the writes to finish. In the above repo, see these lines:
// Wait until every line of the input file has been read...
this.pAllRecordsRead = this.openReadStream();
await this.pAllRecordsRead;

// ...then gather the "finished uploading" promise from each output stream
const promises: Promise<any>[] = [];
for (let group of Object.keys(this.outputStreams)) {
    promises.push(this.outputStreams[group].pFinished);
}
await Promise.all(promises);
Use s3.upload instead of s3.PutObject
s3.PutObject requires knowing the length of the output. Use s3.upload instead to stream output of an unknown size to your new file.
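As a rough sketch (assuming the AWS SDK v2; the bucket and key names are placeholders), the pattern looks like this:

import { S3 } from "aws-sdk";
import { PassThrough } from "stream";

const s3 = new S3();

// Upload a stream of unknown length to S3: s3.upload accepts a readable
// stream as Body and handles the multipart upload internally.
function uploadStream(bucket: string, key: string) {
    const passThruStream = new PassThrough();
    const pUploaded = s3
        .upload({ Bucket: bucket, Key: key, Body: passThruStream })
        .promise();
    return { passThruStream, pUploaded };
}

// Usage: write lines as they are parsed, then end the stream and await
// the returned promise before the Lambda handler exits.
// const { passThruStream, pUploaded } = uploadStream("my-bucket", "Semester/School/Grade/Subject-Class.csv");
// passThruStream.write("Student Name,Score\n");
// passThruStream.end();
// await pUploaded;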
Wait for the S3.DeleteObjects to complete
Timing is critical:
- Start the file/folder deletion promise
- Wait until that completes
- Open the output stream
You can see the specific timing here in the demo code.
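As for the deletion itself: deleting an S3 "folder" really means listing every key under a prefix and deleting those keys in a batch. A minimal sketch of such a deleteOldFiles helper (the name matches the snippet below, but this implementation is illustrative, assuming the AWS SDK v2 and a hypothetical BUCKET constant) might look like:

import { S3 } from "aws-sdk";

const s3 = new S3();
const BUCKET = "my-output-bucket"; // hypothetical bucket name

// Delete every object under the given prefix (e.g. the Semester folder being replaced)
// and resolve only once S3 has confirmed the deletions.
async function deleteOldFiles(prefix: string): Promise<void> {
    let continuationToken: string | undefined;
    do {
        const listed = await s3
            .listObjectsV2({ Bucket: BUCKET, Prefix: prefix, ContinuationToken: continuationToken })
            .promise();
        const keys = (listed.Contents || []).map((obj) => ({ Key: obj.Key! }));
        if (keys.length > 0) {
            await s3.deleteObjects({ Bucket: BUCKET, Delete: { Objects: keys } }).promise();
        }
        continuationToken = listed.NextContinuationToken;
    } while (continuationToken);
}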
Boiled down, it looks like the code below. In short:
- Every line is written to the passThruStream
- When a new file must be created:
  - If the old contents must be deleted:
    - Start the delete promise
  - Otherwise:
    - Wait for the ongoing delete promise
  - Open the outputStream
  - Pipe the passThruStream to the outputStream
if (!outputStreams[outputFileName]) {
    const topLevelFolder = ...       // the top-level Semester folder being replaced
    if (!deletePromises[topLevelFolder]) {
        // First record for this folder: start deleting the old files
        deletePromises[topLevelFolder] = deleteOldFiles(topLevelFolder);
    }
    const passThruStream = ...       // new PassThrough stream for this output file
    inputStream.on("end", () => passThruStream.end()); // End passThruStream when the reader completes
    const pFinished = new Promise((resolve, reject) => {
        (async () => {
            // Don't open the output stream until the old files are gone
            await deletePromises[topLevelFolder];
            outputStream = ...       // the S3 upload stream for this file
            passThruStream.pipe(outputStream);
            ...                      // (elided) await the upload and resolve() when it completes
        })().catch((err) => {
            reject(err);
        });
    });
    const outputFile: IOutputFile = {
        passThruStream,
        pFinished,
    };
    outputStreams[outputFileName] = outputFile;
}
outputStreams[outputFileName].passThruStream.write(record);
Conclusion
Use Node Streams to buffer your S3 uploads. By using the PassThrough stream, you can perform operations on your S3 bucket/folder prior to actually starting the S3 upload process.