AI Data Ingestion Pipelines — Processing Documents at Scale for RAG

Introduction

RAG is only as good as its ingestion pipeline. A poor pipeline introduces duplicates, loses metadata, produces inconsistent embeddings, or fails silently. Building a production ingestion system means orchestrating extraction, cleaning, chunking, embedding, and error recovery, all at scale and cost-effectively.

Pipeline Stages: Extract → Clean → Chunk → Deduplicate → Embed → Store

A robust pipeline follows predictable stages:

interface PipelineStage {
  name: string;
  process: (input: any) => Promise<any>;
  retryPolicy: { maxRetries: number; backoffMs: number };
  errorHandler: (error: Error) => Promise<void>;
}

interface Document {
  id: string;
  content: string;
  metadata: Record<string, any>;
  originalFormat: string;
  extractedAt: number;
}

interface DocumentChunk {
  docId: string;
  chunkIndex: number;
  content: string;
  metadata: Record<string, any>;
}

async function ingestDocument(sourceUri: string): Promise<void> {
  try {
    // Stage 1: Extract
    const doc = await extractDocument(sourceUri);

    // Stage 2: Clean
    const cleaned = cleanText(doc.content);
    doc.content = cleaned;

    // Stage 3: Chunk
    const chunks = chunkDocument(doc);

    // Stage 4: Deduplicate
    const uniqueChunks = await deduplicateChunks(chunks);

    // Stage 5: Embed
    const embedded = await embedChunks(uniqueChunks);

    // Stage 6: Store
    await storeChunks(embedded);
  } catch (error) {
    await handlePipelineError(sourceUri, error);
  }
}
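
The chunkDocument helper used above is left undefined; here is a minimal sketch using fixed-size character windows with overlap (sizes are illustrative assumptions, and the chunk type is repeated so the snippet stands alone):

```typescript
interface DocumentChunk {
  docId: string;
  chunkIndex: number;
  content: string;
  metadata: Record<string, any>;
}

// Fixed-size character windows with overlap, so text that spans a chunk
// boundary appears intact in at least one chunk. Sizes are illustrative.
function chunkDocument(
  doc: { id: string; content: string; metadata: Record<string, any> },
  chunkSize = 1000,
  overlap = 200
): DocumentChunk[] {
  const chunks: DocumentChunk[] = [];
  const step = chunkSize - overlap;
  for (let start = 0, i = 0; start < doc.content.length; start += step, i++) {
    chunks.push({
      docId: doc.id,
      chunkIndex: i,
      content: doc.content.slice(start, start + chunkSize),
      // Record the character offset for traceability back to the source
      metadata: { ...doc.metadata, chunkStart: start },
    });
  }
  return chunks;
}
```

Production pipelines usually chunk on token counts and semantic boundaries (headings, paragraphs) rather than raw characters, but the windowing logic is the same.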

Parallel Processing With Worker Queues

Process documents in parallel using a job queue (BullMQ, Celery, AWS SQS).

import Queue from 'bull';

const ingestionQueue = new Queue('document-ingestion', {
  redis: { host: 'localhost', port: 6379 },
});

// Enqueue a document for processing
async function enqueueDocument(sourceUri: string): Promise<void> {
  await ingestionQueue.add(
    { sourceUri },
    {
      priority: 10,
      attempts: 3,
      backoff: { type: 'exponential', delay: 2000 },
      removeOnComplete: true,
    }
  );
}

// Workers process documents in parallel
ingestionQueue.process(
  10, // 10 parallel workers
  async (job) => {
    const { sourceUri } = job.data;
    console.log(`Processing: ${sourceUri}`);

    try {
      await ingestDocument(sourceUri);
      job.progress(100);
    } catch (error) {
      job.log(`Failed: ${error.message}`);
      throw error; // Trigger retry
    }
  }
);

// Monitor queue health
ingestionQueue.on('completed', (job) => {
  console.log(`Completed: ${job.data.sourceUri}`);
});

ingestionQueue.on('failed', (job, error) => {
  console.error(`Failed permanently: ${job.data.sourceUri}`, error);
});

This parallelizes document processing, dramatically reducing end-to-end ingestion time.

Document Format Support

Different formats require different extraction logic.

type DocumentFormat = 'pdf' | 'docx' | 'html' | 'csv' | 'txt';

async function extractDocument(sourceUri: string): Promise<Document> {
  const format = detectFormat(sourceUri);

  let content: string;
  const metadata: Record<string, any> = {};

  switch (format) {
    case 'pdf': {
      const pdfData = await extractPDF(sourceUri);
      content = pdfData.text;
      metadata.pageCount = pdfData.pageCount;
      break;
    }

    case 'docx': {
      const docxData = await extractDOCX(sourceUri);
      content = docxData.text;
      metadata.author = docxData.author;
      break;
    }

    case 'html':
      content = await extractHTML(sourceUri); // strip tags, normalize
      break;

    case 'csv': {
      const rows = await parseCSV(sourceUri);
      content = rows.map(row => Object.values(row).join(' ')).join('\n');
      break;
    }

    case 'txt':
      content = await readFile(sourceUri);
      break;

    default:
      throw new Error(`Unsupported format: ${format}`);
  }

  return {
    id: generateId(),
    content,
    metadata,
    originalFormat: format,
    extractedAt: Date.now(),
  };
}

Use libraries such as pdf-parse (PDF), docx (DOCX), and cheerio (HTML) to handle format-specific extraction.
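
The detectFormat helper referenced throughout is also undefined; a minimal extension-based sketch (it assumes the URI ends in a recognizable file extension, and passes unknown extensions through so the switch's default branch can reject them):

```typescript
import { extname } from "node:path";

type DocumentFormat = 'pdf' | 'docx' | 'html' | 'csv' | 'txt';

// Map file extensions to formats. The URL constructor strips query
// strings and fragments; relative paths resolve against a dummy base.
function detectFormat(sourceUri: string): DocumentFormat {
  const ext = extname(new URL(sourceUri, "file:///").pathname)
    .slice(1)
    .toLowerCase();
  const map: Record<string, DocumentFormat> = {
    pdf: "pdf",
    docx: "docx",
    html: "html",
    htm: "html",
    csv: "csv",
    txt: "txt",
  };
  // Unrecognized extensions pass through unchanged so the caller's
  // default branch can reject them with a clear error.
  return (map[ext] ?? ext) as DocumentFormat;
}
```

Content sniffing (magic bytes, Content-Type headers) is more robust than extensions when sources are untrusted, but this covers the common case.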

Incremental Updates: Only Re-Process Changed Docs

Don't re-embed documents that haven't changed.

interface DocumentVersion {
  uri: string;
  hash: string; // sha256(content)
  embeddedAt: number;
}

async function ingestDocumentIncremental(
  sourceUri: string,
  versionStore: Map<string, DocumentVersion>
): Promise<{ processed: boolean; reason?: string }> {
  const content = await fetchDocumentContent(sourceUri);
  const currentHash = sha256(content);

  const stored = versionStore.get(sourceUri);

  if (stored && stored.hash === currentHash) {
    return {
      processed: false,
      reason: 'Document unchanged; skipping embedding',
    };
  }

  // Document changed or new: process it
  const doc: Document = {
    id: generateId(),
    content,
    metadata: { sourceUri },
    originalFormat: detectFormat(sourceUri),
    extractedAt: Date.now(),
  };

  // Remove previously stored chunks for this source first, so stale
  // versions don't linger in the index alongside the new chunks
  await deleteStoredChunks(sourceUri);

  const chunks = chunkDocument(doc);
  const embedded = await embedChunks(chunks);
  await storeChunks(embedded);

  // Update version store
  versionStore.set(sourceUri, {
    uri: sourceUri,
    hash: currentHash,
    embeddedAt: Date.now(),
  });

  return { processed: true };
}

This reduces embedding costs dramatically when documents rarely change.
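
The sha256 helper this relies on is a thin wrapper over Node's built-in crypto module:

```typescript
import { createHash } from "node:crypto";

// Hex-encoded SHA-256 digest of the content; deterministic across runs,
// so it doubles as a change-detection fingerprint for incremental updates.
function sha256(content: string): string {
  return createHash("sha256").update(content, "utf8").digest("hex");
}
```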

Deduplication

Prevent duplicate documents from inflating the knowledge base.

async function deduplicateChunks(chunks: DocumentChunk[]): Promise<DocumentChunk[]> {
  const seen = new Set<string>();
  const unique: DocumentChunk[] = [];

  for (const chunk of chunks) {
    // Hash chunk content; if seen before, skip
    const hash = sha256(chunk.content);

    if (!seen.has(hash)) {
      seen.add(hash);
      unique.push(chunk);
    }
  }

  return unique;
}

// For semantic deduplication (similar, not identical chunks).
// Assumes embedChunks returns chunks annotated with an embedding vector.
interface EmbeddedChunk extends DocumentChunk {
  embedding: number[];
}

async function semanticDeduplicateChunks(
  chunks: DocumentChunk[],
  vectorDb: VectorStore,
  threshold: number = 0.95
): Promise<EmbeddedChunk[]> {
  const embedded: EmbeddedChunk[] = await embedChunks(chunks);
  const unique: EmbeddedChunk[] = [];

  for (const chunk of embedded) {
    // Find the nearest stored chunk; skip this one if it's too similar
    const [nearest] = await vectorDb.search(chunk.embedding, { topK: 1 });

    if (
      !nearest ||
      cosineSimilarity(chunk.embedding, nearest.embedding) < threshold
    ) {
      unique.push(chunk);
    }
  }

  return unique;
}

Exact deduplication is fast (hash-based). Semantic deduplication is slower but catches near-duplicates.
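
The cosineSimilarity helper assumed by the semantic path is a few lines:

```typescript
// Cosine similarity = dot(a, b) / (|a| * |b|); 1.0 for identical
// directions, 0 for orthogonal vectors.
function cosineSimilarity(a: number[], b: number[]): number {
  if (a.length !== b.length) throw new Error("Dimension mismatch");
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}
```

Note that many embedding models return unit-normalized vectors, in which case the dot product alone is sufficient.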

Metadata Extraction

Preserve document metadata for filtering and context.

async function extractMetadata(
  doc: Document
): Promise<Record<string, any>> {
  const metadata = { ...doc.metadata };

  // Auto-extract: title, date, author
  if (!metadata.title) {
    metadata.title = await llm.extractTitle(doc.content);
  }

  if (!metadata.date) {
    metadata.date = extractDate(doc.content);
  }

  if (!metadata.category) {
    metadata.category = await llm.classify(doc.content, [
      'technical',
      'policy',
      'announcement',
      'other',
    ]);
  }

  return metadata;
}

This enriches documents with structured information for better retrieval filtering.
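
The extractDate helper above is assumed; one possible sketch matches ISO-style dates with a regex (a heuristic, not a general date parser):

```typescript
// Pull the first ISO-8601-style date (YYYY-MM-DD) out of the text;
// returns undefined when nothing plausible matches. Heuristic only.
function extractDate(content: string): string | undefined {
  const match = content.match(/\b(\d{4})-(\d{2})-(\d{2})\b/);
  if (!match) return undefined;
  const [iso, , month, day] = match;
  // Basic sanity check on month/day ranges
  if (+month < 1 || +month > 12 || +day < 1 || +day > 31) return undefined;
  return iso;
}
```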

Progress Tracking and Error Recovery

Monitor pipeline health and resume from failures.

interface PipelineProgress {
  totalDocuments: number;
  processed: number;
  failed: number;
  tokensEmbedded: number;
  timeElapsed: number;
}

class PipelineMonitor {
  private progress: PipelineProgress = {
    totalDocuments: 0,
    processed: 0,
    failed: 0,
    tokensEmbedded: 0,
    timeElapsed: 0,
  };

  private startTime: number = Date.now();

  recordSuccess(tokensEmbedded: number): void {
    this.progress.processed++;
    this.progress.tokensEmbedded += tokensEmbedded;
    this.progress.timeElapsed = Date.now() - this.startTime;
  }

  recordFailure(): void {
    this.progress.failed++;
  }

  getReport(): string {
    // Avoid division by zero when a report is requested immediately
    const seconds = Math.max(this.progress.timeElapsed / 1000, 0.001);
    const throughput = (this.progress.processed / seconds).toFixed(2);
    return `Processed: ${this.progress.processed} | Failed: ${this.progress.failed} | Throughput: ${throughput} docs/s | Tokens: ${this.progress.tokensEmbedded.toLocaleString()}`;
  }
}

// Checkpointing for resume capability
async function ingestDocumentsWithCheckpoint(
  documents: string[],
  checkpointFile: string
): Promise<void> {
  const processed = new Set(
    (await readJSON(checkpointFile)).processed ?? []
  );

  for (const docUri of documents) {
    if (processed.has(docUri)) continue;

    try {
      await ingestDocument(docUri);
      processed.add(docUri);

      // Periodically save checkpoint
      if (processed.size % 100 === 0) {
        await writeJSON(checkpointFile, { processed: Array.from(processed) });
      }
    } catch (error) {
      console.error(`Failed: ${docUri}`, error);
    }
  }

  // Final save so documents processed since the last checkpoint aren't lost
  await writeJSON(checkpointFile, { processed: Array.from(processed) });
}

Checkpoints allow resuming from the last processed document if the pipeline crashes.
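
The readJSON and writeJSON helpers used for checkpointing can be sketched with Node's fs/promises; treating a missing checkpoint file as empty makes the first run work without special-casing:

```typescript
import { readFile, writeFile } from "node:fs/promises";

// Read a JSON checkpoint; a missing file is treated as an empty
// checkpoint rather than an error, so the first run needs no setup.
async function readJSON(filePath: string): Promise<any> {
  try {
    return JSON.parse(await readFile(filePath, "utf8"));
  } catch (err: any) {
    if (err.code === "ENOENT") return {};
    throw err;
  }
}

async function writeJSON(filePath: string, data: unknown): Promise<void> {
  await writeFile(filePath, JSON.stringify(data, null, 2), "utf8");
}
```

For crash safety, writing to a temp file and renaming it over the checkpoint avoids a torn write corrupting the resume state.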

Cost Estimation for Embedding Pipeline

Calculate embedding costs upfront to budget appropriately.

interface EmbeddingCost {
  documentsCount: number;
  averageTokensPerDocument: number;
  totalTokens: number;
  costPer1MTokens: number;
  totalCost: number;
}

function estimateEmbeddingCost(
  documents: Document[],
  embeddingModel: string = 'text-embedding-3-small'
): EmbeddingCost {
  const totalTokens = documents.reduce(
    (sum, doc) => sum + estimateTokens(doc.content),
    0
  );

  const avgTokensPerDoc =
    documents.length > 0 ? totalTokens / documents.length : 0;

  // Pricing (as of writing): text-embedding-3-small = $0.02 per 1M tokens,
  // text-embedding-3-large = $0.13 per 1M tokens
  const costPer1MTokens =
    embeddingModel === 'text-embedding-3-small' ? 0.02 : 0.13;

  return {
    documentsCount: documents.length,
    averageTokensPerDocument: avgTokensPerDoc,
    totalTokens,
    costPer1MTokens,
    totalCost: (totalTokens / 1_000_000) * costPer1MTokens,
  };
}

// Example:
const cost = estimateEmbeddingCost(documents);
console.log(`Embedding ${cost.documentsCount} docs will cost: $${cost.totalCost.toFixed(2)}`);

Use this to negotiate batch pricing with embedding providers or optimize chunking to reduce tokens.
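
The estimateTokens helper can be approximated with the common ~4-characters-per-token heuristic for English text; for billing-accurate counts, use a real tokenizer such as tiktoken:

```typescript
// Rough token estimate: English prose averages about 4 characters per
// token under OpenAI-style BPE tokenizers. Good enough for budgeting;
// not a substitute for the model's actual tokenizer.
function estimateTokens(text: string): number {
  return Math.ceil(text.length / 4);
}
```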

Monitoring Pipeline Health

Track error rates, latency, and throughput.

interface PipelineHealthMetrics {
  documentsProcessedToday: number;
  documentsFailedToday: number;
  successRate: number; // %
  averageProcessingTimeMs: number;
  estimatedTokensEmbeddedToday: number;
}

async function checkPipelineHealth(): Promise<PipelineHealthMetrics> {
  const logs = await fetchLogs({
    filter: { service: 'document-ingestion' },
    timeRange: 'last-24h',
  });

  const processed = logs.filter(l => l.event === 'completed').length;
  const failed = logs.filter(l => l.event === 'failed').length;
  const total = processed + failed;

  const processingTimes = logs
    .filter(l => l.duration)
    .map(l => l.duration);
  const avgTime = processingTimes.length > 0
    ? processingTimes.reduce((a, b) => a + b, 0) / processingTimes.length
    : 0;

  const tokens = logs
    .filter(l => l.tokensEmbedded)
    .reduce((sum, l) => sum + l.tokensEmbedded, 0);

  return {
    documentsProcessedToday: processed,
    documentsFailedToday: failed,
    successRate: total > 0 ? (processed / total) * 100 : 100,
    averageProcessingTimeMs: avgTime,
    estimatedTokensEmbeddedToday: tokens,
  };
}

Alert if success rate drops below 95% or throughput dips unexpectedly.
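
That alerting rule can be encoded directly against the metrics interface (the thresholds and the expected-throughput parameter are illustrative; the interface is repeated so the snippet stands alone):

```typescript
interface PipelineHealthMetrics {
  documentsProcessedToday: number;
  documentsFailedToday: number;
  successRate: number; // %
  averageProcessingTimeMs: number;
  estimatedTokensEmbeddedToday: number;
}

// Return a list of human-readable alerts; an empty list means healthy.
// The 95% floor and half-of-expected throughput cutoff are defaults to tune.
function checkAlerts(
  metrics: PipelineHealthMetrics,
  expectedDailyThroughput: number
): string[] {
  const alerts: string[] = [];
  if (metrics.successRate < 95) {
    alerts.push(`Success rate ${metrics.successRate.toFixed(1)}% is below 95%`);
  }
  if (metrics.documentsProcessedToday < expectedDailyThroughput * 0.5) {
    alerts.push(
      `Throughput ${metrics.documentsProcessedToday} docs is under half of expected ${expectedDailyThroughput}`
    );
  }
  return alerts;
}
```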

Checklist

  • Build pipeline stages: extract, clean, chunk, embed, store
  • Use worker queues for parallel processing
  • Support PDF, DOCX, HTML, CSV, and TXT formats
  • Implement incremental updates to avoid re-embedding unchanged docs
  • Deduplicate chunks to prevent knowledge base bloat
  • Extract and preserve metadata for filtering
  • Implement checkpointing for fault recovery
  • Estimate embedding costs before running at scale
  • Monitor pipeline health: success rate, throughput, token usage

Conclusion

A robust ingestion pipeline is invisible when working well and catastrophic when it fails. Design for parallelism, track progress, deduplicate aggressively, and monitor closely. Start with simple hash-based deduplication and add semantic deduplication only if storage or retrieval quality is a problem.