Multi-Agent Orchestration in 2026 — Puppeteer, Specialist Agents, and Production Patterns

Introduction

Modern production AI systems don't rely on a single monolithic agent. Instead, they use a puppeteer orchestrator (a high-capability agent) that routes tasks to specialized worker agents. This post covers the architecture, implementation patterns, cost optimization, and observability challenges of multi-agent orchestration in production.

The Orchestrator-Worker Pattern

The pattern is straightforward:

  1. Orchestrator: Receives user requests, decomposes into sub-tasks, routes to workers
  2. Workers: Specialized agents that execute specific task types
  3. Message Queue: Optional, but prevents bottlenecks if orchestrator is slow
  4. Shared State: Context accessible to all agents

The tasks and orchestrator state can be modeled as:

interface Task {
  id: string;
  type: 'research' | 'code' | 'analysis' | 'writing';
  input: string;
  status: 'pending' | 'assigned' | 'completed' | 'failed';
  assignedWorker?: string;
  result?: unknown;
  error?: string;
}

interface OrchestratorState {
  tasks: Map<string, Task>;
  workerStats: Map<string, { tasksCompleted: number; avgTime: number }>;
}
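Item 3 in the list above mentions an optional message queue. A minimal in-process sketch is below (class and field names are illustrative; a production deployment would typically use Redis Streams, SQS, or similar so workers can pull independently):

```typescript
// Minimal in-process priority queue: workers pull tasks, so a slow
// orchestrator never blocks them. Names here are illustrative.
interface QueuedTask {
  id: string;
  type: string;
  input: string;
  priority: number;
}

class TaskQueue {
  private items: QueuedTask[] = [];

  // Insert, keeping the queue sorted by descending priority.
  enqueue(task: QueuedTask): void {
    this.items.push(task);
    this.items.sort((a, b) => b.priority - a.priority);
  }

  // Workers call this to pull the highest-priority task, if any.
  dequeue(): QueuedTask | undefined {
    return this.items.shift();
  }

  get size(): number {
    return this.items.length;
  }
}
```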

The orchestrator analyzes the user's request and decides which workers to invoke:

import Anthropic from '@anthropic-ai/sdk';

const client = new Anthropic();
const state: OrchestratorState = {
  tasks: new Map(),
  workerStats: new Map(),
};

async function orchestrate(userRequest: string): Promise<string> {
  // Step 1: Orchestrator analyzes request
  const analysisResponse = await client.messages.create({
    model: 'claude-opus-4-1',
    max_tokens: 1024,
    tools: [
      {
        name: 'decompose_task',
        description: 'Break down a request into worker tasks',
        input_schema: {
          type: 'object',
          properties: {
            tasks: {
              type: 'array',
              items: {
                type: 'object',
                properties: {
                  type: { enum: ['research', 'code', 'analysis', 'writing'] },
                  description: { type: 'string' },
                  priority: { type: 'number' },
                },
              },
            },
          },
        },
      },
    ],
    messages: [
      {
        role: 'user',
        content: `Decompose this request into tasks for specialist workers: "${userRequest}"`,
      },
    ],
  });

  // Extract the task decomposition: find the tool_use block rather than
  // assuming it is first (the model may emit text blocks before it)
  const decomposition = analysisResponse.content.find(
    (block) => block.type === 'tool_use'
  );

  if (!decomposition || decomposition.type !== 'tool_use') {
    throw new Error('Orchestrator failed to decompose task');
  }

  const tasks = (decomposition.input as { tasks: Array<{ type: string; description: string; priority: number }> }).tasks;

  // Step 2: Route tasks to workers in parallel
  const workerPromises = tasks.map((task) =>
    routeToWorker(task.type, task.description)
  );

  const results = await Promise.all(workerPromises);

  // Step 3: Synthesize results
  const synthesis = await client.messages.create({
    model: 'claude-opus-4-1',
    max_tokens: 2048,
    messages: [
      {
        role: 'user',
        content: `Synthesize these worker outputs into a cohesive response: ${JSON.stringify(results)}`,
      },
    ],
  });

  return synthesis.content[0].type === 'text' ? synthesis.content[0].text : '';
}

async function routeToWorker(
  type: string,
  description: string
): Promise<unknown> {
  const task: Task = {
    id: crypto.randomUUID(),
    type: type as 'research' | 'code' | 'analysis' | 'writing',
    input: description,
    status: 'pending',
  };

  state.tasks.set(task.id, task);

  try {
    let result: unknown;

    switch (task.type) {
      case 'research':
        result = await researchWorker(description);
        break;
      case 'code':
        result = await codeWorker(description);
        break;
      case 'analysis':
        result = await analysisWorker(description);
        break;
      case 'writing':
        result = await writingWorker(description);
        break;
      default:
        // task.type comes from a cast of model output, so guard at runtime
        throw new Error(`Unknown task type: ${task.type}`);
    }

    task.status = 'completed';
    task.result = result;
    return result;
  } catch (error) {
    task.status = 'failed';
    task.error = (error as Error).message;
    throw error;
  }
}
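The `workerStats` map declared in `OrchestratorState` is never updated above. One way to maintain it is an incremental running average of task durations; a sketch (the helper name `recordCompletion` is ours):

```typescript
// Update a worker's completion count and running-average duration.
// The map shape matches workerStats in OrchestratorState.
function recordCompletion(
  stats: Map<string, { tasksCompleted: number; avgTime: number }>,
  workerId: string,
  durationMs: number
): void {
  const prev = stats.get(workerId) ?? { tasksCompleted: 0, avgTime: 0 };
  const n = prev.tasksCompleted + 1;
  stats.set(workerId, {
    tasksCompleted: n,
    // Incremental mean: avg' = avg + (x - avg) / n
    avgTime: prev.avgTime + (durationMs - prev.avgTime) / n,
  });
}
```

The orchestrator can later use these averages to prefer faster workers when several can handle a task type.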

Worker Agent Implementation

Each worker is a specialized agent optimized for its task type:

async function codeWorker(request: string): Promise<string> {
  const response = await client.messages.create({
    model: 'claude-opus-4-1',
    max_tokens: 4096,
    system: `You are an expert software engineer. Write production-quality TypeScript code.
Always include error handling, type safety, and comments.`,
    messages: [
      {
        role: 'user',
        content: request,
      },
    ],
  });

  return response.content[0].type === 'text' ? response.content[0].text : '';
}

async function researchWorker(topic: string): Promise<string> {
  const response = await client.messages.create({
    model: 'claude-opus-4-1',
    max_tokens: 2048,
    system: `You are a research analyst. Provide well-sourced, factual information.
Use concrete examples and cite your reasoning.`,
    messages: [
      {
        role: 'user',
        content: `Research and summarize: ${topic}`,
      },
    ],
  });

  return response.content[0].type === 'text' ? response.content[0].text : '';
}

async function analysisWorker(data: string): Promise<string> {
  const response = await client.messages.create({
    model: 'claude-opus-4-1',
    max_tokens: 2048,
    system: `You are a data analyst. Provide clear insights, identify patterns,
and explain implications.`,
    messages: [
      {
        role: 'user',
        content: `Analyze this data: ${data}`,
      },
    ],
  });

  return response.content[0].type === 'text' ? response.content[0].text : '';
}

async function writingWorker(content: string): Promise<string> {
  const response = await client.messages.create({
    model: 'claude-opus-4-1',
    max_tokens: 3000,
    system: `You are a professional writer. Produce clear, engaging, well-structured content.`,
    messages: [
      {
        role: 'user',
        content: `Write or refine this content: ${content}`,
      },
    ],
  });

  return response.content[0].type === 'text' ? response.content[0].text : '';
}

Shared State Management

State must be accessible to all agents:

import Redis from 'ioredis';

const redis = new Redis({
  host: process.env.REDIS_HOST || 'localhost',
  port: parseInt(process.env.REDIS_PORT || '6379'),
});

interface SharedContext {
  userId: string;
  sessionId: string;
  previousResults: Record<string, unknown>;
  constants: Record<string, unknown>;
}

async function saveContext(context: SharedContext): Promise<void> {
  await redis.set(
    `context:${context.sessionId}`,
    JSON.stringify(context),
    'EX',
    3600
  );
}

async function getContext(sessionId: string): Promise<SharedContext | null> {
  const data = await redis.get(`context:${sessionId}`);
  return data ? JSON.parse(data) : null;
}

async function updateContextResult(
  sessionId: string,
  taskType: string,
  result: unknown
): Promise<void> {
  const context = await getContext(sessionId);
  if (!context) return;

  context.previousResults[taskType] = result;
  await saveContext(context);
}
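For local development and unit tests, the same get/set-with-TTL shape can be backed by an in-memory map instead of Redis. A sketch (no cross-process sharing; the class name is ours, and TTL here is in milliseconds rather than Redis's seconds):

```typescript
// In-memory stand-in for the Redis context store: string values
// with per-key expiry, mirroring set(key, value, 'EX', ttl) / get(key).
class MemoryContextStore {
  private entries = new Map<string, { value: string; expiresAt: number }>();

  set(key: string, value: string, ttlMs: number): void {
    this.entries.set(key, { value, expiresAt: Date.now() + ttlMs });
  }

  get(key: string): string | null {
    const entry = this.entries.get(key);
    if (!entry) return null;
    // Lazily evict expired entries on read
    if (Date.now() > entry.expiresAt) {
      this.entries.delete(key);
      return null;
    }
    return entry.value;
  }
}
```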

Error Handling and Recovery

When a worker fails, the orchestrator can retry, fallback, or replan:

async function executeTaskWithRetry(
  task: Task,
  maxRetries: number = 3
): Promise<unknown> {
  let lastError;

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await routeToWorker(task.type, task.input);
    } catch (error) {
      lastError = error;
      const backoffMs = Math.pow(2, attempt - 1) * 1000;

      console.log(
        `Task ${task.id} failed (attempt ${attempt}/${maxRetries}). Retrying in ${backoffMs}ms`
      );

      await new Promise((resolve) => setTimeout(resolve, backoffMs));
    }
  }

  throw new Error(
    `Task ${task.id} failed after ${maxRetries} attempts: ${lastError}`
  );
}
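The backoff above is deterministic, so many failing tasks retry in lockstep. Adding full jitter spreads retries out and avoids thundering-herd load on the API; a sketch (function name and cap are ours):

```typescript
// Exponential backoff with full jitter: delay is uniform in
// [0, baseMs * 2^(attempt - 1)], capped at maxMs.
function backoffWithJitter(
  attempt: number,
  baseMs: number = 1000,
  maxMs: number = 30_000
): number {
  const ceiling = Math.min(maxMs, baseMs * Math.pow(2, attempt - 1));
  return Math.floor(Math.random() * ceiling);
}
```

Swapping this into `executeTaskWithRetry` only changes the `backoffMs` line.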

// If primary worker fails, try alternative approach
async function executeTaskWithFallback(task: Task): Promise<unknown> {
  try {
    return await routeToWorker(task.type, task.input);
  } catch (primaryError) {
    console.log(
      `Primary approach failed for ${task.type}. Trying alternative...`
    );

    // Use cheaper, faster model for fallback
    const fallbackResponse = await client.messages.create({
      model: 'claude-3-5-sonnet-20241022',
      max_tokens: 1024,
      messages: [
        {
          role: 'user',
          content: `Quickly address this (may be less detailed): ${task.input}`,
        },
      ],
    });

    return fallbackResponse.content[0].type === 'text'
      ? fallbackResponse.content[0].text
      : '';
  }
}

Cost Optimization

Use model selection to reduce costs:

interface ModelChoice {
  modelId: string;
  costPer1kTokens: number;
  speedMs: number;
  qualityScore: number;
}

const modelChoices: Record<string, ModelChoice> = {
  'claude-opus-4-1': {
    modelId: 'claude-opus-4-1',
    costPer1kTokens: 0.015,
    speedMs: 2500,
    qualityScore: 10,
  },
  'claude-3-5-sonnet-20241022': {
    modelId: 'claude-3-5-sonnet-20241022',
    costPer1kTokens: 0.003,
    speedMs: 1500,
    qualityScore: 8,
  },
  'claude-3-5-haiku-20241022': {
    modelId: 'claude-3-5-haiku-20241022',
    costPer1kTokens: 0.0008,
    speedMs: 800,
    qualityScore: 6,
  },
};

function selectModel(taskType: string, quality: 'fast' | 'balanced' | 'best'): ModelChoice {
  if (taskType === 'code') {
    return modelChoices['claude-opus-4-1'];
  }

  if (quality === 'fast') {
    return modelChoices['claude-3-5-haiku-20241022'];
  }

  if (quality === 'balanced') {
    return modelChoices['claude-3-5-sonnet-20241022'];
  }

  return modelChoices['claude-opus-4-1'];
}
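With per-token prices in the table, the orchestrator can also estimate a request's cost before dispatch, or pick the cheapest model that clears a quality bar. A sketch (function names are ours; the token count would come from your own usage tracking):

```typescript
interface Choice {
  modelId: string;
  costPer1kTokens: number;
  qualityScore: number;
}

// Dollar cost for a call at the table's per-1k-token rate.
function estimateCost(choice: Choice, tokens: number): number {
  return (tokens / 1000) * choice.costPer1kTokens;
}

// Cheapest model meeting a minimum quality score; if none qualifies,
// fall back to the cheapest model overall.
function cheapestAbove(choices: Choice[], minQuality: number): Choice {
  const eligible = choices.filter((c) => c.qualityScore >= minQuality);
  const pool = eligible.length > 0 ? eligible : choices;
  return pool.reduce((a, b) => (b.costPer1kTokens < a.costPer1kTokens ? b : a));
}
```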

Observability and Tracing

Track execution across agent boundaries:

import * as opentelemetry from '@opentelemetry/api';

const tracer = opentelemetry.trace.getTracer('orchestrator');

async function orchestrateWithTracing(
  userRequest: string
): Promise<string> {
  const span = tracer.startSpan('orchestrate');

  try {
    span.addEvent('decomposing_task', {
      requestLength: userRequest.length,
    });

    const tasks = await decomposeTasks(userRequest);

    // Create child spans via context propagation: the OpenTelemetry JS API
    // takes a context as startSpan's third argument, not a `parent` option
    const parentContext = opentelemetry.trace.setSpan(
      opentelemetry.context.active(),
      span
    );

    const workerPromises = tasks.map((task) => {
      const workerSpan = tracer.startSpan(
        'worker_task',
        {
          attributes: {
            taskType: task.type,
            taskDescription: task.description,
          },
        },
        parentContext
      );
      return executeTask(task, workerSpan);
    });

    const results = await Promise.all(workerPromises);

    span.addEvent('synthesis_started', { resultCount: results.length });
    const finalResult = await synthesizeResults(results);

    span.setStatus({ code: opentelemetry.SpanStatusCode.OK });
    return finalResult;
  } catch (error) {
    span.recordException(error as Error);
    span.setStatus({
      code: opentelemetry.SpanStatusCode.ERROR,
      message: (error as Error).message,
    });
    throw error;
  } finally {
    span.end();
  }
}

Checklist

  • Design orchestrator-worker architecture
  • Implement task decomposition in orchestrator
  • Build specialized worker agents
  • Set up shared state with Redis
  • Implement retry and fallback logic
  • Optimize model selection by task type
  • Add distributed tracing

Conclusion

Multi-agent orchestration scales AI systems by specializing agents and decoupling responsibilities. Start with a simple orchestrator that decomposes tasks, routes to workers, and handles failures gracefully. Use cheaper models for workers and frontier models for orchestrators, and add observability from the start. As your system grows, shared state management and efficient task routing become critical for both cost and latency.