06 — Pretraining Data Pipeline (10TB → Tokens)
Roles: Pretraining Data Engineer · Research Engineer Pretraining
1. Requirements
- Process tens of TB of raw web data (CommonCrawl) → tokenized training shards
- Reproducible & auditable (every token traceable to a source URL)
- Deduped at scale (URL, exact, near-dup)
- Quality-filtered; PII-scrubbed; configurable per-source weights
- Resumable; idempotent
2. Stages
[Raw WARC/WET shards on S3]
│
▼
[Stage 1] Parse + extract (trafilatura/justext for HTML, or use WET)
│
▼
[Stage 2] URL dedup (Bloom filter / RocksDB)
│
▼
[Stage 3] Language ID (fastText lid.176; keep en + others by quota)
│
▼
[Stage 4] Quality filters
- Gopher rules (length, mean word len, symbol ratio, repetition)
- Classifier (fastText: positive=Wikipedia/books, negative=random web)
│
▼
[Stage 5] PII scrub (Presidio + regex; emails, phones, SSNs)
│
▼
[Stage 6] Near-dup (MinHash LSH @ Jaccard 0.8; suffix array for exact duplicate spans)
│
▼
[Stage 7] Toxicity / safety filter (configurable threshold)
│
▼
[Stage 8] Tokenize (custom BPE) → .bin shards (uint16 if vocab ≤ 65,536, else uint32)
│
▼
[Stage 9] Mix + interleave with weights (web 60%, code 20%, books 10%, math 10%)
│
▼
[Final shards on S3, manifest.json with hashes + counts + lineage]
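Stage 4's Gopher rules are simple enough to sketch directly. The thresholds below follow those reported for Gopher's MassiveWeb filtering (50 to 100,000 words, mean word length 3 to 10, hash/ellipsis ratios ≤ 0.1, bullet and ellipsis line fractions, 80% alphabetic words, at least two stop words). The repetition rules are omitted for brevity, and the function name and reason labels are hypothetical.

```python
# Gopher-style heuristic quality rules (thresholds as reported for MassiveWeb; tune per source).
STOP_WORDS = {"the", "be", "to", "of", "and", "that", "have", "with"}
BULLETS = ("-", "*", "•")
ELLIPSES = ("...", "…")


def gopher_fail_reasons(text: str) -> list:
    """Return the names of the rules a document fails; an empty list means keep it."""
    reasons = []
    words = text.split()
    n = len(words)
    if not 50 <= n <= 100_000:
        reasons.append("word_count")
    if n == 0:
        return reasons
    if not 3 <= sum(len(w) for w in words) / n <= 10:
        reasons.append("mean_word_length")
    # Symbol-to-word ratios, checked separately for '#' and for ellipses.
    if text.count("#") / n > 0.1:
        reasons.append("hash_ratio")
    if sum(text.count(e) for e in ELLIPSES) / n > 0.1:
        reasons.append("ellipsis_ratio")
    # At least 80% of words must contain an alphabetic character.
    if sum(any(c.isalpha() for c in w) for w in words) / n < 0.8:
        reasons.append("alpha_words")
    # Must contain at least two distinct English stop words (punctuation stripped).
    if len(STOP_WORDS & {w.lower().strip(".,;:!?\"'()") for w in words}) < 2:
        reasons.append("stop_words")
    lines = [line for line in text.splitlines() if line.strip()]
    if lines:
        if sum(line.lstrip().startswith(BULLETS) for line in lines) / len(lines) > 0.9:
            reasons.append("bullet_lines")
        if sum(line.rstrip().endswith(ELLIPSES) for line in lines) / len(lines) > 0.3:
            reasons.append("ellipsis_lines")
    return reasons
```

Returning named reasons rather than a bool feeds the "drop reasons (categorized)" metric in Observability directly.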
3. Deep Dives
3.1 Distributed Execution
- Spark / Ray / Dask on a cluster (1000s of vCPU)
- Shard-parallel: each task processes ≤1 GB
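- Idempotent: write to a temp path, then commit atomically to out/{stage}/{shard_id}.parquet; a restart skips shards whose output already exists, so a crash must never leave a partial file at the final path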
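A minimal sketch of the skip-existing, atomic-commit pattern, assuming a local/POSIX filesystem where `os.replace` is atomic within one filesystem. It writes JSON Lines instead of Parquet to stay dependency-free; `run_shard` and its `process` callback are hypothetical. Object stores need their own commit step (e.g., a marker object).

```python
import json
import os
import tempfile
from typing import Callable, Iterable


def run_shard(stage: str, shard_id: str, out_root: str,
              process: Callable[[], Iterable[dict]]) -> bool:
    """Process one shard idempotently; returns False if its output was already committed."""
    final_path = os.path.join(out_root, stage, f"{shard_id}.jsonl")
    if os.path.exists(final_path):
        return False  # committed by an earlier run: skip without recomputing
    out_dir = os.path.dirname(final_path)
    os.makedirs(out_dir, exist_ok=True)
    # Temp file in the same directory so the rename stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=out_dir, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            for record in process():
                f.write(json.dumps(record) + "\n")
        os.replace(tmp_path, final_path)  # atomic commit: readers see all or nothing
    except BaseException:
        os.unlink(tmp_path)  # discard the partial output, then re-raise
        raise
    return True
```

A hard kill can still leave a stray `*.tmp` file, but it is never mistaken for committed output; a periodic sweep can delete them.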
3.2 MinHash LSH at Scale
- 128 perms, threshold 0.8
- Group docs by band; only compare within a bucket
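- Bands × rows must equal the perm count, e.g., 16 bands × 8 rows = 128; this banding's S-curve centers near (1/16)^(1/8) ≈ 0.71, so verify each candidate pair's estimated Jaccard against 0.8
- For 1B docs: one bucket key per doc per band, then union-find over verified pairs → cost grows roughly linearly with doc count, not quadratically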
- Output: keep one doc per cluster (longest, or earliest crawl date)
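A single-machine sketch of the banding scheme above, assuming 5-word shingles, SHA-1 as the base hash, and `(a*h + b) mod p` universal hashes standing in for the 128 permutations. All function names are hypothetical; at 1B docs the bucket and union-find steps would run distributed.

```python
import hashlib
import random
from collections import defaultdict
from itertools import combinations

NUM_PERM, BANDS, ROWS = 128, 16, 8  # BANDS * ROWS must equal NUM_PERM
PRIME = (1 << 61) - 1               # Mersenne prime for universal hashing
_rng = random.Random(42)            # fixed seed -> reproducible signatures
PERMS = [(_rng.randrange(1, PRIME), _rng.randrange(PRIME)) for _ in range(NUM_PERM)]


def shingles(text: str, n: int = 5) -> set:
    words = text.lower().split()
    return {" ".join(words[i:i + n]) for i in range(max(1, len(words) - n + 1))}


def minhash(text: str) -> list:
    # 64-bit base hash per shingle, then NUM_PERM simulated permutations.
    base = [int.from_bytes(hashlib.sha1(s.encode()).digest()[:8], "big") for s in shingles(text)]
    return [min((a * h + b) % PRIME for h in base) for a, b in PERMS]


def est_jaccard(s1: list, s2: list) -> float:
    return sum(x == y for x, y in zip(s1, s2)) / NUM_PERM


def near_dup_clusters(docs: dict, threshold: float = 0.8) -> list:
    sigs = {d: minhash(t) for d, t in docs.items()}
    parent = {d: d for d in docs}

    def find(x):
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving
            x = parent[x]
        return x

    for band in range(BANDS):
        buckets = defaultdict(list)
        for d, sig in sigs.items():
            buckets[tuple(sig[band * ROWS:(band + 1) * ROWS])].append(d)
        for members in buckets.values():  # only docs sharing a bucket are compared
            for x, y in combinations(members, 2):
                if est_jaccard(sigs[x], sigs[y]) >= threshold:  # verify candidate
                    parent[find(x)] = find(y)

    clusters = defaultdict(list)
    for d in docs:
        clusters[find(d)].append(d)
    return list(clusters.values())


def keep_longest(docs: dict) -> set:
    """Keep one doc per near-dup cluster: the longest."""
    return {max(c, key=lambda d: len(docs[d])) for c in near_dup_clusters(docs)}
```

Usage: `keep_longest({"a": text_a, "b": text_b})` returns the IDs to keep; swapping the `max` key for crawl date gives the "earliest crawl" policy.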
3.3 Data-Mix Tuning
- Ablation runs (small models, fixed compute) sweeping mix weights
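- DoReMi (domain weights learned with a small proxy model) for principled mix search; DSIR (importance resampling toward a target distribution) for selecting data within a source
- Target: the mix that is optimal at the target model size and compute budget, not just at the proxy scale; optimal weights can shift with scale, so confirm the proxy's winner with a run closer to target size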
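Once weights are chosen, Stage 9's weighted interleave can be sketched as a seeded sampler that picks each next source with probability proportional to its weight. This is a minimal sketch with a hypothetical `interleave` function. It draws by document count, so if the 60/20/10/10 targets are token fractions, the weights must be adjusted by each source's mean document length. When a source runs out, the remaining weights are implicitly renormalized instead of repeating data.

```python
import random
from typing import Dict, Iterator, Tuple


def interleave(sources: Dict[str, Iterator], weights: Dict[str, float],
               seed: int = 0) -> Iterator[Tuple[str, object]]:
    """Yield (source_name, doc) pairs, drawing each next source with probability ∝ weight."""
    rng = random.Random(seed)  # fixed seed -> identical interleaving on every re-run
    active = dict(sources)
    while active:
        names = list(active)
        name = rng.choices(names, weights=[weights[n] for n in names])[0]
        try:
            doc = next(active[name])
        except StopIteration:
            del active[name]  # exhausted: drop it and renormalize over the rest
            continue
        yield name, doc
```

Usage: `interleave({"web": web_it, "code": code_it, "books": books_it, "math": math_it}, {"web": 0.6, "code": 0.2, "books": 0.1, "math": 0.1}, seed=0)`.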
3.4 PII & Safety
- Scrub before storage, not just before training
- Audit: log per-stage drop counts; alert when a stage's drop rate deviates sharply from its historical baseline
- Honor takedown requests: source URL → shard ID lookup; rebuild affected shards
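The regex half of Stage 5 can be sketched as below. This is a simplified, US-centric assumption: it covers only emails, NANP-style phone numbers, and dashed SSNs, and it does not use Presidio. `scrub_pii` and the placeholder tokens are hypothetical. It returns per-type counts so the audit log can track them.

```python
import re

# Simplified patterns (assumptions); production scrubbers need far broader coverage.
# SSN runs before PHONE so digit runs are claimed by the more specific pattern first.
PII_PATTERNS = [
    ("EMAIL", re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")),
    ("SSN", re.compile(r"\b\d{3}-\d{2}-\d{4}\b")),
    ("PHONE", re.compile(r"(?<!\d)(?:\+1[ .-]?)?\(?\d{3}\)?[ .-]?\d{3}[ .-]\d{4}(?!\d)")),
]


def scrub_pii(text: str):
    """Replace matches with <LABEL> placeholders; return (scrubbed_text, counts_by_label)."""
    counts = {}
    for label, pattern in PII_PATTERNS:
        text, n = pattern.subn(f"<{label}>", text)
        counts[label] = n
    return text, counts
```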
3.5 Reproducibility
- Each shard's manifest: stage version + config hash + input shard ID + count in/out
- Lineage graph queryable (DataHub / OpenMetadata)
- Re-running with the same configs, pinned stage versions, and fixed seeds (MinHash permutations, mix sampling) deterministically reproduces output
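The config hash only supports reproducibility if it is stable across key order and formatting. A minimal sketch, assuming configs are JSON-serializable dicts (seeds included): hash canonical JSON with SHA-256. The manifest field names mirror the bullet above but are hypothetical.

```python
import hashlib
import json


def config_hash(config: dict) -> str:
    """Stable hash: canonical JSON (sorted keys, no whitespace) -> SHA-256 hex digest."""
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def manifest_entry(stage: str, stage_version: str, config: dict,
                   input_shard_id: str, count_in: int, count_out: int) -> dict:
    """One per-shard lineage record (hypothetical schema)."""
    return {
        "stage": stage,
        "stage_version": stage_version,
        "config_hash": config_hash(config),
        "input_shard_id": input_shard_id,
        "count_in": count_in,
        "count_out": count_out,
    }
```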
4. Observability
- Per-stage: docs in/out, MB in/out, drop reasons (categorized)
- Per-shard: language histogram, length distribution, sample 10 docs to S3 for manual spot-check
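One way to collect these metrics in a single streaming pass is a small accumulator that counts docs and bytes in/out and drop reasons. It keeps a uniform sample of 10 surviving docs via reservoir sampling (Algorithm R), so the spot-check sample needs no second pass. This is a sketch; `StageStats` is a hypothetical name.

```python
import random
from collections import Counter
from typing import Optional


class StageStats:
    """Per-stage counters plus a uniform sample of k kept docs (reservoir sampling)."""

    def __init__(self, k: int = 10, seed: int = 0):
        self.docs_in = self.docs_out = 0
        self.bytes_in = self.bytes_out = 0
        self.drop_reasons = Counter()
        self.sample = []
        self.k = k
        self._rng = random.Random(seed)  # seeded so the spot-check sample is reproducible

    def record(self, doc: str, drop_reason: Optional[str] = None) -> None:
        size = len(doc.encode("utf-8"))
        self.docs_in += 1
        self.bytes_in += size
        if drop_reason is not None:
            self.drop_reasons[drop_reason] += 1
            return
        self.docs_out += 1
        self.bytes_out += size
        # After n kept docs, each one is in the sample with probability k/n.
        if len(self.sample) < self.k:
            self.sample.append(doc)
        else:
            j = self._rng.randrange(self.docs_out)
            if j < self.k:
                self.sample[j] = doc
```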
5. Tradeoffs
| Choice | Alt | When |
|---|---|---|
| Spark | Ray Datasets | Spark for stable batch; Ray when mixing GPU stages |
| MinHash LSH | SimHash | MinHash for general dedup; SimHash for short docs |
| Custom tokenizer | GPT-2 BPE | Custom when target language coverage matters (e.g., code, math, multilingual) |
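| Filter early | Filter late | Filter early, cheapest first (length, lang ID before classifiers and MinHash), so expensive stages see less data; store scores instead of dropping when thresholds may be re-tuned later |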
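For comparison with MinHash, a SimHash fingerprint stores one 64-bit integer per document instead of 128 values, and near-duplicates are found by Hamming distance. A minimal Charikar-style sketch, assuming word 3-grams as unweighted features and MD5 as the feature hash (both choices are assumptions). The distance threshold that counts as "near-dup" should be tuned on labeled pairs.

```python
import hashlib


def simhash(text: str, bits: int = 64) -> int:
    """Charikar-style SimHash over unweighted word 3-gram features."""
    words = text.lower().split()
    features = [" ".join(words[i:i + 3]) for i in range(max(1, len(words) - 2))]
    v = [0] * bits
    for f in features:
        h = int.from_bytes(hashlib.md5(f.encode("utf-8")).digest()[:bits // 8], "big")
        for i in range(bits):
            v[i] += 1 if (h >> i) & 1 else -1  # each feature votes on every bit
    return sum(1 << i for i in range(bits) if v[i] > 0)


def hamming(a: int, b: int) -> int:
    return bin(a ^ b).count("1")
```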