LangGraph in Production — Stateful AI Agents With Checkpointing and Human-in-the-Loop

Introduction

LangGraph is LangChain's framework for building stateful AI agents as directed graphs. Unlike simple prompt chains, LangGraph manages complex workflows with conditional routing, state persistence, and human interruption points. This post covers graph design, checkpointing to a database, resuming interrupted workflows, and production deployment.

Understanding LangGraph: State Machines for AI

LangGraph models agent workflows as state machines where:

  • Nodes are functions that process state
  • Edges define transitions (can be conditional)
  • State is a schema defining all data in the workflow

import {
  Annotation,
  StateGraph,
  START,
  END,
} from '@langchain/langgraph';
import Anthropic from '@anthropic-ai/sdk';

// Define the state schema
const AgentState = Annotation.Root({
  messages: Annotation<{ role: 'user' | 'assistant'; content: string }[]>,
  research: Annotation<string>,
  analysis: Annotation<string>,
  final_response: Annotation<string>,
});

const client = new Anthropic({
  apiKey: process.env.ANTHROPIC_API_KEY,
});

// Node 1: Research
async function researchNode(state: typeof AgentState.State) {
  const messages = state.messages;

  const response = await client.messages.create({
    model: 'claude-opus-4-1',
    max_tokens: 2048,
    system: 'You are a research analyst. Gather information on the topic.',
    messages: messages.map((m) => ({
      role: m.role,
      content: m.content,
    })) as Parameters<typeof client.messages.create>[0]['messages'],
  });

  const research =
    response.content[0].type === 'text' ? response.content[0].text : '';

  return {
    ...state,
    research,
    messages: [
      ...state.messages,
      { role: 'assistant', content: `Research complete: ${research}` },
    ],
  };
}

// Node 2: Analysis
async function analysisNode(state: typeof AgentState.State) {
  const response = await client.messages.create({
    model: 'claude-opus-4-1',
    max_tokens: 1024,
    system: 'You are an analyst. Provide insights based on the research.',
    messages: [
      {
        role: 'user',
        content: `Analyse this research: ${state.research}`,
      },
    ],
  });

  const analysis =
    response.content[0].type === 'text' ? response.content[0].text : '';

  return {
    ...state,
    analysis,
  };
}

// Node 3: Response generation
async function responseNode(state: typeof AgentState.State) {
  const response = await client.messages.create({
    model: 'claude-opus-4-1',
    max_tokens: 1024,
    system: 'You are a communicator. Write a clear, concise response.',
    messages: [
      {
        role: 'user',
        content: `Research: ${state.research}\nAnalysis: ${state.analysis}\nProvide final response.`,
      },
    ],
  });

  const finalResponse =
    response.content[0].type === 'text' ? response.content[0].text : '';

  return {
    ...state,
    final_response: finalResponse,
  };
}

// Build the graph
const graph = new StateGraph(AgentState)
  .addNode('research', researchNode)
  .addNode('analysis', analysisNode)
  .addNode('response', responseNode)
  .addEdge(START, 'research')
  .addEdge('research', 'analysis')
  .addEdge('analysis', 'response')
  .addEdge('response', END);

const agent = graph.compile();

Conditional Routing

Route nodes based on state:

const ConditionalStateGraph = new StateGraph(AgentState)
  .addNode('research', researchNode)
  .addNode('analysis', analysisNode)
  .addNode('fact_check', factCheckNode)
  .addNode('response', responseNode)
  .addEdge(START, 'research')
  .addConditionalEdges(
    'research',
    // Router function: examines state and returns next node
    (state: typeof AgentState.State) => {
      // If research is <100 chars, fact-check first
      if (state.research.length < 100) {
        return 'fact_check';
      }
      // Otherwise, proceed to analysis
      return 'analysis';
    },
    {
      fact_check: 'fact_check',
      analysis: 'analysis',
    }
  )
  .addEdge('fact_check', 'analysis')
  .addEdge('analysis', 'response')
  .addEdge('response', END);

const conditionalAgent = ConditionalStateGraph.compile();

async function factCheckNode(state: typeof AgentState.State) {
  const response = await client.messages.create({
    model: 'claude-opus-4-1',
    max_tokens: 512,
    system: 'Verify the accuracy of this research.',
    messages: [
      {
        role: 'user',
        content: `Fact-check this: ${state.research}`,
      },
    ],
  });

  return {
    ...state,
    research: (response.content[0].type === 'text' ? response.content[0].text : '') + '\n[Fact-checked]',
  };
}

Checkpointing to PostgreSQL

Persist state so workflows can be resumed:

import { Client } from 'pg';

interface CheckpointData {
  thread_id: string;
  checkpoint_id: string;
  node_id: string;
  state: string;
  timestamp: Date;
  created_at: Date;
}

class PostgresCheckpointer {
  private db: Client;

  constructor(connectionString: string) {
    this.db = new Client({ connectionString });
  }

  async connect(): Promise<void> {
    await this.db.connect();

    // Create table if not exists
    await this.db.query(`
      CREATE TABLE IF NOT EXISTS graph_checkpoints (
        thread_id TEXT NOT NULL,
        checkpoint_id TEXT NOT NULL,
        node_id TEXT NOT NULL,
        state JSONB NOT NULL,
        timestamp TIMESTAMPTZ NOT NULL,
        created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
        PRIMARY KEY (thread_id, checkpoint_id)
      );

      CREATE INDEX IF NOT EXISTS idx_checkpoints_thread
        ON graph_checkpoints(thread_id, timestamp DESC);
    `);
  }

  async saveCheckpoint(
    threadId: string,
    checkpointId: string,
    nodeId: string,
    state: typeof AgentState.State
  ): Promise<void> {
    await this.db.query(
      `INSERT INTO graph_checkpoints
        (thread_id, checkpoint_id, node_id, state, timestamp)
       VALUES ($1, $2, $3, $4, $5)
       ON CONFLICT (thread_id, checkpoint_id)
       DO UPDATE SET state = $4, timestamp = $5`,
      [threadId, checkpointId, nodeId, JSON.stringify(state), new Date()]
    );
  }

  async loadCheckpoint(
    threadId: string,
    checkpointId?: string
  ): Promise<CheckpointData | null> {
    const query = checkpointId
      ? `SELECT * FROM graph_checkpoints
         WHERE thread_id = $1 AND checkpoint_id = $2`
      : `SELECT * FROM graph_checkpoints
         WHERE thread_id = $1
         ORDER BY timestamp DESC LIMIT 1`;

    const params = checkpointId ? [threadId, checkpointId] : [threadId];
    const result = await this.db.query(query, params);

    if (result.rows.length === 0) {
      return null;
    }

    const row = result.rows[0];
    return {
      thread_id: row.thread_id,
      checkpoint_id: row.checkpoint_id,
      node_id: row.node_id,
      state: row.state,
      timestamp: row.timestamp,
      created_at: row.created_at,
    };
  }

  async disconnect(): Promise<void> {
    await this.db.end();
  }
}

const checkpointer = new PostgresCheckpointer(
  process.env.DATABASE_URL || 'postgres://localhost/langgraph'
);
await checkpointer.connect();

// Wrap agent with checkpointing
async function runWithCheckpoints(
  threadId: string,
  input: { messages: { role: 'user' | 'assistant'; content: string }[] }
) {
  let state = input;

  // Load previous checkpoint if exists
  const checkpoint = await checkpointer.loadCheckpoint(threadId);
  if (checkpoint) {
    // node-postgres returns JSONB columns already parsed into objects
    state = checkpoint.state as unknown as typeof state;
    console.log(`Resuming from checkpoint at node: ${checkpoint.node_id}`);
  }

  // Create a checkpoint before running
  const checkpointId = `cp-${Date.now()}`;

  // Run the agent (LangGraph reads thread info from the configurable map)
  const stream = await agent.stream(state, {
    configurable: { thread_id: threadId, checkpoint_id: checkpointId },
  });

  for await (const event of stream) {
    const [nodeId, nodeState] = Object.entries(event)[0] as [
      string,
      typeof AgentState.State
    ];

    if (nodeState) {
      await checkpointer.saveCheckpoint(threadId, checkpointId, nodeId, nodeState);
      console.log(`Checkpoint saved at node: ${nodeId}`);
    }
  }
}
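Note that runWithCheckpoints reloads the last saved state but still streams the graph from START, replaying already-completed nodes. A fuller resume would skip them. Here is a hypothetical helper (the `NODE_ORDER` array and `nextNode` function are illustrative, not part of LangGraph) mapping the last checkpointed node to the one that should run next in the linear graph from earlier:

```typescript
// Linear node order of the first graph in this post
const NODE_ORDER = ['research', 'analysis', 'response'] as const;

// Given the node recorded in the last checkpoint, return the node that
// should run next, or null when the workflow has already finished.
function nextNode(lastCompleted: string | null): string | null {
  if (lastCompleted === null) {
    return NODE_ORDER[0]; // no checkpoint yet: start from the beginning
  }
  const idx = NODE_ORDER.indexOf(
    lastCompleted as (typeof NODE_ORDER)[number]
  );
  if (idx === -1 || idx === NODE_ORDER.length - 1) {
    return null; // unknown node or final node: nothing left to run
  }
  return NODE_ORDER[idx + 1];
}
```

With this, the resume path could start execution at `nextNode(checkpoint.node_id)` instead of replaying the whole graph; LangGraph's built-in checkpointers handle this bookkeeping for you.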

Human-in-the-Loop Interrupts

Pause workflows for human review:

import { Annotation, StateGraph, START, END } from '@langchain/langgraph';

const InterruptibleState = Annotation.Root({
  messages: Annotation<{ role: 'user' | 'assistant'; content: string }[]>,
  draft_response: Annotation<string>,
  approved: Annotation<boolean | null>,
  final_response: Annotation<string>,
});

async function generateDraftNode(state: typeof InterruptibleState.State) {
  const response = await client.messages.create({
    model: 'claude-opus-4-1',
    max_tokens: 2048,
    messages: state.messages as Parameters<typeof client.messages.create>[0]['messages'],
  });

  return {
    ...state,
    draft_response:
      response.content[0].type === 'text' ? response.content[0].text : '',
  };
}

function shouldApproveNode(state: typeof InterruptibleState.State) {
  // If approved is null, interrupt workflow
  if (state.approved === null) {
    return 'interrupt';
  }

  // If human approved, finalize
  if (state.approved === true) {
    return 'finalize';
  }

  // If human rejected, regenerate
  return 'regenerate';
}

async function finalizeNode(state: typeof InterruptibleState.State) {
  return {
    ...state,
    final_response: state.draft_response,
  };
}

async function regenerateNode(state: typeof InterruptibleState.State) {
  const response = await client.messages.create({
    model: 'claude-opus-4-1',
    max_tokens: 2048,
    messages: [
      ...state.messages,
      {
        role: 'user',
        content: 'Human rejected the previous response. Try a different approach.',
      },
    ] as Parameters<typeof client.messages.create>[0]['messages'],
  });

  return {
    ...state,
    draft_response:
      response.content[0].type === 'text' ? response.content[0].text : '',
    approved: null, // Reset for re-review
  };
}

const interruptibleGraph = new StateGraph(InterruptibleState)
  .addNode('draft', generateDraftNode)
  .addNode('finalize', finalizeNode)
  .addNode('regenerate', regenerateNode)
  .addEdge(START, 'draft')
  .addConditionalEdges('draft', shouldApproveNode, {
    interrupt: END,
    finalize: 'finalize',
    regenerate: 'regenerate',
  })
  .addConditionalEdges('regenerate', shouldApproveNode, {
    interrupt: END,
    finalize: 'finalize',
    regenerate: 'regenerate',
  })
  .addEdge('finalize', END);

// The router returns END while approval is pending, so the compiled graph
// pauses on its own. (LangGraph's built-in interruptBefore option is an
// alternative, but it requires a checkpointer to resume.)
const interruptibleAgent = interruptibleGraph.compile();

// Usage with human approval
async function runWithApproval(threadId: string, input: typeof InterruptibleState.State) {
  const stream = await interruptibleAgent.stream(input, {
    configurable: { thread_id: threadId },
  });

  for await (const event of stream) {
    const [nodeId, nodeState] = Object.entries(event)[0] as [
      string,
      typeof InterruptibleState.State
    ];

    if (nodeId === 'draft') {
      console.log('Draft response:');
      console.log(nodeState.draft_response);
      console.log('\nWaiting for human approval...');

      // In a real app, persist the human's decision and re-run the graph
      // with `approved` set in the input state. Mutating the streamed
      // snapshot, as below, is demo-only and does not feed back into the run.
      nodeState.approved = true;
    }
  }
}

Streaming State Updates

Stream intermediate results:

async function streamingAgent(
  input: typeof AgentState.State,
  onUpdate: (node: string, state: typeof AgentState.State) => void
) {
  const stream = await agent.stream(input);

  for await (const event of stream) {
    const [nodeId, nodeState] = Object.entries(event)[0] as [
      string,
      typeof AgentState.State
    ];

    if (nodeState) {
      onUpdate(nodeId, nodeState);
    }
  }
}

// Stream to client
async function handleAgentRequest(req: any, res: any) {
  const { messages } = req.body;

  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');

  await streamingAgent(
    { messages, research: '', analysis: '', final_response: '' },
    (nodeId, state) => {
      res.write(
        `event: state-update\ndata: ${JSON.stringify({ nodeId, state })}\n\n`
      );
    }
  );

  res.end();
}
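On the client side, those `state-update` frames have to be split back out of the SSE byte stream. In a browser, `EventSource` does this for you; for non-browser consumers, a minimal hand-rolled parser for the exact frame shape the handler above emits could look like this (the `parseSseFrames` helper is illustrative, and it assumes one `data:` line per frame, as written above):

```typescript
// Parse the `event: ...\ndata: ...\n\n` frames written by handleAgentRequest.
function parseSseFrames(raw: string): { event: string; data: unknown }[] {
  const frames: { event: string; data: unknown }[] = [];
  for (const chunk of raw.split('\n\n')) {
    if (!chunk.trim()) continue; // skip trailing empty segment
    let event = 'message'; // SSE default event name
    let data = '';
    for (const line of chunk.split('\n')) {
      if (line.startsWith('event: ')) event = line.slice(7);
      else if (line.startsWith('data: ')) data = line.slice(6);
    }
    if (data) frames.push({ event, data: JSON.parse(data) });
  }
  return frames;
}
```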

Production Deployment

Deploy LangGraph as an API service:

import express, { Express } from 'express';
import { v4 as uuidv4 } from 'uuid';

const app: Express = express();
app.use(express.json());

interface ThreadMetadata {
  threadId: string;
  createdAt: Date;
  lastUpdated: Date;
  status: 'running' | 'paused' | 'completed';
}

const threads = new Map<string, ThreadMetadata>();

// Start a new workflow
app.post('/workflows', async (req, res) => {
  const { input } = req.body;
  const threadId = uuidv4();

  threads.set(threadId, {
    threadId,
    createdAt: new Date(),
    lastUpdated: new Date(),
    status: 'running',
  });

  res.json({ threadId });

  // Run the agent asynchronously; catch failures so they don't crash the process
  setImmediate(() => {
    runWithCheckpoints(threadId, input).catch((err) => {
      console.error(`Workflow ${threadId} failed:`, err);
    });
  });
});

// Get workflow status
app.get('/workflows/:threadId', async (req, res) => {
  const metadata = threads.get(req.params.threadId);

  if (!metadata) {
    return res.status(404).json({ error: 'Thread not found' });
  }

  const checkpoint = await checkpointer.loadCheckpoint(req.params.threadId);

  res.json({
    threadId: metadata.threadId,
    status: metadata.status,
    lastCheckpoint: checkpoint
      ? {
          nodeId: checkpoint.node_id,
          timestamp: checkpoint.timestamp,
        }
      : null,
  });
});

// Resume workflow after human approval
app.post('/workflows/:threadId/approve', async (req, res) => {
  const { approved } = req.body;
  const threadId = req.params.threadId;

  // Update state in Redis or DB to signal approval
  // Re-run agent from last checkpoint
  res.json({ threadId, approved });
});

app.listen(3000, () => {
  console.log('LangGraph API running on port 3000');
});

Checklist

  • Design state schema and graph structure
  • Build nodes and conditional routing logic
  • Implement checkpointing to PostgreSQL
  • Add human-in-the-loop interrupt points
  • Stream state updates to clients
  • Deploy as an API service
  • Add error recovery and retries
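The last checklist item isn't covered above. One lightweight approach is to wrap each node in a retry helper before registering it on the graph. A sketch (the `withRetry` helper and its parameters are illustrative, not a LangGraph API), written against a generic async function so it fits any node signature:

```typescript
// Wrap an async node function with retries and exponential backoff.
// attempts = total tries; the delay doubles after each failure.
function withRetry<S>(
  node: (state: S) => Promise<Partial<S>>,
  attempts = 3,
  baseDelayMs = 500
): (state: S) => Promise<Partial<S>> {
  return async (state: S) => {
    let lastError: unknown;
    for (let i = 0; i < attempts; i++) {
      try {
        return await node(state);
      } catch (err) {
        lastError = err;
        // Back off before the next attempt: 500ms, 1000ms, ...
        if (i < attempts - 1) {
          await new Promise((r) => setTimeout(r, baseDelayMs * 2 ** i));
        }
      }
    }
    throw lastError;
  };
}
```

Registered as `.addNode('research', withRetry(researchNode))`, this retries transient API failures without any change to the node itself.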

Conclusion

LangGraph transforms AI workflows from simple chains into sophisticated state machines. Checkpointing enables long-running workflows that survive interruptions. Human-in-the-loop patterns make agents safer by keeping humans in control. Start with a simple graph, add checkpointing, then introduce conditional routing and interrupts. As your workflows grow more complex, LangGraph's primitives scale with you.