End-to-End Scalable CV Pipeline Design

Reference architecture for production CV systems. Read this before any system design interview.


Generic CV Pipeline Stages

┌─────────┐    ┌──────────┐    ┌──────────────┐    ┌──────────┐    ┌─────────┐
│  Data   │    │  Pre-    │    │   Inference   │    │  Post-   │    │ Storage │
│ Ingest  │───▶│ process  │───▶│   (GPU/TPU)  │───▶│ process  │───▶│ & Serve │
│         │    │          │    │              │    │          │    │         │
└─────────┘    └──────────┘    └──────────────┘    └──────────┘    └─────────┘
   RTSP/S3       Resize/          TensorRT/           NMS/            DB/S3/
   Kafka/        Normalize        PyTorch/             Track/         Kafka/
   REST          Augment          ONNX                 Filter         Redis

Data Ingestion Patterns

Push vs Pull

PatternWhen to Use
Pull (worker polls queue)Batch processing, variable load
Push (cameras push to endpoint)Low-latency, event-driven
Stream (Kafka/Kinesis)High-throughput, durable, replayable

Protocol Choices

  • RTSP: cameras → edge decoder → Kafka (standard for IP cameras)
  • HTTP multipart: browser webcams, mobile apps
  • gRPC streaming: low-latency bidirectional (good for robots, edge devices)
  • WebRTC: browser real-time (if you need sub-second latency to browser)

Preprocessing Pipeline

CPU-side (before GPU)

# Maximize CPU preprocessing throughput
from concurrent.futures import ThreadPoolExecutor

def preprocess_worker(raw_bytes: bytes) -> np.ndarray:
    img = cv2.imdecode(np.frombuffer(raw_bytes, np.uint8), cv2.IMREAD_COLOR)
    img = cv2.resize(img, (640, 640))
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    return img.astype(np.float32) / 255.0

# Use multiple threads for I/O + decode
with ThreadPoolExecutor(max_workers=8) as pool:
    frames = list(pool.map(preprocess_worker, raw_batch))

GPU-side (CUDA preprocessing)

For high throughput, move preprocessing to GPU with TorchVision or DALI:

# NVIDIA DALI — GPU-accelerated data pipeline
from nvidia.dali import pipeline_def, fn

@pipeline_def(batch_size=64, num_threads=4, device_id=0)
def video_pipeline(file_list):
    jpegs, labels = fn.readers.file(file_root=file_list)
    images = fn.decoders.image(jpegs, device='mixed')  # decode on GPU
    images = fn.resize(images, resize_shorter=640)
    images = fn.crop_mirror_normalize(images,
        mean=[0.485*255, 0.456*255, 0.406*255],
        std=[0.229*255, 0.224*255, 0.225*255],
        output_layout='CHW')
    return images, labels

DALI can eliminate the CPU preprocessing bottleneck entirely for high-FPS pipelines.


Post-processing

Non-Maximum Suppression (NMS)

After object detection, many overlapping boxes exist. NMS selects the best:

def nms(boxes: np.ndarray, scores: np.ndarray, 
        iou_threshold: float = 0.45) -> list[int]:
    """
    Classic NMS (greedy). Operates in O(N²) but N is small after conf filtering.
    
    boxes: (N, 4) in [x1, y1, x2, y2]
    scores: (N,)
    Returns: list of kept indices
    """
    x1, y1, x2, y2 = boxes[:,0], boxes[:,1], boxes[:,2], boxes[:,3]
    areas = (x2 - x1) * (y2 - y1)
    order = scores.argsort()[::-1]  # highest score first
    keep = []
    while order.size > 0:
        i = order[0]
        keep.append(i)
        # Compute IoU with all remaining boxes
        xx1 = np.maximum(x1[i], x1[order[1:]])
        yy1 = np.maximum(y1[i], y1[order[1:]])
        xx2 = np.minimum(x2[i], x2[order[1:]])
        yy2 = np.minimum(y2[i], y2[order[1:]])
        inter = np.maximum(0, xx2 - xx1) * np.maximum(0, yy2 - yy1)
        iou = inter / (areas[i] + areas[order[1:]] - inter + 1e-8)
        order = order[1:][iou <= iou_threshold]
    return keep

Soft-NMS: Instead of discarding boxes with IoU > threshold, reduce their score by a Gaussian function of IoU. Better for crowded scenes (pedestrian detection).

WBF (Weighted Boxes Fusion): Ensemble NMS for combining predictions from multiple models — weights boxes by confidence and averages them. Better than voting-based NMS for model ensembles.

Multi-Object Tracking

After detection, link boxes across frames:

Frame t:   [box_A, box_B, box_C]
Frame t+1: [box_D, box_E, box_F]

Assignment problem: which detections in t+1 correspond to t?
Solution: Hungarian algorithm on IoU cost matrix

SORT (Simple Online and Realtime Tracking):

  1. Predict box positions using Kalman filter
  2. Match predictions to detections via IoU + Hungarian assignment
  3. Unmatched detections → new tracks; unmatched tracks → deleted after K frames

DeepSORT: Adds Re-ID embedding (appearance features) to SORT's IoU matching. Reduces ID switches in crowded scenes. The Re-ID model runs as a separate lightweight CNN.


Storage Architecture

Hot / Warm / Cold Tiering

Hot  (Redis):    Current detections, live dashboard data          TTL: 5 minutes
Warm (PostgreSQL): Event records, track histories, aggregates     TTL: 90 days  
Cold (S3/GCS):   Raw video clips, model outputs, audit logs       TTL: 7 years

Database Schema for CV Events

CREATE TABLE detections (
    id          UUID PRIMARY KEY,
    camera_id   VARCHAR(50) NOT NULL,
    timestamp   TIMESTAMPTZ NOT NULL,
    class_id    SMALLINT NOT NULL,
    confidence  FLOAT4 NOT NULL,
    bbox        FLOAT4[4] NOT NULL,  -- [x1,y1,x2,y2] normalized
    track_id    INTEGER,             -- NULL if no tracking
    clip_s3_key VARCHAR(255)         -- link to video clip
) PARTITION BY RANGE (timestamp);   -- partition by day for query performance

-- Index for common queries
CREATE INDEX ON detections (camera_id, timestamp DESC);
CREATE INDEX ON detections (class_id, timestamp DESC);

Security Considerations

  • Camera streams: Authenticate RTSP with digest auth or mTLS
  • API: Rate limiting per API key; validate input dimensions before GPU (prevent resource exhaustion)
  • Model: Adversarial robustness — test against common perturbations
  • PII: GDPR compliance — blur faces before storing video if cameras capture public areas
  • Model exfiltration: Don't expose raw model weights; use encrypted containers or TEE (Trusted Execution Environments) for sensitive models
# Input validation (prevent resource exhaustion attacks)
def validate_image_input(img: np.ndarray) -> None:
    if img.ndim not in (2, 3):
        raise ValueError("Image must be 2D or 3D array")
    if img.shape[0] > 4096 or img.shape[1] > 4096:
        raise ValueError("Image too large (max 4096×4096)")
    if img.dtype not in (np.uint8, np.float32):
        raise ValueError("Unsupported dtype")