Streaming LLM Responses — Server-Sent Events, Backpressure, and Error Handling

Sanjeev Sharma
11 min read

Advertisement

Introduction

Streaming LLM responses dramatically improves perceived latency by sending tokens as they arrive. But streaming introduces complexity: managing backpressure when clients can't keep up, handling mid-stream failures, buffering for tool calls, and graceful cancellation. This post covers Server-Sent Events implementation, OpenAI streaming APIs, backpressure handling, error recovery, and production patterns.

Server-Sent Events in Node.js/Express

SSE (Server-Sent Events) is HTTP-based streaming for server-to-client updates, and it is simpler than WebSockets when all you need is a one-way server broadcast.

import { Express, Response, Request } from 'express';
import { EventEmitter } from 'events';

class SSEStream {
  private response: Response;
  private abortController: AbortController;
  private messageQueue: string[] = [];
  private isFlushing = false;
  private isConnected = true;

  constructor(response: Response) {
    this.response = response;
    this.abortController = new AbortController();

    // Set SSE headers
    this.response.setHeader('Content-Type', 'text/event-stream');
    this.response.setHeader('Cache-Control', 'no-cache');
    this.response.setHeader('Connection', 'keep-alive');
    this.response.setHeader('X-Accel-Buffering', 'no'); // Disable proxy buffering

    // Handle client disconnect
    this.response.on('close', () => {
      this.isConnected = false;
      this.abortController.abort();
    });

    this.response.on('error', (err) => {
      console.error('SSE response error:', err);
      this.isConnected = false;
      this.abortController.abort();
    });
  }

  async send(data: any, eventType: string = 'message'): Promise<boolean> {
    if (!this.isConnected) {
      return false;
    }

    const message = `event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`;
    this.messageQueue.push(message);

    // Flush asynchronously to avoid blocking
    if (!this.isFlushing) {
      this.flush();
    }

    return true;
  }

  private async flush(): Promise<void> {
    if (this.isFlushing || this.messageQueue.length === 0) {
      return;
    }

    this.isFlushing = true;

    try {
      while (this.messageQueue.length > 0 && this.isConnected) {
        const message = this.messageQueue.shift()!;

        // Check if write would block (backpressure)
        const canContinue = this.response.write(message);

        if (!canContinue) {
          // Drain event: wait for writable before continuing
          await new Promise<void>((resolve, reject) => {
            const timeout = setTimeout(() => {
              reject(new Error('SSE write timeout'));
            }, 5000);

            this.response.once('drain', () => {
              clearTimeout(timeout);
              resolve();
            });

            this.response.once('error', () => {
              clearTimeout(timeout);
              reject(new Error('SSE response error'));
            });
          });
        }
      }
    } catch (error) {
      console.error('SSE flush error:', error);
      this.isConnected = false;
    } finally {
      this.isFlushing = false;

      // Continue flushing if messages arrived during flush
      if (this.messageQueue.length > 0 && this.isConnected) {
        this.flush();
      }
    }
  }

  async close(reason: string = 'Stream closed'): Promise<void> {
    if (this.isConnected) {
      await this.send({ type: 'done', reason }, 'close');
      this.response.end();
    }
    this.isConnected = false;
  }

  getAbortSignal(): AbortSignal {
    return this.abortController.signal;
  }

  isAlive(): boolean {
    return this.isConnected;
  }
}

// Express route handler
app.post('/api/stream', async (req: Request, res: Response) => {
  const stream = new SSEStream(res);
  const { prompt, model } = req.body;

  try {
    // Start streaming LLM response
    const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

    const completion = await openai.chat.completions.create({
      model,
      messages: [{ role: 'user', content: prompt }],
      stream: true,
      signal: stream.getAbortSignal(), // Cancel if client disconnects
    });

    let tokenCount = 0;

    for await (const chunk of completion) {
      if (!stream.isAlive()) {
        break;
      }

      const content = chunk.choices[0]?.delta?.content;
      if (content) {
        tokenCount++;

        // Send token with metadata
        const sent = await stream.send({
          type: 'token',
          content,
          tokenCount,
          timestamp: Date.now(),
        });

        if (!sent) {
          console.warn('Failed to send token to client');
          break;
        }
      }
    }

    await stream.close(`Stream completed with ${tokenCount} tokens`);
  } catch (error) {
    console.error('Stream error:', error);

    if (stream.isAlive()) {
      await stream.send(
        {
          type: 'error',
          message: error instanceof Error ? error.message : 'Unknown error',
        },
        'error'
      );
      await stream.close('Stream closed due to error');
    }
  }
});

