Published on

Event-Driven Architecture in Practice — From Direct Calls to Async Events

Authors

Introduction

Synchronous service calls create tight coupling: if one service is slow, everything waits. If you need to add a new consumer, old code changes. Event-driven architecture inverts this: services emit events about what happened, and any number of consumers can react asynchronously. Netflix, Uber, and Amazon are built on this pattern.

Events vs Commands vs Queries

These three patterns often get confused. Each solves different problems:

// QUERY: Request data synchronously, expect immediate response
async function getUser(userId: string): Promise<User> {
  const response = await fetch(`/users/${userId}`)
  return response.json()
}

// COMMAND: Request action, expect synchronous result or error
async function createOrder(items: CartItem[]): Promise<OrderCreated> {
  const response = await fetch('/orders', {
    method: 'POST',
    body: JSON.stringify({ items })
  })
  if (!response.ok) throw new Error('Order creation failed')
  return response.json()
}

// EVENT: Notification that something happened, fire-and-forget
async function publishOrderCreated(event: OrderCreatedEvent): Promise<void> {
  await eventBus.publish('order.created', event)
  // We don't wait for consumers
  // We don't know if they succeeded
  // Multiple consumers can react
}

// Mixing patterns creates problems:
// Problem 1: Synchronous call chains
function checkout(cartId: string) {
  const order = createOrder(cartId) // Slow: waits for all logic
  const payment = processPayment(order.id) // Slow: waits for payment processor
  const notification = sendConfirmationEmail(order.id) // Slow: waits for email service
  return order
}

// Better: Emit event, let consumers handle async work
function checkout(cartId: string) {
  const order = createOrder(cartId) // Fast: just creates order
  eventBus.publish('order.created', { orderId: order.id })
  // Email, payment confirmation, inventory update all happen async
  return order
}

CloudEvents Specification for Interoperability

CloudEvents is a standard format for events across systems. Whether you use Kafka, RabbitMQ, or AWS EventBridge, CloudEvents keeps you portable:

// CloudEvents standard structure
interface CloudEvent {
  // Required
  specversion: '1.0'
  type: string // e.g., 'com.example.orders.created'
  source: string // e.g., '/orders-service'
  id: string // Unique event ID
  time: string // ISO 8601 timestamp

  // Optional but recommended
  datacontenttype: 'application/json'
  subject?: string // e.g., 'orders/123'
  dataschema?: string // URL to event schema
  data?: Record<string, any>
}

// Implementation
import { v4 as uuidv4 } from 'uuid'

export class CloudEventFactory {
  static create<T = Record<string, any>>(
    type: string,
    source: string,
    data: T,
    subject?: string
  ): CloudEvent {
    return {
      specversion: '1.0',
      type,
      source,
      id: uuidv4(),
      time: new Date().toISOString(),
      datacontenttype: 'application/json',
      subject,
      dataschema: `https://api.example.com/schemas/${type}`,
      data
    }
  }
}

// Usage
const event = CloudEventFactory.create(
  'com.example.orders.created',
  '/orders-service',
  {
    orderId: '123',
    userId: 'user456',
    total: 99.99,
    items: [{ productId: 'p1', qty: 2 }]
  },
  'orders/123'
)

// Now this event can be published to any CloudEvents-compatible broker
// Consumers in any language can understand it

Event Schema Registry (Confluent, AWS Glue)

As events evolve, you need schema versioning. A schema registry prevents incompatible events from being published:

// Schema registry workflow:
// 1. Define schema in registry
// 2. Services check schema before publishing
// 3. Consumers know what to expect

// src/schemas/order-created.schema.json
{
  "$id": "https://api.example.com/schemas/orders/created.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "OrderCreated",
  "version": 1,
  "type": "object",
  "required": ["orderId", "userId", "total"],
  "properties": {
    "orderId": {
      "type": "string",
      "format": "uuid"
    },
    "userId": {
      "type": "string",
      "format": "uuid"
    },
    "total": {
      "type": "number",
      "minimum": 0
    },
    "items": {
      "type": "array",
      "items": {
        "type": "object",
        "required": ["productId", "quantity"],
        "properties": {
          "productId": { "type": "string" },
          "quantity": { "type": "integer", "minimum": 1 }
        }
      }
    },
    "createdAt": {
      "type": "string",
      "format": "date-time"
    }
  }
}

