# Audio Pipeline Optimization Strategy

## Target: 500k Videos in <24 Hours

### Current State
```
Performance:     26.1 seg/s, 7.56 videos/min
Bottleneck:      pyloudnorm LUFS computation (45% of time)
GPU Utilization: <5% (only resampling)
Time for 500k:   ~46 days
```

### Target State
```
Required:        500k videos / 24h = 347 videos/min
Speedup needed:  347 / 7.56 = 46x
```

---

## Strategy Overview

We'll achieve this through **3 parallel optimization tracks**:

| Track | Optimization | Expected Speedup | Cost Impact |
|-------|--------------|------------------|-------------|
| **A** | GPU-accelerated LUFS + metrics | 4-6x | Same hardware |
| **B** | Pipeline parallelization | 2-3x | Same hardware |
| **C** | Horizontal scaling | 8-16x | Linear scaling |

**Combined target: 64-288x speedup → 500k in roughly 4-17 hours**

---

## Track A: GPU-Accelerated Audio Metrics

### The Problem with pyloudnorm

pyloudnorm runs on the CPU (Python + NumPy) and computes LUFS one segment at a time:
1. K-weighting filter (a cascade of two IIR biquads: high-shelf + high-pass)
2. Mean square computation
3. Gating (400ms blocks with 75% overlap)
4. Integration

**Solution: GPU-native LUFS with torchaudio**

```python
# Current: 26.1ms per segment (sequential, CPU)
# Target:  0.5ms per segment (batched, GPU)
```

### GPU LUFS Implementation

The EBU R128 LUFS algorithm can be fully parallelized:

1. **K-weighting filters** → `torchaudio.functional.biquad` (batched)
2. **Block mean square** → `Tensor.unfold` + batched mean
3. **Gating** → Vectorized threshold comparison
4. **Integration** → Batched reduction
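As a sketch of steps 2-4, here is the gating and integration stage done with a single `unfold` plus batched reductions. It assumes the K-weighting biquads from step 1 have already been applied; the -0.691 offset and the -70 LUFS / -10 LU gates follow ITU-R BS.1770, but shapes and names are illustrative, not the final implementation:

```python
import torch

def batched_lufs(kweighted: torch.Tensor, sr: int = 48000) -> torch.Tensor:
    """(batch, samples) K-weighted mono audio -> (batch,) integrated LUFS."""
    block = int(0.400 * sr)   # 400 ms gating blocks
    hop = block // 4          # 75% overlap
    # (batch, n_blocks, block): every gating block materialized at once
    blocks = kweighted.unfold(dimension=1, size=block, step=hop)
    ms = blocks.pow(2).mean(dim=2)                       # mean square per block
    loudness = -0.691 + 10.0 * torch.log10(ms + 1e-12)   # per-block loudness

    def gated_mean(mask: torch.Tensor) -> torch.Tensor:
        # Mean square over blocks passing the gate, per batch element
        kept = torch.where(mask, ms, torch.zeros_like(ms))
        return kept.sum(dim=1) / mask.sum(dim=1).clamp(min=1)

    abs_mask = loudness > -70.0                                   # absolute gate
    rel_thresh = -0.691 + 10.0 * torch.log10(gated_mean(abs_mask) + 1e-12) - 10.0
    rel_mask = abs_mask & (loudness > rel_thresh.unsqueeze(1))    # relative gate
    return -0.691 + 10.0 * torch.log10(gated_mean(rel_mask) + 1e-12)
```

Because every segment in the batch moves through the same tensor ops, the 128-segment batch costs roughly one kernel launch per step instead of 128 Python loops.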

### Performance Projection

| Operation | CPU (pyloudnorm) | GPU (batched 128) | Speedup |
|-----------|------------------|-------------------|---------|
| LUFS | 26.1ms | ~0.8ms | 32x |
| Spectral | 8ms | ~0.3ms | 26x |
| Resample | 12ms | 0.5ms | 24x |
| **Total** | 57.6ms | ~5ms | **11x** |

The Total row exceeds the sum of the itemized operations because it also counts ~11.5 ms of per-segment CPU work (chunking, queue handoff, Python overhead) that the GPU tracks do not touch.

---

## Track B: Pipeline Architecture Optimization

### Current Architecture (Sequential)
```
Download → Decode → Metrics → Chunk → Resample → Encode → Write
           [------------ Sequential per segment ------------]
```

### Optimized Architecture (Pipelined + Batched)
```
┌─────────────────────────────────────────────────────────────┐
│                    PRODUCER PROCESSES                        │
│  Download Pool (16 workers) → Decode Pool (8 workers)       │
│            ↓                         ↓                       │
│        [Download Queue]         [Segment Queue]              │
└─────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────┐
│                    GPU BATCH PROCESSOR                       │
│  Batch Accumulator (128 segments) → GPU Kernel              │
│  - K-weight filter (batched)                                 │
│  - LUFS computation (batched)                                │
│  - Spectral analysis (batched FFT)                           │
│  - Resampling (batched)                                      │
└─────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────┐
│                   CONSUMER PROCESSES                         │
│  Encode Pool (12 workers) → Shard Writers (4 workers)       │
│            ↓                         ↓                       │
│       [Output Queue]           [Upload Queue]                │
└─────────────────────────────────────────────────────────────┘
```

### Key Optimizations

1. **Async I/O for downloads** - Don't block GPU on network
2. **Pre-fetch next batch** - GPU never waits for data
3. **Streaming encode** - Encode while GPU processes next batch
4. **Batched shard writes** - Reduce filesystem syscalls
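A minimal sketch of the batch-accumulator pattern from the diagram above, using stdlib queues and threads (the real pipeline would use multiprocessing queues and a GPU callable; `process_batch` here is a stand-in for the GPU kernel):

```python
import queue
import threading

SENTINEL = None  # producers push this to signal end-of-stream

def batch_worker(segment_q, output_q, process_batch, batch_size=128, flush_timeout=0.5):
    """Accumulate segments into fixed-size batches for the GPU stage.

    Full batches keep GPU utilization high; a partial batch is flushed after
    `flush_timeout` seconds of queue silence so the stream tail never stalls.
    """
    batch = []
    while True:
        try:
            item = segment_q.get(timeout=flush_timeout)
        except queue.Empty:
            if batch:                          # quiet queue: flush partial batch
                output_q.put(process_batch(batch))
                batch = []
            continue
        if item is SENTINEL:
            if batch:                          # flush whatever is left
                output_q.put(process_batch(batch))
            output_q.put(SENTINEL)             # propagate shutdown downstream
            return
        batch.append(item)
        if len(batch) == batch_size:
            output_q.put(process_batch(batch))
            batch = []
```

The timeout-flush is what lets the consumer pools stay busy while the producer pools refill the queue, rather than blocking the whole pipeline on the final partial batch.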

---

## Track C: Horizontal Scaling Strategy

### Compute Requirements

For 500k videos in 24 hours with optimized code (11x speedup):

```
Single node: 7.56 × 11 = 83 videos/min
Required:    347 videos/min
Nodes needed: ceil(347 / 83) = 5 nodes
Safety margin: 8 nodes (~1.9x buffer)
```
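The same sizing arithmetic as executable Python, handy for re-checking the node count whenever the measured speedup changes:

```python
import math

def nodes_needed(base_vpm: float, speedup: float, videos: int, hours: float) -> int:
    """Minimum node count to finish `videos` within `hours`."""
    per_node = base_vpm * speedup        # videos/min per node after optimization
    required = videos / (hours * 60)     # fleet-wide videos/min target
    return math.ceil(required / per_node)
```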

### Recommended Instance Types

| Provider | Instance | vCPUs | GPU | Memory | Cost/hr |
|----------|----------|-------|-----|--------|---------|
| AWS | g5.4xlarge | 16 | A10G 24GB | 64GB | ~$1.00 |
| AWS | g5.8xlarge | 32 | A10G 24GB | 128GB | ~$2.00 |
| AWS | p4d.24xlarge | 96 | 8×A100 | 1.5TB | ~$32.77 |
| GCP | g2-standard-16 | 16 | L4 24GB | 64GB | ~$0.80 |

**Recommendation: 8× g5.4xlarge ($8/hr total); if profiling shows the pipeline is CPU-bound, g5.8xlarge doubles the vCPU-per-GPU ratio at twice the per-node cost**

### Partitioning Strategy

```sql
-- Each worker gets a deterministic partition
-- $1 = language, $2 = partition_count, $3 = partition_index
SELECT youtube_id, r2_tar_key
FROM videos
WHERE language = $1
  AND status = 'COMPLETED'
  AND mod(abs(hashtext(youtube_id)), $2) = $3;
```

This ensures:
- No overlap between workers
- Deterministic assignment (restart-safe)
- Even distribution
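A hypothetical client-side mirror of the same idea, useful for pre-flight checks. Note that Python's built-in `hash` and Postgres `hashtext` produce different values, so a stable digest is used here; this illustrates the determinism/evenness property and is not interchangeable with the SQL partitioner:

```python
import hashlib

def partition_of(youtube_id: str, partition_count: int) -> int:
    """Stable partition assignment: same id -> same partition on every run."""
    digest = hashlib.sha1(youtube_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % partition_count
```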

---

## Monitoring & Orchestration

### Progress Tracking Schema (Supabase)

```sql
CREATE TABLE pipeline_runs (
  run_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  language TEXT NOT NULL,
  total_videos INT NOT NULL,
  started_at TIMESTAMPTZ DEFAULT NOW(),
  completed_at TIMESTAMPTZ,
  status TEXT DEFAULT 'running'
);

CREATE TABLE worker_progress (
  worker_id TEXT PRIMARY KEY,
  run_id UUID REFERENCES pipeline_runs(run_id),
  partition_index INT NOT NULL,
  partition_count INT NOT NULL,
  hostname TEXT,
  instance_type TEXT,
  
  -- Progress counters
  videos_total INT DEFAULT 0,
  videos_processed INT DEFAULT 0,
  segments_total INT DEFAULT 0,
  segments_processed INT DEFAULT 0,
  segments_dropped INT DEFAULT 0,
  
  -- Performance metrics
  videos_per_minute FLOAT DEFAULT 0,
  segments_per_second FLOAT DEFAULT 0,
  gpu_utilization FLOAT DEFAULT 0,
  
  -- Status
  status TEXT DEFAULT 'starting',
  last_heartbeat TIMESTAMPTZ DEFAULT NOW(),
  last_error TEXT,
  error_count INT DEFAULT 0,
  
  started_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE processing_errors (
  id SERIAL PRIMARY KEY,
  worker_id TEXT,
  run_id UUID,
  video_id TEXT,
  error_type TEXT,
  error_message TEXT,
  stack_trace TEXT,
  created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Real-time aggregation view
CREATE VIEW run_progress AS
SELECT 
  r.run_id,
  r.language,
  r.total_videos,
  SUM(w.videos_processed) as processed,
  SUM(w.videos_total) as assigned,
  AVG(w.videos_per_minute) as avg_rate,
  SUM(w.videos_per_minute) as total_rate,
  COUNT(*) FILTER (WHERE w.status = 'running') as active_workers,
  COUNT(*) FILTER (WHERE w.status = 'failed') as failed_workers,
  EXTRACT(EPOCH FROM (NOW() - r.started_at)) / 3600 as hours_elapsed,
  (r.total_videos - SUM(w.videos_processed)) / NULLIF(SUM(w.videos_per_minute), 0) / 60 as hours_remaining
FROM pipeline_runs r
LEFT JOIN worker_progress w ON w.run_id = r.run_id
GROUP BY r.run_id;
```

### Worker Heartbeat & Monitoring

```python
import time
from datetime import datetime, timezone

from supabase import create_client  # supabase-py

class ProgressReporter:
    def __init__(self, supabase_url: str, supabase_key: str, run_id: str, worker_id: str):
        self.client = create_client(supabase_url, supabase_key)
        self.run_id = run_id
        self.worker_id = worker_id
        self.last_report = time.time()
        self.report_interval = 10  # seconds
        
    def update(self, videos_processed: int, segments_processed: int, 
               videos_per_minute: float, segments_per_second: float):
        now = time.time()
        if now - self.last_report < self.report_interval:
            return
            
        self.client.table('worker_progress').upsert({
            'worker_id': self.worker_id,
            'run_id': self.run_id,
            'videos_processed': videos_processed,
            'segments_processed': segments_processed,
            'videos_per_minute': videos_per_minute,
            'segments_per_second': segments_per_second,
            'last_heartbeat': datetime.now(timezone.utc).isoformat(),
            'status': 'running'
        }).execute()
        self.last_report = now
```

---

## Implementation Phases

### Phase 1: GPU-Accelerated Metrics (Day 1)
- [ ] Implement GPU LUFS computation
- [ ] Implement GPU spectral analysis
- [ ] Batch processing with async I/O
- [ ] Benchmark: Target 80+ seg/s single GPU

### Phase 2: Horizontal Scaling Infrastructure (Day 1)
- [ ] Supabase schema setup
- [ ] Worker orchestration script
- [ ] Progress monitoring dashboard
- [ ] Auto-recovery on failure

### Phase 3: Deployment & Execution (Day 2)
- [ ] Deploy to 8 GPU instances
- [ ] Start processing all 12 languages
- [ ] Monitor and adjust
- [ ] Handle failures and retries

---

## Cost Estimate (24-hour run)

| Component | Units | Cost/hr | Hours | Total |
|-----------|-------|---------|-------|-------|
| g5.4xlarge | 8 | $1.00 | 24 | $192 |
| R2 egress | ~5TB | - | - | ~$0 (free) |
| Supabase | Pro | - | - | $25/mo |
| **Total** | | | | **~$220** |

---

## Quick Start

```bash
# 1. Deploy infrastructure
./deploy/setup_workers.sh 8  # Spin up 8 GPU instances

# 2. Start orchestrator
python orchestrator.py --language all --workers 8 --run-id $(uuidgen)

# 3. Monitor progress
python monitor.py --run-id <run_id>  # Real-time dashboard
```

---

## Files to Create

1. `gpu_audio_metrics.py` - GPU-accelerated LUFS and spectral analysis
2. `optimized_pipeline.py` - Pipelined batch processor
3. `orchestrator.py` - Multi-node job orchestration
4. `monitor.py` - Real-time progress dashboard
5. `deploy/` - Infrastructure as code