"""
Incremental phase-1 metadata build.

This version avoids the monolithic validation merge that can OOM on the full
recover dataset. Instead it:
  1. builds lightweight tx / flag helper tables once
  2. ingests recover validation shards in prefix batches
  3. ingests historical validation shards in prefix batches, skipping keys
     already covered by recover
  4. writes the final segment map one queue language at a time
  5. computes analytics from the on-disk segment map parquet dataset
"""
from __future__ import annotations

import argparse
import json
import math
import shutil
import time
from pathlib import Path

import duckdb

GOLDEN_LID_AGREE = 3
GOLDEN_CTC_MIN = 0.7
GOLDEN_QUALITY_MIN = 0.5
GOLDEN_DURATION_MIN = 2.0
DISPOSE_CTC_MAX = 0.3
DISPOSE_DURATION_MAX = 1.0

VALIDATION_COLUMNS: list[tuple[str, str]] = [
    ("video_id", "VARCHAR"),
    ("segment_file", "VARCHAR"),
    ("duration_s", "DOUBLE"),
    ("gemini_lang", "VARCHAR"),
    ("gemini_transcription", "VARCHAR"),
    ("gemini_tagged", "VARCHAR"),
    ("gemini_quality_score", "DOUBLE"),
    ("speaker_info", "VARCHAR"),
    ("mms_lang_iso3", "VARCHAR"),
    ("mms_lang_iso1", "VARCHAR"),
    ("mms_confidence", "DOUBLE"),
    ("mms_top3", "VARCHAR"),
    ("vox_lang", "VARCHAR"),
    ("vox_lang_iso1", "VARCHAR"),
    ("vox_confidence", "DOUBLE"),
    ("vox_top3", "VARCHAR"),
    ("conformer_multi_transcription", "VARCHAR"),
    ("conformer_multi_ctc_raw", "DOUBLE"),
    ("conformer_multi_ctc_normalized", "DOUBLE"),
    ("wav2vec_transcription", "VARCHAR"),
    ("wav2vec_ctc_raw", "DOUBLE"),
    ("wav2vec_ctc_normalized", "DOUBLE"),
    ("wav2vec_model_used", "VARCHAR"),
    ("lid_consensus", "BOOLEAN"),
    ("lid_agree_count", "INTEGER"),
    ("consensus_lang", "VARCHAR"),
]


def parse_args():
    p = argparse.ArgumentParser(description="Incremental phase-1 transcript + validation metadata lake")
    p.add_argument("--tx", default="data/transcription_results.parquet")
    p.add_argument("--flags", default="data/transcription_flags.parquet")
    p.add_argument("--queue", default="data/video_queue.csv.gz")
    p.add_argument("--historical-shards", default="data/validation_shards")
    p.add_argument("--recover-shards", default="data/recover_shards_raw")
    p.add_argument("--output-dir", default="data/phase1_incremental")
    p.add_argument("--db-path", default="")
    p.add_argument("--threads", type=int, default=4)
    p.add_argument("--memory-limit", default="24GB")
    p.add_argument("--recover-batch-dirs", type=int, default=8)
    p.add_argument("--historical-batch-dirs", type=int, default=8)
    p.add_argument("--overwrite", action="store_true")
    return p.parse_args()


def remove_path(path: Path):
    if not path.exists():
        return
    if path.is_dir():
        shutil.rmtree(path)
    else:
        path.unlink()


def sql_quote(path: str) -> str:
    return path.replace("'", "''")


def parquet_glob_list_sql(paths: list[str]) -> str:
    quoted = ", ".join(f"'{sql_quote(p)}'" for p in paths)
    return f"[{quoted}]"


def validation_schema_sql(include_bucket: bool = True) -> str:
    cols = [f"{name} {sql_type}" for name, sql_type in VALIDATION_COLUMNS]
    cols.append("validation_source VARCHAR")
    if include_bucket:
        cols.append("provisional_bucket VARCHAR")
    return ", ".join(cols)


def write_json(path: Path, payload: dict):
    path.write_text(json.dumps(payload, indent=2, sort_keys=True))


def fetchone_dict(con: duckdb.DuckDBPyConnection, query: str) -> dict:
    rel = con.execute(query)
    row = rel.fetchone()
    if row is None:
        return {}
    cols = [d[0] for d in rel.description]
    return dict(zip(cols, row))


