"""
Build the phase-1 metadata lake for transcript + validation analytics.

Phase 1 intentionally stays metadata-only:
  - dedupe the transcription corpus
  - dedupe and merge historical + recover validation shards
  - build one canonical per-segment parquet map
  - emit analytics rollups and provisional bucket assignments

This avoids replaying raw audio or materializing child audio segments while
still giving us a nearly complete final corpus for thresholding and redo work.
"""
from __future__ import annotations

import argparse
import json
import shutil
import time
from pathlib import Path

import duckdb

# Provisional-bucket thresholds used by the validation_final CASE expression
# in main(): each validated segment is routed to 'golden' (keep), 'dispose'
# (drop) or 'redo' (needs rework).
GOLDEN_LID_AGREE = 3  # min LID systems agreeing for a 'golden' segment
GOLDEN_CTC_MIN = 0.7  # min normalized conformer CTC score for 'golden'
GOLDEN_QUALITY_MIN = 0.5  # min gemini quality score for 'golden' (0/NULL tolerated)
GOLDEN_DURATION_MIN = 2.0  # min duration in seconds for 'golden'
DISPOSE_CTC_MAX = 0.3  # normalized conformer CTC below this → 'dispose'
DISPOSE_DURATION_MAX = 1.0  # duration in seconds below this → 'dispose'

# Canonical (column name, DuckDB SQL type) schema shared by the historical and
# recover validation shards. Drives the SELECT projection order when reading
# shards and the typed all-NULL fallback built by empty_validation_select().
VALIDATION_COLUMNS: list[tuple[str, str]] = [
    ("video_id", "VARCHAR"),
    ("segment_file", "VARCHAR"),
    ("duration_s", "DOUBLE"),
    ("gemini_lang", "VARCHAR"),
    ("gemini_transcription", "VARCHAR"),
    ("gemini_tagged", "VARCHAR"),
    ("gemini_quality_score", "DOUBLE"),
    ("speaker_info", "VARCHAR"),
    ("mms_lang_iso3", "VARCHAR"),
    ("mms_lang_iso1", "VARCHAR"),
    ("mms_confidence", "DOUBLE"),
    ("mms_top3", "VARCHAR"),
    ("vox_lang", "VARCHAR"),
    ("vox_lang_iso1", "VARCHAR"),
    ("vox_confidence", "DOUBLE"),
    ("vox_top3", "VARCHAR"),
    ("conformer_multi_transcription", "VARCHAR"),
    ("conformer_multi_ctc_raw", "DOUBLE"),
    ("conformer_multi_ctc_normalized", "DOUBLE"),
    ("wav2vec_transcription", "VARCHAR"),
    ("wav2vec_ctc_raw", "DOUBLE"),
    ("wav2vec_ctc_normalized", "DOUBLE"),
    ("wav2vec_model_used", "VARCHAR"),
    ("lid_consensus", "BOOLEAN"),
    ("lid_agree_count", "INTEGER"),
    ("consensus_lang", "VARCHAR"),
]


def parse_args(argv=None):
    """Parse command-line options for the phase-1 build.

    Parameters
    ----------
    argv:
        Optional list of argument strings. Defaults to ``None``, in which
        case argparse reads ``sys.argv[1:]`` as before — existing callers
        are unaffected. Passing an explicit list makes the parser usable
        from tests and other scripts without mutating process arguments.

    Returns
    -------
    argparse.Namespace with all phase-1 input/output/tuning options.
    """
    p = argparse.ArgumentParser(description="Build phase-1 transcript + validation metadata lake")
    p.add_argument("--tx", default="data/transcription_results.parquet")
    p.add_argument("--flags", default="data/transcription_flags.parquet")
    p.add_argument("--queue", default="data/video_queue.csv.gz")
    p.add_argument("--historical-shards", default="data/validation_shards")
    p.add_argument("--recover-shards", default="data/recover_validation_shards")
    p.add_argument("--output-dir", default="data/phase1")
    p.add_argument("--db-path", default="")
    p.add_argument("--threads", type=int, default=8)
    p.add_argument("--memory-limit", default="24GB")
    p.add_argument("--overwrite", action="store_true")
    return p.parse_args(argv)


