Message Queues — RabbitMQ and Bull Queue

Sanjeev Sharma

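Message queues decouple services and improve reliability: a producer hands off work and moves on, while a consumer processes it later and can retry on failure. That makes them a good fit for background jobs and inter-service communication.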

Bull Queue (Redis-based)

npm install bull

import Queue from "bull";

// Create queue
const emailQueue = new Queue("emails", {
  redis: { host: "127.0.0.1", port: 6379 },
});

// Add job
await emailQueue.add({ to: "user@example.com", subject: "Hello" });

// Process jobs. sendEmail is your own mail-sending function, not part of Bull.
emailQueue.process(async (job) => {
  console.log(`Sending email to ${job.data.to}`);
  await sendEmail(job.data);
  return { success: true }; // Stored as the job's return value
});

// Listen for events
emailQueue.on("completed", (job) => {
  console.log(`Job ${job.id} completed`);
});

emailQueue.on("failed", (job, err) => {
  console.error(`Job ${job.id} failed: ${err.message}`);
});
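
One way to keep a processor like the one above easy to test is to move the job logic into a plain function and check the payload before using it, since job data comes back from Redis as plain JSON. The sketch below is only an illustration: `EmailJobData`, `SendEmail`, `isEmailJobData`, and `handleEmailJob` are hypothetical names, not part of Bull.

```typescript
// Hypothetical payload shape for the "emails" queue (fields taken from the example above).
interface EmailJobData {
  to: string;
  subject: string;
}

// Hypothetical signature for your own mail-sending function.
type SendEmail = (data: EmailJobData) => Promise<void>;

// Runtime check: job data is deserialized JSON, so its shape is not guaranteed.
function isEmailJobData(value: unknown): value is EmailJobData {
  if (typeof value !== "object" || value === null) return false;
  const { to, subject } = value as { to?: unknown; subject?: unknown };
  return typeof to === "string" && to.indexOf("@") > 0 && typeof subject === "string";
}

// Pure handler: throwing here rejects the processor's promise,
// which is how the queue learns that the job failed.
async function handleEmailJob(data: unknown, send: SendEmail): Promise<{ success: true }> {
  if (!isEmailJobData(data)) {
    throw new Error("Invalid email job payload");
  }
  await send(data);
  return { success: true };
}

// Wiring (not executed here, assumes the emailQueue and sendEmail from the example):
// emailQueue.process((job) => handleEmailJob(job.data, sendEmail));
```

Because the handler takes the send function as an argument, it can be exercised with a fake sender and no running Redis instance.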

RabbitMQ

npm install amqplib

import amqp from "amqplib";

const connection = await amqp.connect("amqp://localhost");
const channel = await connection.createChannel();

// Producer
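async function sendMessage(exchange: string, message: unknown) {
  await channel.assertExchange(exchange, "topic", { durable: true });
  channel.publish(
    exchange,
    "user.created", // Routing key, matched against each queue's binding pattern
    Buffer.from(JSON.stringify(message)),
    { persistent: true } // Ask the broker to write the message to disk
  );
}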

// Consumer
async function consumeMessages(exchange: string, queue: string) {
  await channel.assertExchange(exchange, "topic", { durable: true });
  const q = await channel.assertQueue(queue, { durable: true });
  await channel.bindQueue(q.queue, exchange, "user.*");

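  await channel.consume(q.queue, (msg) => {
    // msg is null if the broker cancels this consumer
    if (msg) {
      console.log("Message:", msg.content.toString());
      channel.ack(msg); // Acknowledge so the broker can remove the message
    }
  });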
}
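
The consumer above binds with the pattern "user.*" while the producer publishes with the routing key "user.created". On a topic exchange, "*" matches exactly one dot-separated word and "#" matches zero or more words. The function below is a small sketch of that matching rule so you can reason about which keys a binding will receive. `topicMatches` is a hypothetical helper for illustration; RabbitMQ does this matching inside the broker.

```typescript
// Returns true when `routingKey` matches the topic binding `pattern`.
// "*" matches exactly one dot-separated word; "#" matches zero or more words.
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern.split(".");
  const k = routingKey.split(".");

  const match = (i: number, j: number): boolean => {
    // Pattern exhausted: match only if the key is exhausted too.
    if (i === p.length) return j === k.length;
    if (p[i] === "#") {
      // "#" either consumes no word, or consumes one word and stays active.
      return match(i + 1, j) || (j < k.length && match(i, j + 1));
    }
    // Any other pattern word needs a key word to compare against.
    if (j === k.length) return false;
    return (p[i] === "*" || p[i] === k[j]) && match(i + 1, j + 1);
  };

  return match(0, 0);
}

console.log(topicMatches("user.*", "user.created")); // true
console.log(topicMatches("user.*", "user.profile.updated")); // false
console.log(topicMatches("user.#", "user.profile.updated")); // true
```

So the "user.*" binding in the consumer receives "user.created" and "user.deleted", but a key with more segments would need a "user.#" binding.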

Job Retry and Scheduling

const jobQueue = new Queue("jobs");

// Retry failed jobs
await jobQueue.add(
  { data: "test" },
  {
    attempts: 3, // Up to 3 attempts in total (the first run plus 2 retries)
    backoff: {
      type: "exponential",
      delay: 2000, // Base delay of 2 seconds, growing after each failure
    },
  }
);

// Delayed job
await jobQueue.add(
  { data: "delayed" },
  {
    delay: 60000, // Delay 1 minute
  }
);

// Recurring job
await jobQueue.add(
  {},
  {
    repeat: {
      cron: "0 * * * *", // Every hour
    },
  }
);
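
To get a feel for how `attempts` and an exponential backoff interact, it can help to write out the waiting times by hand. The sketch below assumes a generic doubling schedule (base delay times 2 to the power of retry number minus 1). Bull's built-in strategy may compute slightly different values, so treat the numbers as an illustration. `retryDelayMs` and `retrySchedule` are hypothetical helpers.

```typescript
// Delay before the n-th retry (n starts at 1), doubling each time.
// Assumption: baseDelayMs * 2^(n - 1); the library's internal formula may differ.
function retryDelayMs(retryNumber: number, baseDelayMs: number): number {
  return baseDelayMs * Math.pow(2, retryNumber - 1);
}

// `attempts` counts the first run, so a job gets at most attempts - 1 retries.
function retrySchedule(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  for (let retry = 1; retry < attempts; retry++) {
    delays.push(retryDelayMs(retry, baseDelayMs));
  }
  return delays;
}

// attempts: 3 with a 2000 ms base delay gives two retries.
console.log(retrySchedule(3, 2000)); // [ 2000, 4000 ]
```

Under this assumption, a job that keeps failing is given up after roughly 6 seconds of waiting plus the time its three runs take.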

FAQ

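Q: Should I use Bull or RabbitMQ? A: Use Bull for simple background jobs, especially if you already run Redis. Use RabbitMQ for more complex messaging between multiple services, such as routing one event to several consumers.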

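Q: What if Redis goes down? A: Bull stores its jobs in Redis, so they cannot be processed while Redis is down and can be lost if Redis runs without persistence (RDB or AOF). If you need stronger delivery guarantees, use RabbitMQ with durable queues and persistent messages.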

Q: How do I monitor queues? A: For Bull, use a dashboard UI such as bull-board. For RabbitMQ, use the web interface provided by the management plugin.


Message queues are essential for scalable backend systems. Choose Bull for simplicity, and RabbitMQ when you need richer routing and stronger delivery guarantees.

Written by

Sanjeev Sharma

Full Stack Engineer · E-mopro