MLOps for LLMs — CI/CD Pipelines for Model Training, Evaluation, and Deployment

Introduction

MLOps brings production-grade operational rigor to AI systems. DevOps brought infrastructure-as-code and automated testing to software; MLOps applies the same discipline to models. This guide covers building complete CI/CD pipelines for LLM training, evaluation, and deployment.

LLM CI/CD Pipeline Architecture

A production CI/CD pipeline automates: code commit → eval test → train on new data → evaluate → canary deploy → production.

interface MLOpsStage {
  name: string;
  trigger: 'on-commit' | 'on-schedule' | 'manual';
  steps: PipelineStep[];
  timeout: number; // minutes
  skipOn: string[]; // Paths that skip this stage (e.g., skip training on doc changes)
}

interface PipelineStep {
  name: string;
  command: string;
  timeout: number;
  retryCount: number;
  onFailure: 'fail' | 'warn' | 'continue';
}

class MLOpsPipeline {
  private stages: MLOpsStage[] = [];

  addStage(stage: MLOpsStage): this {
    this.stages.push(stage);
    return this;
  }

  async execute(commit: GitCommit): Promise<PipelineResult> {
    const results: StageResult[] = [];

    for (const stage of this.stages) {
      console.log(`Executing stage: ${stage.name}`);

      // Check if should skip
      if (this.shouldSkip(stage, commit)) {
        console.log(`Skipping stage ${stage.name}`);
        continue;
      }

      const stageResult = await this.executeStage(stage, commit);
      results.push(stageResult);

      // executeStage only reports failure when a step marked 'fail' failed
      if (!stageResult.success) {
        console.log(`Stage ${stage.name} failed, stopping pipeline`);
        break;
      }
    }

    return { stages: results, success: results.every((r) => r.success) };
  }

  private shouldSkip(stage: MLOpsStage, commit: GitCommit): boolean {
    // Skip only when every changed file matches a skip pattern
    // (e.g. documentation-only commits). An empty pattern list never skips.
    if (stage.skipOn.length === 0 || commit.changedFiles.length === 0) {
      return false;
    }
    return commit.changedFiles.every((f) =>
      stage.skipOn.some((pattern) => f.includes(pattern))
    );
  }

  private async executeStage(stage: MLOpsStage, commit: GitCommit): Promise<StageResult> {
    const stepResults: StepResult[] = [];

    for (const step of stage.steps) {
      const result = await this.executeStep(step);
      stepResults.push(result);

      if (!result.success) {
        if (step.onFailure === 'fail') {
          return { name: stage.name, success: false, steps: stepResults };
        }
      }
    }

    return { name: stage.name, success: true, steps: stepResults };
  }

  private async executeStep(step: PipelineStep): Promise<StepResult> {
    for (let attempt = 0; attempt <= step.retryCount; attempt++) {
      try {
        const output = await this.runCommand(step.command, step.timeout);
        return { name: step.name, success: true, output };
      } catch (error) {
        if (attempt === step.retryCount) {
          return { name: step.name, success: false, error: (error as Error).message };
        }
      }
      }
    }
    return { name: step.name, success: false, error: 'Max retries exceeded' };
  }

  private async runCommand(command: string, timeout: number): Promise<string> {
    // Execute shell command with timeout
    return 'success';
  }
}

interface GitCommit {
  hash: string;
  message: string;
  author: string;
  changedFiles: string[];
}

interface StageResult {
  name: string;
  success: boolean;
  steps?: StepResult[];
}

interface StepResult {
  name: string;
  success: boolean;
  output?: string;
  error?: string;
}

interface PipelineResult {
  stages: StageResult[];
  success: boolean;
}
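The `runCommand` stub above is left unimplemented. One way to fill it in, sketched here with Node's built-in `child_process` (the helper signature and minute-based timeout mirror the stub; treat this as an assumption, not the article's actual implementation):

```typescript
import { execFile } from 'node:child_process';

// Run a command with a timeout (in minutes, matching PipelineStep.timeout);
// resolves with trimmed stdout, rejects on non-zero exit or timeout.
function runCommand(command: string, args: string[], timeoutMinutes: number): Promise<string> {
  return new Promise((resolve, reject) => {
    execFile(command, args, { timeout: timeoutMinutes * 60_000 }, (err, stdout) => {
      if (err) reject(err);
      else resolve(stdout.trim());
    });
  });
}

runCommand('echo', ['ok'], 1).then((out) => console.log(out)); // prints "ok"
```

Rejecting on timeout lets `executeStep`'s retry loop above treat a hung command the same as a failed one.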

Automated Evaluation on Every Commit

class AutomatedEvaluation {
  async evaluateOnEveryCommit(commit: GitCommit): Promise<EvalResult> {
    // Step 1: Load current model
    const productionModel = await this.loadProductionModel();

    // Step 2: Check if code changes affect eval
    if (this.changesPromptLogic(commit)) {
      // Step 3: Run eval suite
      const evalResults = await this.runEvalSuite(productionModel);

      // Step 4: Compare with baseline
      const baseline = await this.getBaselineMetrics();
      const regression = this.detectRegression(evalResults, baseline);

      if (regression.detected) {
        console.log(`Eval regression detected: ${regression.metric} declined`);
        return { passed: false, reason: regression.reason, metrics: evalResults };
      }
    }

    return { passed: true, metrics: {} };
  }

  private changesPromptLogic(commit: GitCommit): boolean {
    // Check if commit modifies prompt files, generation logic, etc.
    return commit.changedFiles.some((f) =>
      /prompts|generation|inference/.test(f)
    );
  }

  private async runEvalSuite(model: LLMModel): Promise<Record<string, number>> {
    const evals = {
      accuracy: await this.evalAccuracy(model),
      latency: await this.evalLatency(model),
      toxicity: await this.evalToxicity(model),
      instructionFollowing: await this.evalInstructionFollowing(model)
    };

    return evals;
  }

  private async evalAccuracy(model: LLMModel): Promise<number> {
    // Run evaluation on validation set
    return 0.85;
  }

  private async evalLatency(model: LLMModel): Promise<number> {
    // Measure p95 latency
    return 500; // ms
  }

  private async evalToxicity(model: LLMModel): Promise<number> {
    // Measure % of outputs that are toxic
    return 0.02;
  }

  private async evalInstructionFollowing(model: LLMModel): Promise<number> {
    // Measure % that follow explicit constraints
    return 0.92;
  }

  private async loadProductionModel(): Promise<LLMModel> {
    // Fetch the currently deployed model from the model registry
    return new LLMModel();
  }

  private async getBaselineMetrics(): Promise<Record<string, number>> {
    // Get metrics from last passing commit
    return { accuracy: 0.84, latency: 520, toxicity: 0.03, instructionFollowing: 0.91 };
  }

  private detectRegression(
    current: Record<string, number>,
    baseline: Record<string, number>
  ): { detected: boolean; metric?: string; reason?: string } {
    for (const metric in current) {
      if (!(metric in baseline)) continue;
      // Positive percentChange means the metric rose relative to baseline
      const percentChange = ((current[metric] - baseline[metric]) / baseline[metric]) * 100;

      // Accuracy is higher-is-better: fail on a drop of more than 2%
      if (metric === 'accuracy' && percentChange < -2) {
        return { detected: true, metric, reason: `${metric} declined ${Math.abs(percentChange).toFixed(1)}%` };
      }
      // Latency is lower-is-better: fail on a rise of more than 10%
      if (metric === 'latency' && percentChange > 10) {
        return { detected: true, metric, reason: `${metric} increased ${percentChange.toFixed(1)}%` };
      }
    }
    return { detected: false };
  }
}

interface EvalResult {
  passed: boolean;
  reason?: string;
  metrics: Record<string, number>;
}

// Minimal stand-in for the deployed model used throughout these examples
class LLMModel {
  id = `model-${Date.now()}`;
}
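`getBaselineMetrics` above is a stub that "gets metrics from the last passing commit". A minimal in-memory sketch of the baseline store that implies (class and method names are hypothetical; a real pipeline would persist this in a database or artifact registry):

```typescript
// Record eval metrics per commit; the baseline is always the metrics of
// the most recent commit whose eval suite passed.
interface MetricsRecord {
  commit: string;
  passed: boolean;
  metrics: Record<string, number>;
}

class BaselineStore {
  private history: MetricsRecord[] = [];

  record(commit: string, passed: boolean, metrics: Record<string, number>): void {
    this.history.push({ commit, passed, metrics });
  }

  // Walk backwards to find the latest passing commit's metrics
  baseline(): Record<string, number> | undefined {
    for (let i = this.history.length - 1; i >= 0; i--) {
      if (this.history[i].passed) return this.history[i].metrics;
    }
    return undefined;
  }
}

const store = new BaselineStore();
store.record('abc123', true, { accuracy: 0.84 });
store.record('def456', false, { accuracy: 0.80 });
console.log(store.baseline()); // → { accuracy: 0.84 } — the failed commit is skipped
```

Keying the baseline to the last *passing* commit prevents a bad run from lowering the bar for the next one.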

Training Pipelines Triggered by Data Changes

class TrainingPipeline {
  async triggerOnDataChange(dataVersion: string): Promise<TrainingResult> {
    console.log(`Data changed to version ${dataVersion}, starting training`);

    // Step 1: Load training data
    const trainingData = await this.loadData(dataVersion);

    // Step 2: Validate data quality
    const qualityCheck = await this.validateDataQuality(trainingData);
    if (!qualityCheck.passed) {
      return { success: false, reason: 'Data quality check failed' };
    }

    // Step 3: Train new model
    console.log('Starting model training...');
    const newModel = await this.trainModel(trainingData);

    // Step 4: Evaluate new model
    const evalResults = await this.evaluateModel(newModel);

    // Step 5: Compare with current production model
    const productionModel = await this.loadProductionModel();
    const prodEvalResults = await this.evaluateModel(productionModel);

    const improvement = this.computeImprovement(evalResults, prodEvalResults);
    if (improvement < 0.01) {
      return {
        success: false,
        reason: `New model only improved by ${(improvement * 100).toFixed(2)}%`
      };
    }

    // Step 6: Move to staging
    await this.deployToStaging(newModel);

    return {
      success: true,
      newModelId: newModel.id,
      improvement,
      evalResults
    };
  }

  private async validateDataQuality(data: any): Promise<{ passed: boolean }> {
    // Check for duplicates, missing values, data drift
    return { passed: true };
  }

  private async trainModel(data: any): Promise<LLMModel> {
    // SFT fine-tuning
    return new LLMModel();
  }

  private async evaluateModel(model: LLMModel): Promise<Record<string, number>> {
    return {};
  }

  private async loadProductionModel(): Promise<LLMModel> {
    return new LLMModel();
  }

  private async loadData(version: string): Promise<any> {
    return [];
  }

  private computeImprovement(
    newMetrics: Record<string, number>,
    oldMetrics: Record<string, number>
  ): number {
    // Note: a plain average over mixed-scale metrics is a simplification;
    // production pipelines should normalize or weight per metric.
    const mean = (m: Record<string, number>) => {
      const values = Object.values(m);
      return values.length ? values.reduce((a, b) => a + b, 0) / values.length : 0;
    };
    const avgOld = mean(oldMetrics);
    return avgOld !== 0 ? (mean(newMetrics) - avgOld) / avgOld : 0;
  }

  private async deployToStaging(model: LLMModel): Promise<void> {
    // Deploy to staging environment for integration tests
  }
}

interface TrainingResult {
  success: boolean;
  reason?: string;
  newModelId?: string;
  improvement?: number;
  evalResults?: Record<string, number>;
}
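`validateDataQuality` is stubbed above. A sketch of what such a gate might check, assuming a simple prompt/completion record shape and illustrative tolerances (both the shape and the thresholds are assumptions, not the article's):

```typescript
interface Example {
  prompt: string;
  completion: string;
}

// Reject a dataset with too many exact duplicates or empty fields;
// the 1% / 0.1% tolerances are illustrative, not prescriptive.
function validateDataQuality(data: Example[]): { passed: boolean; reason?: string } {
  if (data.length === 0) return { passed: false, reason: 'empty dataset' };
  const unique = new Set(data.map((d) => `${d.prompt}\u0000${d.completion}`));
  const dupRate = 1 - unique.size / data.length;
  const emptyRate =
    data.filter((d) => !d.prompt.trim() || !d.completion.trim()).length / data.length;
  if (dupRate > 0.01) return { passed: false, reason: `duplicate rate ${(dupRate * 100).toFixed(1)}%` };
  if (emptyRate > 0.001) return { passed: false, reason: `empty-field rate ${(emptyRate * 100).toFixed(2)}%` };
  return { passed: true };
}

// Two identical rows → 50% duplicates, so the gate fails
console.log(validateDataQuality([
  { prompt: 'a', completion: 'b' },
  { prompt: 'a', completion: 'b' },
]).passed); // → false
```

Failing fast here is cheap; a training run on a bad dataset is not.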

Canary Deployment Automation

class CanaryDeploymentAutomation {
  async canaryDeploy(newModel: LLMModel): Promise<DeploymentResult> {
    const config: CanaryConfig = {
      initialTrafficPercent: 5,
      increments: [10, 25, 50, 100],
      metricsThresholds: {
        latencyP95: 1.2, // 20% increase threshold
        errorRate: 1.1, // 10% increase threshold
        accuracy: 0.98 // 2% decrease threshold
      },
      monitoringDurationMinutes: 30
    };

    // Include the initial slice in the ramp so the 5% step is actually routed
    const ramp = [config.initialTrafficPercent, ...config.increments];
    let currentTraffic = 0;

    for (const targetTraffic of ramp) {
      console.log(`Routing ${currentTraffic}% → ${targetTraffic}% to new model`);

      // Set traffic split
      await this.setTrafficSplit(newModel.id, targetTraffic);

      // Monitor for specified duration
      const metricsOK = await this.monitorDeployment(
        config.monitoringDurationMinutes,
        config.metricsThresholds
      );

      if (!metricsOK) {
        console.log('Metrics degradation detected, rolling back');
        await this.rollback(newModel.id);
        return { success: false, reason: 'Metrics degradation' };
      }

      currentTraffic = targetTraffic;
    }

    console.log('Canary deployment successful!');
    return { success: true };
  }

  private async monitorDeployment(
    durationMinutes: number,
    thresholds: Record<string, number>
  ): Promise<boolean> {
    const startTime = Date.now();
    const endTime = startTime + durationMinutes * 60 * 1000;

    while (Date.now() < endTime) {
      const currentMetrics = await this.getCurrentMetrics();
      const baselineMetrics = await this.getBaselineMetrics();

      for (const [metric, threshold] of Object.entries(thresholds)) {
        // Skip metrics missing from either side to avoid NaN ratios
        if (!(metric in currentMetrics) || !(metric in baselineMetrics)) continue;
        const ratio = currentMetrics[metric] / baselineMetrics[metric];

        if (metric.includes('errorRate') || metric.includes('latency')) {
          if (ratio > threshold) {
            return false;
          }
        } else if (metric.includes('accuracy')) {
          if (ratio < threshold) {
            return false;
          }
        }
      }

      // Wait before next check
      await new Promise((r) => setTimeout(r, 60000)); // Check every minute
    }

    return true;
  }

  private async setTrafficSplit(modelId: string, percentTraffic: number): Promise<void> {
    // Configure load balancer/API gateway
  }

  private async rollback(modelId: string): Promise<void> {
    // Revert to previous model version
  }

  private async getCurrentMetrics(): Promise<Record<string, number>> {
    // Query monitoring system
    return {};
  }

  private async getBaselineMetrics(): Promise<Record<string, number>> {
    // Baseline from previous model
    return {};
  }
}

interface CanaryConfig {
  initialTrafficPercent: number;
  increments: number[];
  metricsThresholds: Record<string, number>;
  monitoringDurationMinutes: number;
}

interface DeploymentResult {
  success: boolean;
  reason?: string;
}
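The ratio check inside `monitorDeployment` can also be written as a standalone, direction-aware health function. A sketch under the same threshold convention as `CanaryConfig` above (1.2 = allow a 20% rise, 0.98 = allow a 2% drop; the function and type names are hypothetical):

```typescript
type MetricKind = 'lower-better' | 'higher-better';

// Compare current metrics against a baseline using ratio thresholds;
// returns false as soon as any metric leaves its budget.
function healthy(
  current: Record<string, number>,
  baseline: Record<string, number>,
  thresholds: Record<string, { ratio: number; kind: MetricKind }>
): boolean {
  for (const [metric, { ratio: limit, kind }] of Object.entries(thresholds)) {
    if (!(metric in current) || !(metric in baseline) || baseline[metric] === 0) continue;
    const ratio = current[metric] / baseline[metric];
    if (kind === 'lower-better' && ratio > limit) return false; // e.g. latency rose too much
    if (kind === 'higher-better' && ratio < limit) return false; // e.g. accuracy fell too far
  }
  return true;
}

console.log(healthy(
  { latencyP95: 650, accuracy: 0.85 },
  { latencyP95: 500, accuracy: 0.85 },
  { latencyP95: { ratio: 1.2, kind: 'lower-better' }, accuracy: { ratio: 0.98, kind: 'higher-better' } }
)); // → false (latency rose 30%, over the 20% budget)
```

Making the direction explicit per metric avoids the fragile name-matching (`metric.includes('latency')`) in the monitoring loop.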

Feature Stores for LLM Apps

interface FeatureStore {
  getFeatures(userId: string): Promise<UserFeatures>;
  setFeatures(userId: string, features: UserFeatures): Promise<void>;
}

interface UserFeatures {
  userHistory: string[];
  preferences: Record<string, string>;
  recentQueries: string[];
  feedbackScores: number[];
}

class FeatureStoreClient {
  async enrichPrompt(userId: string, basePrompt: string): Promise<string> {
    const features = await this.getFeatures(userId);

    return `
${basePrompt}

User Context:
- Previous queries: ${features.recentQueries.slice(-3).join(', ')}
- Preferences: ${Object.entries(features.preferences)
      .map(([k, v]) => `${k}=${v}`)
      .join(', ')}
- Satisfaction: ${this.averageSatisfaction(features.feedbackScores)}`;
  }

  private averageSatisfaction(scores: number[]): string {
    // Average the most recent scores; guard against empty histories
    const recent = scores.slice(-5);
    if (recent.length === 0) return 'n/a';
    return (recent.reduce((a, b) => a + b, 0) / recent.length).toFixed(2);
  }

  private async getFeatures(userId: string): Promise<UserFeatures> {
    return {
      userHistory: [],
      preferences: {},
      recentQueries: [],
      feedbackScores: []
    };
  }
}
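Online feature stores usually serve features from a cache with a freshness window, falling back to the offline store on a miss. A minimal TTL cache sketch (the class name is hypothetical; the injected clock exists only to make expiry deterministic):

```typescript
// In-memory TTL cache: entries expire ttlMs after being set.
class FeatureCache<T> {
  private entries = new Map<string, { value: T; expiresAt: number }>();

  constructor(private ttlMs: number, private now: () => number = Date.now) {}

  get(key: string): T | undefined {
    const entry = this.entries.get(key);
    if (!entry || entry.expiresAt <= this.now()) {
      this.entries.delete(key); // evict stale entries lazily
      return undefined;
    }
    return entry.value;
  }

  set(key: string, value: T): void {
    this.entries.set(key, { value, expiresAt: this.now() + this.ttlMs });
  }
}

// Injected clock makes the expiry observable without waiting
let t = 0;
const cache = new FeatureCache<string[]>(1000, () => t);
cache.set('user-1', ['pricing']);
console.log(cache.get('user-1')); // fresh → ['pricing']
t = 2000;
console.log(cache.get('user-1')); // expired → undefined
```

A miss would trigger a read from the offline store, then a `set` to refresh the cache.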

Experiment Tracking

interface ExperimentRun {
  id: string;
  name: string;
  model: string;
  hyperparameters: Record<string, any>;
  metrics: Record<string, number>;
  artifacts: string[];
  tags: string[];
  status: 'running' | 'completed' | 'failed';
}

class ExperimentTracker {
  private runs: Map<string, ExperimentRun> = new Map();

  startRun(name: string, hyperparams: Record<string, any>): string {
    const runId = `run-${Date.now()}`;
    this.runs.set(runId, {
      id: runId,
      name,
      model: '',
      hyperparameters: hyperparams,
      metrics: {},
      artifacts: [],
      tags: [],
      status: 'running'
    });
    return runId;
  }

  logMetric(runId: string, metric: string, value: number): void {
    const run = this.runs.get(runId);
    if (run) {
      run.metrics[metric] = value;
    }
  }

  completeRun(runId: string): void {
    const run = this.runs.get(runId);
    if (run) {
      run.status = 'completed';
    }
  }

  async compareRuns(runId1: string, runId2: string): Promise<ComparisonResult> {
    const run1 = this.runs.get(runId1);
    const run2 = this.runs.get(runId2);

    return {
      run1Name: run1?.name,
      run2Name: run2?.name,
      winner: this.determineWinner(run1, run2),
      metricDifferences: this.compareMetrics(run1, run2)
    };
  }

  private determineWinner(
    run1: ExperimentRun | undefined,
    run2: ExperimentRun | undefined
  ): string {
    if (!run1 || !run2) return 'unknown';
    const mean = (m: Record<string, number>) => {
      const values = Object.values(m);
      return values.length ? values.reduce((a, b) => a + b, 0) / values.length : 0;
    };
    // Note: a plain average over mixed-scale metrics is a simplification
    return mean(run1.metrics) > mean(run2.metrics) ? run1.name : run2.name;
  }

  private compareMetrics(
    run1: ExperimentRun | undefined,
    run2: ExperimentRun | undefined
  ): Record<string, { run1: number; run2: number; change: number }> {
    return {};
  }
}

interface ComparisonResult {
  run1Name?: string;
  run2Name?: string;
  winner: string;
  metricDifferences: Record<string, { run1: number; run2: number; change: number }>;
}
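`compareMetrics` above returns an empty object. A sketch of the per-metric diff it implies, with `change` expressed relative to the first run (a standalone function here rather than the class method):

```typescript
// Per-metric comparison between two runs' metric maps.
function compareMetrics(
  run1: Record<string, number>,
  run2: Record<string, number>
): Record<string, { run1: number; run2: number; change: number }> {
  const out: Record<string, { run1: number; run2: number; change: number }> = {};
  for (const metric of Object.keys(run1)) {
    if (!(metric in run2)) continue; // only compare shared metrics
    const a = run1[metric];
    const b = run2[metric];
    out[metric] = { run1: a, run2: b, change: a !== 0 ? (b - a) / a : 0 };
  }
  return out;
}

// run2's accuracy is ~4.8% higher than run1's
console.log(compareMetrics({ accuracy: 0.84 }, { accuracy: 0.88 }));
```

Reporting relative change per metric is more informative than the single averaged "winner" above, since it shows which metrics traded off against each other.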

Observability Stack

interface ObservabilityConfig {
  traces: { enabled: boolean; samplingRate: number };
  metrics: { enabled: boolean; interval: number };
  logs: { enabled: boolean; level: string };
  alerts: AlertRule[];
}

interface AlertRule {
  metric: string;
  threshold: number;
  duration: number; // seconds
  action: 'email' | 'slack' | 'pagerduty';
}

class ObservabilityStack {
  private traces: any[] = [];
  private metrics: Map<string, number[]> = new Map();

  async recordTrace(span: Span): Promise<void> {
    // Store in trace backend (Jaeger, DataDog, etc.)
    this.traces.push(span);
  }

  async recordMetric(name: string, value: number, tags: Record<string, string> = {}): Promise<void> {
    if (!this.metrics.has(name)) {
      this.metrics.set(name, []);
    }
    this.metrics.get(name)!.push(value);
  }

  async checkAlerts(config: ObservabilityConfig): Promise<void> {
    for (const rule of config.alerts) {
      const values = this.metrics.get(rule.metric) || [];
      const recentValues = values.slice(-10); // Last 10 values

      // `every` on an empty array is true; require data before alerting
      if (recentValues.length > 0 && recentValues.every((v) => v > rule.threshold)) {
        console.log(`Alert triggered for ${rule.metric}`);
        await this.sendAlert(rule);
      }
    }
  }

  private async sendAlert(rule: AlertRule): Promise<void> {
    // Send notification via specified channel
  }
}

interface Span {
  traceId: string;
  spanId: string;
  operation: string;
  duration: number;
  tags: Record<string, string>;
}
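The thresholds above reference `latencyP95`, which has to be computed from raw samples. A nearest-rank percentile sketch (one of several percentile conventions; it assumes at least one sample):

```typescript
// Nearest-rank percentile: sort ascending, take the ceil(p% * n)-th sample.
function percentile(values: number[], p: number): number {
  const sorted = [...values].sort((a, b) => a - b); // numeric sort, not lexicographic
  const rank = Math.ceil((p / 100) * sorted.length) - 1;
  return sorted[Math.max(0, rank)];
}

const latenciesMs = [120, 130, 150, 900, 140, 135, 125, 160, 145, 155];
console.log(percentile(latenciesMs, 95)); // → 900
```

Note the explicit comparator: JavaScript's default `sort` is lexicographic and would order `900` before `1200`.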

Checklist

  • Build CI/CD pipeline triggered on code commits
  • Run automated evals on every prompt/generation change
  • Trigger training pipelines when data is updated
  • Implement staged environments (dev → staging → production)
  • Set up canary deployments with automated rollback triggers
  • Configure feature stores for user context enrichment
  • Track all experiments with versioning and comparison
  • Implement comprehensive observability (traces, metrics, logs, alerts)
  • Set up model monitoring with drift detection
  • Create incident response playbooks for model degradation

Conclusion

MLOps infrastructure transforms model deployment from manual, error-prone operations into automated, observable, and recoverable processes. CI/CD pipelines catch regressions early. Automated evaluation validates changes. Canary deployments minimize user impact. Feature stores enable personalization. Experiment tracking turns training runs into comparable, reproducible learning. Observability enables rapid incident response. Together, these practices enable high-velocity, high-confidence model deployment.