Lab 02 — Production RAG Pipeline (Solution Walkthrough)
Phase: 7 — Retrieval, RAG & Agents | Difficulty: ⭐⭐⭐⭐☆ | Time: 4–6 hours
Concept primer: ../HITCHHIKERS-GUIDE.md §Embeddings, §Vector indices, §RAG.
Run
pip install -r requirements.txt
docker run -d -p 6333:6333 qdrant/qdrant
python solution.py --ingest ./docs # ingest a folder of .md / .txt
python solution.py --serve # start API on :8000
curl -N -X POST localhost:8000/chat -H 'content-type: application/json' \
-d '{"query":"what is FlashAttention?"}'
0. The mission
A complete RAG system in ~250 lines: chunk → embed → ingest → retrieve → stream. Every piece is what you'd ship in production:
- Token-aware chunking with overlap.
- BGE-small as the embedding model (strong retrieval quality for a 384-dim model).
- Qdrant with HNSW + cosine similarity.
- FastAPI with Server-Sent Events for token streaming.
- Prompt template that actually grounds the model in retrieved context.
1. Chunking — token-aware with overlap
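import tiktoken

enc = tiktoken.get_encoding("cl100k_base")  # GPT-4 / OpenAI tokenizer, used as a proxy token count

def chunk_text(text: str, chunk_tokens: int = 400, overlap: int = 80) -> list[str]:
    if not 0 <= overlap < chunk_tokens:
        raise ValueError("overlap must be >= 0 and smaller than chunk_tokens")  # else the loop never advances
    ids = enc.encode(text)
    step = chunk_tokens - overlap  # each window starts `step` tokens after the previous one
    chunks = []
    for start in range(0, len(ids), step):
        chunks.append(enc.decode(ids[start : start + chunk_tokens]))
        if start + chunk_tokens >= len(ids):  # window reached the end; another would be pure overlap
            break
    return chunks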
Why these numbers:
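- chunk_tokens=400: balances retrieval precision and information density. Too small (≤100) gives single sentences, too narrow to answer anything; too large (≥1000) mixes several topics and dilutes the embedding signal.
- overlap=80 (20% of a chunk): keeps information that straddles a chunk boundary intact in at least one chunk. The stride drops from 400 to 320 tokens, so you store about 25% more chunks (400 / 320 = 1.25); in exchange, a whole class of "missing answer" failures largely disappears.
- Token-aware, not character-aware: keeps chunks within the embedding model's input limit (BGE-small truncates at 512 tokens). cl100k_base and BGE's WordPiece tokenizer count differently, so 400 cl100k tokens is an approximation; the margin below 512 is what keeps chunks from being truncated.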
Production tweaks: split on paragraph/heading boundaries first, then token-chunk inside each section.
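A minimal sketch of that two-stage approach, assuming Markdown input with `#` headings and blank-line-separated paragraphs. `split_sections` and `chunk_markdown` are hypothetical helper names, and the token counter and oversize-paragraph splitter are injected so the sketch runs without tiktoken; in the lab you would pass something like `lambda s: len(enc.encode(s))` and `chunk_text`.

```python
import re
from typing import Callable

HEADING = re.compile(r"^#{1,6}\s")  # Markdown ATX heading, e.g. "## Setup"


def split_sections(markdown: str) -> list[str]:
    """Split a Markdown document into sections, each starting at a heading line."""
    sections: list[str] = []
    current: list[str] = []
    for line in markdown.splitlines():
        if HEADING.match(line) and current:
            sections.append("\n".join(current).strip())
            current = []
        current.append(line)
    sections.append("\n".join(current).strip())
    return [s for s in sections if s]  # drop empty sections


def chunk_markdown(markdown: str, max_tokens: int,
                   count_tokens: Callable[[str], int],
                   fallback: Callable[[str], list[str]]) -> list[str]:
    """Pack whole paragraphs into chunks of at most max_tokens, section by section."""
    chunks: list[str] = []
    for section in split_sections(markdown):
        current: list[str] = []
        used = 0
        for para in (p.strip() for p in section.split("\n\n")):
            if not para:
                continue
            n = count_tokens(para)
            if current and used + n > max_tokens:  # flush before overflowing the budget
                chunks.append("\n\n".join(current))
                current, used = [], 0
            if n > max_tokens:  # a single oversized paragraph falls back to window chunking
                chunks.extend(fallback(para))
                continue
            current.append(para)
            used += n
        if current:  # never merge paragraphs across a section boundary
            chunks.append("\n\n".join(current))
    return chunks
```

Packing whole paragraphs keeps each chunk on one topic; the sliding window only applies to paragraphs that alone exceed the budget.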
2. Embeddings — BGE-small with normalization
from sentence_transformers import SentenceTransformer
emb_model = SentenceTransformer("BAAI/bge-small-en-v1.5")  # uses CUDA automatically when available, else CPU
vecs = emb_model.encode(
chunks,
normalize_embeddings=True, # 👈 unit-norm → cosine = dot product
batch_size=64,
show_progress_bar=True,
)
Why BGE-small:
- 384-dim, 33M params: fast on CPU, nearly free on GPU.
- Ranked near the top of MTEB for its size class at release. BGE-base (768-dim) and BGE-large (1024-dim) score modestly higher on the MTEB average, at roughly 3× and 10× the parameters.
- normalize_embeddings=True: unit-norm vectors make cosine similarity equal to a plain dot product, so the scores mean the same thing whichever similarity you compute.
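A quick stdlib check of the identity behind `normalize_embeddings=True`: cosine similarity is the dot product divided by both norms, so once both vectors have unit length the denominator is 1 and the two measures coincide. The helper names are illustrative only.

```python
import math


def normalize(v: list[float]) -> list[float]:
    """Scale a vector to unit (L2) length."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]


def dot(a: list[float], b: list[float]) -> float:
    return sum(x * y for x, y in zip(a, b))


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity: dot product divided by the product of the norms."""
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))


a, b = [3.0, 4.0], [4.0, 3.0]
print(cosine(a, b))                                                  # 0.96
print(math.isclose(cosine(a, b), dot(normalize(a), normalize(b))))   # True
```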
For query encoding, BGE recommends an instruction prefix (passages are encoded without it):
QUERY_PREFIX = "Represent this sentence for searching relevant passages: "
q_vec = emb_model.encode([QUERY_PREFIX + query], normalize_embeddings=True)[0]
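Missing this prefix is a silent quality loss: nothing errors, retrieval just gets worse, and it catches almost everyone the first time. The v1.5 BGE models were trained to depend on it less than earlier releases, so measure the gap on your own queries.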
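One way to make the prefix impossible to forget is to route every encode call through two small wrappers, one per side. This is a sketch: `encode` stands in for something like `lambda xs: emb_model.encode(xs, normalize_embeddings=True).tolist()`, and the wrapper names are hypothetical.

```python
from typing import Callable, Sequence

QUERY_PREFIX = "Represent this sentence for searching relevant passages: "

# Any function mapping a batch of strings to a batch of vectors.
Encoder = Callable[[list[str]], list[list[float]]]


def encode_passages(encode: Encoder, passages: Sequence[str]) -> list[list[float]]:
    """Documents are embedded as-is: the BGE instruction goes on queries only."""
    return encode(list(passages))


def encode_query(encode: Encoder, query: str) -> list[float]:
    """Queries always get the instruction prefix before encoding."""
    return encode([QUERY_PREFIX + query])[0]
```

Ingest code then only ever calls `encode_passages` and the retrieval path only `encode_query`, so the two sides cannot drift apart.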
3. Qdrant ingestion
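import uuid

from qdrant_client import QdrantClient
from qdrant_client.http.models import VectorParams, Distance, PointStruct

client = QdrantClient(url="http://localhost:6333")
# Create the collection only when it is missing, so re-running ingest keeps existing data
if not client.collection_exists("docs"):
    client.create_collection(
        collection_name="docs",
        vectors_config=VectorParams(size=384, distance=Distance.COSINE),  # 384 = BGE-small dim
    )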
points = [
PointStruct(
id=str(uuid.uuid4()),
vector=v.tolist(),
payload={"text": chunk, "source": str(path)},
)
for v, chunk in zip(vecs, chunks)
]
client.upsert("docs", points=points, wait=True)
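- Distance.COSINE matches our normalized vectors. Qdrant implements cosine as a dot product over vectors it normalizes on upload, so Distance.DOT would rank identically here; it only skips that upload-time normalization.
- Qdrant builds an HNSW index by default: an approximate nearest-neighbor graph that keeps recall high while visiting far fewer vectors than brute force at 1M+ scale. The recall/speed trade-off is tuned with m and ef_construct at build time and hnsw_ef at query time.
- wait=True blocks until the upsert has been applied, so the points are visible to the next query. With wait=False the call returns as soon as the update is accepted, and a query issued immediately afterwards can miss the new points.
- The payload stores the original text and source, so citations come back with the search hits without a second lookup.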
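Random `uuid4` ids mean that ingesting the same file twice into an existing collection stores every chunk twice. A sketch of deterministic ids plus bounded upsert batches, assuming a chunk is identified by its source path and position; `CHUNK_NAMESPACE`, `chunk_id`, and `batched` are hypothetical names.

```python
import uuid
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

# Hypothetical fixed namespace for this corpus; any constant UUID works.
CHUNK_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "rag-lab/docs")


def chunk_id(source: str, index: int) -> str:
    """Same (source, chunk index) always maps to the same UUID, so re-ingest overwrites."""
    return str(uuid.uuid5(CHUNK_NAMESPACE, f"{source}#{index}"))


def batched(items: Iterable[T], size: int) -> Iterator[list[T]]:
    """Yield successive lists of at most `size` items to keep upsert requests bounded."""
    it = iter(items)
    while batch := list(islice(it, size)):
        yield batch
```

In the ingest loop this would look like `PointStruct(id=chunk_id(str(path), i), ...)` over `enumerate(zip(vecs, chunks))`, followed by `for batch in batched(points, 256): client.upsert("docs", points=batch, wait=True)`. One caveat: if a document gets shorter, its old trailing chunks survive and must be deleted separately.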
4. Retrieval
def retrieve(query: str, k=5) -> list[dict]:
    q_vec = emb_model.encode([QUERY_PREFIX + query], normalize_embeddings=True)[0]
    # Recent qdrant-client releases deprecate search() in favour of query_points()
    hits = client.search(
        collection_name="docs",
        query_vector=q_vec.tolist(),
        limit=k,
        with_payload=True,
    )
    return [{"score": h.score, "text": h.payload["text"], "source": h.payload["source"]}
            for h in hits]
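- k=5 is a common default. At ~400 tokens per chunk, 5 chunks add about 2,000 tokens of context, which leaves room for the system prompt, the question, and the answer in most context windows.
- For higher quality, retrieve k=20, then rerank with a cross-encoder (e.g., BAAI/bge-reranker-base) down to the top 5. A cross-encoder runs a full forward pass over each (query, doc) pair, so it is far too slow to score a whole corpus but affordable for 20 candidates; it scores more accurately than the bi-encoder because query and document attend to each other jointly.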
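The retrieve-then-rerank step can be sketched as a function over the hit dicts that `retrieve` returns. It assumes `score_pairs` scores a batch of (query, passage) pairs in one call, for example `CrossEncoder("BAAI/bge-reranker-base").predict` from sentence-transformers; `rerank` itself is a hypothetical helper.

```python
from typing import Callable, Sequence

# Scores a batch of (query, passage) pairs, one float per pair.
PairScorer = Callable[[list[tuple[str, str]]], Sequence[float]]


def rerank(query: str, hits: Sequence[dict], score_pairs: PairScorer,
           top_n: int = 5) -> list[dict]:
    """Re-score first-stage hits with a pairwise scorer and keep the best top_n."""
    scores = score_pairs([(query, h["text"]) for h in hits])  # one batched call
    rescored = [{**h, "rerank_score": float(s)} for h, s in zip(hits, scores)]
    # sort() is stable, so ties keep their first-stage (vector search) order
    rescored.sort(key=lambda h: h["rerank_score"], reverse=True)
    return rescored[:top_n]
```

Usage would be along the lines of `rerank(query, retrieve(query, k=20), reranker.predict, top_n=5)`.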
5. The grounding prompt
SYSTEM = """You are a helpful assistant. Answer the user's question using ONLY the
provided context. If the answer is not contained in the context, say:
"I don't know based on the provided documents."
Cite sources by their [source] tag."""
def build_prompt(query, hits):
    context = "\n\n".join(f"[{h['source']}]\n{h['text']}" for h in hits)
    return [
        {"role": "system", "content": SYSTEM},
        {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"},
    ]
Key design decisions:
- "ONLY the provided context" plus the "I don't know" clause: in practice these two instructions do the most to curb hallucination. Without an explicit "I don't know" escape hatch, the model tends to confabulate when retrieval fails.
- Citations as inline [source] tags: a simple format that survives streaming. Avoid asking for footnotes; models tend to lose track of the numbering during long generations.
- Context first, question last: LLMs attend most reliably to the start and end of a long prompt (the "lost in the middle" effect, Liu et al. 2023). Putting the question last keeps it salient.
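`build_prompt` includes every hit it is given. If you raise `k` or mix chunk sizes, a token budget keeps the prompt bounded. A sketch, assuming hits arrive best-first and `count_tokens` is something like `lambda s: len(enc.encode(s))`; `pack_context` is a hypothetical helper whose output would replace the `context` string, using the same `[source]` block format.

```python
from typing import Callable, Sequence


def pack_context(hits: Sequence[dict], budget: int,
                 count_tokens: Callable[[str], int]) -> str:
    """Add hits in rank order until the next one would exceed the token budget."""
    blocks: list[str] = []
    used = 0
    for h in hits:
        block = f"[{h['source']}]\n{h['text']}"  # same tag format as build_prompt
        cost = count_tokens(block)
        if used + cost > budget:
            break  # stop rather than skip ahead to smaller, lower-ranked chunks
        blocks.append(block)
        used += cost
    return "\n\n".join(blocks)
```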
6. FastAPI + SSE streaming
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI
from pydantic import BaseModel

app = FastAPI()
llm = OpenAI(base_url="http://localhost:8001/v1", api_key="local")  # vLLM's OpenAI-compatible endpoint

class ChatRequest(BaseModel):
    query: str

@app.post("/chat")
def chat(req: ChatRequest):
    hits = retrieve(req.query, k=5)
    msgs = build_prompt(req.query, hits)

    def event_stream():
        # First, emit the citations (deduplicated, in rank order) as a named event
        sources = list(dict.fromkeys(h["source"] for h in hits))
        yield f"event: citations\ndata: {json.dumps(sources)}\n\n"
        # Then stream tokens; model must match the name vLLM serves (--served-model-name)
        stream = llm.chat.completions.create(model="local", messages=msgs, stream=True)
        for chunk in stream:
            if not chunk.choices:  # some servers send a final chunk with no choices
                continue
            delta = chunk.choices[0].delta.content or ""
            if delta:
                yield f"data: {json.dumps({'token': delta})}\n\n"
        yield "event: done\ndata: {}\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")
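Why SSE rather than WebSockets:
- One-way (server → client), which matches how an LLM streams tokens.
- Plain HTTP: works with browsers, curl, and standard HTTP tooling, whereas WebSockets need an upgrade handshake that every proxy in the path must support. Buffering reverse proxies such as nginx may still need buffering turned off for the stream (e.g. an X-Accel-Buffering: no response header).
- Built-in reconnect via Last-Event-ID, but only in the browser EventSource API, which issues GET requests. A POST endpoint like this one is read with fetch() streaming or an HTTP client, so any reconnect logic is yours to write.
- Blank-line framing (\n\n) is mandatory: the empty line is what terminates an event, so without it the client keeps buffering and never dispatches anything.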
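To see why the blank line matters, here is a minimal client-side parser following the dispatch rule of the HTML server-sent events format, in simplified form (`id:` and `retry:` fields are ignored). It is a sketch for reading this endpoint's output, for instance over the lines of a streamed HTTP response; `parse_sse` is a hypothetical name.

```python
from typing import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[tuple[str, str]]:
    """Yield (event, data) pairs from SSE text lines; a blank line dispatches the event."""
    event, data = "message", []  # "message" is the default event type
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:  # blank line: end of the current event
            if data:
                yield event, "\n".join(data)
            event, data = "message", []
        elif line.startswith(":"):
            continue  # comment / keep-alive line
        else:
            field, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]  # one leading space after the colon is dropped
            if field == "event":
                event = value
            elif field == "data":
                data.append(value)
    # Anything left without a trailing blank line is never dispatched.
```

A final event that is missing its blank line is silently dropped, which is exactly the "events never arrive" symptom described above.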
Using the OpenAI SDK pointed at a local vLLM endpoint means you can switch to any provider with an OpenAI-compatible API (OpenAI itself, Together, and others) by changing the base URL, API key, and model name.
7. Expected behavior
$ curl -N -X POST localhost:8000/chat -H 'content-type: application/json' \
-d '{"query":"what is FlashAttention?"}'
event: citations
data: ["docs/flashattn.md", "docs/transformers.md"]

data: {"token": "Flash"}

data: {"token": "Attention"}

data: {"token": " is"}

data: {"token": " an"}

...

event: done
data: {}
Sanity check: ask a question whose answer is NOT in your docs. The model should say "I don't know based on the provided documents." If it confabulates instead, the system prompt is too weak — strengthen the "ONLY" clause.
8. Diagnostic methodology
When RAG "isn't working", systematically isolate the failing stage:
| Failure mode | Diagnostic | Fix |
|---|---|---|
| Wrong chunks retrieved | Print hits; are the right chunks even in the top-20? | Tune chunking strategy; try larger chunks or hybrid search. |
| Right chunks retrieved but the model ignores them | Print the full prompt; is the context actually included? | Strengthen the system prompt; reduce context to the top 3. |
| Right context but the model still hallucinates | Reduce the context to a single chunk that contains the answer | If it still hallucinates, the model is too small or weak. |
| Empty results | Was the upsert applied (wait=True)? Does the collection exist? | Query GET /collections/docs on the Qdrant REST API and check points_count. |
| Slow retrieval | Time the query embedding and client.search separately | Run the embedder on GPU; lower the query-time hnsw_ef (trades recall for speed). |
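For the first row of the table, it helps to turn "are the right chunks in the top 20?" into numbers you can track across chunking changes. A sketch, assuming a small hand-labelled set of queries, each with the ids (or sources) of the chunks that answer it; `ranked` holds the ids returned by `retrieve(query, k=20)` in rank order, and both metric functions are hypothetical helpers.

```python
from typing import Sequence


def hit_rate_at_k(ranked: Sequence[Sequence[str]], gold: Sequence[set[str]], k: int) -> float:
    """Fraction of queries with at least one relevant id among their top-k results."""
    hits = sum(1 for r, g in zip(ranked, gold) if g & set(r[:k]))
    return hits / len(gold)


def mrr(ranked: Sequence[Sequence[str]], gold: Sequence[set[str]]) -> float:
    """Mean reciprocal rank of the first relevant result (0 when none is retrieved)."""
    total = 0.0
    for r, g in zip(ranked, gold):
        total += next((1.0 / (i + 1) for i, doc in enumerate(r) if doc in g), 0.0)
    return total / len(gold)
```

A low hit rate at 20 points at chunking or embedding; a good hit rate at 20 but a poor one at 5 points at ranking, which is where a reranker helps.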
9. Common pitfalls
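- Forgetting the BGE query prefix: a silent recall loss.
- Not normalizing embeddings: cosine similarity and dot product disagree. This bites if you configure Distance.DOT or compute scores yourself; Distance.COSINE normalizes vectors on upload.
- Calling recreate_collection on every startup: wipes your data. Check client.collection_exists(...) and call create_collection only when the collection is missing; create_collection on its own is not idempotent and fails if the collection already exists.
- Streaming without \n\n between events: the client never sees data.
- Putting the question first in the prompt: it falls victim to the "lost in the middle" effect.
- No "I don't know" clause: the model hallucinates when retrieval fails.
- Encoding queries exactly like documents: instruction-tuned embedders (BGE, E5, and some GTE variants; check the model card) expect a query prefix. Plain SBERT models don't need one.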
10. Stretch exercises
- Add hybrid search: combine dense (Qdrant) with sparse (BM25 via rank_bm25) retrieval and merge the two rankings with reciprocal rank fusion. Gains are typically largest on heterogeneous corpora with rare terms, identifiers, or code.
- Add a reranker: retrieve the top 20 and rerank with BAAI/bge-reranker-base down to the top 5 to improve precision.
- Add query rewriting: use the LLM to rewrite the user query before retrieval, or try HyDE (generate a hypothetical answer, embed that, and retrieve with it). Rewriting helps most with conversational follow-ups that depend on earlier turns.
- Add metadata filtering: store fields such as a date in the payload at ingest, then pass query_filter=Filter(must=[FieldCondition(key="date", range=...)]) to scope results by recency or source.
- Multi-hop retrieval: retrieve, ask the LLM to identify gaps, then retrieve again with a new query. This is the foundation for agentic RAG.
- Eval with RAGAS: faithfulness, context-precision, context-recall, answer-relevancy.
- Replace Qdrant with FAISS for in-process retrieval (no external service); compare latency.
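For the hybrid-search exercise, reciprocal rank fusion needs only each retriever's ranked ids, not comparable scores, which is why it suits mixing cosine similarities with BM25 scores. A minimal sketch; the ids would be Qdrant point ids or chunk indices, the BM25 side is left out, and k=60 is the conventional constant from the original RRF paper.

```python
from collections import defaultdict
from typing import Sequence


def reciprocal_rank_fusion(rankings: Sequence[Sequence[str]], k: int = 60) -> list[str]:
    """Fuse ranked id lists: score(d) = sum over lists of 1 / (k + rank), ranks from 1."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; ties keep first-seen order because sorted() is stable.
    return sorted(scores, key=scores.__getitem__, reverse=True)
```

Documents that appear high in both lists win, and a document found by only one retriever still gets credit.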
11. What this lab proves about you
You can ship a production RAG service with proper streaming, grounding prompts, and citation handling. You can debug retrieval failures by isolating each stage. You know which knobs to turn for which problem (chunk size for granularity, k for recall, reranking for precision, query rewriting for ambiguity). This is the Phase-7 milestone, and a common interview project for LLM Application Engineer roles.