OpenAI Streaming API Integration

Handle OpenAI's streaming response format with error recovery.

/** Tunable generation parameters forwarded to the OpenAI chat API. */
interface StreamConfig {
  model: string;
  /** Sampling temperature (0 = deterministic, higher = more random). */
  temperature: number;
  /** Hard cap on generated tokens (maps to max_tokens). */
  maxTokens: number;
  /** Nucleus-sampling cutoff (maps to top_p). */
  topP: number;
  frequencyPenalty?: number;
  presencePenalty?: number;
}

/** Thin wrapper that converts OpenAI streaming chunks into typed events. */
class OpenAIStreamer {
  // SDK client; typed as any because the OpenAI SDK types are not imported
  // in this file.
  private openai: any;

  constructor(apiKey: string) {
    this.openai = new OpenAI({ apiKey });
  }

  /**
   * Stream a chat completion, yielding:
   *  - { type: 'token', content, metadata: { tokenCount } } per content delta
   *  - { type: 'finish', metadata: { reason, tokenCount } } when the model stops
   *  - { type: 'cancelled' } when the caller aborts via `signal`
   *  - { type: 'error', error } on API or unknown failures
   */
  async *streamCompletion(
    messages: Array<{ role: string; content: string }>,
    config: StreamConfig,
    signal?: AbortSignal
  ): AsyncGenerator<{ type: string; content?: string; error?: string; metadata?: any }> {
    try {
      // The abort signal belongs in the request-options object (second
      // argument); inside the body it is serialized and ignored. The penalty
      // fields were previously accepted in config but never forwarded.
      const stream = await this.openai.chat.completions.create(
        {
          model: config.model,
          messages,
          temperature: config.temperature,
          max_tokens: config.maxTokens,
          top_p: config.topP,
          frequency_penalty: config.frequencyPenalty,
          presence_penalty: config.presencePenalty,
          stream: true,
        },
        { signal }
      );

      let tokenCount = 0;

      for await (const chunk of stream) {
        // finish_reason marks the end of generation for this choice.
        if (chunk.choices[0]?.finish_reason) {
          yield {
            type: 'finish',
            metadata: {
              reason: chunk.choices[0].finish_reason,
              tokenCount,
            },
          };
          return;
        }

        const content = chunk.choices[0]?.delta?.content;
        if (content) {
          tokenCount++;
          yield {
            type: 'token',
            content,
            metadata: { tokenCount },
          };
        }
      }
    } catch (error) {
      // The abort check must come FIRST: the SDK's user-abort error is an
      // APIError subclass and would otherwise be misreported as an API
      // failure. Checking signal.aborted / error.name is more robust than
      // matching an exact error-message string.
      if (signal?.aborted || (error instanceof Error && error.name === 'AbortError')) {
        yield { type: 'cancelled', error: 'Stream cancelled by client' };
      } else if (error instanceof OpenAI.APIError) {
        yield { type: 'error', error: `OpenAI API Error ${error.status}: ${error.message}` };
      } else {
        yield { type: 'error', error: error instanceof Error ? error.message : 'Unknown error' };
      }
    }
  }
}

Handling Mid-Stream Errors

Errors mid-stream require special handling: send error token, offer retry option.

/** Bookkeeping carried across one stream so errors can be retried in place. */
interface StreamContext {
  requestId: string;
  startTime: number;
  tokens: string[];
  /** Index of the last token known to have reached the client. */
  lastSuccessfulToken: number;
  /** Number of recovery attempts made so far (capped at 3). */
  retryCount: number;
}