// Schema registry client
import axios from 'axios'
import Ajv from 'ajv'

export class SchemaRegistry {
  private schemas: Map<string, any> = new Map()
  private ajv = new Ajv()

  async registerSchema(name: string, schema: any): Promise<number> {
    const response = await axios.post(
      `${process.env.SCHEMA_REGISTRY_URL}/subjects/${name}/versions`,
      { schema: JSON.stringify(schema) }
    )
    return response.data.id
  }

  async getSchema(name: string): Promise<any> {
    if (this.schemas.has(name)) {
      return this.schemas.get(name)
    }

    const response = await axios.get(
      `${process.env.SCHEMA_REGISTRY_URL}/subjects/${name}/versions/latest`
    )
    this.schemas.set(name, response.data.schema)
    return response.data.schema
  }

  async validateEvent(schemaName: string, event: any): Promise<boolean> {
    const schema = await this.getSchema(schemaName)
    const validate = this.ajv.compile(schema)
    return validate(event)
  }
}

// Usage in event publishing
export class OrderService {
  constructor(
    private eventBus: EventBus,
    private schemaRegistry: SchemaRegistry
  ) {}

  async createOrder(request: CreateOrderRequest): Promise<Order> {
    const order = new Order(request)
    await this.repository.save(order)

    const event = {
      orderId: order.id,
      userId: request.userId,
      total: order.total,
      items: order.items,
      createdAt: new Date().toISOString()
    }

    // Validate against schema before publishing
    const isValid = await this.schemaRegistry.validateEvent('OrderCreated', event)
    if (!isValid) {
      throw new Error('Event does not match schema')
    }

    await this.eventBus.publish('order.created', event)
    return order
  }
}

Domain Events vs Integration Events

Domain events model business occurrences. Integration events notify other services. They're different tools:

// Domain events: Occur within your aggregate, internal to service
export class Order {
  private domainEvents: DomainEvent[] = []

  constructor(items: OrderItem[]) {
    this.items = items
    this.status = 'pending'
    // When order is created, something happened in this domain
    this.addDomainEvent(new OrderCreatedDomainEvent(this.id))
  }

  confirm(paymentId: string): void {
    this.status = 'confirmed'
    this.paymentId = paymentId
    // Business rule: when payment confirmed, order confirmed
    this.addDomainEvent(new OrderConfirmedDomainEvent(this.id, paymentId))
  }

  private addDomainEvent(event: DomainEvent): void {
    this.domainEvents.push(event)
  }

  getDomainEvents(): DomainEvent[] {
    return this.domainEvents
  }
}

// After order is persisted, publish integration events
export class OrderRepository {
  async save(order: Order): Promise<void> {
    // Persist order
    await this.db.orders.create({
      id: order.id,
      status: order.status
    })

    // Convert domain events to integration events and publish
    const domainEvents = order.getDomainEvents()
    for (const domainEvent of domainEvents) {
      const integrationEvent = this.toIntegrationEvent(domainEvent)
      await this.eventBus.publish(integrationEvent)
    }

    order.clearDomainEvents()
  }

  private toIntegrationEvent(domainEvent: DomainEvent): IntegrationEvent {
    if (domainEvent instanceof OrderCreatedDomainEvent) {
      return {
        type: 'order.created',
        source: 'orders-service',
        data: {
          orderId: domainEvent.orderId,
          timestamp: new Date().toISOString()
        }
      }
    }
    throw new Error('Unknown domain event')
  }
}

// Other services consume integration events
export class NotificationService {
  constructor(private eventBus: EventBus) {
    this.eventBus.subscribe('order.created', this.onOrderCreated.bind(this))
  }

