Lab 02 — Pretraining Data Pipeline (Solution Walkthrough)

Phase: 10 — Distributed Training & Data | Difficulty: ⭐⭐⭐⭐⭐ | Time: 6–10 hours

Concept primer: ../HITCHHIKERS-GUIDE.md §Data scaling, §Quality filters, §Deduplication.

Run

pip install -r requirements.txt
wget -O sample.warc.wet.gz \
  https://data.commoncrawl.org/crawl-data/CC-MAIN-2024-22/segments/.../wet/CC-MAIN-...warc.wet.gz
python solution.py --input ./sample.warc.wet.gz --out ./tokens

0. The mission

A scaled-down replica of the web-data pipelines behind FineWeb, RefinedWeb, and The Pile, which produce web-scale corpora (hundreds of billions to trillions of tokens) of the kind used to pretrain models like Llama, GPT, and Claude. The bigger the dataset, the more rigorous the cleaning needs to be: noise scales with size, but signal doesn't.

Five stages, each a real engineering surface area:

  1. Parse WET — extract plain-text from CommonCrawl WET archives.
  2. Language ID — keep English only (fasttext lid.176).
  3. Quality filter — Gopher-style heuristics (length, symbol ratio, repetition).
  4. MinHash LSH dedup — near-duplicate removal at 0.8 Jaccard.
  5. Tokenize + shard — tiktoken GPT-2 BPE → packed uint16 .bin files.

The output .bin files plug directly into the training loop from Phase 5's nanoGPT.


1. Stage 1 — Parsing WET

import gzip
from pathlib import Path

from warcio.archiveiterator import ArchiveIterator

def iter_wet_records(path: Path):
    with gzip.open(path, "rb") as f:
        for rec in ArchiveIterator(f):
            if rec.rec_type != "conversion":
                continue
            url = rec.rec_headers.get_header("WARC-Target-URI")
            text = rec.content_stream().read().decode("utf-8", errors="replace")
            yield {"url": url, "text": text}
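  • WARC = Web ARChive format. Every record has a type (warcinfo, request, response, metadata, conversion, ...). Common Crawl ships three file flavors: WARC (raw HTTP responses, including HTML), WAT (metadata), and WET (HTML stripped to plain text, stored as conversion records). warcio only reads and writes records. If you start from WARC files, you need your own HTML-to-text extractor (FineWeb, for example, uses trafilatura).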
  • errors="replace" — the web is full of malformed UTF-8. Don't crash; emit U+FFFD.
  • Streaming is essential: a WET file is on the order of 100 MB compressed (several times that as text), and a crawl ships tens of thousands of them. We never load a whole file into RAM.
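To show what warcio is doing under the hood, and why streaming is cheap, here is a minimal stdlib-only reader. It is a sketch that assumes well-formed WARC/1.x records, where the `Content-Length` header gives the exact body size in bytes. The function `iter_wet_records_stdlib`, the `_record` helper, and the in-memory demo file are made up for illustration. warcio remains the robust choice.

```python
import gzip
import io
from typing import BinaryIO, Iterator, Optional


def iter_wet_records_stdlib(stream: BinaryIO) -> Iterator[dict]:
    """Yield {"url", "text"} for each conversion record, one record at a time.

    Sketch only: assumes well-formed records; warcio handles the edge cases.
    """
    while True:
        line = stream.readline()
        if not line:
            return                                   # end of stream
        if not line.startswith(b"WARC/"):
            continue                                 # blank separator lines between records
        headers = {}
        while True:                                  # header block ends at an empty line
            raw = stream.readline()
            if raw in (b"", b"\r\n", b"\n"):
                break
            name, _, value = raw.decode("utf-8", errors="replace").partition(":")
            headers[name.strip().lower()] = value.strip()
        body = stream.read(int(headers.get("content-length", "0")))  # exact byte count
        if headers.get("warc-type") == "conversion":
            yield {"url": headers.get("warc-target-uri"),
                   "text": body.decode("utf-8", errors="replace")}


def _record(rtype: str, body: bytes, uri: Optional[str] = None) -> bytes:
    # Build one WARC record: version line, headers, blank line, body, two CRLFs.
    head = f"WARC/1.0\r\nWARC-Type: {rtype}\r\n"
    if uri:
        head += f"WARC-Target-URI: {uri}\r\n"
    head += f"Content-Length: {len(body)}\r\n\r\n"
    return head.encode("utf-8") + body + b"\r\n\r\n"


# Demo: a tiny gzipped WET file in memory (one warcinfo record + two conversions).
raw = io.BytesIO()
with gzip.GzipFile(fileobj=raw, mode="wb") as gz:
    gz.write(_record("warcinfo", b"software: demo"))
    gz.write(_record("conversion", "héllo world".encode("utf-8"), "https://example.com/a"))
    gz.write(_record("conversion", b"bad \xff byte", "https://example.com/b"))
raw.seek(0)

with gzip.GzipFile(fileobj=raw, mode="rb") as f:
    records = list(iter_wet_records_stdlib(f))
for r in records:
    print(r["url"], repr(r["text"]))
```

The reader holds only one record in memory at a time. The malformed byte in the second record comes out as U+FFFD rather than an exception.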

2. Stage 2 — Language ID with fastText

import fasttext
# wget https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin
lid = fasttext.load_model("lid.176.bin")

def detect_lang(text: str) -> tuple[str, float]:
    sample = text.replace("\n", " ")[:1000]
    labels, probs = lid.predict(sample, k=1)
    return labels[0].replace("__label__", ""), float(probs[0])

# In the main loop: keep English with prob >= 0.65
lang, prob = detect_lang(text)
if lang != "en" or prob < 0.65:
    continue

Why fasttext lid.176:

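  • 176 languages, and fast. It is a small linear model over character n-grams, well under a millisecond per 1,000-char sample on one CPU core, so it scales to billions of documents across many workers.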
  • Threshold 0.65 is the FineWeb default. Higher threshold (0.8) drops more borderline docs (multilingual pages); lower (0.5) admits noise.
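  • Replace newlines because fastText's predict() handles one line at a time and raises an error if the input contains "\n". Capping the sample at 1,000 chars keeps prediction cheap while still reaching past the first few menu links.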

Swap fasttext for langdetect if you need a pure-Python dependency. Expect much slower inference, and check its accuracy on a sample of your data before switching.
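Before committing to 0.65, sweep the threshold on a sample of your own data. The sketch below assumes you have already collected (label, prob) pairs from `detect_lang` on a few thousand documents. `retention_by_threshold` is a hypothetical helper, and the toy pairs are made up.

```python
from typing import Iterable


def retention_by_threshold(preds: Iterable[tuple[str, float]],
                           thresholds: tuple[float, ...] = (0.5, 0.65, 0.8),
                           lang: str = "en") -> dict[float, float]:
    """Fraction of documents kept at each confidence threshold for `lang`."""
    preds = list(preds)
    return {t: sum(1 for label, p in preds if label == lang and p >= t) / max(1, len(preds))
            for t in thresholds}


# Toy (label, prob) pairs standing in for detect_lang() output on a sample.
toy = [("en", 0.95), ("en", 0.70), ("en", 0.55), ("fr", 0.90), ("en", 0.30)]
sweep = retention_by_threshold(toy)
for t, frac in sweep.items():
    print(f"threshold={t:.2f}  kept={frac:.0%}")
```

Read the output together with a manual look at the documents that flip between thresholds. Those borderline cases (mixed-language pages, boilerplate-heavy pages) are what the threshold actually decides.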


3. Stage 3 — Gopher quality filters

From DeepMind's Gopher paper. Drop documents that fail any of these:

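from collections import Counter

import numpy as np

def top_ngram_fraction(words: list[str], n: int) -> float:
    """Fraction of word characters covered by the single most frequent word n-gram."""
    counts = Counter(tuple(words[i:i + n]) for i in range(len(words) - n + 1))
    if not counts:
        return 0.0
    top, freq = counts.most_common(1)[0]
    return freq * sum(len(w) for w in top) / max(1, sum(len(w) for w in words))

def passes_gopher(text: str) -> tuple[bool, str]:
    words = text.split()
    n_words = len(words)
    if n_words < 50 or n_words > 100_000:
        return False, "length"

    mean_word_len = np.mean([len(w) for w in words])
    if mean_word_len < 3 or mean_word_len > 10:
        return False, "word_len"

    # Symbol-to-word ratio: "#" and ellipses per word, each checked separately
    n_hash = text.count("#")
    n_ellipsis = text.count("...") + text.count("…")
    if max(n_hash, n_ellipsis) / n_words > 0.10:
        return False, "symbol_ratio"

    lines = text.splitlines()              # non-empty: we have >= 50 words
    bullet_lines = sum(1 for line in lines if line.lstrip().startswith(("•", "-", "*")))
    if bullet_lines / len(lines) > 0.90:
        return False, "too_bulleted"

    ellipsis_lines = sum(1 for line in lines if line.rstrip().endswith(("...", "…")))
    if ellipsis_lines / len(lines) > 0.30:
        return False, "too_truncated"

    # Top-2gram + top-3gram repetition (Gopher 2.4)
    if top_ngram_fraction(words, 2) > 0.20:
        return False, "repeat_2gram"
    if top_ngram_fraction(words, 3) > 0.18:
        return False, "repeat_3gram"

    return True, "ok"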

What each filter catches in practice:

| Filter | Targets | Drop when |
|---|---|---|
| Length | Stubs ("page not found") and giant SQL dumps | < 50 or > 100k words |
| Mean word length | Code listings, hex dumps, URL lists | mean < 3 or > 10 chars |
| Symbol-to-word ratio | Hashtag spam, teaser and truncated text | "#" or ellipses > 10% of the word count |
| Bullet lines | Recipe sites, link directories | > 90% of lines start with a bullet |
| Ellipsis lines | Truncated SEO content ("...read more") | > 30% of lines end with "..." or "…" |
| N-gram repetition | Templated content, spam | top 2-gram covers > 20% of characters (top 3-gram: > 18%) |

Gopher's full filter list is much longer; this lab implements seven of its checks. In the illustrative run in §6, they remove roughly 28% of the documents that survive language ID, which is the bottom of the quality distribution.
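Two cheap rules from Gopher's longer list are worth adding next. First, at least 80% of words must contain an alphabetic character. Second, the text must contain at least two of the stop words the, be, to, of, and, that, have, with. The sketch below assumes whitespace tokenization as in passes_gopher and counts stop-word occurrences, repeats included. The paper's wording also admits a distinct-words reading. The function name and reason strings are our own.

```python
import re

GOPHER_STOP_WORDS = {"the", "be", "to", "of", "and", "that", "have", "with"}


def passes_extra_gopher(text: str) -> tuple[bool, str]:
    """Two more Gopher-style rules (sketch): alphabetic-word share and stop-word presence."""
    words = text.split()
    if not words:
        return False, "empty"
    # Rule 1: at least 80% of words must contain an alphabetic character.
    alpha_words = sum(1 for w in words if any(c.isalpha() for c in w))
    if alpha_words / len(words) < 0.80:
        return False, "non_alpha"
    # Rule 2: at least two stop-word occurrences (repeats counted; an interpretation).
    normalized = (re.sub(r"[^\w]", "", w.lower()) for w in words)
    if sum(1 for w in normalized if w in GOPHER_STOP_WORDS) < 2:
        return False, "stop_words"
    return True, "ok"


for sample in ("The cat sat on the mat and looked at the door with interest.",
               "12 34 56 78 90 ab",
               "Buy cheap pills online now click here fast"):
    print(passes_extra_gopher(sample))
```

The stop-word rule is a surprisingly strong prose detector. Keyword-stuffed spam and link lists rarely contain function words.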


4. Stage 4 — MinHash LSH deduplication

Near-duplicates are among the most damaging problems in web-scale training data. They encourage memorization, inflate apparent dataset size, and waste compute.

4.1 Why MinHash + LSH?

Exact dedup (hash the whole doc) misses near-duplicates: same article reposted with a different header. Pairwise Jaccard is O(N²) — infeasible at billions of docs. MinHash + LSH gives sub-linear search at controllable recall.
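Both problems are easy to see on toy data. Editing one word defeats a whole-document hash, yet the 5-word shingle sets still overlap heavily, and exhaustive pairwise comparison grows quadratically. The strings below are toy examples, and `jaccard` is a helper defined here.

```python
import hashlib


def shingles(text: str, k: int = 5) -> set[str]:
    words = text.split()
    return {" ".join(words[i:i + k]) for i in range(len(words) - k + 1)}


def jaccard(a: set[str], b: set[str]) -> float:
    return len(a & b) / max(1, len(a | b))


doc_a = " ".join(f"w{i}" for i in range(20))
doc_b = doc_a.rsplit(" ", 1)[0] + " CHANGED"          # same doc, last word edited

exact_a = hashlib.sha1(doc_a.encode("utf-8")).hexdigest()
exact_b = hashlib.sha1(doc_b.encode("utf-8")).hexdigest()
sim = jaccard(shingles(doc_a), shingles(doc_b))
print("exact hashes equal:", exact_a == exact_b)      # False: exact dedup misses it
print(f"5-shingle Jaccard: {sim:.3f}")                # 15 shared / 17 in the union

n = 1_000_000_000
pairs = n * (n - 1) // 2                               # all-pairs comparisons at 1B docs
print(f"pairs to compare at 1B docs: {pairs:.2e}")
```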

The trick:

  • Each doc → set of shingles (e.g., 5-word windows).
  • MinHash signature: K independent hash functions; for each, take the min hash value across the shingles. Two docs' MinHash signatures collide on a hash with probability equal to their Jaccard similarity.
  • LSH splits the K-value signature into b bands of r rows each (K = b·r). Two docs become "candidate similar" if they agree on all r values in at least one band. The probability of that is approximately $1 - (1 - s^r)^b$, an S-curve that rises steeply around your target threshold.
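The collision property is easier to trust once you have seen it work. The plain-Python MinHash below is a teaching sketch, not datasketch's implementation. It assumes SHA-1 as the base hash and uses the classic (a·x + b) mod p family to simulate K random permutations. All names are made up here. With K = 128, the estimate's standard error is about sqrt(J(1 − J)/K), roughly 0.04 at J = 2/3.

```python
import hashlib
import random

PRIME = (1 << 61) - 1   # Mersenne prime for the (a*x + b) mod p hash family


def base_hash(s: str) -> int:
    # Stable hash of a shingle (Python's built-in hash() is randomized per process)
    return int.from_bytes(hashlib.sha1(s.encode("utf-8")).digest()[:8], "big") % PRIME


def make_perms(num_perm: int, seed: int = 42) -> list[tuple[int, int]]:
    # Each (a, b) defines one hash function h(x) = (a*x + b) mod PRIME
    rng = random.Random(seed)
    return [(rng.randrange(1, PRIME), rng.randrange(PRIME)) for _ in range(num_perm)]


def signature(shingle_set: set[str], perms: list[tuple[int, int]]) -> list[int]:
    # For each hash function, keep the minimum over the (non-empty) shingle set
    xs = [base_hash(s) for s in shingle_set]
    return [min((a * x + b) % PRIME for x in xs) for a, b in perms]


def estimated_jaccard(sig1: list[int], sig2: list[int]) -> float:
    # Fraction of positions whose minimums agree; each agrees with probability = Jaccard
    return sum(u == v for u, v in zip(sig1, sig2)) / len(sig1)


A = {f"s{i}" for i in range(100)}
B = {f"s{i}" for i in range(20, 120)}            # |A & B| = 80, |A | B| = 120
perms = make_perms(128)
true_j = len(A & B) / len(A | B)
est_j = estimated_jaccard(signature(A, perms), signature(B, perms))
print(f"true={true_j:.3f}  estimated={est_j:.3f}")
```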

For target threshold $s = 0.8$, num_perm=128 gives a good S-curve.
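The banding formula is worth exploring numerically. The sketch below evaluates it for two illustrative (b, r) splits. These are our choices and not necessarily what datasketch uses, since MinHashLSH derives b and r internally from threshold and num_perm. The rough location of the steep part, (1/b)^(1/r), is the usual rule of thumb.

```python
def candidate_prob(s: float, b: int, r: int) -> float:
    """P(two docs with Jaccard s share at least one identical band), b bands of r rows."""
    return 1.0 - (1.0 - s ** r) ** b


for b, r in [(16, 8), (11, 11)]:                   # uses 128 and 121 permutations
    knee = (1.0 / b) ** (1.0 / r)                   # where the curve is steepest, roughly
    curve = "  ".join(f"{s:.1f}:{candidate_prob(s, b, r):.2f}"
                      for s in (0.5, 0.6, 0.7, 0.8, 0.9))
    print(f"b={b:2d} r={r:2d} knee~{knee:.2f}  {curve}")
```

Longer rows (larger r) push the knee to higher similarity and cut false positives. More bands (larger b) pull it down and cut false negatives. With a fixed num_perm, you trade one against the other.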

4.2 Implementation

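from itertools import count

from datasketch import MinHash, MinHashLSH

lsh = MinHashLSH(threshold=0.8, num_perm=128)
_next_id = count()                         # unique keys for lsh.insert

def shingles(text: str, k: int = 5) -> set[str]:
    words = text.split()
    return {" ".join(words[i:i + k]) for i in range(len(words) - k + 1)}

def is_near_duplicate(text: str) -> bool:
    """True if text near-duplicates an already-kept doc; otherwise index it, return False."""
    m = MinHash(num_perm=128)
    for sh in shingles(text):
        m.update(sh.encode("utf-8"))       # MinHash hashes bytes, not str
    if lsh.query(m):
        return True                        # near-duplicate: skip
    lsh.insert(str(next(_next_id)), m)
    return False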
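  • 5-word shingles are a common default. Smaller shingles (3) let stock phrases make unrelated docs look similar. Larger shingles (10) mean one edited word breaks many shingles, so copies with scattered edits slip through more easily.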
  • num_perm=128 — the right balance for 0.8 threshold. More perms = sharper S-curve but more memory per doc.
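  • lsh.query(m) returns the keys of indexed docs whose signatures collide with m in at least one band. datasketch does not re-check the estimated Jaccard, so we treat any non-empty result as a near-duplicate and accept a few false positives.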

For billion-scale dedup, replace in-memory MinHashLSH with a Spark or DuckDB-backed implementation. The algorithm is identical.
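What MinHashLSH does internally fits in a few lines. Split each signature into b bands, use each band's tuple of values as a dictionary key, and report any document already stored under one of those keys. This is a simplified illustration: the class name, parameters, and random stand-in signatures are ours, and datasketch's real implementation differs in details such as storage backends. It also shows why distributed dedup is the same algorithm. Each (band index, band key) pair can be hash-partitioned across workers, and only documents landing in the same bucket ever meet.

```python
import random
from collections import defaultdict


class BandedLSH:
    """Toy LSH index: b bands of r rows; docs sharing any full band share a bucket."""

    def __init__(self, num_perm: int, bands: int):
        if num_perm % bands:
            raise ValueError("num_perm must be divisible by bands")
        self.b, self.r = bands, num_perm // bands
        self.buckets = [defaultdict(list) for _ in range(bands)]  # one table per band

    def _band_keys(self, sig: list[int]) -> list[tuple[int, ...]]:
        return [tuple(sig[i * self.r:(i + 1) * self.r]) for i in range(self.b)]

    def query(self, sig: list[int]) -> set[str]:
        return {key for band, bk in enumerate(self._band_keys(sig))
                for key in self.buckets[band].get(bk, [])}

    def insert(self, key: str, sig: list[int]) -> None:
        for band, bk in enumerate(self._band_keys(sig)):
            self.buckets[band][bk].append(key)


rng = random.Random(0)
sig_a = [rng.randrange(2**32) for _ in range(128)]
sig_b = sig_a[:120] + [rng.randrange(2**32) for _ in range(8)]  # differs only in last band
sig_c = [rng.randrange(2**32) for _ in range(128)]              # unrelated document

index = BandedLSH(num_perm=128, bands=16)
index.insert("doc-a", sig_a)
print(index.query(sig_b), index.query(sig_c))
```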


5. Stage 5 — Tokenization and sharding

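from pathlib import Path

import numpy as np
import tiktoken

enc = tiktoken.get_encoding("gpt2")
out_dir = Path("./tokens")
out_dir.mkdir(parents=True, exist_ok=True)
shard_tokens = 100_000_000               # ~200 MB per shard at uint16
buf = []
shard_idx = 0

def write_shard(tokens: list[int], idx: int) -> None:
    np.array(tokens, dtype=np.uint16).tofile(out_dir / f"train_{idx:05d}.bin")

for text in cleaned_docs:                 # docs that survived stages 1-4
    ids = enc.encode_ordinary(text)
    ids.append(enc.eot_token)             # 👈 EOT between docs
    buf.extend(ids)
    while len(buf) >= shard_tokens:
        write_shard(buf[:shard_tokens], shard_idx)
        buf = buf[shard_tokens:]
        shard_idx += 1

if buf:                                   # flush the final, partial shard
    write_shard(buf, shard_idx)
    shard_idx += 1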
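  • uint16 halves disk vs int32. That is possible because the GPT-2 vocabulary (50,257 tokens, max id 50256) fits below 65,536.
  • EOT between docs marks where one document ends. Packed training windows span document boundaries either way; the EOT token tells the model that context does not carry across, so it doesn't learn spurious correlations between unrelated docs.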
  • 100M tokens per shard is a typical size: small enough to memory-map quickly, large enough that file overhead is negligible.
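One caveat on the loop above: buf is a Python list, and 100M Python ints cost several GB of RAM (an 8-byte pointer per element plus the int objects), versus 200 MB as uint16. Below is a sketch of a writer that packs tokens into a preallocated uint16 NumPy buffer instead. `ShardWriter` and its methods are hypothetical names. It expects lists of token ids, for example `enc.encode_ordinary(text) + [enc.eot_token]`.

```python
import tempfile
from pathlib import Path

import numpy as np


class ShardWriter:
    """Packs token ids into fixed-size uint16 .bin shards via a preallocated buffer."""

    def __init__(self, out_dir, shard_tokens: int, prefix: str = "train"):
        self.out_dir = Path(out_dir)
        self.out_dir.mkdir(parents=True, exist_ok=True)
        self.buf = np.empty(shard_tokens, dtype=np.uint16)   # 2 bytes per token
        self.fill = 0                                         # tokens currently buffered
        self.shard_idx = 0
        self.prefix = prefix

    def add(self, ids) -> None:
        ids = np.asarray(ids, dtype=np.int64)
        if ids.size and (ids.min() < 0 or ids.max() > np.iinfo(np.uint16).max):
            raise ValueError("token id does not fit in uint16")
        ids = ids.astype(np.uint16)
        pos = 0
        while pos < ids.size:
            n = min(ids.size - pos, self.buf.size - self.fill)
            self.buf[self.fill:self.fill + n] = ids[pos:pos + n]
            self.fill += n
            pos += n
            if self.fill == self.buf.size:                    # shard full: write it out
                self._flush()

    def _flush(self) -> None:
        if self.fill:
            path = self.out_dir / f"{self.prefix}_{self.shard_idx:05d}.bin"
            self.buf[:self.fill].tofile(path)
            self.shard_idx += 1
            self.fill = 0

    def close(self) -> None:
        self._flush()                                         # final, partial shard


# Demo with a tiny shard size; 50256 is GPT-2's <|endoftext|> id.
with tempfile.TemporaryDirectory() as tmp:
    writer = ShardWriter(tmp, shard_tokens=4)
    writer.add([1, 2, 3])
    writer.add([4, 5, 6, 50256])
    writer.close()
    shards = [np.fromfile(p, dtype=np.uint16).tolist()
              for p in sorted(Path(tmp).glob("train_*.bin"))]
print(shards)
```

Forgetting `close()` silently drops the last partial shard, the same bug a missing final flush causes in the list-based loop.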

6. The end-to-end loop

from collections import Counter

stats = Counter()
for rec in iter_wet_records(args.input):
    stats["in"] += 1
    lang, prob = detect_lang(rec["text"])
    if lang != "en" or prob < 0.65:
        stats["drop_lang"] += 1
        continue
    ok, reason = passes_gopher(rec["text"])
    if not ok:
        stats[f"drop_{reason}"] += 1
        continue
    if is_near_duplicate(rec["text"]):
        stats["drop_dup"] += 1
        continue
    write_to_shard(rec["text"])
    stats["keep"] += 1

print(stats)

The stats dict is the single most important deliverable: it tells you what fraction was filtered at each stage. Illustrative numbers for 1M raw CommonCrawl WET documents (rounded, with the Gopher drop reasons grouped):

in           = 1,000,000
drop_lang    =   400,000  (40% non-English)
drop_length  =    80,000  (8% too short / too long)
drop_symbol  =    50,000
drop_repeat  =    40,000
drop_dup     =   200,000  (20% near-duplicates)
keep         =   230,000  (23% retention)

FineWeb-Edu keeps only about 10% of FineWeb's tokens (~1.3T of 15T). It is a much stricter cut, made with a classifier trained on LLM-generated educational-quality annotations. Pile retention is ~50% (lighter filtering).
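Turning the Counter into the per-stage funnel printed in §7 takes only a few lines. This sketch assumes the key names used in the loop above: every drop_* key other than drop_lang and drop_dup counts as a Gopher rejection. `funnel` is a hypothetical helper, fed here with the illustrative numbers above.

```python
from collections import Counter


def funnel(stats: Counter) -> list[tuple[str, int]]:
    """Documents remaining after each stage, derived from drop_* counters."""
    after_parse = stats["in"]
    after_lang = after_parse - stats["drop_lang"]
    gopher_drops = sum(v for k, v in stats.items()
                       if k.startswith("drop_") and k not in ("drop_lang", "drop_dup"))
    after_gopher = after_lang - gopher_drops
    after_dedup = after_gopher - stats["drop_dup"]
    return [("parse", after_parse), ("langid", after_lang),
            ("gopher", after_gopher), ("dedup", after_dedup)]


stats = Counter({"in": 1_000_000, "drop_lang": 400_000, "drop_length": 80_000,
                 "drop_symbol": 50_000, "drop_repeat": 40_000,
                 "drop_dup": 200_000, "keep": 230_000})
rows = funnel(stats)
for stage, n in rows:
    print(f"[{stage}]".ljust(10) + f"kept={n:,}  ({n / stats['in']:.0%})")
# Sanity check: the funnel must end exactly at the "keep" counter.
assert rows[-1][1] == stats["keep"]
```

The final assertion is worth keeping in the real pipeline. If it fails, some stage is dropping documents without recording a reason.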


7. Expected output
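
For a ~1M-document input. A single WET file holds only tens of thousands of records, so expect proportionally smaller counts from the sample file:
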
[parse]   docs=1.0M
[langid]  kept=600k  (60%)
[gopher]  kept=430k  (43%)
[dedup]   kept=230k  (23%)
[tokens]  total=180M  shards=2  (train_00000.bin, train_00001.bin)

Load a shard back to verify:

import numpy as np
import tiktoken

enc = tiktoken.get_encoding("gpt2")
arr = np.memmap("./tokens/train_00000.bin", dtype=np.uint16, mode="r")
print(arr.shape)              # (100000000,)
print(enc.decode(arr[:200].tolist()))
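The same memmap is what a nanoGPT-style training loop samples from. Below is a minimal NumPy sketch of such a sampler. It assumes random fixed-length windows whose targets are the inputs shifted by one token. `get_batch`, the batch and block sizes, and the stand-in shard are illustrative, not nanoGPT's exact code.

```python
import tempfile
from pathlib import Path

import numpy as np


def get_batch(data: np.ndarray, batch_size: int, block_size: int, rng: np.random.Generator):
    """Sample random (input, target) windows from a token array; targets shift by one."""
    starts = rng.integers(0, len(data) - block_size, size=batch_size)  # high is exclusive
    # Upcast from uint16: many frameworks expect int64 token indices.
    x = np.stack([data[s:s + block_size] for s in starts]).astype(np.int64)
    y = np.stack([data[s + 1:s + 1 + block_size] for s in starts]).astype(np.int64)
    return x, y


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "train_00000.bin"
    np.arange(1000, dtype=np.uint16).tofile(path)          # stand-in shard: tokens 0..999
    data = np.memmap(path, dtype=np.uint16, mode="r")
    x, y = get_batch(data, batch_size=4, block_size=16, rng=np.random.default_rng(0))
    del data                                               # release the memmap before cleanup
print(x.shape, y.shape)
```

Only the sampled windows are read from disk, which is why multi-GB shards stay cheap to train from.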

8. The data quality → model quality chain

Large-scale empirical work (the FineWeb paper, 2024) shows:

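  • Filter strictness pays off. FineWeb-Edu, a ~1.3T-token subset selected by an educational-quality classifier, trains models that beat ones trained on the full 15T-token FineWeb on knowledge- and reasoning-heavy benchmarks such as MMLU and ARC.
  • Dedup needs care. The FineWeb authors found that MinHash dedup applied within each crawl dump worked better than deduplicating across all dumps at once, which left a lower-quality remainder in older dumps.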
  • Domain mixture — web alone is suboptimal. Add code, books, math, papers in tuned ratios (DoReMi auto-tunes them).

The pipeline you built is the prerequisite for any of those investigations.


9. Common pitfalls

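  1. Loading a WET file into memory: it is several times larger once decompressed, and a real run processes thousands of files. Always stream.
  2. Reading a .gz file with plain open() and decoding with errors="replace": you get silent garbage instead of an error. Use gzip.open() (warcio's ArchiveIterator also detects gzip on its own).
  3. Detecting language on the first 50 chars: the start of a page is dominated by navigation text (menus, headers, link lists). Use 500–1000 chars.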
  4. Forgetting to encode shingles to bytes before MinHash — type error or wrong hashes.
  5. No EOT between docs — model learns spurious cross-doc patterns.
  6. int32 shards: twice the disk of uint16. Use uint16 whenever the vocabulary fits in 65,536 ids, as GPT-2's does.
  7. Single-pass dedup at billion scale: you need a distributed implementation (Spark, Ray, DuckDB). The algorithm is identical, just sharded.
  8. Filtering after dedup — wastes work on docs that were already destined for the trash. Filter first; dedup what survives.

10. Stretch exercises

  • Add a quality classifier: train a fasttext model on (high-quality, low-quality) labeled examples (e.g., Wikipedia vs random forum posts). Score every doc; drop bottom 30%.
  • Implement DoReMi-style mixing: train two small models on different domain mixes; use their loss differences to set the optimal mix.
  • Decontaminate against your eval sets: drop any doc whose 13-gram overlaps with HellaSwag/MMLU/etc.
  • Distributed dedup: replace datasketch.MinHashLSH with a Ray/Spark version that scales to billions.
  • PII redaction: regex out emails, phone numbers, SSNs.
  • Toxicity filter: use the Perspective API or a small classifier; drop above-threshold docs.
  • Compute compression ratio: tokens per doc, tokens per byte. Compare against the roughly 4 characters per token often quoted for GPT-style BPE on English text (about 0.25 tokens/byte).
  • Run on 10 GB: confirm your throughput and memory profile scale linearly.
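A possible starting point for the decontamination exercise: index every word 13-gram from the eval sets in a set, then flag any training doc that shares one. The sketch assumes lowercase `\w+` tokenization and exact n-gram matching. The helper names and toy strings are made up. At scale you would store hashes of the n-grams rather than tuples.

```python
import re


def word_ngrams(text: str, n: int = 13) -> set[tuple[str, ...]]:
    words = re.findall(r"\w+", text.lower())          # lowercase, drop punctuation
    return {tuple(words[i:i + n]) for i in range(len(words) - n + 1)}


def build_eval_index(eval_texts: list[str], n: int = 13) -> set[tuple[str, ...]]:
    index: set[tuple[str, ...]] = set()
    for t in eval_texts:
        index |= word_ngrams(t, n)
    return index


def is_contaminated(doc: str, eval_index: set[tuple[str, ...]], n: int = 13) -> bool:
    # A single shared 13-gram with any eval example flags the whole training doc.
    return not word_ngrams(doc, n).isdisjoint(eval_index)


eval_index = build_eval_index(
    ["The quick brown fox jumps over the lazy dog near the quiet river bank today."])
leaked = "Breaking: the quick brown fox jumps over the lazy dog near the quiet river bank, officials say."
clean = "A completely unrelated sentence about tokenizers and data pipelines for pretraining."
print(is_contaminated(leaked, eval_index), is_contaminated(clean, eval_index))
```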

11. What this lab proves about you

You can build the data infrastructure that pretraining requires. You understand the failure modes (web noise, near-duplicates, language drift) and the techniques to handle each. You can quote retention rates and explain why FineWeb-Edu beats the full FineWeb despite being roughly a tenth of its size. This is the bar for data-engineering roles on foundation-model teams: a niche but high-impact specialty at every frontier lab.