Integrating LLMs Into Microservices — Async Patterns, Queues, and Service Design
Introduction
LLM calls add latency. A user-facing request that blocks on a 30-second LLM API call creates a poor user experience. Instead, integrate LLMs into microservice architectures using async patterns, job queues, and streaming responses. This post covers synchronous vs. asynchronous LLM calls, job queue patterns, webhook callbacks, circuit breakers, and service contracts.
- Synchronous vs. Asynchronous LLM Calls
- Async LLM via Job Queue
- Webhook Callback Pattern
- LLM Service as Separate Microservice
- Shared LLM Client Library
- Streaming Responses from Microservice
- Circuit Breaker for LLM Service
- Service Contract Testing
- Checklist
- Conclusion
Synchronous vs. Asynchronous LLM Calls
Choose synchronous calls for latency-tolerant background work; choose asynchronous patterns when a user is waiting and the request must return quickly.
import Anthropic from "@anthropic-ai/sdk";
// Synchronous: Simple, blocking
async function syncLLMCall(prompt: string): Promise<string> {
const client = new Anthropic();
const response = await client.messages.create({
model: "claude-3-5-sonnet-20241022",
max_tokens: 1024,
messages: [{ role: "user", content: prompt }],
});
return response.content[0].type === "text" ? response.content[0].text : "";
}
// Synchronous usage: Good for background jobs
async function synchronousProcessing() {
const result = await syncLLMCall("Summarize this text...");
console.log("Got result:", result);
}
// Asynchronous: Returns immediately, processes in background
interface AsyncJobRequest {
jobId: string;
prompt: string;
callbackUrl?: string;
createdAt: Date;
status: "pending" | "processing" | "completed" | "failed";
result?: string;
error?: string;
}
class AsyncLLMJobQueue {
private jobs = new Map<string, AsyncJobRequest>();
private processingJobs = new Set<string>();
private maxConcurrency = 5;
submitJob(prompt: string, callbackUrl?: string): AsyncJobRequest {
const jobId = `job-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
const job: AsyncJobRequest = {
jobId,
prompt,
callbackUrl,
createdAt: new Date(),
status: "pending",
};
this.jobs.set(jobId, job);
// Process asynchronously
this.processNextJob();
return job;
}
getJobStatus(jobId: string): AsyncJobRequest | null {
return this.jobs.get(jobId) || null;
}
private async processNextJob(): Promise<void> {
// Don't exceed concurrency limit
if (this.processingJobs.size >= this.maxConcurrency) {
return;
}
// Find next pending job
let nextJob: AsyncJobRequest | null = null;
for (const job of this.jobs.values()) {
if (job.status === "pending" && !this.processingJobs.has(job.jobId)) {
nextJob = job;
break;
}
}
if (!nextJob) return;
this.processingJobs.add(nextJob.jobId);
nextJob.status = "processing";
try {
const client = new Anthropic();
const response = await client.messages.create({
model: "claude-3-5-sonnet-20241022",
max_tokens: 1024,
messages: [{ role: "user", content: nextJob.prompt }],
});
nextJob.result =
response.content[0].type === "text" ? response.content[0].text : "";
nextJob.status = "completed";
// Call webhook if provided
if (nextJob.callbackUrl) {
await this.notifyCallback(nextJob.callbackUrl, nextJob);
}
} catch (error) {
nextJob.error = error instanceof Error ? error.message : String(error);
nextJob.status = "failed";
} finally {
this.processingJobs.delete(nextJob.jobId);
// Process next job
setTimeout(() => this.processNextJob(), 100);
}
}
private async notifyCallback(
callbackUrl: string,
job: AsyncJobRequest
): Promise<void> {
// In production, use HTTP POST with retry logic
console.log(`Notifying ${callbackUrl} for job ${job.jobId}`);
}
}
// Usage: Submit and poll
async function asynchronousProcessing() {
const queue = new AsyncLLMJobQueue();
const job = queue.submitJob("Summarize this text...", "https://example.com/callback");
console.log("Job submitted:", job.jobId);
// Poll for result
const pollInterval = setInterval(() => {
const status = queue.getJobStatus(job.jobId);
if (status?.status === "completed") {
console.log("Job completed:", status.result);
clearInterval(pollInterval);
} else if (status?.status === "failed") {
console.error("Job failed:", status.error);
clearInterval(pollInterval);
}
}, 1000);
}
Async LLM via Job Queue
Implement a robust job queue for asynchronous LLM processing with persistence.
interface QueuedJob {
id: string;
type: "llm_call" | "batch_processing" | "analysis";
payload: Record<string, unknown>;
status: "queued" | "processing" | "completed" | "dead_letter";
retries: number;
maxRetries: number;
createdAt: Date;
startedAt?: Date;
completedAt?: Date;
result?: unknown;
error?: string;
}
class PersistentJobQueue {
private queue: QueuedJob[] = [];
private processing = new Set<string>();
private deadLetterQueue: QueuedJob[] = [];
private maxConcurrency = 10;
private processingInterval: ReturnType<typeof setInterval> | null = null;
enqueue(
type: QueuedJob["type"],
payload: Record<string, unknown>,
maxRetries: number = 3
): QueuedJob {
const job: QueuedJob = {
id: `job-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`,
type,
payload,
status: "queued",
retries: 0,
maxRetries,
createdAt: new Date(),
};
this.queue.push(job);
// Persist to database in production
this.persistJob(job);
if (!this.processingInterval) {
this.startProcessing();
}
return job;
}
private startProcessing(): void {
this.processingInterval = setInterval(() => {
this.processJobs();
}, 1000);
}
private async processJobs(): Promise<void> {
while (
this.processing.size < this.maxConcurrency &&
this.queue.length > 0
) {
const job = this.queue.shift();
if (!job) break;
this.processing.add(job.id);
job.status = "processing";
job.startedAt = new Date();
try {
const result = await this.executeJob(job);
job.result = result;
job.status = "completed";
job.completedAt = new Date();
} catch (error) {
job.retries++;
if (job.retries < job.maxRetries) {
// Requeue with exponential backoff; schedule the push with
// setTimeout so the delay doesn't stall the processing loop
job.status = "queued";
const backoffMs = Math.pow(2, job.retries) * 1000;
setTimeout(() => this.queue.push(job), backoffMs);
} else {
// Move to dead letter queue
job.status = "dead_letter";
job.error = error instanceof Error ? error.message : String(error);
this.deadLetterQueue.push(job);
}
} finally {
this.processing.delete(job.id);
this.persistJob(job);
}
}
}
private async executeJob(job: QueuedJob): Promise<unknown> {
switch (job.type) {
case "llm_call":
return this.executeLLMCall(job.payload as { prompt: string });
case "batch_processing":
return this.executeBatchProcessing(job.payload);
case "analysis":
return this.executeAnalysis(job.payload);
default:
throw new Error(`Unknown job type: ${job.type}`);
}
}
private async executeLLMCall(payload: { prompt: string }): Promise<string> {
const client = new Anthropic();
const response = await client.messages.create({
model: "claude-3-5-sonnet-20241022",
max_tokens: 1024,
messages: [{ role: "user", content: payload.prompt }],
});
return response.content[0].type === "text" ? response.content[0].text : "";
}
private async executeBatchProcessing(
payload: Record<string, unknown>
): Promise<unknown> {
// Implementation
return { processed: true };
}
private async executeAnalysis(
payload: Record<string, unknown>
): Promise<unknown> {
// Implementation
return { analyzed: true };
}
getDeadLetterQueue(): QueuedJob[] {
return this.deadLetterQueue;
}
private persistJob(job: QueuedJob): void {
// In production, save to database (Redis, PostgreSQL, etc.)
console.log(`Persisting job ${job.id} with status ${job.status}`);
}
}
Webhook Callback Pattern
Notify clients via webhook when asynchronous jobs complete.
interface WebhookDelivery {
deliveryId: string;
webhookUrl: string;
jobId: string;
payload: unknown;
status: "pending" | "delivered" | "failed";
retries: number;
nextRetryAt?: Date;
lastError?: string;
}
class WebhookNotifier {
private deliveries: WebhookDelivery[] = [];
private maxRetries = 5;
private retryDelayMs = 5000;
async notifyJobCompletion(
webhookUrl: string,
jobId: string,
result: unknown
): Promise<WebhookDelivery> {
const delivery: WebhookDelivery = {
deliveryId: `delivery-${Date.now()}`,
webhookUrl,
jobId,
payload: { jobId, status: "completed", result },
status: "pending",
retries: 0,
};
this.deliveries.push(delivery);
// Send immediately
await this.sendWebhook(delivery);
return delivery;
}
private async sendWebhook(delivery: WebhookDelivery): Promise<void> {
try {
const response = await fetch(delivery.webhookUrl, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(delivery.payload),
});
if (response.ok) {
delivery.status = "delivered";
} else {
throw new Error(`HTTP ${response.status}`);
}
} catch (error) {
delivery.retries++;
delivery.lastError = error instanceof Error ? error.message : String(error);
if (delivery.retries < this.maxRetries) {
// Schedule retry with exponential backoff
const backoff =
this.retryDelayMs * Math.pow(2, delivery.retries - 1);
delivery.nextRetryAt = new Date(Date.now() + backoff);
delivery.status = "pending";
setTimeout(() => {
this.sendWebhook(delivery);
}, backoff);
} else {
delivery.status = "failed";
}
}
}
getDeliveryStatus(deliveryId: string): WebhookDelivery | null {
return this.deliveries.find((d) => d.deliveryId === deliveryId) || null;
}
}
LLM Service as Separate Microservice
Isolate LLM interactions into a dedicated service for reusability and scaling.
interface LLMRequest {
requestId: string;
prompt: string;
systemPrompt?: string;
model: string;
maxTokens: number;
temperature: number;
}
interface LLMResponse {
requestId: string;
content: string;
tokensUsed: {
input: number;
output: number;
};
latencyMs: number;
cost: number;
}
class LLMService {
private client: Anthropic;
private requestLog: Array<{ request: LLMRequest; response: LLMResponse }> = [];
constructor() {
this.client = new Anthropic();
}
async generate(request: LLMRequest): Promise<LLMResponse> {
const startTime = Date.now();
const response = await this.client.messages.create({
model: request.model,
max_tokens: request.maxTokens,
system: request.systemPrompt,
messages: [{ role: "user", content: request.prompt }],
});
const content =
response.content[0].type === "text" ? response.content[0].text : "";
const llmResponse: LLMResponse = {
requestId: request.requestId,
content,
tokensUsed: {
input: response.usage.input_tokens,
output: response.usage.output_tokens,
},
latencyMs: Date.now() - startTime,
cost: this.calculateCost(
response.usage.input_tokens,
response.usage.output_tokens,
request.model
),
};
this.requestLog.push({ request, response: llmResponse });
return llmResponse;
}
async generateBatch(
requests: LLMRequest[]
): Promise<LLMResponse[]> {
// Process in parallel; in production, cap concurrency to stay within rate limits
const responses = await Promise.all(
requests.map((req) => this.generate(req))
);
return responses;
}
// Health check endpoint
async healthCheck(): Promise<{ status: "healthy" | "degraded" }> {
try {
await this.client.messages.create({
model: "claude-3-5-sonnet-20241022",
max_tokens: 10,
messages: [{ role: "user", content: "Say OK" }],
});
return { status: "healthy" };
} catch {
return { status: "degraded" };
}
}
private calculateCost(
inputTokens: number,
outputTokens: number,
model: string
): number {
// Prices in USD per token ($3 / MTok input, $15 / MTok output)
const costs: Record<string, { input: number; output: number }> = {
"claude-3-5-sonnet-20241022": {
input: 0.003 / 1000,
output: 0.015 / 1000,
},
};
const modelCost = costs[model] || costs["claude-3-5-sonnet-20241022"];
return inputTokens * modelCost.input + outputTokens * modelCost.output;
}
}
Shared LLM Client Library
Create a reusable client library for consistent LLM integration across services.
interface LLMClientConfig {
apiKey: string;
defaultModel: string;
defaultMaxTokens: number;
requestTimeout: number;
retryStrategy: {
maxRetries: number;
initialBackoffMs: number;
maxBackoffMs: number;
};
}
class SharedLLMClient {
private config: LLMClientConfig;
private client: Anthropic;
constructor(config: Partial<LLMClientConfig> = {}) {
this.config = {
defaultModel: "claude-3-5-sonnet-20241022",
defaultMaxTokens: 1024,
requestTimeout: 30000,
retryStrategy: {
maxRetries: 3,
initialBackoffMs: 100,
maxBackoffMs: 10000,
},
...config,
apiKey: config.apiKey || process.env.ANTHROPIC_API_KEY || "",
};
this.client = new Anthropic({
apiKey: this.config.apiKey,
});
}
async complete(
prompt: string,
options?: {
model?: string;
maxTokens?: number;
systemPrompt?: string;
temperature?: number;
}
): Promise<string> {
const model = options?.model || this.config.defaultModel;
const maxTokens = options?.maxTokens || this.config.defaultMaxTokens;
return this.retryWithBackoff(async () => {
const response = await this.client.messages.create({
model,
max_tokens: maxTokens,
system: options?.systemPrompt,
messages: [{ role: "user", content: prompt }],
});
return response.content[0].type === "text" ? response.content[0].text : "";
});
}
private async retryWithBackoff<T>(
fn: () => Promise<T>
): Promise<T> {
let lastError: Error | undefined;
for (
let attempt = 1;
attempt <= this.config.retryStrategy.maxRetries;
attempt++
) {
try {
return await fn();
} catch (error) {
lastError = error instanceof Error ? error : new Error(String(error));
if (attempt < this.config.retryStrategy.maxRetries) {
const backoff = Math.min(
this.config.retryStrategy.initialBackoffMs *
Math.pow(2, attempt - 1),
this.config.retryStrategy.maxBackoffMs
);
await new Promise((resolve) => setTimeout(resolve, backoff));
}
}
}
throw lastError || new Error("Unknown error");
}
}
Streaming Responses from Microservice
Stream LLM responses for real-time updates without waiting for completion.
import type { ServerResponse } from "node:http";

async function streamLLMResponse(
  request: { prompt: string; model: string },
  res: ServerResponse
): Promise<void> {
const client = new Anthropic();
// Set response headers for streaming
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
try {
// Use streaming API if available
const stream = await client.messages.stream({
model: request.model,
max_tokens: 1024,
messages: [{ role: "user", content: request.prompt }],
});
for await (const event of stream) {
if (
event.type === "content_block_delta" &&
event.delta.type === "text_delta"
) {
// Send text chunk as SSE
res.write(`data: ${JSON.stringify({ text: event.delta.text })}\n\n`);
}
}
res.write("data: [DONE]\n\n");
res.end();
} catch (error) {
res.write(`data: ${JSON.stringify({ error: String(error) })}\n\n`);
res.end();
}
}
Circuit Breaker for LLM Service
Use the circuit breaker pattern to prevent cascading failures when the LLM service is degraded.
type CircuitState = "closed" | "open" | "half-open";
interface CircuitBreakerConfig {
failureThreshold: number; // Number of failures before opening
resetTimeoutMs: number; // How long to wait before trying again
windowMs: number; // Time window for counting failures
}
class LLMCircuitBreaker {
private state: CircuitState = "closed";
private failureCount = 0;
private lastFailureTime?: Date;
private successCount = 0;
private config: CircuitBreakerConfig;
constructor(config: Partial<CircuitBreakerConfig> = {}) {
this.config = {
failureThreshold: 5,
resetTimeoutMs: 60000,
windowMs: 60000,
...config,
};
}
async execute<T>(fn: () => Promise<T>): Promise<T> {
if (this.state === "open") {
// Check if we should try again
const timeSinceLastFailure = Date.now() - (this.lastFailureTime?.getTime() || 0);
if (timeSinceLastFailure > this.config.resetTimeoutMs) {
this.state = "half-open";
this.successCount = 0;
} else {
throw new Error("Circuit breaker is open");
}
}
try {
const result = await fn();
// Record success
if (this.state === "half-open") {
this.successCount++;
if (this.successCount >= 2) {
// Close circuit after 2 successful calls
this.state = "closed";
this.failureCount = 0;
}
}
return result;
} catch (error) {
// Record failure
this.failureCount++;
this.lastFailureTime = new Date();
// Check if we should open circuit
if (this.failureCount >= this.config.failureThreshold) {
this.state = "open";
}
throw error;
}
}
getState(): CircuitState {
return this.state;
}
}
Service Contract Testing
Test LLM service contracts to ensure consistent behavior across versions.
interface ServiceContract {
serviceName: string;
version: string;
endpoints: Array<{
name: string;
input: object;
expectedOutputPattern: RegExp;
}>;
}
class ServiceContractTester {
async testServiceContract(
service: any,
contract: ServiceContract
): Promise<{ passed: number; failed: number; details: string[] }> {
const results: string[] = [];
let passed = 0;
let failed = 0;
for (const endpoint of contract.endpoints) {
try {
const result = await service[endpoint.name](endpoint.input);
if (endpoint.expectedOutputPattern.test(result)) {
passed++;
results.push(`✓ ${endpoint.name}: PASS`);
} else {
failed++;
results.push(`✗ ${endpoint.name}: Output doesn't match pattern`);
}
} catch (error) {
failed++;
results.push(`✗ ${endpoint.name}: ${error}`);
}
}
return { passed, failed, details: results };
}
}
// Example contract
const llmServiceContract: ServiceContract = {
serviceName: "LLMService",
version: "1.0",
endpoints: [
{
name: "generate",
input: { prompt: "Say hello", maxTokens: 100 },
expectedOutputPattern: /hello/i,
},
],
};
Checklist
- Use async job queues to prevent LLM latency from blocking requests
- Implement webhook callbacks for job completion notifications
- Isolate LLM interactions into a dedicated microservice
- Create a shared client library for consistent integration
- Support streaming responses for real-time updates
- Implement circuit breaker to prevent cascading failures
- Test service contracts to ensure compatibility
- Monitor queue depth and processing latency
Conclusion
Integrating LLMs into microservice architectures requires careful attention to latency and resilience. Use synchronous calls only for background processing where latency is acceptable. For user-facing requests, use async job queues to prevent blocking. Implement webhook callbacks to notify clients when results are ready. Isolate LLM interactions into a dedicated service that can be scaled independently. Use circuit breakers to prevent cascading failures when the LLM service is degraded. Share client libraries across services for consistency. Finally, test service contracts to ensure reliable integration. With these patterns, you'll build microservice systems that scale efficiently while maintaining responsive user experiences.