  private async onOrderCreated(event: IntegrationEvent): Promise<void> {
    await this.emailSender.send({
      to: 'customer@example.com',
      subject: 'Order Confirmed',
      body: `Your order ${event.data.orderId} has been created`
    })
  }
}

Event Versioning and Backwards Compatibility

Events evolve. Handle it gracefully:

// Version 1: Original event
{
  "eventType": "order.created",
  "orderId": "123",
  "total": 99.99
}

// Version 2: Add optional field (backwards compatible)
{
  "eventType": "order.created",
  "version": 2,
  "orderId": "123",
  "total": 99.99,
  "currency": "USD" // New, optional
}

// Version 3: Add required field (NOT backwards compatible, handle carefully)
{
  "eventType": "order.created",
  "version": 3,
  "orderId": "123",
  "total": 99.99,
  "currency": "USD",
  "userId": "user456" // New, required
}

// Handle version differences in consumer
export class OrderNotificationConsumer {
  async handle(event: any): Promise<void> {
    // Support multiple versions
    const orderId = event.orderId
    const total = event.total
    const currency = event.currency || 'USD' // Default for v1
    const userId = event.userId // May not exist in v1-2

    if (!event.userId) {
      // For old events without userId, look it up
      const order = await this.orderRepository.findById(orderId)
      event.userId = order.userId
    }

    await this.sendNotification(orderId, total, userId)
  }
}

// Better: Use discriminated unions for type safety
type OrderCreatedEventV1 = {
  version: 1
  orderId: string
  total: number
}

type OrderCreatedEventV2 = {
  version: 2
  orderId: string
  total: number
  currency: string
}

type OrderCreatedEventV3 = {
  version: 3
  orderId: string
  total: number
  currency: string
  userId: string
}

type OrderCreatedEvent = OrderCreatedEventV1 | OrderCreatedEventV2 | OrderCreatedEventV3

function handleOrderCreated(event: OrderCreatedEvent): void {
  switch (event.version) {
    case 1:
      // Handle V1
      break
    case 2:
      // Handle V2
      break
    case 3:
      // Handle V3
      break
  }
}

Consumer Group Strategy for Event Fanout

Kafka consumer groups ensure each consumer gets events, but multiple instances don't duplicate work:

// Kafka setup with consumer groups
import { Kafka } from 'kafkajs'

const kafka = new Kafka({
  clientId: 'notifications-service',
  brokers: ['kafka:9092']
})

// Consumer group 1: Notifications
// If you have 5 notification-service instances, each gets different partitions
const notificationsConsumer = kafka.consumer({ groupId: 'notifications-service' })

// Consumer group 2: Analytics
// Analytics-service instances form separate group
const analyticsConsumer = kafka.consumer({ groupId: 'analytics-service' })

// Same event goes to both groups, but each instance only processes its partition
await notificationsConsumer.subscribe({ topic: 'orders', fromBeginning: false })
await analyticsConsumer.subscribe({ topic: 'orders', fromBeginning: false })

// Notifications group: 5 instances, 5 partitions → each instance gets 1 partition
await notificationsConsumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log(`Partition ${partition}: ${message.value}`)
    // Instance A handles partition 0
    // Instance B handles partition 1
    // ... etc
  }
})

// Analytics group: Can have different number of instances
// Handles topic from start to catch up on history
await analyticsConsumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    // Processes all historical orders, not just new ones
  }
})

// Key insight: Same topic, different consumer groups = parallel fanout
// Multiple services can independently consume same events

Event Sourcing vs Event-Driven (Different Concepts)

These are often confused but solve different problems:

// EVENT-DRIVEN: Services emit events about what happened
// Other services listen and react
// Events are the communication mechanism between services

class OrderService {
  async createOrder(request: CreateOrderRequest): Promise<Order> {
    const order = new Order(request)
    await this.repository.save(order)

    // Emit event so other services know about it
    await this.eventBus.publish('order.created', {
      orderId: order.id,
      userId: request.userId,
      total: order.total
    })

    return order
  }
}

