Real-Time AI Streaming Architecture — SSE, WebSockets, and Chunked Responses at Scale

Introduction

Users expect streaming responses from AI. Watching tokens arrive in real-time is better UX than waiting for a complete response. But streaming at scale introduces complexity: connection management, load balancing, backpressure, and distributed streaming across instances. This post covers production patterns.

SSE vs WebSockets for AI Streaming

Both work. SSE wins for AI.

Server-Sent Events (SSE): One-way communication from server to client. Built on HTTP. Browser support via EventSource API. Simpler to implement, handles reconnects automatically, works through proxies.

WebSockets: Full-duplex, bidirectional communication over a single TCP connection. Lower latency. Requires an upgrade handshake. More complex to scale and to route through proxies.

For AI streaming, SSE is sufficient. Users don't send data back during streaming; they consume the response. SSE's simplicity wins.

// SSE endpoint
app.get('/api/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.flushHeaders(); // send headers immediately so the client sees the stream open

  const stream = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages: [{ role: 'user', content: req.query.prompt }],
    stream: true,
  });

  for await (const event of stream) {
    const delta = event.choices[0].delta.content || '';
    if (delta) {
      res.write(`data: ${JSON.stringify({ delta })}\n\n`);
    }
  }
  res.end();
});

// Client
const eventSource = new EventSource('/api/stream?prompt=hello');
eventSource.onmessage = (event) => {
  const { delta } = JSON.parse(event.data);
  console.log(delta);
};
// Note: closing on error disables EventSource's automatic reconnect;
// do this only once the stream is known to be complete
eventSource.onerror = () => eventSource.close();

SSE handles retries automatically: if the connection drops, the browser reconnects and sends the Last-Event-ID header, provided the server tagged its events with id: fields.
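Resuming only works if the server numbers its events. A minimal sketch (formatSSE is a hypothetical helper, not a framework API):

```javascript
// Hypothetical helper: serialize one SSE event with an id field so the
// browser can report it back via Last-Event-ID on reconnect.
function formatSSE(id, payload) {
  return `id: ${id}\ndata: ${JSON.stringify(payload)}\n\n`;
}

// In the streaming loop, number each chunk:
//   let chunkId = 0;
//   res.write(formatSSE(chunkId++, { delta }));
```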

Implementing SSE in Express/Fastify/Hono

All frameworks support SSE. The pattern is consistent: set headers, write data, handle backpressure.

// Express
app.get('/api/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');

  // Disable Nagle's algorithm so small chunks are sent immediately
  res.socket.setNoDelay(true);

  try {
    const stream = await openai.chat.completions.create({...});
    for await (const event of stream) {
      const data = event.choices[0].delta.content || '';
      res.write(`data: ${JSON.stringify({ data })}\n\n`);
    }
  } catch (err) {
    res.write(`data: ${JSON.stringify({ error: err.message })}\n\n`);
  }
  res.end();
});

// Fastify (SSE support via the fastify-sse-v2 plugin, which sets the headers)
fastify.get('/api/stream', (request, reply) => {
  reply.sse(
    (async function* () {
      const stream = await openai.chat.completions.create({...});
      for await (const event of stream) {
        const data = event.choices[0].delta.content || '';
        yield { data: JSON.stringify({ data }) };
      }
    })()
  );
});

// Hono (streamSSE from 'hono/streaming')
import { streamSSE } from 'hono/streaming';

app.get('/api/stream', (c) =>
  streamSSE(c, async (stream) => {
    const aiStream = await openai.chat.completions.create({...});
    for await (const event of aiStream) {
      const data = event.choices[0].delta.content || '';
      await stream.writeSSE({ data: JSON.stringify({ data }) });
    }
  })
);

Hono ships an SSE helper out of the box; Fastify gets one via the fastify-sse-v2 plugin. Express requires manual header and write management.

Streaming From OpenAI/Anthropic API → SSE → Browser

Chain the streams: LLM API → server → browser.

app.get('/api/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');

  try {
    const openaiStream = await openai.chat.completions.create({
      model: 'gpt-4o',
      messages: [
        {
          role: 'system',
          content: 'You are a helpful assistant.'
        },
        {
          role: 'user',
          content: req.query.prompt
        }
      ],
      temperature: 0.7,
      max_tokens: 500,
      stream: true
    });

    let charCount = 0;
    for await (const event of openaiStream) {
      const choice = event.choices[0];

      // Stream content tokens
      if (choice.delta.content) {
        charCount += choice.delta.content.length;
        res.write(`data: ${JSON.stringify({
          type: 'content',
          delta: choice.delta.content
        })}\n\n`);
      }

      // The final chunk carries the finish reason on the choice, not the delta
      if (choice.finish_reason) {
        res.write(`data: ${JSON.stringify({
          type: 'finish',
          reason: choice.finish_reason,
          chars: charCount
        })}\n\n`);
      }
    }
  } catch (err) {
    res.write(`data: ${JSON.stringify({
      type: 'error',
      message: err.message
    })}\n\n`);
  }

  res.end();
});

// Client
const eventSource = new EventSource(`/api/stream?prompt=${encodeURIComponent(prompt)}`);
let response = '';

eventSource.onmessage = (event) => {
  const msg = JSON.parse(event.data);

  if (msg.type === 'content') {
    response += msg.delta;
    updateUI(response);
  } else if (msg.type === 'finish') {
    console.log(`Finished with reason: ${msg.reason}`);
    eventSource.close();
  } else if (msg.type === 'error') {
    console.error(msg.message);
    eventSource.close();
  }
};

Handling Client Disconnects

Networks are unreliable. Clients disconnect. Don't crash.

app.get('/api/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');

  const abortController = new AbortController();

  req.on('close', () => {
    console.log('Client disconnected');
    abortController.abort();
  });

  try {
    const stream = await openai.chat.completions.create(
      {
        model: 'gpt-4o',
        messages: [...],
        stream: true
      },
      { signal: abortController.signal }
    );

    for await (const event of stream) {
      if (abortController.signal.aborted) break;
      const data = event.choices[0].delta.content || '';
      const didWrite = res.write(`data: ${JSON.stringify({ data })}\n\n`);

      // Handle backpressure: if the write buffer is full, wait for it to drain
      if (!didWrite) {
        await new Promise((resolve) => res.once('drain', resolve));
      }
    }
  } catch (err) {
    // The OpenAI SDK surfaces aborts as APIUserAbortError
    if (err.name === 'AbortError' || err.name === 'APIUserAbortError') {
      console.log('Stream aborted');
    } else {
      res.write(`data: ${JSON.stringify({ error: err.message })}\n\n`);
    }
  }

  res.end();
});

Always listen for close events. Unsubscribe, stop processing, clean up resources.
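Disconnect detection is also easier with a heartbeat. SSE comments (lines starting with ":") are ignored by EventSource, so a periodic ping keeps intermediaries from timing out idle streams and makes a dead connection fail fast on the next write. A sketch; the helper name and interval are assumptions:

```javascript
// Send an SSE comment every intervalMs; stop when the client goes away.
function startHeartbeat(res, intervalMs = 15000) {
  const timer = setInterval(() => res.write(': ping\n\n'), intervalMs);
  res.on('close', () => clearInterval(timer)); // clean up on disconnect
  return timer;
}
```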

Streaming Through a Load Balancer

Load balancers often break streaming: by default they buffer responses and time out long-lived connections. Disabling buffering and raising timeouts fixes delivery; sticky routing matters when reconnecting clients need to reach the same instance.

Nginx, configured to pass streams through unbuffered:

upstream api {
  least_conn;
  server backend1:3000;
  server backend2:3000;
  server backend3:3000;
}

map $http_upgrade $connection_upgrade {
  default upgrade;
  '' close;
}

server {
  listen 80;

  location /api/stream {
    proxy_pass http://api;
    proxy_http_version 1.1;
    # Upgrade headers are only needed if WebSockets share this route
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection $connection_upgrade;
    proxy_buffering off;
    proxy_cache off;
    proxy_read_timeout 3600s;  # long streams outlive the 60s default
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
  }
}

proxy_buffering off makes Nginx forward chunks as they arrive instead of accumulating the full response. Note that least_conn balances by active connection count but is not sticky; if reconnecting clients must land on the same backend (for example, to replay a locally cached response), use ip_hash or the hash directive instead.

Redis Pub/Sub for Distributing Streams

For massively scaled systems, Redis pub/sub distributes streams across instances.

// Instance A: request comes in (ioredis-style API; a subscribed connection
// cannot issue other commands, so use a dedicated subscriber connection)
app.get('/api/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');

  const streamId = uuidv4();
  const subscriber = redis.duplicate();
  await subscriber.subscribe(`stream:${streamId}`);

  // Forward messages from Redis to the client
  subscriber.on('message', (channel, message) => {
    res.write(`data: ${message}\n\n`);
  });

  // Hand the LLM call to a background worker
  await redis.publish('jobs', JSON.stringify({
    type: 'stream',
    streamId,
    prompt: req.query.prompt
  }));

  req.on('close', () => {
    subscriber.unsubscribe();
    subscriber.quit();
  });
});

// Worker (any instance). Note: pub/sub broadcasts to every subscriber; a real
// job queue would use a Redis list or stream so exactly one worker takes each job.
const jobs = redis.duplicate();
await jobs.subscribe('jobs');
jobs.on('message', async (channel, message) => {
  const job = JSON.parse(message);
  if (job.type !== 'stream') return;

  const stream = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages: [{ role: 'user', content: job.prompt }],
    stream: true
  });

  for await (const event of stream) {
    const data = event.choices[0].delta.content || '';
    await redis.publish(`stream:${job.streamId}`, JSON.stringify({ data }));
  }
});

The requesting instance doesn't need to be the same instance processing the LLM. Redis pub/sub broadcasts across the cluster.
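One detail the pattern above leaves open: the requesting instance never learns when the stream is finished. A common fix is a sentinel message; the worker publishes one after its loop, and the instance ends the SSE response when it arrives. A sketch, where the message shape and helper name are assumptions:

```javascript
// Handle one pub/sub message for an active stream: forward data frames,
// and end the SSE response when the worker signals completion.
function handleStreamMessage(message, res, subscriber, channel) {
  const msg = JSON.parse(message);
  if (msg.done) {
    subscriber.unsubscribe(channel);
    res.end();
    return true; // stream finished
  }
  res.write(`data: ${message}\n\n`);
  return false;
}
```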

Backpressure With Streams

If the client is slow, the server backs up. Handle backpressure.

app.get('/api/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');

  const stream = await openai.chat.completions.create({...});

  for await (const event of stream) {
    const data = event.choices[0].delta.content || '';

    // res.write returns false if the write buffer is full
    const didWrite = res.write(`data: ${JSON.stringify({ data })}\n\n`);

    if (!didWrite) {
      // Wait for the client to drain the buffer
      await new Promise((resolve) => {
        res.once('drain', resolve);
      });
    }
  }

  res.end();
});

Without backpressure handling, slow clients cause unbounded memory growth: the server keeps buffering data it cannot yet send, and enough slow clients can exhaust the process.

Partial Response Recovery

If streaming fails midway, resume from a checkpoint.

app.get('/api/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');

  const messageId = req.query.messageId;
  const cached = await cache.get(messageId);

  if (cached) {
    // Client reconnected; replay the cached response
    for (const chunk of cached.chunks) {
      res.write(`data: ${JSON.stringify(chunk)}\n\n`);
    }
    res.end();
    return;
  }

  const chunks = [];
  const stream = await openai.chat.completions.create({...});

  for await (const event of stream) {
    const delta = event.choices[0].delta.content || '';
    const chunk = { data: delta };
    chunks.push(chunk);
    res.write(`data: ${JSON.stringify(chunk)}\n\n`);
  }

  // Cache for replay on reconnect. Note: this only caches completed responses;
  // for true mid-stream recovery, persist chunks incrementally as they arrive.
  await cache.set(messageId, { chunks }, { ex: 3600 });
  res.end();
});

// Client
const messageId = uuidv4();
let eventSource = new EventSource(`/api/stream?prompt=hello&messageId=${messageId}`);

eventSource.onerror = () => {
  // Close the failed connection, then reconnect with the same messageId;
  // the server replays the cached chunks
  eventSource.close();
  eventSource = new EventSource(`/api/stream?prompt=hello&messageId=${messageId}`);
};

Cache recent responses by ID. On reconnect, the server replays cached chunks instead of re-running the model.
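Replay can also skip chunks the client already received, by combining the cache with SSE's Last-Event-ID header. This assumes the server tagged each chunk with a 0-based id: field; the helper name is hypothetical:

```javascript
// Given all cached chunks and the client's Last-Event-ID header, build only
// the SSE frames the client has not yet seen.
function resumeFrames(chunks, lastEventId) {
  const start = lastEventId === undefined ? 0 : Number(lastEventId) + 1;
  return chunks
    .slice(start)
    .map((chunk, i) => `id: ${start + i}\ndata: ${JSON.stringify(chunk)}\n\n`);
}

// In the route handler:
//   const frames = resumeFrames(cached.chunks, req.headers['last-event-id']);
//   frames.forEach((f) => res.write(f));
```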

Streaming With Next.js App Router

Next.js makes streaming first-class:

// app/api/stream/route.ts
export async function GET(request: Request) {
  const { searchParams } = new URL(request.url);
  const prompt = searchParams.get('prompt') || '';
  const encoder = new TextEncoder();

  const stream = new ReadableStream({
    async start(controller) {
      try {
        const openaiStream = await openai.chat.completions.create({
          model: 'gpt-4o',
          messages: [{ role: 'user', content: prompt }],
          stream: true
        });

        for await (const event of openaiStream) {
          const delta = event.choices[0].delta.content || '';
          if (delta) {
            // Response bodies are byte streams, so encode before enqueueing
            controller.enqueue(
              encoder.encode(`data: ${JSON.stringify({ delta })}\n\n`)
            );
          }
        }

        controller.close();
      } catch (err) {
        controller.error(err);
      }
    }
  });

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive'
    }
  });
}

Client-side with React:

export function StreamComponent() {
  const [response, setResponse] = useState('');

  useEffect(() => {
    const eventSource = new EventSource('/api/stream?prompt=hello');

    eventSource.onmessage = (event) => {
      const { delta } = JSON.parse(event.data);
      setResponse(prev => prev + delta);
    };

    return () => eventSource.close();
  }, []);

  return <div>{response}</div>;
}

Checklist

  • Use SSE for one-way AI streaming (not WebSockets)
  • Set Cache-Control: no-cache and Connection: keep-alive headers
  • Handle client disconnects with req.on('close')
  • Use proxy_buffering off in load balancer config
  • Implement backpressure with res.write() return value and drain events
  • Cache responses for recovery on reconnect
  • Test with slow clients to expose buffering issues
  • Monitor connection count and stream durations
  • Use Redis pub/sub for cross-instance streaming
  • Consider sticky sessions if not using pub/sub
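The "test with slow clients" item can be scripted: wrap any async iterable in a throttling generator and watch server memory while it runs. A sketch; the localhost URL is an assumption:

```javascript
// Delay each chunk of an async (or sync) iterable to simulate a slow consumer.
async function* throttle(iterable, delayMs) {
  for await (const chunk of iterable) {
    await new Promise((resolve) => setTimeout(resolve, delayMs));
    yield chunk;
  }
}

// Usage against a local dev server:
//   const res = await fetch('http://localhost:3000/api/stream?prompt=hello');
//   for await (const chunk of throttle(res.body, 500)) process.stdout.write(chunk);
```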

Conclusion

Real-time AI streaming delights users. SSE provides a simple, reliable foundation. Load balancers, backpressure, and recovery mechanisms ensure streams work at scale. Pair these patterns with Redis for distributed streaming, and you'll handle thousands of concurrent AI conversations.