Event Sourcing in Node.js — Storing What Happened Instead of What Is
By Sanjeev Sharma (@webcoderspeed1)
Introduction
Event sourcing inverts the traditional database model: instead of storing the current state, you store every change that led to that state. An immutable event log becomes your source of truth. You reconstruct state by replaying events and build denormalized read models (projections) optimized for queries. This post covers event store design, aggregate reconstruction, snapshots for performance, projections, choosing between EventStoreDB and Postgres, temporal queries, and handling schema evolution.
- Event Store Design and Append-Only Logs
- Aggregate Reconstruction from Events
- Snapshots for Performance
- Projections and Read Models
- EventStoreDB vs Postgres
- Temporal Queries and Time Travel
- Schema Evolution and Upcasting
- Production Checklist
- Conclusion
Event Store Design and Append-Only Logs
An event store is an append-only log. Events are immutable facts about what happened, with timestamps and causation chains.
import { Pool } from 'pg';
interface DomainEvent {
id: string; // UUID
aggregateId: string; // Root aggregate ID
aggregateType: string; // 'User', 'Post', etc.
eventType: string; // 'UserCreated', 'PostPublished'
eventVersion: number; // Schema version of this event type
data: Record<string, unknown>; // Event payload
metadata: {
timestamp: Date;
correlationId: string; // Links causally related events
causationId?: string; // Direct parent event
userId: string; // Who triggered this
source: string; // API, background job, etc.
};
revision: number; // Optimistic lock version
}
// Postgres schema for event store
const createEventStoreSchema = async (pool: Pool) => {
await pool.query(`
CREATE TABLE IF NOT EXISTS events (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_id uuid NOT NULL,
aggregate_type text NOT NULL,
event_type text NOT NULL,
event_version integer NOT NULL DEFAULT 1,
data jsonb NOT NULL,
metadata jsonb NOT NULL,
revision bigint NOT NULL UNIQUE,
created_at timestamp NOT NULL DEFAULT now()
);
-- Postgres has no inline INDEX clause inside CREATE TABLE; create indexes separately
CREATE INDEX IF NOT EXISTS idx_aggregate ON events(aggregate_id, aggregate_type);
CREATE INDEX IF NOT EXISTS idx_event_type ON events(event_type);
CREATE INDEX IF NOT EXISTS idx_created_at ON events(created_at);
CREATE INDEX IF NOT EXISTS idx_correlation_id ON events((metadata->>'correlationId'));
CREATE TABLE IF NOT EXISTS event_snapshots (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_id uuid NOT NULL UNIQUE,
aggregate_type text NOT NULL,
state jsonb NOT NULL,
revision bigint NOT NULL,
created_at timestamp NOT NULL DEFAULT now(),
expires_at timestamp
);
CREATE INDEX IF NOT EXISTS idx_aggregate_type ON event_snapshots(aggregate_type);
CREATE TABLE IF NOT EXISTS outbox (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
event_id uuid NOT NULL REFERENCES events(id),
published boolean NOT NULL DEFAULT false,
published_at timestamp,
created_at timestamp NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_unpublished ON outbox(published, created_at);
`);
};
class EventStore {
constructor(private pool: Pool) {}
async append(event: Omit<DomainEvent, 'id' | 'revision'>) {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
// Allocate the next global revision; take an advisory lock so two
// concurrent appends cannot both compute the same MAX(revision) + 1
// (the UNIQUE constraint on revision would reject the loser anyway)
await client.query('SELECT pg_advisory_xact_lock(1)');
const revisionResult = await client.query(
'SELECT COALESCE(MAX(revision), 0) + 1 as next_revision FROM events'
);
const revision = revisionResult.rows[0].next_revision;
// Insert event
const insertResult = await client.query(
`INSERT INTO events
(aggregate_id, aggregate_type, event_type, event_version, data, metadata, revision)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING *`,
[
event.aggregateId,
event.aggregateType,
event.eventType,
event.eventVersion,
JSON.stringify(event.data),
JSON.stringify(event.metadata),
revision,
]
);
// Add to outbox for event handlers
await client.query(
'INSERT INTO outbox (event_id) VALUES ($1)',
[insertResult.rows[0].id]
);
await client.query('COMMIT');
return insertResult.rows[0];
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
// Postgres returns snake_case columns; map them onto the DomainEvent shape
private mapRow(row: any): DomainEvent {
return {
id: row.id,
aggregateId: row.aggregate_id,
aggregateType: row.aggregate_type,
eventType: row.event_type,
eventVersion: row.event_version,
data: row.data,
metadata: row.metadata,
revision: row.revision,
};
}
async getEventsByAggregateId(aggregateId: string): Promise<DomainEvent[]> {
const result = await this.pool.query(
`SELECT * FROM events
WHERE aggregate_id = $1
ORDER BY revision ASC`,
[aggregateId]
);
return result.rows.map(row => this.mapRow(row));
}
async getEventsByType(eventType: string, since?: Date): Promise<DomainEvent[]> {
const query =
'SELECT * FROM events WHERE event_type = $1' +
(since ? ' AND created_at > $2' : '') +
' ORDER BY created_at ASC';
const result = await this.pool.query(query, since ? [eventType, since] : [eventType]);
return result.rows.map(row => this.mapRow(row));
}
async getEventsSince(revision: bigint): Promise<DomainEvent[]> {
const result = await this.pool.query(
`SELECT * FROM events
WHERE revision > $1
ORDER BY revision ASC
LIMIT 1000`,
[revision]
);
return result.rows.map(row => this.mapRow(row));
}
}
export { EventStore, DomainEvent };
Aggregate Reconstruction from Events
Reconstruct aggregate state by replaying events in order. This is the core of event sourcing.
interface User {
id: string;
email: string;
displayName: string;
status: 'active' | 'suspended' | 'deleted';
roles: string[];
createdAt: Date;
suspendedAt?: Date;
}
type UserEvent =
| { type: 'UserCreated'; data: { email: string; displayName: string } }
| { type: 'UserEmailChanged'; data: { newEmail: string } }
| { type: 'UserSuspended'; data: { reason: string } }
| { type: 'UserReactivated'; data: {} }
| { type: 'RoleGranted'; data: { role: string } }
| { type: 'RoleRevoked'; data: { role: string } }
| { type: 'UserDeleted'; data: { reason: string } };
class UserAggregate {
id: string;
private state: User;
private uncommittedEvents: UserEvent[] = [];
private version: number = 0;
constructor(id: string) {
this.id = id;
this.state = {
id,
email: '',
displayName: '',
status: 'active',
roles: [],
createdAt: new Date(),
};
}
// Replay events to rebuild state
loadFromHistory(events: DomainEvent[]): void {
events.forEach(event => this.applyEvent(event));
this.version = events.length;
}
private applyEvent(event: DomainEvent): void {
// Payloads are persisted as Record<string, unknown>; cast once per event
const data = event.data as any;
switch (event.eventType) {
case 'UserCreated':
this.state = {
...this.state,
email: data.email,
displayName: data.displayName,
createdAt: event.metadata.timestamp,
};
break;
case 'UserEmailChanged':
this.state.email = data.newEmail;
break;
case 'UserSuspended':
this.state.status = 'suspended';
this.state.suspendedAt = event.metadata.timestamp;
break;
case 'UserReactivated':
this.state.status = 'active';
this.state.suspendedAt = undefined;
break;
case 'RoleGranted':
if (!this.state.roles.includes(data.role)) {
this.state.roles.push(data.role);
}
break;
case 'RoleRevoked':
this.state.roles = this.state.roles.filter(r => r !== data.role);
break;
case 'UserDeleted':
this.state.status = 'deleted';
break;
}
}
// Command handlers emit events
createUser(email: string, displayName: string): void {
this.addEvent({
type: 'UserCreated',
data: { email, displayName },
});
}
suspendUser(reason: string): void {
if (this.state.status === 'suspended') {
throw new Error('User already suspended');
}
this.addEvent({
type: 'UserSuspended',
data: { reason },
});
}
grantRole(role: string): void {
if (this.state.roles.includes(role)) {
throw new Error(`User already has role ${role}`);
}
this.addEvent({
type: 'RoleGranted',
data: { role },
});
}
private addEvent(event: UserEvent): void {
this.uncommittedEvents.push(event);
// Apply immediately so in-memory state reflects the new event; the ids
// and metadata here are placeholders, filled in when the repository saves
this.applyEvent({
id: '',
aggregateId: this.id,
aggregateType: 'User',
eventType: event.type,
eventVersion: 1,
data: event.data,
metadata: {
timestamp: new Date(),
correlationId: '',
userId: '',
source: 'api',
},
revision: 0,
});
}
getState(): User {
return this.state;
}
getUncommittedEvents(): UserEvent[] {
return this.uncommittedEvents;
}
clearUncommittedEvents(): void {
this.uncommittedEvents = [];
}
}
// Repository loads and saves aggregates
class UserRepository {
constructor(private eventStore: EventStore) {}
async getById(userId: string): Promise<UserAggregate | null> {
const events = await this.eventStore.getEventsByAggregateId(userId);
if (events.length === 0) return null;
const aggregate = new UserAggregate(userId);
aggregate.loadFromHistory(events);
return aggregate;
}
async save(aggregate: UserAggregate): Promise<void> {
const events = aggregate.getUncommittedEvents();
for (const event of events) {
await this.eventStore.append({
aggregateId: aggregate.id,
aggregateType: 'User',
eventType: event.type,
eventVersion: 1,
data: event.data,
metadata: {
timestamp: new Date(),
correlationId: crypto.randomUUID(),
userId: 'system',
source: 'api',
},
});
}
aggregate.clearUncommittedEvents();
}
}
Snapshots for Performance
Snapshots store aggregate state periodically to avoid replaying thousands of events.
interface Snapshot {
aggregateId: string;
aggregateType: string;
state: unknown;
revision: number;
createdAt: Date;
}
class SnapshotStore {
constructor(private pool: Pool) {}
async saveSnapshot(
aggregateId: string,
aggregateType: string,
state: unknown,
revision: number
): Promise<void> {
await this.pool.query(
`INSERT INTO event_snapshots (aggregate_id, aggregate_type, state, revision, expires_at)
VALUES ($1, $2, $3, $4, NOW() + INTERVAL '30 days')
ON CONFLICT (aggregate_id) DO UPDATE
SET state = $3, revision = $4, created_at = NOW(),
expires_at = NOW() + INTERVAL '30 days'`,
[aggregateId, aggregateType, JSON.stringify(state), revision]
);
}
async getSnapshot(aggregateId: string): Promise<Snapshot | null> {
const result = await this.pool.query(
`SELECT * FROM event_snapshots
WHERE aggregate_id = $1
AND expires_at > NOW()`,
[aggregateId]
);
const row = result.rows[0];
if (!row) return null;
// Map snake_case columns onto the Snapshot shape
return {
aggregateId: row.aggregate_id,
aggregateType: row.aggregate_type,
state: row.state,
revision: row.revision,
createdAt: row.created_at,
};
}
}
// Updated repository to use snapshots
class UserRepositoryWithSnapshots {
constructor(
private eventStore: EventStore,
private snapshotStore: SnapshotStore
) {}
async getById(userId: string): Promise<UserAggregate | null> {
const aggregate = new UserAggregate(userId);
// Try to load from snapshot
const snapshot = await this.snapshotStore.getSnapshot(userId);
if (snapshot) {
// Restore snapshot state, then replay only the later events that
// belong to this aggregate (getEventsSince reads the global log)
Object.assign(aggregate['state'], snapshot.state);
const events = await this.eventStore.getEventsSince(BigInt(snapshot.revision));
events
.filter(event => event.aggregateId === userId)
.forEach(event => aggregate['applyEvent'](event));
} else {
// No snapshot yet: replay the full history
const events = await this.eventStore.getEventsByAggregateId(userId);
if (events.length === 0) return null;
aggregate.loadFromHistory(events);
// Snapshot long streams so future loads skip the full replay;
// record the global revision of the last applied event
if (events.length > 100) {
await this.snapshotStore.saveSnapshot(
userId,
'User',
aggregate['state'],
events[events.length - 1].revision
);
}
}
return aggregate;
}
}
Projections and Read Models
Projections build denormalized read models optimized for queries by subscribing to events.
// Read model for user profiles
interface UserProfile {
userId: string;
email: string;
displayName: string;
postCount: number;
followerCount: number;
updatedAt: Date;
}
class UserProfileProjection {
constructor(private pool: Pool) {}
async init(): Promise<void> {
await this.pool.query(`
CREATE TABLE IF NOT EXISTS user_profiles (
user_id uuid PRIMARY KEY,
email text NOT NULL,
display_name text NOT NULL,
post_count integer DEFAULT 0,
follower_count integer DEFAULT 0,
updated_at timestamp DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_email ON user_profiles(email);
`);
}
async handle(event: DomainEvent): Promise<void> {
switch (event.eventType) {
case 'UserCreated':
await this.pool.query(
`INSERT INTO user_profiles (user_id, email, display_name)
VALUES ($1, $2, $3)`,
[event.aggregateId, event.data.email, event.data.displayName]
);
break;
case 'UserEmailChanged':
await this.pool.query(
'UPDATE user_profiles SET email = $1, updated_at = NOW() WHERE user_id = $2',
[event.data.newEmail, event.aggregateId]
);
break;
case 'PostCreated':
await this.pool.query(
`UPDATE user_profiles
SET post_count = post_count + 1, updated_at = NOW()
WHERE user_id = $1`,
[event.data.authorId]
);
break;
case 'UserFollowed':
await this.pool.query(
`UPDATE user_profiles
SET follower_count = follower_count + 1, updated_at = NOW()
WHERE user_id = $1`,
[event.data.followedUserId]
);
break;
}
}
async getByEmail(email: string): Promise<UserProfile | null> {
const result = await this.pool.query(
'SELECT * FROM user_profiles WHERE email = $1',
[email]
);
const row = result.rows[0];
if (!row) return null;
// Map snake_case columns onto the UserProfile shape
return {
userId: row.user_id,
email: row.email,
displayName: row.display_name,
postCount: row.post_count,
followerCount: row.follower_count,
updatedAt: row.updated_at,
};
}
}
// Projection handler that subscribes to events
interface Projection {
handle(event: DomainEvent): Promise<void>;
}
class ProjectionManager {
constructor(
private eventStore: EventStore,
private projections: Map<string, Projection>
) {}
async handleNewEvent(event: DomainEvent): Promise<void> {
for (const [, projection] of this.projections) {
try {
await projection.handle(event);
} catch (error) {
console.error(`Projection failed for event ${event.id}:`, error);
// Store failed projection state for replay
}
}
}
async rebuildProjections(): Promise<void> {
// getEventsSince returns at most 1000 events, so walk the log in batches
let cursor = 0n;
for (;;) {
const events = await this.eventStore.getEventsSince(cursor);
if (events.length === 0) break;
for (const event of events) {
await this.handleNewEvent(event);
}
cursor = BigInt(events[events.length - 1].revision);
}
}
}
EventStoreDB vs Postgres
Here is how the two most common production choices for an event store compare.
| Feature | EventStoreDB | Postgres |
|---|---|---|
| Purpose-built | Yes, optimized for events | General database |
| Append performance | Extremely fast | Good with indices |
| Subscriptions | Built-in, efficient | Via polling or triggers |
| Projections | First-class support | Must build yourself |
| Complexity | Requires separate service | Single database |
| Backup/restore | Specialized tooling | Standard postgres tools |
| Cost | Dedicated service cost | Likely already running |
| Team knowledge | Specialist skill | Common knowledge |
Use EventStoreDB for event-only workloads at scale. Use Postgres if you already run it and event sourcing is one of many patterns.
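On the Postgres side, "subscriptions via polling" usually means draining the outbox table on an interval. Below is a hedged sketch of one polling pass; the `Db` interface and the `pollOutboxOnce` name are illustrative stand-ins (the interface mirrors the shape of pg's `Pool.query` so the loop can be exercised without a live database):

```typescript
// Minimal outbox poller sketch: drains unpublished outbox rows in order
// and marks each published only after the handler succeeds (at-least-once).
interface Db {
  query(sql: string, params?: unknown[]): Promise<{ rows: any[] }>;
}

async function pollOutboxOnce(
  db: Db,
  publish: (eventId: string) => Promise<void>,
  batchSize = 100
): Promise<number> {
  const { rows } = await db.query(
    `SELECT id, event_id FROM outbox
     WHERE published = false
     ORDER BY created_at ASC
     LIMIT $1`,
    [batchSize]
  );
  for (const row of rows) {
    // If the process crashes after publish but before the UPDATE,
    // the next pass re-delivers this event: consumers must be idempotent
    await publish(row.event_id);
    await db.query(
      'UPDATE outbox SET published = true, published_at = NOW() WHERE id = $1',
      [row.id]
    );
  }
  return rows.length; // 0 means the outbox is drained; the caller can sleep
}
```

Run it on a timer (or wake it with LISTEN/NOTIFY) and keep downstream handlers idempotent, since a crash between `publish` and the `UPDATE` re-delivers that event on the next pass.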
Temporal Queries and Time Travel
Query aggregate state at any point in history.
class TemporalAggregateStore {
constructor(
private pool: Pool,
private eventStore: EventStore,
private snapshotStore: SnapshotStore
) {}
async getAggregateAtTime(
aggregateId: string,
asOf: Date
): Promise<UserAggregate | null> {
// Get snapshot before target time
const snapshotResult = await this.pool.query(
`SELECT * FROM event_snapshots
WHERE aggregate_id = $1 AND created_at <= $2
ORDER BY created_at DESC LIMIT 1`,
[aggregateId, asOf]
);
const aggregate = new UserAggregate(aggregateId);
let startRevision = 0;
if (snapshotResult.rows[0]) {
Object.assign(aggregate['state'], snapshotResult.rows[0].state);
startRevision = snapshotResult.rows[0].revision;
}
// Get events between snapshot and target time
const eventsResult = await this.pool.query(
`SELECT * FROM events
WHERE aggregate_id = $1
AND revision > $2
AND created_at <= $3
ORDER BY revision ASC`,
[aggregateId, startRevision, asOf]
);
// Map snake_case rows before replaying them, so applyEvent sees eventType
eventsResult.rows.forEach(row =>
aggregate['applyEvent']({
id: row.id,
aggregateId: row.aggregate_id,
aggregateType: row.aggregate_type,
eventType: row.event_type,
eventVersion: row.event_version,
data: row.data,
metadata: row.metadata,
revision: row.revision,
})
);
return aggregate;
}
}
Schema Evolution and Upcasting
Handle event schema changes without migrating historical events.
interface EventUpcast {
fromVersion: number;
toVersion: number;
upcast: (data: unknown) => unknown;
}
class UserCreatedUpcast implements EventUpcast {
fromVersion = 1;
toVersion = 2;
upcast(data: unknown): unknown {
const v1Data = data as { email: string; name: string };
// Split name into firstName/lastName in version 2
const [firstName, ...lastNameParts] = v1Data.name.split(' ');
return {
email: v1Data.email,
firstName,
lastName: lastNameParts.join(' '),
};
}
}
class EventUpcaster {
private upcasts: Map<string, EventUpcast[]> = new Map();
registerUpcast(eventType: string, upcast: EventUpcast): void {
if (!this.upcasts.has(eventType)) {
this.upcasts.set(eventType, []);
}
this.upcasts.get(eventType)!.push(upcast);
}
upcast(event: DomainEvent): DomainEvent {
const upcasts = this.upcasts.get(event.eventType) || [];
let data = event.data;
let version = event.eventVersion;
// Track the version as upcasts apply so registered steps chain (v1 -> v2 -> v3)
for (const upcast of upcasts) {
if (upcast.fromVersion === version) {
data = upcast.upcast(data) as Record<string, unknown>;
version = upcast.toVersion;
}
}
return { ...event, data, eventVersion: version };
}
}
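To see upcasting work end to end, here is a compact, self-contained variant (the `Evt` and `Upcaster` names are simplified stand-ins for the `DomainEvent` and `EventUpcaster` types above) that registers the v1-to-v2 `UserCreated` upcast and runs an old event through it:

```typescript
// Self-contained demo of version-chained upcasting: a v1 UserCreated
// payload { email, name } becomes v2 { email, firstName, lastName }.
interface Evt {
  eventType: string;
  eventVersion: number;
  data: Record<string, unknown>;
}
interface Upcast {
  fromVersion: number;
  toVersion: number;
  upcast(data: unknown): unknown;
}

class Upcaster {
  private byType = new Map<string, Upcast[]>();
  register(eventType: string, upcast: Upcast): void {
    const list = this.byType.get(eventType) ?? [];
    list.push(upcast);
    // Keep upcasts ordered so a v1->v2 step runs before v2->v3
    list.sort((a, b) => a.fromVersion - b.fromVersion);
    this.byType.set(eventType, list);
  }
  run(event: Evt): Evt {
    let { data, eventVersion } = event;
    for (const u of this.byType.get(event.eventType) ?? []) {
      if (u.fromVersion === eventVersion) {
        data = u.upcast(data) as Record<string, unknown>;
        eventVersion = u.toVersion;
      }
    }
    // Return a new object; stored events stay immutable
    return { ...event, data, eventVersion };
  }
}

const upcaster = new Upcaster();
upcaster.register('UserCreated', {
  fromVersion: 1,
  toVersion: 2,
  upcast: (d: any) => {
    const [firstName, ...rest] = (d.name as string).split(' ');
    return { email: d.email, firstName, lastName: rest.join(' ') };
  },
});

const v1Event: Evt = {
  eventType: 'UserCreated',
  eventVersion: 1,
  data: { email: 'ada@example.com', name: 'Ada Lovelace' },
};
const v2Event = upcaster.run(v1Event);
// v2Event.data is { email: 'ada@example.com', firstName: 'Ada', lastName: 'Lovelace' }
```

The stored v1 event is never rewritten; the upcast runs at read time, which is what lets historical events keep their original shape on disk.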
Production Checklist
- Event store append operation is atomic and consistent
- Events are immutable and never deleted (only logical deletes)
- Snapshots created every N events to prevent slow replays
- Projections are rebuilt from scratch periodically
- Outbox pattern used so events are published reliably (at-least-once, with idempotent handlers)
- Event correlation IDs track causation chains
- Upcasting strategy for handling event schema evolution
- Temporal queries tested for historical state reconstruction
- Event handler failures don't lose events (stored in failed queue)
- Monitoring on event lag, replay duration, snapshot staleness
Conclusion
Event sourcing provides an audit trail of every change and enables temporal queries and rebuilding read models. Start with a single Postgres event table, create projections for query performance, add snapshots when replays slow down, and consider EventStoreDB if event volume demands it. The complexity is worth it when you need historical data, compliance auditing, or complex domain logic.