// EVENT SOURCING: Store the complete history of changes as events
// Rebuild state by replaying events
// Offers perfect audit trail, time travel, event replay

class OrderEventStore {
  async createOrder(request: CreateOrderRequest): Promise<Order> {
    const orderId = crypto.randomUUID()

    // Store event in event store (immutable log)
    await this.eventStore.append('Order', orderId, {
      type: 'OrderCreated',
      orderId,
      userId: request.userId,
      items: request.items,
      total: request.total,
      timestamp: new Date()
    })

    // Rebuild order state from event log
    const order = await this.rebuildOrderFromEvents(orderId)
    return order
  }

  async confirmOrder(orderId: string, paymentId: string): Promise<Order> {
    // Append confirmation event
    await this.eventStore.append('Order', orderId, {
      type: 'OrderConfirmed',
      orderId,
      paymentId,
      timestamp: new Date()
    })

    // Rebuild order with new state
    const order = await this.rebuildOrderFromEvents(orderId)
    return order
  }

  private async rebuildOrderFromEvents(orderId: string): Promise<Order> {
    const events = await this.eventStore.getEvents('Order', orderId)
    let order = null

    for (const event of events) {
      switch (event.type) {
        case 'OrderCreated':
          order = new Order(
            event.orderId,
            event.userId,
            event.items,
            event.total
          )
          break
        case 'OrderConfirmed':
          order.confirm(event.paymentId)
          break
      }
    }

    return order
  }
}

// Use both together:
// - Event Sourcing for audit trail and state reconstruction
// - Event-Driven for inter-service communication
// Events from event store can be published to event bus

Observability for Event Flows

Event flows are hard to debug if you can't see them. Implement tracing:

// Distributed tracing for events
import { trace } from '@opentelemetry/api'

const tracer = trace.getTracer('orders-service')

export class OrderService {
  async createOrder(request: CreateOrderRequest): Promise<Order> {
    const span = tracer.startSpan('createOrder')

    try {
      span.addEvent('order_creation_started', {
        userId: request.userId,
        itemCount: request.items.length
      })

      const order = new Order(request)
      await this.repository.save(order)

      // Pass trace ID to event so consumers can track it
      const traceId = span.spanContext().traceId
      await this.eventBus.publish('order.created', {
        orderId: order.id,
        userId: request.userId,
        total: order.total,
        traceId // Enable end-to-end tracing
      })

      span.addEvent('order_created', { orderId: order.id })
      return order
    } catch (error) {
      span.recordException(error)
      throw error
    } finally {
      span.end()
    }
  }
}

// Consumer can continue trace
export class NotificationConsumer {
  async handle(event: OrderCreatedEvent): Promise<void> {
    // Create child span under event's trace
    const span = tracer.startSpan('send_order_notification', {
      attributes: {
        'trace_id': event.traceId
      }
    })

    try {
      await this.emailSender.send({
        to: 'customer@example.com',
        subject: 'Order Confirmed'
      })
      span.addEvent('email_sent')
    } finally {
      span.end()
    }
  }
}

// Dashboards show: Order created → Email sent → Analytics recorded
// All linked by trace ID, even across services

Checklist

  • Events follow CloudEvents spec for portability
  • Event schema registry enforces compatibility
  • Domain events modeled in aggregates
  • Domain events converted to integration events before publishing
  • Event versioning strategy documented
  • Backwards-compatible schema evolution planned
  • Multiple consumer groups handle fanout correctly
  • Consumers are idempotent (safe to retry)
  • Distributed tracing connects event flows
  • Dead letter queue configured for failed consumers
  • Event retention policy defined
  • Consumer lag monitored in production

Conclusion

Event-driven architecture scales horizontally: add new consumers without changing publishers. It handles asynchronous work elegantly and decouples services cleanly. But it requires discipline: events must be well-defined, versioning must be thought through, and observability must be built in from day one. When done right, it's the foundation of systems that scale.