/** Decides whether a mid-stream failure is recoverable and runs recovery. */
class MidStreamErrorHandler {
  /** Substrings marking an error as transient and therefore retryable. */
  private static readonly RETRYABLE_MARKERS = [
    'temporarily unavailable',
    'timeout',
    'connection reset',
    '429', // Rate limit
    '500', // Server error
    '502', // Bad gateway
    '503', // Service unavailable
  ];

  /**
   * Handle an error that occurred mid-stream. When the error is transient,
   * retry budget remains, and a recovery function is supplied, the recovery
   * is attempted (incrementing context.retryCount). Otherwise reports
   * { recovered: false }.
   */
  async handleStreamError(
    error: Error,
    context: StreamContext,
    recoveryFn?: () => Promise<AsyncGenerator<any>>
  ): Promise<{ recovered: boolean; recovery?: AsyncGenerator<any> }> {
    console.error(`Stream error at token ${context.lastSuccessfulToken}:`, error.message);

    // Guard clauses: bail out unless every recovery precondition holds.
    if (!this.isRetryableError(error)) return { recovered: false };
    if (context.retryCount >= 3) return { recovered: false };
    if (!recoveryFn) return { recovered: false };

    console.log(`Attempting recovery (attempt ${context.retryCount + 1}/3)...`);
    context.retryCount++;

    try {
      const recovery = await recoveryFn();
      return { recovered: true, recovery };
    } catch (recoveryError) {
      console.error('Recovery failed:', recoveryError);
      return { recovered: false };
    }
  }

  /** True when the message contains any known transient-failure marker. */
  private isRetryableError(error: Error): boolean {
    const normalized = error.message.toLowerCase();
    return MidStreamErrorHandler.RETRYABLE_MARKERS.some(marker =>
      normalized.includes(marker.toLowerCase())
    );
  }
}

Content Buffering for Tool Calls

Some LLM responses include tool calls. Buffer content until tool execution completes.

/** A unit emitted downstream: plain text, a complete tool call, or a result. */
interface BufferedStreamChunk {
  type: 'text' | 'tool_call' | 'tool_result';
  content: string;
  toolName?: string;
  /** Parsed JSON arguments; undefined when the raw input was not valid JSON. */
  toolInput?: Record<string, any>;
  toolCallId?: string;
}

/**
 * Buffers streamed tool-call fragments until the call is complete, while
 * passing plain text chunks through immediately.
 */
class BufferedStreamProcessor {
  // Tool call currently being assembled from streamed input fragments;
  // null when no tool call is open. (The unused `buffer` array from the
  // original — written never, spliced in flush() — has been removed.)
  private currentToolCall: { id: string; name: string; input: string } | null = null;

  /**
   * Fold one raw stream chunk into the processor. Text chunks are emitted
   * immediately; tool-call chunks are buffered until 'tool_call_end' so the
   * complete argument payload can be parsed and executed atomically.
   * Returns null while buffering or for unrecognized chunk types.
   */
  processChunk(chunk: any): BufferedStreamChunk | null {
    if (chunk.type === 'tool_call_start') {
      this.currentToolCall = {
        id: chunk.toolCallId,
        name: chunk.toolName,
        input: '',
      };
      return null; // Don't emit yet
    }

    if (this.currentToolCall && chunk.type === 'tool_call_input') {
      this.currentToolCall.input += chunk.content;
      return null; // Don't emit yet
    }

    // Tool call complete: emit the assembled call.
    if (chunk.type === 'tool_call_end' && this.currentToolCall) {
      const call = this.currentToolCall;
      this.currentToolCall = null;
      return {
        type: 'tool_call',
        content: call.input,
        toolName: call.name,
        toolCallId: call.id,
        // Guarded parse: a malformed argument payload must not crash the
        // whole stream (previously an unhandled JSON.parse throw).
        toolInput: this.parseToolInput(call.input),
      };
    }

    // Regular text: emit immediately.
    if (chunk.type === 'text') {
      return {
        type: 'text',
        content: chunk.content,
      };
    }

    return null;
  }

