AI Data Ingestion Pipelines — Processing Documents at Scale for RAG

Author: Sanjeev Sharma (@webcoderspeed1)

Introduction
RAG is only as good as its ingestion pipeline. A poor pipeline introduces duplicates, loses metadata, produces inconsistent embeddings, or fails silently. Building a production ingestion system means orchestrating extraction, cleaning, chunking, embedding, and error recovery, all at scale and cost-effectively.
- Pipeline Stages: Extract → Clean → Chunk → Embed → Store
- Parallel Processing With Worker Queues
- Document Format Support
- Incremental Updates: Only Re-Process Changed Docs
- Deduplication
- Metadata Extraction
- Progress Tracking and Error Recovery
- Cost Estimation for Embedding Pipeline
- Monitoring Pipeline Health
- Checklist
- Conclusion
Pipeline Stages: Extract → Clean → Chunk → Embed → Store
A robust pipeline follows predictable stages:
interface PipelineStage {
name: string;
process: (input: any) => Promise<any>;
retryPolicy: { maxRetries: number; backoffMs: number };
errorHandler: (error: Error) => Promise<void>;
}
interface Document {
id: string;
content: string;
metadata: Record<string, any>;
originalFormat: string;
extractedAt: number;
}
interface DocumentChunk {
docId: string;
chunkIndex: number;
content: string;
metadata: Record<string, any>;
}
async function ingestDocument(sourceUri: string): Promise<void> {
try {
// Stage 1: Extract
const doc = await extractDocument(sourceUri);
// Stage 2: Clean
const cleaned = cleanText(doc.content);
doc.content = cleaned;
// Stage 3: Chunk
const chunks = chunkDocument(doc);
// Stage 4: Deduplicate
const uniqueChunks = await deduplicateChunks(chunks);
// Stage 5: Embed
const embedded = await embedChunks(uniqueChunks);
// Stage 6: Store
await storeChunks(embedded);
} catch (error) {
await handlePipelineError(sourceUri, error);
}
}
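The cleanText and chunkDocument helpers are referenced above but left abstract. A minimal sketch of both (whitespace normalization plus fixed-size, character-based chunking with overlap) might look like the following; the Doc and Chunk types mirror the Document and DocumentChunk interfaces above, re-declared so the sketch is self-contained:

```typescript
// Minimal local mirrors of the Document / DocumentChunk interfaces above.
interface Doc {
  id: string;
  content: string;
  metadata: Record<string, any>;
}

interface Chunk {
  docId: string;
  chunkIndex: number;
  content: string;
  metadata: Record<string, any>;
}

// Normalize whitespace and strip control characters.
function cleanText(raw: string): string {
  return raw
    .replace(/[\u0000-\u0008\u000b\u000c\u000e-\u001f]/g, '') // control chars
    .replace(/[ \t]+/g, ' ')    // collapse runs of spaces/tabs
    .replace(/\n{3,}/g, '\n\n') // collapse excessive blank lines
    .trim();
}

// Fixed-size chunking with overlap, so text spanning a chunk boundary
// appears in both neighboring chunks.
function chunkDocument(doc: Doc, chunkSize = 1000, overlap = 200): Chunk[] {
  const chunks: Chunk[] = [];
  const step = chunkSize - overlap;
  for (let start = 0, i = 0; start < doc.content.length; start += step, i++) {
    chunks.push({
      docId: doc.id,
      chunkIndex: i,
      content: doc.content.slice(start, start + chunkSize),
      metadata: { ...doc.metadata, offset: start },
    });
  }
  return chunks;
}
```

Character-based sizing is the simplest baseline; token-aware chunking (splitting on sentence or section boundaries) usually retrieves better but needs a tokenizer.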
Parallel Processing With Worker Queues
Process documents in parallel using a job queue such as Bull/BullMQ (Node), Celery (Python), or AWS SQS. The example below uses Bull.
import Queue from 'bull';
const ingestionQueue = new Queue('document-ingestion', {
redis: { host: 'localhost', port: 6379 },
});
// Enqueue a document for processing
async function enqueueDocument(sourceUri: string): Promise<void> {
await ingestionQueue.add(
{ sourceUri },
{
priority: 10,
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: true,
}
);
}
// Workers process documents in parallel
ingestionQueue.process(
10, // 10 parallel workers
async (job) => {
const { sourceUri } = job.data;
console.log(`Processing: ${sourceUri}`);
try {
await ingestDocument(sourceUri);
job.progress(100);
} catch (error) {
job.log(`Failed: ${error.message}`);
throw error; // Trigger retry
}
}
);
// Monitor queue health
ingestionQueue.on('completed', (job) => {
console.log(`Completed: ${job.data.sourceUri}`);
});
ingestionQueue.on('failed', (job, error) => {
console.error(`Failed permanently: ${job.data.sourceUri}`, error);
});
This parallelizes document processing, dramatically reducing end-to-end ingestion time.
Document Format Support
Different formats require different extraction logic.
type DocumentFormat = 'pdf' | 'docx' | 'html' | 'csv' | 'txt';
async function extractDocument(sourceUri: string): Promise<Document> {
const format = detectFormat(sourceUri);
let content: string;
const metadata: Record<string, any> = {};
switch (format) {
    case 'pdf': {
      const pdfData = await extractPDF(sourceUri);
      content = pdfData.text;
      metadata.pageCount = pdfData.pageCount;
      break;
    }
    case 'docx': {
      const docxData = await extractDOCX(sourceUri);
      content = docxData.text;
      metadata.author = docxData.author;
      break;
    }
    case 'html':
      content = await extractHTML(sourceUri); // strip tags, normalize
      break;
    case 'csv': {
      const rows = await parseCSV(sourceUri);
      content = rows.map(row => Object.values(row).join(' ')).join('\n');
      break;
    }
case 'txt':
content = await readFile(sourceUri);
break;
default:
throw new Error(`Unsupported format: ${format}`);
}
return {
id: generateId(),
content,
metadata,
originalFormat: format,
extractedAt: Date.now(),
};
}
Use libraries such as pdf-parse (PDF), mammoth (DOCX), and cheerio (HTML) to handle format-specific extraction.
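The detectFormat helper is assumed above. A simple extension-based version could look like this (a sketch; production pipelines should also sniff MIME types or magic bytes, since extensions lie):

```typescript
type DocumentFormat = 'pdf' | 'docx' | 'html' | 'csv' | 'txt';

// Hypothetical detectFormat: map a file extension to a supported format.
function detectFormat(sourceUri: string): DocumentFormat {
  // Drop any query string, then take the last dot-separated segment.
  const ext = sourceUri.split('?')[0].split('.').pop()?.toLowerCase() ?? '';
  const map: Record<string, DocumentFormat> = {
    pdf: 'pdf',
    docx: 'docx',
    html: 'html',
    htm: 'html',
    csv: 'csv',
    txt: 'txt',
    md: 'txt', // treat markdown as plain text
  };
  const format = map[ext];
  if (!format) throw new Error(`Unsupported format: .${ext}`);
  return format;
}
```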
Incremental Updates: Only Re-Process Changed Docs
Don't re-embed documents that haven't changed.
interface DocumentVersion {
uri: string;
hash: string; // sha256(content)
embeddedAt: number;
}
async function ingestDocumentIncremental(
sourceUri: string,
versionStore: Map<string, DocumentVersion>
): Promise<{ processed: boolean; reason?: string }> {
const content = await fetchDocumentContent(sourceUri);
const currentHash = sha256(content);
const stored = versionStore.get(sourceUri);
if (stored && stored.hash === currentHash) {
return {
processed: false,
reason: 'Document unchanged; skipping embedding',
};
}
// Document changed or new: process it
const doc: Document = {
id: generateId(),
content,
metadata: { sourceUri },
originalFormat: detectFormat(sourceUri),
extractedAt: Date.now(),
};
const chunks = chunkDocument(doc);
const embedded = await embedChunks(chunks);
await storeChunks(embedded);
// Update version store
versionStore.set(sourceUri, {
uri: sourceUri,
hash: currentHash,
embeddedAt: Date.now(),
});
return { processed: true };
}
This reduces embedding costs dramatically when documents rarely change.
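The sha256 helper used here (and again in the deduplication section) can be a thin wrapper over Node's built-in crypto module:

```typescript
import { createHash } from 'node:crypto';

// Content hash used to detect unchanged documents. Any stable hash works;
// SHA-256 is a common, collision-resistant default.
function sha256(content: string): string {
  return createHash('sha256').update(content, 'utf8').digest('hex');
}
```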
Deduplication
Prevent duplicate documents from inflating the knowledge base.
async function deduplicateChunks(chunks: DocumentChunk[]): Promise<DocumentChunk[]> {
const seen = new Set<string>();
const unique: DocumentChunk[] = [];
for (const chunk of chunks) {
// Hash chunk content; if seen before, skip
const hash = sha256(chunk.content);
if (!seen.has(hash)) {
seen.add(hash);
unique.push(chunk);
}
}
return unique;
}
// For semantic deduplication (similar, not identical chunks):
async function semanticDeduplicateChunks(
  chunks: DocumentChunk[],
  vectorDb: VectorStore,
  threshold: number = 0.95
): Promise<DocumentChunk[]> {
  const embedded = await embedChunks(chunks);
  const unique: DocumentChunk[] = [];
  for (const chunk of embedded) {
    // Fetch the single nearest existing chunk, then compare its
    // similarity to this chunk against the threshold
    const [nearest] = await vectorDb.search(chunk.embedding, { topK: 1 });
    if (!nearest || cosineSimilarity(chunk.embedding, nearest.embedding) < threshold) {
      unique.push(chunk);
    }
  }
  return unique;
}
Exact deduplication is fast (hash-based). Semantic deduplication is slower but catches near-duplicates.
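The cosineSimilarity function used in the semantic version is the standard dot-product-over-norms computation:

```typescript
// Cosine similarity between two equal-length vectors: dot(a, b) / (|a| * |b|).
// Returns 1 for parallel vectors, 0 for orthogonal ones.
function cosineSimilarity(a: number[], b: number[]): number {
  if (a.length !== b.length) throw new Error('dimension mismatch');
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}
```

Note that many vector databases return a similarity or distance score directly with each search hit; when that score is cosine-based you can compare it against the threshold without recomputing.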
Metadata Extraction
Preserve document metadata for filtering and context.
async function extractMetadata(
doc: Document
): Promise<Record<string, any>> {
const metadata = { ...doc.metadata };
// Auto-extract: title, date, author
if (!metadata.title) {
metadata.title = await llm.extractTitle(doc.content);
}
if (!metadata.date) {
metadata.date = extractDate(doc.content);
}
if (!metadata.category) {
metadata.category = await llm.classify(doc.content, [
'technical',
'policy',
'announcement',
'other',
]);
}
return metadata;
}
This enriches documents with structured information for better retrieval filtering.
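The extractDate helper above is assumed. A heuristic regex-based sketch might look like this (it catches ISO-style dates only; real documents need a richer date parser):

```typescript
// Hypothetical extractDate: find the first ISO-style date (YYYY-MM-DD)
// in the document text. Returns undefined when none is found.
function extractDate(content: string): string | undefined {
  const match = content.match(/\b\d{4}-\d{2}-\d{2}\b/);
  return match ? match[0] : undefined;
}
```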
Progress Tracking and Error Recovery
Monitor pipeline health and resume from failures.
interface PipelineProgress {
totalDocuments: number;
processed: number;
failed: number;
tokensEmbedded: number;
timeElapsed: number;
}
class PipelineMonitor {
private progress: PipelineProgress = {
totalDocuments: 0,
processed: 0,
failed: 0,
tokensEmbedded: 0,
timeElapsed: 0,
};
private startTime: number = Date.now();
recordSuccess(tokensEmbedded: number): void {
this.progress.processed++;
this.progress.tokensEmbedded += tokensEmbedded;
this.progress.timeElapsed = Date.now() - this.startTime;
}
recordFailure(): void {
this.progress.failed++;
}
getReport(): string {
const throughput = (
this.progress.processed /
(this.progress.timeElapsed / 1000)
).toFixed(2);
return `Processed: ${this.progress.processed} | Failed: ${this.progress.failed} | Throughput: ${throughput} docs/s | Tokens: ${this.progress.tokensEmbedded.toLocaleString()}`;
}
}
// Checkpointing for resume capability
async function ingestDocumentsWithCheckpoint(
documents: string[],
checkpointFile: string
): Promise<void> {
  // readJSON should return an empty object when the checkpoint file
  // does not exist yet (first run)
  const processed = new Set<string>(
    (await readJSON(checkpointFile)).processed ?? []
  );
  for (const docUri of documents) {
    if (processed.has(docUri)) continue;
    try {
      await ingestDocument(docUri);
      processed.add(docUri);
      // Periodically save the checkpoint
      if (processed.size % 100 === 0) {
        await writeJSON(checkpointFile, { processed: Array.from(processed) });
      }
    } catch (error) {
      console.error(`Failed: ${docUri}`, error);
    }
  }
  // Final save so the last partial batch is not lost
  await writeJSON(checkpointFile, { processed: Array.from(processed) });
}
Checkpoints allow resuming from the last processed document if the pipeline crashes.
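The readJSON and writeJSON helpers above are assumed. With Node's fs module they can be sketched as follows, with readJSON tolerating a missing file so the first run starts from an empty checkpoint instead of crashing:

```typescript
import { readFile, writeFile } from 'node:fs/promises';

// Hypothetical checkpoint helpers for the sketch above.
async function readJSON(path: string): Promise<Record<string, any>> {
  try {
    return JSON.parse(await readFile(path, 'utf8'));
  } catch (error: any) {
    if (error.code === 'ENOENT') return {}; // no checkpoint yet: empty state
    throw error;
  }
}

async function writeJSON(path: string, data: unknown): Promise<void> {
  // In production, write to a temp file and rename for atomicity;
  // kept simple here.
  await writeFile(path, JSON.stringify(data, null, 2), 'utf8');
}
```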
Cost Estimation for Embedding Pipeline
Calculate embedding costs upfront to budget appropriately.
interface EmbeddingCost {
documentsCount: number;
averageTokensPerDocument: number;
totalTokens: number;
costPer1MTokens: number;
totalCost: number;
}
function estimateEmbeddingCost(
documents: Document[],
embeddingModel: string = 'text-embedding-3-small'
): EmbeddingCost {
  const totalTokens = documents.reduce(
    (sum, doc) => sum + estimateTokens(doc.content),
    0
  );
  const avgTokensPerDoc = totalTokens / documents.length;
  // Example pricing (verify against your provider's current rate card):
  // text-embedding-3-small is roughly $0.02 per 1M tokens; larger models cost more
  const costPer1MTokens =
    embeddingModel === 'text-embedding-3-small' ? 0.02 : 0.15;
return {
documentsCount: documents.length,
averageTokensPerDocument: avgTokensPerDoc,
totalTokens,
costPer1MTokens,
totalCost: (totalTokens / 1_000_000) * costPer1MTokens,
};
}
// Example:
const cost = estimateEmbeddingCost(documents);
console.log(`Embedding ${cost.documentsCount} docs will cost: $${cost.totalCost.toFixed(2)}`);
Use this to negotiate batch pricing with embedding providers or optimize chunking to reduce tokens.
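The estimateTokens helper above is assumed. A common rough heuristic for English text is about four characters per token; use a real tokenizer (e.g. tiktoken) when accuracy matters:

```typescript
// Rough token estimate: ~4 characters per token for English text.
// A budgeting heuristic, not a tokenizer.
function estimateTokens(text: string): number {
  return Math.ceil(text.length / 4);
}
```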
Monitoring Pipeline Health
Track error rates, latency, and throughput.
interface PipelineHealthMetrics {
documentsProcessedToday: number;
documentsFailedToday: number;
successRate: number; // %
averageProcessingTimeMs: number;
estimatedTokensEmbeddedToday: number;
}
async function checkPipelineHealth(): Promise<PipelineHealthMetrics> {
const logs = await fetchLogs({
filter: { service: 'document-ingestion' },
timeRange: 'last-24h',
});
const processed = logs.filter(l => l.event === 'completed').length;
const failed = logs.filter(l => l.event === 'failed').length;
const total = processed + failed;
const processingTimes = logs
.filter(l => l.duration)
.map(l => l.duration);
  const avgTime = processingTimes.length
    ? processingTimes.reduce((a, b) => a + b, 0) / processingTimes.length
    : 0;
const tokens = logs
.filter(l => l.tokensEmbedded)
.reduce((sum, l) => sum + l.tokensEmbedded, 0);
return {
documentsProcessedToday: processed,
documentsFailedToday: failed,
    successRate: total ? (processed / total) * 100 : 100,
averageProcessingTimeMs: avgTime,
estimatedTokensEmbeddedToday: tokens,
};
}
Alert if success rate drops below 95% or throughput dips unexpectedly.
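That alerting rule can be encoded directly. A sketch, where the expected daily document count is an assumed baseline you would tune per deployment:

```typescript
interface HealthSnapshot {
  successRate: number; // percent
  documentsProcessedToday: number;
}

// Alert when success rate falls below the threshold or throughput drops
// below half the expected daily volume.
function shouldAlert(
  metrics: HealthSnapshot,
  minSuccessRate = 95,
  expectedDailyDocs = 1000
): { alert: boolean; reasons: string[] } {
  const reasons: string[] = [];
  if (metrics.successRate < minSuccessRate) {
    reasons.push(`success rate ${metrics.successRate.toFixed(1)}% below ${minSuccessRate}%`);
  }
  if (metrics.documentsProcessedToday < expectedDailyDocs * 0.5) {
    reasons.push(`only ${metrics.documentsProcessedToday} docs processed, under half the expected ${expectedDailyDocs}`);
  }
  return { alert: reasons.length > 0, reasons };
}
```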
Checklist
- Build pipeline stages: extract, clean, chunk, embed, store
- Use worker queues for parallel processing
- Support PDF, DOCX, HTML, CSV, and TXT formats
- Implement incremental updates to avoid re-embedding unchanged docs
- Deduplicate chunks to prevent knowledge base bloat
- Extract and preserve metadata for filtering
- Implement checkpointing for fault recovery
- Estimate embedding costs before running at scale
- Monitor pipeline health: success rate, throughput, token usage
Conclusion
A robust ingestion pipeline is invisible when working well and catastrophic when it fails. Design for parallelism, track progress, deduplicate aggressively, and monitor closely. Start with simple hash-based deduplication and add semantic deduplication only if storage or retrieval quality is a problem.