Published on

Streaming LLM Responses — Server-Sent Events, Backpressure, and Error Handling

Authors

Introduction

Streaming LLM responses dramatically improves perceived latency by sending tokens as they arrive. But streaming introduces complexity: managing backpressure when clients can't keep up, handling mid-stream failures, buffering for tool calls, and graceful cancellation. This post covers Server-Sent Events implementation, OpenAI streaming APIs, backpressure handling, error recovery, and production patterns.

Server-Sent Events in Node.js/Express

SSE is an HTTP-based streaming mechanism for server-to-client updates; it is simpler than WebSockets when you only need one-way server broadcasts.

import { Express, Response, Request } from 'express';
import { EventEmitter } from 'events';

class SSEStream {
  private response: Response;
  private abortController: AbortController;
  private messageQueue: string[] = [];
  private isFlushing = false;
  private isConnected = true;

  constructor(response: Response) {
    this.response = response;
    this.abortController = new AbortController();

    // Set SSE headers
    this.response.setHeader('Content-Type', 'text/event-stream');
    this.response.setHeader('Cache-Control', 'no-cache');
    this.response.setHeader('Connection', 'keep-alive');
    this.response.setHeader('X-Accel-Buffering', 'no'); // Disable proxy buffering

    // Handle client disconnect
    this.response.on('close', () => {
      this.isConnected = false;
      this.abortController.abort();
    });

    this.response.on('error', (err) => {
      console.error('SSE response error:', err);
      this.isConnected = false;
      this.abortController.abort();
    });
  }

  async send(data: any, eventType: string = 'message'): Promise<boolean> {
    if (!this.isConnected) {
      return false;
    }

    const message = `event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`;
    this.messageQueue.push(message);

    // Flush asynchronously to avoid blocking
    if (!this.isFlushing) {
      this.flush();
    }

    return true;
  }

  private async flush(): Promise<void> {
    if (this.isFlushing || this.messageQueue.length === 0) {
      return;
    }

    this.isFlushing = true;

    try {
      while (this.messageQueue.length > 0 && this.isConnected) {
        const message = this.messageQueue.shift()!;

        // Check if write would block (backpressure)
        const canContinue = this.response.write(message);

        if (!canContinue) {
          // Drain event: wait for writable before continuing
          await new Promise<void>((resolve, reject) => {
            const timeout = setTimeout(() => {
              reject(new Error('SSE write timeout'));
            }, 5000);

            this.response.once('drain', () => {
              clearTimeout(timeout);
              resolve();
            });

            this.response.once('error', () => {
              clearTimeout(timeout);
              reject(new Error('SSE response error'));
            });
          });
        }
      }
    } catch (error) {
      console.error('SSE flush error:', error);
      this.isConnected = false;
    } finally {
      this.isFlushing = false;

      // Continue flushing if messages arrived during flush
      if (this.messageQueue.length > 0 && this.isConnected) {
        this.flush();
      }
    }
  }

  async close(reason: string = 'Stream closed'): Promise<void> {
    if (this.isConnected) {
      await this.send({ type: 'done', reason }, 'close');
      this.response.end();
    }
    this.isConnected = false;
  }

  getAbortSignal(): AbortSignal {
    return this.abortController.signal;
  }

  isAlive(): boolean {
    return this.isConnected;
  }
}

// Express route handler: streams chat-completion tokens to the client as
// SSE events and tears down cleanly on error or client disconnect.
app.post('/api/stream', async (req: Request, res: Response) => {
  const stream = new SSEStream(res);
  const { prompt, model } = req.body;

  try {
    // Start streaming LLM response
    const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

    // BUG FIX: in the OpenAI Node SDK the abort signal belongs in the
    // per-request options (second argument), not among the completion
    // parameters — the original `signal` field was silently ignored.
    const completion = await openai.chat.completions.create(
      {
        model,
        messages: [{ role: 'user', content: prompt }],
        stream: true,
      },
      { signal: stream.getAbortSignal() } // Cancel if client disconnects
    );

    let tokenCount = 0;

    for await (const chunk of completion) {
      // Stop pulling from the model once the client is gone.
      if (!stream.isAlive()) {
        break;
      }

      const content = chunk.choices[0]?.delta?.content;
      if (content) {
        tokenCount++;

        // Send token with metadata
        const sent = await stream.send({
          type: 'token',
          content,
          tokenCount,
          timestamp: Date.now(),
        });

        if (!sent) {
          console.warn('Failed to send token to client');
          break;
        }
      }
    }

    await stream.close(`Stream completed with ${tokenCount} tokens`);
  } catch (error) {
    console.error('Stream error:', error);

    // Only attempt to notify the client if the connection is still open.
    if (stream.isAlive()) {
      await stream.send(
        {
          type: 'error',
          message: error instanceof Error ? error.message : 'Unknown error',
        },
        'error'
      );
      await stream.close('Stream closed due to error');
    }
  }
});