  /** Parse tool-call arguments; returns undefined (not a throw) on bad JSON. */
  private parseToolInput(input: string): Record<string, any> | undefined {
    try {
      return JSON.parse(input);
    } catch {
      return undefined;
    }
  }

  /**
   * Drain any unfinished state at end-of-stream. An incomplete tool call is
   * emitted with its partial input so callers can surface or log it.
   */
  flush(): BufferedStreamChunk[] {
    if (!this.currentToolCall) {
      return [];
    }

    const partial: BufferedStreamChunk = {
      type: 'tool_call',
      content: this.currentToolCall.input,
      toolName: this.currentToolCall.name,
      toolCallId: this.currentToolCall.id,
    };
    this.currentToolCall = null;
    return [partial];
  }
}

Client Reconnection and Resumption

Handle client reconnects by tracking progress server-side.

/** Server-side record of one streaming response, used for reconnects. */
interface StreamSession {
  sessionId: string;
  userId: string;
  createdAt: number;
  lastActivityAt: number;
  /** Tokens emitted so far, replayable to a reconnecting client. */
  tokensGenerated: string[];
  status: 'active' | 'paused' | 'completed' | 'failed';
  error?: string;
}

/** In-memory session store enabling clients to resume interrupted streams. */
class StreamSessionManager {
  private sessions: Map<string, StreamSession> = new Map();
  private sessionTimeout = 600000; // 10 minutes of inactivity
  // Monotonic suffix so two sessions created in the same millisecond cannot
  // collide on Date.now() alone.
  private sequence = 0;

  /** Register a new active session and return its id (treat as opaque). */
  createSession(userId: string): string {
    const sessionId = `stream_${userId}_${Date.now()}_${this.sequence++}`;

    this.sessions.set(sessionId, {
      sessionId,
      userId,
      createdAt: Date.now(),
      lastActivityAt: Date.now(),
      tokensGenerated: [],
      status: 'active',
    });

    return sessionId;
  }

  /**
   * Reattach to an existing session. Fails when the session is unknown or
   * idle longer than sessionTimeout. Expiry is keyed on lastActivityAt (not
   * createdAt, which expired long-running streams mid-generation even while
   * they were actively recording tokens).
   */
  resumeStream(sessionId: string): { resumed: boolean; session?: StreamSession } {
    const session = this.sessions.get(sessionId);

    if (!session) {
      return { resumed: false };
    }

    if (Date.now() - session.lastActivityAt > this.sessionTimeout) {
      this.sessions.delete(sessionId);
      return { resumed: false };
    }

    session.lastActivityAt = Date.now();
    session.status = 'active';

    return { resumed: true, session };
  }

  /** Append a token; false when the session is missing or not active. */
  recordToken(sessionId: string, token: string): boolean {
    const session = this.sessions.get(sessionId);

    if (!session || session.status !== 'active') {
      return false;
    }

    session.tokensGenerated.push(token);
    session.lastActivityAt = Date.now();

    return true;
  }

  /** Mark a stream completed and schedule its eventual removal. */
  completeStream(sessionId: string): void {
    const session = this.sessions.get(sessionId);

    if (session) {
      session.status = 'completed';
      session.lastActivityAt = Date.now();
      this.scheduleDeletion(sessionId);
    }
  }

  /** Mark a stream failed, retaining the error for diagnostics. */
  failStream(sessionId: string, error: string): void {
    const session = this.sessions.get(sessionId);

    if (session) {
      session.status = 'failed';
      session.error = error;
      session.lastActivityAt = Date.now();
      // Failed sessions previously lingered forever; reap them like
      // completed ones so the map cannot grow without bound.
      this.scheduleDeletion(sessionId);
    }
  }

  /**
   * Drop the session after 1 hour. unref() keeps this housekeeping timer
   * from holding the Node process open (the original timer did).
   */
  private scheduleDeletion(sessionId: string): void {
    const timer = setTimeout(() => this.sessions.delete(sessionId), 3600000);
    timer.unref?.();
  }
}

