Published on

Streaming LLM Responses in Node.js — SSE, Backpressure, and UX Patterns

Authors

Introduction

Streaming LLM responses dramatically reduces perceived latency: instead of waiting 5+ seconds for a complete response, users see the first token in roughly 500 ms. This guide covers Server-Sent Events, async iterators, backpressure handling, and the metrics that matter.

OpenAI Streaming with Async Iterators

OpenAI's Node SDK returns async iterables for streaming. Proper handling prevents memory leaks.

import OpenAI from 'openai';

// Timing and volume metrics captured for a single streamed completion.
interface StreamMetrics {
  ttft_ms: number; // Time to first token
  tokens_received: number; // Count of content-bearing chunks received
  total_duration_ms: number; // Wall-clock time from request start to stream end
}

class OpenAIStreamHandler {
  private client = new OpenAI();

  /**
   * Streams a chat completion, invoking `onToken` for each content delta.
   * Returns timing metrics (TTFT, chunk count, total duration).
   * On failure, `onError` is invoked and the error is re-thrown to the caller.
   */
  async streamCompletion(
    prompt: string,
    onToken: (token: string) => void,
    onError: (error: Error) => void
  ): Promise<StreamMetrics> {
    const startedAt = Date.now();
    let firstTokenAt: number | null = null;
    let received = 0;

    try {
      const stream = await this.client.chat.completions.create({
        model: 'gpt-4o',
        messages: [{ role: 'user', content: prompt }],
        stream: true,
        max_tokens: 2000
      });

      // for-await runs the iterator's cleanup on break/throw automatically.
      for await (const chunk of stream) {
        if (firstTokenAt === null) {
          firstTokenAt = Date.now();
        }

        const text = chunk.choices[0]?.delta?.content;
        if (text) {
          onToken(text);
          received += 1;
        }
      }

      return {
        ttft_ms: firstTokenAt === null ? 0 : firstTokenAt - startedAt,
        tokens_received: received,
        total_duration_ms: Date.now() - startedAt
      };
    } catch (error) {
      // Notify the callback, then propagate so callers can also react.
      onError(error as Error);
      throw error;
    }
  }

  /** Yields content deltas one at a time as an async generator. */
  async *streamCompletionIterator(prompt: string): AsyncGenerator<string> {
    const stream = await this.client.chat.completions.create({
      model: 'gpt-4o',
      messages: [{ role: 'user', content: prompt }],
      stream: true
    });

    for await (const chunk of stream) {
      const text = chunk.choices[0]?.delta?.content;
      if (text) {
        yield text;
      }
    }
  }
}

Server-Sent Events Endpoint

Expose streaming via SSE for browser clients. Handle disconnection gracefully.

import { Request, Response } from 'express';

class SSEStreamingEndpoint {
  setupSSERoute(app: any): void {
    app.get('/api/chat/stream', async (req: Request, res: Response) => {
      const prompt = req.query.q as string;
      const clientId = req.get('X-Client-ID') || 'unknown';

      // Set SSE headers
      res.writeHead(200, {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'X-Accel-Buffering': 'no' // Disable nginx buffering
      });

      const handler = new OpenAIStreamHandler();
      let tokenCount = 0;
      const startTime = Date.now();

      try {
        for await (const token of handler.streamCompletionIterator(prompt)) {
          if (res.closed || res.destroyed) {
            console.log(`Client ${clientId} disconnected`);
            break;
          }

          tokenCount++;
          res.write(
            `data: ${JSON.stringify({ token, index: tokenCount })}\n\n`
          );

          // Allow event loop to process
          if (tokenCount % 10 === 0) {
            await new Promise(resolve => setImmediate(resolve));
          }
        }

        const duration = Date.now() - startTime;
        res.write(
          `data: ${JSON.stringify({
            done: true,
            tokens: tokenCount,
            duration_ms: duration
          })}\n\n`
        );
        res.end();
      } catch (error) {
        res.write(
          `data: ${JSON.stringify({ error: (error as Error).message })}\n\n`
        );
        res.end();
      }
    });
  }
}

Backpressure Handling

Pause writing when buffers fill. Prevents memory bloat and OOM crashes.

