Published on

Domain Events in Node.js — Decoupling Business Logic With Events

Authors

Introduction

When an order is created, inventory decreases, emails are sent, analytics are updated. If these are all called synchronously, the Order aggregate is tightly coupled to everything. Domain events decouple this: the order emits "OrderCreated," and other parts of the system listen and react. The aggregate publishes events—external listeners decide what to do.

Domain Event as Value Object

Domain events are immutable value objects describing something that happened in the domain:

// src/domain/events/domain-event.ts
export interface DomainEvent {
  aggregateId: string
  aggregateType: string
  eventType: string
  version: number
  timestamp: Date
}

// src/domain/events/order-created.ts
export interface OrderCreatedEvent extends DomainEvent {
  eventType: 'OrderCreated'
  aggregateType: 'Order'
  data: {
    orderId: string
    userId: string
    items: Array<{
      productId: string
      quantity: number
      price: number
    }>
    total: number
  }
}

// src/domain/events/inventory-decremented.ts
export interface InventoryDecrementedEvent extends DomainEvent {
  eventType: 'InventoryDecremented'
  aggregateType: 'Inventory'
  data: {
    productId: string
    quantity: number
    reason: 'order_created' | 'damaged' | 'returned'
    relatedAggregateId: string  // orderId
  }
}

// Events are immutable
export class DomainEventFactory {
  static createOrderCreatedEvent(
    orderId: string,
    userId: string,
    items: OrderItem[],
    total: number
  ): OrderCreatedEvent {
    return {
      aggregateId: orderId,
      aggregateType: 'Order',
      eventType: 'OrderCreated',
      version: 1,
      timestamp: new Date(),
      data: {
        orderId,
        userId,
        items: items.map(item => ({
          productId: item.productId,
          quantity: item.quantity,
          price: item.price
        })),
        total
      }
    }
  }
}

Synchronous In-Process Dispatch (EventEmitter)

For simple cases, use Node's EventEmitter for synchronous in-memory dispatch:

// src/domain/event-bus.ts
import { EventEmitter } from 'events'
import type { DomainEvent } from './events/domain-event'

export class InProcessEventBus extends EventEmitter {
  publish<T extends DomainEvent>(event: T): void {
    // Emit synchronously—handlers run immediately
    this.emit(event.eventType, event)
  }

  subscribe<T extends DomainEvent>(
    eventType: string,
    handler: (event: T) => void | Promise<void>
  ): void {
    this.on(eventType, handler)
  }
}

// src/domain/aggregates/order.ts
export class Order {
  private events: DomainEvent[] = []

  constructor(
    public id: string,
    public userId: string,
    public items: OrderItem[],
    public total: number,
    public status: 'pending' | 'confirmed' | 'cancelled' = 'pending'
  ) {
    // Record that this order was created
    this.events.push(
      DomainEventFactory.createOrderCreatedEvent(
        this.id,
        this.userId,
        this.items,
        this.total
      )
    )
  }

  confirm(paymentId: string): void {
    if (this.status !== 'pending') {
      throw new Error('Can only confirm pending orders')
    }

    this.status = 'confirmed'
    this.events.push({
      aggregateId: this.id,
      aggregateType: 'Order',
      eventType: 'OrderConfirmed',
      version: 1,
      timestamp: new Date(),
      data: { orderId: this.id, paymentId }
    })
  }

  cancel(): void {
    this.status = 'cancelled'
    this.events.push({
      aggregateId: this.id,
      aggregateType: 'Order',
      eventType: 'OrderCancelled',
      version: 1,
      timestamp: new Date(),
      data: { orderId: this.id }
    })
  }

  // Publish events that occurred in this aggregate
  getUncommittedEvents(): DomainEvent[] {
    return this.events
  }

  clearEvents(): void {
    this.events = []
  }
}

// src/application/create-order.ts
export class CreateOrderUseCase {
  constructor(
    private orderRepository: OrderRepository,
    private eventBus: InProcessEventBus
  ) {}

  async execute(request: CreateOrderRequest): Promise<Order> {
    // Create order
    const order = new Order(
      crypto.randomUUID(),
      request.userId,
      request.items,
      request.total
    )

    // Persist it
    await this.orderRepository.save(order)

    // Publish events (synchronously)
    const events = order.getUncommittedEvents()
    for (const event of events) {
      this.eventBus.publish(event)
    }

    order.clearEvents()

    return order
  }
}

// src/application/listeners/order-listeners.ts
export class OrderListeners {
  constructor(
    private eventBus: InProcessEventBus,
    private inventoryService: InventoryService,
    private notificationService: NotificationService
  ) {
    // Subscribe to events
    this.eventBus.subscribe('OrderCreated', this.onOrderCreated.bind(this))
    this.eventBus.subscribe('OrderConfirmed', this.onOrderConfirmed.bind(this))
  }