OpenAI Streaming API Integration

Handle OpenAI's streaming response format with error recovery.

// Tuning parameters forwarded to the chat-completions request.
interface StreamConfig {
  model: string;       // Model identifier, e.g. 'gpt-4-turbo-preview'
  temperature: number; // Sampling temperature
  maxTokens: number;   // Mapped to the API's snake_case `max_tokens`
  topP: number;        // Nucleus-sampling cutoff, mapped to `top_p`
  frequencyPenalty?: number; // Optional; not forwarded by OpenAIStreamer as written
  presencePenalty?: number;  // Optional; not forwarded by OpenAIStreamer as written
}

/**
 * Wraps the OpenAI streaming API as an async generator of typed events:
 *   { type: 'token',     content, metadata: { tokenCount } }
 *   { type: 'finish',    metadata: { reason, tokenCount } }
 *   { type: 'cancelled', error }
 *   { type: 'error',     error }
 * Errors are yielded rather than thrown so consumers see a uniform shape.
 */
class OpenAIStreamer {
  // NOTE(review): typed `any` because the OpenAI client type is not
  // imported in this file; tighten to `OpenAI` once the import exists.
  private openai: any;

  constructor(apiKey: string) {
    this.openai = new OpenAI({ apiKey });
  }

  async *streamCompletion(
    messages: Array<{ role: string; content: string }>,
    config: StreamConfig,
    signal?: AbortSignal
  ): AsyncGenerator<{ type: string; content?: string; error?: string; metadata?: any }> {
    try {
      // BUG FIX: `signal` is a per-request option (second argument) in
      // the OpenAI Node SDK, not a completion parameter; passing it in
      // the params object meant cancellation never reached the request.
      const stream = await this.openai.chat.completions.create(
        {
          model: config.model,
          messages,
          temperature: config.temperature,
          max_tokens: config.maxTokens,
          top_p: config.topP,
          stream: true,
        },
        { signal }
      );

      let tokenCount = 0;

      for await (const chunk of stream) {
        // finish_reason marks the model's final chunk.
        if (chunk.choices[0]?.finish_reason) {
          yield {
            type: 'finish',
            metadata: {
              reason: chunk.choices[0].finish_reason,
              tokenCount,
            },
          };
          return;
        }

        const content = chunk.choices[0]?.delta?.content;
        if (content) {
          tokenCount++;

          yield {
            type: 'token',
            content,
            metadata: { tokenCount },
          };
        }
      }
    } catch (error) {
      // BUG FIX: cancellation was detected by comparing against a message
      // string the SDK does not emit; check the signal and the SDK's
      // abort-error name instead.
      if (
        signal?.aborted ||
        (error instanceof Error && error.name === 'APIUserAbortError')
      ) {
        yield { type: 'cancelled', error: 'Stream cancelled by client' };
      } else if (error instanceof OpenAI.APIError) {
        const message = `OpenAI API Error ${error.status}: ${error.message}`;
        yield { type: 'error', error: message };
      } else {
        yield { type: 'error', error: error instanceof Error ? error.message : 'Unknown error' };
      }
    }
  }
}

Handling Mid-Stream Errors

Errors that occur mid-stream require special handling: emit an error event to the client, and when the failure is retryable, attempt recovery before giving up.

// Mutable per-stream bookkeeping consumed by MidStreamErrorHandler;
// retryCount is incremented in place on each recovery attempt.
interface StreamContext {
  requestId: string;   // Correlates log output with a client request
  startTime: number;   // Epoch ms when the stream began
  tokens: string[];    // Tokens emitted so far (available for replay)
  lastSuccessfulToken: number; // Index of the last token delivered before the error
  retryCount: number;  // Recovery attempts consumed so far (capped at 3)
}

