# Gemini Transcription Pipeline

Distributed audio transcription system using **Gemini 3 Flash** via Google AI Studio. Processes 500K+ multilingual YouTube videos (~105M audio segments) across 12 languages. Workers claim videos from a Supabase queue, download audio tars from Cloudflare R2, polish/split/transcribe the segments, validate the results, and upload output tars back to R2.

## Architecture

```
Supabase (video_queue)  ──claim──>  Worker (Docker container)
                                       │
                                       ├── R2: download {id}.tar
                                       ├── Audio Polish: split >10s, trim edges, pad silence
                                       ├── Gemini 3 Flash: transcribe all segments (batch, async)
                                       ├── Validator: quality score, lane flags
                                       ├── Supabase: insert transcription_results + flags
                                       ├── R2: upload {id}_transcribed.tar
                                       └── Supabase: mark video done
```

Each worker is a single Docker container running `python -m src.main`. Workers are stateless — all state lives in Supabase and R2.

## Project Structure

```
src/
  main.py              Entry point (Docker ENTRYPOINT)
  config.py            All tunables, env vars, language mappings
  worker.py            Worker lifecycle: register, heartbeat, main loop, prefetch, shutdown
  pipeline.py          Per-video orchestrator: download → polish → transcribe → pack → upload
  batch_cycle.py       Batch engine: fire requests, handle 429 flood, validate, build DB records
  cache_manager.py     Gemini explicit context caching (V2 prompt, 6-day TTL)
  audio_polish.py      4-step audio pipeline: split → boundary-trim → silence-pad → FLAC encode
  prompt_builder.py    V1/V2 system prompts, Pydantic JSON schema, user prompt templates
  validator.py         Tier 1 quality validation: chars/sec, script check, lang mismatch, etc.
  db.py                PostgreSQL client via asyncpg (+ MockDB for testing)
  r2_client.py         Cloudflare R2 client (download/extract/pack/upload tars)
  providers/
    base.py            Abstract provider interface + data classes
    aistudio.py        PRIMARY: direct Gemini REST API, V2 cache, 20s timeout, thinking override
    openrouter.py      SECONDARY: OpenRouter (kept for preflight tests, not used in production)

tests/                 73 unit/integration tests (pytest)
preflight/             Live validation scripts (canary, stress, determinism, dashboard)
```

## Gemini Keys (Multi-Key Pool)

Four GCP project API keys with identical quotas. Each key: **20K RPM, 20M TPM, unlimited RPD**.

```
GEMINI_KEY       = primary (GCP project 1)
GEMINI_PROJECT2  = secondary (GCP project 2)
GEMINI_PROJECT3  = tertiary (GCP project 3)
GEMINI_PROJECT4  = quaternary (GCP project 4)
```

**Combined capacity: 80K RPM, 80M TPM.**

At deployment, each worker is assigned a primary key via `GEMINI_KEY_INDEX=0|1|2|3`; the next key in the pool serves as its 429 fallback. Each key gets its own explicit V2 context cache (auto-created on startup, 6-day TTL, 1036 tokens).
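The round-robin key selection can be sketched as below. A minimal sketch: the env var names come from the table above, but the function name and the `env` parameter (passed in for testability) are illustrative, not the actual `config.py` API.

```python
# Env var names from the key-pool table; list order defines the fallback ring.
KEY_ENV_VARS = ["GEMINI_KEY", "GEMINI_PROJECT2", "GEMINI_PROJECT3", "GEMINI_PROJECT4"]

def select_keys(env: dict) -> tuple:
    """Return (primary_key, fallback_key) for this worker.

    The primary is chosen by GEMINI_KEY_INDEX; the 429 fallback is simply
    the next key in the pool, wrapping around at the end.
    """
    idx = int(env.get("GEMINI_KEY_INDEX", "0")) % len(KEY_ENV_VARS)
    primary = env[KEY_ENV_VARS[idx]]
    fallback = env[KEY_ENV_VARS[(idx + 1) % len(KEY_ENV_VARS)]]
    return primary, fallback
```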

## Environment Variables

### Required

| Variable | Description |
|----------|-------------|
| `GEMINI_KEY` | Primary Gemini API key |
| `GEMINI_PROJECT2` | Second Gemini API key |
| `GEMINI_PROJECT3` | Third Gemini API key |
| `GEMINI_PROJECT4` | Fourth Gemini API key |
| `DATABASE_URL` | PostgreSQL connection string (direct PG, bypasses REST) |
| `URL` | Supabase REST API URL (dashboard only) |
| `SUPABASE_ADMIN` | Supabase service_role JWT (dashboard only) |
| `R2_ENDPOINT_URL` | Cloudflare R2 S3-compatible endpoint |
| `R2_ACCESS_KEY_ID` | R2 access key |
| `R2_SECRET_ACCESS_KEY` | R2 secret key |
| `R2_BUCKET` | R2 bucket name (default: `1-cleaned-data`) |

### Deployment Controls

| Variable | Default | Description |
|----------|---------|-------------|
| `GEMINI_KEY_INDEX` | `0` | Which key this worker uses as primary (0, 1, 2, or 3) |
| `WORKER_ID` | auto UUID | Unique worker identifier |
| `GPU_TYPE` | `unknown` | Label for Supabase dashboard |
| `MAX_VIDEOS` | `0` | Stop after N videos (0 = unlimited, for test runs) |
| `WORKER_BATCH_SIZE` | `1000` | Max segments per API batch |
| `BATCH_INTERVAL_SECONDS` | `60` | Min seconds between batches (rate limiting) |
| `TEMPERATURE` | `0` | Gemini temperature |
| `THINKING_LEVEL` | `low` | Gemini thinking level (LOW/MINIMAL) |
| `MOCK_MODE` | `false` | Run without real APIs |
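A `config.py`-style loader for the deployment controls might look like the sketch below. The defaults mirror the table above; the dataclass and function names are hypothetical, not the real module's API.

```python
import os
from dataclasses import dataclass

@dataclass(frozen=True)
class WorkerConfig:
    # Defaults mirror the deployment-controls table.
    gemini_key_index: int = 0
    max_videos: int = 0            # 0 = unlimited
    batch_size: int = 1000
    batch_interval_s: float = 60.0
    temperature: float = 0.0
    thinking_level: str = "low"
    mock_mode: bool = False

def load_config(env=os.environ) -> WorkerConfig:
    """Read deployment controls from the environment with table defaults."""
    return WorkerConfig(
        gemini_key_index=int(env.get("GEMINI_KEY_INDEX", 0)),
        max_videos=int(env.get("MAX_VIDEOS", 0)),
        batch_size=int(env.get("WORKER_BATCH_SIZE", 1000)),
        batch_interval_s=float(env.get("BATCH_INTERVAL_SECONDS", 60)),
        temperature=float(env.get("TEMPERATURE", 0)),
        thinking_level=env.get("THINKING_LEVEL", "low"),
        mock_mode=env.get("MOCK_MODE", "false").lower() == "true",
    )
```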

## Supabase Schema

### video_queue
Claim queue for workers. 507K videos seeded across 12 languages.

| Column | Type | Description |
|--------|------|-------------|
| video_id | text PK | YouTube video ID |
| status | text | pending / claimed / done / failed |
| language | text | ISO 639-1 code (ta, hi, te, etc.) |
| segment_count | int | Number of raw segments in tar |
| claimed_by | text | Worker ID that claimed it |
| claimed_at | timestamptz | When claimed |
| completed_at | timestamptz | When finished |
| error_message | text | Error details if failed |

### workers
Live worker status and stats. Updated via heartbeat every 10s.

| Column | Type | Description |
|--------|------|-------------|
| worker_id | text PK | Unique worker ID |
| status | text | online / offline / error |
| provider | text | Always "aistudio" |
| gpu_type | text | GPU label |
| config_json | jsonb | Prompt/schema versions, temperature, thinking level |
| total_segments_sent/completed/failed/429 | int | Cumulative counts |
| total_cache_hits | int | Segments served from V2 cache |
| total_input_tokens / output_tokens / cached_tokens | int | For cost tracking |
| batches_completed | int | Total batch count |
| current_video_id | text | Video being processed |
| segments_remaining | int | In current video |
| active_rpm / active_tpm | float | Live throughput |

### transcription_results
One row per transcribed segment.

| Column | Type | Description |
|--------|------|-------------|
| id | uuid PK | |
| video_id | text | Source video |
| segment_file | text | Segment filename |
| expected_language_hint | text | Language from queue |
| detected_language | text | Language the model actually heard |
| lang_mismatch_flag | bool | expected != detected |
| transcription | text | Verbatim native-script transcription |
| tagged | text | Same with event tags inserted |
| speaker_emotion / style / pace / accent | text | Prosody metadata |
| quality_score | float | 0-1 composite quality |
| asr_eligible / tts_clean_eligible / tts_expressive_eligible | bool | Downstream lane flags |
| prompt_version / schema_version / model_id / temperature | text | Reproducibility metadata |
| cache_hit | bool | Whether V2 cache was used |
| token_usage_json | jsonb | input/output/cached token counts |

### transcription_flags
Per-segment quality flags (errors, timeouts, validation issues).

## Audio Polish Pipeline

4-step process defined in `audio_polish.py`:

1. **Length split**: Segments >10s split at silence valleys (search from 7s). Hard upper bound 15s.
2. **Boundary trim**: Check first/last 50ms RMS, trim dirty edges at nearest silence valley.
3. **Silence pad**: 150ms silence prepended and appended.
4. **FLAC encode**: Convert to FLAC bytes + base64 for API.

Segments <2s are discarded at any stage. All valid segments are 2s–15s, most ≤10s.
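The silence-valley search in step 1 can be sketched as below. This is a simplified illustration, not the `audio_polish.py` implementation: it assumes precomputed per-frame RMS energy, and the 50ms frame size and function name are placeholders.

```python
def find_split_point(rms: list, frame_ms: int = 50,
                     search_from_s: float = 7.0, hard_max_s: float = 15.0) -> int:
    """Pick the quietest frame in the [7s, 15s] window as the split point.

    `rms` holds per-frame RMS energy (one value per `frame_ms` of audio).
    Returns the split position in milliseconds: the valley with the lowest
    energy between the 7s search start and the 15s hard upper bound.
    """
    lo = int(search_from_s * 1000 / frame_ms)
    hi = min(int(hard_max_s * 1000 / frame_ms), len(rms))
    window = rms[lo:hi]
    valley = lo + min(range(len(window)), key=window.__getitem__)
    return valley * frame_ms
```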

## Prompt System

**V2 (production)**: Language-agnostic cacheable system prompt (~1036 tokens) with:
- Verbatim transcription rules (no translation, no correction, no hallucination)
- Code-mixed handling (each language in its native script)
- Prosody-based punctuation only
- 6 reference examples (Telugu+English code-mix, Hindi+English, no-speech, abrupt cutoff, event tags, language mismatch)
- 10 audio event tags
- Accent guidance: sub-regional dialect only, not language name

Per-request user prompt carries the language hint: `TARGET LANGUAGE: Tamil (ta)`.

JSON schema enforced via API `responseJsonSchema` (not in prompt text).
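Putting the pieces together, a per-segment request body might be assembled as below. A hedged sketch: the top-level field names (`cachedContent`, `inlineData`, `generationConfig`) follow the Gemini REST API, but the exact payload shape used by `providers/aistudio.py`, the cache name, and the schema are illustrative.

```python
import base64

def build_request(flac_bytes: bytes, lang_name: str, lang_code: str,
                  cache_name: str, schema: dict) -> dict:
    """Assemble a generateContent-style body for one segment.

    The V2 system prompt lives in the explicit context cache (cachedContent),
    so the per-request payload carries only the audio and the language hint.
    """
    return {
        "cachedContent": cache_name,  # e.g. "cachedContents/abc123" (illustrative)
        "contents": [{
            "role": "user",
            "parts": [
                {"inlineData": {
                    "mimeType": "audio/flac",
                    "data": base64.b64encode(flac_bytes).decode("ascii"),
                }},
                {"text": f"TARGET LANGUAGE: {lang_name} ({lang_code})"},
            ],
        }],
        "generationConfig": {
            "temperature": 0,
            "responseMimeType": "application/json",
            "responseJsonSchema": schema,
        },
    }
```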

## Timeout & Retry Strategy

| Scenario | Behavior |
|----------|----------|
| Normal response | 2-5s latency |
| Read timeout (20s) | Retry once with `thinking=MINIMAL` (avoids re-entering thinking loop) |
| Second timeout (20s) | Discard segment as bad sample. Max 40s waste. |
| 429 rate limit | Exponential backoff: 10s, 20s, 40s (3 retries) |
| 429 flood (>10% of batch) | Entire batch of 429'd segments retried on fallback key |
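The timeout half of this policy can be sketched as below. A minimal illustration of the two-attempt ladder, not the provider code: `call(segment, thinking)` is a stand-in for the actual API request, and the 429 backoff/flood handling is omitted.

```python
import asyncio

async def transcribe_with_retry(call, segment, timeout_s: float = 20.0):
    """Two-attempt timeout ladder from the table above.

    First attempt runs at thinking=LOW; on a read timeout, retry once at
    thinking=MINIMAL so the model cannot re-enter a long thinking loop.
    A second timeout discards the segment (returns None), capping the
    worst-case waste at 2 x timeout_s = 40s.
    """
    for thinking in ("LOW", "MINIMAL"):
        try:
            return await asyncio.wait_for(call(segment, thinking), timeout_s)
        except asyncio.TimeoutError:
            continue
    return None  # discarded as a bad sample
```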

## R2 Storage Layout

```
Input:   s3://1-cleaned-data/{video_id}.tar
         s3://1-cleaned-data/cleaned/trail/{video_id}.tar  (alternate prefix)

Output:  s3://1-cleaned-data/transcribed/{video_id}_transcribed.tar
```

Output tar structure:
```
{video_id}/
  metadata.json           Original metadata + transcription_summary
  segments/               Polished FLAC audio (valid segments only)
  transcriptions/         1:1 JSON per segment (transcription, tagged, speaker, detected_language)
```
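Packing that layout can be sketched with the stdlib `tarfile` module. A simplified stand-in for `r2_client.py`'s pack step: the function name and argument shapes are illustrative.

```python
import io
import json
import tarfile

def pack_output_tar(video_id: str, metadata: dict,
                    segments: dict, transcriptions: dict) -> bytes:
    """Build the output tar in the layout shown above, in memory.

    `segments` maps filename -> polished FLAC bytes; `transcriptions` maps
    the same stems to their per-segment JSON records.
    """
    buf = io.BytesIO()

    def add(tar, name, data: bytes):
        info = tarfile.TarInfo(name=name)
        info.size = len(data)
        tar.addfile(info, io.BytesIO(data))

    with tarfile.open(fileobj=buf, mode="w") as tar:
        add(tar, f"{video_id}/metadata.json", json.dumps(metadata).encode())
        for fname, flac in segments.items():
            add(tar, f"{video_id}/segments/{fname}", flac)
        for stem, record in transcriptions.items():
            add(tar, f"{video_id}/transcriptions/{stem}.json",
                json.dumps(record, ensure_ascii=False).encode())
    return buf.getvalue()
```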

## Docker

```bash
# Build
docker build -t bharathkumar192/transcription-worker:v8 .

# Run tests inside container
docker run --rm --entrypoint python bharathkumar192/transcription-worker:v8 -m pytest tests/ -x -q

# Test run (2 videos, single key)
docker run --rm --env-file .env \
  -e MAX_VIDEOS=2 \
  -e GEMINI_KEY_INDEX=0 \
  -e WORKER_ID=test-worker \
  bharathkumar192/transcription-worker:v8

# Production (unlimited videos)
docker run --rm --env-file .env \
  -e GEMINI_KEY_INDEX=0 \
  -e WORKER_ID=worker-001 \
  -e GPU_TYPE=RTX_4090 \
  bharathkumar192/transcription-worker:v8
```

CLI args: `--mock`, `--worker-id ID`, `--gpu-type TYPE`, `--key-index 0|1|2|3`, `--max-videos N`

## vast.ai Deployment

**Target**: 50 workers across 4 keys (~12 workers per key).

**Docker image**: `bharathkumar192/transcription-worker:v8`

Per worker: assign `GEMINI_KEY_INDEX` in round-robin (0,1,2,3,0,1,2,3,...), unique `WORKER_ID`, set `GPU_TYPE` to the instance GPU.

```bash
# Deploy on vast.ai (RTX 4090/3090, 4+ vCPUs)
vastai create instance <offer_id> \
  --image bharathkumar192/transcription-worker:v8 \
  --disk 50 --ssh --direct \
  --login '-u bharathkumar192 -p <DOCKER_PAT> docker.io' \
  --env "-e R2_ENDPOINT_URL=... -e DATABASE_URL=... -e GEMINI_KEY_INDEX=0 -e WORKER_ID=v8-prod-001 -e GPU_TYPE=RTX_4090 ..." \
  --onstart-cmd 'cd /app && python -m src.main >> /var/log/worker.log 2>&1 &' \
  --label "v8-prod-001"
```

**Capacity math**:
- 4 keys x 20K RPM = 80K RPM total
- 50 workers x ~600 RPM avg = ~30K RPM (well within 80K limit)
- 507K videos in queue, ~207 segments avg = ~105M segments total

**Live vast.ai v8 deployment** (2026-03-01): 48 RTX 3090/4090 workers, global
- 179K segments processed in first 15 min: 0 429s, 0 failures, 100% cache hits
- Combined RPM: ~31K, ETA ~2 days for remaining 443K videos
- Direct PostgreSQL via asyncpg: zero DB timeouts (was the v4-v7 killer)

**v4→v8 fix history**:
- v4: parallelized audio polish + async IO (2x throughput), but bulk inserts hit Supabase statement timeout
- v5-v6: chunked inserts, reduced chunk size — still overwhelmed PostgREST at 45+ workers
- v7: retry on register/heartbeat — PostgREST REST API (HTTP 000 timeout) buckled under load
- **v8: replaced Supabase REST entirely with direct PostgreSQL (asyncpg)**. Atomic claims via FOR UPDATE SKIP LOCKED, connection pooling, executemany batch inserts, heartbeat jitter. Zero DB issues at scale.
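The atomic-claim pattern behind v8 can be sketched as below. This is not the actual `db.py` query, just an illustration of `FOR UPDATE SKIP LOCKED` against the `video_queue` schema above; the CTE shape and returned columns are assumptions.

```python
# With FOR UPDATE SKIP LOCKED, concurrent workers skip rows another worker
# has already locked, so claims neither block nor double-assign a video.
CLAIM_SQL = """
WITH next AS (
    SELECT video_id FROM video_queue
    WHERE status = 'pending'
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
UPDATE video_queue q
SET status = 'claimed', claimed_by = $1, claimed_at = now()
FROM next
WHERE q.video_id = next.video_id
RETURNING q.video_id, q.language, q.segment_count
"""

async def claim_video(pool, worker_id: str):
    """Atomically claim one pending video (asyncpg pool assumed)."""
    async with pool.acquire() as conn:
        return await conn.fetchrow(CLAIM_SQL, worker_id)
```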

**Monitoring**: Direct PostgreSQL queries on `workers` and `video_queue` tables. Workers log `[GEMINI-STATS]` (429/500/timeout counters) and `[DB-STATS]` (insert/claim/heartbeat counters) periodically.

## Test Results (Verified)

| Test | Result |
|------|--------|
| Unit tests | 73/73 pass (inside Docker container) |
| 12-language determinism (AI Studio) | 90% exact match across 3 runs |
| 500-concurrency stress | 500/500 success, 0 errors, 0 429s, 100% cache hits |
| 1000-segment stress | 1000/1000 success, 74.9 req/s |
| Multi-key consistency (3 GCP keys) | All 3 keys: 80-90% determinism, identical quality |
| Live 2-video Docker E2E | 419/420 segments (99.8%), 62s wall time, 100% cache hits |
| Prefetch | Verified: video 2 tar pre-downloaded during video 1 processing, zero duplicate downloads |
| CAS claim lock | Verified: no race conditions in Supabase claim |
| Vast.ai RTX 4090 E2E | 685/685 segments (100%), 909 RPM, 0 failures, 0 429s |

## Video Queue Distribution

| Language | Code | Videos |
|----------|------|--------|
| Telugu | te | 71K |
| Malayalam | ml | 65K |
| English | en | 60K |
| Hindi | hi | 59K |
| Punjabi | pa | 56K |
| Tamil | ta | 50K |
| Kannada | kn | 46K |
| Gujarati | gu | 41K |
| Bengali | bn | 20K |
| Odia | or | 19K |
| Marathi | mr | 16K |
| Assamese | as | 4K |
| **Total** | | **507K** |

## Cost Tracking

Token usage is tracked per-worker in Supabase (`total_input_tokens`, `total_output_tokens`, `total_cached_tokens`). Cost formula:

```
cost = (input_tokens - cached_tokens) * input_rate
     + cached_tokens * input_rate * 0.25
     + output_tokens * output_rate
```

Gemini 3 Flash pricing applies. With V2 caching (~84% cache hit rate by token count), input costs are significantly reduced.
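The formula above translates directly into code. The rates in the sketch are placeholder values, not actual Gemini 3 Flash pricing; only the 25% cached-token discount is taken from the formula.

```python
def estimate_cost(input_tokens: int, output_tokens: int, cached_tokens: int,
                  input_rate: float, output_rate: float,
                  cache_discount: float = 0.25) -> float:
    """Apply the cost formula above.

    Rates are USD per token; cached input tokens are billed at 25% of the
    normal input rate, uncached input and all output at full rate.
    """
    return ((input_tokens - cached_tokens) * input_rate
            + cached_tokens * input_rate * cache_discount
            + output_tokens * output_rate)
```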

## Live Dashboard

**URL**: https://dashboard-theta-lovat-45.vercel.app

Next.js dashboard deployed on Vercel. Auto-refreshes every 15s. Pulls live data from all 4 Supabase tables:

- **Aggregate overview**: Videos done/pending/failed, segments processed, tokens used, estimated cost, ETA
- **Worker heartbeats**: Status grid with live RPM, segments completed, GPU type, heartbeat staleness
- **Per-worker detail**: Click any worker for full breakdown — segments, cache hits, tokens, cost, config, errors
- **Language progress**: Per-language progress bars across all 12 languages (507K videos)
- **Queue chart**: Pie chart of pending/claimed/done/failed
- **Errors & flags**: Failed videos with error messages, flag type breakdown

Source: `dashboard/` directory. Environment: `NEXT_PUBLIC_SUPABASE_URL` + `SUPABASE_SERVICE_KEY`.

## Validation Pipeline (LID + CTC Scoring)

**Status**: Fleet-ready. Docker image + Vast.ai deploy script complete.

Multi-model validation that runs every segment through 4 models to produce rich per-segment quality data:

1. **MMS LID-256** (Meta, 1B params): Language classification — 256 languages, ~21 segs/s on A100
2. **VoxLingua107** (SpeechBrain, 14M params): Second LID opinion + 256-dim speaker embeddings, ~78 segs/s
3. **IndicConformer 600M** (AI4Bharat, multilingual): CTC ASR on 22 Indic languages + CER scoring vs Gemini, ~2.4 segs/s
4. **English CTC** (facebook/wav2vec2-large, 315M params): English CTC log-likelihood scoring, ~18 segs/s

**Output**: Parquet shards (50 videos each) with 27 columns per segment — all model scores, confidences, top-3 predictions, speaker embeddings, transcription comparison, LID consensus. Designed for offline bucketing into Golden/Redo/Discard sets.

```bash
# Test on a single video (downloads from R2, runs all models):
python -m validations.main --test-local="VIDEO_ID" --models all

# Run worker loop (claims from DB, processes, packs parquet):
python -m validations.main --max-videos 10
```

**VRAM**: 4.0GB for all 4 models on A100 (fits easily on 3090 24GB).
**Throughput**: ~2 segs/s total (conformer bottleneck), ~50 segments per video ≈ 25s/video.

Source: `validations/` directory.

### Validation Supabase Schema

Uses a separate `validation_status` column on `video_queue` (keeps transcription `status='done'` untouched):
- States: `pending` → `validating` → `validated` / `validation_failed`
- Claim: `WHERE status = 'done' AND validation_status = 'pending' ORDER BY video_id FOR UPDATE SKIP LOCKED`
- Index: `idx_vq_val_claim` partial index on `(video_id) WHERE validation_status='pending' AND status='done'`
  - ORDER BY video_id forces planner to use the partial index (105x faster than seq scan)
- Dashboard indexes: `idx_vq_status_lang (status, language)`, `idx_vq_done_completed (completed_at) WHERE status='done'`

`worker_validators` table tracks live fleet: worker_id, status, gpu_type, videos/segments processed, avg throughput, shards written, heartbeat, last_error. Dashboard can query for stale workers (heartbeat > 2min ago).

### Validation Docker

```bash
# Build (models baked in, ~10GB image)
docker build -f Dockerfile.validation -t bharathkumar192/validation-worker:latest \
  --build-arg HF_TOKEN=$HF_TOKEN .

# Test locally
docker run --rm --gpus all --env-file .env \
  -e MAX_VIDEOS=2 -e WORKER_ID=test-val \
  bharathkumar192/validation-worker:latest

# Push to Docker Hub
docker push bharathkumar192/validation-worker:latest
```

### Validation Vast.ai Deployment

```bash
# Deploy 50 workers
source .env && ./deploy_validation.sh 50

# Dry run (preview)
source .env && ./deploy_validation.sh 10 --dry-run

# Monitor
vastai show instances
```

**Capacity math (150 GPUs, post DB-optimization)**:
- ~2 segs/s per GPU, ~50 segments/video ≈ 25s/video
- 150 GPUs: ~6 videos/s → 465K remaining in ~21 hours
- DB fixes (2026-03-04): claim query 15.8ms→0.15ms (105x), heartbeat 30s→60s, pool 6→3 conns/worker

**R2 buckets**: Source=`transcribed` (507K tars), Output=`validation-results` (parquet shards).

## Deferred / Not Implemented

- **Music/jingle detection**: Proactive skip of non-speech segments before sending to Gemini. Currently these time out and are discarded (up to 40s wasted each).
- **OpenRouter fallback**: Code exists in `src/providers/openrouter.py` but not used in production. OpenRouter doesn't support caching for Gemini 3 Flash (4096 token minimum, our prompt is 1036). Kept for preflight comparative tests only.