  private async onOrderCreated(event: OrderCreatedEvent): Promise<void> {
    console.log(`Order created: ${event.data.orderId}`)

    // Decrease inventory
    for (const item of event.data.items) {
      await this.inventoryService.decrementStock(item.productId, item.quantity)
    }
  }

  private async onOrderConfirmed(event: OrderConfirmedEvent): Promise<void> {
    console.log(`Order confirmed: ${event.data.orderId}`)

    // Send confirmation email
    await this.notificationService.sendOrderConfirmation(event.data.orderId)
  }
}

Collecting Events on Aggregate, Then Dispatch After Commit

Best practice: collect events during business logic, dispatch after persistence:

// src/repositories/order-repository.ts
export class OrderRepository {
  constructor(
    private db: Database,
    private eventBus: InProcessEventBus
  ) {}

  async save(order: Order): Promise<void> {
    // Get uncommitted events before clearing them
    const events = order.getUncommittedEvents()

    // Persist aggregate
    await this.db.orders.create({
      id: order.id,
      userId: order.userId,
      status: order.status,
      total: order.total,
      items: order.items,
      createdAt: new Date()
    })

    // Only publish events if persistence succeeded
    // If persistence fails, exception is thrown before publication
    for (const event of events) {
      this.eventBus.publish(event)
    }

    order.clearEvents()
  }

  async update(order: Order): Promise<void> {
    const events = order.getUncommittedEvents()

    await this.db.orders.update(
      { id: order.id },
      {
        status: order.status,
        updatedAt: new Date()
      }
    )

    for (const event of events) {
      this.eventBus.publish(event)
    }

    order.clearEvents()
  }
}

// This pattern ensures:
// 1. Events only published if persistence succeeds
// 2. No partial state: either everything persists or nothing
// 3. Listeners can assume event means aggregate was persisted

Transactional Outbox for Reliable Publishing

In-process events are synchronous and can fail. The outbox pattern ensures events are published reliably even if the event bus is down:

// src/infrastructure/outbox.ts
export interface OutboxEvent {
  id: string
  aggregateId: string
  aggregateType: string
  eventType: string
  eventData: Record<string, any>
  published: boolean
  publishedAt?: Date
  createdAt: Date
}

export class TransactionalOutbox {
  constructor(
    private db: Database,
    private eventBus: EventBus
  ) {}

  async recordEvent(event: DomainEvent): Promise<void> {
    // Store event in database within same transaction as aggregate
    await this.db.outbox.create({
      id: crypto.randomUUID(),
      aggregateId: event.aggregateId,
      aggregateType: event.aggregateType,
      eventType: event.eventType,
      eventData: event.data,
      published: false,
      createdAt: new Date()
    })
    // If this transaction commits, event is in database
    // If it rolls back, event is not recorded
  }

  async publishPendingEvents(): Promise<void> {
    // Background job: periodically find unpublished events and publish them
    const unpublished = await this.db.outbox.findMany({
      where: { published: false }
    })

    for (const outboxEvent of unpublished) {
      try {
        // Publish to event bus
        await this.eventBus.publish({
          aggregateId: outboxEvent.aggregateId,
          aggregateType: outboxEvent.aggregateType,
          eventType: outboxEvent.eventType,
          version: 1,
          timestamp: outboxEvent.createdAt,
          data: outboxEvent.eventData
        })

        // Mark as published
        await this.db.outbox.update(
          { id: outboxEvent.id },
          {
            published: true,
            publishedAt: new Date()
          }
        )
      } catch (error) {
        console.error(`Failed to publish event ${outboxEvent.id}`, error)
        // Retry on next job run
      }
    }
  }
}

// Repository uses outbox
export class OrderRepository {
  constructor(
    private db: Database,
    private outbox: TransactionalOutbox
  ) {}

  async save(order: Order): Promise<void> {
    const events = order.getUncommittedEvents()

    // Single database transaction
    const transaction = await this.db.transaction()

    try {
      // Persist aggregate
      await transaction.orders.create({
        id: order.id,
        userId: order.userId,
        status: order.status,
        total: order.total
      })

      // Record all events in outbox (same transaction)
      for (const event of events) {
        await transaction.outbox.create({
          id: crypto.randomUUID(),
          aggregateId: event.aggregateId,
          aggregateType: event.aggregateType,
          eventType: event.eventType,
          eventData: event.data,
          published: false,
          createdAt: new Date()
        })
      }

      // Commit transaction
      await transaction.commit()

      // After commit succeeds, try immediate publication (optional)
      // If this fails, background job will retry
      for (const event of events) {
        this.eventBus.publish(event).catch(err => {
          console.error('Event bus offline, will retry via outbox job', err)
        })
      }

      order.clearEvents()
    } catch (error) {
      await transaction.rollback()
      throw error
    }
  }
}