/**
 * Decides whether a mid-stream failure is recoverable and, if so, runs
 * the supplied recovery function (which restarts the stream).
 */
class MidStreamErrorHandler {
  // Maximum number of recovery attempts per stream.
  private static readonly MAX_RETRIES = 3;

  /**
   * Attempt recovery for a failed stream.
   * Mutates context.retryCount on each attempt.
   * Returns { recovered: true, recovery } with the replacement generator
   * on success, { recovered: false } otherwise.
   */
  async handleStreamError(
    error: Error,
    context: StreamContext,
    recoveryFn?: () => Promise<AsyncGenerator<any>>
  ): Promise<{ recovered: boolean; recovery?: AsyncGenerator<any> }> {
    console.error(`Stream error at token ${context.lastSuccessfulToken}:`, error.message);

    // Guard-clause form narrows recoveryFn for the compiler, removing
    // the non-null assertion the original needed.
    if (
      !this.isRetryableError(error) ||
      context.retryCount >= MidStreamErrorHandler.MAX_RETRIES ||
      recoveryFn === undefined
    ) {
      return { recovered: false };
    }

    console.log(
      `Attempting recovery (attempt ${context.retryCount + 1}/${MidStreamErrorHandler.MAX_RETRIES})...`
    );
    context.retryCount++;

    try {
      const recovered = await recoveryFn();
      return { recovered: true, recovery: recovered };
    } catch (recoveryError) {
      console.error('Recovery failed:', recoveryError);
      return { recovered: false };
    }
  }

  /** Heuristic: match transient-failure markers in the error message. */
  private isRetryableError(error: Error): boolean {
    const retryableMessages = [
      'temporarily unavailable',
      'timeout',
      'connection reset',
      '429', // Rate limit
      '500', // Server error
      '502', // Bad gateway
      '503', // Service unavailable
    ];

    return retryableMessages.some(msg => error.message.toLowerCase().includes(msg.toLowerCase()));
  }
}

Content Buffering for Tool Calls

Some LLM responses include tool calls. Buffer content until tool execution completes.

// One unit emitted by BufferedStreamProcessor: either plain text
// (emitted immediately) or a fully-assembled tool call.
interface BufferedStreamChunk {
  type: 'text' | 'tool_call' | 'tool_result';
  content: string;      // Text content, or the raw accumulated tool-input string
  toolName?: string;    // Present on tool_call chunks
  toolInput?: Record<string, any>; // Parsed JSON form of the tool input
  toolCallId?: string;  // Provider-assigned tool-call identifier
}

/**
 * Buffers tool-call fragments until the call is complete; passes plain
 * text through immediately.
 */
class BufferedStreamProcessor {
  // NOTE(review): `buffer` is only drained by flush(); nothing in this
  // class pushes into it — kept for interface compatibility.
  private buffer: BufferedStreamChunk[] = [];
  private currentToolCall: { id: string; name: string; input: string } | null = null;

  /**
   * Convert a raw stream chunk into an emittable BufferedStreamChunk.
   * Returns null for chunks that are buffered (tool-call fragments) or
   * unrecognized.
   */
  processChunk(chunk: any): BufferedStreamChunk | null {
    // Tool call start: begin buffering.
    if (chunk.type === 'tool_call_start') {
      this.currentToolCall = {
        id: chunk.toolCallId,
        name: chunk.toolName,
        input: '',
      };
      return null; // Don't emit yet
    }

    // Accumulate tool-call input fragments.
    if (this.currentToolCall && chunk.type === 'tool_call_input') {
      this.currentToolCall.input += chunk.content;
      return null; // Don't emit yet
    }

    // Tool call complete: emit a single assembled chunk.
    if (chunk.type === 'tool_call_end' && this.currentToolCall) {
      // BUG FIX: the accumulated input can be malformed or truncated
      // JSON; JSON.parse used to throw here and kill the whole stream.
      // A failed parse now leaves toolInput undefined while the raw
      // string remains available in `content`.
      let toolInput: Record<string, any> | undefined;
      try {
        toolInput = JSON.parse(this.currentToolCall.input);
      } catch {
        toolInput = undefined;
      }

      const buffered: BufferedStreamChunk = {
        type: 'tool_call',
        content: this.currentToolCall.input,
        toolName: this.currentToolCall.name,
        toolCallId: this.currentToolCall.id,
        toolInput,
      };

      this.currentToolCall = null;
      return buffered;
    }

    // Regular text: emit immediately.
    if (chunk.type === 'text') {
      return {
        type: 'text',
        content: chunk.content,
      };
    }

    return null;
  }

