Domain Events in Node.js — Decoupling Business Logic With Events
Author: Sanjeev Sharma (@webcoderspeed1)
Introduction
When an order is created, inventory decreases, emails are sent, and analytics are updated. If the Order aggregate calls each of these directly, it is tightly coupled to all of them. Domain events decouple this: the order emits "OrderCreated," and other parts of the system listen and react. The aggregate only publishes events; listeners decide what to do with them.
- Domain Event as Value Object
- Synchronous In-Process Dispatch (EventEmitter)
- Collecting Events on Aggregate, Then Dispatch After Commit
- Transactional Outbox for Reliable Publishing
- Event Handler Organization (One Handler Per File)
- Async vs Sync Event Handling Tradeoffs
- Saga Coordination With Domain Events
- Testing Domain Event Flows
- Checklist
- Conclusion
Domain Event as Value Object
Domain events are immutable value objects describing something that happened in the domain:
// src/domain/events/domain-event.ts
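export interface DomainEvent {
readonly aggregateId: string
readonly aggregateType: string
readonly eventType: string
readonly version: number
readonly timestamp: Date
// Event-specific payload; concrete events narrow this type
readonly data: Record<string, unknown>
}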
// src/domain/events/order-created.ts
export interface OrderCreatedEvent extends DomainEvent {
eventType: 'OrderCreated'
aggregateType: 'Order'
data: {
orderId: string
userId: string
items: Array<{
productId: string
quantity: number
price: number
}>
total: number
}
}
// src/domain/events/inventory-decremented.ts
export interface InventoryDecrementedEvent extends DomainEvent {
eventType: 'InventoryDecremented'
aggregateType: 'Inventory'
data: {
productId: string
quantity: number
reason: 'order_created' | 'damaged' | 'returned'
relatedAggregateId: string // orderId
}
}
// Build events in one place; callers should treat the returned objects as immutable
export class DomainEventFactory {
static createOrderCreatedEvent(
orderId: string,
userId: string,
items: OrderItem[],
total: number
): OrderCreatedEvent {
return {
aggregateId: orderId,
aggregateType: 'Order',
eventType: 'OrderCreated',
version: 1,
timestamp: new Date(),
data: {
orderId,
userId,
items: items.map(item => ({
productId: item.productId,
quantity: item.quantity,
price: item.price
})),
total
}
}
}
}
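The factory above returns a plain object literal, so nothing stops a handler from changing an event after it has been published. The following is a minimal sketch of one way to enforce immutability, assuming a hypothetical `deepFreeze` helper and `DeepReadonly` type that are not part of the original code. The type catches mutation at compile time and `Object.freeze` catches it at runtime.

```typescript
// Hypothetical type: marks every nested property as read-only at compile time.
type DeepReadonly<T> = T extends Date
  ? T
  : T extends (infer U)[]
    ? ReadonlyArray<DeepReadonly<U>>
    : T extends object
      ? { readonly [K in keyof T]: DeepReadonly<T[K]> }
      : T

// Hypothetical helper: Object.freeze is shallow, so walk nested objects too.
function deepFreeze<T>(value: T): DeepReadonly<T> {
  if (typeof value === 'object' && value !== null && !Object.isFrozen(value)) {
    for (const key of Object.keys(value)) {
      deepFreeze((value as unknown as Record<string, unknown>)[key])
    }
    Object.freeze(value)
  }
  return value as DeepReadonly<T>
}

const event = deepFreeze({
  aggregateId: 'order-1',
  aggregateType: 'Order',
  eventType: 'OrderCreated',
  version: 1,
  timestamp: new Date(),
  data: { orderId: 'order-1', items: [{ productId: 'p1', quantity: 2 }] }
})

// event.data.orderId = 'other'          // compile-time error: read-only property
// (event as any).data.items.push({})    // runtime TypeError: the array is frozen
```

A frozen `Date` can still be changed through its setter methods, so a stricter variant might store the timestamp as an ISO string instead.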
Synchronous In-Process Dispatch (EventEmitter)
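For simple cases, Node's EventEmitter gives you in-memory dispatch. `emit` calls listeners synchronously, in registration order, but it does not await promises returned by async listeners: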
// src/domain/event-bus.ts
import { EventEmitter } from 'events'
import type { DomainEvent } from './events/domain-event'
export class InProcessEventBus extends EventEmitter {
publish<T extends DomainEvent>(event: T): void {
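// emit() invokes listeners synchronously but does not await async handlers;
// a rejected async handler surfaces as an unhandled promise rejection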
this.emit(event.eventType, event)
}
subscribe<T extends DomainEvent>(
eventType: string,
handler: (event: T) => void | Promise<void>
): void {
this.on(eventType, handler)
}
}
// src/domain/aggregates/order.ts
export class Order {
private events: DomainEvent[] = []
constructor(
public id: string,
public userId: string,
public items: OrderItem[],
public total: number,
public status: 'pending' | 'confirmed' | 'cancelled' = 'pending'
) {
// Record that this order was created
this.events.push(
DomainEventFactory.createOrderCreatedEvent(
this.id,
this.userId,
this.items,
this.total
)
)
}
confirm(paymentId: string): void {
if (this.status !== 'pending') {
throw new Error('Can only confirm pending orders')
}
this.status = 'confirmed'
this.events.push({
aggregateId: this.id,
aggregateType: 'Order',
eventType: 'OrderConfirmed',
version: 1,
timestamp: new Date(),
data: { orderId: this.id, paymentId }
})
}
cancel(): void {
this.status = 'cancelled'
this.events.push({
aggregateId: this.id,
aggregateType: 'Order',
eventType: 'OrderCancelled',
version: 1,
timestamp: new Date(),
data: { orderId: this.id }
})
}
// Expose the events recorded by this aggregate (a copy, so callers cannot mutate the internal list)
getUncommittedEvents(): DomainEvent[] {
return [...this.events]
}
clearEvents(): void {
this.events = []
}
}
// src/application/create-order.ts
export class CreateOrderUseCase {
constructor(
private orderRepository: OrderRepository,
private eventBus: InProcessEventBus
) {}
async execute(request: CreateOrderRequest): Promise<Order> {
// Create order
const order = new Order(
crypto.randomUUID(),
request.userId,
request.items,
request.total
)
// Persist it
await this.orderRepository.save(order)
// Publish events (synchronously)
const events = order.getUncommittedEvents()
for (const event of events) {
this.eventBus.publish(event)
}
order.clearEvents()
return order
}
}
// src/application/listeners/order-listeners.ts
export class OrderListeners {
constructor(
private eventBus: InProcessEventBus,
private inventoryService: InventoryService,
private notificationService: NotificationService
) {
// Subscribe to events
this.eventBus.subscribe('OrderCreated', this.onOrderCreated.bind(this))
this.eventBus.subscribe('OrderConfirmed', this.onOrderConfirmed.bind(this))
}
private async onOrderCreated(event: OrderCreatedEvent): Promise<void> {
console.log(`Order created: ${event.data.orderId}`)
// Decrease inventory
for (const item of event.data.items) {
await this.inventoryService.decrementStock(item.productId, item.quantity)
}
}
private async onOrderConfirmed(event: OrderConfirmedEvent): Promise<void> {
console.log(`Order confirmed: ${event.data.orderId}`)
// Send confirmation email
await this.notificationService.sendOrderConfirmation(event.data.orderId)
}
}
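Because `EventEmitter.emit` ignores the promises that async listeners return, `publish` above returns before `onOrderCreated` has finished decrementing stock. Where the caller needs to wait for handlers, something like the following sketch may help. It assumes a hypothetical `AwaitingEventBus` built on a plain `Map` rather than on `EventEmitter`; the names are illustrative only.

```typescript
interface BusEvent {
  eventType: string
}

type Handler<T extends BusEvent> = (event: T) => void | Promise<void>

// Hypothetical bus: publish() resolves only after every handler has settled.
class AwaitingEventBus {
  private handlers = new Map<string, Handler<any>[]>()

  subscribe<T extends BusEvent>(eventType: string, handler: Handler<T>): void {
    const list = this.handlers.get(eventType) ?? []
    list.push(handler)
    this.handlers.set(eventType, list)
  }

  // Runs all handlers, waits for them, and returns the errors they raised
  // so that one failing handler does not hide the others.
  async publish<T extends BusEvent>(event: T): Promise<unknown[]> {
    const list = this.handlers.get(event.eventType) ?? []
    const errors: unknown[] = []
    await Promise.all(
      list.map(async handler => {
        try {
          await handler(event)
        } catch (error) {
          errors.push(error)
        }
      })
    )
    return errors
  }
}

const bus = new AwaitingEventBus()
const log: string[] = []

bus.subscribe('OrderCreated', async () => {
  await Promise.resolve() // stands in for real async work
  log.push('inventory updated')
})
bus.subscribe('OrderCreated', () => {
  throw new Error('email service down')
})

// Resolves with the collected handler errors once both handlers are done.
const published = bus.publish({ eventType: 'OrderCreated' })
```

Returning the errors instead of throwing is a design choice for this sketch; a caller on the critical path might prefer to rethrow the first one.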
Collecting Events on Aggregate, Then Dispatch After Commit
Best practice: collect events during business logic, dispatch after persistence:
// src/repositories/order-repository.ts
export class OrderRepository {
constructor(
private db: Database,
private eventBus: InProcessEventBus
) {}
async save(order: Order): Promise<void> {
// Get uncommitted events before clearing them
const events = order.getUncommittedEvents()
// Persist aggregate
await this.db.orders.create({
id: order.id,
userId: order.userId,
status: order.status,
total: order.total,
items: order.items,
createdAt: new Date()
})
// Only publish events if persistence succeeded
// If persistence fails, exception is thrown before publication
for (const event of events) {
this.eventBus.publish(event)
}
order.clearEvents()
}
async update(order: Order): Promise<void> {
const events = order.getUncommittedEvents()
await this.db.orders.update(
{ id: order.id },
{
status: order.status,
updatedAt: new Date()
}
)
for (const event of events) {
this.eventBus.publish(event)
}
order.clearEvents()
}
}
// This pattern ensures:
// 1. Events are published only if persistence succeeds
// 2. Listeners can assume an event means the aggregate was persisted
// It does not make persistence and publication atomic: if the process
// crashes after the write but before publish(), the events are lost
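The `getUncommittedEvents()` and `clearEvents()` pair is easy to misuse: forgetting the second call republishes old events on the next save. One possible refinement is sketched below, assuming a hypothetical `AggregateRoot` base class with a single `pullEvents()` method. The `Cart` aggregate is invented for the example.

```typescript
interface RecordedEvent {
  eventType: string
  aggregateId: string
}

// Hypothetical base class; the aggregates in this article keep their own array.
abstract class AggregateRoot {
  private pending: RecordedEvent[] = []

  protected record(event: RecordedEvent): void {
    this.pending.push(event)
  }

  // Returns the pending events and empties the buffer in one step,
  // so the same event cannot be handed out twice.
  pullEvents(): RecordedEvent[] {
    const events = this.pending
    this.pending = []
    return events
  }
}

class Cart extends AggregateRoot {
  constructor(public readonly id: string) {
    super()
    this.record({ eventType: 'CartCreated', aggregateId: id })
  }

  checkout(): void {
    this.record({ eventType: 'CartCheckedOut', aggregateId: this.id })
  }
}

const cart = new Cart('cart-1')
const first = cart.pullEvents()  // contains CartCreated
cart.checkout()
const second = cart.pullEvents() // contains only CartCheckedOut
```

A repository would call `pullEvents()` once, after the write succeeds, and publish whatever it returns.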
Transactional Outbox for Reliable Publishing
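In-process dispatch loses events if the process crashes between the database commit and publication, or if the event bus is unavailable at that moment. The outbox pattern writes events to the database in the same transaction as the aggregate, and a background job publishes them afterwards, so every committed event is eventually published at least once: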
// src/infrastructure/outbox.ts
export interface OutboxEvent {
id: string
aggregateId: string
aggregateType: string
eventType: string
eventData: Record<string, any>
published: boolean
publishedAt?: Date
createdAt: Date
}
export class TransactionalOutbox {
constructor(
private db: Database,
private eventBus: EventBus
) {}
async recordEvent(event: DomainEvent): Promise<void> {
// Store event in database within same transaction as aggregate
await this.db.outbox.create({
id: crypto.randomUUID(),
aggregateId: event.aggregateId,
aggregateType: event.aggregateType,
eventType: event.eventType,
eventData: event.data,
published: false,
createdAt: new Date()
})
// If this transaction commits, event is in database
// If it rolls back, event is not recorded
}
async publishPendingEvents(): Promise<void> {
// Background job: periodically find unpublished events and publish them
const unpublished = await this.db.outbox.findMany({
where: { published: false }
})
for (const outboxEvent of unpublished) {
try {
// Publish to event bus
await this.eventBus.publish({
aggregateId: outboxEvent.aggregateId,
aggregateType: outboxEvent.aggregateType,
eventType: outboxEvent.eventType,
version: 1,
timestamp: outboxEvent.createdAt,
data: outboxEvent.eventData
})
// Mark as published
await this.db.outbox.update(
{ id: outboxEvent.id },
{
published: true,
publishedAt: new Date()
}
)
} catch (error) {
console.error(`Failed to publish event ${outboxEvent.id}`, error)
// Retry on next job run
}
}
}
}
// Repository uses outbox
export class OrderRepository {
constructor(
private db: Database,
private outbox: TransactionalOutbox
) {}
async save(order: Order): Promise<void> {
const events = order.getUncommittedEvents()
// Single database transaction
const transaction = await this.db.transaction()
try {
// Persist aggregate
await transaction.orders.create({
id: order.id,
userId: order.userId,
status: order.status,
total: order.total
})
// Record all events in outbox (same transaction)
for (const event of events) {
await transaction.outbox.create({
id: crypto.randomUUID(),
aggregateId: event.aggregateId,
aggregateType: event.aggregateType,
eventType: event.eventType,
eventData: event.data,
published: false,
createdAt: new Date()
})
}
// Commit transaction
await transaction.commit()
// After commit succeeds, try immediate publication (optional) via the outbox,
// which also marks the events as published
// If this fails, the background job will retry
this.outbox.publishPendingEvents().catch(err => {
console.error('Immediate publish failed, will retry via outbox job', err)
})
order.clearEvents()
} catch (error) {
await transaction.rollback()
throw error
}
}
}
// Background job (runs every 30 seconds)
// In production this could instead be a cron job or Kubernetes CronJob
setInterval(() => {
outbox.publishPendingEvents().catch(err => {
console.error('Outbox publish job failed', err)
})
}, 30000)
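The relay above marks an event as published only after `publish` succeeds, so a crash between those two steps sends the event again on the next run. The outbox therefore gives at-least-once delivery, and handlers need to tolerate duplicates. A minimal sketch follows, assuming a hypothetical `idempotent` wrapper, an in-memory `Set` of processed IDs, and that the outbox row `id` travels with the event (the original code does not pass it along). A real system would more likely persist the processed IDs next to the handler's own data.

```typescript
interface DeliveredEvent {
  id: string // assumed: the outbox row id, forwarded with the event
  eventType: string
}

type EventHandler<T extends DeliveredEvent> = (event: T) => Promise<void>

// Hypothetical wrapper: skips events whose id has already been processed.
function idempotent<T extends DeliveredEvent>(
  handler: EventHandler<T>,
  processed: Set<string> = new Set()
): EventHandler<T> {
  return async event => {
    if (processed.has(event.id)) return
    await handler(event)
    // Mark as processed only after success so a failed attempt is retried.
    processed.add(event.id)
  }
}

let stock = 10
const decrementStock = idempotent(async (_event: DeliveredEvent) => {
  stock -= 1
})

// The same event delivered twice, as can happen after a relay crash.
const delivery = { id: 'evt-1', eventType: 'OrderCreated' }
const done = decrementStock(delivery).then(() => decrementStock(delivery))
// Once both deliveries settle, stock is 9 rather than 8.
```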
Event Handler Organization (One Handler Per File)
Keep event handlers organized and focused:
// src/application/event-handlers/order-created.handler.ts
export class OrderCreatedHandler {
constructor(
private inventoryService: InventoryService
) {}
async handle(event: OrderCreatedEvent): Promise<void> {
// Decrease inventory
for (const item of event.data.items) {
await this.inventoryService.decrementStock(item.productId, item.quantity)
}
}
}
// src/application/event-handlers/order-confirmed.handler.ts
export class OrderConfirmedHandler {
constructor(
private emailService: EmailService
) {}
async handle(event: OrderConfirmedEvent): Promise<void> {
// Send confirmation email
await this.emailService.sendConfirmation(event.data.orderId)
}
}
// src/application/event-handlers/inventory-decremented.handler.ts
export class InventoryDecrementedHandler {
constructor(
private analyticsService: AnalyticsService
) {}
async handle(event: InventoryDecrementedEvent): Promise<void> {
// Track inventory change for reporting
await this.analyticsService.trackInventoryChange({
productId: event.data.productId,
delta: -event.data.quantity,
reason: event.data.reason
})
}
}
// src/application/event-handler-registry.ts
// Shape that every handler class above satisfies
export interface EventHandler<T extends DomainEvent = DomainEvent> {
handle(event: T): Promise<void>
}
export class EventHandlerRegistry {
private handlers: Map<string, EventHandler<any>[]> = new Map()
register(eventType: string, handler: EventHandler<any>): void {
if (!this.handlers.has(eventType)) {
this.handlers.set(eventType, [])
}
this.handlers.get(eventType)!.push(handler)
}
getHandlers(eventType: string): EventHandler<any>[] {
return this.handlers.get(eventType) || []
}
// Expose registrations so the composition root can wire them up
entries() {
return this.handlers.entries()
}
}
// src/composition-root.ts
const registry = new EventHandlerRegistry()
const eventBus = new InProcessEventBus()
// Register handlers
registry.register('OrderCreated', new OrderCreatedHandler(inventoryService))
registry.register('OrderConfirmed', new OrderConfirmedHandler(emailService))
registry.register('InventoryDecremented', new InventoryDecrementedHandler(analyticsService))
// Subscribe handlers to event bus
for (const [eventType, handlers] of registry.entries()) {
for (const handler of handlers) {
eventBus.subscribe(eventType, (event) => handler.handle(event))
}
}
Async vs Sync Event Handling Tradeoffs
// SYNCHRONOUS: Handler runs immediately, blocking
// Pros: Consistency guaranteed, easy to reason about
// Cons: Slow (handler latency blocks request), tight coupling
export class CreateOrderUseCase {
async execute(request: CreateOrderRequest): Promise<Order> {
const order = new Order(request)
await this.orderRepository.save(order)
// Handler blocks here
const events = order.getUncommittedEvents()
for (const event of events) {
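// Assumes publish() returns a promise that settles once every handler has
// finished (the EventEmitter-based bus above does not); a slow email handler
// then makes the whole request slow
await this.eventBus.publish(event) // blocks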
}
return order
}
}
// ASYNCHRONOUS: Handler runs in background, non-blocking
// Pros: Fast (request returns immediately), decoupled, scalable
// Cons: Eventual consistency (handler might fail), harder to debug
export class CreateOrderUseCase {
async execute(request: CreateOrderRequest): Promise<Order> {
const order = new Order(request)
await this.orderRepository.save(order)
const events = order.getUncommittedEvents()
for (const event of events) {
// Return immediately, handler runs in background (queue)
// If handler fails, retry independently
await this.eventBus.publishAsync(event) // returns immediately
}
return order
}
}
// HYBRID: Some handlers sync, some async
export class CreateOrderUseCase {
async execute(request: CreateOrderRequest): Promise<Order> {
const order = new Order(request)
await this.orderRepository.save(order)
const events = order.getUncommittedEvents()
// Event types whose handlers sit on the critical path: inventory, payment
const syncHandlers = ['InventoryDecremented', 'PaymentProcessed']
// Event types whose handlers are non-critical: email, analytics, notifications
const asyncHandlers = ['EmailSent', 'AnalyticsTracked']
// Run critical handlers synchronously
for (const event of events) {
if (syncHandlers.includes(event.eventType)) {
await this.eventBus.publish(event)
}
}
// Run non-critical handlers asynchronously
for (const event of events) {
if (asyncHandlers.includes(event.eventType)) {
this.eventBus.publishAsync(event).catch(err =>
console.error('Async handler failed', err)
)
}
}
return order
}
}
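The `publishAsync` call used above is never defined in this article. The following in-memory sketch shows what it might look like, assuming a hypothetical `QueuedEventBus` with a fixed retry count and a dead-letter list. A production system would more likely hand events to a durable queue, so treat this as an illustration of the control flow only.

```typescript
interface QueuedEvent {
  eventType: string
}

type AsyncHandler = (event: QueuedEvent) => Promise<void>

// Hypothetical in-memory bus: publishAsync() schedules work and returns at once.
class QueuedEventBus {
  readonly deadLetters: QueuedEvent[] = []
  private handlers = new Map<string, AsyncHandler[]>()
  private inFlight: Promise<void>[] = []

  constructor(private maxAttempts = 3) {}

  subscribe(eventType: string, handler: AsyncHandler): void {
    const list = this.handlers.get(eventType) ?? []
    list.push(handler)
    this.handlers.set(eventType, list)
  }

  // Does not wait for handlers; the caller continues immediately.
  publishAsync(event: QueuedEvent): void {
    for (const handler of this.handlers.get(event.eventType) ?? []) {
      this.inFlight.push(this.runWithRetry(handler, event))
    }
  }

  // Retries a failing handler, then parks the event in the dead-letter list.
  private async runWithRetry(handler: AsyncHandler, event: QueuedEvent): Promise<void> {
    for (let attempt = 1; attempt <= this.maxAttempts; attempt++) {
      try {
        await Promise.resolve() // yield first so the publisher is never blocked
        await handler(event)
        return
      } catch {
        // Swallow and retry; a real bus would back off between attempts.
      }
    }
    this.deadLetters.push(event)
  }

  // Helper for tests and shutdown: resolves once scheduled handlers have finished.
  async drain(): Promise<void> {
    await Promise.all(this.inFlight)
    this.inFlight = []
  }
}

const bus = new QueuedEventBus(2)
let attempts = 0
bus.subscribe('EmailRequested', async () => {
  attempts++
  throw new Error('SMTP unavailable')
})

bus.publishAsync({ eventType: 'EmailRequested' })
const attemptsAtReturn = attempts // the handler has not run yet
const drained = bus.drain()
```

After `drained` resolves, the handler has been attempted twice and the event sits in `bus.deadLetters` for later inspection.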
Saga Coordination With Domain Events
Use domain events to coordinate long-running processes across aggregates:
// src/domain/sagas/order-fulfillment-saga.ts
export class OrderFulfillmentSaga {
constructor(
private eventBus: EventBus,
private orderService: OrderService,
private shippingService: ShippingService,
private paymentService: PaymentService
) {
this.eventBus.subscribe('OrderCreated', this.onOrderCreated.bind(this))
this.eventBus.subscribe('PaymentConfirmed', this.onPaymentConfirmed.bind(this))
this.eventBus.subscribe('ShippingInitiated', this.onShippingInitiated.bind(this))
}
private async onOrderCreated(event: OrderCreatedEvent): Promise<void> {
// 1. Order created, process payment
try {
await this.paymentService.charge(
event.data.orderId,
event.data.total
)
// PaymentConfirmed event will be emitted
} catch (error) {
// Payment failed, order should be cancelled
await this.orderService.cancel(event.data.orderId)
}
}
private async onPaymentConfirmed(event: PaymentConfirmedEvent): Promise<void> {
// 2. Payment confirmed, initiate shipping
await this.shippingService.initiate(event.data.orderId)
// ShippingInitiated event will be emitted
}
private async onShippingInitiated(event: ShippingInitiatedEvent): Promise<void> {
// 3. Shipping initiated, confirm order
await this.orderService.confirm(event.data.orderId)
}
}
// Event flow:
// OrderCreated → (process payment) → PaymentConfirmed → (initiate shipping) → ShippingInitiated → (confirm order)
// Each step is a separate event, coordinated via saga
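// If any step fails, the saga can compensate. This method belongs inside
// OrderFulfillmentSaga and must be subscribed to 'PaymentFailed' in the constructor:
private async onPaymentFailed(event: PaymentFailedEvent): Promise<void> {
// Compensate: cancel order
await this.orderService.cancel(event.data.orderId)
// OrderCancelled event emitted
}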
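The saga above keeps no state: it reacts to each event without checking how far the order has progressed, so a duplicate or out-of-order event could initiate shipping twice. One way to guard against that is an explicit transition table per order. The sketch below assumes hypothetical state names and an in-memory `Map`; a durable saga would persist this state.

```typescript
type SagaState = 'awaiting_payment' | 'awaiting_shipping' | 'completed' | 'cancelled'
type SagaEvent = 'PaymentConfirmed' | 'PaymentFailed' | 'ShippingInitiated'

// Allowed transitions; anything not listed is ignored as a duplicate or stale event.
const transitions: Record<SagaState, Partial<Record<SagaEvent, SagaState>>> = {
  awaiting_payment: {
    PaymentConfirmed: 'awaiting_shipping',
    PaymentFailed: 'cancelled'
  },
  awaiting_shipping: { ShippingInitiated: 'completed' },
  completed: {},
  cancelled: {}
}

// Hypothetical store that tracks where each order is in the fulfillment process.
class SagaStateStore {
  private states = new Map<string, SagaState>()

  start(orderId: string): void {
    this.states.set(orderId, 'awaiting_payment')
  }

  // Applies the event; returns true if the saga advanced, false if it was ignored.
  apply(orderId: string, event: SagaEvent): boolean {
    const current = this.states.get(orderId)
    if (current === undefined) return false
    const next = transitions[current][event]
    if (next === undefined) return false
    this.states.set(orderId, next)
    return true
  }

  get(orderId: string): SagaState | undefined {
    return this.states.get(orderId)
  }
}

const store = new SagaStateStore()
store.start('order-1')
store.apply('order-1', 'PaymentConfirmed')                   // advances to awaiting_shipping
const duplicate = store.apply('order-1', 'PaymentConfirmed') // ignored: payment already handled
store.apply('order-1', 'ShippingInitiated')                  // advances to completed
```

Each saga handler would call `apply` first and perform its side effect only when the call returns `true`.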
Testing Domain Event Flows
// src/domain/__tests__/order-flow.test.ts
describe('Order creation flow with events', () => {
let order: Order
let eventBus: InProcessEventBus
let inventoryService: InventoryService
let inventoryServiceMock: jest.Mock
beforeEach(() => {
eventBus = new InProcessEventBus()
inventoryServiceMock = jest.fn()
inventoryService = { decrementStock: inventoryServiceMock }
// Register handler
eventBus.subscribe('OrderCreated', async (event: OrderCreatedEvent) => {
for (const item of event.data.items) {
await inventoryService.decrementStock(item.productId, item.quantity)
}
})
})
it('should decrease inventory when order created', async () => {
order = new Order('123', 'user1', [
{ productId: 'p1', quantity: 2, price: 50 }
], 100)
const events = order.getUncommittedEvents()
for (const event of events) {
eventBus.publish(event)
}
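// The handler is async: let pending callbacks run before asserting
await new Promise(resolve => setImmediate(resolve))
// Handler should have been called
expect(inventoryServiceMock).toHaveBeenCalledWith('p1', 2)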
})
it('should publish multiple events in order', (done) => {
const eventSequence: string[] = []
eventBus.subscribe('OrderCreated', () => {
eventSequence.push('OrderCreated')
})
eventBus.subscribe('OrderConfirmed', () => {
eventSequence.push('OrderConfirmed')
})
order = new Order('123', 'user1', [], 100)
let events = order.getUncommittedEvents()
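for (const event of events) {
eventBus.publish(event)
}
// Clear published events so OrderCreated is not dispatched a second time below
order.clearEvents()
order.confirm('payment-123')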
events = order.getUncommittedEvents()
for (const event of events) {
eventBus.publish(event)
}
setImmediate(() => {
expect(eventSequence).toEqual(['OrderCreated', 'OrderConfirmed'])
done()
})
})
})
Checklist
- Domain events model important business occurrences
- Events are immutable value objects with timestamp and version
- Aggregates collect events during operations
- Events published only after persistence succeeds
- Handlers organized as separate classes, one per event
- Handlers are idempotent (safe to run multiple times)
- Outbox pattern ensures event publication reliability
- Critical handlers synchronous, non-critical asynchronous
- Sagas coordinate complex processes via event chains
- Event versioning strategy documented
- Dead letter queue for failed handlers
- Tests verify event sequences and handler behavior
Conclusion
Domain events make complex systems manageable by decoupling aggregates. An order doesn't call inventory to decrement stock—it emits OrderCreated and lets interested parties listen. When requirements change, you add new listeners without touching the order. This is how you scale systems and teams.