# 🚀 Unified Multi-Cloud Worker Architecture

**Created:** December 27, 2025  
**Goal:** Process 340K+ English podcasts using 150 T4 GPUs (Azure) + Runpod + any cloud

---

## 📊 Current Status

### Infrastructure Available

| Resource | Count | Location | Status |
|----------|-------|----------|--------|
| **Azure T4 GPUs** | 75 | eastus | ✅ 300 vCPU quota |
| **Azure T4 GPUs** | 75 | westus2 | ✅ 300 vCPU quota |
| **Local A100 80GB** | 1 | Current machine | ✅ Testing |
| **Runpod** | Variable | Serverless | 📋 Optional |

### Storage (R2)

| Bucket | Objects | Size | Purpose |
|--------|---------|------|---------|
| `pretraindata-english` | 9,989 | 203 GB | Raw audio |
| `pt-english` | 5,062 | 43 GB | English pretrain |
| `pt-indic` | 99,686 | 1.5 TB | Indic pretrain |
| `test` | 252,607 | 6.1 TB | Podcasts |
| **TOTAL** | 367,344 | **7.9 TB** | |

### Data to Process

| Dataset | Videos | Hours | Status |
|---------|--------|-------|--------|
| `english_podcasts.csv` | 340,424 | 282,702 | 📋 Pending |

---

## 🏗️ Architecture Overview

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                    UNIFIED MULTI-CLOUD ARCHITECTURE                          │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  ┌────────────────────────────────────────────────────────────────────────┐ │
│  │                     COORDINATOR API (FastAPI)                          │ │
│  │                                                                         │ │
│  │  Endpoints:                                                             │ │
│  │  ├─ POST /worker/register      → Register new worker                   │ │
│  │  ├─ GET  /job/request          → Get next video_id to process          │ │
│  │  ├─ POST /job/start            → Mark job as in_progress               │ │
│  │  ├─ POST /job/complete         → Mark job as done + upload stats       │ │
│  │  ├─ POST /job/fail             → Report failure + increment retry      │ │
│  │  ├─ GET  /job/status/{id}      → Check job status                      │ │
│  │  └─ GET  /stats                → Dashboard metrics                     │ │
│  │                                                                         │ │
│  │  Database: Supabase PostgreSQL (source of truth)                       │ │
│  │  Cache: Redis (optional, for rate limiting)                            │ │
│  └────────────────────────────────────────────────────────────────────────┘ │
│                              │                                               │
│              ┌───────────────┼───────────────┐                              │
│              │               │               │                              │
│              ▼               ▼               ▼                              │
│  ┌──────────────────┐ ┌──────────────┐ ┌──────────────────┐                │
│  │  AZURE WORKERS   │ │   RUNPOD     │ │  ANY CLOUD/GPU   │                │
│  │  (150 T4 GPUs)   │ │  SERVERLESS  │ │                  │                │
│  │                  │ │              │ │                  │                │
│  │  eastus: 75x T4  │ │  A100/A40    │ │  GCP/Lambda Labs │                │
│  │  westus2: 75x T4 │ │  On-demand   │ │  On-premise      │                │
│  └────────┬─────────┘ └──────┬───────┘ └────────┬─────────┘                │
│           │                  │                  │                           │
│           └──────────────────┼──────────────────┘                           │
│                              │                                               │
│                              ▼                                               │
│  ┌────────────────────────────────────────────────────────────────────────┐ │
│  │                    UNIVERSAL WORKER CONTAINER                          │ │
│  │                                                                         │ │
│  │  On Startup:                                                            │ │
│  │  1. Detect GPU type & VRAM (nvidia-smi)                                │ │
│  │  2. Auto-tune batch sizes (80% VRAM target)                            │ │
│  │  3. Register with Coordinator API                                       │ │
│  │                                                                         │ │
│  │  Main Loop:                                                             │ │
│  │  while True:                                                            │ │
│  │    1. GET /job/request → video_id                                      │ │
│  │    2. POST /job/start                                                  │ │
│  │    3. Download audio (yt-dlp)                                          │ │
│  │    4. Process (diarization + embeddings + music)                       │ │
│  │    5. Upload to R2 (metadata.json + original.wav)                      │ │
│  │    6. POST /job/complete (with stats)                                  │ │
│  │    7. Cleanup local files                                              │ │
│  │                                                                         │ │
│  │  On Error:                                                              │ │
│  │  - POST /job/fail (error_type, retry_count)                            │ │
│  │  - Auto-retry with exponential backoff                                 │ │
│  │  - OOM → reduce batch size → retry                                     │ │
│  │                                                                         │ │
│  └────────────────────────────────────────────────────────────────────────┘ │
│                              │                                               │
│                              ▼                                               │
│  ┌────────────────────────────────────────────────────────────────────────┐ │
│  │                     CLOUDFLARE R2 STORAGE                              │ │
│  │                                                                         │ │
│  │  Bucket: diarization-output/                                           │ │
│  │  ├── {video_id}/                                                       │ │
│  │  │   ├── metadata.json      (diarization results)                      │ │
│  │  │   ├── original.wav       (48kHz source audio)                       │ │
│  │  │   └── stats.json         (processing stats)                         │ │
│  │  └── manifests/                                                         │ │
│  │      └── manifest_{date}.jsonl                                         │ │
│  │                                                                         │ │
│  │  Endpoint: https://cb908ed13329eb7b186e06ab51bda190.r2.cloudflarestorage│
│  └────────────────────────────────────────────────────────────────────────┘ │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
```

---

## 🔧 Component Details

### 1. Coordinator API (FastAPI + Supabase)

```python
# Key Database Schema (Supabase PostgreSQL)