  /**
   * Emit anything still pending at end-of-stream, including an
   * incomplete tool call (emitted without parsed toolInput).
   */
  flush(): BufferedStreamChunk[] {
    const remaining = this.buffer.splice(0);

    if (this.currentToolCall) {
      // Incomplete tool call at end of stream.
      remaining.push({
        type: 'tool_call',
        content: this.currentToolCall.input,
        toolName: this.currentToolCall.name,
        toolCallId: this.currentToolCall.id,
      });

      this.currentToolCall = null;
    }

    return remaining;
  }
}

Client Reconnection and Resumption

Handle client reconnects by tracking progress server-side.

// Server-side record of one streaming session, kept so a reconnecting
// client can resume from where it left off.
interface StreamSession {
  sessionId: string;    // Opaque id generated by createSession
  userId: string;
  createdAt: number;    // Epoch ms; expiry is measured from this, not lastActivityAt
  lastActivityAt: number; // Updated on every recorded token and resume
  tokensGenerated: string[]; // Full token history, available for replay on resume
  status: 'active' | 'paused' | 'completed' | 'failed';
  error?: string;       // Set when status === 'failed'
}

/**
 * In-memory registry of streaming sessions, enabling reconnecting
 * clients to resume an interrupted stream.
 */
class StreamSessionManager {
  private sessions: Map<string, StreamSession> = new Map();
  private sessionTimeout = 600000; // 10 minutes, measured from createdAt
  private retentionMs = 3600000;   // Keep finished sessions for 1 hour

  /** Create a new active session and return its id. */
  createSession(userId: string): string {
    // BUG FIX: a timestamp alone collides when the same user starts two
    // streams within one millisecond; add a random suffix.
    const suffix = Math.random().toString(36).slice(2, 8);
    const sessionId = `stream_${userId}_${Date.now()}_${suffix}`;

    this.sessions.set(sessionId, {
      sessionId,
      userId,
      createdAt: Date.now(),
      lastActivityAt: Date.now(),
      tokensGenerated: [],
      status: 'active',
    });

    return sessionId;
  }

  /**
   * Reactivate an existing session if it has not expired.
   * Expiry is measured from creation, so even busy sessions end after
   * sessionTimeout.
   */
  resumeStream(sessionId: string): { resumed: boolean; session?: StreamSession } {
    const session = this.sessions.get(sessionId);

    if (!session) {
      return { resumed: false };
    }

    // Check if session expired.
    if (Date.now() - session.createdAt > this.sessionTimeout) {
      this.sessions.delete(sessionId);
      return { resumed: false };
    }

    // Resume.
    session.lastActivityAt = Date.now();
    session.status = 'active';

    return { resumed: true, session };
  }

  /** Append a token to an active session; false if unknown or not active. */
  recordToken(sessionId: string, token: string): boolean {
    const session = this.sessions.get(sessionId);

    if (!session || session.status !== 'active') {
      return false;
    }

    session.tokensGenerated.push(token);
    session.lastActivityAt = Date.now();

    return true;
  }

  /** Mark a session completed and schedule its removal. */
  completeStream(sessionId: string): void {
    const session = this.sessions.get(sessionId);

    if (session) {
      session.status = 'completed';
      session.lastActivityAt = Date.now();
      this.scheduleRemoval(sessionId);
    }
  }

  /**
   * Mark a session failed and schedule its removal.
   * BUG FIX: failed sessions were previously never deleted, leaking one
   * map entry (with its full token history) per failed stream.
   */
  failStream(sessionId: string, error: string): void {
    const session = this.sessions.get(sessionId);

    if (session) {
      session.status = 'failed';
      session.error = error;
      session.lastActivityAt = Date.now();
      this.scheduleRemoval(sessionId);
    }
  }

  /** Delete the session after the retention window. */
  private scheduleRemoval(sessionId: string): void {
    const timer = setTimeout(() => this.sessions.delete(sessionId), this.retentionMs);
    // unref() exists on Node.js timers only; the cleanup timer must not
    // keep the process alive on its own.
    (timer as unknown as { unref?: () => void }).unref?.();
  }
}

