Kafka Consumer Patterns — At-Least-Once, Exactly-Once, and Everything in Between

Introduction

Kafka is the backbone of many event-driven systems, but consuming from it reliably is non-trivial. Offsets determine where you resume after a failure. Consumer groups coordinate work across multiple instances. Rebalancing pauses processing. And you must choose between at-least-once delivery (simple, but it requires idempotent processing) and exactly-once semantics (stronger, but more complex and lower-throughput). We'll navigate these tradeoffs and build resilient consumers.

Consumer Groups and Partition Assignment

Kafka partitions ensure ordering within a partition, and consumer groups ensure each partition is consumed by exactly one consumer.
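A toy assignment function makes the guarantee concrete. This is not the real kafkajs assignor, just a round-robin sketch showing that every partition lands on exactly one group member:

```typescript
// Toy round-robin partition assignment (illustrative only, not the kafkajs assignor).
// Each partition is owned by exactly one consumer; a consumer may own several.
function assignPartitions(
  consumers: string[],
  partitionCount: number
): Map<string, number[]> {
  const assignment = new Map<string, number[]>(
    consumers.map(c => [c, []] as [string, number[]])
  );
  for (let p = 0; p < partitionCount; p++) {
    // Deterministic round-robin: partition p goes to consumer p mod N
    const owner = consumers[p % consumers.length];
    assignment.get(owner)!.push(p);
  }
  return assignment;
}

// Three consumers, six partitions: two partitions each, no overlap
const plan = assignPartitions(['c1', 'c2', 'c3'], 6);
console.log(plan.get('c1')); // [0, 3]
```

When a consumer joins or leaves, the group coordinator recomputes an assignment like this for the surviving members, which is exactly the rebalance pause discussed below.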

import { Kafka, logLevel } from 'kafkajs';

class ConsumerGroupExample {
  private kafka = new Kafka({
    clientId: 'order-processor',
    brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
    logLevel: logLevel.ERROR,
    retry: {
      initialRetryTime: 100,
      retries: 8,
      maxRetryTime: 30000,
      randomizationFactor: 0.2,
      multiplier: 2,
    },
  });

  private consumer = this.kafka.consumer({
    groupId: 'order-processor-group',
    sessionTimeout: 30000,
    heartbeatInterval: 3000,
    rebalanceTimeout: 60000,
  });

  async startConsuming(): Promise<void> {
    await this.consumer.connect();

    // Subscribe to topic; Kafka assigns partitions automatically
    await this.consumer.subscribe({
      topic: 'orders',
      fromBeginning: false,
    });

    // KafkaJS emits instrumentation events around rebalances:
    // REBALANCING fires when a rebalance starts, GROUP_JOIN fires when
    // the consumer rejoins the group with its new partition assignment
    this.consumer.on(this.consumer.events.REBALANCING, () => {
      console.log('Rebalancing started...');
      // Pause processing if needed
    });

    this.consumer.on(this.consumer.events.GROUP_JOIN, event => {
      console.log('Rebalancing finished');
      console.log('Assigned partitions:', event.payload.memberAssignment);
      // Resume processing
    });

    // Run message loop
    await this.consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        console.log(`Processing message on partition ${partition}`);
        await this.processMessage(message);
      },
    });
  }

  private async processMessage(message: any): Promise<void> {
    // Implementation
  }
}

Manual Offset Commit for At-Least-Once

At-least-once: commit the offset only AFTER processing succeeds. If the consumer crashes before the commit, the message is redelivered on restart or rebalance, so processing must be idempotent.

class AtLeastOnceConsumer {
  private kafka = new Kafka({
    clientId: 'payment-processor',
    brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
  });

  private consumer = this.kafka.consumer({
    groupId: 'payment-processor-group',
    allowAutoTopicCreation: false,
  });

  // Database handle (e.g. a pg Pool); initialization omitted for brevity
  private db: any;

  async start(): Promise<void> {
    await this.consumer.connect();
    await this.consumer.subscribe({ topic: 'payments' });

    await this.consumer.run({
      // Disable auto-commit; we'll commit manually after each message succeeds
      autoCommit: false,

      eachMessage: async ({ topic, partition, message }) => {
        try {
          // Process message
          const payment = JSON.parse(message.value!.toString());
          await this.processPayment(payment);

          // Only commit after successful processing
          await this.consumer.commitOffsets([
            {
              topic,
              partition,
              offset: (Number(message.offset) + 1).toString(),
            },
          ]);

          console.log(`Processed and committed offset: ${message.offset}`);
        } catch (error) {
          console.error(`Failed to process payment:`, error);
          // Don't commit; message will be retried on next startup
          throw error;
        }
      },
    });
  }

  private async processPayment(payment: any): Promise<void> {
    // Process payment; must be idempotent (can be called multiple times)
    const existing = await this.db.query(
      `SELECT id FROM payments WHERE idempotency_key = $1`,
      [payment.idempotencyKey]
    );

    if (existing.rows.length > 0) {
      return; // Already processed
    }

    await this.chargeCard(payment.customerId, payment.amount);
    await this.db.query(
      `INSERT INTO payments (idempotency_key, customer_id, amount, status)
       VALUES ($1, $2, $3, 'completed')`,
      [payment.idempotencyKey, payment.customerId, payment.amount]
    );
  }

  private async chargeCard(customerId: string, amount: number): Promise<void> {
    // Call payment provider
  }
}

Exactly-Once with Transactional Consumers

Kafka's exactly-once semantics (EOS) pair a transactional, idempotent producer with consumers that read only committed data (read_committed). When the sink is an external database, as here, combine read_committed with idempotent writes keyed by a unique transaction ID. More complex and lower-throughput than at-least-once, but it eliminates both duplicates and losses.

class ExactlyOnceConsumer {
  private kafka = new Kafka({
    clientId: 'ledger-processor',
    brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
  });

  private consumer = this.kafka.consumer({
    groupId: 'ledger-processor-group',
    // Only consume messages from committed producer transactions
    // (READ_COMMITTED semantics; in kafkajs this is the default,
    // controlled by the readUncommitted option)
    readUncommitted: false,
  });

  // Database handle (e.g. a pg Pool); initialization omitted for brevity
  private db: any;

  async start(): Promise<void> {
    await this.consumer.connect();

    await this.consumer.subscribe({
      topic: 'transactions',
      fromBeginning: false,
    });

    await this.consumer.run({
      // Resolve offsets manually so a failed batch is retried from the start
      eachBatchAutoResolve: false,
      eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
        const { messages } = batch;
        const records = messages.map(msg => JSON.parse(msg.value!.toString()));

        try {
          // Process all messages in the batch
          for (const record of records) {
            await this.postTransaction(record);
          }

          // Mark the whole batch resolved in one step
          resolveOffset(messages[messages.length - 1].offset);

          // Send heartbeat to prevent session timeout during long processing
          await heartbeat();
        } catch (error) {
          console.error('Failed to process batch:', error);
          // Offsets stay unresolved; the batch will be redelivered
          throw error;
        }
      },
    });
  }

  private async postTransaction(transaction: any): Promise<void> {
    // Idempotent operation: use transaction ID as key
    const existing = await this.db.query(
      `SELECT id FROM transactions WHERE tx_id = $1`,
      [transaction.txId]
    );

    if (existing.rows.length > 0) {
      return; // Already posted
    }

    // Use transaction for atomicity
    const client = await this.db.connect();
    try {
      await client.query('BEGIN ISOLATION LEVEL SERIALIZABLE');

      // Post to ledger
      await client.query(
        `INSERT INTO ledger (tx_id, account_id, amount, posted_at)
         VALUES ($1, $2, $3, NOW())`,
        [transaction.txId, transaction.accountId, transaction.amount]
      );

      // Update balance
      await client.query(
        `UPDATE accounts SET balance = balance + $1 WHERE id = $2`,
        [transaction.amount, transaction.accountId]
      );

      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }
}
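The class above writes idempotently to a database. When the sink is another Kafka topic instead, the producing side uses a Kafka transaction so the output messages and the consumer offsets commit atomically (kafkajs exposes this via `producer.transaction()`, with `send`, `sendOffsets`, `commit`, and `abort`). Below is a sketch of that consume-transform-produce flow against a minimal transaction interface, so the commit/abort logic is visible without a broker; the topic names, the transform, and the record shapes are illustrative assumptions:

```typescript
// Minimal shape of a kafkajs-style transaction, stubbed for illustration
interface Transaction {
  send(record: { topic: string; messages: { value: string }[] }): Promise<void>;
  sendOffsets(args: {
    consumerGroupId: string;
    topics: { topic: string; partitions: { partition: number; offset: string }[] }[];
  }): Promise<void>;
  commit(): Promise<void>;
  abort(): Promise<void>;
}

// Consume-transform-produce: output and offsets commit atomically, or neither does
async function produceWithinTransaction(
  tx: Transaction,
  input: { topic: string; partition: number; offset: string; value: string }
): Promise<'committed' | 'aborted'> {
  try {
    await tx.send({
      topic: 'ledger-events',
      messages: [{ value: input.value.toUpperCase() }], // Stand-in transform
    });
    // Offsets ride inside the same transaction, so a crash can never
    // commit the output without the offsets (or vice versa)
    await tx.sendOffsets({
      consumerGroupId: 'ledger-processor-group',
      topics: [{
        topic: input.topic,
        partitions: [{
          partition: input.partition,
          offset: (Number(input.offset) + 1).toString(),
        }],
      }],
    });
    await tx.commit();
    return 'committed';
  } catch {
    await tx.abort();
    return 'aborted';
  }
}
```

Downstream consumers running at read_committed (like `ExactlyOnceConsumer` above) never observe messages from aborted transactions.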

Dead Letter Queue for Poison Messages

When a message consistently fails to process, move it to a DLQ instead of blocking the consumer.

class ConsumerWithDLQ {
  // Kafka client; configured as in the earlier examples
  private kafka = new Kafka({
    clientId: 'order-processor',
    brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
  });

  private maxRetries = 3;

  async startConsuming(): Promise<void> {
    const consumer = this.kafka.consumer({ groupId: 'order-processor-group' });
    const producer = this.kafka.producer();

    await consumer.connect();
    await producer.connect();

    await consumer.subscribe({ topic: 'orders' });

    await consumer.run({
      // Commit manually so a failed message's offset is never auto-committed
      autoCommit: false,

      eachMessage: async ({ topic, partition, message }) => {
        const retryCount = this.getRetryCount(message);

        try {
          await this.processOrder(message);
          await consumer.commitOffsets([
            {
              topic,
              partition,
              offset: (Number(message.offset) + 1).toString(),
            },
          ]);
        } catch (error) {
          if (retryCount >= this.maxRetries) {
            // Move to DLQ
            console.error(
              `Message failed after ${this.maxRetries} retries, sending to DLQ:`,
              error
            );

            await producer.send({
              topic: 'orders-dlq',
              messages: [
                {
                  key: message.key,
                  value: message.value,
                  headers: {
                    'x-retry-count': retryCount.toString(),
                    'x-original-topic': topic,
                    'x-original-partition': partition.toString(),
                    'x-error-message': (error as Error).message,
                  },
                },
              ],
            });

            // Commit offset to move forward
            await consumer.commitOffsets([
              {
                topic,
                partition,
                offset: (Number(message.offset) + 1).toString(),
              },
            ]);
          } else {
            // Re-publish with an incremented retry count. Note: re-publishing
            // to the same topic sacrifices per-key ordering for this message.
            await producer.send({
              topic, // Re-publish to same topic
              messages: [
                {
                  key: message.key,
                  value: message.value,
                  headers: {
                    ...message.headers,
                    'x-retry-count': (retryCount + 1).toString(),
                  },
                  timestamp: Date.now().toString(),
                },
              ],
            });

            // Commit past the original so it isn't delivered again
            // alongside the retry copy
            await consumer.commitOffsets([
              {
                topic,
                partition,
                offset: (Number(message.offset) + 1).toString(),
              },
            ]);
          }
        }
      },
    });
  }

  private getRetryCount(message: any): number {
    const header = message.headers?.['x-retry-count'];
    return header ? parseInt(header.toString(), 10) : 0;
  }

  private async processOrder(message: any): Promise<void> {
    const order = JSON.parse(message.value!.toString());
    // Implementation; should throw on transient failures
  }
}
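One refinement worth considering: immediate re-publishing hammers a struggling dependency. A common pattern is delayed retry topics with exponentially growing delays before each attempt. The helper below computes a capped exponential delay, mirroring the shape of the kafkajs `retry` config shown earlier (initial 100 ms, multiplier 2, 30 s cap); the function name is ours:

```typescript
// Capped exponential backoff: delay before retry attempt n.
// Defaults mirror the client retry config used above.
function retryDelayMs(
  attempt: number,
  initialMs = 100,
  multiplier = 2,
  maxMs = 30_000
): number {
  return Math.min(initialMs * Math.pow(multiplier, attempt), maxMs);
}

console.log([0, 1, 2, 3].map(a => retryDelayMs(a))); // [100, 200, 400, 800]
```

In a delayed-retry topology, a message that fails attempt n is published to a retry topic whose consumer waits `retryDelayMs(n)` before reprocessing, keeping the main topic's consumer unblocked.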

Consumer Group Pause/Resume

Pause consumption temporarily to handle backpressure or allow graceful shutdown.

class PausableConsumer {
  private running = true;

  async start(): Promise<void> {
    const consumer = this.kafka.consumer({ groupId: 'processor-group' });
    await consumer.connect();
    await consumer.subscribe({ topic: 'events' });

    const monitor = new BackpressureMonitor();

    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        // Check if we're overloaded
        if (monitor.getQueueDepth() > 10000) {
          console.log('Pausing consumer due to backpressure...');
          consumer.pause([{ topic }]);

          // Wait for the in-flight queue to drain
          while (monitor.getQueueDepth() > 1000) {
            await sleep(1000);
          }

          console.log('Resuming consumer...');
          consumer.resume([{ topic }]);
        }

        try {
          monitor.increment();
          await this.processEvent(message);
        } finally {
          monitor.decrement();
        }
      },
    });

    // Handle graceful shutdown
    process.on('SIGTERM', async () => {
      console.log('Shutting down gracefully...');
      this.running = false;
      consumer.pause([{ topic: 'events' }]);
      await monitor.waitForDrain(5000);
      await consumer.disconnect();
    });
  }

  private async processEvent(message: any): Promise<void> {
    // Implementation
  }
}

// Small helper used throughout these examples
const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

class BackpressureMonitor {
  private queue = 0;

  increment(): void {
    this.queue++;
  }

  decrement(): void {
    this.queue--;
  }

  getQueueDepth(): number {
    return this.queue;
  }

  async waitForDrain(timeoutMs: number): Promise<void> {
    const startTime = Date.now();
    while (this.queue > 0 && Date.now() - startTime < timeoutMs) {
      await sleep(100);
    }
  }
}

Consumer Lag Monitoring

Track how far behind consumers are to detect issues early.

class LagMonitor {
  // Assumes this.kafka is a Kafka client configured as in the earlier examples
  private kafka!: Kafka;
  private prometheus: any; // Metrics client; initialization omitted for brevity

  async monitorLag(): Promise<void> {
    const admin = this.kafka.admin();
    await admin.connect();

    setInterval(async () => {
      // High watermarks for every partition of the topic
      const topicOffsets = await admin.fetchTopicOffsets('orders');

      // Committed offsets for the consumer group; the admin client can
      // read these directly, no consumer instance needed
      const consumerOffsets = await admin.fetchOffsets({
        groupId: 'order-processor-group',
        topic: 'orders',
      });

      for (const partition of topicOffsets) {
        const highWaterMark = parseInt(partition.high, 10);
        const consumerOffset = parseInt(
          consumerOffsets.find(o => o.partition === partition.partition)?.offset || '0',
          10
        );

        const lag = highWaterMark - consumerOffset;

        console.log(`Partition ${partition.partition} lag: ${lag} messages`);

        // Alert if lag is high
        if (lag > 10000) {
          await this.alertSlackChannel(
            `High consumer lag detected: ${lag} messages on partition ${partition.partition}`
          );
        }

        // Record as a gauge: lag is a point-in-time value, not a distribution
        this.prometheus.gauge('kafka_consumer_lag', lag, {
          group: 'order-processor-group',
          partition: partition.partition.toString(),
        });
      }
    }, 30000); // Check every 30 seconds
  }

  private async alertSlackChannel(message: string): Promise<void> {
    // Implementation
  }
}
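The lag arithmetic itself is worth internalizing: the committed offset is the next offset the group will read, and the high watermark is the next offset to be written, so their difference is the number of unread messages. A pure helper (the function name is ours, not a kafkajs API):

```typescript
// lag = high watermark (next offset to be produced)
//     - committed offset (next offset to be consumed)
function computeLag(
  highWatermarks: { partition: number; high: string }[],
  committed: { partition: number; offset: string }[]
): { partition: number; lag: number }[] {
  return highWatermarks.map(({ partition, high }) => {
    // A partition with no committed offset counts as fully lagged from 0
    const c = committed.find(o => o.partition === partition)?.offset ?? '0';
    return { partition, lag: parseInt(high, 10) - parseInt(c, 10) };
  });
}

const lags = computeLag(
  [{ partition: 0, high: '1500' }, { partition: 1, high: '900' }],
  [{ partition: 0, offset: '1450' }, { partition: 1, offset: '900' }]
);
console.log(lags); // [{ partition: 0, lag: 50 }, { partition: 1, lag: 0 }]
```

A steadily growing lag means consumers can't keep up with producers; a flat nonzero lag usually means a stuck partition.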

Batch Processing vs One-at-a-Time

Batch processing is more efficient; one-at-a-time is simpler.

// One-at-a-time (simple, low throughput)
await consumer.run({
  eachMessage: async ({ message }) => {
    await this.processMessage(message);
  },
});

// Batch processing (efficient, complex)
await consumer.run({
  eachBatchAutoResolve: false,
  eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
    const { messages } = batch;
    const records = messages.map(msg => JSON.parse(msg.value!.toString()));

    // Process in bulk
    await bulkInsert(records);

    // Resolve the whole batch at once
    resolveOffset(messages[messages.length - 1].offset);

    // Send heartbeat for long-running batches
    await heartbeat();
  },
});

async function bulkInsert(batch: any[]): Promise<void> {
  // Use parameterized insert for efficiency
  const placeholders = batch.map((_, i) => `($${i * 2 + 1}, $${i * 2 + 2})`).join(',');
  const params = batch.flatMap(item => [item.id, JSON.stringify(item)]);

  await db.query(
    `INSERT INTO events (id, data) VALUES ${placeholders}
     ON CONFLICT (id) DO UPDATE SET data = EXCLUDED.data`,
    params
  );
}

Testing Kafka Consumers

describe('KafkaConsumer', () => {
  it('should process messages in order', async () => {
    // Assumes AtLeastOnceConsumer accepts an injected Kafka client for testing
    const consumer = new AtLeastOnceConsumer(testKafka);
    const producer = testKafka.producer();

    await producer.connect();
    await producer.send({
      topic: 'payments',
      messages: [
        { key: 'customer-1', value: JSON.stringify({ amount: 100 }) },
        { key: 'customer-1', value: JSON.stringify({ amount: 200 }) },
        { key: 'customer-1', value: JSON.stringify({ amount: 300 }) },
      ],
    });

    const processed: any[] = [];
    jest
      .spyOn(consumer as any, 'processPayment')
      .mockImplementation(async (payment: any) => {
        processed.push(payment);
      });

    await consumer.start();
    await sleep(2000); // Wait for processing

    // Same key => same partition => strict ordering
    expect(processed).toEqual([{ amount: 100 }, { amount: 200 }, { amount: 300 }]);
  });

  it('should retry on transient failure', async () => {
    const consumer = new ConsumerWithDLQ();
    let attempts = 0;

    jest.spyOn(consumer as any, 'processOrder').mockImplementation(async () => {
      attempts++;
      if (attempts < 3) {
        throw new Error('Transient failure');
      }
    });

    // Should eventually succeed after retries;
    // the DLQ topic should remain empty
  });
});

Checklist

  • Use consumer groups for distributed consumption
  • Choose at-least-once (simple) or exactly-once (complex) semantics
  • Commit offsets AFTER successful processing
  • Implement dead-letter queue for poison messages
  • Monitor consumer lag continuously
  • Use batch processing for throughput, one-at-a-time for simplicity
  • Test rebalancing and consumer failure scenarios
  • Implement heartbeats for long-running operations
  • Document retry policies and idempotency requirements
  • Alert on high consumer lag

Conclusion

Kafka consumers need careful design. Choose between at-least-once (requires idempotent processing) and exactly-once (adds complexity and latency). Always commit offsets after processing succeeds. Use dead-letter queues for unprocessable messages. Monitor lag vigilantly; it's your early warning system. Test failure scenarios: consumer crashes, rebalancing, broker failures.