- Published on
Streaming LLM Responses in Node.js — SSE, Backpressure, and UX Patterns
- Authors

- Name
- Sanjeev Sharma
- @webcoderspeed1
Introduction
Streaming LLM responses cuts perceived latency from 5+ seconds (waiting for the full response) to roughly 500 ms (time to first token). This guide covers Server-Sent Events, async iterators, backpressure handling, and the metrics that matter.
- OpenAI Streaming with Async Iterators
- Server-Sent Events Endpoint
- Backpressure Handling
- Partial JSON Parsing for Streaming Structured Output
- Stream Cancellation on Disconnect
- Streaming with Tool Calls
- Abort Signal Propagation
- Latency Metrics: TTFT and TBT
- Checklist
- Conclusion
OpenAI Streaming with Async Iterators
OpenAI's Node SDK returns async iterables for streaming. Proper handling prevents memory leaks.
import OpenAI from 'openai';
// Per-request metrics returned by OpenAIStreamHandler.streamCompletion.
interface StreamMetrics {
ttft_ms: number; // Time to first token
tokens_received: number; // number of non-empty content deltas delivered to onToken
total_duration_ms: number; // wall-clock ms from request start to stream end
}
class OpenAIStreamHandler {
  private client = new OpenAI();

  /**
   * Streams a chat completion, invoking `onToken` for every non-empty
   * content delta. Resolves with timing/volume metrics once the stream
   * ends; on failure, reports via `onError` and rethrows for the caller.
   */
  async streamCompletion(
    prompt: string,
    onToken: (token: string) => void,
    onError: (error: Error) => void
  ): Promise<StreamMetrics> {
    const startedAt = Date.now();
    let firstChunkAt: number | null = null;
    let deliveredCount = 0;
    try {
      const stream = await this.client.chat.completions.create({
        model: 'gpt-4o',
        messages: [{ role: 'user', content: prompt }],
        stream: true,
        max_tokens: 2000
      });
      // for-await tears down the underlying connection on break/throw,
      // so no manual cleanup is needed here.
      for await (const chunk of stream) {
        firstChunkAt ??= Date.now();
        const delta = chunk.choices[0]?.delta?.content;
        if (delta) {
          onToken(delta);
          deliveredCount++;
        }
      }
      return {
        ttft_ms: firstChunkAt ? firstChunkAt - startedAt : 0,
        tokens_received: deliveredCount,
        total_duration_ms: Date.now() - startedAt
      };
    } catch (error) {
      onError(error as Error);
      throw error;
    }
  }

