Event Sourcing for AI Systems — Immutable Audit Trails for Regulated Industries
Introduction
Regulators ask hard questions, such as "Why did your AI reject this loan application?" If you cannot reconstruct the exact context behind that decision, you cannot answer, and that exposes you to regulatory and legal risk. Event sourcing captures every AI decision with its full context: the prompt, input data, model version, and output. This post covers how to build AI systems that can be audited this way.
- Why AI Decisions Need Audit Trails
- Event Store for AI Interactions
- Temporal Queries: What Did the AI Say on Date X?
- Replaying Events to Reproduce AI Decisions
- Masking PII in Event Streams While Preserving Audit Capability
- Compaction and Retention Policies
- Versioning AI Model References in Events
- CQRS Read Models for AI Analytics Dashboards
- Checklist
- Conclusion
Why AI Decisions Need Audit Trails
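Traditional systems mostly store current state: the user's current balance, current permissions. Auditing an AI decision requires historical state: the exact data the model saw when it made that decision.
GDPR Article 22: Automated individual decision-making. Article 22 restricts decisions based solely on automated processing that have legal or similarly significant effects. Together with the transparency provisions in Articles 13 to 15, it is often summarized as a "right to explanation." People affected by such a decision can ask why an algorithm rejected them. Saying "the model predicted high risk" is not an explanation. You should be able to show the input data, the model version, and the decision rationale.
EU AI Act: High-risk AI systems must support automatic logging of events and must be backed by technical documentation. You need to be able to show how the system was built and evaluated, how data was handled, and how an individual decision can be traced.
Without a durable record of what the model saw, you cannot meet these obligations. Event sourcing provides that record.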
Event Store for AI Interactions
Store everything: the prompt, context, response, model metadata.
interface AIEvent {
  eventId: string;
  timestamp: Date;
  userId: string;
  tenantId: string;
  aiInteraction: {
    model: string;
    modelVersion: string;
    prompt: string;
    systemPrompt: string;
    context: {
      userHistory: string[];
      relevantDocuments: string[];
      metadata: Record<string, any>;
    };
    parameters: {
      temperature: number;
      maxTokens: number;
      topP: number;
    };
    response: {
      content: string;
      tokens: number;
      finishReason: string;
    };
    decision?: {
      approved: boolean;
      confidence: number;
      reasoning: string;
    };
  };
}
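import { v4 as uuidv4 } from 'uuid';

// Store event
// `eventStore` and `aiDecisions` stand for your own event log and read-model clients.
async function recordAIDecision(event: AIEvent): Promise<AIEvent> {
  // Assign the ID and timestamp once so both writes refer to the same record
  const stored: AIEvent = {
    ...event,
    eventId: uuidv4(),
    timestamp: new Date()
  };

  // Immutable append-only log
  await eventStore.append(stored);

  // Also write to read model for fast queries
  await aiDecisions.insert({
    eventId: stored.eventId,
    userId: stored.userId,
    tenantId: stored.tenantId,
    model: stored.aiInteraction.model,
    decision: stored.aiInteraction.decision,
    timestamp: stored.timestamp
  });

  return stored;
}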
// Example: loan application
const loanEvent: AIEvent = {
  eventId: '', // placeholder; recordAIDecision assigns the real ID
  timestamp: new Date(), // placeholder; recordAIDecision assigns the stored timestamp
  userId: 'user_123',
  tenantId: 'bank_456',
  aiInteraction: {
    model: 'gpt-4o',
    modelVersion: 'gpt-4o-2024-05-13',
    prompt: 'Evaluate this loan application',
    systemPrompt: 'You are a loan underwriting assistant. Evaluate based on: credit score, income, debt-to-income ratio, and employment history. Respond with: { approved: boolean, confidence: 0-1, reasoning: string }',
    context: {
      userHistory: ['User has 2 previous approved loans'],
      relevantDocuments: ['Credit report: score 720', 'Income verification: $120k/year', 'Tax return 2025'],
      metadata: {
        creditScore: 720,
        annualIncome: 120000,
        dtiRatio: 0.35,
        employmentYears: 5
      }
    },
    parameters: {
      temperature: 0.3,
      maxTokens: 500,
      topP: 1
    },
    response: {
      content: '{ "approved": true, "confidence": 0.92, "reasoning": "Strong credit score, stable income, reasonable debt-to-income ratio" }',
      tokens: 45,
      finishReason: 'stop'
    },
    decision: {
      approved: true,
      confidence: 0.92,
      reasoning: 'Strong credit score, stable income, reasonable debt-to-income ratio'
    }
  }
};

await recordAIDecision(loanEvent);
Every decision is traceable. Auditors can verify the exact inputs and outputs.
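The "immutable append-only log" above depends on whatever backs `eventStore`. One common way to make such a log tamper-evident is hash chaining, where each record stores the hash of the record before it. The post does not prescribe this technique, so treat the following as a minimal in-memory sketch under that assumption. The names `ChainedRecord`, `appendRecord`, and `verifyChain` are hypothetical.

```typescript
import { createHash } from "node:crypto";

// One stored record: the serialized event plus the chain fields.
interface ChainedRecord {
  sequence: number;
  payload: string;      // serialized event, e.g. JSON.stringify(aiEvent)
  previousHash: string; // hash of the preceding record, or GENESIS for the first
  hash: string;         // SHA-256 over sequence, previousHash, and payload
}

const GENESIS = "0".repeat(64);

function hashRecord(sequence: number, previousHash: string, payload: string): string {
  return createHash("sha256")
    .update(`${sequence}\n${previousHash}\n${payload}`)
    .digest("hex");
}

// Returns a new log with the record appended; the input array is not mutated.
function appendRecord(log: readonly ChainedRecord[], payload: string): ChainedRecord[] {
  const sequence = log.length;
  const previousHash = sequence === 0 ? GENESIS : log[sequence - 1].hash;
  const record: ChainedRecord = {
    sequence,
    payload,
    previousHash,
    hash: hashRecord(sequence, previousHash, payload),
  };
  return [...log, Object.freeze(record)];
}

// Recomputes every hash. An edited or reordered record, or one removed from the
// middle, breaks the chain. Truncation at the tail is NOT detected without an
// external checkpoint of the latest hash.
function verifyChain(log: readonly ChainedRecord[]): boolean {
  return log.every((record, index) => {
    const expectedPrevious = index === 0 ? GENESIS : log[index - 1].hash;
    return (
      record.sequence === index &&
      record.previousHash === expectedPrevious &&
      record.hash === hashRecord(record.sequence, record.previousHash, record.payload)
    );
  });
}

let auditLog: ChainedRecord[] = [];
auditLog = appendRecord(auditLog, JSON.stringify({ userId: "user_123", approved: true }));
auditLog = appendRecord(auditLog, JSON.stringify({ userId: "user_789", approved: false }));
const chainIsValid = verifyChain(auditLog);
```

A production event store would persist the records and periodically publish the latest hash somewhere the application cannot rewrite, which is what closes the truncation gap noted in the comment.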
Temporal Queries: What Did the AI Say on Date X?
Look up the decision that was in effect at a specific point in time.
async function getDecisionAtTime(userId: string, tenantId: string, timestamp: Date): Promise<AIEvent | null> {
  // Get all events for this user up to the timestamp, newest first
  const events = await eventStore.query({
    userId,
    tenantId,
    timestamp: { $lte: timestamp }
  }).sort({ timestamp: -1 });

  if (events.length === 0) return null;

  // The most recent event at or before that timestamp
  return events[0];
}

// Example: "What was the latest decision for user_123 as of 2026-03-15?"
// Note: new Date('2026-03-15') is midnight UTC, so events later that day are excluded.
const decision = await getDecisionAtTime('user_123', 'bank_456', new Date('2026-03-15'));
// The result is null when the user has no events yet, and `decision` is optional on an event
console.log(decision?.aiInteraction.decision?.reasoning);
// e.g. "Strong credit score, stable income, reasonable debt-to-income ratio"
Temporal queries let you reconstruct the exact state at any moment in time.
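A question like "what did the AI say on date X?" usually means the whole calendar day, but a date-only string such as `'2026-03-15'` parses to midnight UTC, which excludes everything that happened later that day. The sketch below shows the same "latest at or before" lookup over an in-memory array, with a helper for the end-of-day boundary. `TimedDecision`, `endOfDayUtc`, and `latestAtOrBefore` are illustrative names, and treating the audit day as a UTC day is an assumption you may need to change for your jurisdiction.

```typescript
interface TimedDecision {
  eventId: string;
  timestamp: Date;
  approved: boolean;
}

// Last millisecond of the given calendar day in UTC,
// e.g. "2026-03-15" becomes 2026-03-15T23:59:59.999Z.
function endOfDayUtc(isoDate: string): Date {
  const start = new Date(`${isoDate}T00:00:00.000Z`);
  if (Number.isNaN(start.getTime())) {
    throw new Error(`Invalid date: ${isoDate}`);
  }
  return new Date(start.getTime() + 24 * 60 * 60 * 1000 - 1);
}

// Most recent decision at or before `asOf`, or null if none exists yet.
function latestAtOrBefore(events: readonly TimedDecision[], asOf: Date): TimedDecision | null {
  let latest: TimedDecision | null = null;
  for (const event of events) {
    if (event.timestamp.getTime() > asOf.getTime()) continue;
    if (latest === null || event.timestamp.getTime() > latest.timestamp.getTime()) {
      latest = event;
    }
  }
  return latest;
}

const history: TimedDecision[] = [
  { eventId: "evt_1", timestamp: new Date("2026-03-14T09:00:00Z"), approved: false },
  { eventId: "evt_2", timestamp: new Date("2026-03-15T14:30:00Z"), approved: true },
];

// Midnight UTC misses the afternoon decision; the end-of-day boundary includes it.
const atMidnight = latestAtOrBefore(history, new Date("2026-03-15"));
const atEndOfDay = latestAtOrBefore(history, endOfDayUtc("2026-03-15"));
```

Here `atMidnight` resolves to `evt_1` and `atEndOfDay` to `evt_2`, so the two boundaries give an auditor different answers for the same date.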
Replaying Events to Reproduce AI Decisions
Replay events through a model to verify decisions or test new models.
import OpenAI from 'openai';
import { isEqual } from 'lodash';

const openai = new OpenAI(); // reads OPENAI_API_KEY from the environment

async function replayDecision(eventId: string) {
  const event = await eventStore.get(eventId);
  const { modelVersion, systemPrompt, prompt, context, parameters } = event.aiInteraction;

  // Re-run the same prompt through the same pinned model version
  // (the `model` field may be an alias that now points at a newer snapshot)
  const response = await openai.chat.completions.create({
    model: modelVersion,
    messages: [
      { role: 'system', content: systemPrompt },
      { role: 'user', content: prompt + '\n\nContext: ' + JSON.stringify(context) }
    ],
    temperature: parameters.temperature,
    top_p: parameters.topP,
    max_tokens: parameters.maxTokens
  });

  // message.content can be null, in which case there is no decision to parse
  const newDecision = JSON.parse(response.choices[0].message.content ?? 'null');

  // Compare with original decision
  return {
    original: event.aiInteraction.decision,
    replayed: newDecision,
    matches: isEqual(event.aiInteraction.decision, newDecision)
  };
}

// Check whether the decision reproduces
const result = await replayDecision('event_abc123');
// true = the replay produced an identical decision object, including the reasoning text
console.log(result.matches);
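Replay shows whether a decision is reproducible and whether model behavior is stable. It is not a proof: sampling at a temperature above 0 is not deterministic, and even a pinned model version can vary between runs. Treat a mismatch as a signal to investigate, and compare the outcome rather than the exact reasoning text.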
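Because a replayed response rarely matches the original word for word, a strict deep-equality check tends to report false mismatches. A more forgiving comparison checks that the approve/deny outcome is identical and that the confidence stays within a tolerance, while ignoring the free-text reasoning. The sketch below is one way to do that. The `compareDecisions` helper and the default tolerance of 0.05 are assumptions for illustration, not values from the original design.

```typescript
interface Decision {
  approved: boolean;
  confidence: number;
  reasoning: string;
}

interface ReplayComparison {
  outcomeMatches: boolean;  // same approve/deny result
  confidenceDelta: number;  // absolute difference in confidence
  withinTolerance: boolean; // outcome matches and delta <= tolerance
}

// Compares the parts of a decision that matter for an audit.
// The reasoning text is deliberately not compared.
function compareDecisions(
  original: Decision,
  replayed: Decision,
  confidenceTolerance = 0.05
): ReplayComparison {
  const outcomeMatches = original.approved === replayed.approved;
  const confidenceDelta = Math.abs(original.confidence - replayed.confidence);
  return {
    outcomeMatches,
    confidenceDelta,
    withinTolerance: outcomeMatches && confidenceDelta <= confidenceTolerance,
  };
}

const originalDecision: Decision = {
  approved: true,
  confidence: 0.92,
  reasoning: "Strong credit score, stable income, reasonable debt-to-income ratio",
};
const replayedDecision: Decision = {
  approved: true,
  confidence: 0.9,
  reasoning: "Good credit history and a stable income",
};

const comparison = compareDecisions(originalDecision, replayedDecision);
```

Storing the full comparison, not just a boolean, lets you track how far replays drift over time for each model version.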
Masking PII in Event Streams While Preserving Audit Capability
Store full details for auditors, mask for operators.
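import * as crypto from 'node:crypto';

class PIIMasker {
  // Store full event with PII
  async storeEvent(event: AIEvent) {
    await eventStore.append(event);
    // Create masked version for operational logs
    const masked = this.maskEvent(event);
    await operationalLog.append(masked);
  }

  // Note: response.content and decision.reasoning are passed through unchanged;
  // mask them too if your model echoes user data back
  maskEvent(event: AIEvent) {
    const { context } = event.aiInteraction;
    return {
      ...event,
      userId: this.hashUser(event.userId),
      aiInteraction: {
        ...event.aiInteraction,
        prompt: this.maskPII(event.aiInteraction.prompt),
        context: {
          ...context,
          // Remove PII from context
          userHistory: context.userHistory.map(entry => this.maskPII(entry)),
          relevantDocuments: context.relevantDocuments.map(() => '[DOCUMENT_MASKED]'),
          metadata: this.maskMetadata(context.metadata)
        }
      }
    };
  }

  // metadata is an object, not a string, so mask its string values one by one.
  // Non-string values (scores, income figures) pass through unchanged.
  maskMetadata(metadata: Record<string, any>): Record<string, any> {
    return Object.fromEntries(
      Object.entries(metadata).map(([key, value]) => [
        key,
        typeof value === 'string' ? this.maskPII(value) : value
      ])
    );
  }

  maskPII(text: string): string {
    // Simple regex-based masking; use a dedicated PII detection library in production
    return text
      .replace(/\b\d{3}-\d{2}-\d{4}\b/g, '[SSN]')
      .replace(/\b\d{16}\b/g, '[CARD]')
      .replace(/\b[\w.-]+@[\w.-]+\b/g, '[EMAIL]');
  }

  // An unkeyed hash of a guessable ID can be reversed by enumeration;
  // prefer a keyed hash (HMAC) in production
  hashUser(userId: string): string {
    return crypto.createHash('sha256').update(userId).digest('hex');
  }
}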
// Only compliance officers can read the full, unmasked event
async function getFullEventForAudit(eventId: string, auditorRole: string) {
  if (auditorRole !== 'compliance_officer') {
    throw new Error('Unauthorized');
  }
  const event = await eventStore.get(eventId);
  return event; // Full, unmasked
}
Operators see masked logs. Auditors and compliance teams see full events. This limits how much PII an operational incident, such as a leaked log file, can expose.
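A plain SHA-256 of a user ID is only weak pseudonymization: anyone who can guess or enumerate IDs such as `user_123` can recompute the hashes and reverse the mapping. A keyed hash (HMAC) avoids this as long as the key stays out of the operational environment. The sketch below uses Node's built-in `createHmac`. The `pseudonymizeUserId` helper and the inline key are illustrative assumptions, and in practice the key would come from a secret manager.

```typescript
import { createHmac } from "node:crypto";

// Keyed pseudonym: stable for the same (key, userId) pair, so masked log lines
// for one user can still be correlated, but not recomputable without the key.
function pseudonymizeUserId(userId: string, secretKey: string): string {
  if (secretKey.length === 0) {
    throw new Error("secretKey must not be empty");
  }
  return createHmac("sha256", secretKey).update(userId).digest("hex");
}

// Placeholder key for illustration only; load the real one from a secret manager.
const exampleKey = "example-only-key";

const pseudonymA = pseudonymizeUserId("user_123", exampleKey);
const pseudonymB = pseudonymizeUserId("user_123", exampleKey);
const pseudonymOtherKey = pseudonymizeUserId("user_123", "a-different-key");
```

`pseudonymA` and `pseudonymB` are identical, while `pseudonymOtherKey` differs. Rotating the key therefore breaks correlation with older masked logs, which can be a feature or a cost depending on how long operators need to follow one user.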
Compaction and Retention Policies
Event stores grow unbounded. Set retention and compaction policies.
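// Retention: keep raw events in hot storage for 7 years.
// The period is an example. GDPR sets no fixed minimum, so take yours from
// the record-keeping rules that apply to your sector.
const retentionDays = 365 * 7;

async function cleanupOldEvents() {
  const cutoff = new Date();
  cutoff.setDate(cutoff.getDate() - retentionDays);

  // Archive old events to cold storage
  const oldEvents = await eventStore.query({
    timestamp: { $lt: cutoff }
  });
  if (oldEvents.length === 0) return;

  // Assumes `s3` is the S3 client from @aws-sdk/client-s3 (v3), whose methods
  // return promises. With AWS SDK v2, append .promise() so the upload is awaited.
  await s3.putObject({
    Bucket: 'archive-bucket',
    Key: `events/archive-${cutoff.toISOString()}.jsonl`,
    Body: oldEvents.map(e => JSON.stringify(e)).join('\n')
  });

  // Delete from hot storage only after the archive write has succeeded
  await eventStore.delete({ timestamp: { $lt: cutoff } });
}

// Run weekly
scheduler.every('1 week', () => cleanupOldEvents());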
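// Compaction: summarize old events
async function compactEvents(startDate: Date, endDate: Date) {
  const events = await eventStore.query({
    timestamp: { $gte: startDate, $lte: endDate }
  });
  if (events.length === 0) return;

  // Only events that carry a decision count toward the decision metrics;
  // an event without a decision is not a denial
  const decisions = events
    .map(e => e.aiInteraction.decision)
    .filter((d): d is NonNullable<typeof d> => d != null);

  const summary = {
    startDate,
    endDate,
    totalEvents: events.length,
    totalDecisions: decisions.length,
    approvedCount: decisions.filter(d => d.approved).length,
    deniedCount: decisions.filter(d => !d.approved).length,
    averageConfidence: decisions.length > 0
      ? decisions.reduce((sum, d) => sum + d.confidence, 0) / decisions.length
      : null
  };

  // Store compacted summary
  await eventStore.append({
    eventType: 'COMPACTION',
    data: summary,
    eventsCompacted: events.map(e => e.eventId)
  });

  // Delete original events (keep compaction).
  // Only do this for events that are already archived or past their retention
  // period: the summary cannot explain an individual decision.
  await eventStore.delete({
    timestamp: { $gte: startDate, $lte: endDate }
  });
}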
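Archive old events to cold storage such as S3. Compacted summaries can cut storage substantially, but they cannot explain an individual decision, so compact only events that are already archived or past their retention period. The 7-year figure is an example: GDPR sets no minimum retention period and instead requires that personal data be kept no longer than necessary, so take the actual period from the record-keeping rules that apply to your sector.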
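The cutoff arithmetic is easier to test when the clock is passed in rather than read inside the job. The sketch below splits a batch of events into "retain" and "archive" for a given retention window. `StoredEvent`, `retentionCutoff`, and `partitionByRetention` are hypothetical names, and the 30-day window in the usage lines is only there to keep the example readable.

```typescript
interface StoredEvent {
  eventId: string;
  timestamp: Date;
}

const MS_PER_DAY = 24 * 60 * 60 * 1000;

// Events strictly older than the returned instant are past the retention window.
function retentionCutoff(now: Date, retentionDays: number): Date {
  if (!Number.isFinite(retentionDays) || retentionDays < 0) {
    throw new Error("retentionDays must be a non-negative number");
  }
  return new Date(now.getTime() - retentionDays * MS_PER_DAY);
}

// Splits events into those to keep in hot storage and those to archive.
// An event exactly at the cutoff is retained.
function partitionByRetention<T extends StoredEvent>(
  events: readonly T[],
  now: Date,
  retentionDays: number
): { retain: T[]; archive: T[] } {
  const cutoff = retentionCutoff(now, retentionDays).getTime();
  const retain: T[] = [];
  const archive: T[] = [];
  for (const event of events) {
    if (event.timestamp.getTime() < cutoff) {
      archive.push(event);
    } else {
      retain.push(event);
    }
  }
  return { retain, archive };
}

const fixedNow = new Date("2026-01-01T00:00:00Z");
const sampleEvents: StoredEvent[] = [
  { eventId: "old", timestamp: new Date("2025-11-01T00:00:00Z") },
  { eventId: "boundary", timestamp: new Date("2025-12-02T00:00:00Z") },
  { eventId: "recent", timestamp: new Date("2025-12-15T00:00:00Z") },
];

// With a 30-day window the cutoff is 2025-12-02T00:00:00Z.
const partitioned = partitionByRetention(sampleEvents, fixedNow, 30);
```

Only `old` lands in `partitioned.archive`. Keeping the boundary event on the "retain" side errs toward holding data slightly longer instead of archiving it early.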
Versioning AI Model References in Events
Track which model version made which decision.
async function recordModelMetadata() {
  const modelMetadata = {
    modelName: 'gpt-4o',
    version: 'gpt-4o-2024-05-13',
    releaseDate: '2024-05-13',
    trainingCutoff: '2024-04-01',
    finetuningDataset: 'proprietary_loan_decisions_2024',
    evaluationMetrics: {
      accuracy: 0.94,
      precision: 0.91,
      recall: 0.93,
      fairnessScore: 0.89 // protected attributes
    },
    certifications: [
      { name: 'EU_AI_ACT', level: 'HIGH_RISK' },
      { name: 'SOC2', passed: true }
    ]
  };
  await modelVersionStore.insert(modelMetadata);
}

// When making a decision, reference the model version
const event = {
  aiInteraction: {
    model: 'gpt-4o',
    modelVersion: 'gpt-4o-2024-05-13', // Pinned snapshot, not a moving alias
    modelMetadataId: 'meta_abc123', // Reference to version metadata
    // ...
  }
};
Version metadata includes training data, evaluation metrics, and certifications. Link every decision to the model version that made it.
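Linking a decision to "the model version that made it" only works if the stored version is a pinned snapshot. An alias like `gpt-4o` can point at different snapshots over time. A cheap guard is to reject events whose `modelVersion` looks like an alias before they are written. The check below assumes pinned versions end in a `YYYY-MM-DD` suffix, which matches the example in this post but is a naming convention, not a guarantee for every provider. `ModelReference` and `isPinnedVersion` are hypothetical names.

```typescript
interface ModelReference {
  model: string;        // family or alias, e.g. "gpt-4o"
  modelVersion: string; // the specific snapshot that served the request
}

// Assumed convention: a pinned version ends in a YYYY-MM-DD date suffix.
const DATED_SNAPSHOT = /-\d{4}-\d{2}-\d{2}$/;

// True when modelVersion names a dated snapshot rather than repeating the alias.
function isPinnedVersion(ref: ModelReference): boolean {
  return ref.modelVersion !== ref.model && DATED_SNAPSHOT.test(ref.modelVersion);
}

const pinned = isPinnedVersion({ model: "gpt-4o", modelVersion: "gpt-4o-2024-05-13" });
const aliasOnly = isPinnedVersion({ model: "gpt-4o", modelVersion: "gpt-4o" });
```

Here `pinned` is `true` and `aliasOnly` is `false`. For providers that return the resolved model name in the API response, recording that value is more reliable than recording the name you requested.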
CQRS Read Models for AI Analytics Dashboards
Event sourcing stores truth. CQRS read models serve analytics.
// Event: AI decision made
const event = {
  eventType: 'AI_DECISION',
  tenantId: 'bank_456',
  userId: 'user_123',
  decision: {
    approved: true,
    confidence: 0.92,
    model: 'gpt-4o-2024-05-13'
  },
  timestamp: new Date()
};
// Update read models when event occurs
async function updateReadModels(event) {
  if (event.eventType !== 'AI_DECISION') return;

  // Update daily metrics
  await readModels.dailyMetrics.updateOne(
    { date: event.timestamp.toISOString().split('T')[0], tenantId: event.tenantId },
    {
      $inc: {
        totalDecisions: 1,
        approvedCount: event.decision.approved ? 1 : 0,
        deniedCount: event.decision.approved ? 0 : 1
      },
      $set: {
        lastUpdated: new Date()
      }
    },
    { upsert: true }
  );

  // Update model performance
  // (upsert so the first decision for a model creates its document)
  await readModels.modelMetrics.updateOne(
    { model: event.decision.model },
    {
      $push: {
        confidences: event.decision.confidence
      }
    },
    { upsert: true }
  );

  // Update user history
  await readModels.userDecisions.insertOne({
    userId: event.userId,
    tenantId: event.tenantId,
    decision: event.decision,
    timestamp: event.timestamp
  });
}

// Dashboard queries read model (fast)
async function getDashboardStats(tenantId: string) {
  const today = new Date().toISOString().split('T')[0];
  const metrics = await readModels.dailyMetrics.findOne({ date: today, tenantId });
  const total = metrics?.totalDecisions ?? 0;
  return {
    totalDecisions: total,
    // Guard against division by zero when there are no decisions yet today
    approvalRate: total > 0 ? metrics.approvedCount / total : 0,
    denialRate: total > 0 ? metrics.deniedCount / total : 0
  };
}
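The event store remains the source of truth, and the read models are derived from it. They are eventually consistent with the event log but fast to query, and they can be rebuilt from the events at any time.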
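Because a read model is derived data, it can be thrown away and recomputed from the event log, for example after a bug in the projection or a change to the metrics. The sketch below shows that rebuild as a pure function over an in-memory array of events. The `DecisionEvent` shape is a simplified assumption, not the full `AIEvent`, and `projectDailyMetrics` is a hypothetical name.

```typescript
interface DecisionEvent {
  tenantId: string;
  timestamp: Date;
  approved: boolean;
}

interface DailyMetrics {
  date: string; // YYYY-MM-DD in UTC
  tenantId: string;
  totalDecisions: number;
  approvedCount: number;
  deniedCount: number;
}

// Folds the full event history into one metrics row per tenant and UTC day.
// Running it again over the same events always yields the same result.
function projectDailyMetrics(events: readonly DecisionEvent[]): Map<string, DailyMetrics> {
  const metrics = new Map<string, DailyMetrics>();
  for (const event of events) {
    const date = event.timestamp.toISOString().slice(0, 10);
    const key = `${event.tenantId}|${date}`;
    const row = metrics.get(key) ?? {
      date,
      tenantId: event.tenantId,
      totalDecisions: 0,
      approvedCount: 0,
      deniedCount: 0,
    };
    row.totalDecisions += 1;
    if (event.approved) {
      row.approvedCount += 1;
    } else {
      row.deniedCount += 1;
    }
    metrics.set(key, row);
  }
  return metrics;
}

const decisionLog: DecisionEvent[] = [
  { tenantId: "bank_456", timestamp: new Date("2026-03-15T09:00:00Z"), approved: true },
  { tenantId: "bank_456", timestamp: new Date("2026-03-15T16:45:00Z"), approved: false },
  { tenantId: "bank_456", timestamp: new Date("2026-03-16T08:10:00Z"), approved: true },
];

const rebuilt = projectDailyMetrics(decisionLog);
const march15 = rebuilt.get("bank_456|2026-03-15");
```

For this log, `march15` holds two decisions, one approved and one denied. Comparing a rebuilt projection against the live read model is also a simple way to detect drift between the two.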
Checklist
- Store all AI interactions as immutable events with full context
- Include model version, prompt, parameters, context, and response in events
- Implement temporal queries to view decisions at any point in time
- Support replaying events to verify decision reproducibility
- Mask PII in operational logs while keeping full events for auditors
- Archive events past your retention period (for example, 7 years under sector record-keeping rules) to cold storage such as S3
- Version all model metadata (training data, metrics, certifications)
- Implement CQRS: event store for truth, read models for analytics
- Create audit trail API for compliance teams
- Test compliance before launch (EU AI Act, GDPR Article 22)
Conclusion
Event sourcing transforms AI systems from black boxes into auditable systems. Every decision is recorded with full context. Temporal queries let you answer "what happened?" from any point in time. Masking preserves privacy while maintaining audit capability. For regulated industries, an audit trail like this is not optional; it is table stakes.