LangGraph in Production — Stateful Agent Workflows With Checkpointing

By Sanjeev Sharma (@webcoderspeed1)
Introduction
LangGraph is one of the most production-ready frameworks for building stateful agent workflows. Unlike a raw agent loop, LangGraph provides checkpointing (pause/resume), human-in-the-loop interrupts, streaming, and conditional routing out of the box. This post covers building, deploying, and running LangGraph workflows in production.
- LangGraph Graph Definition
- State Schema Design
- Checkpointing to Postgres
- Human-in-the-Loop Interrupts
- Streaming Agent Thoughts and Tool Calls
- Parallel Branches in Graph
- Error Handling Nodes
- Compiling and Deploying
- LangGraph vs Raw Agent Loop
- Checklist
- Conclusion
LangGraph Graph Definition
Define agent workflows as directed graphs of nodes and edges. Unlike strict DAG pipelines, LangGraph graphs may contain cycles, which is what makes review/revise loops possible.
import { StateGraph, START, END } from '@langchain/langgraph';

// Define the state that flows through the graph
interface WorkflowState {
  userQuery: string;
  researchFindings?: string;
  outline?: string;
  draft?: string;
  feedback?: string;
  finalArticle?: string;
  errors: string[];
}
// Create the graph
// Create the graph. In the legacy channels API, `value` is the reducer
// function and `default` produces the initial value.
const workflow = new StateGraph<WorkflowState>({
  channels: {
    userQuery: {
      value: (x: string, y?: string) => y ?? x, // Last write wins
      default: () => '',
    },
    researchFindings: {
      value: (x?: string, y?: string) => y ?? x,
    },
    outline: {
      value: (x?: string, y?: string) => y ?? x,
    },
    draft: {
      value: (x?: string, y?: string) => y ?? x,
    },
    feedback: {
      value: (x?: string, y?: string) => y ?? x,
    },
    finalArticle: {
      value: (x?: string, y?: string) => y ?? x,
    },
    errors: {
      value: (x: string[], y: string[]) => [...(x ?? []), ...(y ?? [])], // Append errors
      default: () => [],
    },
  },
});
// Define node functions
const researcher = async (state: WorkflowState): Promise<Partial<WorkflowState>> => {
  console.log('Researching topic...');
  try {
    const findings = await performResearch(state.userQuery);
    return { researchFindings: findings };
  } catch (error) {
    return { errors: [(error as Error).message] };
  }
};

const outlineCreator = async (state: WorkflowState): Promise<Partial<WorkflowState>> => {
  console.log('Creating outline...');
  const outline = await createOutline(state.researchFindings || '');
  return { outline };
};

const draftWriter = async (state: WorkflowState): Promise<Partial<WorkflowState>> => {
  console.log('Writing draft...');
  const draft = await writeDraft(state.outline || '', state.researchFindings || '');
  return { draft };
};

const reviewer = async (state: WorkflowState): Promise<Partial<WorkflowState>> => {
  console.log('Reviewing draft...');
  const feedback = await reviewDraft(state.draft || '');
  return { feedback };
};

const reviser = async (state: WorkflowState): Promise<Partial<WorkflowState>> => {
  console.log('Revising based on feedback...');
  const revised = await reviseDraft(state.draft || '', state.feedback || '');
  return { finalArticle: revised };
};
// Add nodes to graph
workflow.addNode('researcher', researcher);
workflow.addNode('outline_creator', outlineCreator);
workflow.addNode('draft_writer', draftWriter);
workflow.addNode('reviewer', reviewer);
workflow.addNode('reviser', reviser);
// Define edges (control flow)
workflow.addEdge(START, 'researcher');
workflow.addEdge('researcher', 'outline_creator');
workflow.addEdge('outline_creator', 'draft_writer');
workflow.addEdge('draft_writer', 'reviewer');
// Conditional edge: revise if feedback suggests issues
workflow.addConditionalEdges(
  'reviewer',
  async (state: WorkflowState) => {
    const hasIssues = (state.feedback || '').toLowerCase().includes('revise');
    return hasIssues ? 'revise' : 'finalize';
  },
  {
    revise: 'reviser',
    finalize: END,
  },
);
workflow.addEdge('reviser', END);
// Compile the graph
const app = workflow.compile();
// Helper functions (stubs — replace with real implementations)
async function performResearch(query: string): Promise<string> {
  // Implement research logic
  return 'Research findings...';
}

async function createOutline(findings: string): Promise<string> {
  return 'Outline...';
}

async function writeDraft(outline: string, findings: string): Promise<string> {
  return 'Draft article...';
}

async function reviewDraft(draft: string): Promise<string> {
  return 'Feedback on draft...';
}

async function reviseDraft(draft: string, feedback: string): Promise<string> {
  return 'Revised article...';
}
Graph definition is clear, testable, and visualizable.
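Because nodes are plain async functions from state to partial state, each one can be unit-tested in isolation, without compiling the graph or calling an LLM. A dependency-free sketch (the stand-in node here is illustrative, mirroring the shape of `researcher` above):

```typescript
// A node is just (state) => Promise<Partial<state>>, so testing it
// requires no graph, no LLM, and no network
interface State {
  userQuery: string;
  researchFindings?: string;
  errors: string[];
}

const researchNode = async (state: State): Promise<Partial<State>> => {
  if (!state.userQuery.trim()) {
    return { errors: ['empty query'] };
  }
  return { researchFindings: `findings for: ${state.userQuery}` };
};

// Exercise both the happy path and the error path directly
const ok = await researchNode({ userQuery: 'LangGraph checkpointing', errors: [] });
const bad = await researchNode({ userQuery: '   ', errors: [] });
console.log(ok.researchFindings);
console.log(bad.errors);
```

The graph's reducers take care of merging each partial return value, so node tests only need to assert on the partial update itself.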
State Schema Design
State is the single source of truth flowing through the graph.
interface ArticleWorkflowState {
  // Input
  topic: string;
  targetAudience: string;
  wordCount: number;

  // Processing state
  researchPhase: {
    status: 'pending' | 'in_progress' | 'completed' | 'failed';
    sources: Array<{ url: string; title: string; snippet: string }>;
    findingsText: string;
  };
  writingPhase: {
    status: 'pending' | 'in_progress' | 'completed';
    outline: string;
    sections: Array<{ heading: string; content: string }>;
    draft: string;
  };
  reviewPhase: {
    status: 'pending' | 'in_progress' | 'completed';
    feedback: string;
    issuesToFix: string[];
  };

  // Output
  finalArticle: string;

  // Metadata
  createdAt: number;
  updatedAt: number;
  errorLog: Array<{ timestamp: number; error: string }>;
}
// Create workflow with explicit state schema
const articleWorkflow = new StateGraph<ArticleWorkflowState>({
  channels: {
    topic: {
      value: (x: string, y?: string) => y ?? x,
      default: () => '',
    },
    targetAudience: {
      value: (x: string, y?: string) => y ?? x,
      default: () => '',
    },
    wordCount: {
      value: (x: number, y?: number) => y ?? x,
      default: () => 0,
    },
    researchPhase: {
      value: (x, y) => ({ ...x, ...y }),
      default: () => ({ status: 'pending' as const, sources: [], findingsText: '' }),
    },
    writingPhase: {
      value: (x, y) => ({ ...x, ...y }),
      default: () => ({ status: 'pending' as const, outline: '', sections: [], draft: '' }),
    },
    reviewPhase: {
      value: (x, y) => ({ ...x, ...y }),
      default: () => ({ status: 'pending' as const, feedback: '', issuesToFix: [] }),
    },
    finalArticle: {
      value: (x: string, y?: string) => y ?? x,
      default: () => '',
    },
    createdAt: {
      value: (x: number) => x, // Set once, never overwritten
      default: () => Date.now(),
    },
    updatedAt: {
      value: () => Date.now(), // Refreshed on every write
      default: () => Date.now(),
    },
    errorLog: {
      value: (x, y) => [...(x ?? []), ...(y ?? [])],
      default: () => [],
    },
  },
});
Schema clarity prevents bugs and makes workflows self-documenting.
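Most of a schema's behavior lives in its reducers: they decide whether a node's write overwrites, merges, or appends. A dependency-free sketch of that merge step (`applyUpdate` is a hypothetical helper for illustration, not a LangGraph API):

```typescript
type Reducer<T> = (current: T, update: T) => T;

// The two reducer styles used above: overwrite vs append
const lastWriteWins: Reducer<string> = (x, y) => y || x;
const appendAll: Reducer<string[]> = (x, y) => [...(x ?? []), ...(y ?? [])];

interface Channels {
  draft: string;
  errorLog: string[];
}

const reducers = {
  draft: lastWriteWins,
  errorLog: appendAll,
};

// Hypothetical helper showing how a graph folds a node's partial
// update into state, channel by channel
function applyUpdate(state: Channels, update: Partial<Channels>): Channels {
  const next: Channels = { ...state };
  for (const key of Object.keys(update) as (keyof Channels)[]) {
    (next as any)[key] = (reducers[key] as Reducer<any>)(state[key], (update as any)[key]);
  }
  return next;
}

let state: Channels = { draft: 'v1', errorLog: ['boot'] };
state = applyUpdate(state, { draft: 'v2' });           // overwritten
state = applyUpdate(state, { errorLog: ['timeout'] }); // appended
console.log(state.draft, state.errorLog);
```

Choosing append over overwrite for logs and errors is what keeps history intact as updates flow through the graph.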
Checkpointing to Postgres
Save state to Postgres so workflows can pause and resume.
import { PostgresSaver } from '@langchain/langgraph-checkpoint-postgres';

const checkpointer = PostgresSaver.fromConnString(process.env.DATABASE_URL!);
await checkpointer.setup(); // Creates the checkpoint tables on first run

// Compile graph with checkpointing
const appWithCheckpoints = workflow.compile({
  checkpointer,
  interruptBefore: ['reviewer'], // Interrupt before review node for human inspection
});
// Run with checkpointing
async function runWorkflow(topic: string) {
  const config = {
    configurable: {
      thread_id: `thread-${Date.now()}`,
    },
  };

  // First run: executes up to the reviewer node, then pauses
  await appWithCheckpoints.invoke({ userQuery: topic }, config);

  // In production, this is where humans review the draft
  console.log('Draft created, paused for review');

  // Later: write the human feedback into the checkpointed state, then
  // resume from the interrupt by invoking with null input
  await appWithCheckpoints.updateState(config, {
    feedback: 'Good draft, just needs more technical depth',
  });
  const result = await appWithCheckpoints.invoke(null, config);
  return result.finalArticle;
}

// Query checkpoint history (list() yields checkpoints, newest first)
async function getCheckpointHistory(threadId: string) {
  const config = { configurable: { thread_id: threadId } };
  const history = [];
  for await (const checkpoint of checkpointer.list(config)) {
    history.push(checkpoint);
  }
  return history;
}
Checkpointing enables pause/resume workflows and audit trails.
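Conceptually a checkpointer is just keyed storage: state snapshots indexed by thread id and step, with "latest" for resume and "all" for the audit trail. A dependency-free in-memory sketch of that contract (illustrative only — this is not the real PostgresSaver schema or API):

```typescript
interface Checkpoint<S> {
  threadId: string;
  step: number;
  state: S;
  createdAt: number;
}

// In-memory stand-in for a checkpointer, for tests and local dev
class InMemorySaver<S> {
  private byThread = new Map<string, Checkpoint<S>[]>();

  put(threadId: string, state: S): Checkpoint<S> {
    const history = this.byThread.get(threadId) ?? [];
    const cp = { threadId, step: history.length, state, createdAt: Date.now() };
    history.push(cp);
    this.byThread.set(threadId, history);
    return cp;
  }

  // Latest checkpoint — what resume starts from
  latest(threadId: string): Checkpoint<S> | undefined {
    const history = this.byThread.get(threadId);
    return history?.[history.length - 1];
  }

  // Full history, newest first — the audit trail
  list(threadId: string): Checkpoint<S>[] {
    return [...(this.byThread.get(threadId) ?? [])].reverse();
  }
}

const saver = new InMemorySaver<{ draft: string }>();
saver.put('thread-1', { draft: 'v1' });
saver.put('thread-1', { draft: 'v2' });
console.log(saver.latest('thread-1')?.state.draft);
console.log(saver.list('thread-1').length);
```

LangGraph ships a real `MemorySaver` for exactly this purpose in development; swap in the Postgres saver for production.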
Human-in-the-Loop Interrupts
Pause workflows for human decision-making.
const workflowWithHumanLoop = workflow.compile({
  checkpointer,
  interruptBefore: ['reviewer'], // Pause before review
  interruptAfter: ['draft_writer'], // Also checkpoint after draft is written
});

interface HumanReviewRequest {
  threadId: string;
  draft: string;
  requestedBy: string;
  deadline: number;
}

async function requestHumanReview(draft: string, threadId: string): Promise<string> {
  const request: HumanReviewRequest = {
    threadId,
    draft,
    requestedBy: 'article-workflow',
    deadline: Date.now() + 3600000, // 1 hour deadline
  };
  // In production: send to review queue (Slack, email, dashboard)
  console.log(`Review requested for thread ${threadId}`);
  // Wait for human response (with timeout)
  return waitForHumanFeedback(threadId);
}

async function waitForHumanFeedback(threadId: string, timeout: number = 3600000): Promise<string> {
  const startTime = Date.now();
  while (Date.now() - startTime < timeout) {
    // Poll for feedback (in production: prefer webhooks or subscriptions)
    const feedback = await checkFeedbackQueue(threadId);
    if (feedback) {
      return feedback;
    }
    // Wait 10 seconds before polling again
    await new Promise((resolve) => setTimeout(resolve, 10000));
  }
  throw new Error('Human review timeout');
}

async function checkFeedbackQueue(threadId: string): Promise<string | null> {
  // Check database for human feedback
  return null;
}

// Resume with human feedback
async function resumeWithFeedback(threadId: string, feedback: string) {
  const config = {
    configurable: { thread_id: threadId },
  };
  // Write the feedback into the checkpointed state, then resume
  // from the interrupt with null input
  await workflowWithHumanLoop.updateState(config, { feedback });
  return workflowWithHumanLoop.invoke(null, config);
}
Interrupts enable workflows to wait for human decisions without blocking.
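Within a single process you can skip polling entirely: park the waiting workflow on a pending promise and resolve it when the feedback webhook fires. A dependency-free sketch (`FeedbackBroker` is a hypothetical helper, not part of LangGraph):

```typescript
// Bridges "workflow waiting on a human" and "webhook delivering feedback"
// without a polling loop
class FeedbackBroker {
  private pending = new Map<string, (feedback: string) => void>();

  // Called by the paused workflow: resolves when feedback arrives, rejects on timeout
  wait(threadId: string, timeoutMs: number): Promise<string> {
    return new Promise((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(threadId);
        reject(new Error('Human review timeout'));
      }, timeoutMs);
      this.pending.set(threadId, (feedback) => {
        clearTimeout(timer);
        this.pending.delete(threadId);
        resolve(feedback);
      });
    });
  }

  // Called by the webhook / dashboard handler when a human responds
  submit(threadId: string, feedback: string): boolean {
    const resolve = this.pending.get(threadId);
    if (!resolve) return false; // no workflow waiting on this thread
    resolve(feedback);
    return true;
  }
}

const broker = new FeedbackBroker();
const waiting = broker.wait('thread-42', 5000);
broker.submit('thread-42', 'Approved with minor edits');
console.log(await waiting);
```

Across multiple processes you still need the database-backed queue, but the broker pattern removes the 10-second polling latency wherever the workflow and webhook share a process.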
Streaming Agent Thoughts and Tool Calls
Stream node progress, tool calls, and LLM tokens in real time.
async function streamWorkflow(topic: string) {
  const config = {
    configurable: {
      thread_id: `stream-${Date.now()}`,
    },
  };

  // streamEvents requires a schema version; graph nodes surface as chain events
  const stream = app.streamEvents(
    { userQuery: topic },
    { version: 'v2', ...config },
  );

  for await (const event of stream) {
    const { event: eventType, name, data } = event;
    if (eventType === 'on_chain_start') {
      console.log(`Starting node: ${name}`);
    }
    if (eventType === 'on_chain_end') {
      console.log(`Completed node: ${name}`);
    }
    if (eventType === 'on_tool_start') {
      console.log(`Tool call: ${name} with args ${JSON.stringify(data.input)}`);
    }
    if (eventType === 'on_tool_end') {
      console.log(`Tool result: ${String(data.output).substring(0, 100)}...`);
    }
    if (eventType === 'on_chat_model_start') {
      console.log('LLM thinking...');
    }
    if (eventType === 'on_chat_model_stream') {
      // Stream LLM tokens in real-time
      process.stdout.write(data.chunk.content);
    }
  }
}

// For web apps: stream events to client (ws from the 'ws' package, or a DOM WebSocket)
async function streamToWebSocket(topic: string, ws: WebSocket) {
  const config = {
    configurable: {
      thread_id: `ws-${Date.now()}`,
    },
  };
  const stream = app.streamEvents(
    { userQuery: topic },
    { version: 'v2', ...config },
  );
  for await (const event of stream) {
    // Send event to client
    ws.send(
      JSON.stringify({
        type: event.event,
        name: event.name,
        data: event.data,
      }),
    );
  }
}
Streaming provides real-time feedback to users.
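Raw event streams are verbose, and clients usually want a small typed subset. A dependency-free sketch of filtering and mapping an async event stream into client messages (event names follow the streamEvents v2 naming; the mock generator stands in for a live graph):

```typescript
interface RawEvent {
  event: string;
  name: string;
  data: any;
}

interface ClientEvent {
  kind: 'node_start' | 'node_end' | 'token';
  payload: string;
}

// Keep only what the UI renders; drop internal chatter
async function* toClientEvents(stream: AsyncIterable<RawEvent>): AsyncGenerator<ClientEvent> {
  for await (const e of stream) {
    if (e.event === 'on_chain_start') yield { kind: 'node_start', payload: e.name };
    else if (e.event === 'on_chain_end') yield { kind: 'node_end', payload: e.name };
    else if (e.event === 'on_chat_model_stream') yield { kind: 'token', payload: e.data.chunk };
  }
}

// Mock event stream standing in for a live graph's streamEvents(...)
async function* mockEvents(): AsyncGenerator<RawEvent> {
  yield { event: 'on_chain_start', name: 'researcher', data: {} };
  yield { event: 'on_chat_model_stream', name: 'llm', data: { chunk: 'Hel' } };
  yield { event: 'on_chat_model_stream', name: 'llm', data: { chunk: 'lo' } };
  yield { event: 'on_retriever_start', name: 'noise', data: {} }; // filtered out
  yield { event: 'on_chain_end', name: 'researcher', data: {} };
}

const out: ClientEvent[] = [];
for await (const e of toClientEvents(mockEvents())) out.push(e);
console.log(out.map((e) => e.kind).join(','));
```

The same transform works unchanged whether the sink is a WebSocket, Server-Sent Events, or a log.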
Parallel Branches in Graph
Run independent workflows in parallel.
// Extra channels for the parallel branches extend the base state
interface ParallelState extends WorkflowState {
  codeExamples?: string;
  references?: string;
}

const parallelWorkflow = new StateGraph<ParallelState>({
  channels: {
    userQuery: { value: (x: string, y?: string) => y ?? x, default: () => '' },
    researchFindings: { value: (x?: string, y?: string) => y ?? x },
    codeExamples: { value: (x?: string, y?: string) => y ?? x },
    references: { value: (x?: string, y?: string) => y ?? x },
    draft: { value: (x?: string, y?: string) => y ?? x },
  },
});

// Define nodes
parallelWorkflow.addNode('research', async (state: ParallelState) => {
  return { researchFindings: 'Research findings...' };
});

parallelWorkflow.addNode('gather_code_examples', async (state: ParallelState) => {
  return { codeExamples: 'Code examples...' };
});

parallelWorkflow.addNode('find_references', async (state: ParallelState) => {
  return { references: 'References...' };
});

parallelWorkflow.addNode('write_draft', async (state: ParallelState) => {
  // This node runs only after all three parallel branches complete
  const draft = `Based on:
- Research: ${state.researchFindings}
- Code: ${state.codeExamples}
- References: ${state.references}
Draft: ...`;
  return { draft };
});

// Fan out: start all three nodes in parallel
parallelWorkflow.addEdge(START, 'research');
parallelWorkflow.addEdge(START, 'gather_code_examples');
parallelWorkflow.addEdge(START, 'find_references');

// Fan in: write_draft waits for the branches to complete
parallelWorkflow.addEdge('research', 'write_draft');
parallelWorkflow.addEdge('gather_code_examples', 'write_draft');
parallelWorkflow.addEdge('find_references', 'write_draft');
parallelWorkflow.addEdge('write_draft', END);

const parallelApp = parallelWorkflow.compile();
Parallel branches reduce total execution time.
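Under the hood, a fan-out is roughly Promise.all plus a reducer merge — which is why each parallel branch should write to its own channel (or use an append reducer) so concurrent writes cannot clobber each other. A dependency-free sketch of that merge step:

```typescript
interface FanOutState {
  researchFindings?: string;
  codeExamples?: string;
  references?: string;
}

// Each branch writes a disjoint channel, so a simple spread merge is safe
const branches: Array<() => Promise<Partial<FanOutState>>> = [
  async () => ({ researchFindings: 'Research findings...' }),
  async () => ({ codeExamples: 'Code examples...' }),
  async () => ({ references: 'References...' }),
];

// Run branches concurrently, then fold the partial updates into one state
const updates = await Promise.all(branches.map((run) => run()));
const merged = updates.reduce<FanOutState>((state, update) => ({ ...state, ...update }), {});
console.log(merged);
```

If two branches ever wrote the same channel, the merge order would decide the winner — that is the case where an append or explicit conflict-resolving reducer becomes mandatory.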
Error Handling Nodes
Explicit error handling in workflows.
const robustWorkflow = new StateGraph<WorkflowState>({
  channels: {
    userQuery: { value: (x: string, y?: string) => y ?? x, default: () => '' },
    researchFindings: { value: (x?: string, y?: string) => y ?? x },
    draft: { value: (x?: string, y?: string) => y ?? x },
    errors: { value: (x: string[], y: string[]) => [...(x ?? []), ...(y ?? [])], default: () => [] },
  },
});
// Node with error handling
const safeResearch = async (state: WorkflowState): Promise<Partial<WorkflowState>> => {
  try {
    const findings = await performResearch(state.userQuery);
    return { researchFindings: findings };
  } catch (error) {
    return {
      errors: [`Research failed: ${(error as Error).message}. Proceeding with fallback.`],
    };
  }
};

const errorHandler = async (state: WorkflowState): Promise<Partial<WorkflowState>> => {
  console.log('Handling errors:', state.errors);
  // Decide next action based on error severity
  if (state.errors.some((e) => e.includes('Critical'))) {
    throw new Error('Critical error, aborting workflow');
  }
  // Log errors for monitoring
  await logErrors(state.errors);
  return {};
};

robustWorkflow.addNode('research', safeResearch);
robustWorkflow.addNode('handle_errors', errorHandler);
robustWorkflow.addNode('draft_writer', async (state: WorkflowState) => {
  return { draft: 'Draft...' };
});

robustWorkflow.addEdge(START, 'research');
robustWorkflow.addConditionalEdges(
  'research',
  async (state: WorkflowState) => {
    return state.errors.length > 0 ? 'handle_errors' : 'draft_writer';
  },
  {
    handle_errors: 'handle_errors',
    draft_writer: 'draft_writer',
  },
);
robustWorkflow.addEdge('handle_errors', 'draft_writer');
robustWorkflow.addEdge('draft_writer', END);

async function logErrors(errors: string[]): Promise<void> {
  console.error('Workflow errors:', errors);
  // Send to monitoring system
}
Explicit error nodes make error handling first-class.
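Transient failures (rate limits, timeouts) are often better handled by retrying the node than by routing to an error handler. A dependency-free sketch of a generic retry-with-backoff wrapper for node functions (`withRetry` is a hypothetical helper, not a LangGraph API):

```typescript
type Node<S> = (state: S) => Promise<Partial<S>>;

// Wrap any node with retry + exponential backoff; only after the last
// attempt does the failure fall through to the workflow's error channel
function withRetry<S extends { errors: string[] }>(
  node: Node<S>,
  attempts = 3,
  baseDelayMs = 100,
): Node<S> {
  return async (state: S) => {
    for (let attempt = 1; attempt <= attempts; attempt++) {
      try {
        return await node(state);
      } catch (error) {
        if (attempt === attempts) {
          return {
            errors: [`failed after ${attempts} attempts: ${(error as Error).message}`],
          } as Partial<S>;
        }
        // 100ms, 200ms, 400ms, ...
        await new Promise((r) => setTimeout(r, baseDelayMs * 2 ** (attempt - 1)));
      }
    }
    return {};
  };
}

// A node that fails twice, then succeeds
let calls = 0;
const flaky: Node<{ draft?: string; errors: string[] }> = async () => {
  calls++;
  if (calls < 3) throw new Error('rate limited');
  return { draft: 'Draft...' };
};

const result = await withRetry(flaky)({ errors: [] });
console.log(calls, result.draft);
```

Register the wrapped function with `addNode` exactly as you would the bare node; the graph sees the same signature.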
Compiling and Deploying
Compile workflows for production deployment.
// Compile with all features
const productionApp = workflow.compile({
  checkpointer,
  interruptBefore: ['reviewer'],
  interruptAfter: ['draft_writer'],
});
// Deployment: expose as API
import express from 'express';

// Named `server` to avoid clashing with the compiled graph `app`
const server = express();
server.use(express.json());

server.post('/workflow/start', async (req, res) => {
  const { topic } = req.body;
  const threadId = `thread-${Date.now()}`;
  try {
    const result = await productionApp.invoke(
      { userQuery: topic },
      { configurable: { thread_id: threadId } },
    );
    res.json({
      threadId,
      status: 'paused_for_review',
      draft: result.draft,
    });
  } catch (error) {
    res.status(500).json({ error: (error as Error).message });
  }
});

server.post('/workflow/:threadId/resume', async (req, res) => {
  const { threadId } = req.params;
  const { feedback } = req.body;
  const config = { configurable: { thread_id: threadId } };
  try {
    // Inject feedback into the checkpointed state, then resume
    await productionApp.updateState(config, { feedback });
    const result = await productionApp.invoke(null, config);
    res.json({
      status: 'completed',
      finalArticle: result.finalArticle,
    });
  } catch (error) {
    res.status(500).json({ error: (error as Error).message });
  }
});

server.get('/workflow/:threadId/history', async (req, res) => {
  const { threadId } = req.params;
  const config = { configurable: { thread_id: threadId } };
  const history = [];
  for await (const checkpoint of checkpointer.list(config)) {
    history.push(checkpoint);
  }
  res.json(history);
});

server.listen(3000, () => {
  console.log('Workflow API running on port 3000');
});
LangGraph apps deploy as standard Node services.
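On the consuming side, a thin typed wrapper keeps the endpoint contract in one place. A sketch that only builds the requests (the paths mirror the Express routes above; wiring the result into fetch is left to the caller so the shapes stay testable):

```typescript
// Minimal request shape so the sketch has no fetch/DOM dependency
interface HttpRequest {
  path: string;
  method: 'POST';
  headers: Record<string, string>;
  body: string;
}

// Start a new workflow run
function buildStart(topic: string): HttpRequest {
  return {
    path: '/workflow/start',
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ topic }),
  };
}

// Resume a paused run with human feedback
function buildResume(threadId: string, feedback: string): HttpRequest {
  return {
    path: `/workflow/${encodeURIComponent(threadId)}/resume`,
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ feedback }),
  };
}

const start = buildStart('LangGraph in production');
const resume = buildResume('thread-123', 'Add more technical depth');
console.log(start.path, resume.path);
```

A caller would pass these straight to `fetch(baseUrl + req.path, req)`; centralizing the paths means a route rename touches one file.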
LangGraph vs Raw Agent Loop
When to use LangGraph over hand-rolled agents:
// Raw agent loop: you manage everything
class RawAgent {
  async run(task: string): Promise<string> {
    let state = { task, messages: [] as string[], iteration: 0 };
    while (state.iteration < 10) {
      state.iteration++;
      // You implement: LLM call, tool routing, state updates
      // You implement: checkpointing, interrupts, streaming, error handling
      // You implement: graph visualization, debugging
    }
    return '';
  }
}

// LangGraph: framework handles infrastructure
const langgraphApp = workflow.compile({
  checkpointer, // Automatic checkpointing
  interruptBefore: ['reviewer'], // Built-in interrupts
  // Streaming: automatic via streamEvents()
  // Error handling: explicit nodes
  // Visualization: via .getGraph().drawMermaid()
});
// Choose LangGraph if you need:
// - Checkpointing/pause-resume
// - Human-in-the-loop
// - Streaming to users
// - Complex control flow (conditionals, loops)
// - Production observability
// Choose raw agent if:
// - Single simple loop
// - No human interaction
// - No checkpointing needed
// - Minimal state
LangGraph is worth it for production workflows.
Checklist
- State schema: explicit, immutable, single source of truth
- Graph definition: clear nodes and edges, conditional routing
- Checkpointing: save to Postgres for resume capability
- Interrupts: pause for human decision-making
- Streaming: real-time feedback to users
- Parallel: run independent tasks simultaneously
- Error handling: explicit error nodes
- Deployment: expose as HTTP API
Conclusion
LangGraph provides a production-ready framework for stateful agent workflows. Define control flow as graphs, checkpoint to Postgres for resumability, stream progress to users, handle errors explicitly, and deploy as standard services. For complex workflows with human interaction, LangGraph eliminates months of infrastructure work.