---
name: Validation Fleet Deployment
overview: "Full deployment pipeline: Supabase schema migration (new worker_validators table + validation_status column on video_queue), production-ready worker with heartbeats/prefetch/graceful shutdown, Dockerfile with baked-in models, R2 bucket creation, and Vast.ai deploy script for 100 GPUs."
todos:
  - id: schema-migration
    content: Run SQL to add validation_status column to video_queue + create worker_validators table
    status: pending
  - id: worker-rewrite
    content: Rewrite validations/worker.py with registration, heartbeat loop, prefetch, graceful shutdown, dead video recovery
    status: pending
  - id: dockerfile
    content: Create Dockerfile.validation with CUDA base, baked-in models, push to Docker Hub
    status: pending
  - id: r2-bucket
    content: Create validation-results R2 bucket
    status: pending
  - id: deploy-script
    content: Write Vast.ai deploy script (search offers, create instances, env vars)
    status: pending
  - id: e2e-test
    content: "End-to-end test: run 1 worker container locally, process 5 videos, verify parquet + DB updates"
    status: pending
isProject: false
---

# Validation Fleet Deployment

## Current State

- Pipeline tested locally: 4 models, 11 segs/s, 2.7 GB VRAM, all working
- `video_queue` has 507,387 videos in `done` status — ready for validation
- No validation-specific columns or tables exist yet
- Worker in [`validations/worker.py`](validations/worker.py) has basic claim/process logic but no heartbeats, no registration, no prefetch, no graceful shutdown

## Architecture

```mermaid
flowchart TD
    subgraph supabase [Supabase]
        VQ["video_queue\n+ validation_status column"]
        WV["worker_validators\n(new table)"]
    end
    subgraph vastai [Vast.ai Fleet]
        W1["Worker 1\nRTX 3090"]
        W2["Worker 2\nRTX 3090"]
        WN["Worker N\n..."]
    end
    subgraph r2 [Cloudflare R2]
        SRC["transcribed bucket\n507K tars"]
        DST["validation-results bucket\n~10K parquet shards"]
    end
    
    W1 -->|"claim (FOR UPDATE SKIP LOCKED)"| VQ
    W2 -->|heartbeat 30s| WV
    W1 -->|download tar| SRC
    W1 -->|upload parquet shard| DST
    WN -->|"status: validating → validated"| VQ
```



## 1. Supabase Schema Migration

### Add `validation_status` column to `video_queue`

Keep `status = 'done'` untouched (transcription state). Add a separate column for validation lifecycle:

```sql
ALTER TABLE video_queue 
  ADD COLUMN validation_status TEXT NOT NULL DEFAULT 'pending';

CREATE INDEX idx_vq_validation_pending 
  ON video_queue (validation_status) 
  WHERE validation_status = 'pending' AND status = 'done';
```

States: `pending` → `validating` → `validated` / `validation_failed`

### Create `worker_validators` table

Mirrors the existing `workers` table pattern but tailored for validation metrics:

```sql
CREATE TABLE worker_validators (
  worker_id            TEXT PRIMARY KEY,
  status               TEXT NOT NULL DEFAULT 'online',
  gpu_type             TEXT,
  gpu_vram_gb          REAL,
  
  current_video_id     TEXT,
  videos_processed     INTEGER DEFAULT 0,
  videos_failed        INTEGER DEFAULT 0,
  segments_processed   INTEGER DEFAULT 0,
  avg_segs_per_second  REAL DEFAULT 0,
  
  shards_written       INTEGER DEFAULT 0,
  shards_uploaded      INTEGER DEFAULT 0,
  total_parquet_mb     REAL DEFAULT 0,
  
  started_at           TIMESTAMPTZ DEFAULT now(),
  last_heartbeat_at    TIMESTAMPTZ DEFAULT now(),
  last_video_completed_at TIMESTAMPTZ,
  
  config_json          JSONB,
  last_error           TEXT
);
```

The dashboard can query this table for live fleet monitoring: which workers are alive (heartbeat < 60s ago), throughput per worker, total progress, stale workers, etc.

### Claim query

```sql
UPDATE video_queue
SET validation_status = 'validating',
    claimed_by = $1,
    claimed_at = now()
WHERE video_id = (
    SELECT video_id FROM video_queue
    WHERE status = 'done' AND validation_status = 'pending'
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING video_id, language, segment_count
```
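A sketch of how the worker could drive this statement. The `claim_next_video` helper is hypothetical; `fetchrow` stands in for a driver coroutine with asyncpg's `conn.fetchrow` signature:

```python
import asyncio

# The claim statement from above, with asyncpg-style $1 placeholder for worker_id.
CLAIM_SQL = """
UPDATE video_queue
SET validation_status = 'validating',
    claimed_by = $1,
    claimed_at = now()
WHERE video_id = (
    SELECT video_id FROM video_queue
    WHERE status = 'done' AND validation_status = 'pending'
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING video_id, language, segment_count
"""

async def claim_next_video(fetchrow, worker_id: str):
    """Atomically claim one pending video; returns the row, or None when drained.

    `fetchrow` is any driver coroutine matching asyncpg's fetchrow signature,
    e.g. `conn.fetchrow` on an asyncpg connection.
    """
    return await fetchrow(CLAIM_SQL, worker_id)
```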

## 2. Rewrite `validations/worker.py`

Major upgrade modeled on the battle-tested [`src/worker.py`](src/worker.py):

- **Registration**: Insert into `worker_validators` on startup with config_json (model list, batch sizes, GPU type)
- **Heartbeat loop**: Async task, 30s + jitter, updates `worker_validators` with live stats (videos_processed, segments_processed, avg_segs_per_second, current_video_id, shards_written)
- **Prefetch**: While processing video N, claim + download video N+1 in background (`asyncio.create_task`)
- **Graceful shutdown**: SIGTERM handler → finish current video → flush parquet → release claimed videos back to `pending` → set worker `offline`
- **Dead video recovery**: On startup, reset any `validating` videos claimed by this worker_id back to `pending` (handles crashes)
- **Throughput tracking**: Rolling window of last 10 videos for `avg_segs_per_second`
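A skeleton for the heartbeat and shutdown plumbing described above. The names and the 5 s jitter bound are assumptions, not the final implementation:

```python
import asyncio
import random
import signal

HEARTBEAT_INTERVAL_S = 30

def heartbeat_delay(base: float = HEARTBEAT_INTERVAL_S, jitter: float = 5.0) -> float:
    """30 s plus uniform jitter so 100 workers don't hit Supabase in lockstep."""
    return base + random.uniform(0, jitter)

class GracefulShutdown:
    """SIGTERM/SIGINT set a flag; the main loop finishes the current video first."""

    def __init__(self) -> None:
        self.requested = False

    def install(self) -> None:
        # Call from inside the running event loop (e.g. at worker startup).
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, self._request)

    def _request(self) -> None:
        self.requested = True

async def heartbeat_loop(update_stats, shutdown: GracefulShutdown) -> None:
    """`update_stats`: coroutine that UPDATEs worker_validators with live counters."""
    while not shutdown.requested:
        await update_stats()
        await asyncio.sleep(heartbeat_delay())
```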

## 3. Dockerfile

New `Dockerfile.validation` based on NVIDIA PyTorch base image:

```dockerfile
FROM pytorch/pytorch:2.6.0-cuda12.4-cudnn9-runtime

RUN apt-get update && apt-get install -y libsndfile1 && rm -rf /var/lib/apt/lists/*
RUN pip install --no-cache-dir transformers speechbrain onnxruntime-gpu \
    pyarrow pandas boto3 asyncpg soundfile python-dotenv

# Pre-download models into image (avoids per-GPU download on startup)
RUN python -c "from transformers import ...; ..." 

COPY validations/ /app/validations/
COPY src/config.py src/r2_client.py /app/src/

ENTRYPOINT ["python", "-m", "validations.main"]
```

Image size: ~8-10 GB (models baked in). Push to Docker Hub as `bharathkumar192/validation-worker:latest`.

## 4. R2 Bucket

Create `validation-results` bucket via Cloudflare dashboard or API. Structure:

```
validation-results/
  shards/{worker_id}/validation_{worker_id}_shard_0001.parquet
  shards/{worker_id}/validation_{worker_id}_shard_0001.manifest.json
```
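A hypothetical key builder matching this layout (the helper name and the four-digit zero-padding are assumptions inferred from the `shard_0001` example):

```python
def shard_key(worker_id: str, shard_index: int, manifest: bool = False) -> str:
    """Object key for a parquet shard (or its manifest) under the layout above."""
    ext = "manifest.json" if manifest else "parquet"
    return (f"shards/{worker_id}/"
            f"validation_{worker_id}_shard_{shard_index:04d}.{ext}")
```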

## 5. Vast.ai Deploy Script

`deploy_validation.sh` — searches Vast.ai offers for RTX 3090/4090 machines, then launches one container per offer with env vars:

```bash
vastai search offers 'gpu_name in [RTX_3090, RTX_4090] disk_space >= 50 inet_down >= 200'
vastai create instance $OFFER_ID \
  --image bharathkumar192/validation-worker:latest \
  --disk 50 \
  --env "DATABASE_URL=... R2_ENDPOINT_URL=... HF_TOKEN=... WORKER_ID=val-{N} GPU_TYPE=RTX_3090 CONFORMER_BATCH_SIZE=32 PARQUET_SHARD_SIZE=50"
```

## 6. Safety / Anti-Deadlock

- `FOR UPDATE SKIP LOCKED` prevents double-claims (proven at scale in transcription pipeline)
- On crash: worker's claimed videos stay as `validating`. On restart, a recovery query resets stale claims:

```sql
UPDATE video_queue
SET validation_status = 'pending', claimed_by = NULL
WHERE validation_status = 'validating'
  AND claimed_at < now() - interval '30 minutes';
```

- Heartbeat staleness: dashboard can flag workers with `last_heartbeat_at` > 2 minutes ago
- Parquet shards are write-once and each worker writes only its own shard sequence — no corruption risk from concurrent workers
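The time-based reset above handles abandoned claims fleet-wide; a per-worker variant for the startup recovery step might look like this (the helper and the `execute` callable are illustrative, asyncpg-style):

```python
import asyncio

# Startup recovery: release videos this worker left in 'validating' after a crash.
RECOVER_SQL = """
UPDATE video_queue
SET validation_status = 'pending', claimed_by = NULL, claimed_at = NULL
WHERE validation_status = 'validating' AND claimed_by = $1
"""

async def recover_own_claims(execute, worker_id: str) -> str:
    """`execute` is a driver coroutine like asyncpg's `conn.execute`."""
    return await execute(RECOVER_SQL, worker_id)
```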

## Estimated Timeline


| Step                                           | Time           |
| ---------------------------------------------- | -------------- |
| Schema migration (SQL)                         | 5 min          |
| Worker rewrite (heartbeat, prefetch, shutdown) | 30 min         |
| Dockerfile + local test                        | 20 min         |
| R2 bucket creation                             | 2 min          |
| Vast.ai deploy script                          | 10 min         |
| End-to-end test (1 worker, 5 videos)           | 10 min         |
| **Total**                                      | **~77 min (~1.5 h with buffer)** |


