Published on

Integrating LLMs Into Microservices — Async Patterns, Queues, and Service Design

Authors
  • Name
    Twitter

Introduction

LLM calls add latency. A user-facing request that blocks on a 30-second LLM API call creates a poor user experience. Instead, integrate LLMs into microservice architectures using async patterns, job queues, and streaming responses. This post covers synchronous vs. asynchronous LLM calls, job queue patterns, webhook callbacks, circuit breakers, and service contracts.

Synchronous vs. Asynchronous LLM Calls

Choose synchronous calls for latency-tolerant background tasks, and asynchronous patterns for user-facing, real-time scenarios where blocking on the model would hurt the experience.

import Anthropic from "@anthropic-ai/sdk";

// Synchronous: Simple, blocking
/**
 * Synchronous (blocking) LLM call: resolves only once the full
 * response is available. Suited to background work where latency
 * is acceptable.
 *
 * @param prompt - User prompt forwarded verbatim to the model.
 * @returns Text of the first content block, or "" when the first
 *          block is absent or not text (e.g. tool use).
 */
async function syncLLMCall(prompt: string): Promise<string> {
  const client = new Anthropic();

  const response = await client.messages.create({
    model: "claude-3-5-sonnet-20241022",
    max_tokens: 1024,
    messages: [{ role: "user", content: prompt }],
  });

  // Guard the index access: content may be empty or start with a
  // non-text block, in which case we fall back to an empty string.
  const first = response.content[0];
  return first?.type === "text" ? first.text : "";
}

// Synchronous usage: appropriate for background jobs where blocking
// on the model response is acceptable.
async function synchronousProcessing() {
  const summary = await syncLLMCall("Summarize this text...");
  console.log("Got result:", summary);
}

// Asynchronous: Returns immediately, processes in background.
// Lifecycle record for one queued LLM job: `result` is set on
// "completed", `error` on "failed".
interface AsyncJobRequest {
  jobId: string; // unique id in the form "job-<epoch-ms>-<random>"
  prompt: string; // user prompt forwarded verbatim to the model
  callbackUrl?: string; // optional webhook notified on completion
  createdAt: Date; // submission timestamp
  status: "pending" | "processing" | "completed" | "failed";
  result?: string; // text of the first content block of the response
  error?: string; // failure message when status === "failed"
}

/**
 * In-memory async job queue for LLM calls: submitJob() returns
 * immediately and the call runs in the background, with an optional
 * webhook notification on success.
 */
class AsyncLLMJobQueue {
  // In-memory store; replace with Redis/DB for durability.
  private jobs = new Map<string, AsyncJobRequest>();
  private processingJobs = new Set<string>();
  private maxConcurrency = 5;

