Redis Streams — The Missing Middle Ground Between Queues and Kafka
By Sanjeev Sharma (@webcoderspeed1)
Introduction
Kafka is powerful but heavy. Redis pub/sub drops messages when no subscriber is listening. Redis Streams sits in the middle: ordered, persistent, with consumer groups, all in the single Redis instance you already run.
Many teams deploy Redis Streams to production and never look back. It handles activity feeds, notifications, event logs, and job queues with one data structure. Understand when it fits and when you need Kafka.
- Redis Streams vs Redis Pub/Sub vs Kafka
- XADD and XREAD Commands
- Consumer Groups for Parallel Processing
- XACK and Dead Letter Handling
- Trimming Streams with MAXLEN
- Using Redis Streams as an Event Log
- Building a Real-Time Activity Feed
- Building a Live Notification System
- Pending Entries List (PEL) for Reliability
- Redis Streams vs BullMQ for Background Jobs
- When Redis Streams is Enough
- Checklist
- Conclusion
Redis Streams vs Redis Pub/Sub vs Kafka
Redis pub/sub: Fire-and-forget. Subscribers must be connected at publish time; anything published while they are disconnected is lost. Use it for chat or live updates where a dropped message doesn't matter.
Redis Streams: Persistent ordered log. Consumers can replay history. Consumer groups enable parallel processing with automatic offset management. No external dependencies. Latency in microseconds.
Kafka: Distributed, replicated, fault-tolerant. Scales horizontally. Supports complex topologies (Streams, KSQL, Flink). Operational overhead. Latency in milliseconds.
For sub-100k QPS with a single deployment zone, Redis Streams wins on simplicity. Multi-region deployments, massive scale, or complex topologies? Choose Kafka.
XADD and XREAD Commands
XADD appends an entry to a stream:
import { createClient } from 'redis';

const client = createClient();
await client.connect();

// Field values must be strings
const id = await client.xAdd('activity', '*', {
  userId: '123',
  action: 'purchase',
  amount: '9.99',
  timestamp: Date.now().toString()
});
console.log('Added with ID:', id); // e.g. "1710761234567-0"
The asterisk tells Redis to auto-generate an ID (milliseconds since epoch + sequence number).
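The ID format is worth internalizing, since ranges and trimming are expressed in terms of it. A small helper (hypothetical, not part of any Redis client API) can split an ID into its components:

```typescript
// Hypothetical helper: split a stream entry ID ("<ms>-<seq>") into its parts
function parseStreamId(id: string): { ms: number; seq: number } {
  const [ms, seq] = id.split('-');
  return { ms: Number(ms), seq: Number(seq) };
}

const parsed = parseStreamId('1710761234567-0');
console.log(new Date(parsed.ms).toISOString(), parsed.seq);
```

Because the millisecond part comes first, lexicographic-by-ID order is also chronological order.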
XREAD consumes entries:
const entries = await client.xRead(
  [{ key: 'activity', id: '0' }], // Start from the beginning
  { COUNT: 10 }
);

entries?.forEach(({ name, messages }) => {
  messages.forEach(({ id, message }) => {
    console.log(`ID ${id}:`, message);
  });
});
Consumer Groups for Parallel Processing
Without consumer groups, every reader sees all messages. With consumer groups, messages are distributed across consumers—each processed once.
const groupName = 'notification-consumers';
// Create the group (throws BUSYGROUP if it already exists)
try {
  await client.xGroupCreate('activity', groupName, '0', { MKSTREAM: true });
} catch (err) {
  if (!String(err).includes('BUSYGROUP')) throw err;
}
// Consumer 1
const consumer1Id = 'consumer-1';
const messages1 = await client.xReadGroup(
  groupName,
  consumer1Id,
  { key: 'activity', id: '>' },
  { COUNT: 1 }
);

// Consumer 2
const consumer2Id = 'consumer-2';
const messages2 = await client.xReadGroup(
  groupName,
  consumer2Id,
  { key: 'activity', id: '>' },
  { COUNT: 1 }
);
Now multiple consumers pull from the same stream, each receiving different messages. Every delivered message is tracked per consumer in the group's pending list until it is acknowledged.
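In practice each consumer runs a loop around a group read. A sketch, assuming a client shaped like node-redis v4 (`drainOnce`, `GroupReader`, and the handler signature are illustrative names, not from the article):

```typescript
type StreamMessage = { id: string; message: Record<string, string> };

// Minimal slice of the node-redis v4 surface this loop relies on
interface GroupReader {
  xReadGroup(
    group: string,
    consumer: string,
    stream: { key: string; id: string },
    opts: { COUNT: number }
  ): Promise<{ name: string; messages: StreamMessage[] }[] | null>;
  xAck(key: string, group: string, id: string): Promise<number>;
}

// Read one batch of new messages for this consumer, acking each on success
async function drainOnce(
  client: GroupReader,
  group: string,
  consumer: string,
  handle: (msg: Record<string, string>) => Promise<void>
): Promise<number> {
  const res = await client.xReadGroup(
    group,
    consumer,
    { key: 'activity', id: '>' }, // '>' = only never-delivered messages
    { COUNT: 10 }
  );
  if (!res) return 0;
  let processed = 0;
  for (const { messages } of res) {
    for (const { id, message } of messages) {
      await handle(message);
      await client.xAck('activity', group, id);
      processed++;
    }
  }
  return processed;
}
```

Running this loop in several processes, each with a distinct consumer name, spreads the workload across them; adding a `BLOCK` option turns the read into a long poll.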
XACK and Dead Letter Handling
When a consumer processes a message, acknowledge it:
const messages = await client.xReadGroup(
  groupName,
  consumerId,
  { key: 'activity', id: '>' },
  { COUNT: 10 }
);

for (const { id, message } of messages?.[0]?.messages ?? []) {
  try {
    await processMessage(message);
    // Acknowledge success
    await client.xAck('activity', groupName, id);
  } catch (error) {
    console.error('Failed to process:', error);
    // Don't acknowledge; the message stays in the pending list
  }
}
Messages that fail remain in the Pending Entries List (PEL). A separate consumer can retry them or move them to a dead-letter stream:
// Check the pending summary for the group
const pending = await client.xPending('activity', groupName);
console.log('Pending:', pending.pending);

// Manually move to a dead-letter stream after max retries
const deadLetterStream = 'activity-dlq';
for (const id of deadLetterIds) {
  const [entry] = await client.xRange('activity', id, id);
  if (entry) {
    await client.xAdd(deadLetterStream, '*', entry.message);
    await client.xAck('activity', groupName, id);
  }
}
Trimming Streams with MAXLEN
Streams grow unbounded. Trim old entries:
// Keep roughly the last 1 million entries
await client.xTrim('activity', 'MAXLEN', 1000000, {
  strategyModifier: '~' // Approximate trimming; much cheaper than exact
});

// Or trim by age (keep the last 24 hours)
const dayAgo = (Date.now() - 24 * 60 * 60 * 1000).toString();
await client.xTrim('activity', 'MINID', dayAgo);
Use approximate trimming (~) for high-throughput streams to reduce CPU cost.
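Because entry IDs start with a millisecond timestamp, a MINID threshold is just "now minus the retention window". A tiny helper (illustrative, not a Redis command) makes that conversion explicit:

```typescript
// Build an XTRIM MINID threshold that keeps `retentionMs` of history.
// minIdForRetention is a hypothetical helper for illustration.
function minIdForRetention(retentionMs: number, now: number = Date.now()): string {
  return `${now - retentionMs}-0`;
}

// Keep the last 24 hours (assuming a connected node-redis v4 `client`):
// await client.xTrim('activity', 'MINID', minIdForRetention(24 * 60 * 60 * 1000));
```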
Using Redis Streams as an Event Log
Treat a stream as an immutable event log. Append-only, ordered by timestamp. Perfect for auditing:
await client.xAdd('audit-log', '*', {
  userId: '123',
  action: 'login',
  ipAddress: '192.0.2.1',
  timestamp: new Date().toISOString()
});
// Replay all events for a user
const events = await client.xRead([
{ key: 'audit-log', id: '0' }
]);
const userEvents = events?.[0].messages
  .map(({ message }) => message)
  .filter((data) => data.userId === '123');
console.log('Audit trail:', userEvents);
Event logs in Streams enable compliance, debugging, and understanding system state at any point in time.
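Rebuilding state is just a fold over the ordered events. A sketch of replaying login/logout events into a "currently online" set (the reducer is illustrative, not from the article):

```typescript
type AuditEvent = { userId: string; action: string };

// Fold an ordered event list into the set of currently-logged-in users
function onlineUsers(events: AuditEvent[]): Set<string> {
  const online = new Set<string>();
  for (const e of events) {
    if (e.action === 'login') online.add(e.userId);
    if (e.action === 'logout') online.delete(e.userId);
  }
  return online;
}
```

Feeding this the messages from an XREAD over the audit stream reconstructs who was online at any replay point.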
Building a Real-Time Activity Feed
Users see a feed of recent activity from followed accounts. Use a stream as the activity source:
// User A posts
await client.xAdd('activity', '*', {
  actor: 'userA',
  type: 'post',
  postId: '456',
  timestamp: Date.now().toString()
});
// Subscribe followers
async function getActivityFeed(userId: string) {
  const followedUsers = await client.sMembers(`user:${userId}:following`);
  // Read the newest 100 entries and filter
  const entries = await client.xRevRange('activity', '+', '-', { COUNT: 100 });
  const feed = entries
    .map(({ id, message }) => ({ id, ...message }))
    .filter(({ actor }) => followedUsers.includes(actor));
  return feed;
}
Real-time: combine with pub/sub to push new activities to connected clients:
const subscriber = client.duplicate();
await subscriber.connect();
await subscriber.subscribe('activity:new', (message) => {
  const activity = JSON.parse(message);
  console.log('New activity:', activity);
});

await client.xAdd('activity', '*', activityData);
await client.publish('activity:new', JSON.stringify(activityData));
Building a Live Notification System
Append notifications to a stream per user:
async function sendNotification(userId: string, content: string) {
  const notificationId = await client.xAdd(
    `notifications:${userId}`,
    '*',
    {
      content,
      read: 'false',
      createdAt: Date.now().toString()
    }
  );
  // Push to connected clients
  await client.publish(
    `notifications:${userId}:live`,
    JSON.stringify({ id: notificationId, content })
  );
}

async function markNotificationRead(userId: string, notificationId: string) {
  // Notifications are append-only; create a read marker entry
  await client.xAdd(
    `notifications:${userId}`,
    '*',
    {
      markedReadId: notificationId,
      readAt: Date.now().toString()
    }
  );
}
// Get unread notifications
async function getUnreadNotifications(userId: string) {
  const notifications = await client.xRevRange(
    `notifications:${userId}`,
    '+',
    '-',
    { COUNT: 100 }
  );
  const readIds = new Set<string>();
  for (const { message } of notifications) {
    if (message.markedReadId) {
      readIds.add(message.markedReadId);
    }
  }
  return notifications.filter(
    ({ id, message }) => !message.markedReadId && !readIds.has(id)
  );
}
Pending Entries List (PEL) for Reliability
When a consumer claims a message, it enters the PEL. If the consumer crashes before acknowledging, the message remains pending. Other consumers can claim and retry it:
// Consumer 1 dies mid-processing
// Consumer 2 checks for pending messages
const pending = await client.xPendingRange(
  'activity',
  groupName,
  '-', '+',
  10
);

for (const entry of pending) {
  if (entry.deliveriesCounter > 5) {
    // Too many failures; route to the DLQ instead
    continue;
  }
  // Claim the message if it has been idle for at least 1 second
  const claimed = await client.xClaim(
    'activity',
    groupName,
    consumer2Id,
    1000, // Min idle time (ms)
    entry.id
  );
  // Retry processing
  for (const { id, message } of claimed) {
    await processMessage(message);
    await client.xAck('activity', groupName, id);
  }
}
The PEL gives you at-least-once delivery: a message is redelivered until some consumer acknowledges it. Combine that with idempotent handlers to get effectively-once processing end to end.
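Idempotency can be as simple as recording processed entry IDs and skipping duplicates. An in-memory sketch (`idempotent` is illustrative; in production you would likely use a Redis SET NX key with a TTL instead of a local Set):

```typescript
// Wrap a handler so redelivered messages (same stream ID) are processed once.
function idempotent(
  handle: (id: string, msg: Record<string, string>) => Promise<void>
) {
  const seen = new Set<string>(); // In production: SET processed:<id> NX EX <ttl>
  return async (id: string, msg: Record<string, string>): Promise<boolean> => {
    if (seen.has(id)) return false; // Duplicate delivery; skip
    await handle(id, msg);
    seen.add(id); // Marked only after success, so failures stay retryable
    return true;
  };
}
```

Marking the ID only after the handler succeeds means a crash mid-processing leaves the message retryable, which is exactly what the PEL expects.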
Redis Streams vs BullMQ for Background Jobs
BullMQ is a job queue library on Redis. When do you use Streams vs BullMQ?
Streams: When you need an audit trail, replaying events, or low-latency processing. Multiple independent consumers process the same event stream.
BullMQ: When you need reliable job processing with retries, delays, priorities, and job-specific state. One job is processed by one worker.
Use Streams for event logs and activity feeds. Use BullMQ for background job processing. They're complementary.
When Redis Streams is Enough
Redis Streams suffices when:
- Single deployment region
- <100k QPS
- Durability via persistence (AOF or RDB snapshots)
- No complex topology (Streams is simpler than Kafka Streams)
- You already run Redis
Graduate to Kafka when:
- Multi-region deployment
- >100k QPS
- Complex event processing (correlation, windowing, joins)
- Need isolation from your operational database (Redis)
Checklist
- Choose between Streams, pub/sub, and Kafka for your use case
- Design your stream schema
- Implement consumer groups for parallel processing
- Add dead-letter handling for failed messages
- Set appropriate stream trimming policies
- Monitor PEL size and consumer lag
- Test failure scenarios (consumer crash, network partition)
- Implement idempotent message handlers
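For the monitoring item above, consumer lag can be estimated by comparing the group's last-delivered ID against the stream's newest ID (both reported by XINFO). The comparison itself is pure; the helper names below are illustrative:

```typescript
// Compare two stream IDs ("<ms>-<seq>"); negative if a < b, 0 if equal
function compareStreamIds(a: string, b: string): number {
  const [aMs, aSeq] = a.split('-').map(Number);
  const [bMs, bSeq] = b.split('-').map(Number);
  return aMs !== bMs ? aMs - bMs : aSeq - bSeq;
}

// A group is lagging if its last-delivered ID is behind the stream head.
// With node-redis v4 you would feed this from xInfoStream / xInfoGroups output.
function isLagging(lastDeliveredId: string, streamLastId: string): boolean {
  return compareStreamIds(lastDeliveredId, streamLastId) < 0;
}
```

Alert when a group stays lagging, or when the PEL count from XPENDING keeps growing, since both signal stuck or crashed consumers.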
Conclusion
Redis Streams bridge the gap between pub/sub simplicity and Kafka complexity. For most teams operating at mid-scale, Streams handle activity feeds, notifications, and event logs without operational overhead.
Start with Streams. Monitor your QPS, latency, and data retention. Only migrate to Kafka when Streams hits its limits. Until then, keep infrastructure simple.