---
name: Gemini Transcription Pipeline
overview: Distributed worker-orchestrator pipeline deployed as Docker images across 18+ vast.ai GPUs. Each worker claims videoID tars from Supabase, preprocesses audio on GPU, fires 1000 async requests/min to AI Studio (primary) or OpenRouter (fallback on 429s), validates with optional GPU-based IndicConformer, packs results back to R2, and reports heartbeat/metrics to Supabase. Target 100M segments in 3-4 days.
todos:
  - id: setup-env
    content: "Project setup: Dockerfile, requirements.txt, .env template, config module with language mappings, provider configs, worker registration"
    status: pending
  - id: worker-core
    content: "Worker core: Docker entrypoint, auto-register with Supabase (unique instance_id), heartbeat loop, graceful shutdown, exception tracking"
    status: pending
  - id: supabase-schema
    content: "Supabase schema: video_queue table (claim/lock), workers table (heartbeat/stats), transcription_results, transcription_flags. Check existing tables in DB first."
    status: pending
  - id: r2-client
    content: "R2 client: download videoID.tar, extract metadata.json + segments/*.flac, pack results + transcriptions back into tar, upload finished tar to R2"
    status: pending
  - id: audio-preprocess
    content: "Audio polishing: FIRST length-split at silence valleys (Concern 8), THEN boundary-trim dirty edges (Concern 7), THEN 150ms silence pad. Store all trim metadata."
    status: pending
  - id: prompt-builder
    content: "Prompt builder: lean system prompt template (~400 tokens, no schema), Pydantic JSON schema with ISO lang enum, number-as-digits, NO_SPEECH handling"
    status: pending
  - id: aistudio-client
    content: "AI Studio client (PRIMARY): fire 1000 async requests per batch, track returned/rate-limited/pending per batch, exponential backoff on 429s, cache-hit verification via usage_metadata"
    status: pending
  - id: openrouter-client
    content: "OpenRouter client (SECONDARY): same interface as AI Studio, activate on 429s or via config flag, cache verification via prompt_tokens_details, daily spend tracking"
    status: pending
  - id: validator
    content: "Validator: Tier 1 programmatic checks + quality_score + lane flags. Optional Tier 1.5 GPU validation (romanization via aksharamukha + IndicConformer CER) with disable flag."
    status: pending
  - id: batch-cycle
    content: "Batch cycle engine: 1-minute cadence loop. Fire 1000 requests -> collect results -> validate -> pack segments -> report metrics. Track all states per batch."
    status: pending
  - id: pipeline-orchestrator
    content: "Pipeline orchestrator per worker: claim video -> download tar -> polish audio -> batch-cycle through segments -> pack results to R2 -> mark video done -> claim next"
    status: pending
  - id: metrics-reporting
    content: "Metrics reporting: per-worker heartbeat to Supabase (requests sent/returned/429s/errors, current video, segments remaining, provider), live dashboard query support"
    status: pending
  - id: canary-test
    content: "Canary test: single worker, 1000 segments across all 12 languages, test both AI Studio and OpenRouter, verify caching, measure token usage, validate quality, test temp=0 for looping"
    status: pending
isProject: false
---

# Gemini Audio Transcription Pipeline - Comprehensive Plan

Distributed worker-orchestrator system for transcribing ~100M audio segments across 12 Indian languages using Gemini 3 Flash. Each worker is a Docker container deployed on vast.ai GPUs.

---

## Architecture Overview: Worker-Orchestrator Model

```mermaid
flowchart TD
    subgraph orchestration [Supabase Orchestration Layer]
        VQ[video_queue table - pending/claimed/done]
        WT[workers table - heartbeat + stats]
        TR[transcription_results - per-segment data]
    end

    subgraph worker_N ["Worker N (Docker on vast.ai GPU)"]
        REG[Register worker with unique instance_id]
        CLAIM[Claim next pending videoID from video_queue]
        DL[Download videoID.tar from R2]
        POLISH[Polish: length-split then boundary-trim then pad]
        BATCH[Fire 1000 async requests to provider]
        COLLECT[Collect responses - track returned/429/pending]
        VALIDATE[Tier 1 validation + optional GPU IndicConformer]
        PACK[Pack segments + transcriptions into tar]
        UPLOAD[Upload finished tar to R2]
        REPORT[Report metrics + heartbeat to Supabase]
        MARK[Mark videoID as done in video_queue]
    end

    subgraph providers [Inference Providers]
        AISTUDIO[AI Studio Realtime - PRIMARY]
        OPENROUTER[OpenRouter - SECONDARY on 429s or config]
    end

    VQ --> CLAIM
    CLAIM --> DL
    DL --> POLISH
    POLISH --> BATCH
    BATCH --> AISTUDIO
    BATCH --> OPENROUTER
    AISTUDIO --> COLLECT
    OPENROUTER --> COLLECT
    COLLECT --> VALIDATE
    VALIDATE --> PACK
    PACK --> UPLOAD
    UPLOAD --> MARK
    MARK --> CLAIM
    REPORT --> WT
    VALIDATE --> TR
    REG --> WT
```



---

## Worker Lifecycle

Each Docker container on startup:

1. **Register**: Generate or read unique `instance_id` (vast.ai offer_id, or mock UUID). Insert row into `workers` table with status=`online`, provider config, GPU type.
2. **Heartbeat loop** (background async task, every 10s): Update `workers` table with current stats: `segments_sent`, `segments_completed`, `segments_failed`, `segments_429`, `current_video_id`, `segments_remaining`, `provider`, `last_heartbeat_at`.
3. **Main loop**: Claim video -> process -> upload -> repeat until no videos are left or a shutdown signal arrives. While a video is in flight, push live transcription stats to Supabase every 5s (rather than on the 10s heartbeat): the worker's active RPM and TPM, written against its row in `workers`, so that the sum across all workers can be checked against the overall RPM and TPM limits.
4. **Graceful shutdown**: On SIGTERM/SIGINT, finish current batch, upload partial results, mark worker as `offline`, release any claimed video back to `pending`.
5. **Exception handling**: All exceptions are logged with full traceback. Failed segments are tracked in `transcription_flags`. The worker never crashes silently: every error is reported to Supabase and also recorded on the worker's row in the `workers` table.
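
The heartbeat and graceful-shutdown steps above can be sketched with `asyncio`. This is a minimal illustration under stated assumptions, not the worker implementation: `WorkerStats` is a hypothetical subset of the heartbeat fields, and `report` stands in for whatever Supabase update `db.py` ends up exposing.

```python
import asyncio
from dataclasses import asdict, dataclass
from typing import Awaitable, Callable


@dataclass
class WorkerStats:
    # Hypothetical subset of the heartbeat fields listed above.
    status: str = "online"
    segments_sent: int = 0
    segments_completed: int = 0
    segments_429: int = 0


async def heartbeat_loop(
    stats: WorkerStats,
    report: Callable[[dict], Awaitable[None]],
    stop: asyncio.Event,
    interval_s: float = 10.0,
) -> None:
    """Report stats every interval_s until stop is set, then report 'offline'."""
    while not stop.is_set():
        await report(asdict(stats))
        try:
            # Sleep, but wake immediately when shutdown is requested.
            await asyncio.wait_for(stop.wait(), timeout=interval_s)
        except asyncio.TimeoutError:
            pass
    stats.status = "offline"
    await report(asdict(stats))
```

On Unix, the entrypoint could wire shutdown with `loop.add_signal_handler(signal.SIGTERM, stop.set)`, so that SIGTERM ends the loop after the current report and the final `offline` row is still written.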

---

## Pipeline Order: Polishing Phase

**Concern 8 (length splitting) runs FIRST, then Concern 7 (boundary trimming).**

This is the correct order because:

- Splitting a 30s segment at silence valleys creates sub-segments already at natural pause points
- Each sub-segment then gets its own boundary check - the split points are already clean(ish) since we cut at silence
- If you trimmed boundaries first on a 30s segment, you'd only fix the start/end but still need to split, creating new potentially-dirty edges at every split point

```
RAW SEGMENT (from tar)
  |
  v
STEP 1: LENGTH SPLIT (Concern 8)
  - Compute energy profile
  - If > 10s: find silence valleys, split at first valley after 7s
  - If > 15s and no valley: force-cut at lowest energy in 10-15s
  - If < 2s: DISCARD
  - Result: segments in 2-15s range
  |
  v
STEP 2: BOUNDARY TRIM (Concern 7)
  - For each sub-segment: check first/last 50ms RMS
  - If dirty start: scan forward for silence valley, trim
  - If dirty end: scan backward for silence valley, trim
  - Flag abrupt_start/abrupt_end if no valley found within 40%
  |
  v
STEP 3: SILENCE PAD
  - Prepend 150ms silence, append 150ms silence
  - Post-pad length check: discard if < 2s
  |
  v
STEP 4: ENCODE
  - Export as FLAC, encode to base64 for API request
  - Store all trim metadata (original offsets, trimmed offsets, pad amounts)
```
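
The Step 1 rules can be sketched as a pure function over a per-frame energy profile. This is a sketch under assumptions, not the final `audio_polish.py`: the function name, the 50ms frame size, and the `silence_thresh` value are hypothetical, and the valley search is assumed to stop at 15s so that no piece exceeds the stated 2-15s range.

```python
def plan_splits(
    energy: list[float],
    frame_ms: int = 50,
    silence_thresh: float = 0.01,
) -> list[tuple[int, int]]:
    """Return (start_ms, end_ms) spans for one raw segment's energy profile."""
    n = len(energy)
    cuts: list[int] = []
    start = 0
    while (n - start) * frame_ms > 10_000:
        lo = start + 7_000 // frame_ms           # earliest allowed cut: 7s in
        hi = min(n, start + 15_000 // frame_ms)  # never let a piece exceed 15s
        valley = next(
            (i for i in range(lo, hi) if energy[i] < silence_thresh), None
        )
        if valley is None:
            if (n - start) * frame_ms <= 15_000:
                break  # 10-15s with no valley: keep the piece whole
            # > 15s and no valley: force-cut at the lowest energy in 10-15s
            force_lo = start + 10_000 // frame_ms
            valley = min(range(force_lo, hi), key=energy.__getitem__)
        cuts.append(valley)
        start = valley
    bounds = [0, *cuts, n]
    # Discard anything shorter than 2s.
    return [
        (a * frame_ms, b * frame_ms)
        for a, b in zip(bounds, bounds[1:])
        if (b - a) * frame_ms >= 2_000
    ]
```

Working in integer milliseconds keeps the offsets exact and lines up with the `original_start_ms` / `original_end_ms` columns stored later.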

---

## Batch Cycle Engine (1-Minute Cadence)

Each worker processes segments in batches of 1000, on a 1-minute cadence:

```
MINUTE 0:00 - Fire batch of 1000 async requests
  |
  v
MINUTE 0:00-0:15 - Collect responses as they arrive
  - Track per request: success / 429 / error / timeout
  - On 429: wait 10s, retry with backoff (max 3 retries)
  - On 429 flood (>10% of batch): switch to OpenRouter for remainder
  |
  v
MINUTE 0:15-0:30 - Tier 1 Validation
  - Programmatic checks on all returned transcripts
  - Compute quality_score, lane flags
  - Flag failures for retry in next batch
  |
  v
MINUTE 0:30-0:45 - Optional GPU Validation (flag to disable)
  - Romanize Indic transcripts via aksharamukha
  - Run IndicConformer on GPU for the same audio segments
  - Compute CER between Gemini output and IndicConformer output
  - Store alignment_score per segment
  |
  v
MINUTE 0:45-0:55 - Pack + Upload
  - Pack completed segments + transcription JSONs
  - Batch insert results to Supabase
  - Update worker heartbeat with batch stats
  |
  v
MINUTE 0:55-1:00 - Prepare next batch
  - Queue next 1000 segments from current video
  - If video exhausted: pack tar, upload to R2, mark done, claim next video
  |
  v
MINUTE 1:00 - Fire next batch (repeat)
```
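
The 429 handling in the collect window can be sketched as below. Assumptions: `RateLimited` is a hypothetical exception a provider client would raise on HTTP 429, the backoff doubles from the 10s base (the plan only says "backoff"), and `sleep` is injectable so the logic can be tested without waiting.

```python
import asyncio
from typing import Awaitable, Callable


class RateLimited(Exception):
    """Hypothetical marker raised by a provider client on HTTP 429."""


async def send_with_retry(
    send: Callable[[], Awaitable[str]],
    max_retries: int = 3,
    base_delay_s: float = 10.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> str:
    """Retry one request on 429, doubling the wait each time."""
    for attempt in range(max_retries + 1):
        try:
            return await send()
        except RateLimited:
            if attempt == max_retries:
                raise  # retries exhausted: caller flags the segment
            await sleep(base_delay_s * 2 ** attempt)  # 10s, 20s, 40s
    raise AssertionError("unreachable")


def should_failover(n_429: int, batch_size: int, threshold: float = 0.10) -> bool:
    """True once more than `threshold` of the batch has been rate-limited."""
    return batch_size > 0 and n_429 / batch_size > threshold
```

Note that three doubled retries add up to 70s of waiting, which is longer than the 15s collect window, so retried requests would most likely land in a later batch.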

**Per-batch tracking** (reported to Supabase):

- `batch_id`, `worker_id`, `video_id`
- `segments_sent`, `segments_returned`, `segments_429`, `segments_error`
- `provider_used` (aistudio / openrouter / mixed)
- `cache_hits` (from token usage metadata)
- `avg_latency_ms`, `batch_duration_ms`
- `validation_scores_summary`
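
One possible way to derive these fields from per-request outcomes is sketched here. `Outcome` and `summarize_batch` are hypothetical names, and the status values (`"ok"`, `"429"`, `"error"`) are assumptions for illustration.

```python
from dataclasses import dataclass
from statistics import mean


@dataclass(frozen=True)
class Outcome:
    # Hypothetical per-request record; status is "ok", "429", or "error".
    provider: str
    status: str
    latency_ms: float
    cached: bool = False


def summarize_batch(outcomes: list[Outcome]) -> dict:
    """Aggregate per-request outcomes into the per-batch fields."""
    providers = {o.provider for o in outcomes}
    ok = [o for o in outcomes if o.status == "ok"]
    return {
        "segments_sent": len(outcomes),
        "segments_returned": len(ok),
        "segments_429": sum(o.status == "429" for o in outcomes),
        "segments_error": sum(o.status == "error" for o in outcomes),
        # "mixed" when a 429 flood moved part of the batch to the fallback
        "provider_used": "mixed" if len(providers) > 1 else next(iter(providers), None),
        "cache_hits": sum(o.cached for o in ok),
        "avg_latency_ms": mean(o.latency_ms for o in ok) if ok else None,
    }
```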

---

## Provider Strategy

### AI Studio (PRIMARY)

- Used by default on all workers
- 1000 requests/batch, 1 batch/min per worker
- 18 workers * 1000 RPM = 18,000 RPM (under 20K RPM limit, 2K headroom)
- Inline audio bytes (segments are <20MB)
- **Cache verification**: Check `usage_metadata.cached_content_token_count` in each response. If system prompt is being cached, this will be > 0. Log cache hit rate per batch.
- On 429 response: wait 10s, retry up to 3x. If >10% of batch gets 429: switch remaining requests to OpenRouter.

### OpenRouter (SECONDARY - on 429s or dedicated workers)

- Activated automatically when AI Studio returns 429s
- Can also be set as primary via config flag (`PROVIDER=openrouter`) for dedicated OpenRouter workers
- Same structured output via `response_format: { type: "json_schema" }`
- Base64-encoded audio in message content
- **Cache verification**: Check `usage.prompt_tokens_details.cached_tokens` in response. Log cache hit rate.
- If you're hitting heavy AI Studio rate limits, launch additional GPU workers with `PROVIDER=openrouter` to increase total throughput beyond AI Studio's 20K RPM ceiling.

### NO Batch API

Gemini Batch API is excluded. The "batching" here means firing 1000 concurrent async requests to the realtime API per minute per worker. This gives immediate feedback and keeps the 1-minute cadence clean.

---

## Rate Limit Math with Horizontal Scaling

**Target**: 100M segments in 3-4 days (72-96 hours)

**Per-segment token estimate** (verify in canary):

- Audio: ~8s avg at 32 tokens/s = ~256 audio tokens
- System prompt: ~400 text tokens (cached via implicit caching)
- User prompt: ~30 text tokens
- Output: ~200 text tokens
- Total: ~886 tokens/request

### AI Studio only (18 workers):

- 18 workers * 1000 req/min = 18,000 RPM
- Per hour: 1,080,000 segments
- Per day (24h): 25,920,000 segments
- **3 days: ~77.8M segments**
- **4 days: ~103.7M segments** (covers 100M with headroom)

### AI Studio (18) + OpenRouter overflow (4-6 extra workers):

- AI Studio: 18K RPM = 1.08M/hour
- OpenRouter: 4-6 workers * 1000 RPM = 4-6K RPM additional
- Combined: ~22-24K RPM = 1.32-1.44M/hour
- **3 days: ~95-104M segments** (covers 100M in 3 days!)
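
The throughput figures above follow from simple arithmetic, reproduced here so the numbers can be re-run if the worker count or per-worker RPM changes. The function names are illustrative only, and the calculation assumes every worker sustains its full RPM for the whole period.

```python
MINUTES_PER_DAY = 60 * 24


def segments_in(workers: int, rpm_per_worker: int, days: int) -> int:
    """Segments completed if every worker sustains its RPM for `days`."""
    return workers * rpm_per_worker * MINUTES_PER_DAY * days


def days_needed(target: int, workers: int, rpm_per_worker: int) -> float:
    """Days required to reach `target` segments at full sustained rate."""
    return target / (workers * rpm_per_worker * MINUTES_PER_DAY)


if __name__ == "__main__":
    for label, workers in [
        ("AI Studio only", 18),
        ("AI Studio + 4 OpenRouter", 22),
        ("AI Studio + 6 OpenRouter", 24),
    ]:
        print(label, segments_in(workers, 1000, 3), round(days_needed(100_000_000, workers, 1000), 2))
```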

### Per-worker capacity:

- 1000 segments/min = ~1.44M segments/day
- Each worker has a GPU (4090/A100/L40S) for:
  - Audio preprocessing (polishing) - CPU/GPU accelerated
  - Optional IndicConformer validation on GPU
  - NOT for Gemini inference (that's API calls)

---

## Cost Estimation (updated for 100M)

**AI Studio Realtime** (per 1M tokens):

- Audio input: $1.00 | Text input: $0.50 | Output: $3.00

**Per segment** (with caching):

- ~$0.00087 per segment

**100M segments via AI Studio: ~$87,000**

**OpenRouter** (likely similar or slightly higher pricing for Gemini 3 Flash):

- Estimate ~$0.001 per segment (depends on provider markup)
- If 20M segments via OpenRouter: ~$20,000

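**Total estimate: ~$87K-$90K** for a 100M-segment run (all AI Studio = ~$87K; 80M AI Studio + 20M OpenRouter = ~$69.6K + $20K = ~$89.6K), rising to ~$100K if everything went through OpenRouter.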

**Canary will give precise numbers.** This is back-of-envelope.
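
The per-segment figure can be reproduced from the token estimate and the listed prices. One assumption is made loudly here: the ~$0.00087 figure only falls out if the cached ~400-token system prompt is treated as free. Actual cached-token billing is not stated in this plan and should be confirmed in the canary.

```python
# USD per 1M tokens, taken from the AI Studio prices listed above.
PRICE_PER_M = {"audio_in": 1.00, "text_in": 0.50, "text_out": 3.00}


def cost_per_segment(
    audio_tokens: int = 256,
    system_tokens: int = 400,
    user_tokens: int = 30,
    output_tokens: int = 200,
    system_cached: bool = True,
) -> float:
    """Estimated USD per request.

    ASSUMPTION: cached system-prompt tokens are billed at zero. This is
    what reproduces the ~$0.00087 estimate; it is not a confirmed price.
    """
    text_in = user_tokens + (0 if system_cached else system_tokens)
    micro_usd = (
        audio_tokens * PRICE_PER_M["audio_in"]
        + text_in * PRICE_PER_M["text_in"]
        + output_tokens * PRICE_PER_M["text_out"]
    )
    return micro_usd / 1_000_000
```

With the defaults this gives about $0.00087 per segment; with `system_cached=False` it rises to about $0.00107, i.e. roughly $107K for 100M segments if caching never kicks in.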

---

## Concern 5 (answer): Pipeline Order

**Length splitting (Concern 8) FIRST, then boundary trimming (Concern 7).**

Reasoning:

- A 30s raw segment needs to be split into 2-15s pieces BEFORE you can meaningfully trim boundaries
- Splitting finds silence valleys and cuts there - these are already relatively clean cut points
- After splitting, each sub-segment gets its own boundary check - this catches any remaining dirty edges
- If you trimmed first, you'd fix the start of the 30s segment but still need to split it, and each new split point could create dirty edges that you'd need to re-trim anyway
- The correct flow is: split at natural pauses -> trim each piece -> pad each piece

---

## Validation Strategy (Revised)

### Tier 1: Programmatic Checks (every segment, instant, free)

Runs within the batch cycle (0:15-0:30 window):

- Empty/NO_SPEECH check
- Length ratio (chars_per_second)
- Script check (unicodedata)
- Language mismatch flag
- Tag consistency (strip tags from `tagged`, compare to `transcription`)
- UNK/INAUDIBLE density
- Overlap detection
- Composite `quality_score` (0-1)
- Lane flags: `asr_eligible`, `tts_clean_eligible`, `tts_expressive_eligible`
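
Two of the checks above (script check and length ratio) can be sketched with the standard library alone. The function names and the idea of matching on the Unicode character-name prefix are illustrative assumptions; thresholds for flagging are left to the validator.

```python
import unicodedata


def script_ratio(text: str, script: str) -> float:
    """Fraction of letters/marks whose Unicode name starts with `script`.

    `script` is a Unicode name prefix such as "DEVANAGARI" or "TAMIL".
    Digits, punctuation and whitespace are ignored.
    """
    chars = [c for c in text if unicodedata.category(c)[0] in ("L", "M")]
    if not chars:
        return 0.0
    hits = sum(unicodedata.name(c, "").startswith(script) for c in chars)
    return hits / len(chars)


def chars_per_second(text: str, duration_s: float) -> float:
    """Length ratio used to catch truncated or runaway transcripts."""
    return len(text.strip()) / duration_s if duration_s > 0 else 0.0
```

A low `script_ratio` against the expected script is only a hint for code-mixed audio, so it would feed the language mismatch flag and `quality_score` rather than reject a segment outright.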

### Tier 1.5: GPU-Accelerated Validation (optional, flag to disable)

Runs within the batch cycle (0:30-0:45 window) if `ENABLE_GPU_VALIDATION=true`:

- **Romanization**: Convert Indic script transcriptions to Roman via `aksharamukha` (~instant, CPU)
- **IndicConformer inference**: Run `ai4bharat/indic-conformer-600m-multilingual` on the same audio segments on the local GPU
- **CER computation**: Compare romanized Gemini output vs romanized IndicConformer output
- Store `alignment_score` per segment
- **Speed estimate**: IndicConformer processes ~100-200x realtime on GPU. 1000 segments * 8s avg = 8000s of audio, which takes ~40s at 200x realtime and ~80s at 100x. That does NOT fit in the 15s window, which covers only ~1500-3000s of audio (roughly 190-375 segments). To stay inside the window, batch inference properly and sample-validate: every 10th segment = ~800s of audio = 4-8s on GPU.
- **This is a best-effort validation.** If it causes the 1-minute cadence to slip, disable it and run as a separate post-processing pass.
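
The CER step is a character-level edit distance divided by the reference length. A dependency-free sketch follows. Treating the IndicConformer output as the reference is an assumption made here for illustration; neither output is ground truth, so the result is an agreement score rather than a true error rate.

```python
def edit_distance(a: str, b: str) -> int:
    """Levenshtein distance using a rolling row (O(len(b)) memory)."""
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, 1):
        cur = [i]
        for j, cb in enumerate(b, 1):
            cur.append(
                min(
                    prev[j] + 1,               # deletion
                    cur[j - 1] + 1,            # insertion
                    prev[j - 1] + (ca != cb),  # substitution or match
                )
            )
        prev = cur
    return prev[-1]


def cer(reference: str, hypothesis: str) -> float:
    """Character error rate of `hypothesis` against `reference`."""
    if not reference:
        return 0.0 if not hypothesis else 1.0
    return edit_distance(reference, hypothesis) / len(reference)
```

Both strings would be romanized and normalized (case, whitespace) before the call, so that script differences do not inflate the score.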

### No Tier 2 or Tier 3 in the main pipeline.

Heavy validation is deferred to post-processing after all 100M segments are transcribed.

---

## Supabase Schema

### Check existing tables first

The `.env` has `SUPABASE_TABLES` pointing to the dashboard. Before creating new tables, query existing tables to find the `videos` table (or similar) that has `video_id` and `language` columns. Use the existing `language` column as the hint for the prompt.

### New tables:

**`video_queue`** (controls which videos are processed):

- `video_id` (PK, references existing videos table)
- `status`: enum `pending` | `claimed` | `processing` | `done` | `failed`
- `claimed_by` (worker_id, nullable)
- `claimed_at` (timestamp, nullable)
- `completed_at` (timestamp, nullable)
- `segment_count` (total segments in tar)
- `segments_transcribed` (progress counter)
- `error_message` (nullable, for failed videos)

Populated by a one-time migration that inserts all video_ids from the existing videos table with status=`pending`.
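
The claim step must be atomic so that two workers never take the same video. The sketch below shows the select-then-update pattern inside one write transaction, using the standard-library `sqlite3` module purely as a local stand-in. It is not the Supabase implementation: on Postgres the same effect is commonly achieved with `SELECT ... FOR UPDATE SKIP LOCKED` inside a function, and the helper name `claim_next` is hypothetical.

```python
import sqlite3
from typing import Optional


def claim_next(conn: sqlite3.Connection, worker_id: str) -> Optional[str]:
    """Atomically move one pending video to 'claimed' and return its id.

    Expects a connection opened with isolation_level=None so that the
    explicit BEGIN/COMMIT statements below control the transaction.
    """
    conn.execute("BEGIN IMMEDIATE")  # take the write lock before reading
    try:
        row = conn.execute(
            "SELECT video_id FROM video_queue "
            "WHERE status = 'pending' ORDER BY video_id LIMIT 1"
        ).fetchone()
        if row is None:
            conn.execute("COMMIT")
            return None
        conn.execute(
            "UPDATE video_queue SET status = 'claimed', claimed_by = ?, "
            "claimed_at = datetime('now') WHERE video_id = ?",
            (worker_id, row[0]),
        )
        conn.execute("COMMIT")
        return row[0]
    except Exception:
        conn.execute("ROLLBACK")
        raise
```

Releasing a claim on graceful shutdown is the inverse update: set `status` back to `pending` and clear `claimed_by` / `claimed_at` for that `video_id`.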

**`workers`** (live worker registry + heartbeat):

- `worker_id` (PK, unique instance_id)
- `status`: enum `online` | `offline` | `error`
- `provider`: `aistudio` | `openrouter`
- `gpu_type` (e.g., "RTX 4090", "A100")
- `current_video_id` (nullable)
- `segments_remaining` (for current video)
- `total_segments_sent` (lifetime counter)
- `total_segments_completed`
- `total_segments_failed`
- `total_segments_429`
- `total_cache_hits`
- `batches_completed`
- `avg_batch_latency_ms`
- `last_heartbeat_at` (timestamp)
- `started_at` (timestamp)
- `config_json` (prompt_version, schema_version, trimmer_version, temperature, thinking_level)

**`transcription_results`** (per-segment, one row per segment):

- `id` (uuid, PK)
- `video_id`, `segment_file`, `speaker_id`
- `original_start_ms`, `original_end_ms`
- `trimmed_start_ms`, `trimmed_end_ms`
- `leading_pad_ms`, `trailing_pad_ms`
- `expected_language_hint`, `detected_language` (ISO enum)
- `lang_mismatch_flag` (bool)
- `transcription`, `tagged`
- `speaker_emotion`, `speaker_style`, `speaker_pace`, `speaker_accent`
- `num_unk`, `num_inaudible`, `num_event_tags`
- `boundary_score`, `text_length_per_sec`
- `overlap_suspected` (bool)
- `quality_score` (0-1)
- `alignment_score` (nullable, from GPU validation)
- `asr_eligible`, `tts_clean_eligible`, `tts_expressive_eligible` (bool)
- `prompt_version`, `schema_version`, `trimmer_version`, `validator_version`
- `model_id`, `temperature`, `thinking_level`
- `provider` (aistudio / openrouter)
- `worker_id`
- `cache_hit` (bool, from token usage)
- `token_usage_json` (input/output/cached counts)
- `created_at`

**`transcription_flags`**: segment_id, flag_type, details, resolved, resolved_at

---

## Prompt Caching - Verification

### AI Studio

- System instruction is passed in `config.system_instruction` for every request
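- Gemini 3 Flash implicit caching kicks in at 1024+ tokens. The estimated request (~400 system + ~30 user + ~256 audio tokens) is below that threshold, so cache hits are not guaranteed; confirm in the canary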
- **Verify**: Check `response.usage_metadata.cached_content_token_count` - if > 0, cache is working
- Log `cache_hit_rate = cached_tokens / total_prompt_tokens` per batch
- To maximize cache hits: keep system_instruction byte-identical across all requests of the same language

### OpenRouter

- System message stays consistent across requests
- Provider sticky routing maximizes cache hits automatically
- **Verify**: Check `usage.prompt_tokens_details.cached_tokens` in response
- Log cache hit rate per batch
- Both providers' cache stats reported in worker heartbeat
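
A provider-agnostic cache-hit-rate calculation could look like the sketch below. It assumes each response's usage block has already been converted to a plain `dict`. The key names follow the fields cited above, plus `prompt_token_count` (AI Studio) and `prompt_tokens` (OpenRouter) as the denominators. The function names are hypothetical.

```python
def cached_and_prompt_tokens(provider: str, usage: dict) -> tuple[int, int]:
    """Return (cached_tokens, prompt_tokens) from one response's usage dict."""
    if provider == "aistudio":
        return (
            usage.get("cached_content_token_count") or 0,
            usage.get("prompt_token_count") or 0,
        )
    if provider == "openrouter":
        details = usage.get("prompt_tokens_details") or {}
        return (details.get("cached_tokens") or 0, usage.get("prompt_tokens") or 0)
    raise ValueError(f"unknown provider: {provider}")


def cache_hit_rate(provider: str, usages: list[dict]) -> float:
    """cached_tokens / total_prompt_tokens over a whole batch."""
    cached = prompt = 0
    for usage in usages:
        c, p = cached_and_prompt_tokens(provider, usage)
        cached += c
        prompt += p
    return cached / prompt if prompt else 0.0
```

The `or 0` guards matter because a missing or null cached-token field is the normal shape of a cache miss, not an error.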

---

## Output: Pack Results to R2

After all segments for a videoID are transcribed and validated:

1. Create output structure:

```
videoID_transcribed.tar
  ├── metadata.json (original + transcription summary stats)
  ├── segments/
  │   ├── speaker0_00000_00850.flac (original polished segment)
  │   ├── speaker0_00850_01200.flac
  │   └── ...
  └── transcriptions/
      ├── speaker0_00000_00850.json (full Gemini response + validation scores)
      ├── speaker0_00850_01200.json
      └── ...
```

2. Each `.json` file contains: transcription, tagged, speaker metadata, detected_language, quality_score, lane flags, alignment_score
3. Upload `videoID_transcribed.tar` to a separate R2 bucket (or prefix like `transcribed/`)
4. Mark video as `done` in `video_queue`
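
Packing the output tar can be done fully in memory with the standard library, which avoids temp files on the worker. A minimal sketch, assuming segments and results are keyed by file stem (e.g. `speaker0_00000_00850`); `pack_results` is a hypothetical helper, and the upload to R2 is out of scope here.

```python
import io
import json
import tarfile


def pack_results(
    metadata: dict,
    segments: dict[str, bytes],
    transcriptions: dict[str, dict],
) -> bytes:
    """Build the videoID_transcribed.tar layout and return it as bytes."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tar:

        def add(name: str, data: bytes) -> None:
            info = tarfile.TarInfo(name)
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))

        add("metadata.json", json.dumps(metadata).encode("utf-8"))
        for stem, flac in sorted(segments.items()):
            add(f"segments/{stem}.flac", flac)
        for stem, result in sorted(transcriptions.items()):
            # ensure_ascii=False keeps Indic text readable in the JSON files
            payload = json.dumps(result, ensure_ascii=False).encode("utf-8")
            add(f"transcriptions/{stem}.json", payload)
    return buf.getvalue()
```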

---

## Concerns 1-6: Unchanged

Concerns 1 (code-mixed), 2 (language handling + ISO enum), 3 (temp=0), 4 (thinking_level=low), 5 (10 event tags), 6 (speaker metadata + energetic rename) remain as decided in the previous plan revision. Not repeated here.

---

## Prompt Refinement: Unchanged

All 10 prompt changes from Concern 14 remain as decided. Key points:

- No schema in prompt text (API-only enforcement)
- Parameterized language hint
- Numbers as digits
- NO_SPEECH handling
- Lean ~400 token prompt

---

## Updated JSON Schema: Unchanged

The JSON schema with ISO `detected_language` enum, `energetic` speaking_style, and all field descriptions remains as previously defined.

---

## Implementation Modules

```
transcripts/
├── Dockerfile
├── docker-compose.yml (for local testing)
├── requirements.txt
├── .env.template
├── src/
│   ├── __init__.py
│   ├── config.py          # env vars, language mappings, provider configs, version constants
│   ├── worker.py           # worker lifecycle: register, heartbeat, main loop, shutdown
│   ├── r2_client.py        # download tar, extract, pack results, upload finished tar
│   ├── audio_polish.py     # length-split -> boundary-trim -> silence-pad (GPU accelerated)
│   ├── prompt_builder.py   # parameterized system prompt, Pydantic schema
│   ├── providers/
│   │   ├── __init__.py
│   │   ├── base.py         # abstract provider interface (send_batch, check_cache, etc.)
│   │   ├── aistudio.py     # AI Studio client: async requests, 429 handling, cache verification
│   │   └── openrouter.py   # OpenRouter client: same interface, cache verification
│   ├── batch_cycle.py      # 1-minute cadence engine: fire -> collect -> validate -> pack -> report
│   ├── validator.py        # Tier 1 programmatic + optional Tier 1.5 GPU (IndicConformer)
│   ├── db.py               # Supabase: video_queue ops, worker heartbeat, results insert, flags
│   ├── pipeline.py         # per-video orchestrator: claim -> polish -> batch-cycle -> upload -> done
│   └── main.py             # Docker entrypoint: parse args, init worker, run pipeline
└── tests/
    ├── test_audio_polish.py
    ├── test_providers.py
    ├── test_validator.py
    └── test_canary.py      # 1000-segment canary across 12 languages
```

### Provider modularity

Both `aistudio.py` and `openrouter.py` implement the same `BaseProvider` interface:

```python
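from abc import ABC, abstractmethod

class BaseProvider(ABC):  # request/response/usage types are defined elsewhere in src/
    @abstractmethod
    async def send_batch(self, requests: list["TranscriptionRequest"]) -> list["TranscriptionResponse"]: ...
    @abstractmethod
    def verify_cache_hit(self, response) -> bool: ...
    @abstractmethod
    def get_token_usage(self, response) -> "TokenUsage": ...
    @abstractmethod
    def get_provider_name(self) -> str: ...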
```

The `batch_cycle.py` calls `provider.send_batch()` and doesn't care which backend is active. Switching from AI Studio to OpenRouter is a single config change or automatic on 429 flood.

---

## Docker Deployment on vast.ai

```dockerfile
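FROM python:3.11-slim
WORKDIR /app
# System deps for librosa/soundfile. CUDA for IndicConformer is not installed here;
# it is assumed to come from the pip wheels plus the host NVIDIA runtime.
RUN apt-get update \
    && apt-get install -y --no-install-recommends libsndfile1 ffmpeg \
    && rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY src/ /app/src/
# Note: this bakes secrets into the image; passing them at runtime is safer.
COPY .env /app/.env
ENV PROVIDER=aistudio
ENV ENABLE_GPU_VALIDATION=false
ENV WORKER_BATCH_SIZE=1000
ENV BATCH_INTERVAL_SECONDS=60
ENTRYPOINT ["python", "-m", "src.main"]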
```

Deploy N instances on vast.ai. Each instance:

- Gets a unique offer_id (used as worker_id)
- Reads `PROVIDER` env var to know which backend to use
- Auto-registers with Supabase on start
- Starts claiming and processing videos immediately

To scale up: deploy more instances. To switch provider: deploy new instances with `PROVIDER=openrouter`.

To monitor: query `workers` table for live stats. Query `video_queue` for progress. All from Supabase dashboard or a simple SQL query.