---
name: Fast parallel compactor
overview: Remove the per-language lease bottleneck from the compactor, add concurrent microshard downloads, and deploy 200+ workers to finish Stage B in under 1 hour.
todos:
  - id: destroy-current-compactors
    content: Destroy current 12 compactor instances, reset all microshards to pending
    status: in_progress
  - id: new-db-method
    content: Add claim_microshards_batch() to final_export_db.py - single atomic claim without language lease
    status: pending
  - id: parallel-downloads
    content: Add _load_microshard_rows_parallel() with ThreadPoolExecutor(12) in compactor
    status: pending
  - id: simplify-main-loop
    content: Rewrite compactor _main_loop to skip language leases, use new batch claim + parallel load
    status: pending
  - id: increase-claim-limit
    content: Set FINAL_EXPORT_COMPACTOR_CLAIM_LIMIT=250 in deploy script
    status: pending
  - id: rebuild-deploy
    content: Rebuild Docker image, push, deploy 200+ compactor workers
    status: pending
  - id: monitor-completion
    content: Monitor until all 876K microshards compacted, validate final shard counts
    status: pending
isProject: false
---

# Fast Parallel Compactor (Stage B in < 1 hour)

## Problem

The current compactor uses a **per-language lease** (`final_export_language_leases` table) that allows only one worker per language. With 12 languages, that caps the fleet at 12 workers, and each worker downloads its microshards sequentially. Result: **~70 hour ETA** for 876K microshards / 60.7M segments.
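
Back-of-envelope: 876K microshards / 12 workers ≈ 73K per worker, so a 70-hour ETA implies an effective end-to-end rate of only ~1,000 microshards/hour (~0.3/sec) per worker.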

## Root Cause

```mermaid
flowchart LR
    subgraph current [Current: Serial per Language]
        W1[Worker 1] -->|"lease: en"| EN[254K microshards]
        W2[Worker 2] -->|"lease: hi"| HI[180K microshards]
        W3[Worker 3] -->|"lease: te"| TE[178K microshards]
    end
```

The language lease is implemented by `acquire_language_lease()` in [src/final_export_db.py](src/final_export_db.py) and enforced via the `final_export_language_leases` table (PRIMARY KEY on `language`). But `claim_microshards_for_language()` already uses `FOR UPDATE SKIP LOCKED`, which handles concurrent claims natively, so the lease is redundant.
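
For contrast, a rough sketch of the current per-worker flow (a hypothetical wrapper; only the two DB function names come from the existing code, and their exact signatures and return values are assumptions):

```python
def run_once(db, worker_id: str, limit: int, process_job) -> None:
    """Sketch of today's per-worker flow; the lease gate is the serialization point."""
    language = db.acquire_language_lease(worker_id)        # at most one holder per language
    if language is None:
        return                                             # all 12 leases held: this worker idles
    try:
        for job in db.claim_microshards_for_language(language, limit):
            process_job(job)                               # downloads happen one by one here
    finally:
        db.release_language_lease(language)
```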

## Solution: Remove Lease, Add Parallelism

```mermaid
flowchart LR
    subgraph new [New: N Workers per Language]
        W1[Worker 1] -->|"SKIP LOCKED"| EN[en microshards]
        W2[Worker 2] -->|"SKIP LOCKED"| EN
        W3[Worker 3] -->|"SKIP LOCKED"| EN
        W4[Worker 4] -->|"SKIP LOCKED"| HI[hi microshards]
        W5[Worker 5] -->|"SKIP LOCKED"| HI
        WN["...200+ workers"] -->|"SKIP LOCKED"| ALL["any language"]
    end
```

### Code Changes (3 files)

**1. New DB method in [src/final_export_db.py](src/final_export_db.py)**: `claim_microshards_batch()`

Replaces the lease+claim two-step with a single atomic query:

```sql
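-- Atomically claim up to $3 pending microshards (for run $1) from whichever
-- language currently has the most pending work.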
WITH pick AS (
    SELECT language
    FROM final_export_microshards
    WHERE status = 'pending' AND run_id = $1
    GROUP BY language
    ORDER BY count(*) DESC
    LIMIT 1
)
UPDATE final_export_microshards
SET status = 'claimed', claimed_by = $2, claimed_at = now(),
    attempt_count = attempt_count + 1, updated_at = now()
WHERE microshard_id IN (
    SELECT microshard_id
    FROM final_export_microshards
    WHERE status = 'pending' AND run_id = $1
      AND language = (SELECT language FROM pick)
    ORDER BY created_at, microshard_id
    LIMIT $3
    FOR UPDATE SKIP LOCKED
)
RETURNING microshard_id, video_id, language, ...
```

No lease table involved. Multiple workers naturally partition via `SKIP LOCKED`.
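
A minimal sketch of the Python side, assuming a synchronous psycopg 3 connection (the `$1/$2/$3` placeholders above become named `%(...)s` parameters here; the actual connection handling and returned columns in `final_export_db.py` may differ):

```python
import psycopg
from psycopg.rows import dict_row

# Same statement as above; RETURNING trimmed to the three columns named
# explicitly there -- the real column list is longer.
CLAIM_MICROSHARDS_BATCH_SQL = """
WITH pick AS (
    SELECT language
    FROM final_export_microshards
    WHERE status = 'pending' AND run_id = %(run_id)s
    GROUP BY language
    ORDER BY count(*) DESC
    LIMIT 1
)
UPDATE final_export_microshards
SET status = 'claimed', claimed_by = %(worker_id)s, claimed_at = now(),
    attempt_count = attempt_count + 1, updated_at = now()
WHERE microshard_id IN (
    SELECT microshard_id
    FROM final_export_microshards
    WHERE status = 'pending' AND run_id = %(run_id)s
      AND language = (SELECT language FROM pick)
    ORDER BY created_at, microshard_id
    LIMIT %(limit)s
    FOR UPDATE SKIP LOCKED
)
RETURNING microshard_id, video_id, language
"""

def claim_microshards_batch(conn: psycopg.Connection, run_id: str,
                            worker_id: str, limit: int) -> list[dict]:
    """Atomically claim up to `limit` pending microshards of a single language."""
    with conn.cursor(row_factory=dict_row) as cur:
        cur.execute(CLAIM_MICROSHARDS_BATCH_SQL,
                    {"run_id": run_id, "worker_id": worker_id, "limit": limit})
        jobs = cur.fetchall()
    conn.commit()
    return jobs
```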

**2. Add concurrent downloads in [src/final_export_compactor.py](src/final_export_compactor.py)**: `_load_microshard_rows_parallel()`

The current code downloads microshards one by one. The new method uses a `ThreadPoolExecutor` with 8-16 threads (default 12) to download them concurrently:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def _load_microshard_rows_parallel(self, jobs, max_workers=12):
    """Download the claimed microshards concurrently and return all their rows."""
    rows = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(self._load_microshard_rows, job): job for job in jobs}
        for future in as_completed(futures):
            rows.extend(future.result())  # re-raises if a download failed
    return rows
```

**3. Simplify `_main_loop` in [src/final_export_compactor.py](src/final_export_compactor.py)**:

Remove all language lease calls (`acquire_language_lease`, `release_language_lease`, `heartbeat_language_lease`). The new loop, sketched below (shard-build/upload/commit helper names are illustrative, not the actual method names):

```python
# Inside _main_loop (sketch):
while not self._shutdown:
    jobs = self._db.claim_microshards_batch(limit=250)   # FINAL_EXPORT_COMPACTOR_CLAIM_LIMIT
    if not jobs:
        break                                             # nothing left to compact
    segments = self._load_microshard_rows_parallel(jobs)
    shard = self._build_shard(segments)                   # existing shard-building path
    self._upload_shard(shard)                             # existing upload path
    self._db.mark_microshards_compacted(jobs)             # commit consumption
```

**4. Increase claim limit**: Set `FINAL_EXPORT_COMPACTOR_CLAIM_LIMIT=250` in the deploy script (up from 32). At ~69 segments per microshard (60.7M segments / 876K microshards), a 250-microshard batch is ~17K segments, roughly one shard per claim.
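
For reference, a plausible way the compactor reads this setting (the actual env handling in [src/final_export_compactor.py](src/final_export_compactor.py) may differ):

```python
import os

# Deploy script exports FINAL_EXPORT_COMPACTOR_CLAIM_LIMIT=250; 32 remains the code default.
CLAIM_LIMIT = int(os.environ.get("FINAL_EXPORT_COMPACTOR_CLAIM_LIMIT", "32"))
```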

### Infrastructure Steps

1. Destroy current 12 compactors
2. Reset ALL microshards to `pending` (SQL: `UPDATE final_export_microshards SET status='pending', claimed_by=NULL WHERE run_id='production-20260312'`)
3. Rebuild Docker image and push
4. Deploy 200+ workers with `--stage compact --allow-partial-shards`

### Time Estimate

- 876K microshards across 200 workers = ~4,380 each
- With 12-thread parallel downloads: ~8 microshards/sec/worker
- Per worker: 4,380 / 8 = 547 seconds = ~9 min for downloads
- Shard building + upload overhead: ~2x = ~20 min
- With buffer: **~30-45 minutes total**