  /**
   * Registers a job and kicks off background processing, returning
   * the (still pending) job record immediately.
   */
  submitJob(prompt: string, callbackUrl?: string): AsyncJobRequest {
    // slice(2, 11) replaces the deprecated substr(2, 9).
    const jobId = `job-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;

    const job: AsyncJobRequest = {
      jobId,
      prompt,
      callbackUrl,
      createdAt: new Date(),
      status: "pending",
    };

    this.jobs.set(jobId, job);

    // Fire-and-forget: processNextJob handles its own errors, so an
    // unawaited rejection cannot escape. `void` documents the intent.
    void this.processNextJob();

    return job;
  }

  /** Returns the job record, or null when the id is unknown. */
  getJobStatus(jobId: string): AsyncJobRequest | null {
    return this.jobs.get(jobId) || null;
  }

  /** Picks one pending job (FIFO via Map insertion order) and runs it. */
  private async processNextJob(): Promise<void> {
    // Don't exceed concurrency limit
    if (this.processingJobs.size >= this.maxConcurrency) {
      return;
    }

    // Find next pending job
    let nextJob: AsyncJobRequest | null = null;

    for (const job of this.jobs.values()) {
      if (job.status === "pending" && !this.processingJobs.has(job.jobId)) {
        nextJob = job;
        break;
      }
    }

    if (!nextJob) return;

    this.processingJobs.add(nextJob.jobId);
    nextJob.status = "processing";

    try {
      const client = new Anthropic();

      const response = await client.messages.create({
        model: "claude-3-5-sonnet-20241022",
        max_tokens: 1024,
        messages: [{ role: "user", content: nextJob.prompt }],
      });

      // Guarded index access: fall back to "" for non-text content.
      const first = response.content[0];
      nextJob.result = first?.type === "text" ? first.text : "";
      nextJob.status = "completed";

      // Call webhook if provided. Isolated so a callback failure
      // cannot flip an already-completed job to "failed".
      if (nextJob.callbackUrl) {
        try {
          await this.notifyCallback(nextJob.callbackUrl, nextJob);
        } catch (callbackError) {
          console.error(
            `Callback failed for job ${nextJob.jobId}:`,
            callbackError
          );
        }
      }
    } catch (error) {
      nextJob.error = error instanceof Error ? error.message : String(error);
      nextJob.status = "failed";
    } finally {
      this.processingJobs.delete(nextJob.jobId);

      // Small delay, then look for more pending work.
      setTimeout(() => void this.processNextJob(), 100);
    }
  }

  /** Webhook stub — in production, use HTTP POST with retry logic. */
  private async notifyCallback(
    callbackUrl: string,
    job: AsyncJobRequest
  ): Promise<void> {
    console.log(`Notifying ${callbackUrl} for job ${job.jobId}`);
  }
}

// Usage: submit a job, then poll until it reaches a terminal state.
async function asynchronousProcessing() {
  const queue = new AsyncLLMJobQueue();

  const submitted = queue.submitJob(
    "Summarize this text...",
    "https://example.com/callback"
  );

  console.log("Job submitted:", submitted.jobId);

  // Poll once per second; stop on either terminal status.
  const timer = setInterval(() => {
    const current = queue.getJobStatus(submitted.jobId);
    if (!current) return;

    switch (current.status) {
      case "completed":
        console.log("Job completed:", current.result);
        clearInterval(timer);
        break;
      case "failed":
        console.error("Job failed:", current.error);
        clearInterval(timer);
        break;
      default:
        break; // still pending/processing — keep polling
    }
  }, 1000);
}

Async LLM via Job Queue

Implement a robust job queue for asynchronous LLM processing with persistence.

// Persistent-queue job envelope. A job moves queued → processing →
// completed, or back to queued on retry, ending in dead_letter after
// maxRetries failed attempts.
interface QueuedJob {
  id: string; // unique id: "job-<epoch-ms>-<random>"
  type: "llm_call" | "batch_processing" | "analysis"; // dispatch key for executeJob
  payload: Record<string, unknown>; // type-specific input (e.g. { prompt } for llm_call)
  status: "queued" | "processing" | "completed" | "dead_letter";
  retries: number; // failed attempts so far
  maxRetries: number; // attempts allowed before dead-lettering
  createdAt: Date; // enqueue timestamp
  startedAt?: Date; // set when processing begins
  completedAt?: Date; // set on success
  result?: unknown; // executeJob return value on success
  error?: string; // last error message (set when dead-lettered)
}

/**
 * In-memory job queue with retry, exponential backoff, and a dead
 * letter queue. Persistence is stubbed via persistJob(); swap in
 * Redis/PostgreSQL for durability.
 *
 * Fixes vs. the original:
 *  - jobs now run in parallel up to maxConcurrency (the old loop
 *    awaited each job, serializing everything to concurrency 1);
 *  - retry backoff no longer blocks the dispatcher — the job is
 *    re-enqueued by a timer instead;
 *  - the dispatch interval stops when the queue drains, so the
 *    queue no longer keeps the process alive forever.
 */
class PersistentJobQueue {
  private queue: QueuedJob[] = [];
  private processing = new Set<string>();
  private deadLetterQueue: QueuedJob[] = [];
  private maxConcurrency = 10;
  // ReturnType<typeof setInterval> is portable; NodeJS.Timer is deprecated.
  private processingInterval: ReturnType<typeof setInterval> | null = null;

  /** Adds a job to the queue and ensures the dispatcher is running. */
  enqueue(
    type: QueuedJob["type"],
    payload: Record<string, unknown>,
    maxRetries: number = 3
  ): QueuedJob {
    const job: QueuedJob = {
      // slice(2, 11) replaces the deprecated substr(2, 9).
      id: `job-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`,
      type,
      payload,
      status: "queued",
      retries: 0,
      maxRetries,
      createdAt: new Date(),
    };

    this.queue.push(job);

    // Persist to database in production
    this.persistJob(job);

    this.ensureProcessing();

    return job;
  }

  /** Starts the dispatch interval if it is not already running. */
  private ensureProcessing(): void {
    if (this.processingInterval) return;

    this.processingInterval = setInterval(() => {
      void this.processJobs();
    }, 1000);

    // Dispatch immediately instead of waiting for the first tick.
    void this.processJobs();
  }

  /**
   * Dispatches queued jobs up to maxConcurrency. Jobs are started
   * WITHOUT awaiting so they run concurrently; completion handling
   * lives in runJob(). Stops the interval when fully idle.
   */
  private async processJobs(): Promise<void> {
    while (
      this.processing.size < this.maxConcurrency &&
      this.queue.length > 0
    ) {
      const job = this.queue.shift();
      if (!job) break;

      this.processing.add(job.id);
      job.status = "processing";
      job.startedAt = new Date();

      void this.runJob(job);
    }

    // Idle: nothing queued or running. A pending retry timer will
    // restart the dispatcher via ensureProcessing().
    if (
      this.queue.length === 0 &&
      this.processing.size === 0 &&
      this.processingInterval
    ) {
      clearInterval(this.processingInterval);
      this.processingInterval = null;
    }
  }

  /** Runs one job to completion, handling retry and dead-lettering. */
  private async runJob(job: QueuedJob): Promise<void> {
    try {
      job.result = await this.executeJob(job);
      job.status = "completed";
      job.completedAt = new Date();
    } catch (error) {
      job.retries++;

      if (job.retries < job.maxRetries) {
        // Exponential backoff: re-enqueue later WITHOUT blocking the
        // dispatcher (the original awaited the backoff inline).
        const backoff = Math.pow(2, job.retries) * 1000;
        setTimeout(() => {
          job.status = "queued";
          this.queue.push(job);
          this.ensureProcessing();
        }, backoff);
      } else {
        // Move to dead letter queue
        job.status = "dead_letter";
        job.error = error instanceof Error ? error.message : String(error);
        this.deadLetterQueue.push(job);
      }
    } finally {
      this.processing.delete(job.id);
      this.persistJob(job);
      // Pull in more work (and stop the interval if we are idle).
      void this.processJobs();
    }
  }

  /** Routes a job to its handler based on job.type. */
  private async executeJob(job: QueuedJob): Promise<unknown> {
    switch (job.type) {
      case "llm_call":
        return this.executeLLMCall(job.payload as { prompt: string });

      case "batch_processing":
        return this.executeBatchProcessing(job.payload);

      case "analysis":
        return this.executeAnalysis(job.payload);

      default:
        throw new Error(`Unknown job type: ${job.type}`);
    }
  }

  /** Executes a single completion for an "llm_call" job. */
  private async executeLLMCall(payload: { prompt: string }): Promise<string> {
    const client = new Anthropic();

    const response = await client.messages.create({
      model: "claude-3-5-sonnet-20241022",
      max_tokens: 1024,
      messages: [{ role: "user", content: payload.prompt }],
    });

    // Guarded index access: fall back to "" for non-text content.
    const first = response.content[0];
    return first?.type === "text" ? first.text : "";
  }

  /** Placeholder for "batch_processing" jobs. */
  private async executeBatchProcessing(
    payload: Record<string, unknown>
  ): Promise<unknown> {
    // Implementation
    return { processed: true };
  }

  /** Placeholder for "analysis" jobs. */
  private async executeAnalysis(
    payload: Record<string, unknown>
  ): Promise<unknown> {
    // Implementation
    return { analyzed: true };
  }

  /** Jobs that exhausted their retries. */
  getDeadLetterQueue(): QueuedJob[] {
    return this.deadLetterQueue;
  }

  /** Persistence stub — replace with a real store in production. */
  private persistJob(job: QueuedJob): void {
    // In production, save to database (Redis, PostgreSQL, etc.)
    console.log(`Persisting job ${job.id} with status ${job.status}`);
  }
}

Webhook Callback Pattern

Notify clients via webhook when asynchronous jobs complete.

// One webhook notification: tracks delivery state and retry
// bookkeeping for a completed job's callback.
interface WebhookDelivery {
  deliveryId: string; // delivery identifier (assigned by WebhookNotifier)
  webhookUrl: string; // destination POSTed with the JSON payload
  jobId: string; // job this notification refers to
  payload: unknown; // body sent to the webhook (JSON-serialized)
  status: "pending" | "delivered" | "failed"; // "failed" only after max retries
  retries: number; // attempts made so far
  nextRetryAt?: Date; // scheduled time of the next attempt
  lastError?: string; // most recent failure reason
}

/**
 * Delivers job-completion webhooks with exponential-backoff retries.
 * A delivery is marked "failed" only after maxRetries attempts.
 */
class WebhookNotifier {
  private deliveries: WebhookDelivery[] = [];
  private maxRetries = 5;
  private retryDelayMs = 5000; // base delay; doubles on each retry

  /**
   * Records a delivery and attempts it immediately.
   *
   * @param webhookUrl - Destination to POST the JSON payload to.
   * @param jobId - Job this notification is about.
   * @param result - Job result included in the payload.
   * @returns The delivery record (retries may still be in flight).
   */
  async notifyJobCompletion(
    webhookUrl: string,
    jobId: string,
    result: unknown
  ): Promise<WebhookDelivery> {
    const delivery: WebhookDelivery = {
      // Random suffix prevents id collisions when two deliveries are
      // created in the same millisecond (Date.now() alone is not unique).
      deliveryId: `delivery-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`,
      webhookUrl,
      jobId,
      payload: { jobId, status: "completed", result },
      status: "pending",
      retries: 0,
    };

    this.deliveries.push(delivery);

    // Send immediately
    await this.sendWebhook(delivery);

    return delivery;
  }

  /**
   * One delivery attempt. Never rejects: failures are recorded on the
   * delivery and retried on a timer until maxRetries is reached.
   */
  private async sendWebhook(delivery: WebhookDelivery): Promise<void> {
    try {
      const response = await fetch(delivery.webhookUrl, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify(delivery.payload),
      });

      if (response.ok) {
        delivery.status = "delivered";
      } else {
        throw new Error(`HTTP ${response.status}`);
      }
    } catch (error) {
      delivery.retries++;
      delivery.lastError = error instanceof Error ? error.message : String(error);

      if (delivery.retries < this.maxRetries) {
        // Schedule retry with exponential backoff
        const backoff =
          this.retryDelayMs * Math.pow(2, delivery.retries - 1);
        delivery.nextRetryAt = new Date(Date.now() + backoff);
        delivery.status = "pending";

        setTimeout(() => {
          // Fire-and-forget: sendWebhook handles its own errors.
          void this.sendWebhook(delivery);
        }, backoff);
      } else {
        delivery.status = "failed";
      }
    }
  }

  /** Looks up a delivery by id; null when unknown. */
  getDeliveryStatus(deliveryId: string): WebhookDelivery | null {
    return this.deliveries.find((d) => d.deliveryId === deliveryId) || null;
  }
}

LLM Service as Separate Microservice

Isolate LLM interactions into a dedicated service for reusability and scaling.

// Request contract for the dedicated LLM microservice.
interface LLMRequest {
  requestId: string; // caller-supplied correlation id, echoed in the response
  prompt: string; // user prompt forwarded verbatim to the model
  systemPrompt?: string; // optional system instruction
  model: string; // provider model id, e.g. "claude-3-5-sonnet-20241022"
  maxTokens: number; // cap on generated tokens
  temperature: number; // sampling temperature
}

// Response envelope returned by the LLM microservice.
interface LLMResponse {
  requestId: string; // echoes LLMRequest.requestId
  content: string; // text of the first content block ("" if absent/non-text)
  tokensUsed: {
    input: number; // prompt tokens billed
    output: number; // completion tokens billed
  };
  latencyMs: number; // wall-clock duration of the provider call
  cost: number; // estimated cost (see LLMService.calculateCost)
}

/**
 * Dedicated LLM microservice facade: wraps the Anthropic client and
 * adds request logging, latency measurement, and cost accounting.
 */
class LLMService {
  private client: Anthropic;
  // NOTE(review): unbounded in-memory log — cap or ship to a metrics
  // store in production to avoid unbounded growth.
  private requestLog: Array<{ request: LLMRequest; response: LLMResponse }> = [];

  constructor() {
    this.client = new Anthropic();
  }

  /**
   * Executes a single completion and returns content plus usage,
   * latency, and estimated cost metadata.
   */
  async generate(request: LLMRequest): Promise<LLMResponse> {
    const startTime = Date.now();

    const response = await this.client.messages.create({
      model: request.model,
      max_tokens: request.maxTokens,
      // Fix: temperature was previously accepted but silently ignored.
      temperature: request.temperature,
      system: request.systemPrompt,
      messages: [{ role: "user", content: request.prompt }],
    });

    // Guarded index access: fall back to "" for non-text content.
    const firstBlock = response.content[0];
    const content = firstBlock?.type === "text" ? firstBlock.text : "";

    const llmResponse: LLMResponse = {
      requestId: request.requestId,
      content,
      tokensUsed: {
        input: response.usage.input_tokens,
        output: response.usage.output_tokens,
      },
      latencyMs: Date.now() - startTime,
      cost: this.calculateCost(
        response.usage.input_tokens,
        response.usage.output_tokens,
        request.model
      ),
    };

    this.requestLog.push({ request, response: llmResponse });

    return llmResponse;
  }

  /** Runs independent requests in parallel; rejects if any one fails. */
  async generateBatch(requests: LLMRequest[]): Promise<LLMResponse[]> {
    return Promise.all(requests.map((req) => this.generate(req)));
  }

  /** Health check endpoint: issues a tiny completion as a liveness probe. */
  async healthCheck(): Promise<{ status: "healthy" | "degraded" }> {
    try {
      await this.client.messages.create({
        model: "claude-3-5-sonnet-20241022",
        max_tokens: 10,
        messages: [{ role: "user", content: "Say OK" }],
      });

      return { status: "healthy" };
    } catch {
      return { status: "degraded" };
    }
  }

  /**
   * Estimates cost in US dollars. (The original comment said "cents",
   * but 0.003/1K input and 0.015/1K output are per-token dollar rates.)
   */
  private calculateCost(
    inputTokens: number,
    outputTokens: number,
    model: string
  ): number {
    const costs: Record<string, { input: number; output: number }> = {
      "claude-3-5-sonnet-20241022": {
        input: 0.003 / 1000,
        output: 0.015 / 1000,
      },
    };

    // Unknown models fall back to sonnet pricing rather than 0.
    const modelCost = costs[model] || costs["claude-3-5-sonnet-20241022"];

    return inputTokens * modelCost.input + outputTokens * modelCost.output;
  }
}

Shared LLM Client Library

Create a reusable client library for consistent LLM integration across services.

// Configuration for the shared LLM client used across services.
interface LLMClientConfig {
  apiKey: string; // falls back to the ANTHROPIC_API_KEY env var when omitted
  defaultModel: string; // used when a call supplies no model
  defaultMaxTokens: number; // used when a call supplies no maxTokens
  requestTimeout: number; // request timeout budget in milliseconds
  retryStrategy: {
    maxRetries: number; // maximum total attempts per request
    initialBackoffMs: number; // first delay; doubles each attempt...
    maxBackoffMs: number; // ...capped at this value
  };
}

/**
 * Shared LLM client library: bundles defaults, timeout, and retry
 * policy so every service talks to the provider the same way.
 */
class SharedLLMClient {
  private config: LLMClientConfig;
  private client: Anthropic;

  constructor(config: Partial<LLMClientConfig> = {}) {
    this.config = {
      defaultModel: "claude-3-5-sonnet-20241022",
      defaultMaxTokens: 1024,
      requestTimeout: 30000,
      retryStrategy: {
        maxRetries: 3,
        initialBackoffMs: 100,
        maxBackoffMs: 10000,
      },
      ...config,
      apiKey: config.apiKey || process.env.ANTHROPIC_API_KEY || "",
    };

    this.client = new Anthropic({
      apiKey: this.config.apiKey,
      // Fix: requestTimeout was configured but never applied.
      timeout: this.config.requestTimeout,
    });
  }

  /**
   * Single completion with retry. Returns the text of the first
   * content block, or "" when the first block is absent/non-text.
   */
  async complete(
    prompt: string,
    options?: {
      model?: string;
      maxTokens?: number;
      systemPrompt?: string;
      temperature?: number;
    }
  ): Promise<string> {
    // ?? instead of ||: a literal 0 should not be clobbered.
    const model = options?.model ?? this.config.defaultModel;
    const maxTokens = options?.maxTokens ?? this.config.defaultMaxTokens;

    return this.retryWithBackoff(async () => {
      const response = await this.client.messages.create({
        model,
        max_tokens: maxTokens,
        // Fix: temperature was accepted in options but never forwarded.
        temperature: options?.temperature,
        system: options?.systemPrompt,
        messages: [{ role: "user", content: prompt }],
      });

      const first = response.content[0];
      return first?.type === "text" ? first.text : "";
    });
  }

  /**
   * Runs fn up to maxRetries times, sleeping with exponential backoff
   * (capped at maxBackoffMs) between attempts. Rethrows the last error
   * when all attempts fail.
   */
  private async retryWithBackoff<T>(fn: () => Promise<T>): Promise<T> {
    let lastError: Error | undefined;

    for (
      let attempt = 1;
      attempt <= this.config.retryStrategy.maxRetries;
      attempt++
    ) {
      try {
        return await fn();
      } catch (error) {
        lastError = error instanceof Error ? error : new Error(String(error));

        if (attempt < this.config.retryStrategy.maxRetries) {
          const backoff = Math.min(
            this.config.retryStrategy.initialBackoffMs *
              Math.pow(2, attempt - 1),
            this.config.retryStrategy.maxBackoffMs
          );

          await new Promise((resolve) => setTimeout(resolve, backoff));
        }
      }
    }

    throw lastError || new Error("Unknown error");
  }
}

Streaming Responses from Microservice

Stream LLM responses for real-time updates without waiting for completion.

// Minimal structural view of a Node/Express-style response object —
// anything exposing these three members (e.g. express.Response) works.
// Replaces the untyped `res: any` parameter.
interface SSEResponse {
  setHeader(name: string, value: string): void;
  write(chunk: string): void;
  end(): void;
}

/**
 * Streams an LLM completion to the client as Server-Sent Events:
 * one `data:` line per text delta, then a final `[DONE]` sentinel.
 * Errors are reported in-band as a final SSE payload because the
 * headers have already been sent.
 */
async function streamLLMResponse(
  request: { prompt: string; model: string },
  res: SSEResponse
): Promise<void> {
  const client = new Anthropic();

  // SSE headers: disable caching and keep the connection open.
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");

  try {
    const stream = await client.messages.stream({
      model: request.model,
      max_tokens: 1024,
      messages: [{ role: "user", content: request.prompt }],
    });

    for await (const event of stream) {
      if (
        event.type === "content_block_delta" &&
        event.delta.type === "text_delta"
      ) {
        // Send text chunk as SSE
        res.write(`data: ${JSON.stringify({ text: event.delta.text })}\n\n`);
      }
    }

    res.write("data: [DONE]\n\n");
    res.end();
  } catch (error) {
    // Headers already sent — report the error in-band instead.
    res.write(`data: ${JSON.stringify({ error: String(error) })}\n\n`);
    res.end();
  }
}

Circuit Breaker for LLM Service

Prevent cascading failures when LLM service is degraded using circuit breaker pattern.

// Circuit state machine: "closed" = normal operation, "open" =
// rejecting calls, "half-open" = probing with limited calls after
// the reset timeout has elapsed.
type CircuitState = "closed" | "open" | "half-open";

interface CircuitBreakerConfig {
  failureThreshold: number; // Number of failures before opening
  resetTimeoutMs: number; // How long to wait before trying again
  windowMs: number; // Time window for counting failures
}

/**
 * Circuit breaker around LLM calls: opens after failureThreshold
 * failures within windowMs, rejects fast while open, and probes via
 * half-open after resetTimeoutMs (closing after 2 probe successes).
 *
 * Fixes vs. the original:
 *  - windowMs is now actually used (stale failures expire);
 *  - a success while closed resets the failure streak;
 *  - a failure while half-open immediately reopens the circuit.
 */
class LLMCircuitBreaker {
  private state: CircuitState = "closed";
  private failureCount = 0;
  private lastFailureTime?: Date;
  private successCount = 0;
  private config: CircuitBreakerConfig;

  constructor(config: Partial<CircuitBreakerConfig> = {}) {
    this.config = {
      failureThreshold: 5,
      resetTimeoutMs: 60000,
      windowMs: 60000,
      ...config,
    };
  }

  /**
   * Runs fn under the breaker. Throws "Circuit breaker is open"
   * without calling fn while the circuit is open; otherwise forwards
   * fn's result or rethrows its error after recording the outcome.
   */
  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === "open") {
      const timeSinceLastFailure =
        Date.now() - (this.lastFailureTime?.getTime() ?? 0);

      if (timeSinceLastFailure > this.config.resetTimeoutMs) {
        // Allow probe traffic through.
        this.state = "half-open";
        this.successCount = 0;
      } else {
        throw new Error("Circuit breaker is open");
      }
    }

    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  /** Records a successful call. */
  private onSuccess(): void {
    if (this.state === "half-open") {
      this.successCount++;

      // Close circuit after 2 successful probe calls.
      if (this.successCount >= 2) {
        this.state = "closed";
        this.failureCount = 0;
      }
    } else {
      // Fix: a success while closed ends the failure streak, so old
      // sporadic failures can no longer accumulate into a trip.
      this.failureCount = 0;
    }
  }

  /** Records a failed call and opens the circuit when warranted. */
  private onFailure(): void {
    const now = Date.now();

    // Fix: expire failures that fell outside the rolling window
    // (windowMs was declared but never consulted before).
    if (
      this.lastFailureTime &&
      now - this.lastFailureTime.getTime() > this.config.windowMs
    ) {
      this.failureCount = 0;
    }

    this.failureCount++;
    this.lastFailureTime = new Date(now);

    // Fix: any failure during half-open probing reopens immediately.
    if (
      this.state === "half-open" ||
      this.failureCount >= this.config.failureThreshold
    ) {
      this.state = "open";
    }
  }

  /** Current breaker state (for metrics/inspection). */
  getState(): CircuitState {
    return this.state;
  }
}

Service Contract Testing

Test LLM service contracts to ensure consistent behavior across versions.

// Declarative contract: each endpoint is invoked with `input` and its
// result must match expectedOutputPattern.
interface ServiceContract {
  serviceName: string; // human-readable service identifier
  version: string; // contract version, e.g. "1.0"
  endpoints: Array<{
    name: string; // method name looked up on the service under test
    input: object; // single argument passed to the method
    expectedOutputPattern: RegExp; // matched against the method's result
  }>;
}

/**
 * Runs a service's endpoints against a declared contract and reports
 * pass/fail counts plus per-endpoint detail strings.
 */
class ServiceContractTester {
  /**
   * Invokes each contract endpoint on `service` and matches its
   * result against the expected pattern. Thrown errors and missing
   * endpoints count as failures rather than aborting the run.
   */
  async testServiceContract(
    service: any,
    contract: ServiceContract
  ): Promise<{ passed: number; failed: number; details: string[] }> {
    const results: string[] = [];
    let passed = 0;
    let failed = 0;

    for (const endpoint of contract.endpoints) {
      try {
        const handler = service[endpoint.name];

        // Guard against contract/service drift: a missing endpoint is
        // reported as a contract failure, not an opaque TypeError.
        if (typeof handler !== "function") {
          failed++;
          results.push(`${endpoint.name}: endpoint not found on service`);
          continue;
        }

        const result = await handler.call(service, endpoint.input);

        // String(...) makes the coercion of non-string results explicit
        // before regex matching.
        if (endpoint.expectedOutputPattern.test(String(result))) {
          passed++;
          results.push(`${endpoint.name}: PASS`);
        } else {
          failed++;
          // (also fixes the doubled apostrophe from the original text)
          results.push(`${endpoint.name}: Output doesn't match pattern`);
        }
      } catch (error) {
        failed++;
        results.push(`${endpoint.name}: ${error}`);
      }
    }

    return { passed, failed, details: results };
  }
}

