Event Ordering Problem — When Events Arrive Out of Sequence

Introduction

Event-driven systems assume events are processed in the order they were produced. But networks don't guarantee order. Kafka partitions can rebalance. Consumers can crash mid-batch. Retries can reorder messages.

When "order cancelled" arrives before "order created," your state machine breaks.

The Ordering Problem

Events produced (in order):
  t=0ms  OrderCreated { orderId: "123" }
  t=100ms OrderPaid   { orderId: "123" }
  t=200ms OrderShipped { orderId: "123" }

Events consumed (out of order):
  t=0ms  OrderShipped → ERROR: order doesn't exist
  t=10ms OrderCreated → creates order
  t=20ms OrderPaid    → order not in "created" state, transition fails
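The breakage is easy to reproduce with a minimal state-machine sketch (a hypothetical `OrderStateMachine`, not code from any particular system): an event whose required source state doesn't match the current one is rejected, which is exactly what happens when OrderShipped arrives first.

```typescript
type OrderStatus = 'none' | 'created' | 'paid' | 'shipped'

// Allowed transitions: which status each event type may be applied from
const transitions: Record<string, { from: OrderStatus; to: OrderStatus }> = {
  OrderCreated: { from: 'none', to: 'created' },
  OrderPaid: { from: 'created', to: 'paid' },
  OrderShipped: { from: 'paid', to: 'shipped' },
}

class OrderStateMachine {
  status: OrderStatus = 'none'

  apply(eventType: string): void {
    const t = transitions[eventType]
    if (!t || this.status !== t.from) {
      throw new Error(`Invalid transition: ${eventType} while in '${this.status}'`)
    }
    this.status = t.to
  }
}

// Out-of-order consumption: OrderShipped arrives first and blows up
const machine = new OrderStateMachine()
try {
  machine.apply('OrderShipped')
} catch (err) {
  console.log((err as Error).message)  // Invalid transition: OrderShipped while in 'none'
}
```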

Cause 1: Multiple Kafka Partitions Without Partition Key

// ❌ No partition key — events for same order go to random partitions
// Kafka guarantees order WITHIN a partition, not across partitions

await producer.send({
  topic: 'orders',
  messages: [
    { value: JSON.stringify(orderCreatedEvent) },    // → Partition 0
    { value: JSON.stringify(orderPaidEvent) },       // → Partition 2
    { value: JSON.stringify(orderShippedEvent) },    // → Partition 1
  ]
})
// Three different partitions = three different queues = no order guarantee

// ✅ Use partition key — all events for same entity go to same partition
await producer.send({
  topic: 'orders',
  messages: [
    {
      key: orderId,  // Same key → same partition → ordered
      value: JSON.stringify(orderCreatedEvent),
    },
    {
      key: orderId,
      value: JSON.stringify(orderPaidEvent),
    },
    {
      key: orderId,
      value: JSON.stringify(orderShippedEvent),
    },
  ]
})
// All events for orderId "123" always go to the same partition
// Within that partition, they're guaranteed ordered
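Why the key restores ordering: the default partitioner hashes the message key (murmur2 in the Java client and kafkajs) modulo the partition count, so equal keys always land on the same partition. A simplified stand-in hash, just to illustrate the determinism:

```typescript
// Simplified stand-in for Kafka's partitioner: any deterministic hash works.
// The real default hashes the key bytes with murmur2, modulo partition count.
function partitionFor(key: string, numPartitions: number): number {
  let hash = 0
  for (const ch of key) {
    hash = (hash * 31 + ch.charCodeAt(0)) | 0
  }
  return Math.abs(hash) % numPartitions
}

// Same key, same partition: every send, from every producer instance
console.log(partitionFor('order-123', 3) === partitionFor('order-123', 3))  // true
```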

Cause 2: Concurrent Processing Within a Single Consumer

// ❌ Fire-and-forget handler: processing overlaps, so partition order is lost
await consumer.run({
  eachMessage: async ({ message }) => {
    processEvent(JSON.parse(message.value!.toString()))  // No await!
    // The handler returns immediately; message 3 can finish before message 1
  },
})
// ✅ Process one message at a time within a partition
import { Kafka } from 'kafkajs'

const consumer = kafka.consumer({ groupId: 'order-service' })

await consumer.run({
  // eachMessage processes one at a time — awaits completion before next
  eachMessage: async ({ message }) => {
    await processEvent(JSON.parse(message.value!.toString()))
  },
  // Avoid eachBatch handlers that process a batch's messages concurrently; that reintroduces reordering
})

Fix 1: Event Sequencing with Version Numbers

// Add sequence number to each event
interface OrderEvent {
  eventId: string
  orderId: string
  type: 'OrderCreated' | 'OrderPaid' | 'OrderShipped' | 'OrderCancelled'
  sequence: number       // Monotonically increasing per aggregate
  timestamp: number
  payload: any
}

// Consumer: check sequence before applying
import Redis from 'ioredis'

const redis = new Redis()  // buffers out-of-order events (used in bufferAndWait below)

class OrderProjection {
  private versions = new Map<string, number>()  // orderId → last processed sequence

  async apply(event: OrderEvent): Promise<void> {
    const lastSeq = this.versions.get(event.orderId) ?? -1

    if (event.sequence <= lastSeq) {
      console.log(`Skipping already-applied event: ${event.eventId} (seq ${event.sequence})`)
      return  // Already processed this or a later version
    }

    if (event.sequence !== lastSeq + 1) {
      // Gap detected — event arrived out of order
      console.warn(`Out-of-order event for ${event.orderId}: expected ${lastSeq + 1}, got ${event.sequence}`)
      await this.bufferAndWait(event)
      return
    }

    await this.applyEvent(event)
    this.versions.set(event.orderId, event.sequence)
  }

  private async bufferAndWait(event: OrderEvent): Promise<void> {
    // Buffer this event, wait for the missing one
    const bufferKey = `event-buffer:${event.orderId}`
    await redis.zadd(bufferKey, event.sequence, JSON.stringify(event))
    await redis.expire(bufferKey, 300)  // 5 minute buffer window

    // Try to drain the buffer after a short wait
    setTimeout(() => this.drainBuffer(event.orderId), 1000)
  }

  private async drainBuffer(orderId: string): Promise<void> {
    const bufferKey = `event-buffer:${orderId}`
    // Apply buffered events in order, as long as each is the next expected sequence
    for (const raw of await redis.zrangebyscore(bufferKey, '-inf', '+inf')) {
      const event: OrderEvent = JSON.parse(raw)
      if (event.sequence !== (this.versions.get(orderId) ?? -1) + 1) break
      await this.applyEvent(event)
      this.versions.set(orderId, event.sequence)
      await redis.zrem(bufferKey, raw)
    }
  }

  private async applyEvent(event: OrderEvent): Promise<void> {
    // Actually process the event
    switch (event.type) {
      case 'OrderCreated': await this.createOrder(event.payload); break
      case 'OrderPaid': await this.markPaid(event.orderId); break
      case 'OrderShipped': await this.markShipped(event.orderId); break
    }
  }
}
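A prerequisite the snippet above takes for granted: the producer must stamp each event with the next sequence for its aggregate. A minimal in-memory sketch (`SequenceAssigner` is hypothetical; a real producer would use an atomic counter in its database or Redis INCR so sequences survive restarts):

```typescript
class SequenceAssigner {
  private next = new Map<string, number>()

  // Returns 0, 1, 2, ... independently per aggregate id
  assign(aggregateId: string): number {
    const seq = this.next.get(aggregateId) ?? 0
    this.next.set(aggregateId, seq + 1)
    return seq
  }
}

const sequences = new SequenceAssigner()
const created = { orderId: '123', type: 'OrderCreated', sequence: sequences.assign('123') }
const paid = { orderId: '123', type: 'OrderPaid', sequence: sequences.assign('123') }
console.log(created.sequence, paid.sequence)  // 0 1
```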

Fix 2: Optimistic Concurrency on the Aggregate

// Use database version column to reject out-of-order updates

// Schema: orders table has a `version` column (starts at 0)

async function applyEvent(event: OrderEvent): Promise<void> {
  const expectedVersion = event.sequence - 1

  const updated = await db.raw(`
    UPDATE orders
    SET
      status = ?,
      version = version + 1,
      updated_at = NOW()
    WHERE
      id = ?
      AND version = ?   -- Optimistic concurrency check
  `, [getNewStatus(event), event.orderId, expectedVersion])

  if (updated.rowCount === 0) {
    const order = await db('orders').where({ id: event.orderId }).first()

    if (!order) {
      // Order doesn't exist yet — event arrived before OrderCreated
      throw new OutOfOrderError(`Order ${event.orderId} not found — requeue event`)
    }

    if (order.version > expectedVersion) {
      // A later event already applied — skip this one (already superseded)
      console.log(`Skipping superseded event for order ${event.orderId}`)
      return
    }

    throw new ConcurrentModificationError(`Version conflict for order ${event.orderId}`)
  }
}
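The optimistic check can be seen in isolation with an in-memory stand-in for the UPDATE above (`InMemoryOrders` is purely illustrative): a write only applies when the caller's expected version matches the row's current version.

```typescript
interface OrderRow { id: string; status: string; version: number }

// Stands in for: UPDATE orders SET ... WHERE id = ? AND version = ?
class InMemoryOrders {
  private rows = new Map<string, OrderRow>()

  insert(row: OrderRow): void {
    this.rows.set(row.id, row)
  }

  // Returns true only if expectedVersion matched (the optimistic check)
  updateIfVersion(id: string, status: string, expectedVersion: number): boolean {
    const row = this.rows.get(id)
    if (!row || row.version !== expectedVersion) return false
    row.status = status
    row.version += 1
    return true
  }
}

const orders = new InMemoryOrders()
orders.insert({ id: '123', status: 'created', version: 1 })

console.log(orders.updateIfVersion('123', 'paid', 1))  // true: versions match, applied
console.log(orders.updateIfVersion('123', 'paid', 1))  // false: stale, row is now version 2
```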

Fix 3: Delayed Retry for Out-of-Order Events

// When an event depends on a predecessor that hasn't arrived yet, delay and retry

// Events here carry an optional retryCount, added when a retry is scheduled
class OrderEventConsumer {
  async handle(event: OrderEvent & { retryCount?: number }): Promise<void> {
    try {
      await this.apply(event)
    } catch (err) {
      if (err instanceof OutOfOrderError) {
        // Predecessor event hasn't arrived yet
        // Delay retry — give predecessor time to be processed
        const delayMs = this.calculateDelay(event.retryCount ?? 0)

        await this.scheduleRetry(event, delayMs)
        return  // Don't throw — don't dead-letter this event
      }
      throw err  // Re-throw other errors for DLQ
    }
  }

  private calculateDelay(retryCount: number): number {
    // Exponential backoff: 1s, 2s, 4s, 8s, 16s...
    return Math.min(1000 * Math.pow(2, retryCount), 30_000)
  }

  private async scheduleRetry(event: OrderEvent & { retryCount?: number }, delayMs: number): Promise<void> {
    const retryEvent = { ...event, retryCount: (event.retryCount ?? 0) + 1 }

    if (retryEvent.retryCount > 5) {
      // Give up — send to dead letter queue for manual investigation
      await deadLetterQueue.publish(retryEvent)
      return
    }

    // Schedule delayed retry via Bull
    await retryQueue.add(retryEvent, { delay: delayMs })
  }
}
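`retryQueue` and `deadLetterQueue` above are assumed to be pre-existing queues (e.g. Bull, backed by Redis). The retry decision itself is pure and worth seeing on its own, extracted here into a hypothetical `planRetry` helper:

```typescript
type RetryPlan = { action: 'retry'; delayMs: number } | { action: 'deadLetter' }

// Mirrors handle() + scheduleRetry() above: delay grows exponentially from the
// current retry count; the attempt that would be the 6th goes to the DLQ instead
function planRetry(retryCount: number): RetryPlan {
  if (retryCount + 1 > 5) return { action: 'deadLetter' }
  return { action: 'retry', delayMs: Math.min(1000 * 2 ** retryCount, 30_000) }
}

console.log(planRetry(0))  // { action: 'retry', delayMs: 1000 }
console.log(planRetry(4))  // { action: 'retry', delayMs: 16000 }
console.log(planRetry(5))  // { action: 'deadLetter' }
```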

Fix 4: Event Sourcing with Snapshots

// Event sourcing: rebuild current state from ordered event log

interface OrderState {
  status: 'initial' | 'created' | 'paid' | 'shipped'
  version: number
}

class OrderAggregate {
  private state: OrderState = { status: 'initial', version: -1 }

  // Load from event store — events already ordered by sequence
  static async load(orderId: string): Promise<OrderAggregate> {
    const aggregate = new OrderAggregate()
    const events = await db('events')
      .where({ aggregate_id: orderId })
      .orderBy('sequence', 'asc')

    for (const event of events) {
      aggregate.applyEvent(event)
    }
    return aggregate
  }

  applyEvent(event: OrderEvent): void {
    // Guard: reject if sequence is not monotonically increasing
    if (event.sequence !== this.state.version + 1) {
      throw new Error(`Event out of sequence: ${event.sequence}, expected ${this.state.version + 1}`)
    }

    switch (event.type) {
      case 'OrderCreated':
        this.state = { ...this.state, status: 'created', version: event.sequence }
        break
      case 'OrderPaid':
        if (this.state.status !== 'created') throw new InvalidTransitionError()
        this.state = { ...this.state, status: 'paid', version: event.sequence }
        break
      case 'OrderShipped':
        if (this.state.status !== 'paid') throw new InvalidTransitionError()
        this.state = { ...this.state, status: 'shipped', version: event.sequence }
        break
    }
  }
}
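The snapshot half of this fix: periodically persist the aggregate's state along with the last applied sequence, so load() replays only the events after the snapshot instead of the whole history. A minimal sketch with an in-memory stand-in for a snapshots table (the store and helper names are hypothetical):

```typescript
interface Snapshot {
  state: { status: string; version: number }
  sequence: number  // last event folded into this snapshot
}

// Stands in for a snapshots table keyed by aggregate id
const snapshots = new Map<string, Snapshot>()

function saveSnapshot(orderId: string, state: { status: string; version: number }): void {
  snapshots.set(orderId, { state: { ...state }, sequence: state.version })
}

// On load: start from the snapshot (or a blank state), then replay only
// events with sequence > base.sequence instead of the whole log
function loadBase(orderId: string): Snapshot {
  return snapshots.get(orderId) ?? { state: { status: 'initial', version: -1 }, sequence: -1 }
}

saveSnapshot('123', { status: 'paid', version: 1 })
console.log(loadBase('123').sequence)  // 1
console.log(loadBase('999').sequence)  // -1 (no snapshot: replay from the start)
```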

Ordering Rules Cheatsheet

Problem                                          Solution
-----------------------------------------------  -------------------------------------------
Events from same entity on different partitions  Use entity ID as Kafka partition key
Concurrent consumer workers on same partition    Sequential eachMessage (not parallel batch)
Events arrive before their predecessors          Retry with delay; buffer for short windows
Consumer applies stale update                    Optimistic concurrency with version column
Need full auditability                           Event sourcing with ordered event log

Conclusion

Event ordering bugs are subtle and only appear under specific failure conditions — consumer crashes, partition rebalances, or network delays. The foundation is correct partitioning: use the entity ID as the Kafka partition key to co-locate all events for the same entity. Within consumers, process events sequentially. Add version numbers to detect gaps, optimistic concurrency to reject out-of-order updates, and delayed retries for events that arrive before their predecessors. For the strongest guarantees, event sourcing provides an ordered, immutable log that always reconstructs correct state.