// Client-side reconnect logic
/**
 * Consume the SSE stream from /api/stream, logging tokens as they arrive.
 * On error, reconnects with the same sessionId up to `maxRetries` times
 * (the original recursed without limit, so a persistently failing stream
 * retried forever). `maxRetries` defaults to 3 and is backward-compatible.
 */
async function streamWithResumeCapability(
  prompt: string,
  sessionId?: string,
  maxRetries: number = 3
): Promise<string> {
  const response = await fetch('/api/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ prompt, sessionId }),
  });

  const reader = response.body?.getReader();
  const decoder = new TextDecoder();

  let buffer = '';

  try {
    while (reader) {
      const { done, value } = await reader.read();

      if (done) break;

      // stream:true handles multi-byte characters split across chunks.
      buffer += decoder.decode(value, { stream: true });

      // The final element may be an incomplete line; keep it buffered.
      const lines = buffer.split('\n');
      buffer = lines.pop() ?? '';

      for (const line of lines) {
        if (!line.startsWith('data: ')) continue;

        const data = JSON.parse(line.slice(6));

        if (data.type === 'error') {
          throw new Error(data.message);
        } else if (data.type === 'token') {
          // Update UI with token
          console.log(data.content);
        }
      }
    }
  } catch (error) {
    console.error('Stream error:', error);

    // Attempt reconnect with sessionId while retry budget remains.
    if (sessionId && maxRetries > 0) {
      console.log('Reconnecting to stream...');
      return streamWithResumeCapability(prompt, sessionId, maxRetries - 1);
    }

    throw error;
  }

  return 'Stream completed';
}

TTFB Optimization

Time To First Byte is critical for perceived performance. Minimize latency to first token.

/** Collected timing fields for TTFB analysis. */
interface TTFBMetrics {
  requestTime: number;
  dnsTime: number;
  tcpConnectTime: number;
  tlsTime: number;
  firstTokenTime: number;
  p50TTFB: number;
  p95TTFB: number;
  p99TTFB: number;
}

/** Measures and minimizes time-to-first-token for streamed completions. */
class TTFBOptimizer {
  // TTFB samples in milliseconds, in arrival order.
  private measurements: number[] = [];

  /**
   * Issue a streaming completion and return as soon as the first content
   * token arrives, recording the elapsed time as a TTFB sample.
   * Throws if the stream ends without producing any content.
   */
  async optimizeTTFB(
    prompt: string,
    model: string
  ): Promise<{
    firstToken: string;
    ttfbMs: number;
  }> {
    const requestStart = performance.now();

    // Lightweight preprocessing stays on the critical path...
    const normalizedPrompt = await this.normalizePrompt(prompt);

    // ...but the embedding precompute must NOT be awaited before streaming:
    // the original Promise.all blocked the request on it, adding its full
    // latency directly to TTFB — the opposite of this class's purpose.
    // Fire-and-forget; failures are non-fatal for the stream.
    void this.preComputeEmbedding(prompt).catch(() => {});

    const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

    const stream = await openai.chat.completions.create({
      model,
      messages: [{ role: 'user', content: normalizedPrompt }],
      stream: true,
    });

    for await (const chunk of stream) {
      const firstToken = chunk.choices[0]?.delta?.content;
      if (firstToken) {
        // Measure from request start so preprocessing time is included
        // (requestStart was previously computed but never used).
        const ttfbMs = performance.now() - requestStart;
        this.measurements.push(ttfbMs);

        return { firstToken, ttfbMs };
      }
    }

    throw new Error('No tokens received from LLM');
  }

  /** Lightweight prompt preprocessing (trim only, by design). */
  private async normalizePrompt(prompt: string): Promise<string> {
    return prompt.trim();
  }

  /** Optional embedding precompute for re-ranking candidates; stub for now. */
  private async preComputeEmbedding(prompt: string): Promise<number[]> {
    return [];
  }