class BackpressureStreamHandler {
  async streamWithBackpressure(
    prompt: string,
    res: Response,
    handler: OpenAIStreamHandler
  ): Promise<void> {
    const queue: string[] = [];
    let draining = false;

    const write = async (data: string): Promise<void> => {
      return new Promise((resolve, reject) => {
        const ok = res.write(data);
        if (!ok) {
          // Buffer is full, pause writing
          draining = true;
          res.once('drain', () => {
            draining = false;
            resolve();
          });
          res.once('error', reject);
        } else {
          resolve();
        }
      });
    };

    const processQueue = async (): Promise<void> => {
      while (queue.length > 0 && !draining) {
        const chunk = queue.shift()!;
        await write(chunk);
      }
    };

    try {
      for await (const token of handler.streamCompletionIterator(prompt)) {
        queue.push(
          `data: ${JSON.stringify({ token })}\n\n`
        );

        // Process queue if not draining
        if (!draining) {
          await processQueue();
        }

        // If queue is large, throttle
        if (queue.length > 100) {
          await new Promise(resolve => setTimeout(resolve, 10));
        }
      }

      // Drain remaining queue
      await processQueue();
      res.write('data: {"done":true}\n\n');
      res.end();
    } catch (error) {
      res.write(`data: ${JSON.stringify({ error: (error as Error).message })}\n\n`);
      res.end();
    }
  }
}

Partial JSON Parsing for Streaming Structured Output

Parse incomplete JSON as it streams for real-time structured data.

// Result of a best-effort parse over a possibly-incomplete JSON buffer.
interface PartialJSON {
  isComplete: boolean; // True when the buffer parsed as-is
  data: any; // Parsed value (possibly a truncated view of the final value)
  lastValidJSON: string; // The exact text that parsed successfully ('' if none)
}

class PartialJSONParser {
  /**
   * Attempts to parse a (possibly incomplete) JSON document.
   *
   * If the buffer parses as-is, returns it complete. Otherwise searches
   * backwards for the longest prefix that becomes valid JSON when closed
   * with `}` or `]` (the `]` case generalizes the original, which only
   * handled streamed objects). Worst case is O(n^2) parse attempts —
   * acceptable for chat-sized buffers.
   *
   * Fix vs. original: removed the dead `lastValidJSON` local that was
   * assigned but never read.
   */
  parse(stream: string): PartialJSON {
    const currentBuffer = stream.trim();

    try {
      // Fast path: the buffer is already complete JSON.
      const data = JSON.parse(currentBuffer);
      return { isComplete: true, data, lastValidJSON: currentBuffer };
    } catch {
      // Incomplete JSON: find the longest prefix that closes cleanly.
      for (let i = currentBuffer.length - 1; i >= 0; i--) {
        for (const closer of ['}', ']']) {
          try {
            const candidate = currentBuffer.substring(0, i) + closer;
            const data = JSON.parse(candidate);
            return {
              isComplete: false,
              data,
              lastValidJSON: candidate
            };
          } catch {
            // Continue searching
          }
        }
      }
    }

    // Nothing salvageable yet (e.g. buffer is not JSON at all).
    return { isComplete: false, data: {}, lastValidJSON: '' };
  }

  /**
   * Accumulates streamed tokens into a buffer, emitting best-effort
   * partial parses via `onPartial` once the buffer is non-trivial
   * (> 50 chars — the original comment claimed "every 10 tokens" but the
   * check was always on buffer length). Resolves with the final parse.
   *
   * Fix vs. original: rewritten as a plain async method instead of the
   * `new Promise(async ...)` anti-pattern, which can swallow rejections
   * thrown before the executor's first await.
   */
  async streamStructuredOutput(
    stream: AsyncIterable<string>,
    onPartial: (data: any) => void
  ): Promise<any> {
    let buffer = '';

    for await (const token of stream) {
      buffer += token;

      // Re-parse once the buffer is big enough to plausibly contain data.
      if (buffer.length > 50) {
        const result = this.parse(buffer);
        if (result.data && Object.keys(result.data).length > 0) {
          onPartial(result.data);
        }
      }
    }

    // Final parse over the fully accumulated buffer.
    return this.parse(buffer).data;
  }
}

Stream Cancellation on Disconnect

Properly abort LLM requests when client disconnects.

class CancellableStreamHandler {
  private activeStreams: Map<string, AbortController> = new Map();