// Background job (runs every 30 seconds)
// Cron job or Kubernetes CronJob
setInterval(async () => {
  await outbox.publishPendingEvents()
}, 30000)

Event Handler Organization (One Handler Per File)

Keep event handlers organized and focused:

// src/application/event-handlers/order-created.handler.ts
export class OrderCreatedHandler {
  constructor(
    private inventoryService: InventoryService
  ) {}

  async handle(event: OrderCreatedEvent): Promise<void> {
    // Decrease inventory
    for (const item of event.data.items) {
      await this.inventoryService.decrementStock(item.productId, item.quantity)
    }
  }
}

// src/application/event-handlers/order-confirmed.handler.ts
export class OrderConfirmedHandler {
  constructor(
    private emailService: EmailService
  ) {}

  async handle(event: OrderConfirmedEvent): Promise<void> {
    // Send confirmation email
    await this.emailService.sendConfirmation(event.data.orderId)
  }
}

// src/application/event-handlers/inventory-decremented.handler.ts
export class InventoryDecrementedHandler {
  constructor(
    private analyticsService: AnalyticsService
  ) {}

  async handle(event: InventoryDecrementedEvent): Promise<void> {
    // Track inventory change for reporting
    await this.analyticsService.trackInventoryChange({
      productId: event.data.productId,
      delta: -event.data.quantity,
      reason: event.data.reason
    })
  }
}

// src/application/event-handler-registry.ts
export class EventHandlerRegistry {
  private handlers: Map<string, any[]> = new Map()

  register(eventType: string, handler: any): void {
    if (!this.handlers.has(eventType)) {
      this.handlers.set(eventType, [])
    }
    this.handlers.get(eventType)!.push(handler)
  }

  getHandlers(eventType: string): any[] {
    return this.handlers.get(eventType) || []
  }
}

// src/composition-root.ts
const registry = new EventHandlerRegistry()
const eventBus = new InProcessEventBus()

// Register handlers
registry.register('OrderCreated', new OrderCreatedHandler(inventoryService))
registry.register('OrderConfirmed', new OrderConfirmedHandler(emailService))
registry.register('InventoryDecremented', new InventoryDecrementedHandler(analyticsService))

// Subscribe handlers to event bus
for (const [eventType, handlers] of Object.entries(registry.handlers)) {
  for (const handler of handlers) {
    eventBus.subscribe(eventType, (event) => handler.handle(event))
  }
}

Async vs Sync Event Handling Tradeoffs

// SYNCHRONOUS: Handler runs immediately, blocking
// Pros: Consistency guaranteed, easy to reason about
// Cons: Slow (handler latency blocks request), tight coupling

export class CreateOrderUseCase {
  async execute(request: CreateOrderRequest): Promise<Order> {
    const order = new Order(request)
    await this.orderRepository.save(order)

    // Handler blocks here
    const events = order.getUncommittedEvents()
    for (const event of events) {
      // Email sends synchronously - if slow, request is slow
      await this.eventBus.publish(event)  // blocks
    }

    return order
  }
}

// ASYNCHRONOUS: Handler runs in background, non-blocking
// Pros: Fast (request returns immediately), decoupled, scalable
// Cons: Eventual consistency (handler might fail), harder to debug

export class CreateOrderUseCase {
  async execute(request: CreateOrderRequest): Promise<Order> {
    const order = new Order(request)
    await this.orderRepository.save(order)

    const events = order.getUncommittedEvents()
    for (const event of events) {
      // Return immediately, handler runs in background (queue)
      // If handler fails, retry independently
      await this.eventBus.publishAsync(event)  // returns immediately
    }

    return order
  }
}

// HYBRID: Some handlers sync, some async
export class CreateOrderUseCase {
  async execute(request: CreateOrderRequest): Promise<Order> {
    const order = new Order(request)

    const events = order.getUncommittedEvents()

    // Sync handlers (critical path): inventory, payment
    const syncHandlers = ['InventoryDecremented', 'PaymentProcessed']

    // Async handlers (non-critical): email, analytics, notifications
    const asyncHandlers = ['EmailSent', 'AnalyticsTracked']

    // Run critical handlers synchronously
    for (const event of events) {
      if (syncHandlers.includes(event.eventType)) {
        await this.eventBus.publish(event)
      }
    }

    // Run non-critical handlers asynchronously
    for (const event of events) {
      if (asyncHandlers.includes(event.eventType)) {
        this.eventBus.publishAsync(event).catch(err =>
          console.error('Async handler failed', err)
        )
      }
    }

    return order
  }
}

