Message Queue Patterns — SQS, Kafka, and BullMQ for Different Workloads

Introduction

Message queues decouple producers and consumers, enabling scalable async processing. Yet teams often choose the wrong queue for their workload or misunderstand delivery semantics. This post covers when to use SQS (fire-and-forget), Kafka (event streaming), and BullMQ (in-process jobs), with production patterns for each.

When to Use Message Queues vs Direct Calls

Know the trade-offs:

// DIRECT API CALL
// Fast, simple, synchronous
async function processOrderDirect(orderId: string) {
  // Customer waits for processing (5 seconds)
  const invoice = await generateInvoice(orderId);
  await sendEmailWithInvoice(invoice);
  return invoice;
}

// PROBLEM: If the email service is slow (10s), the customer waits for invoice + email (~15s total)
// PROBLEM: If the email send fails, the entire request fails (no retry logic)

// MESSAGE QUEUE
// Fast response, async processing, decoupled
async function processOrderWithQueue(orderId: string) {
  // 1. Acknowledge receipt immediately (100ms)
  await queue.enqueue('generate-invoice', { orderId });
  return { status: 'processing' };
}

// 2. Background workers process asynchronously (5-10s later)
// 3. If service fails, queue retries automatically

// USE MESSAGE QUEUE IF:
// - Processing takes >1 second (don't block customer)
// - Need retries (payment processing, email sending)
// - Want to batch process (collect events, process in chunks)
// - Load spikes (queue buffers burst traffic)
// - Need audit trail (events are stored)

// USE DIRECT CALL IF:
// - Processing <100ms (synchronous response expected)
// - Need immediate result (query data)
// - Simple request-response (API calls)

Queues for async work. Direct calls for synchronous responses.

SQS FIFO for Ordered Processing

SQS FIFO guarantees in-order delivery within a message group, with at-least-once processing and deduplication of repeated sends inside a 5-minute window:

import {
  SQSClient,
  SendMessageCommand,
  ReceiveMessageCommand,
  DeleteMessageCommand,
  SendMessageBatchCommand,
} from '@aws-sdk/client-sqs';

const sqs = new SQSClient({ region: 'us-east-1' });

// Send message to FIFO queue
async function enqueueOrderEvent(orderId: string, event: string) {
  const command = new SendMessageCommand({
    QueueUrl: 'https://sqs.us-east-1.amazonaws.com/account-id/orders.fifo',
    MessageBody: JSON.stringify({ orderId, event, timestamp: new Date() }),
    MessageGroupId: orderId,  // Ensure ordering by orderId
    MessageDeduplicationId: `${orderId}-${event}`,  // Stable ID; including Date.now() would give every retry a new ID and defeat deduplication
  });

  const response = await sqs.send(command);
  console.log(`Message sent: ${response.MessageId}`);
}

// FIFO guarantees:
// - Messages from same group (orderId) processed in order
// - Each message processed at least once (no loss)
// - Deduplication within 5 minutes (same MessageDeduplicationId = ignored)

// Consume messages (with visibility timeout)
async function processOrderEvents(queueUrl: string) {
  const command = new ReceiveMessageCommand({
    QueueUrl: queueUrl,
    MaxNumberOfMessages: 10,
    VisibilityTimeout: 30,  // 30 seconds to process before the message becomes visible again
    WaitTimeSeconds: 20,  // Long polling (wait up to 20s for messages)
  });

  const response = await sqs.send(command);

  if (!response.Messages) {
    console.log('No messages');
    return;
  }

  for (const message of response.Messages) {
    try {
      const body = JSON.parse(message.Body!);
      console.log(`Processing: ${body.event} for order ${body.orderId}`);

      // Process message (must complete before visibility timeout)
      await processEvent(body);

      // Delete message after successful processing
      const deleteCommand = new DeleteMessageCommand({
        QueueUrl: queueUrl,
        ReceiptHandle: message.ReceiptHandle!,
      });

      await sqs.send(deleteCommand);
      console.log(`Message deleted: ${message.MessageId}`);
    } catch (err) {
      console.error('Error processing message:', err);
      // Do NOT delete message; it will be retried when visibility timeout expires
    }
  }
}

// Batch send for throughput
async function batchEnqueueOrderEvents(events: Array<{ orderId: string; event: string }>) {
  const entries = events.map((e, i) => ({
    Id: i.toString(),
    MessageBody: JSON.stringify(e),
    MessageGroupId: e.orderId,
    MessageDeduplicationId: `${e.orderId}-${e.event}`,  // Stable per logical event so retried sends deduplicate
  }));

  const command = new SendMessageBatchCommand({
    QueueUrl: 'https://sqs.us-east-1.amazonaws.com/account-id/orders.fifo',
    Entries: entries,
  });

  const response = await sqs.send(command);
  console.log(`Sent ${response.Successful?.length ?? 0} messages, ${response.Failed?.length ?? 0} failed`);
}

FIFO queues guarantee ordering within a group and deduplicate repeated sends; consumers should still be idempotent, since a message that is not deleted in time will be delivered again.
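Deduplication only works if the ID is stable across retries. One approach is to hash the message content, which also keeps the ID within SQS's 128-character limit for `MessageDeduplicationId`. A small sketch using Node's built-in crypto module:

```typescript
import { createHash } from 'node:crypto';

// Derive a stable deduplication ID from the message content.
// Retrying the same logical event yields the same ID, so SQS FIFO
// drops the duplicate within its 5-minute deduplication window.
function dedupId(orderId: string, event: string): string {
  return createHash('sha256')
    .update(`${orderId}:${event}`)
    .digest('hex');  // 64 hex chars, well under SQS's 128-char limit
}
```

The same effect can be had by enabling ContentBasedDeduplication on the queue, which hashes the message body server-side.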

DLQ Setup and Monitoring

Dead Letter Queues capture failed messages for debugging:

import {
  SQSClient,
  SendMessageCommand,
  ReceiveMessageCommand,
  DeleteMessageCommand,
} from '@aws-sdk/client-sqs';

const sqs = new SQSClient({ region: 'us-east-1' });

// Main queue setup with DLQ
const MAIN_QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/account-id/orders';
const DLQ_URL = 'https://sqs.us-east-1.amazonaws.com/account-id/orders-dlq';

async function processMessageWithDLQ(
  queueUrl: string,
  dlqUrl: string,
  maxRetries: number = 3
) {
  const command = new ReceiveMessageCommand({
    QueueUrl: queueUrl,
    MaxNumberOfMessages: 1,
    VisibilityTimeout: 60,
    AttributeNames: ['ApproximateReceiveCount'],  // System attribute, not a message attribute
  });

  const response = await sqs.send(command);

  if (!response.Messages) return;

  const message = response.Messages[0];
  const receiveCount = parseInt(
    message.Attributes?.ApproximateReceiveCount || '1',  // First delivery has a receive count of 1
    10
  );

  try {
    const body = JSON.parse(message.Body!);
    console.log(`Processing (attempt ${receiveCount}): ${body.event}`);

    await processEvent(body);

    // Success: delete from queue
    await sqs.send(
      new DeleteMessageCommand({
        QueueUrl: queueUrl,
        ReceiptHandle: message.ReceiptHandle!,
      })
    );
  } catch (err) {
    console.error(`Error processing message (attempt ${receiveCount}):`, err);

    if (receiveCount >= maxRetries) {
      // Max retries exceeded: send to DLQ
      console.error(`Moving to DLQ after ${maxRetries} retries`);

      await sqs.send(
        new SendMessageCommand({
          QueueUrl: dlqUrl,
          MessageBody: message.Body!,
          MessageAttributes: {
            FailureReason: {
              StringValue: err instanceof Error ? err.message : String(err),
              DataType: 'String',
            },
            OriginalQueueUrl: {
              StringValue: queueUrl,
              DataType: 'String',
            },
          },
        })
      );

      // Delete from main queue
      await sqs.send(
        new DeleteMessageCommand({
          QueueUrl: queueUrl,
          ReceiptHandle: message.ReceiptHandle!,
        })
      );
    }
    // else: don't delete, message will be retried when visibility timeout expires
  }
}

// Monitor DLQ for stuck messages
async function monitorDLQ() {
  const command = new ReceiveMessageCommand({
    QueueUrl: DLQ_URL,
    MaxNumberOfMessages: 10,
    MessageAttributeNames: ['All'],
  });

  const response = await sqs.send(command);

  if (response.Messages) {
    console.warn(`WARNING: ${response.Messages.length} messages in DLQ`);
    for (const msg of response.Messages) {
      console.warn(`Message: ${msg.MessageId}`);
      console.warn(`  Reason: ${msg.MessageAttributes?.FailureReason?.StringValue}`);
      console.warn(`  Original: ${msg.MessageAttributes?.OriginalQueueUrl?.StringValue}`);
    }
  }
}

// Scheduled in-process check; in production, alarm on the DLQ's
// ApproximateNumberOfMessagesVisible metric in CloudWatch instead
setInterval(monitorDLQ, 60000);  // Check every minute

DLQs catch messages that fail repeatedly. Monitor them regularly.
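The manual forwarding above works, but SQS can also redrive automatically: attach a RedrivePolicy to the main queue, and SQS moves any message received more than maxReceiveCount times to the DLQ on its own. A sketch of the attribute payload (the ARN is illustrative):

```typescript
// RedrivePolicy: after maxReceiveCount failed receives (message received
// but never deleted), SQS moves the message to the DLQ automatically
const redrivePolicy = {
  deadLetterTargetArn: 'arn:aws:sqs:us-east-1:123456789012:orders-dlq',  // illustrative ARN
  maxReceiveCount: 3,
};

// Queue attributes are string-valued, so the policy is JSON-encoded
const attributes = {
  RedrivePolicy: JSON.stringify(redrivePolicy),
};

// Apply with:
//   new SetQueueAttributesCommand({ QueueUrl: MAIN_QUEUE_URL, Attributes: attributes })
```

With a redrive policy in place, the worker only deletes on success; SQS handles the move, so the manual DLQ send becomes unnecessary.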

Kafka for Event Streaming

Kafka persists events and supports consumer groups for scalable processing:

import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
  retry: {
    initialRetryTime: 100,
    retries: 8,
    maxRetryTime: 30000,
  },
});

const producer = kafka.producer({
  idempotent: true,  // Idempotent producer: the broker drops duplicates from retried sends
});

const consumer = kafka.consumer({
  groupId: 'order-processors',
  sessionTimeout: 30000,
});

// Produce events (publish to topic)
async function publishOrderEvent(orderId: string, event: object) {
  await producer.connect();

  const result = await producer.send({
    topic: 'order-events',
    messages: [
      {
        key: orderId,  // Partition key: same order always goes to same partition
        value: JSON.stringify(event),
        timestamp: Date.now().toString(),
        headers: {
          'event-type': 'order-placed',
          'source': 'order-service',
        },
      },
    ],
  });

  console.log(`Event published to partition ${result[0].partition}`);
}

// Consume events (subscribe to topic)
async function startEventConsumer() {
  await consumer.connect();

  // Subscribe to topic (consumer group handles partitioning)
  await consumer.subscribe({
    topic: 'order-events',
    fromBeginning: false,  // Start from new messages
  });

  // Process messages
  await consumer.run({
    partitionsConsumedConcurrently: 3,  // Process 3 partitions in parallel
    eachMessage: async ({ topic, partition, message }) => {
      try {
        const event = JSON.parse(message.value?.toString() || '{}');
        const orderId = message.key?.toString();

        console.log(`[P${partition}] Processing order ${orderId}: ${event.type}`);

        // Process event
        await processEvent(event);
      } catch (err) {
        console.error('Error processing event:', err);
        // Rethrow so kafkajs retries; swallowing the error here would
        // commit the offset and silently skip this message
        throw err;
      }
    },
  });
}

// Kafka guarantees per partition:
// - Messages in order within partition
// - Each message offset increases
// - Offset tracking for restart recovery

// Multiple consumers in same group: Kafka auto-distributes partitions
// Example: 3-partition topic, 2 consumers
// - Consumer 1 handles partitions [0, 1]
// - Consumer 2 handles partition [2]

// Rebalancing on consumer add/remove: automatic
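The ordering guarantee comes from keyed partitioning: the producer hashes the message key and maps it to a partition, so every event for one orderId lands on the same partition. A simplified sketch of the principle (Kafka's actual default partitioner uses murmur2, not MD5; this only illustrates the hash-modulo idea):

```typescript
import { createHash } from 'node:crypto';

// Deterministic key-to-partition mapping: hash the key, take it modulo
// the partition count. Same key always maps to the same partition.
function partitionFor(key: string, numPartitions: number): number {
  const hash = createHash('md5').update(key).digest();
  return hash.readUInt32BE(0) % numPartitions;
}
```

This is also why adding partitions to an existing topic reshuffles key assignments: the modulus changes, so previously keyed messages may land on new partitions.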

// Consumer group offset management
async function resetConsumerOffset(groupId: string, topic: string) {
  const admin = kafka.admin();
  await admin.connect();

  // Reset to the beginning of the topic (the group must have no active
  // consumers, or setOffsets will fail)
  await admin.setOffsets({
    groupId,
    topic,
    partitions: [
      { partition: 0, offset: '0' },
      { partition: 1, offset: '0' },
      { partition: 2, offset: '0' },
    ],
  });

  await admin.disconnect();
}

Kafka scales horizontally: more partitions = more parallel consumers.

BullMQ for In-Process Jobs

BullMQ is a Redis-backed task queue for in-process worker pools:

import { Queue, Worker } from 'bullmq';
import IORedis from 'ioredis';

const connection = new IORedis({
  host: 'localhost',
  port: 6379,
  maxRetriesPerRequest: null,  // Required for BullMQ
});

// Create queue
const invoiceQueue = new Queue('invoice-generation', { connection });
const emailQueue = new Queue('email-sending', { connection });

// Enqueue job
async function generateInvoice(orderId: string) {
  const job = await invoiceQueue.add(
    'create',
    { orderId, timestamp: new Date() },
    {
      priority: 10,  // Higher number = higher priority
      delay: 5000,  // Wait 5 seconds before processing
      attempts: 3,  // Retry up to 3 times on failure
      backoff: {
        type: 'exponential',
        delay: 2000,  // Start 2s, then 4s, 8s
      },
      removeOnComplete: true,  // Delete after success
      removeOnFail: false,  // Keep failed jobs for debugging
    }
  );

  console.log(`Job enqueued: ${job.id}`);
  return job;
}
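The exponential backoff configured above doubles the base delay on each retry. A small sketch of the schedule it produces (illustrative, not BullMQ's internal code):

```typescript
// Exponential backoff schedule: base delay doubled for each retry.
// retryNumber is 1-based (the first retry after the first failure).
function backoffDelay(baseMs: number, retryNumber: number): number {
  return baseMs * 2 ** (retryNumber - 1);
}

// With baseMs = 2000: retry 1 → 2000ms, retry 2 → 4000ms, retry 3 → 8000ms
```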

// Process jobs (worker)
const invoiceWorker = new Worker('invoice-generation', async (job) => {
  console.log(`[${job.id}] Generating invoice for order ${job.data.orderId}`);

  // Actual work
  const invoice = await createInvoice(job.data.orderId);

  // Report progress (BullMQ uses updateProgress; job.progress() is the older Bull API)
  await job.updateProgress(50);
  console.log(`[${job.id}] Invoice created, sending email`);

  await sendEmail(invoice);

  await job.updateProgress(100);
  return { invoiceId: invoice.id };
}, { connection, concurrency: 5 });  // 5 jobs in parallel

// Job lifecycle events
invoiceWorker.on('completed', (job) => {
  console.log(`Job ${job.id} completed with result:`, job.returnvalue);
});

invoiceWorker.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed:`, err.message);
  // Send alert if consistently failing
});

invoiceWorker.on('error', (err) => {
  console.error('Worker error:', err);
});

// Monitor queue status
async function monitorQueue() {
  const counts = await invoiceQueue.getJobCounts(
    'wait',
    'active',
    'completed',
    'failed'
  );

  console.log('Queue status:', counts);
  // Output: { wait: 10, active: 2, completed: 1000, failed: 5 }

  if (counts.failed > 0) {
    console.warn(`WARNING: ${counts.failed} failed jobs`);
  }
}

// Job dependencies (process invoice, then send email)
// BullMQ expresses dependencies with a FlowProducer: children run first,
// and the parent job is processed only after all children complete
import { FlowProducer } from 'bullmq';

const flowProducer = new FlowProducer({ connection });

async function generateAndNotify(orderId: string) {
  const flow = await flowProducer.add({
    name: 'send',
    queueName: 'email-sending',
    data: { orderId },
    children: [
      {
        name: 'create',
        queueName: 'invoice-generation',
        data: { orderId },
      },
    ],
  });

  return {
    emailJobId: flow.job.id,
    invoiceJobId: flow.children?.[0].job.id,
  };
}

BullMQ excels for in-process work: background jobs, batch processing, scheduled tasks.
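The "scheduled tasks" use case is covered by repeatable jobs: BullMQ re-enqueues a job on a cron schedule. A hedged sketch of the options (recent BullMQ versions use `pattern`; older ones used `cron` — check your version; queue name and payload below are illustrative):

```typescript
// Repeatable job options: BullMQ re-adds the job on this cron schedule
const nightlyReportOpts = {
  repeat: {
    pattern: '0 2 * * *',  // every day at 02:00
  },
};

// Usage sketch, assuming a Queue instance named reportQueue:
//   await reportQueue.add('nightly-report', { kind: 'sales' }, nightlyReportOpts);
```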

Visibility Timeout and Message Deduplication

Handle retries and duplicates correctly:

// SQS VISIBILITY TIMEOUT
// Message invisible for N seconds, then reappears if not deleted

// Scenario: message processing takes 10 seconds
const VISIBILITY_TIMEOUT = 30;  // 30 seconds

// (Sketch: Message type from @aws-sdk/client-sqs; sqs client and queueUrl in scope)
async function processWithTimeout(message: Message) {
  try {
    // Start processing (1 second elapsed)
    await longRunningTask();  // 10 seconds

    // Task complete, delete message (11 seconds elapsed)
    await sqs.send(new DeleteMessageCommand({
      QueueUrl: queueUrl,
      ReceiptHandle: message.ReceiptHandle!,
    }));
  } catch (err) {
    // Error at second 9: the message stays invisible for 21 more seconds,
    // then reappears for retry
  }
}

// PROBLEM: If visibility timeout too short
// Message reappears mid-processing, processed twice

// SOLUTION: Dynamically extend visibility timeout
async function processWithExtendedTimeout(message: Message) {
  try {
    // Process part 1
    await part1();

    // Extend visibility for more time
    await sqs.send(new ChangeMessageVisibilityCommand({
      QueueUrl: queueUrl,
      ReceiptHandle: message.ReceiptHandle!,
      VisibilityTimeout: 60,  // Now have 60 more seconds
    }));

    // Process part 2
    await part2();

    // Delete on success
    await sqs.send(new DeleteMessageCommand({
      QueueUrl: queueUrl,
      ReceiptHandle: message.ReceiptHandle!,
    }));
  } catch (err) {
    // On error, don't delete; will retry
  }
}
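A one-shot extension only helps when you know where the slow step is. When processing time is unpredictable, a heartbeat that extends visibility on an interval is more robust. A generic sketch (the `extend` callback stands in for the ChangeMessageVisibility call, so the loop itself is queue-agnostic):

```typescript
// Keep extending visibility while the work is in flight, then stop.
// `work` is the actual message handler; `extend` performs the
// visibility-timeout extension; `intervalMs` should be well under
// the configured visibility timeout.
async function withVisibilityHeartbeat<T>(
  work: () => Promise<T>,
  extend: () => Promise<void>,
  intervalMs: number
): Promise<T> {
  const timer = setInterval(() => {
    extend().catch((err) => console.error('Failed to extend visibility:', err));
  }, intervalMs);
  try {
    return await work();
  } finally {
    clearInterval(timer);  // Always stop the heartbeat, success or failure
  }
}
```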

// MESSAGE DEDUPLICATION
// Prevent processing same message twice

// SQS FIFO deduplication
// Option 1: Content-based (5-minute window)
const contentDupId = sha256(JSON.stringify(event));
await sqs.send(new SendMessageCommand({
  QueueUrl: queueUrl,
  MessageBody: JSON.stringify(event),
  MessageDeduplicationId: contentDupId,  // Same content = same ID
}));
// (Or enable ContentBasedDeduplication on the queue and omit the ID)

// Option 2: Stable business key (dedupes retries of the same logical event)
const eventDupId = `${orderId}-${eventType}`;
await sqs.send(new SendMessageCommand({
  QueueUrl: queueUrl,
  MessageBody: JSON.stringify(event),
  MessageDeduplicationId: eventDupId,
}));

// Application-level deduplication (Kafka, custom queue)
// The in-memory Set is a per-worker fast path; the database row inserted
// below is the durable record that survives restarts
const processedIds = new Set<string>();

async function processEvent(event: any) {
  const eventId = `${event.orderId}-${event.timestamp}`;

  // Check if processed
  if (processedIds.has(eventId)) {
    console.log('Already processed, skipping');
    return;
  }

  // Process event
  await applyEvent(event);

  // Mark as processed (persist to database for durability)
  await db.query(
    'INSERT INTO processed_events (event_id) VALUES ($1)',
    [eventId]
  );

  processedIds.add(eventId);
}
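The check-then-insert sequence above can race when two workers receive the same message at once. Making the INSERT itself the arbiter closes that gap. A sketch assuming a UNIQUE constraint on processed_events.event_id (Postgres ON CONFLICT syntax; the DbClient type mirrors the example's `db` client):

```typescript
// Minimal shape of the database client assumed by the example
type DbClient = {
  query: (sql: string, params: unknown[]) => Promise<{ rowCount: number }>;
};

// Atomic idempotency claim: the UNIQUE constraint on event_id arbitrates.
// rowCount === 1 means this worker inserted the row and owns the event;
// rowCount === 0 means another worker got there first, so skip processing.
async function claimEvent(db: DbClient, eventId: string): Promise<boolean> {
  const res = await db.query(
    'INSERT INTO processed_events (event_id) VALUES ($1) ON CONFLICT (event_id) DO NOTHING',
    [eventId]
  );
  return res.rowCount === 1;
}
```

Call `claimEvent` before `applyEvent`, and only process when it returns true; the database, not the worker's memory, decides ownership.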

Visibility timeout and deduplication prevent duplicate processing.

Backpressure Handling

Don't let fast producers overwhelm slow consumers:

// PROBLEM: Producer enqueues 1000 messages/sec
// Consumer processes 100 messages/sec
// Queue grows unbounded, memory exhausted

// SOLUTION 1: Rate limit producer
async function enqueueWithBackpressure(event: any) {
  const queueSize = await queue.count();

  if (queueSize > 10000) {
    // Queue backed up, reject new events
    throw new Error('Queue full, try again later');
  }

  await queue.add(event);
}
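Rejecting only when the queue is already deep is a coarse backstop; a token bucket smooths the producer rate before that point. A generic sketch, not tied to any particular queue library:

```typescript
// Token bucket: up to `capacity` enqueues may burst; tokens refill at
// `ratePerSec`. tryAcquire() returns false when the caller should back off.
class TokenBucket {
  private tokens: number;
  private lastRefill: number;

  constructor(private capacity: number, private ratePerSec: number) {
    this.tokens = capacity;
    this.lastRefill = Date.now();
  }

  tryAcquire(): boolean {
    const now = Date.now();
    const elapsedSec = (now - this.lastRefill) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsedSec * this.ratePerSec);
    this.lastRefill = now;
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return true;
    }
    return false;
  }
}
```

A producer would call `tryAcquire()` before `queue.add(...)` and shed or defer the event when it returns false, keeping the queue-depth check as a last line of defense.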

// SOLUTION 2: Batch processing
// Collect events, process in bulk
const eventBuffer: any[] = [];
const BATCH_SIZE = 100;

async function bufferEvent(event: any) {
  eventBuffer.push(event);

  if (eventBuffer.length >= BATCH_SIZE) {
    const batch = eventBuffer.splice(0, BATCH_SIZE);
    await processBatch(batch);
  }
}

setInterval(async () => {
  if (eventBuffer.length > 0) {
    const batch = eventBuffer.splice(0, eventBuffer.length);
    await processBatch(batch);
  }
}, 5000);

// SOLUTION 3: Auto-scale consumers
// Add more workers when queue depth increases
async function autoScaleConsumers() {
  const queueSize = await queue.count();
  const activeWorkers = getActiveWorkerCount();

  if (queueSize > activeWorkers * 100) {
    // Queue backed up, add workers
    startWorker();
  } else if (queueSize < activeWorkers * 10 && activeWorkers > 1) {
    // Queue idle, remove workers
    stopWorker();
  }
}

setInterval(autoScaleConsumers, 10000);

Implement backpressure to prevent queue overflow.

Message Queue Patterns Checklist

  • Queue type chosen (SQS for simple async, Kafka for event streaming, BullMQ for in-process)
  • SQS: FIFO queue for ordering, with MessageDeduplicationId
  • SQS: Visibility timeout set appropriately (3x max processing time)
  • SQS: DLQ configured for failed messages
  • Kafka: partition key strategy (same orderId = same partition)
  • Kafka: consumer group manages scaling automatically
  • BullMQ: worker concurrency configured
  • Backpressure handled (rate limit or auto-scaling)
  • Poison pill detection (messages that fail every retry and accumulate in the DLQ)
  • Metrics tracked (queue depth, processing time, error rate)

Conclusion

Message queues solve different problems: SQS for fire-and-forget async work, Kafka for event streaming and audit trails, BullMQ for in-process background jobs. Get visibility timeout right (3x processing time) and implement DLQ for dead messages. Kafka's partition key strategy ensures ordering and parallel processing. Always implement backpressure to prevent queue overflow. Choose the right queue for your workload and your infrastructure scales smoothly.