RAG Pipeline End-to-End¶
This note builds a complete, production-ready RAG pipeline: ingest a PDF, chunk it, embed it, store it in ChromaDB, and answer questions with citations.
Learning objectives¶
- Ingest PDF documents into a RAG pipeline
- Store and query embeddings with ChromaDB
- Handle multi-document corpora with metadata
- Measure and log pipeline performance
Full pipeline implementation¶
import os
import hashlib
import json
import time
from dataclasses import dataclass, field
from openai import OpenAI
from anthropic import Anthropic
import chromadb
from chromadb.utils import embedding_functions
import tiktoken
openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
anthropic_client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
enc = tiktoken.get_encoding("cl100k_base")
@dataclass
class RAGConfig:
embedding_model: str = "text-embedding-3-small"
generation_model: str = "claude-sonnet-4-6"
chunk_size: int = 512 # tokens
chunk_overlap: int = 64 # tokens
retrieval_k: int = 4
min_relevance_score: float = 0.5
max_context_tokens: int = 4000
Stage 1: Document ingestion¶
def load_text_file(path: str) -> str:
with open(path, "r", encoding="utf-8") as f:
return f.read()
def load_pdf(path: str) -> str:
"""Extract text from PDF using PyPDF2."""
try:
import PyPDF2
text_parts = []
with open(path, "rb") as f:
reader = PyPDF2.PdfReader(f)
for page in reader.pages:
text = page.extract_text()
if text:
text_parts.append(text.strip())
return "\n\n".join(text_parts)
except ImportError:
raise ImportError("Install PyPDF2: pip install PyPDF2")
def load_document(path: str) -> tuple[str, str]:
"""Returns (text, source_name)."""
import os
ext = os.path.splitext(path)[1].lower()
source = os.path.basename(path)
if ext == ".pdf":
return load_pdf(path), source
elif ext in {".txt", ".md"}:
return load_text_file(path), source
else:
raise ValueError(f"Unsupported file type: {ext}")
def chunk_document(text: str, source: str, config: RAGConfig) -> list[dict]:
"""Fixed-size token chunking with overlap."""
tokens = enc.encode(text)
chunks = []
i = 0
while i < len(tokens):
chunk_tokens = tokens[i:i + config.chunk_size]
chunk_text = enc.decode(chunk_tokens).strip()
if chunk_text:
# Stable ID based on content hash
chunk_id = hashlib.md5(f"{source}-{i}-{chunk_text[:50]}".encode()).hexdigest()[:16]
chunks.append({
"id": chunk_id,
"text": chunk_text,
"source": source,
"chunk_index": len(chunks),
"token_count": len(chunk_tokens),
"char_start": len(enc.decode(tokens[:i]))
})
i += config.chunk_size - config.chunk_overlap
return chunks
Stage 2: Embedding and storage¶
def get_chroma_collection(name: str = "rag_collection") -> chromadb.Collection:
chroma_client = chromadb.PersistentClient(path="./chroma_db")
openai_ef = embedding_functions.OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"),
model_name="text-embedding-3-small"
)
return chroma_client.get_or_create_collection(
name=name,
embedding_function=openai_ef,
metadata={"hnsw:space": "cosine"}
)
def ingest_chunks(
chunks: list[dict],
collection: chromadb.Collection,
batch_size: int = 100
) -> dict:
"""Upsert chunks into ChromaDB in batches."""
start_time = time.time()
added = 0
for i in range(0, len(chunks), batch_size):
batch = chunks[i:i + batch_size]
collection.upsert(
ids=[c["id"] for c in batch],
documents=[c["text"] for c in batch],
metadatas=[{
"source": c["source"],
"chunk_index": c["chunk_index"],
"token_count": c["token_count"]
} for c in batch]
)
added += len(batch)
print(f" Upserted {added}/{len(chunks)} chunks...")
elapsed = time.time() - start_time
return {"chunks_added": added, "elapsed_seconds": elapsed}
def ingest_document(path: str, collection: chromadb.Collection, config: RAGConfig) -> dict:
"""Full ingestion pipeline for a single document."""
print(f"Loading: {path}")
text, source = load_document(path)
print(f" Loaded {len(text):,} chars")
chunks = chunk_document(text, source, config)
print(f" Created {len(chunks)} chunks")
result = ingest_chunks(chunks, collection)
result["source"] = source
result["chunks_created"] = len(chunks)
return result
Stage 3: Retrieval¶
def retrieve(
query: str,
collection: chromadb.Collection,
config: RAGConfig,
source_filter: str | None = None
) -> list[dict]:
"""Query ChromaDB and return formatted results."""
where_clause = {"source": source_filter} if source_filter else None
results = collection.query(
query_texts=[query],
n_results=config.retrieval_k,
where=where_clause,
include=["documents", "metadatas", "distances"]
)
if not results["ids"][0]:
return []
retrieved = []
for doc, meta, dist in zip(
results["documents"][0],
results["metadatas"][0],
results["distances"][0]
):
# ChromaDB cosine distance: 0 = identical, 2 = opposite
# Convert to similarity: 1 - distance/2 for cosine space
similarity = 1 - dist / 2
if similarity >= config.min_relevance_score:
retrieved.append({
"text": doc,
"source": meta["source"],
"chunk_index": meta["chunk_index"],
"similarity": similarity
})
return retrieved
Stage 4: Generation¶
SYSTEM_PROMPT = """You are a precise Q&A assistant. Answer questions using ONLY the provided document context.
Instructions:
- If the answer is in the context, give it clearly and cite the source document.
- If the answer is not in the context, respond exactly: "I don't have information about this in the provided documents."
- Do not use your general knowledge — only the provided context.
- Cite sources using the format: [Source: filename]"""
def generate_answer(
question: str,
retrieved: list[dict],
config: RAGConfig
) -> dict:
if not retrieved:
return {
"answer": "I don't have information about this in the provided documents.",
"sources": [],
"retrieved_count": 0,
"input_tokens": 0,
"output_tokens": 0,
"cost_usd": 0.0
}
# Assemble context
context_parts = []
sources = []
for item in retrieved:
context_parts.append(f"[Source: {item['source']}]\n{item['text']}")
if item["source"] not in sources:
sources.append(item["source"])
context = "\n\n---\n\n".join(context_parts)
response = anthropic_client.messages.create(
model=config.generation_model,
max_tokens=600,
system=SYSTEM_PROMPT,
messages=[{
"role": "user",
"content": f"<context>\n{context}\n</context>\n\nQuestion: {question}"
}]
)
cost = (response.usage.input_tokens * 3 + response.usage.output_tokens * 15) / 1_000_000
return {
"answer": response.content[0].text,
"sources": sources,
"retrieved_count": len(retrieved),
"input_tokens": response.usage.input_tokens,
"output_tokens": response.usage.output_tokens,
"cost_usd": cost
}
Putting it all together¶
class RAGPipeline:
def __init__(self, collection_name: str = "rag_demo", config: RAGConfig | None = None):
self.config = config or RAGConfig()
self.collection = get_chroma_collection(collection_name)
def ingest(self, path: str) -> dict:
return ingest_document(path, self.collection, self.config)
def ask(self, question: str, source_filter: str | None = None) -> dict:
retrieved = retrieve(question, self.collection, self.config, source_filter)
result = generate_answer(question, retrieved, self.config)
result["question"] = question
return result
def stats(self) -> dict:
count = self.collection.count()
return {"total_chunks": count, "collection": self.collection.name}
# Demo with sample text files (no real PDFs needed)
import tempfile, os
# Create sample "documents"
sample_docs = {
"company-policy.txt": """
Vacation Policy: All employees receive 20 days of paid time off annually.
Unused PTO rolls over up to 10 days into the next calendar year.
PTO requests must be submitted through the HR portal with 2 weeks notice.
Remote Work: Employees may work remotely up to 3 days per week.
Full remote approval requires manager sign-off and a minimum of 6 months tenure.
Core hours are 10am–3pm in the employee's local timezone.
""",
"product-faq.txt": """
Q: How do I reset my password?
A: Click 'Forgot Password' on the login page. You will receive a reset link within 5 minutes.
Q: What payment methods do you accept?
A: We accept all major credit cards, PayPal, and bank transfers for enterprise accounts.
Q: Is there a free trial?
A: Yes, all plans include a 14-day free trial with no credit card required.
Q: How do I cancel my subscription?
A: You can cancel at any time from Settings > Billing. Your access continues until the end of the billing period.
"""
}
# Write to temp files and ingest
pipeline = RAGPipeline(collection_name="demo", config=RAGConfig(retrieval_k=3))
with tempfile.TemporaryDirectory() as tmpdir:
for filename, content in sample_docs.items():
path = os.path.join(tmpdir, filename)
with open(path, "w") as f:
f.write(content.strip())
pipeline.ingest(path)
print(f"\nPipeline stats: {pipeline.stats()}")
# Ask questions
questions = [
"How many vacation days do employees get?",
"Can I work from home every day?",
"How do I cancel my subscription?",
"What is the company's stock price?" # out of scope
]
for q in questions:
result = pipeline.ask(q)
print(f"\nQ: {q}")
print(f"A: {result['answer']}")
print(f" Sources: {result['sources']} | Tokens: {result['input_tokens']}/{result['output_tokens']} | ${result['cost_usd']:.5f}")
Performance logging¶
import csv
from datetime import datetime
def log_query(result: dict, log_file: str = "rag_queries.csv") -> None:
fieldnames = ["timestamp", "question", "retrieved_count", "sources",
"input_tokens", "output_tokens", "cost_usd"]
row = {
"timestamp": datetime.now().isoformat(),
"question": result["question"],
"retrieved_count": result["retrieved_count"],
"sources": "; ".join(result["sources"]),
"input_tokens": result["input_tokens"],
"output_tokens": result["output_tokens"],
"cost_usd": result["cost_usd"]
}
file_exists = os.path.exists(log_file)
with open(log_file, "a", newline="") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
if not file_exists:
writer.writeheader()
writer.writerow(row)
Log everything in production
Every query, retrieved chunk, and generated answer should be logged. This gives you: (1) a dataset for evaluating retrieval quality, (2) cost tracking per query type, (3) the ability to replay queries after prompt improvements, and (4) a paper trail for auditing AI-generated answers.