  async streamWithCancellation(
    clientId: string,
    prompt: string,
    res: Response,
    handler: OpenAIStreamHandler
  ): Promise<void> {
    const controller = new AbortController();
    this.activeStreams.set(clientId, controller);

    // Cancel when client disconnects
    res.on('close', () => {
      console.log(`Cancelling stream for ${clientId}`);
      controller.abort();
      this.activeStreams.delete(clientId);
    });

    try {
      // Note: OpenAI SDK doesn't currently expose abort in streaming,
      // so we use a wrapper
      const generator = handler.streamCompletionIterator(prompt);

      for await (const token of generator) {
        if (controller.signal.aborted) {
          console.log(`Stream aborted for ${clientId}`);
          break;
        }

        res.write(`data: ${JSON.stringify({ token })}\n\n`);
      }

      res.write('data: {"done":true}\n\n');
      res.end();
    } catch (error) {
      if ((error as Error).name === 'AbortError') {
        console.log(`Stream cancelled: ${clientId}`);
      } else {
        res.write(`data: ${JSON.stringify({ error: (error as Error).message })}\n\n`);
      }
      res.end();
    } finally {
      this.activeStreams.delete(clientId);
    }
  }
}

Streaming with Tool Calls

Handle tool/function calls that appear mid-stream.

// A fully assembled tool/function call extracted from a streamed response.
interface ToolCall {
  id: string; // Identifier for this call
  name: string; // Function name the model wants to invoke
  arguments: Record<string, any>; // Parsed JSON arguments for the call
}

class StreamingToolCallHandler {
  /**
   * Streams a completion that may resolve to either plain text or a tool
   * call. Returns the accumulated text, or a ToolCall if the model
   * requested one.
   *
   * Fixes vs. original: tool-call argument fragments are accumulated as a
   * string across chunks and parsed ONCE after the stream ends. The old
   * code called JSON.parse on every fragment, which throws because the
   * API streams `arguments` as partial JSON pieces. The streamed
   * tool-call id is also preferred over a locally generated UUID.
   *
   * NOTE(review): `handler` is accepted but unused (as in the original);
   * kept for interface compatibility.
   */
  async streamWithToolCalls(
    prompt: string,
    tools: Array<{ name: string; description: string }>,
    handler: OpenAIStreamHandler
  ): Promise<string | ToolCall> {
    let textBuffer = '';
    let callId: string | undefined;
    let callName = '';
    let argsJson = ''; // Raw argument fragments, concatenated in order

    const client = new OpenAI();
    const stream = await client.chat.completions.create({
      model: 'gpt-4o',
      messages: [{ role: 'user', content: prompt }],
      tools: tools.map(t => ({
        type: 'function' as const,
        function: { name: t.name, description: t.description }
      })),
      stream: true
    });

    for await (const chunk of stream) {
      const delta = chunk.choices[0]?.delta;

      const toolCall = delta?.tool_calls?.[0];
      if (toolCall) {
        if (toolCall.id) {
          callId = toolCall.id;
        }
        // Name and arguments both arrive in fragments; append, don't parse.
        if (toolCall.function?.name) {
          callName += toolCall.function.name;
        }
        if (toolCall.function?.arguments) {
          argsJson += toolCall.function.arguments;
        }
      }

      if (delta?.content) {
        textBuffer += delta.content;
      }
    }

    if (callName) {
      let parsedArgs: Record<string, any> = {};
      try {
        // Parse the complete argument document exactly once.
        parsedArgs = argsJson ? JSON.parse(argsJson) : {};
      } catch {
        // Malformed arguments from the model; surface an empty object
        // rather than crashing the whole stream.
      }
      return {
        id: callId ?? crypto.randomUUID(),
        name: callName,
        arguments: parsedArgs
      };
    }

    return textBuffer;
  }
}

Abort Signal Propagation

Propagate cancellation through entire request chain.