  /** Async-generator variant: yields each content delta as it arrives. */
  async *streamCompletionIterator(prompt: string): AsyncGenerator<string> {
    const stream = await this.client.chat.completions.create({
      model: 'gpt-4o',
      messages: [{ role: 'user', content: prompt }],
      stream: true
    });
    for await (const chunk of stream) {
      const delta = chunk.choices[0]?.delta?.content;
      if (delta) {
        yield delta;
      }
    }
  }
}
Server-Sent Events Endpoint
Expose streaming via SSE for browser clients. Handle disconnection gracefully.
import { Request, Response } from 'express';
class SSEStreamingEndpoint {
setupSSERoute(app: any): void {
app.get('/api/chat/stream', async (req: Request, res: Response) => {
const prompt = req.query.q as string;
const clientId = req.get('X-Client-ID') || 'unknown';
// Set SSE headers
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no' // Disable nginx buffering
});
const handler = new OpenAIStreamHandler();
let tokenCount = 0;
const startTime = Date.now();
try {
for await (const token of handler.streamCompletionIterator(prompt)) {
if (res.closed || res.destroyed) {
console.log(`Client ${clientId} disconnected`);
break;
}
tokenCount++;
res.write(
`data: ${JSON.stringify({ token, index: tokenCount })}\n\n`
);
// Allow event loop to process
if (tokenCount % 10 === 0) {
await new Promise(resolve => setImmediate(resolve));
}
}
const duration = Date.now() - startTime;
res.write(
`data: ${JSON.stringify({
done: true,
tokens: tokenCount,
duration_ms: duration
})}\n\n`
);
res.end();
} catch (error) {
res.write(
`data: ${JSON.stringify({ error: (error as Error).message })}\n\n`
);
res.end();
}
});
}
}
Backpressure Handling
Pause writing when buffers fill. Prevents memory bloat and OOM crashes.
class BackpressureStreamHandler {
async streamWithBackpressure(
prompt: string,
res: Response,
handler: OpenAIStreamHandler
): Promise<void> {
const queue: string[] = [];
let draining = false;
const write = async (data: string): Promise<void> => {
return new Promise((resolve, reject) => {
const ok = res.write(data);
if (!ok) {
// Buffer is full, pause writing
draining = true;
res.once('drain', () => {
draining = false;
resolve();
});
res.once('error', reject);
} else {
resolve();
}
});
};
const processQueue = async (): Promise<void> => {
while (queue.length > 0 && !draining) {
const chunk = queue.shift()!;
await write(chunk);
}
};
try {
for await (const token of handler.streamCompletionIterator(prompt)) {
queue.push(
`data: ${JSON.stringify({ token })}\n\n`
);
// Process queue if not draining
if (!draining) {
await processQueue();
}
// If queue is large, throttle
if (queue.length > 100) {
await new Promise(resolve => setTimeout(resolve, 10));
}
}
// Drain remaining queue
await processQueue();
res.write('data: {"done":true}\n\n');
res.end();
} catch (error) {
res.write(`data: ${JSON.stringify({ error: (error as Error).message })}\n\n`);
res.end();
}
}
}
Partial JSON Parsing for Streaming Structured Output
Parse incomplete JSON as it streams for real-time structured data.
// Result of attempting to parse a possibly-incomplete JSON buffer.
interface PartialJSON {
isComplete: boolean; // true when the whole buffer parsed as valid JSON
data: any; // parsed value (best-effort prefix parse when incomplete)
lastValidJSON: string; // the exact string that parsed successfully ('' if none)
}
class PartialJSONParser {
  /**
   * Attempts to parse `stream` as JSON. If the buffer is incomplete,
   * searches backwards for the longest prefix that becomes valid JSON
   * when closed with '}' (objects) or ']' (arrays).
   *
   * Fixes two defects in the original: the loop started at length - 1,
   * so the full buffer plus a closer was never tried (e.g. '{"a": 1'
   * recovered {} instead of {a: 1}), and only '}' was attempted, so
   * array-rooted buffers were never recovered.
   */
  parse(stream: string): PartialJSON {
    const currentBuffer = stream.trim();
    try {
      // Fast path: buffer is already complete, valid JSON.
      const data = JSON.parse(currentBuffer);
      return { isComplete: true, data, lastValidJSON: currentBuffer };
    } catch {
      // Incomplete JSON: walk back from the end, trying to close the
      // structure. O(n^2) worst case — acceptable for chat-sized buffers.
      for (let i = currentBuffer.length; i >= 0; i--) {
        // '}' first preserves the original behavior for object buffers.
        for (const closer of ['}', ']'] as const) {
          try {
            const candidate = currentBuffer.substring(0, i) + closer;
            const data = JSON.parse(candidate);
            return {
              isComplete: false,
              data,
              lastValidJSON: candidate
            };
          } catch {
            // Continue searching.
          }
        }
      }
    }
    // Nothing parseable at all.
    return { isComplete: false, data: {}, lastValidJSON: '' };
  }

