Duplicate Event Processing — When Your Queue Delivers the Same Message Twice
- Author: Sanjeev Sharma (@webcoderspeed1)
Introduction
Kafka, RabbitMQ, and SQS are all, in their common configurations, "at-least-once" delivery systems. That means that under failure conditions (a consumer crash, a timeout, a lost acknowledgement) they will deliver the same message more than once. Your consumer must handle this gracefully.
If it doesn't, duplicate events cause real damage: orders shipped twice, emails sent twice, balances debited twice.
- Why Queues Deliver Duplicates
- Fix 1: Track Processed Event IDs in Redis
- Fix 2: Database Unique Constraint (Stronger Guarantee)
- Fix 3: Idempotent Operations by Design
- Fix 4: Kafka Consumer Group Exactly-Once (Kafka 0.11+)
- Fix 5: Extend Visibility Timeout for Long Processing
- Detecting Duplicate Events in Production
- Conclusion
Why Queues Deliver Duplicates
Consumer                    Kafka/SQS
   |                            |
   |<-- deliver message --------|
   |                            |
   |-- processing...            |
   |   (takes 3 minutes)        |
   |                            |
   |                            | <-- visibility timeout (2 min) expires
   |                            |<-- message becomes visible again
   |                            |
[Consumer 2 picks it up]        |
   |<-- deliver same message ---|   ← DUPLICATE
   |                            |
   |-- ack -------------------->|   (Consumer 1 finally acks)
   |                            |
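Processing took longer than the visibility timeout, so SQS made the message visible again and Consumer 2 processed the same message. Both consumers finished the work and acknowledged it. You now have two orders. Kafka reaches the same outcome by a different route: a consumer that crashes or is rebalanced after processing a message but before committing its offset gets that message delivered again.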
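The timeline above can be reproduced with a toy in-memory queue. This is a minimal sketch and not how SQS is implemented: `ToyQueue`, its methods, and the logical clock measured in seconds are hypothetical names introduced only for illustration.

```typescript
// Toy in-memory queue with an SQS-style visibility timeout (hypothetical, for illustration).
// Time is a plain number of seconds so the example runs instantly.
interface QueuedMessage {
  id: string
  body: string
  invisibleUntil: number // logical time at which the message can be received again
  deleted: boolean
}

class ToyQueue {
  private messages: QueuedMessage[] = []
  private visibilityTimeoutSec: number

  constructor(visibilityTimeoutSec: number) {
    this.visibilityTimeoutSec = visibilityTimeoutSec
  }

  send(id: string, body: string): void {
    this.messages.push({ id, body, invisibleUntil: 0, deleted: false })
  }

  // Returns the first visible message and hides it for the visibility timeout
  receive(now: number): QueuedMessage | undefined {
    const msg = this.messages.find((m) => !m.deleted && m.invisibleUntil <= now)
    if (msg) msg.invisibleUntil = now + this.visibilityTimeoutSec
    return msg
  }

  delete(id: string): void {
    const msg = this.messages.find((m) => m.id === id)
    if (msg) msg.deleted = true
  }
}

const queue = new ToyQueue(120) // 2 minute visibility timeout
queue.send('evt-1', 'order placed')

const deliveries: string[] = []

// t = 0s: consumer 1 receives the message and starts 3 minutes of work
const first = queue.receive(0)
if (first) deliveries.push(`consumer-1:${first.id}`)

// t = 60s: the message is still invisible, so consumer 2 gets nothing
const tooEarly = queue.receive(60)

// t = 121s: the timeout expired before consumer 1 finished, so consumer 2 gets the same message
const second = queue.receive(121)
if (second) deliveries.push(`consumer-2:${second.id}`)

// t = 180s: consumer 1 finishes and deletes, but the duplicate is already in flight
queue.delete('evt-1')

console.log(deliveries) // [ 'consumer-1:evt-1', 'consumer-2:evt-1' ]
```

Nothing in the queue is broken here. The duplicate follows directly from the timeout being shorter than the processing time, which is why the fix has to live in the consumer.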
Fix 1: Track Processed Event IDs in Redis
import { Redis } from 'ioredis'

class IdempotentConsumer {
  constructor(
    private redis: Redis,
    private ttlSeconds: number = 86400 * 7 // remember event IDs for 7 days
  ) {}

  async processOnce<T>(
    eventId: string,
    handler: () => Promise<T>
  ): Promise<{ processed: boolean; result?: T }> {
    const key = `processed:${eventId}`

    // SET ... NX only sets the key if it does not exist, atomically,
    // so exactly one consumer wins the claim for a given event ID
    const acquired = await this.redis.set(key, '1', 'EX', this.ttlSeconds, 'NX')

    if (!acquired) {
      // Already claimed (processed or still in progress): skip
      console.log(`Skipping duplicate event: ${eventId}`)
      return { processed: false }
    }

    // Caveat: if the process dies between the SET above and the end of
    // handler(), the key stays set and redeliveries are skipped until the TTL expires
    try {
      const result = await handler()
      return { processed: true, result }
    } catch (err) {
      // Remove the key so we can retry on actual failures
      await this.redis.del(key)
      throw err
    }
  }
}

// Usage (`redis` is an existing ioredis client)
const consumer = new IdempotentConsumer(redis)

async function handleOrderPlaced(event: OrderPlacedEvent) {
  const { processed } = await consumer.processOnce(
    event.eventId,
    async () => {
      // These three steps are not atomic: if a later step throws, the key is
      // removed and the retry repeats the earlier steps too
      await db.order.create(event.order)
      await emailService.sendConfirmation(event.order.email)
      await inventoryService.reserve(event.order.items)
    }
  )

  if (!processed) {
    console.log(`Duplicate event ${event.eventId} skipped`)
  }
}
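One gap in the SET NX approach is a consumer that crashes after claiming the key but before finishing the handler: the key stays set for the full TTL and the event is never processed. A common mitigation is a two-phase marker, a short-lived `processing` claim that is upgraded to a long-lived `done` marker only after the handler succeeds. The sketch below illustrates the idea under stated assumptions: `MemoryStore` is a hypothetical in-memory stand-in for Redis so the example runs without a server, and `processOnceTwoPhase`, `CLAIM_TTL_SEC`, and `DONE_TTL_SEC` are illustrative names, not part of ioredis.

```typescript
// In-memory stand-in for Redis so the sketch runs without a server (hypothetical).
// `now` is a logical clock in seconds that the demo advances by hand.
let now = 0

class MemoryStore {
  private data = new Map<string, { value: string; expiresAt: number }>()

  // Mirrors `SET key value EX ttl NX`: succeeds only if the key is absent or expired
  setIfAbsent(key: string, value: string, ttlSec: number): boolean {
    const entry = this.data.get(key)
    if (entry !== undefined && entry.expiresAt > now) return false
    this.data.set(key, { value, expiresAt: now + ttlSec })
    return true
  }

  // Mirrors `SET key value EX ttl`: unconditional overwrite
  set(key: string, value: string, ttlSec: number): void {
    this.data.set(key, { value, expiresAt: now + ttlSec })
  }

  del(key: string): void {
    this.data.delete(key)
  }
}

const CLAIM_TTL_SEC = 300 // how long an in-flight claim survives a crash
const DONE_TTL_SEC = 7 * 86400 // how long a completed event is remembered

async function processOnceTwoPhase(
  store: MemoryStore,
  eventId: string,
  handler: () => Promise<void>
): Promise<boolean> {
  const key = `processed:${eventId}`
  // Phase 1: short-lived claim, so a crash only blocks retries for CLAIM_TTL_SEC
  if (!store.setIfAbsent(key, 'processing', CLAIM_TTL_SEC)) return false
  try {
    await handler()
  } catch (err) {
    store.del(key) // handler failed: release the claim so a retry can run
    throw err
  }
  // Phase 2: the work is done, remember it for the full dedup window
  store.set(key, 'done', DONE_TTL_SEC)
  return true
}

const store = new MemoryStore()
let sideEffects = 0
const work = async (): Promise<void> => {
  sideEffects++
}

async function demo(): Promise<boolean[]> {
  const results: boolean[] = []
  // Simulate a consumer that claimed evt-1 and then crashed: claim exists, no work done
  store.setIfAbsent('processed:evt-1', 'processing', CLAIM_TTL_SEC)
  // A redelivery while the claim is live is skipped
  results.push(await processOnceTwoPhase(store, 'evt-1', work))
  // After the claim expires, the redelivered event is processed
  now = CLAIM_TTL_SEC + 1
  results.push(await processOnceTwoPhase(store, 'evt-1', work))
  // A later duplicate is skipped because the event is marked done
  now = 1000
  results.push(await processOnceTwoPhase(store, 'evt-1', work))
  return results
}

const outcome = demo()
outcome.then((results) => console.log(results, sideEffects)) // [ false, true, false ] 1
```

The claim TTL should comfortably exceed the longest expected handler run, otherwise a slow handler loses its claim and a duplicate starts alongside it. A consumer that finds a live `processing` claim may also prefer to leave the message unacknowledged, so the broker retries it later rather than dropping it.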
Fix 2: Database Unique Constraint (Stronger Guarantee)
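// Redis can lose keys (eviction, failover, restart); a database constraint is
// durable and commits or rolls back together with the business write

// Schema
// CREATE TABLE processed_events (
//   event_id VARCHAR(255) NOT NULL,
//   handler VARCHAR(100) NOT NULL,
//   processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
//   PRIMARY KEY (event_id, handler) -- one row per event per handler
// );
import type { Knex } from 'knex'

async function processWithDbDedup(
  eventId: string,
  handler: string,
  fn: (trx: Knex.Transaction) => Promise<void>
) {
  return db.transaction(async (trx) => {
    // Try to insert; the primary key prevents double processing
    // (`.rows` is the result shape of the PostgreSQL driver)
    const inserted = await trx.raw(`
      INSERT INTO processed_events (event_id, handler)
      VALUES (?, ?)
      ON CONFLICT (event_id, handler) DO NOTHING
      RETURNING event_id
    `, [eventId, handler])

    if (inserted.rows.length === 0) {
      // Row already exists: duplicate event
      console.log(`Duplicate event ${eventId} ignored`)
      return
    }

    // Process within the same transaction: fn must issue its queries through
    // trx, otherwise they commit independently of the dedup row
    await fn(trx)
    // If fn() throws, the INSERT rolls back too, so the event can be retried safely
  })
}

// Kafka consumer (schematic: the subscribe and commit calls depend on your client library)
consumer.on('message', async (message) => {
  const event = JSON.parse(message.value)

  await processWithDbDedup(
    event.eventId,
    'order-fulfillment',
    async (trx) => {
      // fulfillOrder is assumed to accept the transaction and run its queries on it
      await fulfillOrder(event.orderId, trx)
    }
  )

  await message.commit()
})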
Fix 3: Idempotent Operations by Design
// Some operations can be made naturally idempotent

// ❌ Not idempotent: inserts a new row each time
await db.query('INSERT INTO emails_sent (order_id, email) VALUES (?, ?)', [orderId, email])

// ✅ Idempotent: ON CONFLICT DO NOTHING (PostgreSQL, SQLite) or INSERT IGNORE (MySQL)
// Requires a unique constraint on emails_sent.order_id
await db.query(`
  INSERT INTO emails_sent (order_id, email)
  VALUES (?, ?)
  ON CONFLICT (order_id) DO NOTHING
`, [orderId, email])

// ❌ Not idempotent: adds to the balance each time
await db.query('UPDATE wallets SET balance = balance + ? WHERE user_id = ?', [amount, userId])

// ✅ Idempotent: keep the increment, but guard it with an idempotency key
// recorded in the same transaction. This needs a UNIQUE constraint on
// transactions.idempotency_key: if two duplicates race past the check,
// the second INSERT fails and its increment rolls back with it
await db.transaction(async (trx) => {
  const alreadyApplied = await trx('transactions')
    .where({ idempotency_key: txKey })
    .first()

  if (!alreadyApplied) {
    await trx('wallets').where({ user_id: userId }).increment('balance', amount)
    await trx('transactions').insert({ idempotency_key: txKey, amount, user_id: userId })
  }
})
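The difference between the two kinds of operation can be seen without a database. The following is a small in-memory sketch, assuming hypothetical names (`markShipped`, `Wallet`, `credit`) that are not part of the code above: assigning a status is idempotent by nature, while an increment only becomes safe once it is keyed.

```typescript
// Naturally idempotent: assigning a value gives the same result however often it runs
type OrderStatus = 'placed' | 'shipped'
const orders = new Map<string, OrderStatus>([['order-1', 'placed']])

function markShipped(orderId: string): void {
  orders.set(orderId, 'shipped')
}

// Not naturally idempotent: an increment needs an idempotency key to become safe
class Wallet {
  balance = 0
  private appliedKeys = new Set<string>()

  // Returns true if the credit was applied, false if the key was seen before
  credit(idempotencyKey: string, amount: number): boolean {
    if (this.appliedKeys.has(idempotencyKey)) return false
    this.appliedKeys.add(idempotencyKey)
    this.balance += amount
    return true
  }
}

markShipped('order-1')
markShipped('order-1') // duplicate delivery, same end state

const wallet = new Wallet()
const firstCredit = wallet.credit('refund:order-1', 25)
const duplicateCredit = wallet.credit('refund:order-1', 25) // duplicate delivery, ignored
const otherCredit = wallet.credit('refund:order-2', 10)

console.log(orders.get('order-1'), wallet.balance) // shipped 35
```

The idempotency key has to come from the event itself (for example the event ID or a business identifier), not be generated by the consumer, or every redelivery would look like a new operation.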
Fix 4: Kafka Consumer Group Exactly-Once (Kafka 0.11+)
import { Kafka } from 'kafkajs'

const kafka = new Kafka({ brokers: ['localhost:9092'] })
const consumer = kafka.consumer({ groupId: 'order-processor' })

// With a transactional producer: exactly-once semantics for consume-then-produce
const producer = kafka.producer({
  transactionalId: 'order-processor-txn',
  maxInFlightRequests: 1, // Required for exactly-once
  idempotent: true,
})

await producer.connect()
await consumer.connect()
await consumer.subscribe({ topic: 'order-placed' }) // input topic name is an assumption

await consumer.run({
  autoCommit: false, // offsets are committed through the transaction instead
  eachMessage: async ({ topic, partition, message }) => {
    const event = JSON.parse(message.value!.toString())

    // Open a transaction that commits offset + output atomically
    const tx = await producer.transaction()
    try {
      // Do work. Side effects outside Kafka (database writes, emails) are not
      // covered by the transaction and still need to be idempotent
      await processOrder(event)

      // Produce output event
      await tx.send({
        topic: 'order-confirmed',
        messages: [{ value: JSON.stringify({ orderId: event.orderId }) }],
      })

      // Commit the input offset within the same transaction
      await tx.sendOffsets({
        consumerGroupId: 'order-processor',
        topics: [{
          topic,
          partitions: [{ partition, offset: (BigInt(message.offset) + 1n).toString() }],
        }],
      })

      await tx.commit()
    } catch (err) {
      // If the transaction aborts, the output is discarded and the offset
      // doesn't advance, so the message is redelivered
      await tx.abort()
      throw err
    }
  },
})
Fix 5: Extend Visibility Timeout for Long Processing
// SQS: extend visibility timeout while processing to prevent re-delivery
import { SQSClient, ChangeMessageVisibilityCommand, DeleteMessageCommand } from '@aws-sdk/client-sqs'
import type { Message } from '@aws-sdk/client-sqs'

const sqs = new SQSClient({})

async function processWithExtension(message: Message, handler: () => Promise<void>) {
  const EXTENSION_INTERVAL = 60_000 // Extend every 60s (ms); must be shorter than the queue's visibility timeout
  const EXTENSION_AMOUNT = 90 // New timeout in seconds, counted from the moment of each call

  // Keep extending visibility timeout while we process
  const extender = setInterval(async () => {
    try {
      await sqs.send(new ChangeMessageVisibilityCommand({
        QueueUrl: process.env.QUEUE_URL,
        ReceiptHandle: message.ReceiptHandle,
        VisibilityTimeout: EXTENSION_AMOUNT,
      }))
      console.log('Extended visibility timeout for long-running message')
    } catch (err) {
      // A failed extension must not crash the process; the message may be redelivered
      console.error('Failed to extend visibility timeout', err)
    }
  }, EXTENSION_INTERVAL)

  try {
    await handler()
    // Delete message after successful processing
    await sqs.send(new DeleteMessageCommand({
      QueueUrl: process.env.QUEUE_URL,
      ReceiptHandle: message.ReceiptHandle,
    }))
  } finally {
    clearInterval(extender)
  }
}
Detecting Duplicate Events in Production
// Monitor duplicate rates; a high rate indicates a problem
// (Counter is assumed to be prom-client's, which requires `help` and declared label names)
import { Counter } from 'prom-client'

const metrics = {
  totalEvents: new Counter({
    name: 'events_received_total',
    help: 'Events received, by handler',
    labelNames: ['handler'],
  }),
  duplicateEvents: new Counter({
    name: 'events_duplicate_total',
    help: 'Events skipped as duplicates, by handler',
    labelNames: ['handler'],
  }),
}

async function handleEvent(event: Event) {
  metrics.totalEvents.inc({ handler: event.type })

  const { processed } = await consumer.processOnce(event.eventId, async () => {
    await processEvent(event)
  })

  if (!processed) {
    metrics.duplicateEvents.inc({ handler: event.type })
  }
}

// Alert if duplicate rate > 1%; that indicates infrastructure or consumer issues
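The 1% rule can be made concrete as a ratio of counter deltas between two scrapes. This is a rough sketch rather than a monitoring setup: `CounterSample`, `duplicateRate`, `shouldAlert`, and the sample numbers are all hypothetical, and in practice the same calculation would usually live in the alerting system rather than in application code.

```typescript
interface CounterSample {
  total: number // value of events_received_total at scrape time
  duplicates: number // value of events_duplicate_total at scrape time
}

// Duplicate rate over the window between two samples of ever-increasing counters
function duplicateRate(previous: CounterSample, current: CounterSample): number {
  const received = current.total - previous.total
  const duplicated = current.duplicates - previous.duplicates
  return received > 0 ? duplicated / received : 0
}

const ALERT_THRESHOLD = 0.01 // 1%

function shouldAlert(previous: CounterSample, current: CounterSample): boolean {
  return duplicateRate(previous, current) > ALERT_THRESHOLD
}

const before: CounterSample = { total: 10_000, duplicates: 40 }
const healthy: CounterSample = { total: 12_000, duplicates: 50 } // 10 of 2,000 new events
const spiking: CounterSample = { total: 14_000, duplicates: 150 } // 100 of 2,000 new events

console.log(duplicateRate(before, healthy), shouldAlert(before, healthy)) // 0.005 false
console.log(duplicateRate(healthy, spiking), shouldAlert(healthy, spiking)) // 0.05 true
```

Using deltas rather than lifetime totals matters: a long-running consumer with a clean history would otherwise hide a recent spike. The sketch does not handle counter resets after a process restart.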
Conclusion
At-least-once delivery is the norm for production message queues, so duplicates will happen and your consumers must be idempotent. The simplest approach is a Redis SET NX check on the event ID at the start of each handler. For stronger guarantees, combine it with a unique database constraint inside the same transaction as your business logic. For Kafka, transactional producers achieve exactly-once for consume-then-produce pipelines by atomically committing the output and the consumer offset. Regardless of approach, monitor your duplicate rate: a sudden spike indicates either infrastructure issues or a consumer that is crashing before acknowledging messages.