Message Ordering at Scale — When You Need Order and When You Don't

Introduction

Global message ordering across millions of events kills throughput, but ordering within an entity (all events for customer X) is critical for correctness. FIFO queues guarantee order but severely limit parallelism. Kafka's partition-based approach offers a sweet spot: order per entity, parallelism across entities. We'll compare ordering strategies, show how to detect out-of-order messages, and look at when order doesn't matter at all.

SQS Standard vs FIFO

SQS Standard: Unlimited throughput, no ordering guarantees, messages can be delivered out of order or duplicated.

import { SQS } from '@aws-sdk/client-sqs'; // AWS SDK v3 aggregated client

class StandardQueueConsumer {
  private sqs = new SQS({});

  async sendMessage(queueUrl: string, body: any): Promise<void> {
    // Fast, no ordering guarantees
    await this.sqs.sendMessage({
      QueueUrl: queueUrl,
      MessageBody: JSON.stringify(body),
    });
  }

  async processMessages(queueUrl: string): Promise<void> {
    // Consumer 1 might process msg A then msg B
    // Consumer 2 might process msg B then msg A
    // Messages are delivered at least once, possibly out of order

    const result = await this.sqs.receiveMessage({
      QueueUrl: queueUrl,
      MaxNumberOfMessages: 10,
      WaitTimeSeconds: 20,
    });

    for (const message of result.Messages || []) {
      await this.processMessage(message);

      // Delete only after processing succeeds
      await this.sqs.deleteMessage({
        QueueUrl: queueUrl,
        ReceiptHandle: message.ReceiptHandle,
      });
    }
  }

  private async processMessage(message: any): Promise<void> {
    // Must be idempotent; could see same message multiple times
  }
}

SQS FIFO: Guaranteed ordering per message group, but limited throughput: 300 messages/sec without batching, 3,000/sec with batching (higher still in high-throughput mode).

class FIFOQueueConsumer {
  private sqs = new SQS({}); // AWS SDK v3 client; FIFO queue URLs end in .fifo

  async sendMessage(queueUrl: string, customerId: string, body: any): Promise<void> {
    // Group ID ensures ordering within group
    await this.sqs.sendMessage({
      QueueUrl: queueUrl,
      MessageBody: JSON.stringify(body),
      MessageGroupId: customerId, // All msgs for this customer are ordered
      MessageDeduplicationId: `${customerId}-${body.txId}`, // Dedup key
    });
  }

  async processMessages(queueUrl: string): Promise<void> {
    // Exactly one consumer processes messages from a group at a time
    // Messages from same group are delivered in order
    // Throughput cap applies across the whole queue (300 msgs/sec unbatched)

    const result = await this.sqs.receiveMessage({
      QueueUrl: queueUrl,
      MaxNumberOfMessages: 1, // FIFO processes one at a time per group
      WaitTimeSeconds: 20,
    });

    for (const message of result.Messages || []) {
      const body = JSON.parse(message.Body);
      await this.processMessage(body);

      await this.sqs.deleteMessage({
        QueueUrl: queueUrl,
        ReceiptHandle: message.ReceiptHandle,
      });
    }
  }

  private async processMessage(message: any): Promise<void> {
    // Guaranteed to see messages in order for same customer
  }
}

Tradeoff: Standard queues offer effectively unlimited throughput but require idempotent consumers that tolerate out-of-order delivery. FIFO guarantees order but caps throughput.
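What "idempotent consumer" means in practice: track which message IDs you have already handled and skip repeats. A minimal in-memory sketch (a real system would persist the seen set, e.g. in a database or Redis, and expire old entries):

```typescript
// Minimal idempotent handler: dedup on a caller-supplied message ID.
// In production the seen-set must be durable and bounded (TTL/eviction);
// an in-memory Set is only for illustration.
class IdempotentHandler {
  private seen = new Set<string>();
  public handled: string[] = [];

  handle(messageId: string, body: string): boolean {
    if (this.seen.has(messageId)) {
      return false; // Duplicate delivery: safely ignored
    }
    this.seen.add(messageId);
    this.handled.push(body); // Stand-in for real side effects
    return true;
  }
}

// At-least-once delivery can replay the same message:
const handler = new IdempotentHandler();
handler.handle('msg-1', 'charge $10');
handler.handle('msg-1', 'charge $10'); // Replayed: no double charge
handler.handle('msg-2', 'charge $5');
console.log(handler.handled.length); // 2
```

With this property in place, at-least-once delivery and occasional reordering stop being correctness problems.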

Kafka Partition Keys for Per-Entity Ordering

Kafka guarantees ordering within a partition; a partition key routes related messages to the same partition.

import { Kafka } from 'kafkajs';

class KafkaOrdering {
  private kafka = new Kafka({ brokers: ['localhost:9092'] }); // Your cluster's brokers

  async publishOrderEvents(customerId: string, events: any[]): Promise<void> {
    // In production, reuse one long-lived producer rather than connecting per call
    const producer = this.kafka.producer();
    await producer.connect();

    // Use customer ID as partition key
    // All events for this customer go to same partition
    // Guaranteed to be processed in order
    const messages = events.map(event => ({
      key: customerId, // Critical: same key = same partition
      value: JSON.stringify(event),
      headers: {
        'content-type': 'application/json',
        'event-type': event.type,
      },
    }));

    await producer.send({
      topic: 'customer-events',
      messages,
    });

    await producer.disconnect();
  }

  async consumeOrderedEvents(): Promise<void> {
    const consumer = this.kafka.consumer({ groupId: 'order-processor' });
    await consumer.connect();
    await consumer.subscribe({ topic: 'customer-events' });

    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        const customerId = message.key!.toString();
        const event = JSON.parse(message.value!.toString());

        // Partition assignment ensures one consumer per partition
        // Events for same customer are processed sequentially
        await this.processEvent(customerId, event);

        console.log(`Processed ${event.type} for customer ${customerId}`);
      },
    });
  }

  private async processEvent(customerId: string, event: any): Promise<void> {
    // Implementation
  }
}

Important: Choose partition keys that distribute load evenly:

class PartitionKeyStrategy {
  // Good: high-cardinality key spreads load across many partitions
  getKeyForOrder(orderId: string, customerId: string): string {
    return customerId; // Many customers = even distribution
  }

  // Bad: low-cardinality key creates hot partitions
  getKeyByRegion(region: string): string {
    return region; // A handful of regions = a handful of overloaded partitions
  }

  // Better: combine tenant and user for finer-grained distribution
  getKeyForUserEvent(userId: string, tenantId: string): string {
    return `${tenantId}:${userId}`;
  }

  // For truly large single entities, add a random shard suffix
  getKeyForLargeEntity(entityId: string, shardCount: number = 10): string {
    // Spreads one entity across shardCount partitions, but note the cost:
    // ordering is now only guaranteed within each shard, not per entity
    const shard = Math.floor(Math.random() * shardCount);
    return `${entityId}:${shard}`;
  }
}
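Why the same key always lands on the same partition: the client hashes the key and takes it modulo the partition count (kafkajs uses murmur2; the toy hash below is purely illustrative). The same arithmetic also shows why changing the partition count reshuffles keys:

```typescript
// Toy stand-in for Kafka's key-based partitioner (real clients use murmur2).
// A deterministic hash means the same key always maps to the same partition.
function toyPartition(key: string, numPartitions: number): number {
  let hash = 2166136261; // FNV-1a-style constants, purely illustrative
  for (let i = 0; i < key.length; i++) {
    hash ^= key.charCodeAt(i);
    hash = Math.imul(hash, 16777619);
  }
  return Math.abs(hash) % numPartitions;
}

// Same key, same partition, every time:
console.log(toyPartition('customer-123', 12) === toyPartition('customer-123', 12)); // true

// Caveat: resizing the topic (e.g. 12 -> 24 partitions) can remap keys,
// breaking per-entity ordering across the resize boundary.
```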

Sequence Numbers with Gap Detection

When ordering is critical, include sequence numbers and detect gaps.

class SequenceNumbering {
  async publishWithSequence(customerId: string, event: any): Promise<void> {
    const producer = this.kafka.producer();
    await producer.connect();

    // Get next sequence number
    // NOTE: MAX(sequence)+1 is racy with concurrent producers; run it in a
    // transaction with row locking, or use a per-customer database sequence
    const nextSeq = await this.db.query(
      `SELECT COALESCE(MAX(sequence), 0) + 1 as next_seq
       FROM events
       WHERE customer_id = $1`,
      [customerId]
    );

    const sequence = nextSeq.rows[0].next_seq;

    // Publish with sequence number
    await producer.send({
      topic: 'ordered-events',
      messages: [
        {
          key: customerId,
          value: JSON.stringify({
            ...event,
            sequence,
            timestamp: Date.now(),
          }),
        },
      ],
    });

    await producer.disconnect();
  }

  async consumeWithSequenceValidation(): Promise<void> {
    const consumer = this.kafka.consumer({ groupId: 'validator' });
    // In-memory state; reload from durable storage after restarts or rebalances
    const customerSequences = new Map<string, number>();

    await consumer.connect();
    await consumer.subscribe({ topic: 'ordered-events' });

    await consumer.run({
      eachMessage: async ({ message }) => {
        const event = JSON.parse(message.value!.toString());
        const customerId = message.key!.toString();

        const lastSeq = customerSequences.get(customerId) || 0;
        const expectedSeq = lastSeq + 1;

        if (event.sequence === expectedSeq) {
          // In order
          await this.processEvent(event);
          customerSequences.set(customerId, event.sequence);
        } else if (event.sequence < expectedSeq) {
          // Duplicate or late message
          console.log(`Duplicate message: expected ${expectedSeq}, got ${event.sequence}`);
        } else {
          // Gap detected
          console.error(
            `Gap in sequence for ${customerId}: expected ${expectedSeq}, got ${event.sequence}`
          );
          // Buffer out-of-order message (see next section)
          await this.bufferOutOfOrder(customerId, event);
        }
      },
    });
  }

  private async bufferOutOfOrder(customerId: string, event: any): Promise<void> {
    // Implementation
  }

  private async processEvent(event: any): Promise<void> {
    // Implementation
  }
}

Out-of-Order Message Buffering

When messages arrive out of order (network delay, retries), buffer and process in sequence.

class OutOfOrderBuffer {
  // NOTE: buffers grow unbounded if a gap is never filled; production code
  // needs a timeout or alert for stuck buffers
  private buffers = new Map<string, PriorityQueue>();

  async handleMessage(customerId: string, event: any): Promise<void> {
    if (!this.buffers.has(customerId)) {
      this.buffers.set(customerId, new PriorityQueue());
    }

    const buffer = this.buffers.get(customerId)!;
    buffer.enqueue(event, event.sequence); // Sort by sequence

    // Try to process buffered messages
    await this.drainBuffer(customerId);
  }

  private async drainBuffer(customerId: string): Promise<void> {
    const buffer = this.buffers.get(customerId)!;
    const expectedSeq = (await this.getLastProcessedSequence(customerId)) + 1;

    while (!buffer.isEmpty()) {
      const event = buffer.peek();

      if (event.sequence === expectedSeq) {
        // Next expected message, process it
        buffer.dequeue();
        await this.processEvent(customerId, event);
        await this.recordProcessed(customerId, event.sequence);
      } else if (event.sequence < expectedSeq) {
        // Duplicate, discard
        buffer.dequeue();
      } else {
        // Gap; can't process yet
        break;
      }
    }

    // Clean up empty buffers
    if (buffer.isEmpty()) {
      this.buffers.delete(customerId);
    }
  }

  private async getLastProcessedSequence(customerId: string): Promise<number> {
    const result = await this.db.query(
      `SELECT COALESCE(MAX(sequence), 0) as last_seq
       FROM processed_events
       WHERE customer_id = $1`,
      [customerId]
    );
    return result.rows[0].last_seq;
  }

  private async processEvent(customerId: string, event: any): Promise<void> {
    // Implementation
  }

  private async recordProcessed(customerId: string, sequence: number): Promise<void> {
    await this.db.query(
      `INSERT INTO processed_events (customer_id, sequence, processed_at)
       VALUES ($1, $2, NOW())`,
      [customerId, sequence]
    );
  }
}

class PriorityQueue {
  private items: Array<{ value: any; priority: number }> = [];

  enqueue(value: any, priority: number): void {
    this.items.push({ value, priority });
    this.items.sort((a, b) => a.priority - b.priority);
  }

  dequeue(): any {
    return this.items.shift()?.value;
  }

  peek(): any {
    return this.items[0]?.value;
  }

  isEmpty(): boolean {
    return this.items.length === 0;
  }
}

Idempotent Processing as Alternative to Strict Ordering

Often you can avoid the cost of strict ordering by making operations idempotent and commutative.

class IdempotentOperations {
  // Bad: order-dependent
  // Transfer $100, then transfer $50
  // If reversed: $50 first, then $100—different final balance
  badApproach(): void {
    // Don't do this with unordered messages
  }

  // Good: idempotent, commutative
  // Set balance to $X (idempotent, timestamp-based wins)
  async updateBalance(accountId: string, newBalance: number, timestamp: number): Promise<void> {
    const result = await this.db.query(
      `UPDATE accounts
       SET balance = $1, balance_updated_at = $2
       WHERE id = $3 AND balance_updated_at < $2
       RETURNING balance`,
      [newBalance, new Date(timestamp), accountId]
    );

    if (result.rows.length === 0) {
      // Update was from stale message, ignore it
      return;
    }
  }

  // Or use compensating transactions keyed by a unique tx_id
  async applyCredit(accountId: string, amount: number, txId: string): Promise<void> {
    // A unique constraint on tx_id plus ON CONFLICT makes the insert
    // idempotent and avoids the check-then-insert race under concurrency
    const inserted = await this.db.query(
      `INSERT INTO transactions (tx_id, account_id, amount, type, applied_at)
       VALUES ($1, $2, $3, 'credit', NOW())
       ON CONFLICT (tx_id) DO NOTHING
       RETURNING id`,
      [txId, accountId, amount]
    );

    if (inserted.rows.length === 0) {
      return; // Already applied
    }

    // Sum all transactions for account
    const result = await this.db.query(
      `SELECT SUM(amount) as balance FROM transactions WHERE account_id = $1`,
      [accountId]
    );

    await this.db.query(
      `UPDATE accounts SET balance = $1 WHERE id = $2`,
      [result.rows[0].balance, accountId]
    );
  }
}

Global Order vs Per-Entity Order

Global order: All messages across entire system in order. Impossible to scale; don't do it.

Per-entity order: All messages for entity X in order. Achievable with Kafka partitions.

No required order: Messages can arrive in any order; make operations commutative/idempotent.

class OrderingRequirements {
  // Global order: DON'T
  // "Event 1, event 2, event 3 for ALL users must be globally sequenced"
  // Kills throughput
  dontDoGlobalOrdering(): void {
    // Don't implement
  }

  // Per-entity order: DO
  // "For customer X, event 1 must be processed before event 2"
  // Achievable with partition keys
  async eventStream(customerId: string, events: any[]): Promise<void> {
    const producer = this.kafka.producer();
    await producer.connect();

    // All events for customer X go to partition hash(customerId)
    await producer.send({
      topic: 'customer-events',
      messages: events.map((e, i) => ({
        key: customerId, // Same partition
        value: JSON.stringify({ ...e, sequence: i + 1 }),
      })),
    });
  }

  // No required order: DO for scale
  // "Metrics can arrive in any order; we aggregate them"
  async publishMetrics(metrics: any[]): Promise<void> {
    const producer = this.kafka.producer();
    await producer.connect();

    // No key = messages spread across partitions = parallel processing
    await producer.send({
      topic: 'metrics',
      messages: metrics.map(m => ({
        value: JSON.stringify(m),
      })),
    });
  }
}

Testing Message Ordering

describe('MessageOrdering', () => {
  it('should preserve order for same partition key', async () => {
    const producer = kafka.producer();
    const consumer = kafka.consumer({ groupId: 'test' });
    await producer.connect();
    await consumer.connect();

    const customerId = 'customer-123';
    const messages: string[] = [];

    // Send ordered messages
    await producer.send({
      topic: 'test-topic',
      messages: Array.from({ length: 100 }, (_, i) => ({
        key: customerId,
        value: `message-${i}`,
      })),
    });

    // fromBeginning picks up messages produced before the subscription
    await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });
    await consumer.run({
      eachMessage: async ({ message }) => {
        messages.push(message.value!.toString());
      },
    });

    await sleep(2000); // Crude wait; prefer polling until all 100 messages arrive

    // Verify order
    expect(messages).toEqual(Array.from({ length: 100 }, (_, i) => `message-${i}`));
  });

  it('should detect sequence gaps', async () => {
    // Assumes OutOfOrderBuffer exposes an onEvent hook invoked as each
    // event is released in order (a thin wrapper around processEvent)
    const buffer = new OutOfOrderBuffer();
    const events: any[] = [];

    buffer.onEvent = (e: any) => events.push(e);

    // Send out of order
    await buffer.handleMessage('cust-1', { sequence: 1, data: 'a' });
    await buffer.handleMessage('cust-1', { sequence: 3, data: 'c' });
    await buffer.handleMessage('cust-1', { sequence: 2, data: 'b' });

    expect(events.map(e => e.sequence)).toEqual([1, 2, 3]);
  });
});
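The idempotent-update path deserves the same treatment. A pure last-write-wins register (the in-memory analogue of the timestamp-guarded UPDATE in the idempotency section) makes the property easy to test: applying the same updates in any delivery order converges to the same state.

```typescript
// Pure last-write-wins register: in-memory analogue of the
// timestamp-guarded UPDATE from the idempotency section.
class LwwRegister {
  private value = 0;
  private updatedAt = 0;

  apply(newValue: number, timestamp: number): void {
    // Stale (or duplicate) updates are ignored: delivery order stops mattering
    if (timestamp <= this.updatedAt) return;
    this.value = newValue;
    this.updatedAt = timestamp;
  }

  get(): number {
    return this.value;
  }
}

// Same updates, opposite delivery order, same final state:
const a = new LwwRegister();
a.apply(100, 1); a.apply(50, 2);
const b = new LwwRegister();
b.apply(50, 2); b.apply(100, 1); // Late-arriving older update is ignored
console.log(a.get() === b.get()); // true — both converge to 50
```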

Checklist

  • Use partition keys to ensure per-entity ordering in Kafka
  • Choose FIFO only if ordering is critical and throughput fits SQS FIFO limits (300 msgs/sec unbatched)
  • Include sequence numbers for critical event streams
  • Detect gaps and buffer out-of-order messages if needed
  • Make operations idempotent to avoid strict ordering dependency
  • Avoid global ordering at all costs (use per-entity or none)
  • Test message ordering under network delays and retries
  • Monitor partition key distribution for hot partition detection
  • Document which event streams require ordering vs don't
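For the monitoring item above, a sketch of a skew check: count messages per partition and flag when the busiest partition carries a disproportionate share of traffic (the 3x threshold is an illustrative choice, not a standard):

```typescript
// Hot-partition detector: tracks per-partition message counts and flags
// skew. The 3x-of-fair-share threshold is an illustrative assumption.
class PartitionSkewMonitor {
  private counts = new Map<number, number>();

  record(partition: number): void {
    this.counts.set(partition, (this.counts.get(partition) ?? 0) + 1);
  }

  // Ratio of the busiest partition's count to the fair (average) share
  skewRatio(): number {
    const values = [...this.counts.values()];
    if (values.length === 0) return 0;
    const total = values.reduce((a, b) => a + b, 0);
    const fairShare = total / values.length;
    return Math.max(...values) / fairShare;
  }

  isHot(threshold: number = 3): boolean {
    return this.skewRatio() >= threshold;
  }
}

const monitor = new PartitionSkewMonitor();
// e.g. call monitor.record(partition) inside Kafka's eachMessage handler
[0, 0, 0, 0, 0, 0, 1, 2].forEach(p => monitor.record(p));
console.log(monitor.skewRatio()); // partition 0 has 6 of 8 -> 6 / (8/3) = 2.25
console.log(monitor.isHot()); // false at the default 3x threshold
```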

Conclusion

Global message ordering kills throughput and is rarely necessary. Per-entity ordering (via Kafka partition keys) is achievable and often sufficient. When possible, make operations idempotent and commutative—a message arriving out of order shouldn't require buffering and replaying. Reserve strict ordering for truly critical workflows; for metrics, logs, and analytics, embrace lack of order and aggregate robustly.