  /**
   * Consumes a token stream, emitting best-effort partial parses via
   * `onPartial` once the buffer exceeds 50 characters; resolves with
   * the final parse. (Rewritten as a plain async method — the original
   * used the `new Promise(async ...)` antipattern.)
   */
  async streamStructuredOutput(
    stream: AsyncIterable<string>,
    onPartial: (data: any) => void
  ): Promise<any> {
    let buffer = '';
    for await (const token of stream) {
      buffer += token;
      // Only start attempting parses once enough text has accumulated.
      if (buffer.length > 50) {
        const result = this.parse(buffer);
        if (result.data && Object.keys(result.data).length > 0) {
          onPartial(result.data);
        }
      }
    }
    // Final parse of the complete buffer.
    return this.parse(buffer).data;
  }
}
Stream Cancellation on Disconnect
Properly abort LLM requests when client disconnects.
class CancellableStreamHandler {
private activeStreams: Map<string, AbortController> = new Map();
async streamWithCancellation(
clientId: string,
prompt: string,
res: Response,
handler: OpenAIStreamHandler
): Promise<void> {
const controller = new AbortController();
this.activeStreams.set(clientId, controller);
// Cancel when client disconnects
res.on('close', () => {
console.log(`Cancelling stream for ${clientId}`);
controller.abort();
this.activeStreams.delete(clientId);
});
try {
// Note: OpenAI SDK doesn't currently expose abort in streaming,
// so we use a wrapper
const generator = handler.streamCompletionIterator(prompt);
for await (const token of generator) {
if (controller.signal.aborted) {
console.log(`Stream aborted for ${clientId}`);
break;
}
res.write(`data: ${JSON.stringify({ token })}\n\n`);
}
res.write('data: {"done":true}\n\n');
res.end();
} catch (error) {
if ((error as Error).name === 'AbortError') {
console.log(`Stream cancelled: ${clientId}`);
} else {
res.write(`data: ${JSON.stringify({ error: (error as Error).message })}\n\n`);
}
res.end();
} finally {
this.activeStreams.delete(clientId);
}
}
}
Streaming with Tool Calls
Handle tool/function calls that appear mid-stream.
// A fully-assembled tool/function call extracted from a stream.
interface ToolCall {
id: string; // unique identifier for this tool invocation
name: string; // function name requested by the model
arguments: Record<string, any>; // parsed JSON arguments for the function
}
class StreamingToolCallHandler {
  /**
   * Streams a completion that may resolve into either plain text or a
   * tool call. Tool-call names and arguments arrive as PARTIAL deltas:
   * the arguments field is a sequence of partial-JSON fragments, so we
   * must accumulate them as a string and parse ONCE at stream end —
   * the original called JSON.parse on each fragment, which throws on
   * any multi-chunk argument payload.
   */
  async streamWithToolCalls(
    prompt: string,
    tools: Array<{ name: string; description: string }>,
    handler: OpenAIStreamHandler
  ): Promise<string | ToolCall> {
    let buffer = '';
    let toolCallId = '';
    let toolCallName = '';
    let argsBuffer = ''; // raw streamed argument fragments, concatenated
    // In production, would use actual streaming API with tool_calls
    const client = new OpenAI();
    const stream = await client.chat.completions.create({
      model: 'gpt-4o',
      messages: [{ role: 'user', content: prompt }],
      tools: tools.map(t => ({
        type: 'function' as const,
        function: { name: t.name, description: t.description }
      })),
      stream: true
    });
    for await (const chunk of stream) {
      const delta = chunk.choices[0]?.delta;
      const toolCall = delta?.tool_calls?.[0];
      if (toolCall) {
        // The id and name arrive on the first tool-call chunk.
        if (toolCall.id) {
          toolCallId = toolCall.id;
        }
        if (toolCall.function?.name) {
          toolCallName += toolCall.function.name;
        }
        if (toolCall.function?.arguments) {
          argsBuffer += toolCall.function.arguments;
        }
      }
      if (delta?.content) {
        buffer += delta.content;
      }
    }
    if (toolCallName) {
      return {
        // Prefer the model-supplied id; fall back to a local UUID.
        id: toolCallId || crypto.randomUUID(),
        name: toolCallName,
        arguments: argsBuffer ? JSON.parse(argsBuffer) : {}
      };
    }
    return buffer;
  }
}
Abort Signal Propagation
Propagate cancellation through entire request chain.
class AbortSignalPropagator {
  /**
   * Streams a completion, propagating `abortSignal` all the way down to
   * the HTTP layer so cancellation tears down the connection.
   *
   * Fix: the OpenAI Node SDK has no `httpClient` constructor option (the
   * original's wrapper was never invoked). The supported mechanism is
   * passing `{ signal }` as per-request options to `create()`, which the
   * SDK forwards to the underlying fetch call.
   */
  async streamWithAbort(
    prompt: string,
    abortSignal: AbortSignal
  ): Promise<string> {
    const client = new OpenAI();
    let result = '';
    try {
      const stream = await client.chat.completions.create(
        {
          model: 'gpt-4o',
          messages: [{ role: 'user', content: prompt }],
          stream: true
        },
        // Aborting this signal aborts the in-flight HTTP request.
        { signal: abortSignal }
      );
      for await (const chunk of stream) {
        // Belt-and-braces: stop consuming even if the transport-level
        // abort has not surfaced as an error yet.
        if (abortSignal.aborted) {
          break;
        }
        if (chunk.choices[0]?.delta?.content) {
          result += chunk.choices[0].delta.content;
        }
      }
    } catch (error) {
      if (abortSignal.aborted) {
        console.log('Stream aborted by client');
      } else {
        throw error;
      }
    }
    return result;
  }
}
Latency Metrics: TTFT and TBT
Measure Time-To-First-Token (TTFT) and Time-Between-Tokens (TBT) for UX optimization.
// Aggregate latency statistics for one streamed response.
interface LatencyMetrics {
ttft_ms: number; // Time to first token
tbt_avg_ms: number; // Average time between tokens
tbt_p95_ms: number; // 95th percentile TBT
total_tokens: number; // number of tokens observed on the stream
total_duration_ms: number; // wall-clock ms for the entire stream
}
class LatencyMetricsCollector {
  /**
   * Consumes a token stream and measures TTFT plus per-token intervals.
   * Resolves once the stream completes. (Rewritten as a plain async
   * method — the original used the `new Promise(async ...)` antipattern.
   * Also fixes ttft_ms going NEGATIVE on an empty stream, since the
   * original computed `0 - startTime` when no token ever arrived.)
   */
  async collect(stream: AsyncIterable<string>): Promise<LatencyMetrics> {
    const startTime = Date.now();
    let firstTokenTime = 0;
    let lastTokenTime = startTime;
    let tokenCount = 0;
    const tokenIntervals: number[] = [];
    for await (const _token of stream) {
      const now = Date.now();
      if (tokenCount === 0) {
        firstTokenTime = now;
      } else {
        tokenIntervals.push(now - lastTokenTime);
      }
      lastTokenTime = now;
      tokenCount++;
    }
    // Empty stream ⇒ no first token ⇒ TTFT is reported as 0, not negative.
    const ttft = tokenCount > 0 ? firstTokenTime - startTime : 0;
    const avgTBT = tokenIntervals.length > 0
      ? tokenIntervals.reduce((a, b) => a + b, 0) / tokenIntervals.length
      : 0;
    const sortedIntervals = [...tokenIntervals].sort((a, b) => a - b);
    const p95Index = Math.floor(sortedIntervals.length * 0.95);
    const p95TBT = sortedIntervals[p95Index] || 0;
    return {
      ttft_ms: ttft,
      tbt_avg_ms: avgTBT,
      tbt_p95_ms: p95TBT,
      total_tokens: tokenCount,
      total_duration_ms: Date.now() - startTime
    };
  }

  /** Pretty-prints collected metrics for log inspection. */
  logMetrics(metrics: LatencyMetrics): void {
    console.log(`
TTFT: ${metrics.ttft_ms}ms
Avg TBT: ${metrics.tbt_avg_ms.toFixed(2)}ms
P95 TBT: ${metrics.tbt_p95_ms.toFixed(2)}ms
Total: ${metrics.total_tokens} tokens in ${metrics.total_duration_ms}ms
`);
  }
}
Checklist
- Always use async iterators for proper cleanup on errors
- Set X-Accel-Buffering: no to disable proxy buffering for SSE
- Implement backpressure handling to prevent OOM
- Handle client disconnects with AbortController
- Parse partial JSON for real-time structured output
- Measure TTFT (should be <500ms) and TBT (should be <100ms avg)
- Cancel upstream requests when client disconnects
- Test with slow network conditions and connection drops
- Use setImmediate() to yield to the event loop every ~10 tokens
- Monitor queue sizes and add throttling if exceeding 100+ items
Conclusion
Streaming transforms UX from waiting for answers to watching them appear. The difference between 5 seconds of latency and 0.5 seconds of perceived latency is streaming. Master backpressure, cancellation, and metrics to ship production-grade streaming.