/**
 * Client-side reconnect logic: consume the SSE stream and, on failure,
 * retry using the server-side session.
 *
 * BUG FIX: the original recursed without any limit, so a persistent
 * failure (e.g. server repeatedly sending an error event) looped
 * forever. `maxRetries` (default 3, backward-compatible) now bounds the
 * reconnect attempts. The reader lock is also released in `finally`.
 */
async function streamWithResumeCapability(
  prompt: string,
  sessionId?: string,
  maxRetries: number = 3
): Promise<string> {
  const response = await fetch('/api/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ prompt, sessionId }),
  });

  const reader = response.body?.getReader();
  const decoder = new TextDecoder();

  let buffer = '';

  try {
    while (reader) {
      const { done, value } = await reader.read();

      if (done) break;

      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');

      // All but the last element are complete lines; the last may be a
      // partial line carried into the next read.
      for (let i = 0; i < lines.length - 1; i++) {
        const line = lines[i];

        if (line.startsWith('data: ')) {
          const data = JSON.parse(line.slice(6));

          if (data.type === 'error') {
            throw new Error(data.message);
          } else if (data.type === 'token') {
            // Update UI with token
            console.log(data.content);
          }
        }
      }

      buffer = lines[lines.length - 1];
    }
  } catch (error) {
    console.error('Stream error:', error);

    // Attempt a bounded reconnect using the server-side session.
    if (sessionId && maxRetries > 0) {
      console.log('Reconnecting to stream...');
      return streamWithResumeCapability(prompt, sessionId, maxRetries - 1);
    }

    throw error;
  } finally {
    // Release the lock so the body can be cancelled / garbage-collected.
    reader?.releaseLock();
  }

  return 'Stream completed';
}

TTFB Optimization

Time To First Byte is critical for perceived performance. Minimize latency to first token.

// Breakdown of time-to-first-byte components plus aggregate percentiles.
// NOTE(review): only first-token latency is actually measured by
// TTFBOptimizer below; the network-phase fields are unused there —
// confirm intended consumers before relying on them.
interface TTFBMetrics {
  requestTime: number;    // Total request duration (ms)
  dnsTime: number;        // DNS resolution time (ms)
  tcpConnectTime: number; // TCP connect time (ms)
  tlsTime: number;        // TLS handshake time (ms)
  firstTokenTime: number; // Time until the first streamed token (ms)
  p50TTFB: number;        // Median TTFB across recorded measurements
  p95TTFB: number;
  p99TTFB: number;
}

/**
 * Measures time-to-first-token for streaming completions and keeps a
 * running set of samples for percentile reporting.
 */
class TTFBOptimizer {
  private measurements: number[] = [];

  /**
   * Issue a streaming completion and return the first token plus the
   * time it took to arrive.
   * BUG FIX: TTFB is now measured from `requestStart` (before
   * preprocessing) — the original measured from just before the API
   * call, silently excluding normalization/embedding time, and left
   * `requestStart` unused.
   */
  async optimizeTTFB(
    prompt: string,
    model: string
  ): Promise<{
    firstToken: string;
    ttfbMs: number;
  }> {
    const requestStart = performance.now();

    // 1. Parallelize input processing. The embedding result is currently
    // unused but the call is kept so it overlaps with normalization.
    const [normalizedPrompt] = await Promise.all([
      this.normalizePrompt(prompt),
      this.preComputeEmbedding(prompt), // Optional: use for re-ranking
    ]);

    const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

    // 2. Stream immediately and return as soon as the first content
    // token arrives. Returning from the for-await loop closes the async
    // iterator, which tears down the underlying HTTP stream.
    const stream = await openai.chat.completions.create({
      model,
      messages: [{ role: 'user', content: normalizedPrompt }],
      stream: true,
    });

    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content;
      if (content) {
        const ttfbMs = performance.now() - requestStart;
        this.measurements.push(ttfbMs);

        return {
          firstToken: content,
          ttfbMs,
        };
      }
    }