CREATE TABLE videos (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    video_id VARCHAR(16) UNIQUE NOT NULL,
    youtube_url TEXT NOT NULL,
    status VARCHAR(32) DEFAULT 'pending',
    -- Status: pending → claimed → processing → completed → failed
    
    -- Worker tracking
    worker_id VARCHAR(64),
    claimed_at TIMESTAMPTZ,
    started_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,
    
    -- Results
    duration_sec FLOAT,
    num_speakers INT,
    usable_pct FLOAT,
    r2_path TEXT,
    
    -- Error handling
    error_message TEXT,
    retry_count INT DEFAULT 0,
    max_retries INT DEFAULT 3,
    
    -- Metadata
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_videos_pending ON videos(status, created_at) 
    WHERE status = 'pending';
CREATE INDEX idx_videos_worker ON videos(worker_id, status);
```

### 2. Auto-Tuning Config (GPU Detection)

```python
# Auto-tune based on detected GPU VRAM

GPU_CONFIGS = {
    # VRAM (GB): (embed_batch, music_batch, chunk_workers)
    16:  (128, 32, 1),   # T4, V100-16GB
    24:  (256, 64, 1),   # RTX 3090, A10
    40:  (512, 100, 2),  # A100-40GB
    48:  (512, 100, 2),  # A6000
    80:  (754, 150, 2),  # A100-80GB
}

def auto_tune_config():
    """Detect GPU and return optimal config."""
    import subprocess
    result = subprocess.run(
        ['nvidia-smi', '--query-gpu=memory.total', '--format=csv,noheader,nounits'],
        capture_output=True, text=True
    )
    vram_mb = int(result.stdout.strip().split('\n')[0])
    vram_gb = vram_mb // 1024
    
    # Find closest config (round down to be safe)
    for gb in sorted(GPU_CONFIGS.keys(), reverse=True):
        if vram_gb >= gb:
            return GPU_CONFIGS[gb]
    
    # Fallback: minimal config
    return (64, 16, 1)
```

### 3. Worker Loop (Main Logic)

```python
# Simplified worker main loop

async def worker_main():
    # 1. Auto-detect and configure
    config = auto_tune_config()
    worker_id = f"{platform.node()}-{uuid.uuid4().hex[:8]}"
    
    # 2. Register with coordinator
    await api.register_worker(worker_id, gpu_info())
    
    # 3. Main processing loop
    while True:
        try:
            # Request job
            job = await api.request_job(worker_id)
            if not job:
                await asyncio.sleep(10)  # No jobs, wait
                continue
            
            # Start job
            await api.start_job(job['video_id'], worker_id)
            
            # Process
            result = process_video(
                job['video_id'],
                job['youtube_url'],
                config
            )
            
            # Upload to R2
            r2_path = await upload_to_r2(job['video_id'], result)
            
            # Complete job
            await api.complete_job(job['video_id'], result, r2_path)
            
        except OOMError:
            # Reduce batch size and retry
            config = reduce_batch_size(config)
            await api.fail_job(job['video_id'], "OOM", retry=True)
            
        except Exception as e:
            await api.fail_job(job['video_id'], str(e))
```

---

## 📈 Throughput Projections

### T4 (16GB) Performance Estimates

With reduced batch sizes (`embed=128, music=32`):

| Metric | Value | Notes |
|--------|-------|-------|
| Processing time | ~100s/hr audio | ~30% slower than A100 |
| Videos/hour/GPU | ~36 | Avg 50min video |
| Daily/GPU | ~864 videos | 24hr operation |

### Fleet Projections

| Scenario | GPUs | Daily Videos | Time for 340K |
|----------|------|--------------|---------------|
| Azure eastus only | 75 | 64,800 | 5.3 days |
| Azure both regions | 150 | 129,600 | 2.6 days |
| +Runpod (50 A100) | 200 | ~180,000 | 1.9 days |

### Cost Estimates (Azure T4)

| VM Type | vCPUs | Cost/hr | GPUs | Daily Cost |
|---------|-------|---------|------|------------|
| NC4as_T4_v3 | 4 | $0.53 | 1 | $12.72 |
| 75 VMs (eastus) | 300 | $39.75 | 75 | $954 |
| 150 VMs (both) | 600 | $79.50 | 150 | $1,908 |

**Total for 340K videos:**
- 2.6 days × $1,908/day = **~$5,000**

---

## 🛡️ Reliability Features

### OOM Protection

```python
def process_with_oom_protection(video_id, config, max_retries=3):
    for attempt in range(max_retries):
        try:
            return process_video(video_id, config)
        except RuntimeError as e:
            if "CUDA out of memory" in str(e):
                torch.cuda.empty_cache()
                gc.collect()
                config = reduce_batch_size(config, factor=0.7)
                logger.warning(f"OOM on attempt {attempt+1}, reducing batch to {config}")
                continue
            raise
    raise MaxRetriesExceeded(f"Failed after {max_retries} OOM retries")
```

### Job Claim Pattern (Prevents Duplicates)

```sql
-- Atomic job claim with FOR UPDATE SKIP LOCKED
UPDATE videos 
SET status = 'claimed', 
    worker_id = $1, 
    claimed_at = NOW()
WHERE id = (
    SELECT id FROM videos 
    WHERE status = 'pending' 
    ORDER BY created_at 
    LIMIT 1 
    FOR UPDATE SKIP LOCKED
)
RETURNING *;
```

### Heartbeat & Stale Job Recovery

```python
# Workers send heartbeat every 60s
# Coordinator marks jobs as failed if no heartbeat for 5min

async def recover_stale_jobs():
    """Reset jobs claimed but not completed within timeout."""
    await db.execute("""
        UPDATE videos 
        SET status = 'pending', 
            worker_id = NULL, 
            retry_count = retry_count + 1
        WHERE status IN ('claimed', 'processing')
        AND updated_at < NOW() - INTERVAL '5 minutes'
        AND retry_count < max_retries
    """)
```

---

## 🐳 Docker Implementation

### Dockerfile

```dockerfile
FROM pytorch/pytorch:2.1.0-cuda12.1-cudnn8-runtime

WORKDIR /app

# System dependencies
RUN apt-get update && apt-get install -y \
    ffmpeg git curl \
    && rm -rf /var/lib/apt/lists/*

# Python dependencies
COPY requirements.txt requirements-worker.txt ./
RUN pip install --no-cache-dir -r requirements.txt -r requirements-worker.txt

# Application code
COPY src/ ./src/
COPY worker/ ./worker/
COPY pipeline.py main.py ./

# Environment
ENV PYTHONPATH=/app
ENV HF_HOME=/models/huggingface
ENV TORCH_HOME=/models/torch

# Entry point
ENTRYPOINT ["python", "-m", "worker.main"]
```

### Environment Variables (per deployment)

```bash
# Required
COORDINATOR_URL=https://api.yourdomain.com
R2_ACCESS_ID=c3c9190ae7ff98b10271ea8db6940210
R2_SECRET_KEY=eab9394d02b48a865634105b92c74751ec9a311c56884f7aead5d76476c6b576
R2_ENDPOINT=https://cb908ed13329eb7b186e06ab51bda190.r2.cloudflarestorage.com
R2_BUCKET=diarization-output
HF_TOKEN=hf_xxx

# Optional
WORKER_ID=auto  # Auto-generate if not set
LOG_LEVEL=INFO
MAX_RETRIES=3
```

---

## 📋 Implementation Roadmap

### Phase 1: Coordinator API (Day 1-2)
- [ ] FastAPI server with job dispatch endpoints
- [ ] Supabase schema + connection
- [ ] Job claim with atomic locking
- [ ] Basic dashboard endpoint

### Phase 2: Worker Container (Day 3-4)
- [ ] GPU auto-detection + config tuning
- [ ] Main processing loop
- [ ] R2 upload integration
- [ ] OOM protection + retries

### Phase 3: Testing (Day 5)
- [ ] Single worker end-to-end test
- [ ] Multi-worker concurrency test
- [ ] Failure recovery test

### Phase 4: Deployment (Day 6-7)
- [ ] Push Docker image to Azure Container Registry
- [ ] Deploy to Azure Batch (75 eastus + 75 westus2)
- [ ] Monitor dashboard setup

### Phase 5: Scale (Day 8+)
- [ ] Optional: Runpod integration
- [ ] Monitor and tune
- [ ] Process 340K videos

---

## 🎯 My Opinion on This Approach

### ✅ Strengths

1. **Cloud-agnostic**: Same Docker image works on Azure, Runpod, GCP, on-premise
2. **No duplicates**: Atomic job claiming prevents double-processing
3. **Self-healing**: Stale jobs auto-recover, OOM auto-retunes
4. **Scalable**: Add more workers anytime, they just request jobs
5. **Observable**: Central coordinator tracks all progress
6. **Cost-efficient**: T4s at $0.53/hr vs A100 at $3.70/hr

### ⚠️ Considerations

1. **T4 VRAM limit**: 16GB is tight, requires careful tuning
2. **Coordinator is SPOF**: Consider HA deployment or Supabase's built-in HA
3. **YouTube rate limits**: May need to distribute across IPs/proxies at scale
4. **Network costs**: Azure egress to R2 has costs (~$0.05/GB)

### 🎯 Verdict

**This is the right approach for scaling.** The unified worker pattern is battle-tested (used by Weights & Biases, Hugging Face, etc.) and gives you:

- **Flexibility**: Run anywhere GPUs are available
- **Reliability**: Central state prevents chaos
- **Simplicity**: One Docker image to maintain

The T4 VRAM constraint is manageable with auto-tuning. The biggest risk is YouTube rate limiting at 150 concurrent downloads, but you can mitigate with delays and IP rotation if needed.

**Estimated total processing time: 2.6 days for 340K videos**

---

## 🚀 Next Steps

1. **Create coordinator API** (FastAPI + Supabase)
2. **Create worker Docker image** with auto-tuning
3. **Test on local A100** first
4. **Deploy to Azure Batch**
5. **Monitor and scale**

Ready to start implementation?















