CrewAI in Production — Building Multi-Agent Teams That Actually Deliver
By Sanjeev Sharma (@webcoderspeed1)
Introduction
CrewAI is a framework for coordinating multiple AI agents into teams. Unlike single-agent systems, CrewAI manages communication, memory, and task execution across agents. This post covers building production-ready crews, memory architectures, custom tools, and deployment patterns for enterprise AI systems.
- Core CrewAI Concepts
- Sequential vs Hierarchical Process
- Custom Tools with @tool Decorator
- Memory Systems: Short-term, Long-term, Entity
- Connecting to Custom LLMs
- Async Crew Execution
- Handling Crew Failures
- CrewAI Flows for Event-Driven Agents
- Integration with FastAPI/Express
- Cost Tracking Per Crew Run
- Checklist
- Conclusion
Core CrewAI Concepts
CrewAI organizes AI coordination around four concepts:
- Agent: An autonomous actor with a role, goal, and tools
- Task: A unit of work assigned to an agent
- Crew: A group of agents working together
- Process: How agents coordinate (sequential or hierarchical)
import { Agent, Task, Crew, Process } from 'crewai';
// Define an agent
const researchAgent = new Agent({
role: 'Research Analyst',
goal: 'Find accurate, factual information on topics',
backstory:
'You are an expert researcher with deep knowledge of information gathering.',
tools: [searchTool, webScraperTool],
maxIterations: 10,
});
const writerAgent = new Agent({
role: 'Content Writer',
goal: 'Write engaging, well-structured content',
backstory:
'You are a professional writer with years of experience in technical and general writing.',
tools: [documentationTool],
maxIterations: 5,
});
// Define tasks
const researchTask = new Task({
description: 'Research the topic of AI agents and provide a summary',
agent: researchAgent,
expectedOutput: 'A detailed summary of findings with sources',
});
const writeTask = new Task({
description: 'Write a blog post based on research findings',
agent: writerAgent,
expectedOutput: 'A complete, well-written blog post',
});
// Create crew
const crew = new Crew({
agents: [researchAgent, writerAgent],
tasks: [researchTask, writeTask],
process: Process.SEQUENTIAL, // Execute tasks in order
verbose: true,
});
// Execute
const result = await crew.kickoff();
console.log(result);
Sequential vs Hierarchical Process
Sequential: Agents execute tasks one after another in defined order.
const sequentialCrew = new Crew({
agents: [researchAgent, writerAgent],
tasks: [researchTask, writeTask],
process: Process.SEQUENTIAL,
});
// Task 1 (research) completes → Task 2 (write) starts
Hierarchical: A manager agent orchestrates work assignments and coordinates.
const managerAgent = new Agent({
role: 'Project Manager',
goal: 'Coordinate team to complete all tasks efficiently',
backstory:
'You manage a team of specialists. Assign tasks and ensure quality.',
tools: [coordinationTool],
});
const hierarchicalCrew = new Crew({
agents: [managerAgent, researchAgent, writerAgent],
tasks: [researchTask, writeTask],
process: Process.HIERARCHICAL,
managerAgent,
});
Custom Tools with @tool Decorator
Define custom tools that agents can invoke:
import { tool } from 'crewai';
import postgres from 'postgres';
const sql = postgres(process.env.DATABASE_URL!);
const databaseQueryTool = tool({
name: 'query_database',
description:
'Execute SQL queries against the analytics database. Use for data analysis tasks.',
function: async (sqlQuery: string) => {
try {
const result = await sql.unsafe(sqlQuery);
return JSON.stringify(result, null, 2);
} catch (error) {
return `Error executing query: ${(error as Error).message}`;
}
},
});
const customerLookupTool = tool({
name: 'lookup_customer',
description: 'Find customer information by ID or email',
function: async (identifier: string) => {
const customer = await sql`
SELECT id, name, email, plan, created_at
FROM customers
WHERE id = ${identifier} OR email = ${identifier}
`;
if (customer.length === 0) {
return 'Customer not found';
}
return JSON.stringify(customer[0], null, 2);
},
});
const slackNotificationTool = tool({
name: 'send_slack_message',
description: 'Send a message to a Slack channel',
function: async (channel: string, message: string) => {
// Implementation would use Slack SDK
return `Message sent to #${channel}`;
},
});
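Tool inputs arrive as model-generated strings, so it pays to validate them before they reach a database or external API. The wrapper below is a minimal sketch (not a CrewAI API — `withValidation` and `readOnlySql` are illustrative names) showing one way to reject dangerous input before the tool body runs:

```typescript
// Hypothetical wrapper: validate tool input before the tool body runs,
// so a malformed or dangerous agent call fails fast with a readable message.
type ToolFn = (input: string) => Promise<string> | string;

function withValidation(
  fn: ToolFn,
  validate: (input: string) => string | null // returns an error message, or null if valid
): ToolFn {
  return async (input: string) => {
    const error = validate(input);
    if (error) return `Invalid input: ${error}`;
    return fn(input);
  };
}

// Example: only allow read-only SQL through a database tool
const readOnlySql = withValidation(
  async (q) => `executed: ${q}`,
  (q) => (/^\s*select\b/i.test(q) ? null : 'only SELECT statements are allowed')
);
```

Returning the validation failure as a string (rather than throwing) lets the agent read the error and retry with corrected input.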
Memory Systems: Short-term, Long-term, Entity
CrewAI provides three memory types:
- Short-term: Conversation history within the current execution
- Long-term: Persistent memory across executions (typically a vector DB)
- Entity: Knowledge about specific entities
import { Agent, Task, Crew, Memory } from 'crewai';
const agent = new Agent({
role: 'Customer Support Specialist',
goal: 'Provide excellent customer support',
backstory: 'You are an experienced support specialist.',
tools: [customerLookupTool, knowledgeBaseTool],
memory: {
shortTermMemory: {
enabled: true,
contextWindow: 10, // Keep last 10 messages
},
longTermMemory: {
enabled: true,
type: 'vector', // Use vector database
embeddings: 'openai', // Use OpenAI embeddings
storage: {
type: 'chroma',
host: 'localhost',
port: 8000,
},
},
entityMemory: {
enabled: true,
storage: 'redis', // Store entity knowledge in Redis
},
},
});
interface MemoryEntry {
content: string;
metadata: {
timestamp: Date;
agent: string;
taskId: string;
};
}
import Redis from 'ioredis';
class EntityMemoryManager {
private redis: Redis;
constructor(redisUrl: string) {
this.redis = new Redis(redisUrl); // connect to Redis at the given URL
}
async rememberEntity(
entityId: string,
knowledge: string
): Promise<void> {
await this.redis.hset(
`entity:${entityId}`,
'knowledge',
knowledge,
'updated_at',
new Date().toISOString()
);
}
async recallEntity(entityId: string): Promise<string | null> {
return await this.redis.hget(`entity:${entityId}`, 'knowledge');
}
async updateEntityKnowledge(
entityId: string,
newKnowledge: string
): Promise<void> {
const existing = await this.recallEntity(entityId);
const combined = existing
? `${existing}\n[Updated] ${newKnowledge}`
: newKnowledge;
await this.rememberEntity(entityId, combined);
}
}
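The merge behaviour of `updateEntityKnowledge` can be exercised without a Redis instance. This in-memory stand-in (a `Map` replacing the Redis hash; illustration only, not part of CrewAI) shows the append-with-marker semantics:

```typescript
// In-memory stand-in for the Redis-backed entity store above (illustration only).
class InMemoryEntityMemory {
  private store = new Map<string, string>();

  async rememberEntity(entityId: string, knowledge: string): Promise<void> {
    this.store.set(entityId, knowledge);
  }

  async recallEntity(entityId: string): Promise<string | null> {
    return this.store.get(entityId) ?? null;
  }

  // New facts are appended under an [Updated] marker instead of overwriting,
  // so earlier knowledge about the entity is preserved.
  async updateEntityKnowledge(entityId: string, newKnowledge: string): Promise<void> {
    const existing = await this.recallEntity(entityId);
    const combined = existing
      ? `${existing}\n[Updated] ${newKnowledge}`
      : newKnowledge;
    await this.rememberEntity(entityId, combined);
  }
}
```

Swapping the storage backend behind the same interface also makes the memory layer easy to unit-test.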
Connecting to Custom LLMs
CrewAI supports Ollama, Azure OpenAI, and other providers:
import { Agent, Task, Crew } from 'crewai';
// Use local Ollama model
const localAgent = new Agent({
role: 'Analysis Agent',
goal: 'Analyse data and provide insights',
backstory: 'You are a data analyst.',
tools: [],
llm: {
provider: 'ollama',
model: 'mistral:latest',
baseUrl: 'http://localhost:11434',
temperature: 0.7,
},
});
// Use Azure OpenAI
const azureAgent = new Agent({
role: 'Production Agent',
goal: 'Generate production-ready code',
backstory: 'You are a senior software engineer.',
tools: [],
llm: {
provider: 'azure',
model: 'gpt-4-turbo',
apiKey: process.env.AZURE_OPENAI_API_KEY,
endpoint: process.env.AZURE_OPENAI_ENDPOINT,
temperature: 0.3,
},
});
// Cost optimization: use cheap model for simple tasks
const cheapAgent = new Agent({
role: 'Summarizer',
goal: 'Summarise content',
backstory: 'You summarise text efficiently.',
tools: [],
llm: {
provider: 'openai',
model: 'gpt-3.5-turbo',
temperature: 0.5,
},
});
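One way to wire the cost optimisation above into task assignment is a small router that picks an LLM config by task type. The tiers, model names, and mapping below are illustrative assumptions, not CrewAI features — adjust them to your own cost/quality trade-offs:

```typescript
type LlmConfig = { provider: string; model: string; temperature: number };

// Illustrative tiers — models and thresholds are assumptions, tune for your workload.
const llmTiers: Record<'cheap' | 'standard' | 'premium', LlmConfig> = {
  cheap: { provider: 'openai', model: 'gpt-3.5-turbo', temperature: 0.5 },
  standard: { provider: 'openai', model: 'gpt-4-turbo', temperature: 0.3 },
  premium: { provider: 'azure', model: 'gpt-4-turbo', temperature: 0.2 },
};

// Route summarisation to the cheap tier, code generation to the premium tier.
function pickLlm(task: { kind: 'summarize' | 'research' | 'codegen' }): LlmConfig {
  switch (task.kind) {
    case 'summarize':
      return llmTiers.cheap;
    case 'research':
      return llmTiers.standard;
    case 'codegen':
      return llmTiers.premium;
  }
}
```

The returned config can be passed straight into an agent's `llm` field at construction time.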
Async Crew Execution
Run crews asynchronously for scalability:
interface CrewExecutionJob {
jobId: string;
crewId: string;
status: 'pending' | 'running' | 'completed' | 'failed';
input: Record<string, unknown>;
result?: unknown;
error?: string;
createdAt: Date;
completedAt?: Date;
}
const executionQueue = new Map<string, CrewExecutionJob>();
async function executeCrewAsync(
crewId: string,
input: Record<string, unknown>
): Promise<string> {
const jobId = crypto.randomUUID();
const job: CrewExecutionJob = {
jobId,
crewId,
status: 'pending',
input,
createdAt: new Date(),
};
executionQueue.set(jobId, job);
// Queue the work
await jobQueue.enqueue({
type: 'crew_execution',
jobId,
crewId,
input,
});
return jobId;
}
async function getJobStatus(jobId: string): Promise<CrewExecutionJob | null> {
return executionQueue.get(jobId) || null;
}
// Worker process
async function processCrewJob(job: {
jobId: string;
crewId: string;
input: Record<string, unknown>;
}) {
const executionJob = executionQueue.get(job.jobId);
if (!executionJob) return;
try {
executionJob.status = 'running';
const crew = crews.get(job.crewId);
if (!crew) {
throw new Error(`Crew ${job.crewId} not found`);
}
const result = await crew.kickoff(job.input);
executionJob.status = 'completed';
executionJob.result = result;
executionJob.completedAt = new Date();
} catch (error) {
executionJob.status = 'failed';
executionJob.error = (error as Error).message;
executionJob.completedAt = new Date();
}
}
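On the caller's side, the `jobId` returned by `executeCrewAsync` is typically polled until the job settles. A generic polling helper (hypothetical, not part of CrewAI) might look like this:

```typescript
type JobStatus = 'pending' | 'running' | 'completed' | 'failed';

// Poll `check` until it reports a terminal status or the deadline passes.
async function waitForJob(
  check: () => Promise<JobStatus>,
  { intervalMs = 500, timeoutMs = 30_000 } = {}
): Promise<JobStatus> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    const status = await check();
    if (status === 'completed' || status === 'failed') return status;
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
  throw new Error('Timed out waiting for crew job');
}
```

Here `check` would wrap `getJobStatus(jobId)` (or a GET to the status endpoint) and return the job's `status` field.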
Handling Crew Failures
Implement recovery and retry strategies:
interface CrewExecutionConfig {
maxRetries: number;
timeoutMs: number;
onRetry?: (attempt: number, error: Error) => void;
onFailure?: (error: Error) => void;
}
async function executeWithRetry(
crew: Crew,
input: Record<string, unknown>,
config: CrewExecutionConfig
): Promise<unknown> {
let lastError;
for (let attempt = 1; attempt <= config.maxRetries; attempt++) {
try {
const result = await Promise.race([
crew.kickoff(input),
new Promise((_, reject) =>
setTimeout(
() => reject(new Error('Crew execution timeout')),
config.timeoutMs
)
),
]);
return result;
} catch (error) {
lastError = error;
if (attempt < config.maxRetries) {
const backoffMs = Math.pow(2, attempt - 1) * 1000;
console.log(
`Crew execution failed (attempt ${attempt}/${config.maxRetries}). Retrying in ${backoffMs}ms`
);
if (config.onRetry) {
config.onRetry(attempt, error as Error);
}
await new Promise((resolve) => setTimeout(resolve, backoffMs));
}
}
}
if (config.onFailure) {
config.onFailure(lastError as Error);
}
throw lastError;
}
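Retries cover transient failures; when a crew fails persistently (bad prompt, dead dependency), it can also help to stop calling it for a cooldown period instead of burning tokens on doomed attempts. A minimal circuit-breaker sketch (not a CrewAI feature; the class and thresholds are illustrative):

```typescript
// Opens after `threshold` consecutive failures; rejects calls until `cooldownMs` passes.
class CircuitBreaker {
  private failures = 0;
  private openedAt = 0;

  constructor(private threshold = 3, private cooldownMs = 60_000) {}

  async run<T>(fn: () => Promise<T>): Promise<T> {
    const open =
      this.failures >= this.threshold &&
      Date.now() - this.openedAt < this.cooldownMs;
    if (open) {
      throw new Error('Circuit open: crew temporarily disabled');
    }
    try {
      const result = await fn();
      this.failures = 0; // any success closes the circuit
      return result;
    } catch (err) {
      this.failures++;
      if (this.failures >= this.threshold) this.openedAt = Date.now();
      throw err;
    }
  }
}
```

In practice you would wrap `executeWithRetry` with `breaker.run(() => executeWithRetry(crew, input, config))`, one breaker per crew.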
CrewAI Flows for Event-Driven Agents
Use flows for complex, event-triggered workflows:
import { Flow, Task, Crew, listen } from 'crewai';
const supportFlow = new Flow({
name: 'customer-support-flow',
description: 'Handles customer support tickets',
});
// Define flow states and transitions
supportFlow.addState('new_ticket', async (context) => {
const { ticketId, message } = context;
const analysisTask = new Task({
description: `Analyse this ticket: ${message}`,
agent: analyzerAgent,
expectedOutput: 'Ticket classification and priority',
});
const analysisCrew = new Crew({
agents: [analyzerAgent],
tasks: [analysisTask],
});
const result = await analysisCrew.kickoff();
context.analysis = result;
// Route to next state based on priority
if (result.priority === 'urgent') {
return supportFlow.transition('urgent_handling', context);
} else {
return supportFlow.transition('standard_handling', context);
}
});
supportFlow.addState('urgent_handling', async (context) => {
// Immediate escalation
context.escalated = true;
return supportFlow.transition('completed', context);
});
supportFlow.addState('standard_handling', async (context) => {
// Standard response generation
context.responseGenerated = true;
return supportFlow.transition('completed', context);
});
supportFlow.addState('completed', async (context) => {
// Final state
return context;
});
// Listen for events
listen('ticket.created', async (event) => {
const result = await supportFlow.execute({
ticketId: event.ticketId,
message: event.message,
});
console.log('Ticket processed:', result);
});
Integration with FastAPI/Express
Expose crews via REST API:
import express, { Express, Request, Response } from 'express';
const app: Express = express();
app.use(express.json());
const crewMap = new Map<string, Crew>();
// Register crews
crewMap.set('support', supportCrew);
crewMap.set('research', researchCrew);
// Execute crew
app.post('/crews/:crewId/execute', async (req: Request, res: Response) => {
const { crewId } = req.params;
const { input } = req.body;
const crew = crewMap.get(crewId);
if (!crew) {
return res.status(404).json({ error: 'Crew not found' });
}
try {
const result = await executeWithRetry(crew, input, {
maxRetries: 3,
timeoutMs: 30000,
});
res.json({
success: true,
result,
executedAt: new Date(),
});
} catch (error) {
res.status(500).json({
error: (error as Error).message,
});
}
});
// Async execution
app.post('/crews/:crewId/execute-async', async (req: Request, res: Response) => {
const { crewId } = req.params;
const { input } = req.body;
const jobId = await executeCrewAsync(crewId, input);
res.status(202).json({ jobId });
});
// Check job status
app.get('/jobs/:jobId', async (req: Request, res: Response) => {
const job = await getJobStatus(req.params.jobId);
if (!job) {
return res.status(404).json({ error: 'Job not found' });
}
res.json(job);
});
app.listen(3000, () => {
console.log('CrewAI API running on port 3000');
});
Cost Tracking Per Crew Run
Monitor expenses for billing and optimization:
interface CostTracker {
crewId: string;
executionId: string;
tokensUsed: number;
costEstimate: number;
agents: Array<{
agentId: string;
modelUsed: string;
tokensUsed: number;
}>;
}
const costTrackers = new Map<string, CostTracker>();
async function trackCrewCosts(
crewId: string,
executionId: string
): Promise<void> {
const tracker: CostTracker = {
crewId,
executionId,
tokensUsed: 0,
costEstimate: 0,
agents: [],
};
// Intercept API calls to count tokens
// (In real implementation, hook into client library)
costTrackers.set(`${crewId}:${executionId}`, tracker);
}
function getCostReport(crewId: string): any {
const entries = Array.from(costTrackers.entries()).filter(([key]) =>
key.startsWith(crewId)
);
const totalCost = entries.reduce((sum, [, tracker]) => {
return sum + tracker.costEstimate;
}, 0);
const avgCostPerRun = entries.length > 0 ? totalCost / entries.length : 0;
return {
crewId,
totalExecutions: entries.length,
totalCost,
avgCostPerRun,
executions: entries.map(([, tracker]) => tracker),
};
}
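The "intercept API calls" step above needs per-model pricing to turn token counts into dollars. A sketch with illustrative prices — these numbers are assumptions for the example, not current quotes; always check your provider's rate card:

```typescript
// Illustrative per-1K-token prices in USD; real prices change, verify with your provider.
const pricePer1kTokens: Record<string, { input: number; output: number }> = {
  'gpt-4-turbo': { input: 0.01, output: 0.03 },
  'gpt-3.5-turbo': { input: 0.0005, output: 0.0015 },
};

function estimateCost(
  model: string,
  inputTokens: number,
  outputTokens: number
): number {
  const price = pricePer1kTokens[model];
  if (!price) return 0; // unknown model: report zero rather than guessing
  return (inputTokens / 1000) * price.input + (outputTokens / 1000) * price.output;
}
```

The per-agent entries in `CostTracker.agents` can then be summed with `estimateCost` to populate `costEstimate` for each run.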
Checklist
- Design crew composition (agents, tasks, process)
- Implement custom tools with validation
- Configure memory systems (short, long, entity)
- Integrate custom LLMs and optimize costs
- Build async execution with job queue
- Add retry logic and error handling
- Deploy as REST API service
- Implement cost tracking and analytics
Conclusion
CrewAI scales multi-agent systems from simple sequential pipelines to complex hierarchical teams. Start with sequential crews, add tools and memory, then graduate to hierarchical management and event-driven flows. Use async execution for production scalability, custom LLM integration for cost control, and robust error handling for reliability. As your AI infrastructure grows, CrewAI's team coordination primitives keep systems manageable and cost-effective.