TL;DR: I built a simple multipart/form-data parser in TypeScript to demonstrate how stream processing works. You can check out the GitHub repo here.
Streams are integral to Node.js. A stream is an abstract interface for working with streaming data: it lets you read or write data progressively instead of loading all of it into memory at once. This makes streams well suited to processing large volumes of data without blocking the event loop or exhausting memory. Node.js has four fundamental stream types: Readable, Writable, Duplex, and Transform. Each type has its own set of methods and events.
The Node.js http.IncomingMessage class extends stream.Readable, which means we can read incoming request data using the Readable stream API, either by calling its "read" method or by listening for its "data" event. We will be using through2 to transform this IncomingMessage stream into one that outputs the form data.
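To make the "data" event concrete, here is a minimal sketch that counts chunks and bytes as they arrive from any Readable. The function name `measure` is made up for this illustration and is not part of the parser.

```typescript
import { Buffer } from 'node:buffer';
import { Readable } from 'node:stream';

// Hypothetical helper: counts chunks and bytes as they arrive,
// without ever holding the whole body in memory.
function measure(stream: Readable): Promise<{ chunks: number; bytes: number }> {
  return new Promise((resolve, reject) => {
    let chunks = 0;
    let bytes = 0;
    // Attaching a "data" listener switches the stream into flowing mode
    stream.on('data', (chunk: Buffer) => {
      chunks += 1;
      bytes += chunk.length;
    });
    stream.on('end', () => resolve({ chunks, bytes }));
    stream.on('error', reject);
  });
}
```

Because http.IncomingMessage is a Readable, the same function should also accept the `request` object inside an HTTP handler.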
The multipart/form-data format
"Multipart" refers to a type of message that contains multiple parts, or pieces of data, combined into a single message. HTTP clients use the media type "multipart/form-data" to send files and data to an HTTP server.
Each part of the multipart message is separated by a boundary. The boundary is a string of characters provided by the HTTP client in the Content-Type header. In the body, each delimiter line is the boundary prefixed with two hyphens.
Here is an example of a multipart/form-data request:
POST /upload HTTP/1.1
Host: reqbin.com
Content-Type: multipart/form-data; boundary=0000
Content-Length: 526

--0000
Content-Disposition: form-data; name="username"

[Username]
--0000
Content-Disposition: form-data; name="file"; filename="image1.jpg"
Content-Type: image/jpeg

[image data]
--0000
Content-Disposition: form-data; name="file"; filename="image2.jpg"
Content-Type: image/jpeg

[image data]
--0000--
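As a rough sketch of how the boundary relates to the delimiter lines in the body, the helpers below pull the boundary parameter out of a Content-Type header value and derive the two delimiter strings. `getBoundary` and `delimitersFor` are names invented for this example, and the regular expression only covers the simple quoted and unquoted forms.

```typescript
// Hypothetical helper: extracts the boundary parameter, quoted or unquoted.
function getBoundary(contentType: string): string | undefined {
  const match = /;\s*boundary=(?:"([^"]+)"|([^;\s]+))/i.exec(contentType);
  return match?.[1] ?? match?.[2];
}

// Hypothetical helper: a part starts with "--" + boundary,
// and the whole message ends with "--" + boundary + "--".
function delimitersFor(boundary: string): { part: string; close: string } {
  return { part: `--${boundary}`, close: `--${boundary}--` };
}
```

For the request above, `getBoundary` yields `0000`, so parts start at `--0000` and the message ends at `--0000--`.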
In our case, each part of the multipart message is a field from a form. The Content-Disposition header gives us information about the field. We can identify file-type fields by checking for a filename in the disposition header.
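The implementation later calls a `parseDisposition` helper that is not shown in this article. The version below is only my guess at what such a helper could look like: a naive sketch that splits on ";" and would therefore mishandle quoted values containing semicolons.

```typescript
interface Disposition {
  name?: string;
  filename?: string;
}

// Assumed shape of the helper; the real one in the repo may differ.
function parseDisposition(line: string): Disposition {
  // Drop the "Content-Disposition: form-data" segment, keep the parameters
  const [, ...params] = line.split(';');
  const result: Disposition = {};
  for (const param of params) {
    const eq = param.indexOf('=');
    if (eq === -1) continue;
    const key = param.slice(0, eq).trim().toLowerCase();
    // Strip one pair of surrounding double quotes, if present
    const value = param.slice(eq + 1).trim().replace(/^"|"$/g, '');
    if (key === 'name') result.name = value;
    if (key === 'filename') result.filename = value;
  }
  return result;
}

// A field counts as a file when its disposition carries a filename
function isFileField(disposition: Disposition): boolean {
  return disposition.filename !== undefined;
}
```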
The plan
Extracting fields from the request stream is way harder than it seems (at least it was for me). These are the key ideas I used to eventually get there.
### Split data processing into multiple stages
Trying to identify the field boundaries and process each field's data at the same time was difficult. I couldn't get it working perfectly.
In contrast, separating the problem into two steps was surprisingly easy:
- identifying field boundaries
- parsing field data

I decided to implement this as two different streams:
- The first stream detects field boundaries, collects the data between the boundaries and outputs that data
- The second stream processes the field data and outputs a JavaScript object

I used through2 to create the two streams and combine them into one "form" stream.
### Sliding Windows
Detecting boundary and new line characters required another breakthrough. The input stream provides data in chunks. A chunk of data is an array of bytes. Any chunk may contain a full boundary or new line sequence, or only part of one, with the rest arriving in the next chunk. To reliably detect these sequences, I push each byte of each chunk into a fixed-length buffer of length n from the right. Once the buffer is full, each new byte shifts the oldest byte out on the left.
After shifting the buffer left for each new byte, I compare the contents of the buffer with my target string.
This acts like a sliding window for detecting strings of characters of length n. See the pseudocode below:
boundary = 'boundary...chars...'
slidingBuffer = createFixedLengthBuffer(boundary.length)
stream.on('data', (chunk) => {
  data = []
  for (byte of chunk) {
    // Push the new byte in on the right; the oldest byte falls out on the left
    shiftedOutChar = shiftLeft(slidingBuffer, byte)
    data.push(shiftedOutChar)
    if (equals(slidingBuffer, boundary)) {
      // Boundary detected
    }
  }
})
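The pseudocode above can be turned into a small runnable sketch. The `SlidingWindow` class and `splitOn` function are names I made up for this illustration, and this is not the `shiftLeft` helper from the repo. One assumption that differs from a zero-filled buffer: the window tracks how many bytes it holds, so nothing is reported as shifted out until it is full.

```typescript
import { Buffer } from 'node:buffer';

// Hypothetical fixed-length window that bytes slide through from the right.
class SlidingWindow {
  private readonly target: Buffer;
  private readonly window: Buffer;
  private filled = 0;

  constructor(target: Buffer) {
    this.target = target;
    this.window = Buffer.alloc(target.length);
  }

  // Returns the byte pushed out on the left, or undefined while still filling.
  push(byte: number): number | undefined {
    if (this.filled < this.window.length) {
      this.window[this.filled++] = byte;
      return undefined;
    }
    const out = this.window[0];
    this.window.copyWithin(0, 1); // shift everything one position left
    this.window[this.window.length - 1] = byte;
    return out;
  }

  matches(): boolean {
    return this.filled === this.window.length && this.window.equals(this.target);
  }

  reset(): void {
    this.filled = 0;
  }
}

// Collects the bytes that precede each occurrence of target.
// Bytes after the last occurrence are ignored in this sketch.
function splitOn(chunks: Buffer[], target: Buffer): Buffer[] {
  const win = new SlidingWindow(target);
  const pieces: Buffer[] = [];
  let current: number[] = [];
  for (const chunk of chunks) {
    chunk.forEach((byte) => {
      const out = win.push(byte);
      if (out !== undefined) current.push(out);
      if (win.matches()) {
        pieces.push(Buffer.from(current));
        current = [];
        win.reset();
      }
    });
  }
  return pieces;
}
```

Because the window lives outside the per-chunk loop, a target that is split across two chunks is still detected, which is the whole point of the technique.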
The Implementation
Let's create a quick http server to receive our multipart/form-data request.
import http from 'http';
import { createFormStream } from './create-form-stream';
import { Field } from './Field';
interface IMakeServerProps {
  onText: (data: Field) => void;
  onFile: (data: Field) => void;
}
export function makeServer({
  onText,
  onFile
}: IMakeServerProps) {
  const server = http.createServer();
  server.on('request', (request, res) => {
    const contentType = request.headers['content-type'];
    const method = request.method;
    if (method?.toLowerCase() !== 'post') {
      res.writeHead(405);
      res.end("Method Not Allowed");
      return;
    }
    if (!contentType?.startsWith('multipart/form-data')) {
      res.writeHead(415);
      res.end("Unsupported Media Type");
      return;
    }
    // Extract the boundary parameter from the Content-Type header
    const boundary = contentType.split("boundary=")[1]?.split(";")[0];
    if (!boundary) {
      // Without a boundary the body cannot be split into fields
      res.writeHead(400);
      res.end("Bad Request");
      return;
    }
    // Each delimiter line in the body is "--" followed by the boundary
    const form = createFormStream(request, Buffer.from("--" + boundary));
    form.on('data', (field: Field) => {
      if (field.type === 'text') onText(field);
      if (field.type === 'file') onFile(field);
    });
    form.on('end', () => {
      res.writeHead(200);
      res.end();
    });
  });
  return {
    server
  }
}
In create-form-stream.ts, we define createFormStream(stream, boundary), which starts by defining some buffers that we will need:
let boundarySlidingBuffer = Buffer.alloc(boundary.length);
let fieldBuffer = Buffer.alloc(0);
We create our first stream to output each field data in one chunk:
return stream
  .pipe(
    through2(function (chunk, _enc, callback) {
      // Detect boundaries and output each field chunk
      let data: Buffer;
      for (let i = 0; i < chunk.length; i++) {
        data = shiftLeft(boundarySlidingBuffer, chunk[i]); // Shift into our sliding buffer
        fieldBuffer = Buffer.concat([fieldBuffer, data]); // Concat shifted out data with existing field data in fieldBuffer
        if (boundarySlidingBuffer.compare(boundary) === 0) {
          // Boundary detected
          // Remove "\r\n" from the beginning and end then push
          this.push(fieldBuffer.subarray(2, -2));
          fieldBuffer = Buffer.alloc(0); // Clear fieldBuffer
          boundarySlidingBuffer = Buffer.alloc(boundary.length); // Clear sliding buffer
        }
      }
      callback();
    })
  )
The second stream processes each field data chunk:
  .pipe(
    through2.obj(function (chunk, _enc, callback) {
      const lines: string[] = [];
      let dataStartIndex = 0;
      const newLine = Buffer.from("\r\n");
      let newLineSlidingBuffer = Buffer.alloc(newLine.length);
      let line = Buffer.alloc(0);
      let data: Buffer;
      for (let i = 0; i < chunk.length; i++) {
        data = shiftLeft(newLineSlidingBuffer, chunk[i]);
        line = Buffer.concat([line, data]);
        if (newLineSlidingBuffer.compare(newLine) === 0) {
          // New line detected
          lines.push(line.toString());
          if (lines[lines.length - 1] === '') {
            // Break at first empty line. Time for data!
            dataStartIndex = i + 1;
            break;
          }
          line = Buffer.alloc(0);
          newLineSlidingBuffer = Buffer.alloc(newLine.length);
        }
      }
      const builder = new FieldBuilder();
      const disposition = parseDisposition(lines[0]);
      builder
        .name(disposition.name)
        .filename(disposition.filename);
      if (lines[1] !== '') {
        const contentType = parseContentType(lines[1]);
        builder.contentType(contentType);
      }
      this.push(
        builder
          .content(chunk.subarray(dataStartIndex))
          .build()
      );
      callback();
    })
  );
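The `Field` type and `FieldBuilder` class used above are not shown in this article. As a rough idea of what they might look like, here is a minimal sketch; the property names and the rule that a filename makes a field a "file" are assumptions based on how the code above uses them, not the repo's actual definitions.

```typescript
import { Buffer } from 'node:buffer';

// Assumed shape of a parsed form field
interface Field {
  type: 'text' | 'file';
  name: string;
  filename?: string;
  contentType?: string;
  content: Buffer;
}

// Hypothetical builder matching the calls made in the second stream
class FieldBuilder {
  private fieldName?: string;
  private fieldFilename?: string;
  private fieldContentType?: string;
  private fieldContent: Buffer = Buffer.alloc(0);

  name(value?: string): this {
    this.fieldName = value;
    return this;
  }

  filename(value?: string): this {
    this.fieldFilename = value;
    return this;
  }

  contentType(value?: string): this {
    this.fieldContentType = value;
    return this;
  }

  content(value: Buffer): this {
    this.fieldContent = value;
    return this;
  }

  build(): Field {
    if (this.fieldName === undefined) {
      throw new Error('A form field needs a name');
    }
    return {
      // Assumption: the presence of a filename marks a file field
      type: this.fieldFilename === undefined ? 'text' : 'file',
      name: this.fieldName,
      filename: this.fieldFilename,
      contentType: this.fieldContentType,
      content: this.fieldContent,
    };
  }
}
```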
Conclusion
I hope that this article has given you a better understanding of how to work with streams in Node.js and how your favorite multipart/form-data parser library works.
The code for this article can be found here.