Saga Coordination With Domain Events

Use domain events to coordinate long-running processes across aggregates:

// src/domain/sagas/order-fulfillment-saga.ts
export class OrderFulfillmentSaga {
  constructor(
    private eventBus: EventBus,
    private orderService: OrderService,
    private shippingService: ShippingService,
    private paymentService: PaymentService
  ) {
    this.eventBus.subscribe('OrderCreated', this.onOrderCreated.bind(this))
    this.eventBus.subscribe('PaymentConfirmed', this.onPaymentConfirmed.bind(this))
    this.eventBus.subscribe('ShippingInitiated', this.onShippingInitiated.bind(this))
  }

  private async onOrderCreated(event: OrderCreatedEvent): Promise<void> {
    // 1. Order created, process payment
    try {
      await this.paymentService.charge(
        event.data.orderId,
        event.data.total
      )
      // PaymentConfirmed event will be emitted
    } catch (error) {
      // Payment failed, order should be cancelled
      await this.orderService.cancel(event.data.orderId)
    }
  }

  private async onPaymentConfirmed(event: PaymentConfirmedEvent): Promise<void> {
    // 2. Payment confirmed, initiate shipping
    await this.shippingService.initiate(event.data.orderId)
    // ShippingInitiated event will be emitted
  }

  private async onShippingInitiated(event: ShippingInitiatedEvent): Promise<void> {
    // 3. Shipping initiated, confirm order
    await this.orderService.confirm(event.data.orderId)
  }
}

// Event flow:
// OrderCreated → (process payment) → PaymentConfirmed → (initiate shipping) → ShippingInitiated → (confirm order)
// Each step is a separate event, coordinated via saga

// If any step fails, saga can compensate:
private async onPaymentFailed(event: PaymentFailedEvent): Promise<void> {
  // Compensate: cancel order
  await this.orderService.cancel(event.data.orderId)
  // CancelledEvent emitted
}

Testing Domain Event Flows

// src/domain/__tests__/order-flow.test.ts

describe('Order creation flow with events', () => {
  let order: Order
  let eventBus: InProcessEventBus
  let inventoryService: InventoryService
  let inventoryServiceMock: jest.Mock

  beforeEach(() => {
    eventBus = new InProcessEventBus()
    inventoryServiceMock = jest.fn()
    inventoryService = { decrementStock: inventoryServiceMock }

    // Register handler
    eventBus.subscribe('OrderCreated', async (event: OrderCreatedEvent) => {
      for (const item of event.data.items) {
        await inventoryService.decrementStock(item.productId, item.quantity)
      }
    })
  })

  it('should decrease inventory when order created', async () => {
    order = new Order('123', 'user1', [
      { productId: 'p1', quantity: 2, price: 50 }
    ], 100)

    const events = order.getUncommittedEvents()
    for (const event of events) {
      eventBus.publish(event)
    }

    // Handler should have been called
    expect(inventoryServiceMock).toHaveBeenCalledWith('p1', 2)
  })

  it('should publish multiple events in order', (done) => {
    const eventSequence: string[] = []

    eventBus.subscribe('OrderCreated', () => {
      eventSequence.push('OrderCreated')
    })

    eventBus.subscribe('OrderConfirmed', () => {
      eventSequence.push('OrderConfirmed')
    })

    order = new Order('123', 'user1', [], 100)
    let events = order.getUncommittedEvents()
    for (const event of events) {
      eventBus.publish(event)
    }

    order.confirm('payment-123')
    events = order.getUncommittedEvents()
    for (const event of events) {
      eventBus.publish(event)
    }

    setImmediate(() => {
      expect(eventSequence).toEqual(['OrderCreated', 'OrderConfirmed'])
      done()
    })
  })
})

Checklist

  • Domain events model important business occurrences
  • Events are immutable value objects with timestamp and version
  • Aggregates collect events during operations
  • Events published only after persistence succeeds
  • Handlers organized as separate classes, one per event
  • Handlers are idempotent (safe to run multiple times)
  • Outbox pattern ensures event publication reliability
  • Critical handlers synchronous, non-critical asynchronous
  • Sagas coordinate complex processes via event chains
  • Event versioning strategy documented
  • Dead letter queue for failed handlers
  • Tests verify event sequences and handler behavior

Conclusion

Domain events make complex systems manageable by decoupling aggregates. An order doesn't call inventory to decrement stock—it emits OrderCreated and lets interested parties listen. When requirements change, you add new listeners without touching the order. This is how you scale systems and teams.