// Example contract for the LLM service: a smoke-level expectation
// that a "Say hello" prompt yields a reply mentioning "hello".
// NOTE(review): this input omits LLMRequest fields that
// LLMService.generate reads (requestId, model, temperature) —
// confirm the intended target endpoint's signature.
const llmServiceContract: ServiceContract = {
  serviceName: "LLMService",
  version: "1.0",
  endpoints: [
    {
      name: "generate",
      input: { prompt: "Say hello", maxTokens: 100 },
      expectedOutputPattern: /hello/i, // case-insensitive match on the result
    },
  ],
};

Checklist

  • Use async job queues to prevent LLM latency from blocking requests
  • Implement webhook callbacks for job completion notifications
  • Isolate LLM interactions into a dedicated microservice
  • Create a shared client library for consistent integration
  • Support streaming responses for real-time updates
  • Implement circuit breaker to prevent cascading failures
  • Test service contracts to ensure compatibility
  • Monitor queue depth and processing latency

Conclusion

Integrating LLMs into microservice architectures requires careful attention to latency and resilience. Use synchronous calls only for background processing where latency is acceptable. For user-facing requests, use async job queues to prevent blocking. Implement webhook callbacks to notify clients when results are ready. Isolate LLM interactions into a dedicated service that can be scaled independently. Use circuit breakers to prevent cascading failures when the LLM service is degraded. Share client libraries across services for consistency. Finally, test service contracts to ensure reliable integration. With these patterns, you'll build microservice systems that scale efficiently while maintaining responsive user experiences.