def remove_path(path: Path):
    """Remove *path* from disk if it exists.

    Directories are deleted recursively; anything else is unlinked.
    Missing paths are silently ignored.
    """
    if path.exists():
        if path.is_dir():
            shutil.rmtree(path)
        else:
            path.unlink()


def has_parquet_files(path: Path) -> bool:
    """Report whether *path* exists and holds at least one parquet file (searched recursively)."""
    if not path.exists():
        return False
    for _ in path.rglob("*.parquet"):
        return True
    return False


def empty_validation_select() -> str:
    """Return a SELECT producing zero rows with the full validation schema.

    Every column is a typed NULL cast, so UNIONs against real shard data
    keep consistent column names and DuckDB types even when a shard
    directory is absent.
    """
    projections = [
        f"CAST(NULL AS {sql_type}) AS {name}" for name, sql_type in VALIDATION_COLUMNS
    ]
    return "SELECT " + ", ".join(projections) + " WHERE 1 = 0"


def fetchone_dict(con: duckdb.DuckDBPyConnection, query: str) -> dict:
    """Execute *query* on *con* and return its first row keyed by column name.

    Returns an empty dict when the query yields no rows.
    """
    result = con.execute(query)
    first_row = result.fetchone()
    if first_row is None:
        return {}
    column_names = (meta[0] for meta in result.description)
    return dict(zip(column_names, first_row))


def write_json(path: Path, payload: dict):
    """Serialize *payload* to *path* as pretty-printed JSON with sorted keys."""
    serialized = json.dumps(payload, sort_keys=True, indent=2)
    path.write_text(serialized)


