RAG System — Build Retrieval Augmented Generation from Scratch
Introduction
Retrieval-Augmented Generation (RAG) combines document retrieval with LLM generation to produce accurate, grounded answers. This guide shows you how to build a complete RAG system from its individual components so that the purpose of each layer is clear.
- RAG Architecture Overview
- Building the Indexing Pipeline
- Document Loading and Chunking
- Generating Embeddings
- Vector Storage and Retrieval
- Simple In-Memory Vector Store
- The Retrieval-Generation Pipeline
- End-to-End RAG
- Advanced Retrieval Strategies
- Re-ranking Retrieved Results
- Multi-Query Retrieval
- Production Considerations
- Caching and Performance
- Evaluation Metrics
- Conclusion
- FAQ
RAG Architecture Overview
RAG systems have three core phases:
- Indexing: Convert documents to searchable vector indexes
- Retrieval: Find relevant documents for a query
- Generation: Use retrieved documents as context for LLM response
Documents → Chunking → Embeddings → Vector Index
↓
Query → Embedding → Vector Search → Retrieve Chunks
↓
Chunks + Query → LLM → Response
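Before diving in, here is a rough preview of how the pieces built in this guide fit together. The function and class names below are the ones implemented in the sections that follow:

# Rough shape of the full pipeline; each piece is implemented below.
chunks = chunk_documents(load_documents('./data'))                  # Indexing: load and chunk
embeddings, chunks = embed_documents(chunks)                        # Indexing: embed chunks
vector_store = VectorStore(embeddings, chunks)                      # Indexing: build the index
answer = RAGSystem(vector_store).query("What is the main topic?")   # Retrieval + Generation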
Building the Indexing Pipeline
Document Loading and Chunking
import os
from typing import List
from dataclasses import dataclass
@dataclass
class Document:
content: str
metadata: dict
def load_documents(directory: str) -> List[Document]:
"""Load text files from directory."""
documents = []
for filename in os.listdir(directory):
if filename.endswith('.txt'):
with open(os.path.join(directory, filename), 'r') as f:
documents.append(Document(
content=f.read(),
metadata={'source': filename}
))
return documents
def chunk_documents(
documents: List[Document],
chunk_size: int = 1000,
overlap: int = 200
) -> List[Document]:
"""Split documents into overlapping chunks."""
chunks = []
for doc in documents:
# Split by sentences for better semantics
sentences = doc.content.split('.')
current_chunk = ""
        for sentence in sentences:
            if not sentence.strip():
                continue  # skip empty fragments left by trailing periods
            if len(current_chunk) + len(sentence) < chunk_size:
                current_chunk += sentence + "."
else:
if current_chunk:
chunks.append(Document(
content=current_chunk,
metadata=doc.metadata.copy()
))
# Add overlap
current_chunk = current_chunk[-overlap:] + sentence + "."
        if current_chunk:
            chunks.append(Document(content=current_chunk, metadata=doc.metadata.copy()))
return chunks
# Usage
documents = load_documents('./data')
chunks = chunk_documents(documents, chunk_size=1000)
Generating Embeddings
import numpy as np
from openai import OpenAI
client = OpenAI()
def embed_documents(chunks: List[Document]) -> tuple:
"""Generate embeddings for document chunks."""
texts = [chunk.content for chunk in chunks]
embeddings = []
for text in texts:
response = client.embeddings.create(
input=text,
model="text-embedding-3-small"
)
embeddings.append(response.data[0].embedding)
return np.array(embeddings), chunks
embeddings_array, chunks = embed_documents(chunks)
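The loop above issues one API call per chunk. The embeddings endpoint also accepts a list of inputs, so a batched variant reduces round trips for large corpora. The sketch below is optional; the rest of the guide does not depend on it:

def embed_documents_batched(chunks: List[Document], batch_size: int = 100) -> tuple:
    """Same result as embed_documents, but sends chunks in batches."""
    texts = [chunk.content for chunk in chunks]
    embeddings = []
    for i in range(0, len(texts), batch_size):
        response = client.embeddings.create(
            input=texts[i:i + batch_size],
            model="text-embedding-3-small"
        )
        # Results come back in the same order as the inputs
        embeddings.extend(item.embedding for item in response.data)
    return np.array(embeddings), chunks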
Vector Storage and Retrieval
Simple In-Memory Vector Store
from typing import Tuple
class VectorStore:
def __init__(self, embeddings: np.ndarray, chunks: List[Document]):
self.embeddings = embeddings
self.chunks = chunks
def retrieve(self, query_embedding: np.ndarray, k: int = 3) -> List[Document]:
"""Find k nearest neighbors using cosine similarity."""
similarities = np.dot(self.embeddings, query_embedding) / (
np.linalg.norm(self.embeddings, axis=1) *
np.linalg.norm(query_embedding)
)
top_k_indices = np.argsort(similarities)[-k:][::-1]
return [self.chunks[i] for i in top_k_indices]
# Create vector store
vector_store = VectorStore(embeddings_array, chunks)
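Before wiring in the LLM, you can sanity-check retrieval on its own. This quick sketch reuses the OpenAI client from the embedding step and an example query:

# Embed an ad-hoc query and inspect the nearest chunks
query_embedding = np.array(
    client.embeddings.create(
        input="What is the main topic?",
        model="text-embedding-3-small"
    ).data[0].embedding
)
for chunk in vector_store.retrieve(query_embedding, k=3):
    print(chunk.metadata['source'], '->', chunk.content[:80])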
The Retrieval-Generation Pipeline
End-to-End RAG
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
class RAGSystem:
def __init__(self, vector_store: VectorStore, llm_model: str = "gpt-4"):
self.vector_store = vector_store
self.llm = ChatOpenAI(model=llm_model, temperature=0)
def query(self, question: str, k: int = 3) -> str:
# Step 1: Embed the question
response = client.embeddings.create(
input=question,
model="text-embedding-3-small"
)
query_embedding = np.array(response.data[0].embedding)
# Step 2: Retrieve relevant chunks
retrieved_chunks = self.vector_store.retrieve(query_embedding, k=k)
context = "\n".join([chunk.content for chunk in retrieved_chunks])
# Step 3: Generate answer using LLM
prompt = ChatPromptTemplate.from_template(
"""Use the following context to answer the question.
If the answer is not in the context, say so.
Context:
{context}
Question: {question}
Answer:"""
)
messages = prompt.format_messages(
context=context,
question=question
)
response = self.llm.invoke(messages)
return response.content
# Usage
rag_system = RAGSystem(vector_store)
answer = rag_system.query("What is the main topic?")
print(answer)
Advanced Retrieval Strategies
Re-ranking Retrieved Results
from sklearn.metrics.pairwise import cosine_similarity
def rerank_results(
query_embedding: np.ndarray,
chunks: List[Document],
chunk_embeddings: np.ndarray,
k_rerank: int = 3
) -> List[Document]:
"""Re-rank based on semantic similarity."""
similarities = cosine_similarity(
[query_embedding],
chunk_embeddings
)[0]
top_indices = np.argsort(similarities)[-k_rerank:][::-1]
return [chunks[i] for i in top_indices]
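Re-ranking is most useful when you over-retrieve a broad candidate set first and then narrow it down. A minimal usage sketch, assuming the cached_embedding helper from the caching section below (any equivalent embedding call works):

# Over-retrieve a broad candidate set, then re-rank down to the final k
query_embedding = cached_embedding("What is the main topic?")
candidates = vector_store.retrieve(query_embedding, k=10)
candidate_embeddings = np.array([cached_embedding(c.content) for c in candidates])
final_chunks = rerank_results(query_embedding, candidates, candidate_embeddings, k_rerank=3)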
Multi-Query Retrieval
def generate_sub_queries(question: str, num_queries: int = 3) -> List[str]:
"""Generate alternative phrasings of the question."""
prompt = ChatPromptTemplate.from_template(
"""Generate {num} alternative ways to phrase this question:
{question}
Return only the questions, one per line."""
)
    llm = ChatOpenAI(model="gpt-4", temperature=0)  # the helper needs its own LLM instance
    response = llm.invoke(prompt.format_messages(
        question=question,
        num=num_queries
    ))
    return [q.strip() for q in response.content.split('\n') if q.strip()]
def multi_query_retrieve(questions: List[str], vector_store: VectorStore) -> List[Document]:
    """Retrieve using multiple query formulations, de-duplicating results."""
    # Document instances are not hashable, so de-duplicate by content instead of using a set
    unique_chunks = {}
    for question in questions:
        query_embedding = get_embedding(question)
        for chunk in vector_store.retrieve(query_embedding, k=3):
            unique_chunks[chunk.content] = chunk
    return list(unique_chunks.values())
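Putting the two helpers together (a sketch; the question text is only an example):

# Expand the original question, then retrieve with every formulation
question = "What is the main topic?"
sub_queries = generate_sub_queries(question, num_queries=3)
retrieved = multi_query_retrieve([question] + sub_queries, vector_store)
print(f"Retrieved {len(retrieved)} unique chunks")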
Production Considerations
Caching and Performance
from functools import lru_cache
@lru_cache(maxsize=1000)
def cached_embedding(text: str) -> np.ndarray:
"""Cache embeddings to reduce API calls."""
response = client.embeddings.create(
input=text,
model="text-embedding-3-small"
)
return np.array(response.data[0].embedding)
def get_embedding(text: str) -> np.ndarray:
"""Get embedding with caching."""
return cached_embedding(text)
Evaluation Metrics
def evaluate_retrieval(
retrieved: List[Document],
ground_truth: List[Document]
) -> dict:
"""Calculate retrieval quality metrics."""
retrieved_ids = {d.metadata['id'] for d in retrieved}
truth_ids = {d.metadata['id'] for d in ground_truth}
tp = len(retrieved_ids & truth_ids)
fp = len(retrieved_ids - truth_ids)
fn = len(truth_ids - retrieved_ids)
precision = tp / (tp + fp) if (tp + fp) > 0 else 0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0
f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
return {"precision": precision, "recall": recall, "f1": f1}
Conclusion
Building RAG from scratch teaches you the fundamentals. For production, use frameworks like LangChain or LlamaIndex that handle these patterns efficiently. Understanding the architecture enables better system design and debugging.
FAQ
Q: What's the optimal chunk size? A: Typically 512-2048 tokens. Test with your data; larger chunks provide more context but reduce search precision.
Q: Should I use dense or sparse embeddings? A: Dense (semantic) embeddings for meaning-based search, sparse (keyword) for exact matching. Hybrid approaches often work best.
Q: How do I handle document updates? A: Maintain a timestamp in metadata, update affected chunks only, and consider incremental indexing strategies.