Duplicate Event Processing — When Your Queue Delivers the Same Message Twice

Introduction

Kafka, RabbitMQ, SQS — every major message queue makes only an "at-least-once" delivery guarantee: under failure conditions, it will deliver the same message more than once. Your consumer must handle this gracefully.

If it doesn't, duplicate events cause real damage: orders shipped twice, emails sent twice, balances debited twice.

Why Queues Deliver Duplicates

Consumer 1                SQS
   |                          |
   |<-- deliver message ------|
   |                          |
   |-- processing...          |
   |   (takes 3 minutes)      |
   |                          |
   |                          | <-- visibility timeout (2 min) expires
   |                          |<-- message becomes visible again
   |                          |
   [Consumer 2 picks it up]   |
   |<-- deliver same message -|  DUPLICATE
   |                          |
   |-- ack ------------------>|  (Consumer 1 finally acks)
   |                          |

Processing took longer than the visibility timeout. SQS made the message visible again. Consumer 2 processed the same message. Both consumers acknowledged it. You now have two orders.
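A corollary worth making explicit: the visibility timeout must comfortably exceed your worst-case processing time. A hypothetical sizing helper (the function name and the 2× safety factor are illustrative, not an SQS recommendation):

```typescript
// Size the visibility timeout from observed processing latency.
// SQS expresses visibility timeouts in seconds.
function safeVisibilityTimeoutSeconds(p99ProcessingMs: number, safetyFactor = 2): number {
  return Math.ceil((p99ProcessingMs * safetyFactor) / 1000)
}

// The scenario above: 3-minute processing against a 2-minute timeout
// is guaranteed to duplicate; 2x headroom would have avoided it.
console.log(safeVisibilityTimeoutSeconds(180_000))  // 360 seconds
```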

Fix 1: Track Processed Event IDs in Redis

import { Redis } from 'ioredis'

class IdempotentConsumer {
  constructor(
    private redis: Redis,
    private ttlSeconds: number = 86400 * 7  // 7 days
  ) {}

  async processOnce<T>(
    eventId: string,
    handler: () => Promise<T>
  ): Promise<{ processed: boolean; result?: T }> {
    const key = `processed:${eventId}`

    // SET NX — only set if not exists, atomic
    const acquired = await this.redis.set(key, '1', 'EX', this.ttlSeconds, 'NX')
    // Caveat: if the process dies hard mid-handler, the key stays set and
    // the event won't be retried until the TTL expires — see Fix 2 for a
    // transactional alternative

    if (!acquired) {
      // Already processed — skip
      console.log(`Skipping duplicate event: ${eventId}`)
      return { processed: false }
    }

    try {
      const result = await handler()
      return { processed: true, result }
    } catch (err) {
      // Remove the key so we can retry on actual failures
      await this.redis.del(key)
      throw err
    }
  }
}

// Usage
const consumer = new IdempotentConsumer(redis)

async function handleOrderPlaced(event: OrderPlacedEvent) {
  const { processed } = await consumer.processOnce(
    event.eventId,
    async () => {
      await db.order.create(event.order)
      await emailService.sendConfirmation(event.order.email)
      await inventoryService.reserve(event.order.items)
    }
  )

  if (!processed) {
    console.log(`Duplicate event ${event.eventId} skipped`)
  }
}

Fix 2: Database Unique Constraint (Stronger Guarantee)

// Redis can fail — database constraint is more durable

// Schema
// CREATE TABLE processed_events (
//   event_id VARCHAR(255) PRIMARY KEY,
//   processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
//   handler VARCHAR(100) NOT NULL
// );

async function processWithDbDedup(
  eventId: string,
  handler: string,
  fn: () => Promise<void>
) {
  return db.transaction(async (trx) => {
    // Try to insert — unique constraint prevents double processing
    const inserted = await trx.raw(`
      INSERT INTO processed_events (event_id, handler)
      VALUES (?, ?)
      ON CONFLICT (event_id) DO NOTHING
      RETURNING event_id
    `, [eventId, handler])

    if (inserted.rows.length === 0) {
      // Row already exists — duplicate event
      console.log(`Duplicate event ${eventId} ignored`)
      return
    }

    // Process within same transaction
    await fn()
    // If fn() throws, the INSERT rolls back too → can retry safely
  })
}

// Kafka consumer (commit API sketched — adapt to your client library)
consumer.on('message', async (message) => {
  const event = JSON.parse(message.value)

  await processWithDbDedup(
    event.eventId,
    'order-fulfillment',
    async () => {
      await fulfillOrder(event.orderId)
    }
  )

  await message.commit()
})

Fix 3: Idempotent Operations by Design

// Some operations can be made naturally idempotent

// ❌ Not idempotent — inserts new row each time
await db.query('INSERT INTO emails_sent (order_id, email) VALUES (?, ?)', [orderId, email])

// ✅ Idempotent — INSERT IGNORE (MySQL) or ON CONFLICT DO NOTHING (Postgres)
await db.query(`
  INSERT INTO emails_sent (order_id, email)
  VALUES (?, ?)
  ON CONFLICT (order_id) DO NOTHING
`, [orderId, email])

// ❌ Not idempotent — adds to balance each time
await db.query('UPDATE wallets SET balance = balance + ? WHERE user_id = ?', [amount, userId])

// ✅ Idempotent — guard the increment with a dedup check on an idempotency key
await db.transaction(async (trx) => {
  const alreadyApplied = await trx('transactions')
    .where({ idempotency_key: txKey })
    .first()

  if (!alreadyApplied) {
    await trx('wallets').where({ user_id: userId }).increment('balance', amount)
    await trx('transactions').insert({ idempotency_key: txKey, amount, user_id: userId })
  }
})
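This only works if txKey is derived deterministically from the event itself — never from Date.now() or a UUID minted at processing time, or every redelivery gets a fresh key and the dedup check never fires. A sketch using Node's crypto (the key scheme is illustrative):

```typescript
import { createHash } from 'node:crypto'

// Stable idempotency key: same event + same operation → same key,
// so a redelivered event collides with the dedup check above.
function idempotencyKey(eventId: string, operation: string): string {
  return createHash('sha256').update(`${operation}:${eventId}`).digest('hex')
}

const first = idempotencyKey('evt-42', 'wallet-credit')
const retry = idempotencyKey('evt-42', 'wallet-credit')
console.log(first === retry)  // true — a redelivery maps to the same key
```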

Fix 4: Kafka Consumer Group Exactly-Once (Kafka 0.11+)

import { Kafka } from 'kafkajs'

const kafka = new Kafka({ brokers: ['localhost:9092'] })
const consumer = kafka.consumer({ groupId: 'order-processor' })

// With transactional producer: exactly-once semantics
const producer = kafka.producer({
  transactionalId: 'order-processor-txn',
  maxInFlightRequests: 1,  // Required for exactly-once
  idempotent: true,
})

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    const event = JSON.parse(message.value!.toString())

    // Process in a transaction that commits offset + output atomically
    await producer.transaction(async (tx) => {
      // Do the work. Note: side effects outside Kafka (DB writes, HTTP
      // calls) are NOT covered by the Kafka transaction — dedup those
      // separately (Fix 1 or 2)
      await processOrder(event)

      // Produce output event
      await tx.send({
        topic: 'order-confirmed',
        messages: [{ value: JSON.stringify({ orderId: event.orderId }) }],
      })

      // Commit the input offset within the same transaction
      await tx.sendOffsets({
        consumerGroupId: 'order-processor',
        topics: [{
          topic,
          partitions: [{ partition, offset: (BigInt(message.offset) + 1n).toString() }],
        }],
      })
    })
    // If transaction aborts, offset doesn't advance — message redelivered
    // But the transaction ensures output + offset commit are atomic
  },
})
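What the transaction buys you can be modeled in a few lines — a toy, in-memory sketch of "output and offset commit become visible together or not at all" (illustrative only; kafkajs and the brokers do the real work):

```typescript
type State = { outputs: string[]; committedOffset: number }

// Stage changes; publish them only if work() completes without throwing.
function runTransaction(state: State, work: (staged: State) => void): State {
  const staged: State = { outputs: [...state.outputs], committedOffset: state.committedOffset }
  try {
    work(staged)
    return staged   // commit: outputs and offset advance together
  } catch {
    return state    // abort: neither is visible; the message is redelivered
  }
}

let state: State = { outputs: [], committedOffset: 0 }

// A failed attempt leaves no trace — no half-committed output
state = runTransaction(state, (s) => {
  s.outputs.push('order-confirmed:o-1')
  throw new Error('broker hiccup')
})
console.log(state.outputs.length, state.committedOffset)  // 0 0

// The retry commits output and offset atomically
state = runTransaction(state, (s) => {
  s.outputs.push('order-confirmed:o-1')
  s.committedOffset = 1
})
console.log(state.outputs.length, state.committedOffset)  // 1 1
```

One real-world detail the toy omits: downstream consumers must read with read-committed isolation for aborted outputs to stay invisible — in kafkajs, leave the consumer's readUncommitted option at its default of false.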

Fix 5: Extend Visibility Timeout for Long Processing

// SQS: extend visibility timeout while processing to prevent re-delivery

import { SQSClient, ChangeMessageVisibilityCommand, DeleteMessageCommand, Message } from '@aws-sdk/client-sqs'

const sqs = new SQSClient({})

async function processWithExtension(message: Message, handler: () => Promise<void>) {
  const EXTENSION_INTERVAL = 60_000  // Extend every 60s
  const EXTENSION_AMOUNT = 90        // Extend by 90s each time (seconds, per the SQS API)

  // Keep extending the visibility timeout while we process.
  // Extension failures are logged, not fatal — the worst case is a
  // duplicate, which an idempotent consumer absorbs.
  const extender = setInterval(() => {
    sqs.send(new ChangeMessageVisibilityCommand({
      QueueUrl: process.env.QUEUE_URL,
      ReceiptHandle: message.ReceiptHandle,
      VisibilityTimeout: EXTENSION_AMOUNT,
    }))
      .then(() => console.log('Extended visibility timeout for long-running message'))
      .catch((err) => console.error('Failed to extend visibility timeout', err))
  }, EXTENSION_INTERVAL)

  try {
    await handler()

    // Delete message after successful processing
    await sqs.send(new DeleteMessageCommand({
      QueueUrl: process.env.QUEUE_URL,
      ReceiptHandle: message.ReceiptHandle,
    }))
  } finally {
    clearInterval(extender)
  }
}
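The same extend-while-working pattern applies to any lease-based queue, not just SQS. A queue-agnostic sketch (names are illustrative; in the SQS case the extend callback would wrap ChangeMessageVisibility):

```typescript
// Run handler() while periodically firing extend() — then stop the
// timer once the handler settles, whether it succeeded or threw.
async function withLeaseExtension<T>(
  handler: () => Promise<T>,
  extend: () => Promise<void>,
  intervalMs: number
): Promise<T> {
  const timer = setInterval(() => {
    extend().catch((err) => console.error('Lease extension failed', err))
  }, intervalMs)
  try {
    return await handler()
  } finally {
    clearInterval(timer)  // always stop extending, success or failure
  }
}

// Demo: a fake 120ms job with a 40ms extension interval
let extensions = 0
await withLeaseExtension(
  () => new Promise<void>((resolve) => setTimeout(resolve, 120)),
  async () => { extensions++ },
  40
)
console.log(extensions >= 1)  // true — the lease was extended while working
```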

Detecting Duplicate Events in Production

// Monitor duplicate rates — a high rate indicates a problem
import { Counter } from 'prom-client'

const metrics = {
  totalEvents: new Counter({ name: 'events_received_total', help: 'Events received', labelNames: ['handler'] }),
  duplicateEvents: new Counter({ name: 'events_duplicate_total', help: 'Duplicate events skipped', labelNames: ['handler'] }),
}

async function handleEvent(event: Event) {
  metrics.totalEvents.inc({ handler: event.type })

  const { processed } = await consumer.processOnce(event.eventId, async () => {
    await processEvent(event)
  })

  if (!processed) {
    metrics.duplicateEvents.inc({ handler: event.type })
  }
}

// Alert if duplicate rate > 1% — indicates infrastructure or consumer issues

Conclusion

At-least-once delivery is the default in every production message queue — duplicates will happen. Your consumers must be idempotent. The simplest approach is a Redis SET NX check on event ID at the start of each handler. For stronger guarantees, combine with a unique database constraint inside the same transaction as your business logic. For Kafka, transactional producers achieve exactly-once by atomically committing output + consumer offset. Regardless of approach, monitor your duplicate rate — a sudden spike indicates either infrastructure issues or a consumer that's crashing before acknowledging messages.