def main() -> None:
    """Run the full phase-1 build.

    Steps: validate inputs, open a DuckDB session, dedupe the transcription
    corpus, merge + dedupe validation shards, materialize the per-segment
    map as language-partitioned parquet, emit analytics rollups, and write
    summary JSON plus a run manifest.
    """
    args = parse_args()
    # Resolve all filesystem locations up front.
    tx_path = Path(args.tx)
    flags_path = Path(args.flags)
    queue_path = Path(args.queue)
    historical_dir = Path(args.historical_shards)
    recover_dir = Path(args.recover_shards)
    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Fail fast when any required input is missing.
    if not tx_path.exists():
        raise SystemExit(f"Missing transcription parquet: {tx_path}")
    if not flags_path.exists():
        raise SystemExit(f"Missing transcription flags parquet: {flags_path}")
    if not queue_path.exists():
        raise SystemExit(f"Missing queue snapshot: {queue_path}")

    # Derived output locations (DB defaults to living inside the output dir).
    db_path = Path(args.db_path) if args.db_path else output_dir / "phase1.duckdb"
    segment_map_dir = output_dir / "segment_map_v1"
    analytics_dir = output_dir / "analytics_v1"

    # --overwrite clears prior artifacts so stale files cannot mix with
    # this run's output (the partitioned COPY would otherwise collide).
    if args.overwrite:
        remove_path(db_path)
        remove_path(segment_map_dir)
        remove_path(analytics_dir)

    analytics_dir.mkdir(parents=True, exist_ok=True)

    # Validation shard dirs are optional; when one has no parquet files we
    # later substitute a typed empty SELECT with the same schema.
    historical_available = has_parquet_files(historical_dir)
    recover_available = has_parquet_files(recover_dir)
    historical_glob = (historical_dir / "**" / "*.parquet").as_posix()
    recover_glob = (recover_dir / "**" / "*.parquet").as_posix()

    con = duckdb.connect(str(db_path))
    con.execute(f"SET threads = {args.threads}")
    con.execute(f"SET memory_limit = '{args.memory_limit}'")

    t0 = time.time()
    print("Building phase-1 metadata lake ...")

    # Double any single quotes so paths embed safely in SQL string literals.
    tx_sql = tx_path.as_posix().replace("'", "''")
    flags_sql = flags_path.as_posix().replace("'", "''")
    queue_sql = queue_path.as_posix().replace("'", "''")

    # Raw views over the three required inputs.
    con.execute(f"""
        CREATE OR REPLACE VIEW tx_raw AS
        SELECT * FROM read_parquet('{tx_sql}')
    """)
    con.execute(f"""
        CREATE OR REPLACE VIEW flags_raw AS
        SELECT * FROM read_parquet('{flags_sql}')
    """)
    con.execute(f"""
        CREATE OR REPLACE VIEW queue_raw AS
        SELECT * FROM read_csv_auto('{queue_sql}', header=true)
    """)

    # Project each shard set onto the shared VALIDATION_COLUMNS list, or
    # synthesize an all-NULL zero-row relation when a shard dir is empty.
    historical_select = (
        f"SELECT {', '.join(name for name, _ in VALIDATION_COLUMNS)} "
        f"FROM read_parquet('{historical_glob}', hive_partitioning=false)"
        if historical_available
        else empty_validation_select()
    )
    recover_select = (
        f"SELECT {', '.join(name for name, _ in VALIDATION_COLUMNS)} "
        f"FROM read_parquet('{recover_glob}', hive_partitioning=false)"
        if recover_available
        else empty_validation_select()
    )

    # Dedupe transcripts to one row per (video_id, segment_file), keeping the
    # newest, and derive split-segment lineage from the '_splitN' name suffix.
    # NOTE(review): assumes tx_raw provides created_at and id columns for the
    # recency ranking — confirm against the upstream parquet schema.
    con.execute(f"""
        CREATE OR REPLACE TABLE tx_canonical AS
        WITH ranked AS (
            SELECT
                tx_raw.*,
                ROW_NUMBER() OVER (
                    PARTITION BY video_id, segment_file
                    ORDER BY created_at DESC NULLS LAST, id DESC
                ) AS rn
            FROM tx_raw
        )
        SELECT
            *,
            regexp_matches(segment_file, '_split[0-9]+$') AS is_split_segment,
            regexp_replace(segment_file, '_split[0-9]+$', '') AS parent_segment_file,
            TRY_CAST(regexp_extract(segment_file, '_split([0-9]+)$', 1) AS INTEGER) AS split_index_from_id
        FROM ranked
        WHERE rn = 1
    """)

    # segment_file names are only guaranteed unique within a video; record
    # how many videos share each name so flag joins can be marked safe.
    con.execute("""
        CREATE OR REPLACE TABLE segment_name_uniqueness AS
        SELECT
            segment_file,
            count(DISTINCT video_id) AS segment_name_video_count
        FROM tx_canonical
        GROUP BY segment_file
    """)

    # Per-segment-name flag rollup (flags key on segment_id = name only,
    # which is why the ambiguity guard above is needed downstream).
    con.execute("""
        CREATE OR REPLACE TABLE flag_summary_by_segment_name AS
        SELECT
            segment_id AS segment_file,
            count(*) AS flag_rows_total,
            count(DISTINCT flag_type) AS flag_types_distinct,
            string_agg(DISTINCT flag_type, ',' ORDER BY flag_type) AS flag_types_csv,
            count(*) FILTER (WHERE flag_type = 'timeout') AS timeout_flag_rows,
            count(*) FILTER (WHERE flag_type = 'error') AS error_flag_rows,
            count(*) FILTER (WHERE flag_type = 'rate_limited') AS rate_limited_flag_rows,
            count(*) FILTER (WHERE flag_type = 'lang_mismatch') AS lang_mismatch_flag_rows,
            count(*) FILTER (WHERE flag_type = 'tag_text_mismatch') AS tag_text_mismatch_flag_rows,
            count(*) FILTER (WHERE flag_type = 'suspicious_length_ratio') AS suspicious_length_ratio_flag_rows,
            count(*) FILTER (WHERE flag_type = 'high_unk_density') AS high_unk_density_flag_rows,
            count(*) FILTER (WHERE flag_type = 'empty_transcription') AS empty_transcription_flag_rows
        FROM flags_raw
        GROUP BY segment_id
    """)

    # Merge validation shards: recover rows (source_rank 0) beat historical,
    # then the most complete / highest-confidence row wins per
    # (video_id, segment_file). Each survivor gets a provisional bucket:
    # 'dispose' checks run first, then the 'golden' gate, else 'redo'.
    con.execute(f"""
        CREATE OR REPLACE TABLE validation_final AS
        WITH unioned AS (
            SELECT
                {', '.join(name for name, _ in VALIDATION_COLUMNS)},
                'historical' AS validation_source,
                1 AS source_rank
            FROM ({historical_select})
            UNION ALL
            SELECT
                {', '.join(name for name, _ in VALIDATION_COLUMNS)},
                'recover' AS validation_source,
                0 AS source_rank
            FROM ({recover_select})
        ),
        ranked AS (
            SELECT
                unioned.*,
                ROW_NUMBER() OVER (
                    PARTITION BY video_id, segment_file
                    ORDER BY
                        source_rank ASC,
                        CASE WHEN conformer_multi_ctc_normalized IS NULL THEN 1 ELSE 0 END ASC,
                        conformer_multi_ctc_normalized DESC NULLS LAST,
                        mms_confidence DESC NULLS LAST,
                        vox_confidence DESC NULLS LAST,
                        gemini_quality_score DESC NULLS LAST
                ) AS rn
            FROM unioned
        )
        SELECT
            video_id,
            segment_file,
            duration_s,
            gemini_lang,
            gemini_transcription,
            gemini_tagged,
            gemini_quality_score,
            speaker_info,
            mms_lang_iso3,
            mms_lang_iso1,
            mms_confidence,
            mms_top3,
            vox_lang,
            vox_lang_iso1,
            vox_confidence,
            vox_top3,
            conformer_multi_transcription,
            conformer_multi_ctc_raw,
            conformer_multi_ctc_normalized,
            wav2vec_transcription,
            wav2vec_ctc_raw,
            wav2vec_ctc_normalized,
            wav2vec_model_used,
            lid_consensus,
            lid_agree_count,
            consensus_lang,
            validation_source,
            CASE
                WHEN lid_consensus = false AND COALESCE(lid_agree_count, 0) < 2 THEN 'dispose'
                WHEN conformer_multi_ctc_normalized IS NOT NULL
                     AND conformer_multi_ctc_normalized < {DISPOSE_CTC_MAX} THEN 'dispose'
                WHEN duration_s < {DISPOSE_DURATION_MAX} THEN 'dispose'
                WHEN COALESCE(lid_agree_count, 0) >= {GOLDEN_LID_AGREE}
                     AND (conformer_multi_ctc_normalized >= {GOLDEN_CTC_MIN}
                          OR conformer_multi_ctc_normalized IS NULL)
                     AND (gemini_quality_score >= {GOLDEN_QUALITY_MIN}
                          OR gemini_quality_score = 0
                          OR gemini_quality_score IS NULL)
                     AND duration_s >= {GOLDEN_DURATION_MIN} THEN 'golden'
                ELSE 'redo'
            END AS provisional_bucket
        FROM ranked
        WHERE rn = 1
    """)

    # Canonical per-segment map: transcripts left-joined to queue language,
    # flag rollups (nulled when the segment name is shared across videos)
    # and merged validation. Segments without validation get bucket 'missing'.
    con.execute("""
        CREATE OR REPLACE TABLE segment_map_v1 AS
        SELECT
            tx.video_id,
            COALESCE(q.language, 'unknown') AS queue_language,
            tx.segment_file,
            tx.parent_segment_file,
            tx.is_split_segment,
            tx.split_index_from_id,
            tx.speaker_id,
            tx.original_start_ms,
            tx.original_end_ms,
            tx.trimmed_start_ms,
            tx.trimmed_end_ms,
            tx.leading_pad_ms,
            tx.trailing_pad_ms,
            tx.expected_language_hint,
            tx.detected_language AS tx_detected_language,
            tx.lang_mismatch_flag,
            tx.transcription,
            tx.tagged,
            tx.speaker_emotion,
            tx.speaker_style,
            tx.speaker_pace,
            tx.speaker_accent,
            tx.num_unk,
            tx.num_inaudible,
            tx.num_event_tags,
            tx.boundary_score,
            tx.text_length_per_sec,
            tx.overlap_suspected,
            tx.quality_score AS tx_quality_score,
            tx.alignment_score,
            tx.asr_eligible,
            tx.tts_clean_eligible,
            tx.tts_expressive_eligible,
            tx.prompt_version,
            tx.schema_version,
            tx.trimmer_version,
            tx.validator_version,
            tx.model_id,
            tx.temperature,
            tx.thinking_level,
            tx.provider,
            tx.worker_id,
            tx.cache_hit,
            tx.token_usage_json,
            tx.created_at,
            uniq.segment_name_video_count,
            uniq.segment_name_video_count = 1 AS flag_join_safe,
            CASE WHEN uniq.segment_name_video_count = 1 THEN flags.flag_rows_total END AS flag_rows_total,
            CASE WHEN uniq.segment_name_video_count = 1 THEN flags.flag_types_distinct END AS flag_types_distinct,
            CASE WHEN uniq.segment_name_video_count = 1 THEN flags.flag_types_csv END AS flag_types_csv,
            CASE WHEN uniq.segment_name_video_count = 1 THEN flags.timeout_flag_rows END AS timeout_flag_rows,
            CASE WHEN uniq.segment_name_video_count = 1 THEN flags.error_flag_rows END AS error_flag_rows,
            CASE WHEN uniq.segment_name_video_count = 1 THEN flags.rate_limited_flag_rows END AS rate_limited_flag_rows,
            CASE WHEN uniq.segment_name_video_count = 1 THEN flags.lang_mismatch_flag_rows END AS lang_mismatch_flag_rows,
            CASE WHEN uniq.segment_name_video_count = 1 THEN flags.tag_text_mismatch_flag_rows END AS tag_text_mismatch_flag_rows,
            CASE WHEN uniq.segment_name_video_count = 1 THEN flags.suspicious_length_ratio_flag_rows END AS suspicious_length_ratio_flag_rows,
            CASE WHEN uniq.segment_name_video_count = 1 THEN flags.high_unk_density_flag_rows END AS high_unk_density_flag_rows,
            CASE WHEN uniq.segment_name_video_count = 1 THEN flags.empty_transcription_flag_rows END AS empty_transcription_flag_rows,
            val.validation_source,
            val.validation_source IS NOT NULL AS has_validation,
            val.duration_s,
            val.gemini_lang,
            val.gemini_transcription,
            val.gemini_tagged,
            val.gemini_quality_score,
            val.speaker_info,
            val.mms_lang_iso3,
            val.mms_lang_iso1,
            val.mms_confidence,
            val.mms_top3,
            val.vox_lang,
            val.vox_lang_iso1,
            val.vox_confidence,
            val.vox_top3,
            val.conformer_multi_transcription,
            val.conformer_multi_ctc_raw,
            val.conformer_multi_ctc_normalized,
            val.wav2vec_transcription,
            val.wav2vec_ctc_raw,
            val.wav2vec_ctc_normalized,
            val.wav2vec_model_used,
            val.lid_consensus,
            val.lid_agree_count,
            val.consensus_lang,
            COALESCE(val.provisional_bucket, 'missing') AS provisional_bucket
        FROM tx_canonical tx
        LEFT JOIN queue_raw q USING (video_id)
        LEFT JOIN segment_name_uniqueness uniq
            ON tx.segment_file = uniq.segment_file
        LEFT JOIN flag_summary_by_segment_name flags
            ON tx.segment_file = flags.segment_file
        LEFT JOIN validation_final val
            ON tx.video_id = val.video_id
           AND tx.segment_file = val.segment_file
    """)

    # Export the map as ZSTD parquet partitioned by queue language.
    segment_map_sql = segment_map_dir.as_posix().replace("'", "''")
    con.execute(f"""
        COPY segment_map_v1
        TO '{segment_map_sql}'
        (FORMAT PARQUET, COMPRESSION ZSTD, PARTITION_BY (queue_language))
    """)

    # Analytics rollup destinations.
    language_rollup_path = analytics_dir / "language_rollup.parquet"
    video_rollup_path = analytics_dir / "video_rollup.parquet"
    bucket_rollup_path = analytics_dir / "bucket_rollup.parquet"
    model_rollup_path = analytics_dir / "model_rollup.parquet"
    disagreement_rollup_path = analytics_dir / "disagreement_rollup.parquet"
    missing_validation_videos_path = analytics_dir / "missing_validation_videos.parquet"

    # Per-language coverage, bucket counts and mean scores.
    con.execute(f"""
        COPY (
            SELECT
                queue_language,
                count(*) AS total_segments,
                count(DISTINCT video_id) AS total_videos,
                count(*) FILTER (WHERE has_validation) AS validated_segments,
                count(*) FILTER (WHERE NOT has_validation) AS missing_validation_segments,
                count(*) FILTER (WHERE provisional_bucket = 'golden') AS golden_segments,
                count(*) FILTER (WHERE provisional_bucket = 'redo') AS redo_segments,
                count(*) FILTER (WHERE provisional_bucket = 'dispose') AS dispose_segments,
                round(100.0 * count(*) FILTER (WHERE has_validation) / count(*), 4) AS validation_coverage_pct,
                round(avg(tx_quality_score), 6) AS avg_tx_quality_score,
                round(avg(gemini_quality_score), 6) AS avg_validation_gemini_quality,
                round(avg(mms_confidence), 6) AS avg_mms_confidence,
                round(avg(vox_confidence), 6) AS avg_vox_confidence,
                round(avg(conformer_multi_ctc_normalized), 6) AS avg_conformer_ctc,
                round(avg(wav2vec_ctc_normalized), 6) AS avg_wav2vec_ctc,
                round(100.0 * count(*) FILTER (WHERE lid_consensus) / NULLIF(count(*) FILTER (WHERE has_validation), 0), 4) AS lid_consensus_pct
            FROM segment_map_v1
            GROUP BY queue_language
            ORDER BY total_segments DESC
        ) TO '{language_rollup_path.as_posix()}' (FORMAT PARQUET, COMPRESSION ZSTD)
    """)

    # Per-video coverage and mean scores.
    con.execute(f"""
        COPY (
            SELECT
                video_id,
                any_value(queue_language) AS queue_language,
                count(*) AS total_segments,
                count(*) FILTER (WHERE has_validation) AS validated_segments,
                count(*) FILTER (WHERE NOT has_validation) AS missing_validation_segments,
                count(*) FILTER (WHERE provisional_bucket = 'golden') AS golden_segments,
                count(*) FILTER (WHERE provisional_bucket = 'redo') AS redo_segments,
                count(*) FILTER (WHERE provisional_bucket = 'dispose') AS dispose_segments,
                round(avg(tx_quality_score), 6) AS avg_tx_quality_score,
                round(avg(gemini_quality_score), 6) AS avg_validation_gemini_quality,
                round(avg(conformer_multi_ctc_normalized), 6) AS avg_conformer_ctc,
                round(avg(wav2vec_ctc_normalized), 6) AS avg_wav2vec_ctc,
                round(100.0 * count(*) FILTER (WHERE lid_consensus) / NULLIF(count(*) FILTER (WHERE has_validation), 0), 4) AS lid_consensus_pct
            FROM segment_map_v1
            GROUP BY video_id
        ) TO '{video_rollup_path.as_posix()}' (FORMAT PARQUET, COMPRESSION ZSTD)
    """)

    # Segment/video/hour totals per (language, bucket), validated rows only.
    con.execute(f"""
        COPY (
            SELECT
                queue_language,
                provisional_bucket,
                count(*) AS segments,
                count(DISTINCT video_id) AS videos,
                round(sum(duration_s) / 3600, 4) AS hours
            FROM segment_map_v1
            WHERE has_validation
            GROUP BY queue_language, provisional_bucket
            ORDER BY queue_language, provisional_bucket
        ) TO '{bucket_rollup_path.as_posix()}' (FORMAT PARQUET, COMPRESSION ZSTD)
    """)

    # Score distributions (avg/p10/p50/p90) per language for each scorer.
    con.execute(f"""
        COPY (
            SELECT
                queue_language,
                count(*) FILTER (WHERE has_validation) AS validated_segments,
                round(avg(mms_confidence), 6) AS avg_mms_confidence,
                round(percentile_cont(0.1) WITHIN GROUP (ORDER BY mms_confidence), 6) AS p10_mms_confidence,
                round(percentile_cont(0.5) WITHIN GROUP (ORDER BY mms_confidence), 6) AS p50_mms_confidence,
                round(percentile_cont(0.9) WITHIN GROUP (ORDER BY mms_confidence), 6) AS p90_mms_confidence,
                round(avg(vox_confidence), 6) AS avg_vox_confidence,
                round(percentile_cont(0.1) WITHIN GROUP (ORDER BY vox_confidence), 6) AS p10_vox_confidence,
                round(percentile_cont(0.5) WITHIN GROUP (ORDER BY vox_confidence), 6) AS p50_vox_confidence,
                round(percentile_cont(0.9) WITHIN GROUP (ORDER BY vox_confidence), 6) AS p90_vox_confidence,
                round(avg(conformer_multi_ctc_normalized), 6) AS avg_conformer_ctc,
                round(percentile_cont(0.1) WITHIN GROUP (ORDER BY conformer_multi_ctc_normalized), 6) AS p10_conformer_ctc,
                round(percentile_cont(0.5) WITHIN GROUP (ORDER BY conformer_multi_ctc_normalized), 6) AS p50_conformer_ctc,
                round(percentile_cont(0.9) WITHIN GROUP (ORDER BY conformer_multi_ctc_normalized), 6) AS p90_conformer_ctc,
                round(avg(wav2vec_ctc_normalized), 6) AS avg_wav2vec_ctc,
                round(percentile_cont(0.1) WITHIN GROUP (ORDER BY wav2vec_ctc_normalized), 6) AS p10_wav2vec_ctc,
                round(percentile_cont(0.5) WITHIN GROUP (ORDER BY wav2vec_ctc_normalized), 6) AS p50_wav2vec_ctc,
                round(percentile_cont(0.9) WITHIN GROUP (ORDER BY wav2vec_ctc_normalized), 6) AS p90_wav2vec_ctc
            FROM segment_map_v1
            WHERE has_validation
            GROUP BY queue_language
            ORDER BY validated_segments DESC
        ) TO '{model_rollup_path.as_posix()}' (FORMAT PARQUET, COMPRESSION ZSTD)
    """)

    # Pairwise / three-way LID agreement between gemini, MMS and vox.
    con.execute(f"""
        COPY (
            SELECT
                queue_language,
                count(*) FILTER (WHERE has_validation) AS validated_segments,
                count(*) FILTER (WHERE has_validation AND gemini_lang = mms_lang_iso1) AS gemini_mms_match,
                count(*) FILTER (WHERE has_validation AND gemini_lang = vox_lang_iso1) AS gemini_vox_match,
                count(*) FILTER (WHERE has_validation AND mms_lang_iso1 = vox_lang_iso1) AS mms_vox_match,
                count(*) FILTER (
                    WHERE has_validation
                      AND gemini_lang = mms_lang_iso1
                      AND gemini_lang = vox_lang_iso1
                ) AS all_three_agree,
                count(*) FILTER (
                    WHERE has_validation
                      AND (
                          gemini_lang IS NULL OR gemini_lang = ''
                          OR mms_lang_iso1 IS NULL OR mms_lang_iso1 = ''
                          OR vox_lang_iso1 IS NULL OR vox_lang_iso1 = ''
                      )
                ) AS missing_any_lid
            FROM segment_map_v1
            GROUP BY queue_language
            ORDER BY validated_segments DESC
        ) TO '{disagreement_rollup_path.as_posix()}' (FORMAT PARQUET, COMPRESSION ZSTD)
    """)

    # Videos still missing validation, worst offenders first.
    con.execute(f"""
        COPY (
            SELECT
                video_id,
                any_value(queue_language) AS queue_language,
                count(*) AS missing_validation_segments
            FROM segment_map_v1
            WHERE NOT has_validation
            GROUP BY video_id
            ORDER BY missing_validation_segments DESC, video_id
        ) TO '{missing_validation_videos_path.as_posix()}' (FORMAT PARQUET, COMPRESSION ZSTD)
    """)

    # Corpus-wide headline numbers for the manifest and stdout.
    global_summary = fetchone_dict(con, """
        SELECT
            count(*) AS total_segments,
            count(DISTINCT video_id) AS total_videos,
            count(*) FILTER (WHERE has_validation) AS validated_segments,
            count(*) FILTER (WHERE NOT has_validation) AS missing_validation_segments,
            count(DISTINCT CASE WHEN NOT has_validation THEN video_id END) AS videos_with_missing_validation,
            count(*) FILTER (WHERE validation_source = 'historical') AS historical_validation_segments,
            count(*) FILTER (WHERE validation_source = 'recover') AS recover_validation_segments,
            count(*) FILTER (WHERE provisional_bucket = 'golden') AS golden_segments,
            count(*) FILTER (WHERE provisional_bucket = 'redo') AS redo_segments,
            count(*) FILTER (WHERE provisional_bucket = 'dispose') AS dispose_segments,
            count(*) FILTER (WHERE provisional_bucket = 'missing') AS missing_bucket_segments,
            count(*) FILTER (WHERE is_split_segment) AS split_segments,
            count(*) FILTER (WHERE flag_join_safe) AS flag_join_safe_segments,
            count(*) FILTER (WHERE NOT flag_join_safe) AS flag_join_ambiguous_segments,
            round(100.0 * count(*) FILTER (WHERE has_validation) / count(*), 6) AS validation_coverage_pct
        FROM segment_map_v1
    """)
    # Attach provenance so the summary JSON is self-describing.
    global_summary["historical_shards_available"] = historical_available
    global_summary["recover_shards_available"] = recover_available
    global_summary["phase"] = "phase-1"
    global_summary["generated_at_epoch_s"] = round(time.time(), 3)
    global_summary["duckdb_path"] = str(db_path)
    global_summary["segment_map_path"] = str(segment_map_dir)

    # How much of the corpus can safely use the name-keyed flag rollups.
    flag_join_summary = fetchone_dict(con, """
        SELECT
            count(*) AS total_segments,
            count(*) FILTER (WHERE flag_join_safe) AS safe_join_segments,
            count(*) FILTER (WHERE NOT flag_join_safe) AS ambiguous_join_segments,
            count(*) FILTER (WHERE flag_join_safe AND flag_rows_total IS NOT NULL) AS safe_join_segments_with_flags
        FROM segment_map_v1
    """)

    # Full manifest: inputs, outputs, thresholds and summaries for this run.
    run_manifest = {
        "phase": "phase-1",
        "inputs": {
            "tx": str(tx_path),
            "flags": str(flags_path),
            "queue": str(queue_path),
            "historical_shards": str(historical_dir),
            "recover_shards": str(recover_dir),
            "historical_shards_available": historical_available,
            "recover_shards_available": recover_available,
        },
        "outputs": {
            "duckdb_path": str(db_path),
            "segment_map_v1": str(segment_map_dir),
            "analytics_dir": str(analytics_dir),
            "language_rollup": str(language_rollup_path),
            "video_rollup": str(video_rollup_path),
            "bucket_rollup": str(bucket_rollup_path),
            "model_rollup": str(model_rollup_path),
            "disagreement_rollup": str(disagreement_rollup_path),
            "missing_validation_videos": str(missing_validation_videos_path),
        },
        "thresholds": {
            "golden_lid_agree": GOLDEN_LID_AGREE,
            "golden_ctc_min": GOLDEN_CTC_MIN,
            "golden_quality_min": GOLDEN_QUALITY_MIN,
            "golden_duration_min": GOLDEN_DURATION_MIN,
            "dispose_ctc_max": DISPOSE_CTC_MAX,
            "dispose_duration_max": DISPOSE_DURATION_MAX,
        },
        "global_summary": global_summary,
        "flag_join_summary": flag_join_summary,
        "elapsed_s": round(time.time() - t0, 2),
    }

    # Persist summaries + manifest next to the analytics parquet files.
    write_json(analytics_dir / "global_summary.json", global_summary)
    write_json(analytics_dir / "flag_join_summary.json", flag_join_summary)
    write_json(analytics_dir / "phase1_run_manifest.json", run_manifest)

    print(json.dumps(global_summary, indent=2, sort_keys=True))
    print(f"Phase-1 build complete in {time.time() - t0:.1f}s")


# Script entry point: run the build only on direct execution, so the module
# can be imported (e.g. for its helpers) without side effects.
if __name__ == "__main__":
    main()
