Lab 02 — Production RAG Pipeline (Solution Walkthrough)

Phase: 7 — Retrieval, RAG & Agents | Difficulty: ⭐⭐⭐⭐☆ | Time: 4–6 hours

Concept primer: ../HITCHHIKERS-GUIDE.md §Embeddings, §Vector indices, §RAG.

Run

pip install -r requirements.txt
docker run -d -p 6333:6333 qdrant/qdrant
python solution.py --ingest ./docs        # ingest a folder of .md / .txt
python solution.py --serve                # start API on :8000

curl -N -X POST localhost:8000/chat -H 'content-type: application/json' \
     -d '{"query":"what is FlashAttention?"}'

0. The mission

A complete RAG system in ~250 lines: chunk → embed → ingest → retrieve → stream. Every piece is what you'd ship in production:

  • Token-aware chunking with overlap.
  • BGE-small as the embedding model (strong retrieval quality for a 384-dim model).
  • Qdrant with HNSW + cosine similarity.
  • FastAPI with Server-Sent Events for token streaming.
  • Prompt template that actually grounds the model in retrieved context.

1. Chunking — token-aware with overlap

import tiktoken
enc = tiktoken.get_encoding("cl100k_base")     # GPT-4 / OpenAI tokenizer

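def chunk_text(text: str, chunk_tokens: int = 400, overlap: int = 80) -> list[str]:
    """Split text into windows of chunk_tokens tokens that overlap by `overlap` tokens."""
    if not 0 <= overlap < chunk_tokens:
        # With overlap >= chunk_tokens the stride is <= 0 and the loop never advances.
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_tokens")
    ids = enc.encode(text)
    chunks = []
    i = 0
    while i < len(ids):
        window = ids[i : i + chunk_tokens]
        chunks.append(enc.decode(window))
        if i + chunk_tokens >= len(ids):
            break                          # this window reached the end; skip a redundant tail chunk
        i += chunk_tokens - overlap        # advance by the stride
    return chunks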

Why these numbers:

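  • chunk_tokens=400: balances retrieval precision against information density. Too small (≤100 tokens) and a chunk is a single sentence, too narrow to answer from. Too large (≥1000 tokens) and a chunk mixes several topics, which dilutes the embedding signal.
  • overlap=80 (20% of the window): a short passage that straddles a chunk boundary still appears whole in at least one chunk. The stride drops from 400 to 320 tokens, so expect about 25% more chunks and storage; in exchange, most boundary-related "missing answer" failures go away.
  • Token-aware, not character-aware: keeps chunks within the embedding model's context (BGE-small truncates at 512 tokens). cl100k_base counts only approximate BGE's own WordPiece tokenizer, so the 400-token target deliberately leaves headroom.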
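
The storage cost of overlap follows directly from the stride. A small sketch of the arithmetic, using a hypothetical helper count_chunks (not part of solution.py) and assuming the chunker stops as soon as a window reaches the end of the text:

```python
import math

def count_chunks(n_tokens: int, chunk_tokens: int = 400, overlap: int = 80) -> int:
    """Number of windows a sliding-window chunker emits for n_tokens tokens.

    Assumption: the chunker stops once a window reaches the end of the text.
    """
    if not 0 <= overlap < chunk_tokens:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_tokens")
    if n_tokens <= 0:
        return 0
    if n_tokens <= chunk_tokens:
        return 1
    stride = chunk_tokens - overlap
    # One window at offset 0, plus one per stride needed to cover the rest.
    return 1 + math.ceil((n_tokens - chunk_tokens) / stride)

with_overlap = count_chunks(100_000)                 # stride 320
without_overlap = count_chunks(100_000, overlap=0)   # stride 400
print(with_overlap, without_overlap, with_overlap / without_overlap)  # 313 250 1.252
```

For a 100k-token corpus that is 313 chunks instead of 250, i.e. roughly a quarter more vectors to embed and store.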

Production tweaks: split on paragraph/heading boundaries first, then token-chunk inside each section.
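
One way to sketch that tweak: split on blank lines first, then run the sliding window inside each section so no chunk spans two paragraphs. The names split_sections and chunk_by_section are hypothetical, and the default tokenizer here is a whitespace stand-in so the example runs without dependencies; in the lab you would presumably pass enc.encode and enc.decode instead.

```python
import re
from typing import Callable

def split_sections(text: str) -> list[str]:
    """Split on blank lines; each non-empty block is treated as one section."""
    return [p.strip() for p in re.split(r"\n\s*\n", text) if p.strip()]

def chunk_by_section(
    text: str,
    chunk_tokens: int = 400,
    overlap: int = 80,
    encode: Callable[[str], list] = str.split,   # stand-in tokenizer (assumption)
    decode: Callable[[list], str] = " ".join,    # stand-in detokenizer (assumption)
) -> list[str]:
    """Token-chunk each section separately so chunks never cross a section boundary."""
    if not 0 <= overlap < chunk_tokens:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_tokens")
    stride = chunk_tokens - overlap
    chunks: list[str] = []
    for section in split_sections(text):
        ids = encode(section)
        for start in range(0, len(ids), stride):
            chunks.append(decode(ids[start : start + chunk_tokens]))
            if start + chunk_tokens >= len(ids):
                break  # window reached the end of this section
    return chunks

demo = "one two three four five\n\nsix seven"
print(chunk_by_section(demo, chunk_tokens=3, overlap=1))
# ['one two three', 'three four five', 'six seven']
```

Heading-aware splitting would follow the same shape: swap the blank-line regex for one that also breaks before heading lines.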


2. Embeddings — BGE-small with normalization

from sentence_transformers import SentenceTransformer
emb_model = SentenceTransformer("BAAI/bge-small-en-v1.5")  # device left unset: uses CUDA when available, else CPU

vecs = emb_model.encode(
    chunks,
    normalize_embeddings=True,                # 👈 unit-norm → cosine = dot product
    batch_size=64,
    show_progress_bar=True,
)

Why BGE-small:

  • 384-dim, 33M params: small enough to run quickly on CPU and trivially on a GPU.
  • Competitive on MTEB for its size class. BGE-base (768-dim) and BGE-large (1024-dim) score higher, but only by a point or two on the MTEB averages reported by the model authors, at two to three times the vector size.
  • normalize_embeddings=True: for unit-norm vectors, cosine similarity reduces to a dot product, which is cheaper to compute.
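
The normalization claim is easy to verify by hand. A dependency-free sketch with two toy 2-d vectors (the helper names are illustrative, not from solution.py):

```python
import math

def dot(a: list[float], b: list[float]) -> float:
    return sum(x * y for x, y in zip(a, b))

def norm(a: list[float]) -> float:
    return math.sqrt(dot(a, a))

def cosine(a: list[float], b: list[float]) -> float:
    return dot(a, b) / (norm(a) * norm(b))

def normalize(a: list[float]) -> list[float]:
    n = norm(a)
    return [x / n for x in a]

a, b = [3.0, 4.0], [4.0, 3.0]
ua, ub = normalize(a), normalize(b)

# On the raw vectors, dot product and cosine disagree (24.0 vs 0.96).
# After normalization the dot product IS the cosine similarity.
assert math.isclose(cosine(a, b), 0.96)
assert math.isclose(dot(ua, ub), cosine(a, b))
assert not math.isclose(dot(a, b), cosine(a, b))
```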

For retrieval, BGE recommends prefixing the query (not the passages) with an instruction:

QUERY_PREFIX = "Represent this sentence for searching relevant passages: "
q_vec = emb_model.encode([QUERY_PREFIX + query], normalize_embeddings=True)[0]

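Leaving the prefix out fails silently: nothing errors, retrieval just gets worse, most visibly on short queries. The size of the loss depends on the model version (the v1.5 models are documented as less sensitive to it than v1), so measure it on your own queries.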


3. Qdrant ingestion

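import uuid

from qdrant_client import QdrantClient
from qdrant_client.http.models import VectorParams, Distance, PointStruct

client = QdrantClient(url="http://localhost:6333")
# Create the collection only if it is missing; recreate_collection would drop existing data.
# collection_exists needs a reasonably recent qdrant-client (1.8 or later).
if not client.collection_exists("docs"):
    client.create_collection(
        collection_name="docs",
        vectors_config=VectorParams(size=384, distance=Distance.COSINE),  # 384 = BGE-small dimension
    )

# `path` is the file the chunks came from; vecs[i] is the embedding of chunks[i].
points = [
    PointStruct(
        id=str(uuid.uuid4()),
        vector=v.tolist(),
        payload={"text": chunk, "source": str(path)},
    )
    for v, chunk in zip(vecs, chunks)
]
client.upsert("docs", points=points, wait=True)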
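
  • Distance.COSINE matches our normalized vectors. Distance.DOT gives the same ranking on unit-norm vectors and skips Qdrant's own normalization step, so it is marginally cheaper.
  • Qdrant builds an HNSW index by default: an approximate nearest-neighbour graph that trades a little recall for much faster search. Figures such as ~99% recall at roughly 10× brute-force speed on 1M+ vectors are typical, but the actual trade-off depends on m, ef_construct, and the search-time hnsw_ef.
  • wait=True blocks until the write has been applied, so a search issued right after ingestion sees the new points. Without it, upsert returns as soon as the request is acknowledged, and an immediate query can come back empty or partial. HNSW construction itself runs in the background; segments that are not yet indexed are still searchable by full scan.
  • Payload stores the original text + source so we can return citations without a second lookup.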
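
Because the ingestion code assigns a random uuid4 to every point, re-running --ingest on the same folder adds duplicates. One option (not part of the original solution) is to derive the ID from the source path and chunk index, so a re-ingest overwrites instead of appending. The helper name chunk_id and the "source#index" key format are assumptions for illustration:

```python
import uuid

def chunk_id(source: str, index: int) -> str:
    """Deterministic UUIDv5 for a chunk: the same (source, index) always maps to the same ID."""
    return str(uuid.uuid5(uuid.NAMESPACE_URL, f"{source}#{index}"))

first = chunk_id("docs/flashattn.md", 0)
again = chunk_id("docs/flashattn.md", 0)
other = chunk_id("docs/flashattn.md", 1)

assert first == again            # stable across runs, so upsert overwrites the old point
assert first != other            # different chunks still get different IDs
assert uuid.UUID(first).version == 5
```

Qdrant accepts UUID strings as point IDs, so this can replace str(uuid.uuid4()) directly. One caveat: if a file shrinks, chunks with higher indexes from the previous run stay behind and need an explicit delete.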

4. Retrieval

def retrieve(query: str, k=5) -> list[dict]:
    q_vec = emb_model.encode([QUERY_PREFIX + query], normalize_embeddings=True)[0]
    hits = client.search(
        collection_name="docs",
        query_vector=q_vec.tolist(),
        limit=k,
        with_payload=True,
    )
    return [{"score": h.score, "text": h.payload["text"], "source": h.payload["source"]}
            for h in hits]
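
  • k=5: a common starting point. At ~400 tokens per chunk, 5 chunks add about 2,000 tokens of context, which fits comfortably in most current context windows and leaves room for the answer.
  • For higher quality, retrieve k=20, then rerank with a cross-encoder (e.g., BAAI/bge-reranker-base) and keep the top 5. A cross-encoder runs a full forward pass per (query, doc) pair at query time, so it is 10–1000× slower per pair than comparing precomputed vectors, but it scores more accurately because it attends to the query and document jointly.
  • client.search is deprecated in recent qdrant-client releases in favour of client.query_points; the call above still works on older clients.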
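
The retrieve-then-rerank step can be sketched independently of any model. Below, rerank takes a pluggable score_fn; the word_overlap scorer is only a toy stand-in so the example runs anywhere. In the lab, score_fn would presumably wrap a cross-encoder such as BAAI/bge-reranker-base (for example via sentence_transformers.CrossEncoder.predict); that wiring is an assumption and not shown here.

```python
from typing import Callable

def rerank(
    query: str,
    hits: list[dict],
    score_fn: Callable[[str, str], float],
    top_n: int = 5,
) -> list[dict]:
    """Re-score each hit against the query and keep the top_n by the new score."""
    scored = [{**h, "rerank_score": score_fn(query, h["text"])} for h in hits]
    scored.sort(key=lambda h: h["rerank_score"], reverse=True)
    return scored[:top_n]

def word_overlap(query: str, text: str) -> float:
    """Toy scorer: fraction of query words that appear in the text."""
    q = set(query.lower().split())
    t = set(text.lower().split())
    return len(q & t) / len(q) if q else 0.0

hits = [
    {"text": "qdrant stores vectors", "source": "a"},
    {"text": "flash attention is an exact attention algorithm", "source": "b"},
    {"text": "attention is all you need", "source": "c"},
]
top = rerank("what is flash attention", hits, word_overlap, top_n=2)
print([h["source"] for h in top])  # ['b', 'c']
```

The original hits are left untouched; each returned dict is a copy with an extra rerank_score field, which keeps the vector score available for debugging.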

5. The grounding prompt

SYSTEM = """You are a helpful assistant. Answer the user's question using ONLY the
provided context. If the answer is not contained in the context, say:
"I don't know based on the provided documents."
Cite sources by their [source] tag."""

def build_prompt(query, hits):
    context = "\n\n".join(f"[{h['source']}]\n{h['text']}" for h in hits)
    return [
        {"role": "system", "content": SYSTEM},
        {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"},
    ]

Key design decisions:

  • "ONLY the provided context" + "I don't know" clause — the two phrases that minimize hallucination most. Without the explicit "I don't know" out, the model will confabulate when retrieval fails.
  • Citations as [source] inline — simple format that survives streaming. Don't try to ask for footnotes; the model loses track during long generations.
  • Context first, question last — LLMs attend most strongly to the start and end of a long prompt (the "lost in the middle" effect, Liu et al. 2023). Question last keeps it salient.

6. FastAPI + SSE streaming

import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI

app = FastAPI()
llm = OpenAI(base_url="http://localhost:8001/v1", api_key="local")  # vLLM endpoint

@app.post("/chat")
def chat(payload: dict):
    query = payload["query"]
    hits = retrieve(query, k=5)
    msgs = build_prompt(query, hits)

    def event_stream():
        # First, emit the citations as a JSON event
        yield f"event: citations\ndata: {json.dumps([h['source'] for h in hits])}\n\n"
        # Then stream tokens
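        # "local" must match the model name the vLLM server was started with
        stream = llm.chat.completions.create(model="local", messages=msgs, stream=True)
        for chunk in stream:
            if not chunk.choices:          # some servers send a final usage-only chunk
                continue
            delta = chunk.choices[0].delta.content or ""
            if delta:
                yield f"data: {json.dumps({'token': delta})}\n\n"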
        yield "event: done\ndata: {}\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")

Why SSE not WebSockets:

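  • One-way (server → client): matches the LLM streaming model.
  • Plain HTTP: works with curl and any HTTP client, with no protocol upgrade of the kind WebSockets require. Buffering reverse proxies can still hold events back, so disable response buffering for this route.
  • Built-in reconnect via Last-Event-ID comes with the browser's EventSource API, which only issues GET requests. Because /chat is a POST, browser clients read the stream with fetch instead and do not get automatic reconnect.
  • Two-newline framing (\n\n) is mandatory: the blank line is what terminates an event, so without it the client keeps buffering and never dispatches anything.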
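
To make the framing rule concrete, here is a minimal client-side parser sketch. It handles only the event: and data: fields used in this lab (no id: or retry:), and parse_sse is a hypothetical helper, not part of solution.py:

```python
from typing import Iterable, Iterator

def parse_sse(lines: Iterable[str]) -> Iterator[tuple[str, str]]:
    """Yield (event_name, data) pairs; an event is dispatched only at a blank line."""
    event, data = "message", []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            if data:                          # blank line terminates the event
                yield event, "\n".join(data)
            event, data = "message", []       # reset for the next event
        elif line.startswith(":"):
            continue                          # comment / keep-alive line
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            value = line[len("data:"):]
            data.append(value[1:] if value.startswith(" ") else value)

stream = 'event: citations\ndata: ["a.md"]\n\ndata: {"token": "Hi"}\n\nevent: done\ndata: {}\n\n'
print(list(parse_sse(stream.splitlines())))
# [('citations', '["a.md"]'), ('message', '{"token": "Hi"}'), ('done', '{}')]

# Without the terminating blank line, nothing is ever dispatched:
print(list(parse_sse(['data: {"token": "lost"}'])))  # []
```

Events without an explicit event: line get the default name "message", which is why the token events in this lab carry no event field.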

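Using the OpenAI SDK pointed at a local vLLM endpoint means you can swap to OpenAI, Together, or any other OpenAI-compatible provider by changing base_url, api_key, and the model name. Providers with a different native API (Anthropic, for example) need their own SDK or a compatibility layer.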


7. Expected behavior

$ curl -N -X POST localhost:8000/chat -H 'content-type: application/json' \
       -d '{"query":"what is FlashAttention?"}'

event: citations
data: ["docs/flashattn.md", "docs/transformers.md"]

data: {"token": "Flash"}

data: {"token": "Attention"}

data: {"token": " is"}

data: {"token": " an"}

...

event: done
data: {}

Sanity check: ask a question whose answer is NOT in your docs. The model should say "I don't know based on the provided documents." If it confabulates instead, the system prompt is too weak — strengthen the "ONLY" clause.
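
This negative-control check is worth automating so it runs on every prompt change. A minimal sketch, assuming the streamed tokens have already been joined into one answer string; is_refusal is a hypothetical helper, and the refusal sentence is the one from the system prompt in section 5:

```python
REFUSAL = "I don't know based on the provided documents."

def _canon(s: str) -> str:
    """Lowercase, collapse whitespace, and map curly apostrophes to straight ones."""
    return " ".join(s.replace("\u2019", "'").lower().split())

def is_refusal(answer: str) -> bool:
    """True if the answer contains the refusal sentence from the system prompt."""
    return _canon(REFUSAL) in _canon(answer)

assert is_refusal("I don't know based on the provided documents.")
assert is_refusal("I don\u2019t know  based on the provided documents.")   # curly quote, extra space
assert not is_refusal("FlashAttention is an IO-aware exact attention algorithm.")
```

A substring match is deliberately strict: a model that paraphrases the refusal will fail the check, which is usually what you want to know about.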


8. Diagnostic methodology

When RAG "isn't working", systematically isolate the failing stage:

| Failure mode | Diagnostic | Fix |
|---|---|---|
| Wrong chunks retrieved | Print hits; are the right chunks even in the top-20? | Tune chunking strategy; try larger chunks or hybrid search. |
| Right chunks retrieved but model ignores them | Print the full prompt; is the context actually included? | Strengthen system prompt; reduce context to top-3. |
| Right context but model hallucinates | Reduce context to a single chunk that contains the answer | If it still hallucinates, the model is too small / weak. |
| Empty results | Did wait=True complete? Does the collection exist? | Check Qdrant /collections endpoint. |
| Slow retrieval | Profile client.search | Tune the HNSW ef search parameter (hnsw_ef); consider a GPU-backed index. |
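
The first row ("are the right chunks even in the top-20?") becomes much easier to act on once it is a number. A minimal sketch of a hit-rate-at-k metric over a small hand-labelled set; the function name, the query keys, and the chunk IDs below are all made up for illustration:

```python
def hit_rate_at_k(
    retrieved: dict[str, list[str]],
    relevant: dict[str, set[str]],
    k: int,
) -> float:
    """Fraction of labelled queries with at least one relevant chunk ID in the top-k."""
    if not relevant:
        raise ValueError("need at least one labelled query")
    hits = sum(
        1 for query, gold in relevant.items()
        if gold & set(retrieved.get(query, [])[:k])
    )
    return hits / len(relevant)

# Hypothetical retrieval results (ranked chunk IDs) and gold labels.
retrieved = {"q1": ["c3", "c1", "c9"], "q2": ["c4", "c5", "c2"]}
relevant = {"q1": {"c1"}, "q2": {"c2"}}

print(hit_rate_at_k(retrieved, relevant, k=1))  # 0.0
print(hit_rate_at_k(retrieved, relevant, k=2))  # 0.5
print(hit_rate_at_k(retrieved, relevant, k=3))  # 1.0
```

If the hit rate is low even at k=20, the problem is chunking or embedding; if it is high at k=20 but low at k=5, a reranker is the more likely fix.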

9. Common pitfalls

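  1. Forgetting the BGE query prefix: a silent drop in retrieval quality, with no error to flag it.
  2. Not normalizing embeddings: harmless under Distance.COSINE, but under Distance.DOT the scores stop being cosine similarities and vectors with larger norms get ranked higher.
  3. Calling recreate_collection on every startup: it drops and rebuilds the collection, wiping your data. create_collection is not idempotent either (it raises if the collection already exists), so guard it with client.collection_exists(...).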
  4. Streaming without \n\n between events — client never sees data.
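  5. Putting the question before or inside a long context: keep it last, next to where generation starts. Content in the middle of a long prompt is what suffers most from the "lost in the middle" effect.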
  6. No "I don't know" clause — model hallucinates when retrieval fails.
  7. Assuming every embedder uses the same prefix convention: prefixes are model-specific. BGE prefixes only the query, E5 expects "query: " and "passage: " on the respective sides, and plain SBERT models need none. Check the model card.

10. Stretch exercises

  • Add hybrid search: combine dense (Qdrant) with sparse (BM25 via rank_bm25) retrieval and merge the two rankings with reciprocal rank fusion. Expect on the order of 10–20% better recall on heterogeneous corpora, though the gain varies with the data.
  • Add a reranker: retrieve top-20, rerank with BAAI/bge-reranker-base to top-5. ~5–15% precision improvement.
  • Add query rewriting: use the LLM to rewrite the user query before retrieval, which helps most on conversational follow-ups that depend on earlier turns. A related technique is HyDE: generate a hypothetical answer, embed that instead of the query, and retrieve with it.
  • Add metadata filtering: pass query_filter=Filter(must=[FieldCondition(key="date", range=...)]) to scope by recency/source.
  • Multi-hop retrieval: retrieve, ask LLM to identify gaps, retrieve again with new query. Foundation for agentic RAG.
  • Eval with RAGAS: faithfulness, context-precision, context-recall, answer-relevancy.
  • Replace Qdrant with FAISS for in-process retrieval (no external service); compare latency.
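
As a starting point for the hybrid-search exercise, reciprocal rank fusion needs only the ranked ID lists from each retriever, not their scores. A minimal sketch; the constant k=60 is the value commonly used in the RRF literature, and the document IDs are placeholders:

```python
from collections import defaultdict

def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Fuse several ranked ID lists: each list contributes 1 / (k + rank) per document."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; ties broken by ID for deterministic output.
    return sorted(scores, key=lambda d: (-scores[d], d))

dense = ["a", "b", "c"]    # e.g. IDs from the Qdrant search, best first
sparse = ["c", "a", "d"]   # e.g. IDs from BM25, best first

print(reciprocal_rank_fusion([dense, sparse]))  # ['a', 'c', 'b', 'd']
```

Because RRF uses ranks only, it sidesteps the problem that cosine scores and BM25 scores live on different scales.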

11. What this lab proves about you

You can ship a production RAG service with proper streaming, grounding prompts, and citation handling. You can debug retrieval failures by isolating each stage. You know which knobs to turn for which problem (chunk size for granularity, k for recall, reranking for precision, query rewriting for ambiguity). Phase-7 milestone — and the most common interview project for LLM Application Engineer roles.