    throw new Error('No tokens received from LLM');
  }

  /** Lightweight prompt preprocessing. */
  private async normalizePrompt(prompt: string): Promise<string> {
    return prompt.trim();
  }

  /** Placeholder: pre-compute an embedding for candidate re-ranking. */
  private async preComputeEmbedding(prompt: string): Promise<number[]> {
    return [];
  }

  /** Percentiles over recorded TTFB samples; zeros when no samples exist. */
  getTTFBStats(): { p50: number; p95: number; p99: number } {
    if (this.measurements.length === 0) {
      return { p50: 0, p95: 0, p99: 0 };
    }

    const sorted = [...this.measurements].sort((a, b) => a - b);
    // Clamp guards the (theoretical) index == length edge.
    const at = (q: number) =>
      sorted[Math.min(sorted.length - 1, Math.floor(sorted.length * q))];

    return {
      p50: at(0.5),
      p95: at(0.95),
      p99: at(0.99),
    };
  }
}

Abort Controller Pattern

Graceful cancellation when client disconnects or timeout occurs.

/**
 * Tracks one AbortController + watchdog timer per stream, aborting the
 * stream automatically after a timeout.
 */
class StreamAbortManager {
  private abortControllers: Map<string, AbortController> = new Map();
  private timeouts: Map<string, NodeJS.Timeout> = new Map();

  /**
   * Register a stream and return its AbortController. The stream is
   * auto-aborted after `timeoutMs`.
   * BUG FIX: reusing a streamId used to overwrite the map entries while
   * the old timer kept running against a now-unreachable handle (it
   * could never be cleared); any existing entry is now cleaned up first.
   */
  createStream(streamId: string, timeoutMs: number = 300000): AbortController {
    this.cleanup(streamId);

    const controller = new AbortController();
    this.abortControllers.set(streamId, controller);

    // Auto-abort after timeout.
    const timeout = setTimeout(() => {
      console.log(`Stream ${streamId} timeout after ${timeoutMs}ms`);
      controller.abort();
    }, timeoutMs);
    // The watchdog must not keep the Node process alive by itself;
    // unref() is optional-called so non-Node environments are unaffected.
    (timeout as unknown as { unref?: () => void }).unref?.();

    this.timeouts.set(streamId, timeout);

    return controller;
  }

  /** Abort a stream (no-op for unknown ids) and release its resources. */
  abort(streamId: string, reason: string = 'Manual abort'): void {
    const controller = this.abortControllers.get(streamId);

    if (controller) {
      console.log(`Aborting stream ${streamId}: ${reason}`);
      controller.abort();
    }

    this.cleanup(streamId);
  }

  /** Remove bookkeeping for a stream and cancel its watchdog timer. */
  cleanup(streamId: string): void {
    this.abortControllers.delete(streamId);

    const timeout = this.timeouts.get(streamId);
    if (timeout) {
      clearTimeout(timeout);
      this.timeouts.delete(streamId);
    }
  }
}

// Usage
const abortManager = new StreamAbortManager();

app.post('/api/stream', async (req: Request, res: Response) => {
  const streamId = `stream_${Date.now()}`;
  const controller = abortManager.createStream(streamId, 300000); // 5 min timeout

  try {
    const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

    const stream = await openai.chat.completions.create({
      model: 'gpt-4-turbo-preview',
      messages: req.body.messages,
      stream: true,
      signal: controller.signal,
    });

    for await (const chunk of stream) {
      res.write(`data: ${JSON.stringify(chunk)}\n\n`);
    }

    res.end();
  } catch (error) {
    if (error instanceof Error && error.name === 'AbortError') {
      res.write(`data: ${JSON.stringify({ type: 'abort' })}\n\n`);
    } else {
      res.write(`data: ${JSON.stringify({ type: 'error', message: error })}\n\n`);
    }
    res.end();
  } finally {
    abortManager.cleanup(streamId);
  }
});

Streaming LLM Implementation Checklist

  • Implement SSE with proper headers and connection handling
  • Handle client disconnect (close event) to stop streaming
  • Implement backpressure: check write() return value
  • Use drain event when write returns false
  • Stream directly from OpenAI API without buffering
  • Handle mid-stream errors with recovery logic
  • Buffer tool calls until execution completes
  • Implement session tracking for reconnections
  • Minimize TTFB with parallel preprocessing
  • Use AbortController for graceful cancellation
  • Set and enforce stream timeouts
  • Monitor TTFB percentiles (p50/p95/p99)

Conclusion

Streaming transforms LLM UX by showing progress instantly. Production implementations require handling backpressure, buffering for tool calls, recovering from mid-stream errors, supporting reconnection, and minimizing TTFB. Master these patterns and your LLM API will feel as responsive as a local model.