End-to-End Scalable CV Pipeline Design
Reference architecture for production CV systems. Read this before any system design interview.
Generic CV Pipeline Stages
┌─────────┐    ┌──────────┐    ┌──────────────┐    ┌──────────┐    ┌─────────┐
│  Data   │    │   Pre-   │    │  Inference   │    │  Post-   │    │ Storage │
│ Ingest  │───▶│ process  │───▶│  (GPU/TPU)   │───▶│ process  │───▶│ & Serve │
│         │    │          │    │              │    │          │    │         │
└─────────┘    └──────────┘    └──────────────┘    └──────────┘    └─────────┘
 RTSP/S3        Resize/         TensorRT/           NMS/            DB/S3/
 Kafka/         Normalize       PyTorch/            Track/          Kafka/
 REST           Augment         ONNX                Filter          Redis
Data Ingestion Patterns
Push vs Pull
| Pattern | When to Use |
|---|---|
| Pull (worker polls queue) | Batch processing, variable load |
| Push (cameras push to endpoint) | Low-latency, event-driven |
| Stream (Kafka/Kinesis) | High-throughput, durable, replayable |
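As a minimal sketch of the pull pattern from the table above, the snippet below uses Python's standard `queue.Queue` as a stand-in for a real broker (an assumption for illustration; a production system would use SQS, Kafka, or similar). Workers poll at their own pace, and the bounded queue applies backpressure to the producer when they fall behind. `process_frame`, `pull_worker`, and `run_pull_pipeline` are hypothetical names.

```python
import queue
import threading


def process_frame(frame_id: int) -> str:
    """Hypothetical stand-in for decode + inference on one frame."""
    return f"frame-{frame_id}-done"


def pull_worker(jobs: queue.Queue, results: list, lock: threading.Lock) -> None:
    """Poll the queue until a None sentinel signals shutdown."""
    while True:
        frame_id = jobs.get()      # blocks until work is available
        if frame_id is None:       # sentinel: no more work for this worker
            break
        out = process_frame(frame_id)
        with lock:
            results.append(out)


def run_pull_pipeline(frame_ids: list, num_workers: int = 4) -> list:
    jobs = queue.Queue(maxsize=16)  # bounded: put() blocks when the queue is full
    results = []
    lock = threading.Lock()
    workers = [
        threading.Thread(target=pull_worker, args=(jobs, results, lock))
        for _ in range(num_workers)
    ]
    for w in workers:
        w.start()
    for fid in frame_ids:
        jobs.put(fid)               # producer slows down if workers fall behind
    for _ in workers:
        jobs.put(None)              # one sentinel per worker
    for w in workers:
        w.join()
    return sorted(results)          # completion order is not deterministic
```

Because workers pull only when idle, adding or removing workers changes throughput without touching the producer.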
Protocol Choices
- RTSP: cameras → edge decoder → Kafka (standard for IP cameras)
- HTTP multipart: browser webcams, mobile apps
- gRPC streaming: low-latency bidirectional (good for robots, edge devices)
- WebRTC: browser real-time (if you need sub-second latency to browser)
Preprocessing Pipeline
CPU-side (before GPU)
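# Maximize CPU preprocessing throughput
from concurrent.futures import ThreadPoolExecutor

import cv2
import numpy as np

def preprocess_worker(raw_bytes: bytes) -> np.ndarray:
    img = cv2.imdecode(np.frombuffer(raw_bytes, np.uint8), cv2.IMREAD_COLOR)
    if img is None:  # imdecode returns None for corrupt or unsupported data
        raise ValueError("Could not decode image")
    img = cv2.resize(img, (640, 640))  # plain resize: does not preserve aspect ratio
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    return img.astype(np.float32) / 255.0

# Threads help here because OpenCV's native calls generally release the GIL.
# raw_batch: list of encoded image bytes (e.g. JPEG) from the ingest stage
with ThreadPoolExecutor(max_workers=8) as pool:
    frames = list(pool.map(preprocess_worker, raw_batch))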
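The worker above returns one HWC float32 array per frame, while many inference runtimes expect a single contiguous NCHW batch. A minimal sketch of that last CPU-side step, assuming all frames share the same shape (`to_nchw_batch` is a hypothetical helper name):

```python
import numpy as np


def to_nchw_batch(frames: list) -> np.ndarray:
    """Stack same-shaped (H, W, C) frames into one contiguous (N, C, H, W) batch."""
    batch = np.stack(frames, axis=0)       # (N, H, W, C)
    batch = batch.transpose(0, 3, 1, 2)    # (N, C, H, W), still a strided view
    # Copy into contiguous memory so the host-to-device transfer is one block.
    return np.ascontiguousarray(batch, dtype=np.float32)
```

Check the layout your runtime expects before applying this; some exported models take NHWC input.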
GPU-side (CUDA preprocessing)
For high throughput, move preprocessing to GPU with TorchVision or DALI:
# NVIDIA DALI: GPU-accelerated data pipeline
from nvidia.dali import pipeline_def, fn

@pipeline_def(batch_size=64, num_threads=4, device_id=0)
def image_pipeline(image_dir):
    # file_root expects a directory; labels are derived from its subdirectories
    jpegs, labels = fn.readers.file(file_root=image_dir)
    images = fn.decoders.image(jpegs, device='mixed')  # decode on GPU
    images = fn.resize(images, resize_shorter=640)
    images = fn.crop_mirror_normalize(images,
        crop=(640, 640),  # center crop so every sample in the batch has the same shape
        mean=[0.485*255, 0.456*255, 0.406*255],
        std=[0.229*255, 0.224*255, 0.225*255],
        output_layout='CHW')
    return images, labels
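DALI can remove most of the CPU preprocessing bottleneck in high-FPS pipelines: with the mixed decoder, resizing and normalization run on the GPU, and most of the decode does too. File reading, and depending on the hardware part of the JPEG decode, still uses CPU threads.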
Post-processing
Non-Maximum Suppression (NMS)
A detector typically emits many overlapping boxes for the same object. NMS keeps the highest-scoring box in each overlapping group and discards the rest:
import numpy as np

def nms(boxes: np.ndarray, scores: np.ndarray,
        iou_threshold: float = 0.45) -> list[int]:
    """
    Classic NMS (greedy). Operates in O(N²) but N is small after conf filtering.
    boxes: (N, 4) in [x1, y1, x2, y2]
    scores: (N,)
    Returns: list of kept indices
    """
    x1, y1, x2, y2 = boxes[:,0], boxes[:,1], boxes[:,2], boxes[:,3]
    areas = (x2 - x1) * (y2 - y1)
    order = scores.argsort()[::-1]  # highest score first
    keep = []
    while order.size > 0:
        i = order[0]
        keep.append(int(i))  # plain int, matching the declared return type
        # Compute IoU with all remaining boxes
        xx1 = np.maximum(x1[i], x1[order[1:]])
        yy1 = np.maximum(y1[i], y1[order[1:]])
        xx2 = np.minimum(x2[i], x2[order[1:]])
        yy2 = np.minimum(y2[i], y2[order[1:]])
        inter = np.maximum(0, xx2 - xx1) * np.maximum(0, yy2 - yy1)
        iou = inter / (areas[i] + areas[order[1:]] - inter + 1e-8)
        # Keep only boxes that do not overlap the selected box too much
        order = order[1:][iou <= iou_threshold]
    return keep
Soft-NMS: Instead of discarding boxes with IoU > threshold, reduce their score by a Gaussian function of IoU. Better for crowded scenes (pedestrian detection).
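The Gaussian variant described above can be sketched as follows. This is a minimal illustration, not a reference implementation: the function name and the default values for `sigma` and `score_threshold` are assumptions chosen for the example.

```python
import numpy as np


def soft_nms_gaussian(boxes, scores, sigma=0.5, score_threshold=0.001):
    """Gaussian Soft-NMS sketch.

    boxes: (N, 4) in [x1, y1, x2, y2]; scores: (N,)
    Returns (kept_indices, decayed_scores) in selection order.
    """
    boxes = np.asarray(boxes, dtype=np.float64)
    scores = np.asarray(scores, dtype=np.float64).copy()
    areas = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
    remaining = np.arange(len(scores))
    keep, kept_scores = [], []
    while remaining.size > 0:
        # Pick the box with the highest current (possibly decayed) score.
        best_pos = int(np.argmax(scores[remaining]))
        i = remaining[best_pos]
        keep.append(int(i))
        kept_scores.append(float(scores[i]))
        remaining = np.delete(remaining, best_pos)
        if remaining.size == 0:
            break
        # IoU between the selected box and every remaining box.
        xx1 = np.maximum(boxes[i, 0], boxes[remaining, 0])
        yy1 = np.maximum(boxes[i, 1], boxes[remaining, 1])
        xx2 = np.minimum(boxes[i, 2], boxes[remaining, 2])
        yy2 = np.minimum(boxes[i, 3], boxes[remaining, 3])
        inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
        iou = inter / (areas[i] + areas[remaining] - inter + 1e-8)
        # Decay scores instead of discarding: more overlap, stronger decay.
        scores[remaining] *= np.exp(-(iou ** 2) / sigma)
        # Drop boxes whose score has decayed to effectively zero.
        remaining = remaining[scores[remaining] > score_threshold]
    return keep, kept_scores
```

Unlike hard NMS, a heavily overlapped box survives with a reduced score, so a downstream confidence threshold decides whether it is reported.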
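WBF (Weighted Boxes Fusion): An alternative to NMS for combining predictions from multiple models. Instead of discarding overlapping boxes, it merges each cluster of overlapping boxes into a single box whose coordinates are the confidence-weighted average of its members. Its authors report better ensemble accuracy than NMS or Soft-NMS.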
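The fusion step for one cluster can be sketched as below. This is a simplified illustration that assumes the boxes have already been grouped into a cluster by IoU; the clustering loop is omitted, and `fuse_cluster` is a hypothetical helper name. The confidence rescaling by the number of contributing models follows the scheme described in the WBF paper as I understand it, so treat it as an assumption.

```python
import numpy as np


def fuse_cluster(boxes, confidences, num_models):
    """Fuse one cluster of overlapping boxes into a single box.

    boxes: (T, 4) in [x1, y1, x2, y2]; confidences: (T,)
    num_models: number of models in the ensemble.
    """
    boxes = np.asarray(boxes, dtype=np.float64)
    conf = np.asarray(confidences, dtype=np.float64)
    # Coordinates: confidence-weighted average of the cluster members.
    fused_box = (boxes * conf[:, None]).sum(axis=0) / conf.sum()
    # Confidence: mean of the members, scaled down when fewer models
    # agree on the box than there are models in the ensemble.
    fused_conf = conf.mean() * min(len(conf), num_models) / num_models
    return fused_box, float(fused_conf)
```

A box predicted by only one of two models ends up with half its original confidence, which penalizes detections the ensemble does not agree on.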
Multi-Object Tracking
After detection, link boxes across frames:
Frame t: [box_A, box_B, box_C]
Frame t+1: [box_D, box_E, box_F]
Assignment problem: which detections in t+1 correspond to t?
Solution: Hungarian algorithm on IoU cost matrix
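A minimal sketch of this assignment step, assuming SciPy is available. `scipy.optimize.linear_sum_assignment` solves the same minimum-cost assignment problem the Hungarian algorithm solves; `iou_matrix`, `match_detections`, and the `min_iou` gate are hypothetical names and values chosen for the example.

```python
import numpy as np
from scipy.optimize import linear_sum_assignment


def iou_matrix(a, b):
    """Pairwise IoU between boxes a (M, 4) and b (N, 4), [x1, y1, x2, y2]."""
    a = np.asarray(a, dtype=np.float64)[:, None, :]   # (M, 1, 4)
    b = np.asarray(b, dtype=np.float64)[None, :, :]   # (1, N, 4)
    x1 = np.maximum(a[..., 0], b[..., 0])
    y1 = np.maximum(a[..., 1], b[..., 1])
    x2 = np.minimum(a[..., 2], b[..., 2])
    y2 = np.minimum(a[..., 3], b[..., 3])
    inter = np.clip(x2 - x1, 0, None) * np.clip(y2 - y1, 0, None)
    area_a = (a[..., 2] - a[..., 0]) * (a[..., 3] - a[..., 1])
    area_b = (b[..., 2] - b[..., 0]) * (b[..., 3] - b[..., 1])
    return inter / (area_a + area_b - inter + 1e-8)


def match_detections(prev_boxes, curr_boxes, min_iou=0.3):
    """Return (prev_index, curr_index) pairs that maximize total IoU."""
    iou = iou_matrix(prev_boxes, curr_boxes)
    # The solver minimizes cost, so use 1 - IoU as the cost matrix.
    rows, cols = linear_sum_assignment(1.0 - iou)
    # Reject assignments with too little overlap to be the same object.
    return [(int(r), int(c)) for r, c in zip(rows, cols) if iou[r, c] >= min_iou]
```

The solver always pairs as many rows and columns as it can, so the `min_iou` gate is what turns a forced pairing into an unmatched track or detection.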
SORT (Simple Online and Realtime Tracking):
- Predict box positions using Kalman filter
- Match predictions to detections via IoU + Hungarian assignment
- Unmatched detections → new tracks; unmatched tracks → deleted after K frames
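The track lifecycle in the last bullet can be sketched as follows. This covers only the bookkeeping: the Kalman prediction and the matching step are assumed to happen elsewhere and to supply `matches`. `Track`, `TrackManager`, and `max_misses` (playing the role of K) are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class Track:
    track_id: int
    box: tuple
    misses: int = 0   # consecutive frames without a matched detection


class TrackManager:
    def __init__(self, max_misses: int = 3):
        self.max_misses = max_misses
        self.tracks = {}          # track_id -> Track
        self._next_id = 0

    def update(self, matches, detections):
        """matches: list of (track_id, detection_index); detections: list of boxes.

        Returns the sorted ids of tracks alive after this frame.
        """
        matched_tracks = {tid for tid, _ in matches}
        matched_dets = {d for _, d in matches}
        # Matched tracks: refresh the box and reset the miss counter.
        for tid, d in matches:
            self.tracks[tid].box = detections[d]
            self.tracks[tid].misses = 0
        # Unmatched tracks: count the miss, delete once it exceeds max_misses.
        for tid in list(self.tracks):
            if tid not in matched_tracks:
                self.tracks[tid].misses += 1
                if self.tracks[tid].misses > self.max_misses:
                    del self.tracks[tid]
        # Unmatched detections: start new tracks.
        for d, box in enumerate(detections):
            if d not in matched_dets:
                self.tracks[self._next_id] = Track(self._next_id, box)
                self._next_id += 1
        return sorted(self.tracks)
```

Keeping a track alive for a few missed frames lets it survive short occlusions without being assigned a new ID.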
DeepSORT: Adds Re-ID embedding (appearance features) to SORT's IoU matching. Reduces ID switches in crowded scenes. The Re-ID model runs as a separate lightweight CNN.
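One way to picture how appearance features help is a cost matrix that blends IoU distance with cosine distance between Re-ID embeddings. This is a simplified sketch, not the original DeepSORT formulation, which uses motion gating and a matching cascade. The function names and `appearance_weight` are assumptions for illustration.

```python
import numpy as np


def cosine_distance(track_feats, det_feats):
    """Pairwise cosine distance between (M, D) and (N, D) embeddings."""
    a = np.asarray(track_feats, dtype=np.float64)
    b = np.asarray(det_feats, dtype=np.float64)
    a = a / (np.linalg.norm(a, axis=1, keepdims=True) + 1e-12)
    b = b / (np.linalg.norm(b, axis=1, keepdims=True) + 1e-12)
    return 1.0 - a @ b.T


def combined_cost(iou, track_feats, det_feats, appearance_weight=0.5):
    """Blend IoU distance and appearance distance into one (M, N) cost matrix."""
    iou = np.asarray(iou, dtype=np.float64)
    appearance = cosine_distance(track_feats, det_feats)
    return (1.0 - appearance_weight) * (1.0 - iou) + appearance_weight * appearance
```

When two people cross paths, their boxes overlap and IoU alone is ambiguous; the appearance term breaks the tie in favor of the detection that looks like the track.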
Storage Architecture
Hot / Warm / Cold Tiering
| Tier | Store | Contents | TTL |
|---|---|---|---|
| Hot | Redis | Current detections, live dashboard data | 5 minutes |
| Warm | PostgreSQL | Event records, track histories, aggregates | 90 days |
| Cold | S3/GCS | Raw video clips, model outputs, audit logs | 7 years |
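A read path can use these TTLs to decide which tier might still hold data of a given age. The sketch below is illustrative only: `TIER_TTLS` and `tier_for_age` are hypothetical names, and a year is approximated as 365 days.

```python
from datetime import timedelta

# TTLs taken from the tiering above, ordered fastest tier first.
TIER_TTLS = [
    ("hot", timedelta(minutes=5)),
    ("warm", timedelta(days=90)),
    ("cold", timedelta(days=7 * 365)),
]


def tier_for_age(age: timedelta):
    """Return the fastest tier whose TTL still covers data of this age."""
    for name, ttl in TIER_TTLS:
        if age <= ttl:
            return name
    return None  # past the longest retention window: expected to be deleted
```

Each tier holds different kinds of data, so treat the result as the first place to look rather than a guarantee that the record exists there.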
Database Schema for CV Events
CREATE TABLE detections (
    id          UUID         NOT NULL,
    camera_id   VARCHAR(50)  NOT NULL,
    timestamp   TIMESTAMPTZ  NOT NULL,
    class_id    SMALLINT     NOT NULL,
    confidence  FLOAT4       NOT NULL,
    bbox        FLOAT4[4]    NOT NULL,  -- [x1,y1,x2,y2] normalized
    track_id    INTEGER,                -- NULL if no tracking
    clip_s3_key VARCHAR(255),           -- link to video clip
    PRIMARY KEY (id, timestamp)         -- PostgreSQL requires the partition key in the primary key
) PARTITION BY RANGE (timestamp);       -- create one child partition per day for query performance
-- Indexes for common queries
CREATE INDEX ON detections (camera_id, timestamp DESC);
CREATE INDEX ON detections (class_id, timestamp DESC);
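Declaring range partitioning does not create the partitions themselves; each day needs its own child table. A small sketch of a helper that generates the PostgreSQL statement for one day, which a scheduled job could run ahead of time. `daily_partition_ddl` and the child-table naming scheme are assumptions for illustration.

```python
from datetime import date, timedelta


def daily_partition_ddl(day: date, parent: str = "detections") -> str:
    """Build the DDL for the child partition covering one calendar day.

    `parent` is interpolated directly, so it must be a trusted identifier,
    never user input.
    """
    next_day = day + timedelta(days=1)
    name = f"{parent}_{day:%Y_%m_%d}"
    # Range bounds are inclusive at FROM and exclusive at TO.
    return (
        f"CREATE TABLE IF NOT EXISTS {name} PARTITION OF {parent} "
        f"FOR VALUES FROM ('{day.isoformat()}') TO ('{next_day.isoformat()}');"
    )
```

With daily partitions, enforcing the 90-day retention becomes a matter of dropping whole child tables instead of deleting rows.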
Security Considerations
- Camera streams: Authenticate RTSP with digest auth or mTLS
- API: Rate limiting per API key; validate input dimensions before GPU (prevent resource exhaustion)
- Model: Adversarial robustness — test against common perturbations
- PII: GDPR compliance — blur faces before storing video if cameras capture public areas
- Model exfiltration: Don't expose raw model weights; use encrypted containers or TEE (Trusted Execution Environments) for sensitive models
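# Input validation (prevent resource exhaustion attacks)
import numpy as np

def validate_image_input(img: np.ndarray) -> None:
    if img.ndim not in (2, 3):
        raise ValueError("Image must be 2D or 3D array")
    if img.shape[0] > 4096 or img.shape[1] > 4096:
        raise ValueError("Image too large (max 4096×4096)")
    # A 3D array with a huge channel axis would otherwise slip through
    if img.ndim == 3 and img.shape[2] not in (1, 3, 4):
        raise ValueError("Unsupported channel count")
    if img.dtype not in (np.uint8, np.float32):
        raise ValueError("Unsupported dtype")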
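The per-key rate limiting mentioned in the list above is commonly implemented with a token bucket. The sketch below is a minimal in-process version. A real deployment would usually keep the counters in a shared store such as Redis, which is not shown here. `TokenBucket`, `PerKeyLimiter`, and the injectable `clock` are hypothetical names.

```python
import time


class TokenBucket:
    """Allows bursts up to `capacity`, refilling at `rate` tokens per second."""

    def __init__(self, rate: float, capacity: float, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock
        self.tokens = capacity
        self.updated = clock()

    def allow(self, cost: float = 1.0) -> bool:
        now = self.clock()
        # Refill in proportion to elapsed time, capped at capacity.
        elapsed = now - self.updated
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False


class PerKeyLimiter:
    """Keeps one independent bucket per API key."""

    def __init__(self, rate: float, capacity: float, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock
        self.buckets = {}

    def allow(self, api_key: str) -> bool:
        bucket = self.buckets.get(api_key)
        if bucket is None:
            bucket = TokenBucket(self.rate, self.capacity, self.clock)
            self.buckets[api_key] = bucket
        return bucket.allow()
```

Run the limiter before image decoding and validation so that rejected requests cost almost nothing.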