Node.js Streams in Production — Backpressure, Pipeline, and Memory-Efficient Processing

- Author: Sanjeev Sharma (@webcoderspeed1)
Introduction
Streams are Node.js's most powerful abstraction for handling unbounded data: they let you read a 10GB file or process millions of API responses without holding gigabytes in memory. Yet most teams ignore backpressure, misuse pipeline(), and leak memory in stream chains. This post covers production streaming patterns that actually scale.
- The Four Stream Types and Their Purpose
- highWaterMark and Backpressure Mechanics
- stream.pipeline() for Robust Error Handling
- Async Generators as Readable Sources
- Streaming CSV Processing with Transform
- Streaming S3 Download to HTTP Response
- AbortSignal for Stream Cancellation
- Common Memory Leak Patterns
- Checklist
- Conclusion
The Four Stream Types and Their Purpose
Readable, Writable, Transform, and Duplex streams each solve specific problems. Understanding their interfaces is foundational.
import { Readable, Writable, Transform, Duplex } from 'stream';

// Readable: emit data
const readable = new Readable({
  read(size: number): void {
    this.push('chunk of data\n');
    this.push(null); // EOF
  },
});

// Writable: consume data
const writable = new Writable({
  write(
    chunk: Buffer,
    encoding: string,
    callback: (error?: Error | null) => void
  ): void {
    console.log('Received:', chunk.toString());
    callback();
  },
});

// Transform: read input, write output
const transform = new Transform({
  transform(
    chunk: Buffer,
    encoding: string,
    callback: (error?: Error | null, data?: any) => void
  ): void {
    const uppered = chunk.toString().toUpperCase();
    callback(null, uppered);
  },
});

// Duplex: independent readable and writable sides
const duplex = new Duplex({
  read(size: number): void {
    this.push('readable side\n');
  },
  write(
    chunk: Buffer,
    encoding: string,
    callback: (error?: Error | null) => void
  ): void {
    console.log('writable side:', chunk.toString());
    callback();
  },
});

readable.pipe(transform).pipe(writable);
highWaterMark and Backpressure Mechanics
Backpressure happens when a downstream consumer can't keep pace with an upstream source. Ignoring backpressure causes unbounded memory growth.
import fs from 'fs';

// ANTI-PATTERN: ignoring backpressure
function slowConsumer(): void {
  const readable = fs.createReadStream('10gb-file.json', {
    highWaterMark: 64 * 1024, // 64KB chunks
  });
  readable.on('data', (chunk) => {
    // Fire-and-forget async work (~1 second per chunk)
    processSlowly(chunk);
    // BUG: never pauses the stream while processing lags
    // Data accumulates downstream → OOM
  });
}

// CORRECT: respect backpressure
function respectfulConsumer(): void {
  const readable = fs.createReadStream('10gb-file.json', {
    highWaterMark: 64 * 1024,
  });
  const writable = fs.createWriteStream('output.json');

  readable.on('data', (chunk: Buffer) => {
    const canContinue = writable.write(chunk);
    if (!canContinue) {
      // Downstream buffer full, pause upstream
      readable.pause();
    }
  });
  writable.on('drain', () => {
    // Downstream consumed data, resume upstream
    readable.resume();
  });
  readable.on('end', () => writable.end());
}

// BEST: use pipe or pipeline (automatic backpressure)
function automaticBackpressure(): void {
  const readable = fs.createReadStream('10gb-file.json');
  const writable = fs.createWriteStream('output.json');
  // pipe() handles backpressure automatically
  readable.pipe(writable);
}
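When you must write manually outside a pipe chain, the pause/drain dance can be expressed more directly by awaiting the 'drain' event with events.once. A minimal sketch — the tiny highWaterMark and in-memory sink are contrived here so backpressure triggers immediately:

```typescript
import { Writable } from 'stream';
import { once } from 'events';

const received: string[] = [];
// Contrived sink with a tiny buffer so write() returns false at once.
const sink = new Writable({
  highWaterMark: 4, // bytes
  write(chunk, enc, cb) {
    received.push(chunk.toString());
    setImmediate(cb); // simulate async consumption
  },
});

// Drain-aware write loop: pause whenever write() returns false.
async function writeAll(chunks: string[]): Promise<void> {
  for (const chunk of chunks) {
    if (!sink.write(chunk)) {
      await once(sink, 'drain'); // resume only after the buffer empties
    }
  }
  sink.end();
  await once(sink, 'finish');
}

await writeAll(['alpha', 'beta', 'gamma']);
```

This is the same contract pipe() implements internally: stop calling write() after a false return, resume on 'drain'.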
stream.pipeline() for Robust Error Handling
pipe() is convenient but doesn't properly clean up on errors. pipeline() is the production choice.
import { pipeline, Transform } from 'stream';
import fs from 'fs';
import zlib from 'zlib';

// ANTI-PATTERN: pipe chain doesn't clean up on error
function fragileChain(): void {
  const parse = new Transform({
    transform(chunk, enc, cb) {
      cb(new Error('bad JSON')); // mid-chain error
    },
  });
  fs.createReadStream('input.json')
    .pipe(parse)
    .pipe(zlib.createGzip())
    .pipe(fs.createWriteStream('output.json.gz'));
  // If any stream errors, the others stay open → fd leak
}
// CORRECT: use pipeline with error handling
function robustChain(): void {
  const readable = fs.createReadStream('input.json');
  const transform = new Transform({
    transform(chunk, enc, cb) {
      try {
        // NOTE: assumes each chunk is a complete JSON document;
        // real inputs need a streaming parser that handles chunk boundaries
        const data = JSON.parse(chunk.toString());
        cb(null, JSON.stringify(data) + '\n');
      } catch (err) {
        cb(err as Error);
      }
    },
  });
  const gzip = zlib.createGzip();
  const writable = fs.createWriteStream('output.json.gz');

  pipeline(readable, transform, gzip, writable, (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
      // Streams automatically cleaned up
    } else {
      console.log('Pipeline succeeded');
    }
  });
}
// Promise-based pipeline (modern)
async function modernPipeline(): Promise<void> {
  const { pipeline: pipelineAsync } = await import('stream/promises');
  try {
    await pipelineAsync(
      fs.createReadStream('input.json'),
      new Transform({
        transform(chunk, enc, cb) {
          cb(null, chunk.toString().toUpperCase());
        },
      }),
      fs.createWriteStream('output.json')
    );
  } catch (err) {
    console.error('Pipeline error:', err);
  }
}
Async Generators as Readable Sources
Modern async generators provide a clean syntax for readable streams.
import { Readable } from 'stream';

async function* generateData(): AsyncGenerator<string> {
  for (let i = 0; i < 1000; i++) {
    await new Promise((resolve) => setTimeout(resolve, 10));
    yield `Line ${i}\n`;
  }
}

const readable = Readable.from(generateData());
readable.on('data', (chunk) => {
  console.log('Chunk:', chunk.toString());
});

// Real example: streaming database results
async function* streamFromDatabase(query: string): AsyncGenerator<Buffer> {
  const client = await getDbClient();
  const cursor = client.query(query, { batchSize: 1000 });
  try {
    for await (const batch of cursor) {
      yield Buffer.from(JSON.stringify(batch) + '\n');
    }
  } finally {
    await cursor.close();
  }
}

const dbStream = Readable.from(
  streamFromDatabase('SELECT * FROM users WHERE active = true')
);
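The relationship runs both ways: every Readable is itself an async iterable, so a stream built from a generator can be consumed with the same for await...of syntax that produced it. A small sketch:

```typescript
import { Readable } from 'stream';

async function* generateLines(): AsyncGenerator<string> {
  for (let i = 0; i < 3; i++) {
    yield `line ${i}`;
  }
}

// Readable.from() defaults to objectMode for non-buffer values,
// so each yielded string arrives as one chunk.
const collected: string[] = [];
for await (const chunk of Readable.from(generateLines())) {
  collected.push(chunk as string);
}
```

This works for any Readable (file streams, HTTP bodies), not just generator-backed ones, and backpressure is applied automatically between iterations.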
Streaming CSV Processing with Transform
Transform streams parse and process data on-the-fly without buffering entire files.
import { Transform, pipeline } from 'stream';
import fs from 'fs';
import csv from 'csv-parser';

// ANTI-PATTERN: load entire CSV into memory
async function loadAllCSV(filePath: string): Promise<any[]> {
  // OOM for 1GB+ CSV files: every row is held in memory
  return new Promise((resolve) => {
    const rows: any[] = [];
    fs.createReadStream(filePath)
      .pipe(csv())
      .on('data', (row) => rows.push(row))
      .on('end', () => resolve(rows));
  });
}

// CORRECT: process rows while streaming
function processCSVStreaming(
  filePath: string,
  onRow: (row: Record<string, string>) => Promise<void>
): Promise<void> {
  return new Promise((resolve, reject) => {
    const transform = new Transform({
      objectMode: true,
      async transform(row: Record<string, string>, enc, callback) {
        try {
          await onRow(row);
          callback();
        } catch (err) {
          callback(err as Error);
        }
      },
    });
    pipeline(fs.createReadStream(filePath), csv(), transform, (err) => {
      if (err) reject(err);
      else resolve();
    });
  });
}

// Usage
await processCSVStreaming('data.csv', async (row) => {
  // Process each row without loading the entire file
  await database.insert('users', row);
});
// Real: concurrent row processing with bounded parallelism
// NOTE: the _transform callback must fire before the next row arrives,
// so it is invoked eagerly while work is still in flight, and deferred
// only once maxConcurrency rows are being processed.
class ConcurrentProcessor extends Transform {
  private active = 0;
  private pendingCallback: (() => void) | null = null;
  private flushCallback: (() => void) | null = null;

  constructor(
    private maxConcurrency: number,
    private processor: (row: any) => Promise<void>
  ) {
    super({ objectMode: true });
  }

  _transform(
    row: any,
    enc: string,
    callback: (error?: Error | null) => void
  ): void {
    this.active++;
    this.processor(row)
      .then(() => this.settle(null))
      .catch((err) => this.settle(err));
    if (this.active < this.maxConcurrency) {
      callback(); // slots free: accept the next row immediately
    } else {
      this.pendingCallback = () => callback(); // at capacity: defer
    }
  }

  _flush(callback: (error?: Error | null) => void): void {
    if (this.active === 0) callback();
    else this.flushCallback = () => callback(); // wait for in-flight rows
  }

  private settle(err: Error | null): void {
    this.active--;
    if (err) {
      this.destroy(err);
      return;
    }
    if (this.pendingCallback) {
      const cb = this.pendingCallback;
      this.pendingCallback = null;
      cb();
    }
    if (this.active === 0 && this.flushCallback) this.flushCallback();
  }
}

const processor = new ConcurrentProcessor(4, async (row) => {
  await database.insert('users', row);
});

pipeline(fs.createReadStream('data.csv'), csv(), processor, (err) => {
  if (err) console.error(err);
});
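One refinement worth sketching on top of this: per-row inserts are often slower than bulk inserts, so a batching Transform can sit between the parser and the writer. The Batcher class below is hypothetical (not part of the pipeline above), shown under the assumption that downstream code accepts arrays of rows:

```typescript
import { Readable, Transform } from 'stream';

// Hypothetical batching stage: groups objectMode rows into arrays of
// `size` so a downstream writer can issue bulk inserts.
class Batcher extends Transform {
  private batch: any[] = [];

  constructor(private size: number) {
    super({ objectMode: true });
  }

  _transform(row: any, enc: string, cb: (err?: Error | null) => void): void {
    this.batch.push(row);
    if (this.batch.length >= this.size) {
      this.push(this.batch);
      this.batch = [];
    }
    cb();
  }

  _flush(cb: (err?: Error | null) => void): void {
    if (this.batch.length > 0) this.push(this.batch); // emit the remainder
    cb();
  }
}

// Usage: five rows become batches of 2, 2, and 1.
const batches: number[][] = [];
await new Promise<void>((resolve, reject) => {
  Readable.from([1, 2, 3, 4, 5])
    .pipe(new Batcher(2))
    .on('data', (batch: number[]) => batches.push(batch))
    .on('error', reject)
    .on('end', () => resolve());
});
```

_flush is what makes this safe: without it, a final partial batch would be silently dropped when the source ends.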
Streaming S3 Download to HTTP Response
Real example: serve large files from S3 via HTTP without buffering.
import { S3Client, GetObjectCommand } from '@aws-sdk/client-s3';
import { Readable } from 'stream';

interface S3StreamOptions {
  bucket: string;
  key: string;
  contentType: string;
}

class S3StreamHandler {
  private s3: S3Client;

  constructor() {
    this.s3 = new S3Client({ region: 'us-east-1' });
  }

  async handleDownload(
    req: any,
    res: any,
    options: S3StreamOptions
  ): Promise<void> {
    try {
      const command = new GetObjectCommand({
        Bucket: options.bucket,
        Key: options.key,
      });
      const response = await this.s3.send(command);

      // Set headers before piping
      res.setHeader('Content-Type', options.contentType);
      res.setHeader('Content-Length', response.ContentLength || 0);
      res.setHeader('Cache-Control', 'public, max-age=3600');

      // Pipe S3 stream directly to HTTP response;
      // backpressure handled automatically
      const readable = response.Body as Readable;
      readable.pipe(res);
      readable.on('error', (err) => {
        console.error('S3 stream error:', err);
        if (!res.headersSent) {
          res.statusCode = 500;
          res.end('Download failed');
        } else {
          res.destroy(); // headers already sent: abort the connection
        }
      });
    } catch (err) {
      console.error('S3 error:', err);
      res.statusCode = 500;
      res.end('File not found');
    }
  }
}
// Express integration
import express from 'express';

const app = express();
const handler = new S3StreamHandler();

app.get('/download/:fileId', async (req, res) => {
  await handler.handleDownload(req, res, {
    bucket: 'my-bucket',
    key: `files/${req.params.fileId}`,
    contentType: 'application/octet-stream',
  });
});
AbortSignal for Stream Cancellation
Cancel in-flight streams gracefully using AbortSignal.
import { Transform } from 'stream';
import { pipeline } from 'stream/promises';
import fs from 'fs';

async function streamWithCancellation(): Promise<void> {
  const controller = new AbortController();

  // Cancel after 5 seconds
  setTimeout(() => controller.abort(), 5000);

  try {
    await pipeline(
      fs.createReadStream('large-file.bin'),
      new Transform({
        transform(chunk, enc, cb) {
          cb(null, chunk);
        },
      }),
      fs.createWriteStream('output.bin'),
      { signal: controller.signal } // pipeline destroys every stream on abort
    );
  } catch (err: any) {
    if (err.name === 'AbortError') {
      console.log('Stream cancelled');
    } else {
      console.error('Pipeline error:', err);
    }
  }
}
// HTTP request cancellation
async function handleFileDownload(req: any, res: any): Promise<void> {
  const controller = new AbortController();

  // Abort the pipeline if the client disconnects
  req.on('close', () => {
    controller.abort();
  });

  try {
    const { pipeline: pipelineAsync } = await import('stream/promises');
    await pipelineAsync(
      fs.createReadStream('large-file.bin'),
      res,
      { signal: controller.signal }
    );
  } catch (err: any) {
    if (err.name === 'AbortError') {
      console.log('Download aborted by client');
    } else {
      console.error('Error:', err);
    }
  }
}
Common Memory Leak Patterns
Most stream leaks come from listeners not being cleaned up.
import fs from 'fs';

// ANTI-PATTERN 1: anonymous listeners accumulate
function leakingListeners(): void {
  const readable = fs.createReadStream('file.txt');
  // Each call adds another listener
  readable.on('data', (chunk) => console.log(chunk));
  readable.on('data', (chunk) => console.log(chunk));
  // Memory leak: multiple identical listeners that can never be removed
  readable.on('data', (chunk) => console.log(chunk)); // and another
}

// FIX: keep a reference so the handler can be removed
function properListeners(): void {
  const readable = fs.createReadStream('file.txt');
  const handler = (chunk: Buffer): void => {
    console.log(chunk);
  };
  readable.on('data', handler);
  // Remove later
  readable.removeListener('data', handler);
}

// ANTI-PATTERN 2: error listeners not cleaned up across pipes
function errorLeakInPipeline(): void {
  const r1 = fs.createReadStream('file1.txt');
  const r2 = fs.createReadStream('file2.txt');
  const w = fs.createWriteStream('output.txt');
  r1.on('error', (err) => console.error('r1:', err));
  r1.pipe(w);
  // If r1 errors, pipe() only unpipes: w stays open and
  // r2's listeners remain attached
  r2.pipe(w);
}

// FIX: use pipeline with proper cleanup
import { pipeline } from 'stream';

function properErrorHandling(): void {
  const r1 = fs.createReadStream('file1.txt');
  const w = fs.createWriteStream('output.txt');
  pipeline(r1, w, (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
      // Streams auto-destroyed
    }
  });
}

// ANTI-PATTERN 3: Readable paused and never resumed
function stalledReader(): void {
  const readable = fs.createReadStream('file.txt');
  readable.pause();
  // Never resumed → stream hangs, buffer and fd held
}

// ANTI-PATTERN 4: destroying the source without the destination
function unsafeDestroy(): void {
  const r = fs.createReadStream('file.txt');
  const w = fs.createWriteStream('output.txt');
  r.pipe(w);
  r.destroy(); // w is never ended or destroyed → open fd leak
}
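A useful guardrail against several of these patterns is finished() from stream/promises: it resolves when a stream fully ends and rejects if it errors or is destroyed, giving you a single place to run cleanup. A minimal sketch:

```typescript
import { Readable } from 'stream';
import { finished } from 'stream/promises';

const readable = Readable.from(['a', 'b', 'c']);
const seen: string[] = [];
const handler = (chunk: unknown): void => {
  seen.push(chunk as string);
};
readable.on('data', handler);

// finished() settles exactly once whether the stream ends or errors,
// so the finally block reliably detaches the listener.
let cleanedUp = false;
try {
  await finished(readable);
} finally {
  readable.removeListener('data', handler); // explicit listener cleanup
  cleanedUp = true;
}
```

The same pattern works for file and network streams, where the finally block would also close descriptors or release pooled connections.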
Checklist
- ✓ Always respect backpressure: check write() return value or use pipe/pipeline
- ✓ Use pipeline() over pipe() for automatic cleanup and error propagation
- ✓ Set appropriate highWaterMark for your data throughput
- ✓ Use objectMode: true for non-buffer streams (objects, JSON)
- ✓ Cancel streams via AbortSignal and check abort in transform handlers
- ✓ Remove event listeners and destroy streams when done
- ✓ Test memory usage with streams processing 1GB+ files
- ✓ Use streaming for database exports, file uploads, and API responses
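On the highWaterMark items above, remember that the unit depends on the stream mode: bytes for binary streams, object count in objectMode. The defaults also differ (16 KiB for a plain Readable, 64 KiB for fs.createReadStream, 16 objects in objectMode). A quick check:

```typescript
import { Readable } from 'stream';

// highWaterMark is counted in bytes for binary streams...
const byteStream = new Readable({ read() {} });
// ...but in objects for objectMode streams.
const objectStream = new Readable({ objectMode: true, read() {} });

const byteHwm = byteStream.readableHighWaterMark;     // default: 16 KiB
const objectHwm = objectStream.readableHighWaterMark; // default: 16 objects
```

Sixteen buffered 10MB objects cost far more memory than sixteen buffered kilobytes, which is why objectMode pipelines deserve their own highWaterMark tuning.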
Conclusion
Streams are non-negotiable for processing unbounded data in Node.js. Master backpressure, leverage pipeline() for error safety, and avoid listener leaks. Combined with async generators and transform streams, you can build systems that process terabytes of data with constant memory usage.