Real-Time AI Streaming Architecture — SSE, WebSockets, and Chunked Responses at Scale
Introduction
Users expect streaming responses from AI. Watching tokens arrive in real-time is better UX than waiting for a complete response. But streaming at scale introduces complexity: connection management, load balancing, backpressure, and distributed streaming across instances. This post covers production patterns.
- SSE vs WebSockets for AI Streaming
- Implementing SSE in Express/Fastify/Hono
- Streaming From OpenAI/Anthropic API → SSE → Browser
- Handling Client Disconnects
- Streaming Through a Load Balancer
- Redis Pub/Sub for Distributing Streams
- Backpressure With Streams
- Partial Response Recovery
- Streaming With Next.js App Router
- Checklist
- Conclusion
SSE vs WebSockets for AI Streaming
Both work. SSE wins for AI.
Server-Sent Events (SSE): One-way communication from server to client. Built on HTTP. Browser support via EventSource API. Simpler to implement, handles reconnects automatically, works through proxies.
WebSockets: Full-duplex communication over a single TCP connection. Lower latency. Requires an upgrade handshake. More complex to scale and to route through proxies.
For AI streaming, SSE is sufficient. Users don't send data back during streaming; they consume the response. SSE's simplicity wins.
// SSE endpoint (OpenAI Node SDK, streaming enabled with stream: true)
app.get('/api/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  const stream = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages: [{ role: 'user', content: req.query.prompt }],
    stream: true,
  });
  for await (const event of stream) {
    const delta = event.choices[0]?.delta?.content || '';
    if (delta) {
      res.write(`data: ${JSON.stringify({ delta })}\n\n`);
    }
  }
  res.end();
});
// Client
const eventSource = new EventSource('/api/stream?prompt=hello');
eventSource.onmessage = (event) => {
  const { delta } = JSON.parse(event.data);
  console.log(delta);
};
eventSource.onerror = () => eventSource.close();
SSE handles retries automatically. If the connection drops, the browser reconnects and sends the Last-Event-ID header, provided the server tags events with id: fields.
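A minimal sketch of making that useful (the formatSSE helper and the chunk index are illustrative, not part of any SDK): frame each event with an id: line so the browser's automatic reconnect carries a meaningful position.

```javascript
// Hypothetical helper: frames an SSE event with an id so the browser
// can report its position via Last-Event-ID after a reconnect.
function formatSSE(id, payload) {
  return `id: ${id}\ndata: ${JSON.stringify(payload)}\n\n`;
}

// Server side: tag every chunk with an increasing index.
//   let i = 0;
//   res.write(formatSSE(i++, { delta }));
// On reconnect, read where the client left off:
//   const lastSeen = Number(req.headers['last-event-id'] || -1);
```

With ids in place, a resuming server can skip or replay chunks relative to the client's last seen index instead of restarting the generation.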
Implementing SSE in Express/Fastify/Hono
All frameworks support SSE. The pattern is consistent: set headers, write data, handle backpressure.
// Express
app.get('/api/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  // Disable Nagle's algorithm so small chunks flush immediately
  res.socket.setNoDelay(true);
  res.flushHeaders();
  try {
    const stream = await openai.chat.completions.create({ /* ... */ stream: true });
    for await (const event of stream) {
      const data = event.choices[0]?.delta?.content || '';
      res.write(`data: ${JSON.stringify({ data })}\n\n`);
    }
  } catch (err) {
    res.write(`data: ${JSON.stringify({ error: err.message })}\n\n`);
  }
  res.end();
});
// Fastify — reply.sse is provided by the fastify-sse-v2 plugin,
// which also sets the SSE headers for you
fastify.get('/api/stream', async (request, reply) => {
  const stream = await openai.chat.completions.create({ /* ... */ stream: true });
  for await (const event of stream) {
    const data = event.choices[0]?.delta?.content || '';
    reply.sse({ data: JSON.stringify({ data }) });
  }
});
// Hono — streamSSE comes from 'hono/streaming'
import { streamSSE } from 'hono/streaming';

app.get('/api/stream', (c) =>
  streamSSE(c, async (stream) => {
    const llm = await openai.chat.completions.create({ /* ... */ stream: true });
    for await (const event of llm) {
      const data = event.choices[0]?.delta?.content || '';
      await stream.writeSSE({ data: JSON.stringify({ data }) });
    }
  })
);
Fastify (via the fastify-sse-v2 plugin) and Hono (via streamSSE) provide higher-level streaming helpers. Express requires manual management.
Streaming From OpenAI/Anthropic API → SSE → Browser
Chain the streams: LLM API → server → browser.
app.get('/api/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  try {
    const openaiStream = await openai.chat.completions.create({
      model: 'gpt-4o',
      messages: [
        { role: 'system', content: 'You are a helpful assistant.' },
        { role: 'user', content: req.query.prompt }
      ],
      temperature: 0.7,
      max_tokens: 500,
      stream: true
    });
    let tokenCount = 0;
    for await (const event of openaiStream) {
      const choice = event.choices[0];
      // Stream content tokens
      if (choice.delta?.content) {
        // Rough proxy: counts characters, not true tokens
        tokenCount += choice.delta.content.length;
        res.write(`data: ${JSON.stringify({
          type: 'content',
          delta: choice.delta.content
        })}\n\n`);
      }
      // finish_reason arrives on the choice, not on the delta
      if (choice.finish_reason) {
        res.write(`data: ${JSON.stringify({
          type: 'finish',
          reason: choice.finish_reason,
          tokens: tokenCount
        })}\n\n`);
      }
    }
  } catch (err) {
    res.write(`data: ${JSON.stringify({
      type: 'error',
      message: err.message
    })}\n\n`);
  }
  res.end();
});
// Client
const eventSource = new EventSource(`/api/stream?prompt=${encodeURIComponent(prompt)}`);
let response = '';
eventSource.onmessage = (event) => {
  const msg = JSON.parse(event.data);
  if (msg.type === 'content') {
    response += msg.delta;
    updateUI(response);
  } else if (msg.type === 'finish') {
    console.log(`Finished with reason: ${msg.reason}`);
    eventSource.close();
  } else if (msg.type === 'error') {
    console.error(msg.message);
    eventSource.close();
  }
};
Handling Client Disconnects
Networks are unreliable. Clients disconnect. Don't crash.
app.get('/api/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  const abortController = new AbortController();
  req.on('close', () => {
    console.log('Client disconnected');
    abortController.abort();
  });
  try {
    const stream = await openai.chat.completions.create(
      {
        model: 'gpt-4o',
        messages: [...],
        stream: true
      },
      { signal: abortController.signal }
    );
    for await (const event of stream) {
      if (abortController.signal.aborted) break;
      const data = event.choices[0]?.delta?.content || '';
      const didWrite = res.write(`data: ${JSON.stringify({ data })}\n\n`);
      // Handle backpressure: if the write buffer is full, stop
      // consuming from the stream until the socket drains
      if (!didWrite) {
        await new Promise((resolve) => res.once('drain', resolve));
      }
    }
  } catch (err) {
    if (err.name === 'AbortError') {
      console.log('Stream aborted');
    } else {
      res.write(`data: ${JSON.stringify({ error: err.message })}\n\n`);
    }
  }
  res.end();
});
Always listen for close events. Unsubscribe, stop processing, clean up resources.
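A heartbeat helps here too: proxies often kill idle connections, and a dead socket may not fire close promptly. A small sketch (startHeartbeat is an illustrative helper, not a library API), sending SSE comment lines that clients ignore:

```javascript
// Hypothetical keep-alive helper: periodically writes an SSE comment
// (lines starting with ':' are ignored by EventSource) so proxies
// don't drop the connection as idle. Returns a cleanup function.
function startHeartbeat(res, intervalMs = 15000) {
  const timer = setInterval(() => {
    res.write(': ping\n\n');
  }, intervalMs);
  return () => clearInterval(timer);
}

// Usage alongside the disconnect handling above:
//   const stopHeartbeat = startHeartbeat(res);
//   req.on('close', () => { stopHeartbeat(); abortController.abort(); });
```

Clearing the timer on close matters as much as starting it; a leaked interval keeps writing to a dead response forever.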
Streaming Through a Load Balancer
Load balancers and reverse proxies buffer responses by default, which breaks streaming: tokens pile up in the proxy instead of reaching the client. Disable buffering, and add sticky routing only if reconnecting clients need to reach the same backend.
Nginx configuration for SSE:
upstream api {
    least_conn;
    server backend1:3000;
    server backend2:3000;
    server backend3:3000;
}
map $http_upgrade $connection_upgrade {
    default upgrade;
    '' close;
}
server {
    listen 80;
    location /api/stream {
        proxy_pass http://api;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
        proxy_buffering off;
        proxy_cache off;
        proxy_read_timeout 300s;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }
}
proxy_buffering off makes Nginx forward chunks as they arrive instead of buffering the full response. Note that least_conn balances load but is not sticky; if reconnecting clients must reach the same backend, use ip_hash or a sticky-session module. For long streams, also raise proxy_read_timeout (the default is 60 seconds).
Redis Pub/Sub for Distributing Streams
For massively scaled systems, Redis pub/sub distributes streams across instances.
// Instance A: request comes in (ioredis-style pub/sub API shown)
app.get('/api/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  const streamId = uuidv4();
  // Pub/sub needs a dedicated connection; duplicate the main client
  const subscriber = redis.duplicate();
  await subscriber.subscribe(`stream:${streamId}`);
  // Forward messages from Redis to the client
  subscriber.on('message', (channel, message) => {
    res.write(`data: ${message}\n\n`);
  });
  // Hand the LLM call to a background worker
  await redis.publish('jobs', JSON.stringify({
    type: 'stream',
    streamId,
    prompt: req.query.prompt
  }));
  req.on('close', () => {
    subscriber.unsubscribe();
    subscriber.quit();
  });
});
// Worker (any instance)
const jobSubscriber = redis.duplicate();
await jobSubscriber.subscribe('jobs');
jobSubscriber.on('message', async (channel, message) => {
  const job = JSON.parse(message);
  if (job.type === 'stream') {
    const stream = await openai.chat.completions.create({
      model: 'gpt-4o',
      messages: [{ role: 'user', content: job.prompt }],
      stream: true
    });
    for await (const event of stream) {
      const data = event.choices[0]?.delta?.content || '';
      await redis.publish(`stream:${job.streamId}`, JSON.stringify({ data }));
    }
  }
});
The instance that receives the request doesn't have to be the one calling the LLM; Redis pub/sub broadcasts across the cluster. Note that pub/sub is fire-and-forget: a job published while no worker is listening is lost, so use a Redis list or stream for durable job dispatch.
Backpressure With Streams
If the client is slow, the server backs up. Handle backpressure.
app.get('/api/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  const stream = await openai.chat.completions.create({ /* ... */ stream: true });
  for await (const event of stream) {
    const data = event.choices[0]?.delta?.content || '';
    // res.write returns false if the write buffer is full
    const didWrite = res.write(`data: ${JSON.stringify({ data })}\n\n`);
    if (!didWrite) {
      // Wait for the client to drain the buffer
      await new Promise((resolve) => res.once('drain', resolve));
    }
  }
  res.end();
});
Without backpressure handling, large responses can cause memory leaks: the server accumulates buffered data waiting for slow clients.
Partial Response Recovery
If streaming fails midway, resume from a checkpoint.
app.get('/api/stream', async (req, res) => {
  const messageId = req.query.messageId;
  const cached = await cache.get(messageId);
  if (cached) {
    // Client reconnected; resend the cached response
    for (const chunk of cached.chunks) {
      res.write(`data: ${JSON.stringify(chunk)}\n\n`);
    }
    res.end();
    return;
  }
  const chunks = [];
  const stream = await openai.chat.completions.create({ /* ... */ stream: true });
  for await (const event of stream) {
    const delta = event.choices[0]?.delta?.content || '';
    const chunk = { data: delta };
    chunks.push(chunk);
    res.write(`data: ${JSON.stringify(chunk)}\n\n`);
  }
  // Cache the complete response for recovery
  await cache.set(messageId, { chunks }, { ex: 3600 });
  res.end();
});
// Client
const messageId = uuidv4();
const eventSource = new EventSource(`/api/stream?prompt=hello&messageId=${messageId}`);
// EventSource reconnects to the same URL automatically, so a dropped
// connection retries with the same messageId and replays from cache
eventSource.onerror = (err) => {
  console.error('Stream error; browser will retry', err);
};
Cache recent responses by ID and replay from cache on reconnect. Note that the code above only caches after the stream completes; to recover from mid-stream failures, persist chunks incrementally as they arrive.
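An incremental version can be sketched with a per-message chunk log (in-memory here; swap the Map for Redis RPUSH/LRANGE in production — ChunkStore is an illustrative name, not a library):

```javascript
// Minimal incremental checkpoint store: chunks are appended as they
// stream, so a mid-stream failure still leaves a replayable prefix.
class ChunkStore {
  constructor() {
    this.streams = new Map();
  }
  // Persist a chunk the moment it arrives from the LLM
  append(messageId, chunk) {
    if (!this.streams.has(messageId)) this.streams.set(messageId, []);
    this.streams.get(messageId).push(chunk);
  }
  // Return chunks after fromIndex, e.g. the client's last seen event id
  replay(messageId, fromIndex = 0) {
    return (this.streams.get(messageId) || []).slice(fromIndex);
  }
}
```

The server calls append before each res.write; on reconnect it replays from the client's last seen index and then continues forwarding live chunks.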
Streaming With Next.js App Router
Next.js makes streaming first-class:
// app/api/stream/route.ts
export async function GET(request: Request) {
  const { searchParams } = new URL(request.url);
  const prompt = searchParams.get('prompt') || '';
  const encoder = new TextEncoder();
  const stream = new ReadableStream({
    async start(controller) {
      try {
        const openaiStream = await openai.chat.completions.create({
          model: 'gpt-4o',
          messages: [{ role: 'user', content: prompt }],
          stream: true
        });
        for await (const event of openaiStream) {
          const delta = event.choices[0]?.delta?.content || '';
          if (delta) {
            // Response bodies carry bytes, so encode each SSE frame
            controller.enqueue(
              encoder.encode(`data: ${JSON.stringify({ delta })}\n\n`)
            );
          }
        }
        controller.close();
      } catch (err) {
        controller.error(err);
      }
    }
  });
  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive'
    }
  });
}
Client-side with React:
export function StreamComponent() {
  const [response, setResponse] = useState('');
  useEffect(() => {
    const eventSource = new EventSource('/api/stream?prompt=hello');
    eventSource.onmessage = (event) => {
      const { delta } = JSON.parse(event.data);
      setResponse(prev => prev + delta);
    };
    return () => eventSource.close();
  }, []);
  return <div>{response}</div>;
}
Checklist
- Use SSE for one-way AI streaming (not WebSockets)
- Set Cache-Control: no-cache and Connection: keep-alive headers
- Handle client disconnects with req.on('close')
- Use proxy_buffering off in load balancer config
- Implement backpressure with the res.write() return value and drain events
- Cache responses for recovery on reconnect
- Test with slow clients to expose buffering issues
- Monitor connection count and stream durations
- Use Redis pub/sub for cross-instance streaming
- Consider sticky sessions if not using pub/sub
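For the monitoring item in the checklist, a bare-bones in-process sketch (the metrics object and trackStream are illustrative names): count live SSE connections and record each stream's duration when it closes.

```javascript
// Hypothetical in-process metrics: live connection gauge plus a log
// of stream durations, recorded when the response closes.
const metrics = { active: 0, durationsMs: [] };

function trackStream(res) {
  metrics.active++;
  const startedAt = Date.now();
  res.on('close', () => {
    metrics.active--;
    metrics.durationsMs.push(Date.now() - startedAt);
  });
}

// Usage at the top of each streaming handler:
//   trackStream(res);
```

In production you would export these as gauges and histograms to whatever metrics system you run, but even this much surfaces leaked connections and abnormally long streams.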
Conclusion
Real-time AI streaming delights users. SSE provides a simple, reliable foundation. Load balancers, backpressure, and recovery mechanisms ensure streams work at scale. Pair these patterns with Redis for distributed streaming, and you'll handle thousands of concurrent AI conversations.