def shard_prefix_dirs(root: Path) -> list[Path]:
    if not root.exists():
        return []
    dirs = [p for p in sorted(root.iterdir()) if p.is_dir() and any(p.glob("*.parquet"))]
    return dirs


def batched(items: list[Path], size: int) -> list[list[Path]]:
    return [items[i:i + size] for i in range(0, len(items), size)]


def batch_read_sql(dirs: list[Path]) -> str:
    globs = [(d / "*.parquet").as_posix() for d in dirs]
    return f"read_parquet({parquet_glob_list_sql(globs)}, hive_partitioning=false, union_by_name=true)"


def main():
    args = parse_args()
    tx_path = Path(args.tx)
    flags_path = Path(args.flags)
    queue_path = Path(args.queue)
    historical_dir = Path(args.historical_shards)
    recover_dir = Path(args.recover_shards)
    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    if not tx_path.exists():
        raise SystemExit(f"Missing transcription parquet: {tx_path}")
    if not flags_path.exists():
        raise SystemExit(f"Missing transcription flags parquet: {flags_path}")
    if not queue_path.exists():
        raise SystemExit(f"Missing queue snapshot: {queue_path}")

    db_path = Path(args.db_path) if args.db_path else output_dir / "phase1_incremental.duckdb"
    segment_map_dir = output_dir / "segment_map_v1"
    analytics_dir = output_dir / "analytics_v1"
    temp_dir = output_dir / "duckdb_tmp"

    if args.overwrite:
        remove_path(db_path)
        remove_path(segment_map_dir)
        remove_path(analytics_dir)
        remove_path(temp_dir)

    analytics_dir.mkdir(parents=True, exist_ok=True)
    temp_dir.mkdir(parents=True, exist_ok=True)

    recover_prefixes = shard_prefix_dirs(recover_dir)
    historical_prefixes = shard_prefix_dirs(historical_dir)
    if not recover_prefixes:
        raise SystemExit(f"No recover shard prefixes found in {recover_dir}")
    if not historical_prefixes:
        raise SystemExit(f"No historical shard prefixes found in {historical_dir}")

    con = duckdb.connect(str(db_path))
    con.execute(f"SET threads = {args.threads}")
    con.execute(f"SET memory_limit = '{args.memory_limit}'")
    con.execute("SET preserve_insertion_order = false")
    con.execute(f"SET temp_directory = '{temp_dir.as_posix()}'")
    con.execute("PRAGMA enable_progress_bar")

    t0 = time.time()
    print("Building phase-1 metadata lake incrementally ...")

    tx_sql = sql_quote(tx_path.as_posix())
    flags_sql = sql_quote(flags_path.as_posix())
    queue_sql = sql_quote(queue_path.as_posix())

    con.execute(f"CREATE OR REPLACE VIEW tx_raw AS SELECT * FROM read_parquet('{tx_sql}')")
    con.execute(f"CREATE OR REPLACE VIEW flags_raw AS SELECT * FROM read_parquet('{flags_sql}')")
    con.execute(f"CREATE OR REPLACE VIEW queue_raw AS SELECT * FROM read_csv_auto('{queue_sql}', header=true)")

    print("  Step 1a: Finding duplicate tx keys ...")
    con.execute("""
        CREATE OR REPLACE TABLE tx_dup_keys AS
        SELECT video_id, segment_file, max(id) AS keep_id
        FROM tx_raw
        GROUP BY video_id, segment_file
        HAVING count(*) > 1
    """)
    dup_count = con.execute("SELECT count(*) FROM tx_dup_keys").fetchone()[0]
    print(f"    duplicate keys: {dup_count:,}")

    print("  Step 1b: Building tx_canonical view ...")
    con.execute("""
        CREATE OR REPLACE VIEW tx_canonical AS
        SELECT
            tx_raw.*,
            regexp_matches(segment_file, '_split[0-9]+$') AS is_split_segment,
            regexp_replace(segment_file, '_split[0-9]+$', '') AS parent_segment_file,
            TRY_CAST(regexp_extract(segment_file, '_split([0-9]+)$', 1) AS INTEGER) AS split_index_from_id
        FROM tx_raw
        WHERE NOT EXISTS (
            SELECT 1 FROM tx_dup_keys dk
            WHERE dk.video_id = tx_raw.video_id
              AND dk.segment_file = tx_raw.segment_file
              AND dk.keep_id != tx_raw.id
        )
    """)

    print("  Step 2: Building tx helper tables ...")
    con.execute("""
        CREATE OR REPLACE TABLE segment_name_uniqueness AS
        SELECT
            segment_file,
            count(DISTINCT video_id) AS segment_name_video_count
        FROM tx_canonical
        GROUP BY segment_file
    """)
    con.execute("""
        CREATE OR REPLACE TABLE flag_summary_by_segment_name AS
        SELECT
            segment_id AS segment_file,
            count(*) AS flag_rows_total,
            count(DISTINCT flag_type) AS flag_types_distinct,
            string_agg(DISTINCT flag_type, ',' ORDER BY flag_type) AS flag_types_csv,
            count(*) FILTER (WHERE flag_type = 'timeout') AS timeout_flag_rows,
            count(*) FILTER (WHERE flag_type = 'error') AS error_flag_rows,
            count(*) FILTER (WHERE flag_type = 'rate_limited') AS rate_limited_flag_rows,
            count(*) FILTER (WHERE flag_type = 'lang_mismatch') AS lang_mismatch_flag_rows,
            count(*) FILTER (WHERE flag_type = 'tag_text_mismatch') AS tag_text_mismatch_flag_rows,
            count(*) FILTER (WHERE flag_type = 'suspicious_length_ratio') AS suspicious_length_ratio_flag_rows,
            count(*) FILTER (WHERE flag_type = 'high_unk_density') AS high_unk_density_flag_rows,
            count(*) FILTER (WHERE flag_type = 'empty_transcription') AS empty_transcription_flag_rows
        FROM flags_raw
        GROUP BY segment_id
    """)

    val_cols = ", ".join(name for name, _ in VALIDATION_COLUMNS)
    bucket_case = f"""
        CASE
            WHEN lid_consensus = false AND COALESCE(lid_agree_count, 0) < 2 THEN 'dispose'
            WHEN conformer_multi_ctc_normalized IS NOT NULL
                 AND conformer_multi_ctc_normalized < {DISPOSE_CTC_MAX} THEN 'dispose'
            WHEN duration_s < {DISPOSE_DURATION_MAX} THEN 'dispose'
            WHEN COALESCE(lid_agree_count, 0) >= {GOLDEN_LID_AGREE}
                 AND (conformer_multi_ctc_normalized >= {GOLDEN_CTC_MIN}
                      OR conformer_multi_ctc_normalized IS NULL)
                 AND (gemini_quality_score >= {GOLDEN_QUALITY_MIN}
                      OR gemini_quality_score = 0
                      OR gemini_quality_score IS NULL)
                 AND duration_s >= {GOLDEN_DURATION_MIN} THEN 'golden'
            ELSE 'redo'
        END
    """

    con.execute(f"CREATE OR REPLACE TABLE validation_recover_unique ({validation_schema_sql()})")
    con.execute(f"CREATE OR REPLACE TABLE validation_historical_unique ({validation_schema_sql()})")

    print("  Step 3: Ingesting recover validation prefixes ...")
    recover_batches = batched(recover_prefixes, args.recover_batch_dirs)
    for batch_idx, dirs in enumerate(recover_batches, start=1):
        batch_sql = batch_read_sql(dirs)
        con.execute("DROP TABLE IF EXISTS val_batch")
        con.execute(f"""
            CREATE TEMP TABLE val_batch AS
            WITH ranked AS (
                SELECT
                    {val_cols},
                    ROW_NUMBER() OVER (
                        PARTITION BY video_id, segment_file
                        ORDER BY
                            CASE WHEN conformer_multi_ctc_normalized IS NULL THEN 1 ELSE 0 END ASC,
                            conformer_multi_ctc_normalized DESC NULLS LAST,
                            mms_confidence DESC NULLS LAST
                    ) AS rn
                FROM {batch_sql}
            )
            SELECT
                {val_cols},
                'recover' AS validation_source,
                {bucket_case} AS provisional_bucket
            FROM ranked
            WHERE rn = 1
        """)
        inserted = con.execute("""
            INSERT INTO validation_recover_unique
            SELECT b.*
            FROM val_batch b
            WHERE NOT EXISTS (
                SELECT 1
                FROM validation_recover_unique t
                WHERE t.video_id = b.video_id
                  AND t.segment_file = b.segment_file
            )
            RETURNING 1
        """).fetchall()
        total_rows = con.execute("SELECT count(*) FROM validation_recover_unique").fetchone()[0]
        print(
            f"    recover batch {batch_idx}/{len(recover_batches)}: "
            f"{len(dirs)} dirs, inserted {len(inserted):,}, total {total_rows:,}"
        )
        con.execute("DROP TABLE IF EXISTS val_batch")

    con.execute("CREATE INDEX IF NOT EXISTS idx_validation_recover_key ON validation_recover_unique(video_id, segment_file)")

    print("  Step 4: Ingesting historical validation prefixes ...")
    historical_batches = batched(historical_prefixes, args.historical_batch_dirs)
    for batch_idx, dirs in enumerate(historical_batches, start=1):
        batch_sql = batch_read_sql(dirs)
        con.execute("DROP TABLE IF EXISTS val_batch")
        con.execute("DROP TABLE IF EXISTS val_batch_filtered")
        con.execute(f"""
            CREATE TEMP TABLE val_batch AS
            WITH ranked AS (
                SELECT
                    {val_cols},
                    ROW_NUMBER() OVER (
                        PARTITION BY video_id, segment_file
                        ORDER BY
                            CASE WHEN conformer_multi_ctc_normalized IS NULL THEN 1 ELSE 0 END ASC,
                            conformer_multi_ctc_normalized DESC NULLS LAST,
                            mms_confidence DESC NULLS LAST
                    ) AS rn
                FROM {batch_sql}
            )
            SELECT
                {val_cols},
                'historical' AS validation_source,
                {bucket_case} AS provisional_bucket
            FROM ranked
            WHERE rn = 1
        """)
        con.execute("""
            CREATE TEMP TABLE val_batch_filtered AS
            SELECT b.*
            FROM val_batch b
            WHERE NOT EXISTS (
                SELECT 1
                FROM validation_recover_unique r
                WHERE r.video_id = b.video_id
                  AND r.segment_file = b.segment_file
            )
        """)
        inserted = con.execute("""
            INSERT INTO validation_historical_unique
            SELECT b.*
            FROM val_batch_filtered b
            WHERE NOT EXISTS (
                SELECT 1
                FROM validation_historical_unique t
                WHERE t.video_id = b.video_id
                  AND t.segment_file = b.segment_file
            )
            RETURNING 1
        """).fetchall()
        total_rows = con.execute("SELECT count(*) FROM validation_historical_unique").fetchone()[0]
        print(
            f"    historical batch {batch_idx}/{len(historical_batches)}: "
            f"{len(dirs)} dirs, inserted {len(inserted):,}, total {total_rows:,}"
        )
        con.execute("DROP TABLE IF EXISTS val_batch")
        con.execute("DROP TABLE IF EXISTS val_batch_filtered")

    con.execute("CREATE INDEX IF NOT EXISTS idx_validation_hist_key ON validation_historical_unique(video_id, segment_file)")
    con.execute("""
        CREATE OR REPLACE VIEW validation_final AS
        SELECT * FROM validation_recover_unique
        UNION ALL
        SELECT * FROM validation_historical_unique
    """)
    validation_final_rows = con.execute("SELECT count(*) FROM validation_final").fetchone()[0]
    print(f"    validation_final rows: {validation_final_rows:,}")

    print("  Step 5: Writing segment map by queue language ...")
    segment_map_dir.mkdir(parents=True, exist_ok=True)
    languages = [row[0] for row in con.execute("""
        SELECT DISTINCT COALESCE(language, 'unknown') AS lang
        FROM queue_raw
        ORDER BY lang
    """).fetchall()]
    for lang in languages:
        out_dir = segment_map_dir / f"queue_language={lang}"
        out_dir.mkdir(parents=True, exist_ok=True)
        out_path = out_dir / "part-000.parquet"
        con.execute(f"""
            COPY (
                SELECT
                    tx.video_id,
                    COALESCE(q.language, 'unknown') AS queue_language,
                    tx.segment_file,
                    tx.parent_segment_file,
                    tx.is_split_segment,
                    tx.split_index_from_id,
                    tx.speaker_id,
                    tx.original_start_ms,
                    tx.original_end_ms,
                    tx.trimmed_start_ms,
                    tx.trimmed_end_ms,
                    tx.leading_pad_ms,
                    tx.trailing_pad_ms,
                    tx.expected_language_hint,
                    tx.detected_language AS tx_detected_language,
                    tx.lang_mismatch_flag,
                    tx.transcription,
                    tx.tagged,
                    tx.speaker_emotion,
                    tx.speaker_style,
                    tx.speaker_pace,
                    tx.speaker_accent,
                    tx.num_unk,
                    tx.num_inaudible,
                    tx.num_event_tags,
                    tx.boundary_score,
                    tx.text_length_per_sec,
                    tx.overlap_suspected,
                    tx.quality_score AS tx_quality_score,
                    tx.alignment_score,
                    tx.asr_eligible,
                    tx.tts_clean_eligible,
                    tx.tts_expressive_eligible,
                    tx.prompt_version,
                    tx.schema_version,
                    tx.trimmer_version,
                    tx.validator_version,
                    tx.model_id,
                    tx.temperature,
                    tx.thinking_level,
                    tx.provider,
                    tx.worker_id,
                    tx.cache_hit,
                    tx.token_usage_json,
                    tx.created_at,
                    uniq.segment_name_video_count,
                    uniq.segment_name_video_count = 1 AS flag_join_safe,
                    CASE WHEN uniq.segment_name_video_count = 1 THEN flags.flag_rows_total END AS flag_rows_total,
                    CASE WHEN uniq.segment_name_video_count = 1 THEN flags.flag_types_distinct END AS flag_types_distinct,
                    CASE WHEN uniq.segment_name_video_count = 1 THEN flags.flag_types_csv END AS flag_types_csv,
                    CASE WHEN uniq.segment_name_video_count = 1 THEN flags.timeout_flag_rows END AS timeout_flag_rows,
                    CASE WHEN uniq.segment_name_video_count = 1 THEN flags.error_flag_rows END AS error_flag_rows,
                    CASE WHEN uniq.segment_name_video_count = 1 THEN flags.rate_limited_flag_rows END AS rate_limited_flag_rows,
                    CASE WHEN uniq.segment_name_video_count = 1 THEN flags.lang_mismatch_flag_rows END AS lang_mismatch_flag_rows,
                    CASE WHEN uniq.segment_name_video_count = 1 THEN flags.tag_text_mismatch_flag_rows END AS tag_text_mismatch_flag_rows,
                    CASE WHEN uniq.segment_name_video_count = 1 THEN flags.suspicious_length_ratio_flag_rows END AS suspicious_length_ratio_flag_rows,
                    CASE WHEN uniq.segment_name_video_count = 1 THEN flags.high_unk_density_flag_rows END AS high_unk_density_flag_rows,
                    CASE WHEN uniq.segment_name_video_count = 1 THEN flags.empty_transcription_flag_rows END AS empty_transcription_flag_rows,
                    val.validation_source,
                    val.validation_source IS NOT NULL AS has_validation,
                    val.duration_s,
                    val.gemini_lang,
                    val.gemini_transcription,
                    val.gemini_tagged,
                    val.gemini_quality_score,
                    val.speaker_info,
                    val.mms_lang_iso3,
                    val.mms_lang_iso1,
                    val.mms_confidence,
                    val.mms_top3,
                    val.vox_lang,
                    val.vox_lang_iso1,
                    val.vox_confidence,
                    val.vox_top3,
                    val.conformer_multi_transcription,
                    val.conformer_multi_ctc_raw,
                    val.conformer_multi_ctc_normalized,
                    val.wav2vec_transcription,
                    val.wav2vec_ctc_raw,
                    val.wav2vec_ctc_normalized,
                    val.wav2vec_model_used,
                    val.lid_consensus,
                    val.lid_agree_count,
                    val.consensus_lang,
                    COALESCE(val.provisional_bucket, 'missing') AS provisional_bucket
                FROM tx_canonical tx
                LEFT JOIN queue_raw q USING (video_id)
                LEFT JOIN segment_name_uniqueness uniq
                    ON tx.segment_file = uniq.segment_file
                LEFT JOIN flag_summary_by_segment_name flags
                    ON tx.segment_file = flags.segment_file
                LEFT JOIN validation_final val
                    ON tx.video_id = val.video_id
                   AND tx.segment_file = val.segment_file
                WHERE COALESCE(q.language, 'unknown') = '{sql_quote(lang)}'
            ) TO '{sql_quote(out_path.as_posix())}' (FORMAT PARQUET, COMPRESSION ZSTD)
        """)
        print(f"    wrote queue_language={lang}")

    segment_map_glob = sql_quote((segment_map_dir / "**" / "*.parquet").as_posix())
    con.execute(f"""
        CREATE OR REPLACE VIEW segment_map_v1 AS
        SELECT * FROM read_parquet('{segment_map_glob}', hive_partitioning=true, union_by_name=true)
    """)

    print("  Step 6: Writing analytics outputs ...")
    language_rollup_path = analytics_dir / "language_rollup.parquet"
    video_rollup_path = analytics_dir / "video_rollup.parquet"
    bucket_rollup_path = analytics_dir / "bucket_rollup.parquet"
    model_rollup_path = analytics_dir / "model_rollup.parquet"
    disagreement_rollup_path = analytics_dir / "disagreement_rollup.parquet"
    missing_validation_videos_path = analytics_dir / "missing_validation_videos.parquet"

    con.execute(f"""
        COPY (
            SELECT
                queue_language,
                count(*) AS total_segments,
                count(DISTINCT video_id) AS total_videos,
                count(*) FILTER (WHERE has_validation) AS validated_segments,
                count(*) FILTER (WHERE NOT has_validation) AS missing_validation_segments,
                count(*) FILTER (WHERE provisional_bucket = 'golden') AS golden_segments,
                count(*) FILTER (WHERE provisional_bucket = 'redo') AS redo_segments,
                count(*) FILTER (WHERE provisional_bucket = 'dispose') AS dispose_segments,
                round(100.0 * count(*) FILTER (WHERE has_validation) / count(*), 4) AS validation_coverage_pct,
                round(avg(tx_quality_score), 6) AS avg_tx_quality_score,
                round(avg(gemini_quality_score), 6) AS avg_validation_gemini_quality,
                round(avg(mms_confidence), 6) AS avg_mms_confidence,
                round(avg(vox_confidence), 6) AS avg_vox_confidence,
                round(avg(conformer_multi_ctc_normalized), 6) AS avg_conformer_ctc,
                round(avg(wav2vec_ctc_normalized), 6) AS avg_wav2vec_ctc,
                round(100.0 * count(*) FILTER (WHERE lid_consensus) / NULLIF(count(*) FILTER (WHERE has_validation), 0), 4) AS lid_consensus_pct
            FROM segment_map_v1
            GROUP BY queue_language
            ORDER BY total_segments DESC
        ) TO '{sql_quote(language_rollup_path.as_posix())}' (FORMAT PARQUET, COMPRESSION ZSTD)
    """)
    con.execute(f"""
        COPY (
            SELECT
                video_id,
                any_value(queue_language) AS queue_language,
                count(*) AS total_segments,
                count(*) FILTER (WHERE has_validation) AS validated_segments,
                count(*) FILTER (WHERE NOT has_validation) AS missing_validation_segments,
                count(*) FILTER (WHERE provisional_bucket = 'golden') AS golden_segments,
                count(*) FILTER (WHERE provisional_bucket = 'redo') AS redo_segments,
                count(*) FILTER (WHERE provisional_bucket = 'dispose') AS dispose_segments,
                round(avg(tx_quality_score), 6) AS avg_tx_quality_score,
                round(avg(gemini_quality_score), 6) AS avg_validation_gemini_quality,
                round(avg(conformer_multi_ctc_normalized), 6) AS avg_conformer_ctc,
                round(avg(wav2vec_ctc_normalized), 6) AS avg_wav2vec_ctc,
                round(100.0 * count(*) FILTER (WHERE lid_consensus) / NULLIF(count(*) FILTER (WHERE has_validation), 0), 4) AS lid_consensus_pct
            FROM segment_map_v1
            GROUP BY video_id
        ) TO '{sql_quote(video_rollup_path.as_posix())}' (FORMAT PARQUET, COMPRESSION ZSTD)
    """)
    con.execute(f"""
        COPY (
            SELECT
                queue_language,
                provisional_bucket,
                count(*) AS segments,
                count(DISTINCT video_id) AS videos,
                round(sum(duration_s) / 3600, 4) AS hours
            FROM segment_map_v1
            WHERE has_validation
            GROUP BY queue_language, provisional_bucket
            ORDER BY queue_language, provisional_bucket
        ) TO '{sql_quote(bucket_rollup_path.as_posix())}' (FORMAT PARQUET, COMPRESSION ZSTD)
    """)
    con.execute(f"""
        COPY (
            SELECT
                queue_language,
                count(*) FILTER (WHERE has_validation) AS validated_segments,
                round(avg(mms_confidence), 6) AS avg_mms_confidence,
                round(percentile_cont(0.1) WITHIN GROUP (ORDER BY mms_confidence), 6) AS p10_mms_confidence,
                round(percentile_cont(0.5) WITHIN GROUP (ORDER BY mms_confidence), 6) AS p50_mms_confidence,
                round(percentile_cont(0.9) WITHIN GROUP (ORDER BY mms_confidence), 6) AS p90_mms_confidence,
                round(avg(vox_confidence), 6) AS avg_vox_confidence,
                round(percentile_cont(0.1) WITHIN GROUP (ORDER BY vox_confidence), 6) AS p10_vox_confidence,
                round(percentile_cont(0.5) WITHIN GROUP (ORDER BY vox_confidence), 6) AS p50_vox_confidence,
                round(percentile_cont(0.9) WITHIN GROUP (ORDER BY vox_confidence), 6) AS p90_vox_confidence,
                round(avg(conformer_multi_ctc_normalized), 6) AS avg_conformer_ctc,
                round(percentile_cont(0.1) WITHIN GROUP (ORDER BY conformer_multi_ctc_normalized), 6) AS p10_conformer_ctc,
                round(percentile_cont(0.5) WITHIN GROUP (ORDER BY conformer_multi_ctc_normalized), 6) AS p50_conformer_ctc,
                round(percentile_cont(0.9) WITHIN GROUP (ORDER BY conformer_multi_ctc_normalized), 6) AS p90_conformer_ctc,
                round(avg(wav2vec_ctc_normalized), 6) AS avg_wav2vec_ctc,
                round(percentile_cont(0.1) WITHIN GROUP (ORDER BY wav2vec_ctc_normalized), 6) AS p10_wav2vec_ctc,
                round(percentile_cont(0.5) WITHIN GROUP (ORDER BY wav2vec_ctc_normalized), 6) AS p50_wav2vec_ctc,
                round(percentile_cont(0.9) WITHIN GROUP (ORDER BY wav2vec_ctc_normalized), 6) AS p90_wav2vec_ctc
            FROM segment_map_v1
            WHERE has_validation
            GROUP BY queue_language
            ORDER BY validated_segments DESC
        ) TO '{sql_quote(model_rollup_path.as_posix())}' (FORMAT PARQUET, COMPRESSION ZSTD)
    """)
    con.execute(f"""
        COPY (
            SELECT
                queue_language,
                count(*) FILTER (WHERE has_validation) AS validated_segments,
                count(*) FILTER (WHERE has_validation AND gemini_lang = mms_lang_iso1) AS gemini_mms_match,
                count(*) FILTER (WHERE has_validation AND gemini_lang = vox_lang_iso1) AS gemini_vox_match,
                count(*) FILTER (WHERE has_validation AND mms_lang_iso1 = vox_lang_iso1) AS mms_vox_match,
                count(*) FILTER (
                    WHERE has_validation
                      AND gemini_lang = mms_lang_iso1
                      AND gemini_lang = vox_lang_iso1
                ) AS all_three_agree,
                count(*) FILTER (
                    WHERE has_validation
                      AND (
                          gemini_lang IS NULL OR gemini_lang = ''
                          OR mms_lang_iso1 IS NULL OR mms_lang_iso1 = ''
                          OR vox_lang_iso1 IS NULL OR vox_lang_iso1 = ''
                      )
                ) AS missing_any_lid
            FROM segment_map_v1
            GROUP BY queue_language
            ORDER BY validated_segments DESC
        ) TO '{sql_quote(disagreement_rollup_path.as_posix())}' (FORMAT PARQUET, COMPRESSION ZSTD)
    """)
    con.execute(f"""
        COPY (
            SELECT
                video_id,
                any_value(queue_language) AS queue_language,
                count(*) AS missing_validation_segments
            FROM segment_map_v1
            WHERE NOT has_validation
            GROUP BY video_id
            ORDER BY missing_validation_segments DESC, video_id
        ) TO '{sql_quote(missing_validation_videos_path.as_posix())}' (FORMAT PARQUET, COMPRESSION ZSTD)
    """)

    global_summary = fetchone_dict(con, """
        SELECT
            count(*) AS total_segments,
            count(DISTINCT video_id) AS total_videos,
            count(*) FILTER (WHERE has_validation) AS validated_segments,
            count(*) FILTER (WHERE NOT has_validation) AS missing_validation_segments,
            count(DISTINCT CASE WHEN NOT has_validation THEN video_id END) AS videos_with_missing_validation,
            count(*) FILTER (WHERE validation_source = 'historical') AS historical_validation_segments,
            count(*) FILTER (WHERE validation_source = 'recover') AS recover_validation_segments,
            count(*) FILTER (WHERE provisional_bucket = 'golden') AS golden_segments,
            count(*) FILTER (WHERE provisional_bucket = 'redo') AS redo_segments,
            count(*) FILTER (WHERE provisional_bucket = 'dispose') AS dispose_segments,
            count(*) FILTER (WHERE provisional_bucket = 'missing') AS missing_bucket_segments,
            count(*) FILTER (WHERE is_split_segment) AS split_segments,
            count(*) FILTER (WHERE flag_join_safe) AS flag_join_safe_segments,
            count(*) FILTER (WHERE NOT flag_join_safe) AS flag_join_ambiguous_segments,
            round(100.0 * count(*) FILTER (WHERE has_validation) / count(*), 6) AS validation_coverage_pct
        FROM segment_map_v1
    """)
    flag_join_summary = fetchone_dict(con, """
        SELECT
            count(*) AS total_segments,
            count(*) FILTER (WHERE flag_join_safe) AS safe_join_segments,
            count(*) FILTER (WHERE NOT flag_join_safe) AS ambiguous_join_segments,
            count(*) FILTER (WHERE flag_join_safe AND flag_rows_total IS NOT NULL) AS safe_join_segments_with_flags
        FROM segment_map_v1
    """)

    global_summary["phase"] = "phase-1-incremental"
    global_summary["generated_at_epoch_s"] = round(time.time(), 3)
    global_summary["duckdb_path"] = str(db_path)
    global_summary["segment_map_path"] = str(segment_map_dir)

    run_manifest = {
        "phase": "phase-1-incremental",
        "inputs": {
            "tx": str(tx_path),
            "flags": str(flags_path),
            "queue": str(queue_path),
            "historical_shards": str(historical_dir),
            "recover_shards": str(recover_dir),
            "recover_prefixes": len(recover_prefixes),
            "historical_prefixes": len(historical_prefixes),
        },
        "outputs": {
            "duckdb_path": str(db_path),
            "segment_map_v1": str(segment_map_dir),
            "analytics_dir": str(analytics_dir),
            "language_rollup": str(language_rollup_path),
            "video_rollup": str(video_rollup_path),
            "bucket_rollup": str(bucket_rollup_path),
            "model_rollup": str(model_rollup_path),
            "disagreement_rollup": str(disagreement_rollup_path),
            "missing_validation_videos": str(missing_validation_videos_path),
        },
        "thresholds": {
            "golden_lid_agree": GOLDEN_LID_AGREE,
            "golden_ctc_min": GOLDEN_CTC_MIN,
            "golden_quality_min": GOLDEN_QUALITY_MIN,
            "golden_duration_min": GOLDEN_DURATION_MIN,
            "dispose_ctc_max": DISPOSE_CTC_MAX,
            "dispose_duration_max": DISPOSE_DURATION_MAX,
        },
        "global_summary": global_summary,
        "flag_join_summary": flag_join_summary,
        "elapsed_s": round(time.time() - t0, 2),
    }

    write_json(analytics_dir / "global_summary.json", global_summary)
    write_json(analytics_dir / "flag_join_summary.json", flag_join_summary)
    write_json(analytics_dir / "phase1_run_manifest.json", run_manifest)

    print(json.dumps(global_summary, indent=2, sort_keys=True))
    print(f"Phase-1 incremental build complete in {time.time() - t0:.1f}s")


if __name__ == "__main__":
    main()