class AbortSignalPropagator {
  /**
   * Streams a completion, accumulating content until the stream ends or
   * the caller's AbortSignal fires. Returns whatever content was
   * received before completion/abort.
   *
   * Fixes vs. original: the signal is handed to the SDK through its
   * supported per-request options (`{ signal }`) instead of the
   * nonexistent `httpClient` constructor option, so aborting now
   * actually cancels the underlying HTTP request rather than only
   * stopping the local read loop.
   */
  async streamWithAbort(
    prompt: string,
    abortSignal: AbortSignal
  ): Promise<string> {
    const client = new OpenAI();

    let result = '';

    try {
      // Per-request `signal` propagates cancellation to the underlying
      // fetch, tearing down the connection when the caller aborts.
      const stream = await client.chat.completions.create(
        {
          model: 'gpt-4o',
          messages: [{ role: 'user', content: prompt }],
          stream: true
        },
        { signal: abortSignal }
      );

      for await (const chunk of stream) {
        // Defensive: stop reading promptly even if transport teardown lags.
        if (abortSignal.aborted) {
          break;
        }

        if (chunk.choices[0]?.delta?.content) {
          result += chunk.choices[0].delta.content;
        }
      }
    } catch (error) {
      if (abortSignal.aborted) {
        console.log('Stream aborted by client');
      } else {
        throw error;
      }
    }

    return result;
  }
}

Latency Metrics: TTFT and TBT

Measure Time-To-First-Token (TTFT) and Time-Between-Tokens (TBT) to optimize perceived responsiveness.

// Latency profile of one consumed token stream.
interface LatencyMetrics {
  ttft_ms: number; // Time to first token (0 if the stream was empty)
  tbt_avg_ms: number; // Average time between consecutive tokens
  tbt_p95_ms: number; // 95th percentile time between tokens
  total_tokens: number; // Number of tokens consumed
  total_duration_ms: number; // Wall-clock time for the whole stream
}

class LatencyMetricsCollector {
  /**
   * Consumes the stream to completion, timing each token's arrival, and
   * returns TTFT / TBT statistics.
   *
   * Fixes vs. original: rewritten as a plain async method instead of the
   * `new Promise(async ...)` anti-pattern, and an empty stream now
   * reports ttft_ms of 0 (the old code computed `0 - startTime`, a large
   * negative number).
   */
  async collect(stream: AsyncIterable<string>): Promise<LatencyMetrics> {
    const startTime = Date.now();
    let firstTokenTime: number | null = null; // null until a token arrives
    let lastTokenTime = startTime;
    let tokenCount = 0;
    const tokenIntervals: number[] = [];

    for await (const token of stream) {
      const now = Date.now();

      if (firstTokenTime === null) {
        firstTokenTime = now;
      } else {
        // Gap since the previous token; excludes the initial TTFT gap.
        tokenIntervals.push(now - lastTokenTime);
      }

      lastTokenTime = now;
      tokenCount++;
    }

    const ttft = firstTokenTime === null ? 0 : firstTokenTime - startTime;
    const avgTBT = tokenIntervals.length > 0
      ? tokenIntervals.reduce((a, b) => a + b, 0) / tokenIntervals.length
      : 0;

    // Nearest-rank p95 over a sorted copy (input order is preserved).
    const sortedIntervals = [...tokenIntervals].sort((a, b) => a - b);
    const p95Index = Math.floor(sortedIntervals.length * 0.95);
    const p95TBT = sortedIntervals[p95Index] || 0;

    return {
      ttft_ms: ttft,
      tbt_avg_ms: avgTBT,
      tbt_p95_ms: p95TBT,
      total_tokens: tokenCount,
      total_duration_ms: Date.now() - startTime
    };
  }

  /** Pretty-prints a metrics record to the console. */
  logMetrics(metrics: LatencyMetrics): void {
    console.log(`
      TTFT: ${metrics.ttft_ms}ms
      Avg TBT: ${metrics.tbt_avg_ms.toFixed(2)}ms
      P95 TBT: ${metrics.tbt_p95_ms.toFixed(2)}ms
      Total: ${metrics.total_tokens} tokens in ${metrics.total_duration_ms}ms
    `);
  }
}

Checklist

  • Always use async iterators for proper cleanup on errors
  • Set X-Accel-Buffering: no to disable proxy buffering for SSE
  • Implement backpressure handling to prevent OOM
  • Handle client disconnects with AbortController
  • Parse partial JSON for real-time structured output
  • Measure TTFT (should be <500ms) and TBT (should be <100ms avg)
  • Cancel upstream requests when client disconnects
  • Test with slow network conditions and connection drops
  • Use setImmediate() to yield to event loop every ~10 tokens
  • Monitor queue sizes and add throttling if exceeding 100+ items

Conclusion

Streaming transforms UX from waiting for answers to watching them appear: the same 5-second generation feels like 0.5 seconds when the first token arrives immediately. Master backpressure, cancellation, and metrics to ship production-grade streaming.