CrewAI in Production — Building Multi-Agent Teams That Actually Deliver

Introduction

CrewAI is a framework for coordinating multiple AI agents into teams. Unlike single-agent systems, CrewAI manages communication, memory, and task execution across agents. This post covers building production-ready crews, memory architectures, custom tools, and deployment patterns for enterprise AI systems.

Core CrewAI Concepts

CrewAI organizes AI coordination around four concepts:

  • Agent: An autonomous actor with a role, goal, and tools
  • Task: A unit of work assigned to an agent
  • Crew: A group of agents working together
  • Process: How agents coordinate (sequential or hierarchical)

import { Agent, Task, Crew, Process } from 'crewai';

// Define an agent
const researchAgent = new Agent({
  role: 'Research Analyst',
  goal: 'Find accurate, factual information on topics',
  backstory:
    'You are an expert researcher with deep knowledge of information gathering.',
  tools: [searchTool, webScraperTool],
  maxIterations: 10,
});

const writerAgent = new Agent({
  role: 'Content Writer',
  goal: 'Write engaging, well-structured content',
  backstory:
    'You are a professional writer with years of experience in technical and general writing.',
  tools: [documentationTool],
  maxIterations: 5,
});

// Define tasks
const researchTask = new Task({
  description: 'Research the topic of AI agents and provide a summary',
  agent: researchAgent,
  expectedOutput: 'A detailed summary of findings with sources',
});

const writeTask = new Task({
  description: 'Write a blog post based on research findings',
  agent: writerAgent,
  expectedOutput: 'A complete, well-written blog post',
});

// Create crew
const crew = new Crew({
  agents: [researchAgent, writerAgent],
  tasks: [researchTask, writeTask],
  process: Process.SEQUENTIAL, // Execute tasks in order
  verbose: true,
});

// Execute
const result = await crew.kickoff();
console.log(result);

Sequential vs Hierarchical Process

Sequential: Agents execute tasks one after another in defined order.

const sequentialCrew = new Crew({
  agents: [researchAgent, writerAgent],
  tasks: [researchTask, writeTask],
  process: Process.SEQUENTIAL,
});

// Task 1 (research) completes → Task 2 (write) starts

Hierarchical: A manager agent delegates tasks to the other agents and coordinates their output.

const managerAgent = new Agent({
  role: 'Project Manager',
  goal: 'Coordinate team to complete all tasks efficiently',
  backstory:
    'You manage a team of specialists. Assign tasks and ensure quality.',
  tools: [coordinationTool],
});

const hierarchicalCrew = new Crew({
  agents: [managerAgent, researchAgent, writerAgent],
  tasks: [researchTask, writeTask],
  process: Process.HIERARCHICAL,
  managerAgent,
});

Custom Tools with @tool Decorator

Define custom tools that agents can invoke:

import { tool } from 'crewai';
import postgres from 'postgres';

const sql = postgres(process.env.DATABASE_URL!);

const databaseQueryTool = tool({
  name: 'query_database',
  description:
    'Execute SQL queries against the analytics database. Use for data analysis tasks.',
  function: async (sqlQuery: string) => {
    try {
      const result = await sql.unsafe(sqlQuery);
      return JSON.stringify(result, null, 2);
    } catch (error) {
      return `Error executing query: ${(error as Error).message}`;
    }
  },
});

const customerLookupTool = tool({
  name: 'lookup_customer',
  description: 'Find customer information by ID or email',
  function: async (identifier: string) => {
    const customer = await sql`
      SELECT id, name, email, plan, created_at
      FROM customers
      WHERE id = ${identifier} OR email = ${identifier}
    `;

    if (customer.length === 0) {
      return 'Customer not found';
    }

    return JSON.stringify(customer[0], null, 2);
  },
});

const slackNotificationTool = tool({
  name: 'send_slack_message',
  description: 'Send a message to a Slack channel',
  function: async (channel: string, message: string) => {
    // Implementation would use Slack SDK
    return `Message sent to #${channel}`;
  },
});

Memory Systems: Short-term, Long-term, Entity

CrewAI provides three memory types:

  • Short-term: Conversation history within the current execution
  • Long-term: Persistent memory across executions (typically a vector DB)
  • Entity: Knowledge about specific entities

import { Agent, Task, Crew, Memory } from 'crewai';

const agent = new Agent({
  role: 'Customer Support Specialist',
  goal: 'Provide excellent customer support',
  backstory: 'You are an experienced support specialist.',
  tools: [customerLookupTool, knowledgeBaseTool],
  memory: {
    shortTermMemory: {
      enabled: true,
      contextWindow: 10, // Keep last 10 messages
    },
    longTermMemory: {
      enabled: true,
      type: 'vector', // Use vector database
      embeddings: 'openai', // Use OpenAI embeddings
      storage: {
        type: 'chroma',
        host: 'localhost',
        port: 8000,
      },
    },
    entityMemory: {
      enabled: true,
      storage: 'redis', // Store entity knowledge in Redis
    },
  },
});
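The `contextWindow: 10` setting above amounts to keeping only the most recent messages. As an illustrative sketch of what that trimming boils down to (not CrewAI internals), short-term memory is simple list truncation:

```typescript
interface ShortTermEntry {
  content: string;
  timestamp: Date;
}

// Keep only the most recent `windowSize` entries -- what a contextWindow
// setting effectively does for short-term memory.
function trimToWindow(
  entries: ShortTermEntry[],
  windowSize: number
): ShortTermEntry[] {
  if (windowSize <= 0) return [];
  return entries.length <= windowSize ? entries : entries.slice(-windowSize);
}
```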

interface MemoryEntry {
  content: string;
  metadata: {
    timestamp: Date;
    agent: string;
    taskId: string;
  };
}

import Redis from 'ioredis';

class EntityMemoryManager {
  private redis: Redis;

  constructor(redisUrl: string) {
    this.redis = new Redis(redisUrl);
  }

  async rememberEntity(
    entityId: string,
    knowledge: string
  ): Promise<void> {
    await this.redis.hset(
      `entity:${entityId}`,
      'knowledge',
      knowledge,
      'updated_at',
      new Date().toISOString()
    );
  }

  async recallEntity(entityId: string): Promise<string | null> {
    return await this.redis.hget(`entity:${entityId}`, 'knowledge');
  }

  async updateEntityKnowledge(
    entityId: string,
    newKnowledge: string
  ): Promise<void> {
    const existing = await this.recallEntity(entityId);
    const combined = existing
      ? `${existing}\n[Updated] ${newKnowledge}`
      : newKnowledge;
    await this.rememberEntity(entityId, combined);
  }
}
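The merge semantics in `updateEntityKnowledge` can be unit-tested without a Redis instance. A hypothetical in-memory stand-in with the same interface:

```typescript
// In-memory stand-in for EntityMemoryManager -- same merge semantics, no Redis.
// Useful for local development and tests.
class InMemoryEntityMemory {
  private store = new Map<string, string>();

  async rememberEntity(entityId: string, knowledge: string): Promise<void> {
    this.store.set(entityId, knowledge);
  }

  async recallEntity(entityId: string): Promise<string | null> {
    return this.store.get(entityId) ?? null;
  }

  async updateEntityKnowledge(
    entityId: string,
    newKnowledge: string
  ): Promise<void> {
    // Append new facts rather than overwrite, matching the Redis version
    const existing = await this.recallEntity(entityId);
    await this.rememberEntity(
      entityId,
      existing ? `${existing}\n[Updated] ${newKnowledge}` : newKnowledge
    );
  }
}
```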

Connecting to Custom LLMs

CrewAI supports Ollama, Azure OpenAI, and other providers:

import { Agent, Task, Crew } from 'crewai';

// Use local Ollama model
const localAgent = new Agent({
  role: 'Analysis Agent',
  goal: 'Analyse data and provide insights',
  backstory: 'You are a data analyst.',
  tools: [],
  llm: {
    provider: 'ollama',
    model: 'mistral:latest',
    baseUrl: 'http://localhost:11434',
    temperature: 0.7,
  },
});

// Use Azure OpenAI
const azureAgent = new Agent({
  role: 'Production Agent',
  goal: 'Generate production-ready code',
  backstory: 'You are a senior software engineer.',
  tools: [],
  llm: {
    provider: 'azure',
    model: 'gpt-4-turbo',
    apiKey: process.env.AZURE_OPENAI_API_KEY,
    endpoint: process.env.AZURE_OPENAI_ENDPOINT,
    temperature: 0.3,
  },
});

// Cost optimization: use cheap model for simple tasks
const cheapAgent = new Agent({
  role: 'Summarizer',
  goal: 'Summarise content',
  backstory: 'You summarise text efficiently.',
  tools: [],
  llm: {
    provider: 'openai',
    model: 'gpt-3.5-turbo',
    temperature: 0.5,
  },
});
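The cheap-model pattern can be made systematic with a routing helper that picks an LLM config per task. The heuristic, thresholds, and model names below are illustrative assumptions, not CrewAI API:

```typescript
interface LlmConfig {
  provider: string;
  model: string;
  temperature: number;
}

// Illustrative heuristic: long task descriptions or tool-heavy agents
// get the stronger (more expensive) model; everything else stays cheap.
function routeLlm(taskDescription: string, toolCount: number): LlmConfig {
  const complex = taskDescription.length > 500 || toolCount > 2;
  return complex
    ? { provider: 'openai', model: 'gpt-4-turbo', temperature: 0.3 }
    : { provider: 'openai', model: 'gpt-3.5-turbo', temperature: 0.5 };
}
```

Pass the result as the agent's `llm` option; tune the thresholds against your own task mix.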

Async Crew Execution

Run crews asynchronously for scalability:

interface CrewExecutionJob {
  jobId: string;
  crewId: string;
  status: 'pending' | 'running' | 'completed' | 'failed';
  input: Record<string, unknown>;
  result?: unknown;
  error?: string;
  createdAt: Date;
  completedAt?: Date;
}

const executionQueue = new Map<string, CrewExecutionJob>();

async function executeCrewAsync(
  crewId: string,
  input: Record<string, unknown>
): Promise<string> {
  const jobId = crypto.randomUUID();

  const job: CrewExecutionJob = {
    jobId,
    crewId,
    status: 'pending',
    input,
    createdAt: new Date(),
  };

  executionQueue.set(jobId, job);

  // Queue the work
  await jobQueue.enqueue({
    type: 'crew_execution',
    jobId,
    crewId,
    input,
  });

  return jobId;
}

async function getJobStatus(jobId: string): Promise<CrewExecutionJob | null> {
  return executionQueue.get(jobId) || null;
}

// Worker process
async function processCrewJob(job: {
  jobId: string;
  crewId: string;
  input: Record<string, unknown>;
}) {
  const executionJob = executionQueue.get(job.jobId);
  if (!executionJob) return;

  try {
    executionJob.status = 'running';

    const crew = crews.get(job.crewId);
    if (!crew) {
      throw new Error(`Crew ${job.crewId} not found`);
    }

    const result = await crew.kickoff(job.input);

    executionJob.status = 'completed';
    executionJob.result = result;
    executionJob.completedAt = new Date();
  } catch (error) {
    executionJob.status = 'failed';
    executionJob.error = (error as Error).message;
    executionJob.completedAt = new Date();
  }
}
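Callers of `executeCrewAsync` typically poll until the job settles. A small polling helper (hypothetical; it works against any job-lookup function such as `getJobStatus`) with an interval and a deadline:

```typescript
type JobStatus = 'pending' | 'running' | 'completed' | 'failed';

interface PollableJob {
  status: JobStatus;
  result?: unknown;
  error?: string;
}

// Poll a job lookup until the job reaches a terminal state or the deadline passes.
async function waitForJob(
  getJob: () => PollableJob | null,
  intervalMs = 250,
  timeoutMs = 60_000
): Promise<PollableJob> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    const job = getJob();
    if (job && (job.status === 'completed' || job.status === 'failed')) {
      return job;
    }
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
  throw new Error('Timed out waiting for job');
}
```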

Handling Crew Failures

Implement recovery and retry strategies:

interface CrewExecutionConfig {
  maxRetries: number;
  timeoutMs: number;
  onRetry?: (attempt: number, error: Error) => void;
  onFailure?: (error: Error) => void;
}

async function executeWithRetry(
  crew: Crew,
  input: Record<string, unknown>,
  config: CrewExecutionConfig
): Promise<unknown> {
  let lastError;

  for (let attempt = 1; attempt <= config.maxRetries; attempt++) {
    try {
      const result = await Promise.race([
        crew.kickoff(input),
        new Promise((_, reject) =>
          setTimeout(
            () => reject(new Error('Crew execution timeout')),
            config.timeoutMs
          )
        ),
      ]);

      return result;
    } catch (error) {
      lastError = error;

      if (attempt < config.maxRetries) {
        const backoffMs = Math.pow(2, attempt - 1) * 1000;
        console.log(
          `Crew execution failed (attempt ${attempt}/${config.maxRetries}). Retrying in ${backoffMs}ms`
        );

        if (config.onRetry) {
          config.onRetry(attempt, error as Error);
        }

        await new Promise((resolve) => setTimeout(resolve, backoffMs));
      }
    }
  }

  if (config.onFailure) {
    config.onFailure(lastError as Error);
  }

  throw lastError;
}
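The fixed exponential backoff above can cause synchronized retry bursts when many crews fail at the same time (for example, during a provider outage). A "full jitter" variant, swapped in as an assumption rather than anything CrewAI prescribes, randomizes each wait:

```typescript
// Full-jitter exponential backoff: a random delay in [0, base * 2^(attempt-1)),
// capped at maxMs so late attempts do not wait unboundedly.
function backoffWithJitter(
  attempt: number,
  baseMs = 1000,
  maxMs = 30_000
): number {
  const ceiling = Math.min(maxMs, baseMs * Math.pow(2, attempt - 1));
  return Math.floor(Math.random() * ceiling);
}
```

Substitute `backoffWithJitter(attempt)` for the `Math.pow(2, attempt - 1) * 1000` calculation in `executeWithRetry`.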

CrewAI Flows for Event-Driven Agents

Use flows for complex, event-triggered workflows:

import { Flow, listen, Task, Crew, Process } from 'crewai';

const supportFlow = new Flow({
  name: 'customer-support-flow',
  description: 'Handles customer support tickets',
});

// Define flow states and transitions
supportFlow.addState('new_ticket', async (context) => {
  const { ticketId, message } = context;

  const analysisTask = new Task({
    description: `Analyse this ticket: ${message}`,
    agent: analyzerAgent,
    expectedOutput: 'Ticket classification and priority',
  });

  const result = await new Crew({
    agents: [analyzerAgent],
    tasks: [analysisTask],
    process: Process.SEQUENTIAL,
  }).kickoff();
  context.analysis = result;

  // Route to next state based on priority
  if (result.priority === 'urgent') {
    return supportFlow.transition('urgent_handling', context);
  } else {
    return supportFlow.transition('standard_handling', context);
  }
});

supportFlow.addState('urgent_handling', async (context) => {
  // Immediate escalation
  context.escalated = true;
  return supportFlow.transition('completed', context);
});

supportFlow.addState('standard_handling', async (context) => {
  // Standard response generation
  context.responseGenerated = true;
  return supportFlow.transition('completed', context);
});

supportFlow.addState('completed', async (context) => {
  // Final state
  return context;
});

// Listen for events
listen('ticket.created', async (event) => {
  const result = await supportFlow.execute({
    ticketId: event.ticketId,
    message: event.message,
  });

  console.log('Ticket processed:', result);
});

Integration with FastAPI/Express

Expose crews via REST API:

import express, { Express, Request, Response } from 'express';

const app: Express = express();
app.use(express.json());

const crewMap = new Map<string, Crew>();

// Register crews
crewMap.set('support', supportCrew);
crewMap.set('research', researchCrew);

// Execute crew
app.post('/crews/:crewId/execute', async (req: Request, res: Response) => {
  const { crewId } = req.params;
  const { input } = req.body;

  const crew = crewMap.get(crewId);
  if (!crew) {
    return res.status(404).json({ error: 'Crew not found' });
  }

  try {
    const result = await executeWithRetry(crew, input, {
      maxRetries: 3,
      timeoutMs: 30000,
    });

    res.json({
      success: true,
      result,
      executedAt: new Date(),
    });
  } catch (error) {
    res.status(500).json({
      error: (error as Error).message,
    });
  }
});

// Async execution
app.post('/crews/:crewId/execute-async', async (req: Request, res: Response) => {
  const { crewId } = req.params;
  const { input } = req.body;

  const jobId = await executeCrewAsync(crewId, input);

  res.status(202).json({ jobId });
});

// Check job status
app.get('/jobs/:jobId', async (req: Request, res: Response) => {
  const job = await getJobStatus(req.params.jobId);

  if (!job) {
    return res.status(404).json({ error: 'Job not found' });
  }

  res.json(job);
});

app.listen(3000, () => {
  console.log('CrewAI API running on port 3000');
});

Cost Tracking Per Crew Run

Monitor expenses for billing and optimization:

interface CostTracker {
  crewId: string;
  executionId: string;
  tokensUsed: number;
  costEstimate: number;
  agents: Array<{
    agentId: string;
    modelUsed: string;
    tokensUsed: number;
  }>;
}

const costTrackers = new Map<string, CostTracker>();

async function trackCrewCosts(
  crewId: string,
  executionId: string
): Promise<void> {
  const tracker: CostTracker = {
    crewId,
    executionId,
    tokensUsed: 0,
    costEstimate: 0,
    agents: [],
  };

  // Intercept API calls to count tokens
  // (In real implementation, hook into client library)

  costTrackers.set(`${crewId}:${executionId}`, tracker);
}

function getCostReport(crewId: string): any {
  const entries = Array.from(costTrackers.entries()).filter(([key]) =>
    key.startsWith(`${crewId}:`)
  );

  const totalCost = entries.reduce((sum, [, tracker]) => {
    return sum + tracker.costEstimate;
  }, 0);

  const avgCostPerRun = entries.length > 0 ? totalCost / entries.length : 0;

  return {
    crewId,
    totalExecutions: entries.length,
    totalCost,
    avgCostPerRun,
    executions: entries.map(([, tracker]) => tracker),
  };
}
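To turn `tokensUsed` into `costEstimate`, apply per-model rates to input and output tokens separately. The rates below are placeholders for illustration only — substitute your provider's current pricing:

```typescript
// Placeholder per-1K-token rates in USD -- NOT current pricing.
// Replace with your provider's published rates.
const RATE_PER_1K: Record<string, { input: number; output: number }> = {
  'gpt-4-turbo': { input: 0.01, output: 0.03 },
  'gpt-3.5-turbo': { input: 0.0005, output: 0.0015 },
};

function estimateCost(
  model: string,
  inputTokens: number,
  outputTokens: number
): number {
  const rate = RATE_PER_1K[model];
  if (!rate) {
    throw new Error(`No pricing configured for model: ${model}`);
  }
  return (inputTokens / 1000) * rate.input + (outputTokens / 1000) * rate.output;
}
```

Accumulate the result into `tracker.costEstimate` each time an agent's API call completes.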

Checklist

  • Design crew composition (agents, tasks, process)
  • Implement custom tools with validation
  • Configure memory systems (short, long, entity)
  • Integrate custom LLMs and optimize costs
  • Build async execution with job queue
  • Add retry logic and error handling
  • Deploy as REST API service
  • Implement cost tracking and analytics

Conclusion

CrewAI scales multi-agent systems from simple sequential pipelines to complex hierarchical teams. Start with sequential crews, add tools and memory, then graduate to hierarchical management and event-driven flows. Use async execution for production scalability, custom LLM integration for cost control, and robust error handling for reliability. As your AI infrastructure grows, CrewAI's team coordination primitives keep systems manageable and cost-effective.