Real-Time AI Streaming Architecture — SSE, WebSockets, and Chunked Responses at Scale
Introduction
Users expect streaming responses from AI. Watching tokens arrive in real time beats staring at a spinner until the full response lands. But streaming at scale introduces complexity: connection management, load balancing, backpressure, and distributed streaming across instances. This post covers production patterns.
- SSE vs WebSockets for AI Streaming
- Implementing SSE in Express/Fastify/Hono
- Streaming From OpenAI/Anthropic API → SSE → Browser
- Handling Client Disconnects
- Streaming Through a Load Balancer
- Redis Pub/Sub for Distributing Streams
- Backpressure With Streams
- Partial Response Recovery
- Streaming With Next.js App Router
- Checklist
- Conclusion
SSE vs WebSockets for AI Streaming
Both work. SSE wins for AI.
Server-Sent Events (SSE): One-way communication from server to client. Built on HTTP. Browser support via EventSource API. Simpler to implement, handles reconnects automatically, works through proxies.
WebSockets: Full-duplex communication over a single TCP connection. Requires an HTTP upgrade handshake, and the stateful connections are harder to proxy and scale.
For AI streaming, SSE is sufficient. Users don't send data back during streaming; they consume the response. SSE's simplicity wins.
// SSE endpoint (OpenAI Node SDK: pass stream: true to chat.completions.create)
app.get('/api/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  const stream = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages: [{ role: 'user', content: req.query.prompt }],
    stream: true,
  });
  for await (const event of stream) {
    const delta = event.choices[0]?.delta?.content || '';
    if (delta) {
      res.write(`data: ${JSON.stringify({ delta })}\n\n`);
    }
  }
  res.end();
});
// Client
const eventSource = new EventSource('/api/stream?prompt=hello');
eventSource.onmessage = (event) => {
  const { delta } = JSON.parse(event.data);
  console.log(delta);
};
eventSource.onerror = () => eventSource.close();
SSE handles retries automatically. If the connection drops, the browser reconnects and, provided the server tagged events with id: fields, sends the last id it saw in the Last-Event-ID header.
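Resume only works if the server actually emits id: fields, since the browser echoes back the last one it received. Here is a minimal sketch; sseFrame and framesSince are hypothetical helpers, not part of any SDK:

```javascript
// Hypothetical helper: serialize an SSE frame with an id field so the
// browser can resume via Last-Event-ID after a reconnect.
function sseFrame({ id, data }) {
  return `id: ${id}\ndata: ${JSON.stringify(data)}\n\n`;
}

// Hypothetical helper: tag each chunk with its sequence number, and on
// reconnect skip everything the client already received.
function framesSince(chunks, lastEventId) {
  const from = lastEventId === null ? 0 : Number(lastEventId) + 1;
  return chunks
    .slice(from)
    .map((delta, i) => sseFrame({ id: from + i, data: { delta } }))
    .join('');
}
```

In a handler you would read the header via req.headers['last-event-id'] and pass it to framesSince.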
Implementing SSE in Express/Fastify/Hono
All frameworks support SSE. The pattern is consistent: set headers, write data, handle backpressure.
// Express
app.get('/api/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  // Disable Nagle's algorithm so small chunks flush immediately
  res.socket.setNoDelay(true);
  try {
    const stream = await openai.chat.completions.create({ /* model, messages, ... */ stream: true });
    for await (const event of stream) {
      const data = event.choices[0]?.delta?.content || '';
      res.write(`data: ${JSON.stringify({ data })}\n\n`);
    }
  } catch (err) {
    res.write(`data: ${JSON.stringify({ error: err.message })}\n\n`);
  }
  res.end();
});
// Fastify: reply.sse is not core Fastify; this assumes a plugin such as
// fastify-sse-v2 is registered (it sets the SSE headers for you and
// accepts an async iterable of events)
fastify.get('/api/stream', async (request, reply) => {
  const stream = await openai.chat.completions.create({ /* model, messages, ... */ stream: true });
  reply.sse(
    (async function* () {
      for await (const event of stream) {
        const data = event.choices[0]?.delta?.content || '';
        yield { data: JSON.stringify({ data }) };
      }
    })()
  );
});
// Hono: use the streamSSE helper from 'hono/streaming'
import { streamSSE } from 'hono/streaming';

app.get('/api/stream', (c) => {
  return streamSSE(c, async (stream) => {
    const chatStream = await openai.chat.completions.create({ /* model, messages, ... */ stream: true });
    for await (const event of chatStream) {
      const data = event.choices[0]?.delta?.content || '';
      await stream.writeSSE({ data: JSON.stringify({ data }) });
    }
  });
});
Hono ships a built-in SSE helper; Fastify gets one via plugin. Express requires managing headers and writes manually.
Streaming From OpenAI/Anthropic API → SSE → Browser
Chain the streams: LLM API → server → browser.
app.get('/api/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  try {
    const openaiStream = await openai.chat.completions.create({
      model: 'gpt-4o',
      messages: [
        { role: 'system', content: 'You are a helpful assistant.' },
        { role: 'user', content: req.query.prompt },
      ],
      temperature: 0.7,
      max_tokens: 500,
      stream: true,
    });
    // Rough size metric (characters, not tokens; exact counts come from
    // the API's usage field)
    let charCount = 0;
    for await (const event of openaiStream) {
      const choice = event.choices[0];
      // Stream content tokens
      if (choice?.delta?.content) {
        charCount += choice.delta.content.length;
        res.write(`data: ${JSON.stringify({
          type: 'content',
          delta: choice.delta.content,
        })}\n\n`);
      }
      // Stream finish reason (reported on the choice, not the delta)
      if (choice?.finish_reason) {
        res.write(`data: ${JSON.stringify({
          type: 'finish',
          reason: choice.finish_reason,
          chars: charCount,
        })}\n\n`);
      }
    }
  } catch (err) {
    res.write(`data: ${JSON.stringify({
      type: 'error',
      message: err.message,
    })}\n\n`);
  }
  res.end();
});
// Client
const eventSource = new EventSource(`/api/stream?prompt=${encodeURIComponent(prompt)}`);
let response = '';
eventSource.onmessage = (event) => {
  const msg = JSON.parse(event.data);
  if (msg.type === 'content') {
    response += msg.delta;
    updateUI(response);
  } else if (msg.type === 'finish') {
    console.log(`Finished with reason: ${msg.reason}`);
    eventSource.close();
  } else if (msg.type === 'error') {
    console.error(msg.message);
    eventSource.close();
  }
};
Handling Client Disconnects
Networks are unreliable. Clients disconnect. Don't crash.
app.get('/api/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  const abortController = new AbortController();
  req.on('close', () => {
    console.log('Client disconnected');
    // Cancels the upstream LLM request too, so you stop paying for tokens
    abortController.abort();
  });
  try {
    const stream = await openai.chat.completions.create(
      {
        model: 'gpt-4o',
        messages: [...],
        stream: true,
      },
      { signal: abortController.signal }
    );
    for await (const event of stream) {
      if (abortController.signal.aborted) break;
      const data = event.choices[0]?.delta?.content || '';
      const didWrite = res.write(`data: ${JSON.stringify({ data })}\n\n`);
      // Backpressure: if the write buffer is full, wait for it to drain
      if (!didWrite) {
        await new Promise((resolve) => res.once('drain', resolve));
      }
    }
  } catch (err) {
    if (err.name === 'AbortError') {
      console.log('Stream aborted');
    } else {
      res.write(`data: ${JSON.stringify({ error: err.message })}\n\n`);
    }
  }
  res.end();
});
Always listen for close events. Unsubscribe, stop processing, clean up resources.
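A related detail: proxies often kill connections that go quiet between tokens. SSE comment frames (lines starting with a colon) are ignored by EventSource but keep the connection warm. A sketch, with startHeartbeat as a hypothetical helper:

```javascript
// SSE comment frames are invisible to the client's EventSource but reset
// proxy idle timers.
const KEEP_ALIVE_FRAME = ': keep-alive\n\n';

// Hypothetical helper: write a comment frame every intervalMs until the
// returned stop function is called.
function startHeartbeat(res, intervalMs = 15000) {
  const timer = setInterval(() => res.write(KEEP_ALIVE_FRAME), intervalMs);
  return () => clearInterval(timer);
}

// In a handler, pair it with the close listener:
//   const stop = startHeartbeat(res);
//   req.on('close', stop);
```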
Streaming Through a Load Balancer
Load balancers and reverse proxies often break streaming in two ways: they buffer responses, and they time out long-lived connections. Disable buffering, raise read timeouts, and add sticky routing only if reconnecting clients must land on the same backend.
Nginx, with buffering disabled for the streaming route:
upstream api {
    least_conn;
    server backend1:3000;
    server backend2:3000;
    server backend3:3000;
}

map $http_upgrade $connection_upgrade {
    default upgrade;
    '' close;
}

server {
    listen 80;

    location /api/stream {
        proxy_pass http://api;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
        proxy_buffering off;
        proxy_cache off;
        # keep long streams from hitting the default 60s read timeout
        proxy_read_timeout 3600s;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }
}
proxy_buffering off makes nginx forward each chunk as it arrives instead of accumulating the full response. Note that least_conn only balances by active connection count; it is not sticky. If reconnecting clients must reach the same backend (e.g. for cached-response recovery), use ip_hash or a cookie-based sticky method instead.
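If you can't touch the proxy config, nginx also honors a per-response opt-out: the X-Accel-Buffering: no header. A sketch of the full SSE header set, with sseHeaders as a hypothetical helper:

```javascript
// Hypothetical helper: headers that keep intermediaries from buffering SSE.
// X-Accel-Buffering: no tells nginx to skip proxy buffering for this
// response even when proxy_buffering is on globally; no-transform asks
// other intermediaries not to rewrite the body.
function sseHeaders() {
  return {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache, no-transform',
    'Connection': 'keep-alive',
    'X-Accel-Buffering': 'no',
  };
}

// Express usage (sketch):
//   res.writeHead(200, sseHeaders());
//   res.flushHeaders();
```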
Redis Pub/Sub for Distributing Streams
In multi-instance deployments, Redis pub/sub carries the stream from whichever instance does the LLM work to whichever instance holds the client connection.
// Instance A: request comes in (node-redis v4 style; adapt for your client)
app.get('/api/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  const streamId = uuidv4();
  // A connection in subscriber mode can't run other commands: duplicate it
  const subscriber = redis.duplicate();
  await subscriber.connect();
  // Forward messages from Redis to the client
  await subscriber.subscribe(`stream:${streamId}`, (message) => {
    res.write(`data: ${message}\n\n`);
  });
  // Hand the LLM call to a background worker
  await redis.publish('jobs', JSON.stringify({
    type: 'stream',
    streamId,
    prompt: req.query.prompt,
  }));
  req.on('close', async () => {
    await subscriber.unsubscribe(`stream:${streamId}`);
    await subscriber.quit();
  });
});

// Worker (any instance). Caveat: pub/sub delivers to *every* subscriber, so
// with multiple workers use a Redis list (BRPOP) or a Streams consumer group
// for the job queue instead.
const jobSubscriber = redis.duplicate();
await jobSubscriber.connect();
await jobSubscriber.subscribe('jobs', async (message) => {
  const job = JSON.parse(message);
  if (job.type !== 'stream') return;
  const stream = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages: [{ role: 'user', content: job.prompt }],
    stream: true,
  });
  for await (const event of stream) {
    const data = event.choices[0]?.delta?.content || '';
    await redis.publish(`stream:${job.streamId}`, JSON.stringify({ data }));
  }
});
The requesting instance doesn't need to be the same instance processing the LLM. Redis pub/sub broadcasts across the cluster.
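One gap in the sketch above: the subscribing instance never learns when the stream is finished. Publishing a terminal message closes the loop. A minimal envelope, where encodeChunk, encodeDone, and handleStreamMessage are hypothetical helpers:

```javascript
// Hypothetical message envelope for the pub/sub channel: data chunks plus a
// terminal 'done' message so the subscribing instance knows when to end the
// HTTP response and unsubscribe.
const encodeChunk = (delta) => JSON.stringify({ type: 'chunk', delta });
const encodeDone = () => JSON.stringify({ type: 'done' });

// Returns true when the stream is complete and the caller should unsubscribe.
function handleStreamMessage(message, res) {
  const msg = JSON.parse(message);
  if (msg.type === 'chunk') {
    res.write(`data: ${JSON.stringify({ delta: msg.delta })}\n\n`);
    return false;
  }
  res.end();
  return true;
}
```

The worker publishes encodeDone() after its for-await loop; the subscriber calls handleStreamMessage in its message listener.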
Backpressure With Streams
If the client reads slowly, unsent data accumulates in the server's write buffer. Handle backpressure.
app.get('/api/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  const stream = await openai.chat.completions.create({ /* model, messages, ... */ stream: true });
  for await (const event of stream) {
    const data = event.choices[0]?.delta?.content || '';
    // res.write returns false once the internal buffer passes highWaterMark
    const didWrite = res.write(`data: ${JSON.stringify({ data })}\n\n`);
    if (!didWrite) {
      // Pause until the socket drains
      await new Promise((resolve) => res.once('drain', resolve));
    }
  }
  res.end();
});
Without backpressure handling, slow clients cause unbounded buffering: the server holds every unsent byte in memory until the socket drains.
Partial Response Recovery
If streaming fails midway, resume from a checkpoint.
app.get('/api/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  const messageId = req.query.messageId;
  const cached = await cache.get(messageId);
  if (cached) {
    // Client reconnected: replay the cached response
    for (const chunk of cached.chunks) {
      res.write(`data: ${JSON.stringify(chunk)}\n\n`);
    }
    res.end();
    return;
  }
  const chunks = [];
  const stream = await openai.chat.completions.create({ /* model, messages, ... */ stream: true });
  for await (const event of stream) {
    const delta = event.choices[0]?.delta?.content || '';
    const chunk = { data: delta };
    chunks.push(chunk);
    res.write(`data: ${JSON.stringify(chunk)}\n\n`);
  }
  // Cache for recovery (1 hour TTL)
  await cache.set(messageId, { chunks }, { ex: 3600 });
  res.end();
});

// Client
const messageId = uuidv4();
let eventSource = new EventSource(`/api/stream?prompt=hello&messageId=${messageId}`);
eventSource.onerror = () => {
  eventSource.close();
  // Reconnect with the same messageId to resume
  eventSource = new EventSource(`/api/stream?prompt=hello&messageId=${messageId}`);
};
Cache recent responses by ID and replay from cache on reconnect. Note the sketch above only writes the cache after the stream completes; to recover from mid-stream failures, persist chunks incrementally as they arrive.
Streaming With Next.js App Router
Next.js makes streaming first-class:
// app/api/stream/route.ts
export async function GET(request: Request) {
  const { searchParams } = new URL(request.url);
  const prompt = searchParams.get('prompt') || '';
  const encoder = new TextEncoder();
  const stream = new ReadableStream({
    async start(controller) {
      try {
        const openaiStream = await openai.chat.completions.create({
          model: 'gpt-4o',
          messages: [{ role: 'user', content: prompt }],
          stream: true,
        });
        for await (const event of openaiStream) {
          const delta = event.choices[0]?.delta?.content || '';
          if (delta) {
            // Response bodies carry bytes, so encode each frame
            controller.enqueue(encoder.encode(`data: ${JSON.stringify({ delta })}\n\n`));
          }
        }
        controller.close();
      } catch (err) {
        controller.error(err);
      }
    },
  });
  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    },
  });
}
Client-side with React:
'use client';
import { useEffect, useState } from 'react';

export function StreamComponent() {
  const [response, setResponse] = useState('');
  useEffect(() => {
    const eventSource = new EventSource('/api/stream?prompt=hello');
    eventSource.onmessage = (event) => {
      const { delta } = JSON.parse(event.data);
      setResponse((prev) => prev + delta);
    };
    return () => eventSource.close();
  }, []);
  return <div>{response}</div>;
}
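A caveat on EventSource: it only issues GET requests, so long prompts and auth headers push many apps to fetch() plus manual SSE parsing instead. A sketch; parseSSE is a hypothetical helper that carries partial frames between reads:

```javascript
// Hypothetical helper: takes leftover buffer plus a newly decoded chunk,
// returns [completed data payloads, new leftover]. Frames end with '\n\n';
// anything after the last complete frame stays in the buffer.
function parseSSE(buffer, chunk) {
  const events = [];
  let buf = buffer + chunk;
  let idx;
  while ((idx = buf.indexOf('\n\n')) !== -1) {
    const frame = buf.slice(0, idx);
    buf = buf.slice(idx + 2);
    for (const line of frame.split('\n')) {
      if (line.startsWith('data: ')) events.push(line.slice(6));
    }
  }
  return [events, buf];
}

// Usage (sketch):
//   const res = await fetch('/api/stream', { method: 'POST', body });
//   const reader = res.body.getReader();
//   const decoder = new TextDecoder();
//   let buf = '';
//   for (;;) {
//     const { done, value } = await reader.read();
//     if (done) break;
//     let events;
//     [events, buf] = parseSSE(buf, decoder.decode(value, { stream: true }));
//     events.forEach(handleEvent);
//   }
```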
Checklist
- Use SSE for one-way AI streaming (not WebSockets)
- Set Cache-Control: no-cache and Connection: keep-alive headers
- Handle client disconnects with req.on('close')
- Use proxy_buffering off in load balancer config
- Implement backpressure with res.write()'s return value and drain events
- Cache responses for recovery on reconnect
- Test with slow clients to expose buffering issues
- Monitor connection count and stream durations
- Use Redis pub/sub for cross-instance streaming
- Consider sticky sessions if not using pub/sub
Conclusion
Real-time AI streaming delights users. SSE provides a simple, reliable foundation. Load balancers, backpressure, and recovery mechanisms ensure streams work at scale. Pair these patterns with Redis for distributed streaming, and you'll handle thousands of concurrent AI conversations.