  /** Percentiles over recorded samples; all zeros when no samples exist. */
  getTTFBStats(): { p50: number; p95: number; p99: number } {
    if (this.measurements.length === 0) {
      return { p50: 0, p95: 0, p99: 0 };
    }

    const sorted = [...this.measurements].sort((a, b) => a - b);
    const at = (q: number) => sorted[Math.floor(sorted.length * q)];

    return {
      p50: at(0.5),
      p95: at(0.95),
      p99: at(0.99),
    };
  }
}

Abort Controller Pattern

Graceful cancellation when client disconnects or timeout occurs.

/**
 * Tracks one AbortController per stream so streams can be cancelled
 * manually or automatically on timeout.
 */
class StreamAbortManager {
  private abortControllers: Map<string, AbortController> = new Map();
  private timeouts: Map<string, NodeJS.Timeout> = new Map();

  /**
   * Register a stream and arm its timeout (default 5 minutes).
   * Returns the controller whose signal should be passed to the LLM call.
   */
  createStream(streamId: string, timeoutMs: number = 300000): AbortController {
    const controller = new AbortController();
    this.abortControllers.set(streamId, controller);

    const timeout = setTimeout(() => {
      console.log(`Stream ${streamId} timeout after ${timeoutMs}ms`);
      controller.abort();
      // The original left the map entries behind on timeout, leaking one
      // controller + timer entry per timed-out stream until a manual
      // cleanup() call that might never come.
      this.cleanup(streamId);
    }, timeoutMs);
    // A pending stream timeout must not keep the process alive on shutdown.
    timeout.unref?.();

    this.timeouts.set(streamId, timeout);

    return controller;
  }

  /** Abort a stream (no-op for unknown ids) and release its resources. */
  abort(streamId: string, reason: string = 'Manual abort'): void {
    const controller = this.abortControllers.get(streamId);

    if (controller) {
      console.log(`Aborting stream ${streamId}: ${reason}`);
      controller.abort();
    }

    this.cleanup(streamId);
  }

  /** Drop the controller and disarm the timeout for a stream. Idempotent. */
  cleanup(streamId: string): void {
    this.abortControllers.delete(streamId);

    const timeout = this.timeouts.get(streamId);
    if (timeout) {
      clearTimeout(timeout);
      this.timeouts.delete(streamId);
    }
  }
}

// Usage
const abortManager = new StreamAbortManager();

app.post('/api/stream', async (req: Request, res: Response) => {
  const streamId = `stream_${Date.now()}`;
  const controller = abortManager.createStream(streamId, 300000); // 5 min timeout

  try {
    const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

    const stream = await openai.chat.completions.create({
      model: 'gpt-4-turbo-preview',
      messages: req.body.messages,
      stream: true,
      signal: controller.signal,
    });

    for await (const chunk of stream) {
      res.write(`data: ${JSON.stringify(chunk)}\n\n`);
    }

    res.end();
  } catch (error) {
    if (error instanceof Error && error.name === 'AbortError') {
      res.write(`data: ${JSON.stringify({ type: 'abort' })}\n\n`);
    } else {
      res.write(`data: ${JSON.stringify({ type: 'error', message: error })}\n\n`);
    }
    res.end();
  } finally {
    abortManager.cleanup(streamId);
  }
});

Streaming LLM Implementation Checklist

  • Implement SSE with proper headers and connection handling
  • Handle client disconnect (close event) to stop streaming
  • Implement backpressure: check write() return value
  • Use drain event when write returns false
  • Stream tokens to the client as they arrive — never buffer the full response (tool calls are the one exception, below)
  • Handle mid-stream errors with recovery logic
  • Buffer tool calls until execution completes
  • Implement session tracking for reconnections
  • Minimize TTFB with parallel preprocessing
  • Use AbortController for graceful cancellation
  • Set and enforce stream timeouts
  • Monitor TTFB percentiles (p50/p95/p99)

Conclusion

Streaming transforms LLM UX by showing progress instantly. Production implementations require handling backpressure, buffering for tool calls, recovering from mid-stream errors, supporting reconnection, and minimizing TTFB. Master these patterns and your LLM API will feel as responsive as a local model.

Advertisement

Sanjeev Sharma

Written by

Sanjeev Sharma

Full Stack Engineer · E-mopro