No Backpressure Mechanism — When Fast Producers Drown Slow Consumers

Introduction

Backpressure is the mechanism by which a slow consumer signals a fast producer to slow down, so a rate mismatch is handled gracefully (pausing or buffering at the producer) instead of losing data or crashing. Without it, the gap between production rate and consumption rate either fills memory until the process crashes or drops events silently.

The Problem Without Backpressure

// ❌ No backpressure — incoming events buffered in memory without bound
const events: Event[] = []

eventSource.on('data', (event) => {
  events.push(event)  // Producer runs at 10,000/s
})

// Consumer runs at 500/s
setInterval(async () => {
  const batch = events.splice(0, 500)
  await db.insertBatch(batch)  // drains at most 500 events/second
}, 1000)

// After 10 seconds: 95,000 events in memory (10,000 - 500) × 10
// After 60 seconds: 570,000 events in memory
// After memory exhaustion: process crashes, ALL buffered events lost

Fix 1: Node.js Streams with Backpressure

import { Writable, Transform, pipeline } from 'stream'
import { promisify } from 'util'

const pipelineAsync = promisify(pipeline)

// Writable stream that respects backpressure
class DatabaseBatchWriter extends Writable {
  private batch: Event[] = []
  private readonly batchSize = 100

  constructor() {
    super({ objectMode: true, highWaterMark: 500 })  // buffer max 500 objects
  }

  async _write(event: Event, _encoding: string, callback: (error?: Error) => void) {
    this.batch.push(event)

    if (this.batch.length >= this.batchSize) {
      try {
        await db.insertBatch(this.batch)
        this.batch = []
        callback()  // signal: ready for more data
      } catch (err) {
        callback(err as Error)  // signal: error, stop pipeline
      }
    } else {
      callback()  // signal: ready for more data immediately
    }
  }

  async _final(callback: (error?: Error | null) => void) {
    // Flush the remaining partial batch before the stream closes
    try {
      if (this.batch.length > 0) {
        await db.insertBatch(this.batch)
      }
      callback()
    } catch (err) {
      callback(err as Error)
    }
  }
}

// The stream pipeline handles backpressure automatically
// When DatabaseBatchWriter's buffer is full (500 items), it signals
// upstream to pause — the source stops reading until there's space
await pipelineAsync(
  eventSource,          // fast producer
  new Transform({...}), // optional transform
  new DatabaseBatchWriter()  // slow consumer with backpressure
)
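
pipeline() handles the pause/resume dance for you, but the underlying signal is simple: write() returns false once the writable's buffer hits highWaterMark, and 'drain' fires when there is space again. A minimal, self-contained sketch of doing it by hand — the slow writer here just simulates a 5 ms database insert:

```typescript
import { Writable } from 'node:stream'

// A deliberately slow consumer: buffer at most 2 objects, 5 ms per write.
const slowWriter = new Writable({
  objectMode: true,
  highWaterMark: 2,
  write(_chunk, _encoding, callback) {
    setTimeout(callback, 5)  // simulate a slow database insert
  },
})

let paused = 0

async function produce(events: number[]) {
  for (const event of events) {
    // write() returning false IS the backpressure signal pipeline() reacts to
    if (!slowWriter.write(event)) {
      paused++
      await new Promise<void>((resolve) => slowWriter.once('drain', resolve))
    }
  }
  slowWriter.end()
}

await produce(Array.from({ length: 20 }, (_, i) => i))
console.log(`producer paused ${paused} times`)
```

The producer yields whenever the consumer falls behind, so memory stays bounded at roughly highWaterMark objects no matter how fast events arrive.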

Fix 2: Queue with Bounded Workers

import PQueue from 'p-queue'

// Concurrency-limited queue — processes at most 10 tasks at once
const queue = new PQueue({
  concurrency: 10,     // max 10 concurrent workers
  intervalCap: 100,    // max 100 tasks per interval
  interval: 1000,      // interval in ms (rate limit: 100/second)
})

// Monitor queue depth — if it grows too large, apply backpressure
queue.on('add', () => {
  if (queue.size > 10000) {
    // Queue is backing up — stop accepting new work
    eventSource.pause()
    console.warn(`Queue depth: ${queue.size} — pausing source`)
  }
})

queue.on('next', () => {
  if (queue.size < 1000 && eventSource.isPaused()) {
    // Queue has drained — resume accepting work
    eventSource.resume()
  }
})

eventSource.on('data', (event) => {
  queue.add(() => processEvent(event))
})
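
p-queue does the bookkeeping above, but the core mechanism — a semaphore that caps in-flight tasks — fits in a few lines. A sketch, not a replacement for p-queue; createLimiter and its slot-handoff scheme are illustrative:

```typescript
// Minimal concurrency limiter: at most `concurrency` tasks run at once.
function createLimiter(concurrency: number) {
  let active = 0
  const waiting: (() => void)[] = []

  return async function run<T>(task: () => Promise<T>): Promise<T> {
    if (active >= concurrency) {
      // No free slot: park until a finishing task hands us its slot
      await new Promise<void>((resolve) => waiting.push(resolve))
    } else {
      active++
    }
    try {
      return await task()
    } finally {
      const next = waiting.shift()
      if (next) next()  // hand this slot directly to the next waiting task
      else active--
    }
  }
}

const limit = createLimiter(2)
let inFlight = 0
let peak = 0

const tasks = Array.from({ length: 6 }, (_, i) =>
  limit(async () => {
    inFlight++
    peak = Math.max(peak, inFlight)
    await new Promise((resolve) => setTimeout(resolve, 10))
    inFlight--
    return i
  })
)

const results = await Promise.all(tasks)
console.log({ peak, results })  // peak never exceeds 2
```

Handing the slot directly to the next waiter (instead of decrementing and re-checking) avoids a window where a late caller could sneak past the limit.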

Fix 3: Persistent Queue (Events Survive Crashes)

import Bull from 'bull'

// Unlike in-memory arrays, Bull persists jobs in Redis
// Survives process crashes, can be distributed across workers
const eventQueue = new Bull('events', {
  redis: process.env.REDIS_URL,
  defaultJobOptions: {
    attempts: 3,                   // retry on failure
    backoff: { type: 'exponential', delay: 1000 },
    removeOnComplete: 100,         // keep last 100 completed jobs
    removeOnFail: false,           // keep failed jobs for inspection
  },
})

// Producer — just enqueues, doesn't block
eventSource.on('data', async (event) => {
  await eventQueue.add(event, {
    // If the queue is too deep, delay new jobs (soft backpressure: jobs are postponed, not rejected)
    ...(await eventQueue.count() > 50000 ? { delay: 5000 } : {}),
  })
})

// Consumer — controlled concurrency
eventQueue.process(10, async (job) => {  // 10 concurrent workers
  await processEvent(job.data)
})

// Monitor queue depth
setInterval(async () => {
  const waiting = await eventQueue.count()
  const active = await eventQueue.getActiveCount()
  if (waiting > 10000) {
    logger.warn({ waiting, active }, 'Event queue backing up')
  }
}, 30_000)
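
For reference, the backoff config above schedules retries at roughly delay × 2^(attempt − 1) — 1 s, 2 s, 4 s for delay: 1000 — though exact rounding can vary by Bull version. A stdlib sketch of the same policy; withRetries is a hypothetical helper shown to make the config concrete, not part of Bull:

```typescript
// Retry schedule matching { type: 'exponential', delay: baseDelayMs }
function retryDelays(attempts: number, baseDelayMs: number): number[] {
  return Array.from({ length: attempts }, (_, i) => baseDelayMs * 2 ** i)
}

console.log(retryDelays(3, 1000))  // → [ 1000, 2000, 4000 ]

// Generic runner applying that schedule: retry on failure, rethrow
// once attempts are exhausted.
async function withRetries<T>(
  fn: () => Promise<T>,
  attempts = 3,
  delayMs = 1000,
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await fn()
    } catch (err) {
      if (attempt >= attempts) throw err
      await new Promise((resolve) => setTimeout(resolve, delayMs * 2 ** (attempt - 1)))
    }
  }
}

// Fails twice, succeeds on the third attempt (delayMs=1 to keep it fast)
let failures = 0
const result = await withRetries(async () => {
  if (failures++ < 2) throw new Error('transient')
  return 'ok'
}, 3, 1)
console.log(result)  // → ok
```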

Fix 4: Load Shedding as Last Resort

// When truly overwhelmed, shed load gracefully (drop non-critical work)
// rather than crashing

const queue = new PQueue({ concurrency: 20 })

function shouldAcceptEvent(event: Event): boolean {
  // If queue is too deep, drop low-priority events
  if (queue.size > 50000) {
    // Keep: payment events, error events, critical business events
    if (event.type === 'payment' || event.type === 'error') return true
    // Drop: analytics, telemetry, low-priority notifications
    metrics.increment('events.dropped', { type: event.type })
    return false
  }
  return true
}

eventSource.on('data', (event) => {
  if (shouldAcceptEvent(event)) {
    queue.add(() => processEvent(event))
  }
})
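
The same shedding policy can be packaged as a small bounded buffer. A sketch — the capacity and the isCritical predicate stand in for whatever thresholds and priority rules your events actually use:

```typescript
type Ev = { type: string }

// Bounded buffer that sheds non-critical events once full.
class SheddingBuffer {
  private items: Ev[] = []
  dropped = 0

  constructor(
    private capacity: number,
    private isCritical: (e: Ev) => boolean,
  ) {}

  push(event: Ev): boolean {
    if (this.items.length >= this.capacity && !this.isCritical(event)) {
      this.dropped++         // shed: count it, don't buffer it
      return false
    }
    this.items.push(event)   // critical events may exceed capacity
    return true
  }

  get size() { return this.items.length }
}

const buf = new SheddingBuffer(2, (e) => e.type === 'payment')
buf.push({ type: 'analytics' })  // accepted
buf.push({ type: 'analytics' })  // accepted — buffer now at capacity
buf.push({ type: 'analytics' })  // shed
buf.push({ type: 'payment' })    // critical: accepted even when full
console.log({ size: buf.size, dropped: buf.dropped })  // → { size: 3, dropped: 1 }
```

Tracking the dropped count matters: shedding is only safe when it is visible, so wire that counter into the same metrics the earlier example increments.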

Backpressure Checklist

  • ✅ Identify every producer/consumer pair in your system
  • ✅ Use Node.js streams with highWaterMark for data pipelines
  • ✅ Use a bounded queue (Bull/BullMQ with Redis) for job processing
  • ✅ Monitor queue depth — alert when it exceeds a safe threshold
  • ✅ Pause/resume the producer based on consumer queue depth
  • ✅ For truly critical events, use a persistent queue that survives crashes
  • ✅ Have a load-shedding strategy for non-critical work under extreme load

Conclusion

Backpressure is what separates a system that degrades gracefully from one that crashes when traffic spikes. Node.js streams handle it natively via highWaterMark: when a writable's buffer fills, write() returns false and pipeline() pauses the readable until it drains. For job queues, use a bounded concurrent worker pool (p-queue, BullMQ) and monitor queue depth, pausing the source when it backs up. The rule: never buffer unbounded data in memory. Either apply pressure back to the producer, or use a persistent queue that absorbs spikes without data loss.