from __future__ import annotations

import json
import time
from pathlib import Path

import duckdb


# Bucketing thresholds consumed by bucket_case() below.
GOLDEN_LID_AGREE = 3  # min lid_agree_count for a segment to be 'golden'
GOLDEN_CTC_MIN = 0.7  # min conformer_multi_ctc_normalized for 'golden' (NULL also passes)
GOLDEN_QUALITY_MIN = 0.5  # min gemini_quality_score for 'golden' (0 and NULL also pass)
GOLDEN_DURATION_MIN = 2.0  # min duration_s (seconds) for 'golden'
DISPOSE_CTC_MAX = 0.3  # non-NULL CTC score below this -> 'dispose'
DISPOSE_DURATION_MAX = 1.0  # duration_s below this (seconds) -> 'dispose'

# Filesystem layout (absolute paths on the processing host).
ROOT = Path("/home/ubuntu/transcripts")
DATA_DIR = ROOT / "data"
OUTPUT_DIR = ROOT / "final_data"

# Input artifacts, pre-rendered as POSIX path strings for interpolation into DuckDB SQL.
SEGMENT_MAP_GLOB = (DATA_DIR / "phase1_incremental" / "segment_map_v1" / "**" / "*.parquet").as_posix()
VIDEO_ROLLUP_V1 = (DATA_DIR / "phase1_incremental" / "analytics_v1" / "video_rollup.parquet").as_posix()
RECOVER_V2 = (DATA_DIR / "recover_v2_consolidated.parquet").as_posix()
FINAL_VIDEO_SELECTION = (DATA_DIR / "video_tts_classification_final.csv").as_posix()
RAW_TX = (DATA_DIR / "transcription_results.parquet").as_posix()


def write_json(path: Path, payload: dict) -> None:
    """Write *payload* to *path* as pretty-printed, key-sorted JSON.

    A trailing newline is appended so the file ends like a POSIX text file.
    The encoding is pinned to UTF-8 so the output does not depend on the
    host locale (Path.write_text otherwise uses locale.getencoding()).
    """
    text = json.dumps(payload, indent=2, sort_keys=True) + "\n"
    path.write_text(text, encoding="utf-8")


def fetchone_dict(con: duckdb.DuckDBPyConnection, query: str) -> dict:
    """Execute *query* and return its first row as a column-name -> value dict.

    Returns an empty dict when the query produces no rows.
    """
    result = con.execute(query)
    first = result.fetchone()
    if first is None:
        return {}
    names = (desc[0] for desc in result.description)
    return {name: value for name, value in zip(names, first)}


def bucket_case(alias: str = "") -> str:
    """Return a SQL CASE expression classifying a segment row into a bucket.

    Buckets are 'dispose', 'golden', or 'redo', driven by the module-level
    threshold constants.  *alias* optionally qualifies every column reference
    (e.g. alias='s' yields 's.duration_s').  The returned text is interpolated
    verbatim into larger DuckDB queries, so it must stay a valid scalar
    expression over columns lid_consensus, lid_agree_count,
    conformer_multi_ctc_normalized, gemini_quality_score, and duration_s.

    Note the WHEN order: dispose rules are evaluated first, so a too-short or
    low-CTC segment is disposed even if it would also satisfy the golden
    conditions.  NULL CTC and NULL/zero quality scores do not block 'golden'.
    """
    # Column qualifier prefix: "alias." when an alias is given, else empty.
    prefix = f"{alias}." if alias else ""
    return f"""
        CASE
            WHEN {prefix}lid_consensus = false AND COALESCE({prefix}lid_agree_count, 0) < 2 THEN 'dispose'
            WHEN {prefix}conformer_multi_ctc_normalized IS NOT NULL
                 AND {prefix}conformer_multi_ctc_normalized < {DISPOSE_CTC_MAX} THEN 'dispose'
            WHEN {prefix}duration_s < {DISPOSE_DURATION_MAX} THEN 'dispose'
            WHEN COALESCE({prefix}lid_agree_count, 0) >= {GOLDEN_LID_AGREE}
                 AND ({prefix}conformer_multi_ctc_normalized >= {GOLDEN_CTC_MIN}
                      OR {prefix}conformer_multi_ctc_normalized IS NULL)
                 AND ({prefix}gemini_quality_score >= {GOLDEN_QUALITY_MIN}
                      OR {prefix}gemini_quality_score = 0
                      OR {prefix}gemini_quality_score IS NULL)
                 AND {prefix}duration_s >= {GOLDEN_DURATION_MIN} THEN 'golden'
            ELSE 'redo'
        END
    """


def pct(numerator: int, denominator: int) -> float:
    """Return *numerator* as a percentage of *denominator*, rounded to 6 places.

    A zero denominator yields 0.0 instead of raising ZeroDivisionError.
    """
    return 0.0 if denominator == 0 else round(numerator * 100.0 / denominator, 6)


def export_rollups(con: duckdb.DuckDBPyConnection) -> None:
    """Export the rollup artifacts into OUTPUT_DIR via DuckDB COPY.

    Expects the views/temp tables built earlier in main() to exist on *con*:
    final_video_rollup, kept_videos, final_source_rollup_{all,kept}, and
    final_bucket_rollup_{all,kept}.  Writes CSVs for summaries and Parquet
    (ZSTD) for the full per-video rollups.
    """
    # Two-row scope summary: every transcribed video ('all') vs. the subset of
    # videos present in the kept-video selection ('kept_video_subset').
    con.execute(f"""
        COPY (
            SELECT
                'all' AS scope,
                sum(total_segments) AS total_segments,
                count(*) AS total_videos,
                sum(v1_validated_segments) AS v1_validated_segments,
                sum(v1_missing_segments) AS v1_missing_segments,
                sum(final_validated_segments) AS final_validated_segments,
                sum(final_missing_segments) AS final_missing_segments,
                sum(final_golden_segments) AS golden_segments,
                sum(final_redo_segments) AS redo_segments,
                sum(final_dispose_segments) AS dispose_segments,
                count(*) FILTER (WHERE final_missing_segments > 0) AS videos_with_missing_validation
            FROM final_video_rollup

            UNION ALL

            SELECT
                'kept_video_subset' AS scope,
                sum(f.total_segments) AS total_segments,
                count(*) AS total_videos,
                sum(f.v1_validated_segments) AS v1_validated_segments,
                sum(f.v1_missing_segments) AS v1_missing_segments,
                sum(f.final_validated_segments) AS final_validated_segments,
                sum(f.final_missing_segments) AS final_missing_segments,
                sum(f.final_golden_segments) AS golden_segments,
                sum(f.final_redo_segments) AS redo_segments,
                sum(f.final_dispose_segments) AS dispose_segments,
                count(*) FILTER (WHERE f.final_missing_segments > 0) AS videos_with_missing_validation
            FROM final_video_rollup f
            JOIN kept_videos k USING (video_id)
        ) TO '{(OUTPUT_DIR / "scope_rollup.csv").as_posix()}' (HEADER, DELIMITER ',')
    """)

    # Validation-source breakdowns (historical / recover_v1 / recover_v2 / missing),
    # for the full corpus and for the kept subset.
    con.execute(f"""
        COPY (
            SELECT source, segments
            FROM final_source_rollup_all
            ORDER BY segments DESC, source
        ) TO '{(OUTPUT_DIR / "source_breakdown_all.csv").as_posix()}' (HEADER, DELIMITER ',')
    """)
    con.execute(f"""
        COPY (
            SELECT source, segments
            FROM final_source_rollup_kept
            ORDER BY segments DESC, source
        ) TO '{(OUTPUT_DIR / "source_breakdown_kept_subset.csv").as_posix()}' (HEADER, DELIMITER ',')
    """)
    # Quality-bucket breakdowns (golden / redo / dispose / missing), same two scopes.
    con.execute(f"""
        COPY (
            SELECT bucket, segments
            FROM final_bucket_rollup_all
            ORDER BY segments DESC, bucket
        ) TO '{(OUTPUT_DIR / "bucket_breakdown_all.csv").as_posix()}' (HEADER, DELIMITER ',')
    """)
    con.execute(f"""
        COPY (
            SELECT bucket, segments
            FROM final_bucket_rollup_kept
            ORDER BY segments DESC, bucket
        ) TO '{(OUTPUT_DIR / "bucket_breakdown_kept_subset.csv").as_posix()}' (HEADER, DELIMITER ',')
    """)
    # Per-recommended-action rollup over the kept subset.
    con.execute(f"""
        COPY (
            SELECT
                k.recommended_action,
                count(*) AS videos,
                sum(f.total_segments) AS total_segments,
                sum(f.final_validated_segments) AS final_validated_segments,
                sum(f.final_missing_segments) AS final_missing_segments,
                sum(f.final_golden_segments) AS golden_segments,
                sum(f.final_redo_segments) AS redo_segments,
                sum(f.final_dispose_segments) AS dispose_segments
            FROM final_video_rollup f
            JOIN kept_videos k USING (video_id)
            GROUP BY k.recommended_action
            ORDER BY videos DESC
        ) TO '{(OUTPUT_DIR / "action_rollup.csv").as_posix()}' (HEADER, DELIMITER ',')
    """)
    # Full per-video rollup tables as compressed Parquet (all, then kept subset).
    con.execute(f"""
        COPY (
            SELECT *
            FROM final_video_rollup
            ORDER BY video_id
        ) TO '{(OUTPUT_DIR / "video_rollup_final.parquet").as_posix()}' (FORMAT PARQUET, COMPRESSION ZSTD)
    """)
    con.execute(f"""
        COPY (
            SELECT f.*
            FROM final_video_rollup f
            JOIN kept_videos k USING (video_id)
            ORDER BY f.video_id
        ) TO '{(OUTPUT_DIR / "video_rollup_final_kept_subset.parquet").as_posix()}' (FORMAT PARQUET, COMPRESSION ZSTD)
    """)
    # Work queues: videos still missing validation after recover-v2 (both scopes).
    con.execute(f"""
        COPY (
            SELECT *
            FROM final_video_rollup
            WHERE final_missing_segments > 0
            ORDER BY final_missing_segments DESC, video_id
        ) TO '{(OUTPUT_DIR / "videos_missing_final_validation.csv").as_posix()}' (HEADER, DELIMITER ',')
    """)
    con.execute(f"""
        COPY (
            SELECT f.*
            FROM final_video_rollup f
            JOIN kept_videos k USING (video_id)
            WHERE f.final_missing_segments > 0
            ORDER BY f.final_missing_segments DESC, f.video_id
        ) TO '{(OUTPUT_DIR / "videos_missing_final_validation_kept_subset.csv").as_posix()}' (HEADER, DELIMITER ',')
    """)
    # Anti-join: kept videos that have no transcription rollup at all.
    con.execute(f"""
        COPY (
            SELECT k.video_id, k.recommended_action
            FROM kept_videos k
            LEFT JOIN final_video_rollup f USING (video_id)
            WHERE f.video_id IS NULL
            ORDER BY k.video_id
        ) TO '{(OUTPUT_DIR / "kept_videos_without_transcriptions.csv").as_posix()}' (HEADER, DELIMITER ',')
    """)


def main() -> None:
    """Consolidate phase-1 and recover-v2 validation into final rollup artifacts.

    Pipeline, all in one in-memory DuckDB session:
      1. Register views over the source parquet/CSV artifacts.
      2. Dedupe recover-v2 rows and assign provisional buckets.
      3. Join recover-v2 coverage onto the v1 per-video rollup to build
         final_video_rollup (per-video final counts).
      4. Compute whole-corpus ('all') and kept-subset summary dicts.
      5. Materialize small rollup tables and export every artifact.
      6. Write JSON summaries + dataset inventory and print them to stdout.
    """
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    # --- DuckDB session setup ------------------------------------------------
    con = duckdb.connect()
    con.execute("SET threads = 8")
    con.execute("SET memory_limit = '24GB'")
    con.execute("SET preserve_insertion_order = false")

    started = time.time()

    # --- Source views --------------------------------------------------------
    con.execute(f"""
        CREATE OR REPLACE VIEW video_rollup_v1 AS
        SELECT * FROM read_parquet('{VIDEO_ROLLUP_V1}')
    """)
    # Kept-video selection; DISTINCT guards against duplicate CSV rows.
    con.execute(f"""
        CREATE OR REPLACE VIEW kept_videos AS
        SELECT DISTINCT video_id, recommended_action
        FROM read_csv_auto('{FINAL_VIDEO_SELECTION}', header=true)
    """)
    # Deduped segment backbone, including split lineage and cutpoint columns.
    con.execute(f"""
        CREATE OR REPLACE VIEW segment_map AS
        SELECT
            video_id,
            segment_file,
            has_validation,
            validation_source,
            parent_segment_file,
            is_split_segment,
            split_index_from_id,
            original_start_ms,
            original_end_ms,
            trimmed_start_ms,
            trimmed_end_ms,
            leading_pad_ms,
            trailing_pad_ms
        FROM read_parquet('{SEGMENT_MAP_GLOB}', hive_partitioning=true, union_by_name=true)
    """)
    # Segments that phase-1 (v1) validation did not cover.
    con.execute("""
        CREATE OR REPLACE TEMP TABLE missing_segments_v1 AS
        SELECT video_id, segment_file
        FROM segment_map
        WHERE NOT has_validation
    """)
    # --- Recover-v2 dedup + provisional bucketing ----------------------------
    # Keep one row per (video_id, segment_file): prefer rows with a CTC score,
    # then the highest CTC, then the highest MMS confidence.
    con.execute(f"""
        CREATE OR REPLACE TEMP TABLE recover_v2_dedup AS
        WITH ranked AS (
            SELECT
                *,
                ROW_NUMBER() OVER (
                    PARTITION BY video_id, segment_file
                    ORDER BY
                        CASE WHEN conformer_multi_ctc_normalized IS NULL THEN 1 ELSE 0 END ASC,
                        conformer_multi_ctc_normalized DESC NULLS LAST,
                        mms_confidence DESC NULLS LAST
                ) AS rn
            FROM read_parquet('{RECOVER_V2}')
        )
        SELECT
            video_id,
            segment_file,
            {bucket_case()} AS provisional_bucket
        FROM ranked
        WHERE rn = 1
    """)
    # Only recover-v2 rows that fill a v1 validation gap count as recovered.
    con.execute("""
        CREATE OR REPLACE TEMP TABLE recover_v2_matched AS
        SELECT
            m.video_id,
            m.segment_file,
            v.provisional_bucket
        FROM missing_segments_v1 m
        JOIN recover_v2_dedup v USING (video_id, segment_file)
    """)
    con.execute("""
        CREATE OR REPLACE TEMP TABLE recover_v2_by_video AS
        SELECT
            video_id,
            count(*) AS recover_v2_segments,
            count(*) FILTER (WHERE provisional_bucket = 'golden') AS recover_v2_golden_segments,
            count(*) FILTER (WHERE provisional_bucket = 'redo') AS recover_v2_redo_segments,
            count(*) FILTER (WHERE provisional_bucket = 'dispose') AS recover_v2_dispose_segments
        FROM recover_v2_matched
        GROUP BY video_id
    """)
    # --- Final per-video rollup: v1 counts plus recover-v2 contributions -----
    con.execute("""
        CREATE OR REPLACE TEMP TABLE final_video_rollup AS
        SELECT
            v.video_id,
            v.queue_language,
            v.total_segments,
            v.validated_segments AS v1_validated_segments,
            v.missing_validation_segments AS v1_missing_segments,
            v.golden_segments AS v1_golden_segments,
            v.redo_segments AS v1_redo_segments,
            v.dispose_segments AS v1_dispose_segments,
            coalesce(r.recover_v2_segments, 0) AS recover_v2_segments,
            v.validated_segments + coalesce(r.recover_v2_segments, 0) AS final_validated_segments,
            v.missing_validation_segments - coalesce(r.recover_v2_segments, 0) AS final_missing_segments,
            v.golden_segments + coalesce(r.recover_v2_golden_segments, 0) AS final_golden_segments,
            v.redo_segments + coalesce(r.recover_v2_redo_segments, 0) AS final_redo_segments,
            v.dispose_segments + coalesce(r.recover_v2_dispose_segments, 0) AS final_dispose_segments,
            (v.missing_validation_segments - coalesce(r.recover_v2_segments, 0) = 0) AS fully_validated_final
        FROM video_rollup_v1 v
        LEFT JOIN recover_v2_by_video r USING (video_id)
    """)

    # --- Whole-corpus ('all' scope) summary queries ---------------------------
    raw_tx_rows = con.execute(f"SELECT count(*) FROM read_parquet('{RAW_TX}')").fetchone()[0]
    transcribed_summary_all = fetchone_dict(con, """
        SELECT
            sum(total_segments) AS deduped_transcribed_segments,
            count(*) AS transcribed_videos
        FROM video_rollup_v1
    """)
    v1_summary_all = fetchone_dict(con, """
        SELECT
            sum(validated_segments) AS v1_validated_segments,
            sum(missing_validation_segments) AS v1_missing_segments,
            sum(golden_segments) AS v1_golden_segments,
            sum(redo_segments) AS v1_redo_segments,
            sum(dispose_segments) AS v1_dispose_segments,
            count(*) FILTER (WHERE missing_validation_segments > 0) AS v1_videos_with_missing_validation
        FROM video_rollup_v1
    """)
    source_rollup_all = fetchone_dict(con, """
        SELECT
            count(*) FILTER (WHERE validation_source = 'historical') AS historical_segments,
            count(*) FILTER (WHERE validation_source = 'recover') AS recover_v1_segments
        FROM segment_map
        WHERE has_validation
    """)
    v2_summary_all = fetchone_dict(con, """
        SELECT
            count(*) AS recover_v2_segments,
            count(*) FILTER (WHERE provisional_bucket = 'golden') AS recover_v2_golden_segments,
            count(*) FILTER (WHERE provisional_bucket = 'redo') AS recover_v2_redo_segments,
            count(*) FILTER (WHERE provisional_bucket = 'dispose') AS recover_v2_dispose_segments
        FROM recover_v2_matched
    """)
    final_summary_all = fetchone_dict(con, """
        SELECT
            sum(final_validated_segments) AS final_validated_segments,
            sum(final_missing_segments) AS final_missing_segments,
            sum(final_golden_segments) AS golden_segments,
            sum(final_redo_segments) AS redo_segments,
            sum(final_dispose_segments) AS dispose_segments,
            count(*) FILTER (WHERE final_missing_segments > 0) AS videos_with_missing_final_validation
        FROM final_video_rollup
    """)
    # Coverage of split-lineage/cutpoint metadata.  NOTE(review): the *_ms
    # columns are compared against '' below, so they appear to be stored as
    # strings — confirm against the segment_map parquet schema.
    cutpoint_coverage = fetchone_dict(con, """
        SELECT
            count(*) AS total_segment_map_rows,
            count(*) FILTER (WHERE parent_segment_file IS NOT NULL AND parent_segment_file <> '') AS parent_segment_file_present,
            count(*) FILTER (WHERE is_split_segment) AS split_segments,
            count(*) FILTER (WHERE split_index_from_id IS NOT NULL) AS split_index_present,
            count(*) FILTER (WHERE original_start_ms IS NOT NULL AND original_start_ms <> '') AS original_start_present,
            count(*) FILTER (WHERE original_end_ms IS NOT NULL AND original_end_ms <> '') AS original_end_present,
            count(*) FILTER (WHERE trimmed_start_ms IS NOT NULL AND trimmed_start_ms <> '') AS trimmed_start_present,
            count(*) FILTER (WHERE trimmed_end_ms IS NOT NULL AND trimmed_end_ms <> '') AS trimmed_end_present,
            count(*) FILTER (WHERE leading_pad_ms IS NOT NULL AND leading_pad_ms <> '') AS leading_pad_present,
            count(*) FILTER (WHERE trailing_pad_ms IS NOT NULL AND trailing_pad_ms <> '') AS trailing_pad_present
        FROM segment_map
    """)

    # --- Kept-subset summary queries (same shapes, joined to kept_videos) ----
    kept_selection = fetchone_dict(con, """
        SELECT
            count(*) AS selected_videos_total,
            count(*) FILTER (WHERE recommended_action = 'keep') AS selected_videos_action_keep,
            count(*) FILTER (WHERE recommended_action = 'review') AS selected_videos_action_review
        FROM kept_videos
    """)
    transcribed_summary_kept = fetchone_dict(con, """
        SELECT
            count(*) AS selected_videos_with_transcriptions,
            sum(f.total_segments) AS deduped_transcribed_segments
        FROM final_video_rollup f
        JOIN kept_videos k USING (video_id)
    """)
    v1_summary_kept = fetchone_dict(con, """
        SELECT
            sum(f.v1_validated_segments) AS v1_validated_segments,
            sum(f.v1_missing_segments) AS v1_missing_segments,
            sum(f.v1_golden_segments) AS v1_golden_segments,
            sum(f.v1_redo_segments) AS v1_redo_segments,
            sum(f.v1_dispose_segments) AS v1_dispose_segments,
            count(*) FILTER (WHERE f.v1_missing_segments > 0) AS v1_videos_with_missing_validation
        FROM final_video_rollup f
        JOIN kept_videos k USING (video_id)
    """)
    source_rollup_kept = fetchone_dict(con, """
        SELECT
            count(*) FILTER (WHERE s.validation_source = 'historical') AS historical_segments,
            count(*) FILTER (WHERE s.validation_source = 'recover') AS recover_v1_segments
        FROM segment_map s
        JOIN kept_videos k USING (video_id)
        WHERE s.has_validation
    """)
    v2_summary_kept = fetchone_dict(con, """
        SELECT
            count(*) AS recover_v2_segments,
            count(*) FILTER (WHERE m.provisional_bucket = 'golden') AS recover_v2_golden_segments,
            count(*) FILTER (WHERE m.provisional_bucket = 'redo') AS recover_v2_redo_segments,
            count(*) FILTER (WHERE m.provisional_bucket = 'dispose') AS recover_v2_dispose_segments
        FROM recover_v2_matched m
        JOIN kept_videos k USING (video_id)
    """)
    final_summary_kept = fetchone_dict(con, """
        SELECT
            sum(f.final_validated_segments) AS final_validated_segments,
            sum(f.final_missing_segments) AS final_missing_segments,
            sum(f.final_golden_segments) AS golden_segments,
            sum(f.final_redo_segments) AS redo_segments,
            sum(f.final_dispose_segments) AS dispose_segments,
            count(*) FILTER (WHERE f.final_missing_segments > 0) AS videos_with_missing_final_validation
        FROM final_video_rollup f
        JOIN kept_videos k USING (video_id)
    """)

    # --- Summary dicts -------------------------------------------------------
    # NOTE(review): SQL sum() returns NULL (Python None) on empty inputs, which
    # would make the subtractions/pct() calls below raise — this assumes the
    # source artifacts are non-empty.
    all_summary = {
        "raw_transcription_rows": raw_tx_rows,
        "deduped_transcribed_segments": transcribed_summary_all["deduped_transcribed_segments"],
        "duplicate_transcription_rows": raw_tx_rows - transcribed_summary_all["deduped_transcribed_segments"],
        "transcribed_videos": transcribed_summary_all["transcribed_videos"],
        "v1_validated_segments": v1_summary_all["v1_validated_segments"],
        "v1_missing_segments": v1_summary_all["v1_missing_segments"],
        "v1_videos_with_missing_validation": v1_summary_all["v1_videos_with_missing_validation"],
        "final_validated_segments": final_summary_all["final_validated_segments"],
        "final_missing_segments": final_summary_all["final_missing_segments"],
        "videos_with_missing_final_validation": final_summary_all["videos_with_missing_final_validation"],
        "historical_segments": source_rollup_all["historical_segments"],
        "recover_v1_segments": source_rollup_all["recover_v1_segments"],
        "recover_v2_segments": v2_summary_all["recover_v2_segments"],
        "golden_segments": final_summary_all["golden_segments"],
        "redo_segments": final_summary_all["redo_segments"],
        "dispose_segments": final_summary_all["dispose_segments"],
        "v2_golden_segments": v2_summary_all["recover_v2_golden_segments"],
        "v2_redo_segments": v2_summary_all["recover_v2_redo_segments"],
        "v2_dispose_segments": v2_summary_all["recover_v2_dispose_segments"],
        "v1_coverage_pct": pct(v1_summary_all["v1_validated_segments"], transcribed_summary_all["deduped_transcribed_segments"]),
        "final_coverage_pct": pct(final_summary_all["final_validated_segments"], transcribed_summary_all["deduped_transcribed_segments"]),
    }

    kept_summary = {
        "selected_videos_total": kept_selection["selected_videos_total"],
        "selected_videos_action_keep": kept_selection["selected_videos_action_keep"],
        "selected_videos_action_review": kept_selection["selected_videos_action_review"],
        "selected_videos_with_transcriptions": transcribed_summary_kept["selected_videos_with_transcriptions"],
        "selected_videos_without_transcriptions": (
            kept_selection["selected_videos_total"] - transcribed_summary_kept["selected_videos_with_transcriptions"]
        ),
        "deduped_transcribed_segments": transcribed_summary_kept["deduped_transcribed_segments"],
        "v1_validated_segments": v1_summary_kept["v1_validated_segments"],
        "v1_missing_segments": v1_summary_kept["v1_missing_segments"],
        "v1_videos_with_missing_validation": v1_summary_kept["v1_videos_with_missing_validation"],
        "final_validated_segments": final_summary_kept["final_validated_segments"],
        "final_missing_segments": final_summary_kept["final_missing_segments"],
        "videos_with_missing_final_validation": final_summary_kept["videos_with_missing_final_validation"],
        "historical_segments": source_rollup_kept["historical_segments"],
        "recover_v1_segments": source_rollup_kept["recover_v1_segments"],
        "recover_v2_segments": v2_summary_kept["recover_v2_segments"],
        "golden_segments": final_summary_kept["golden_segments"],
        "redo_segments": final_summary_kept["redo_segments"],
        "dispose_segments": final_summary_kept["dispose_segments"],
        "v2_golden_segments": v2_summary_kept["recover_v2_golden_segments"],
        "v2_redo_segments": v2_summary_kept["recover_v2_redo_segments"],
        "v2_dispose_segments": v2_summary_kept["recover_v2_dispose_segments"],
        "v1_coverage_pct": pct(v1_summary_kept["v1_validated_segments"], transcribed_summary_kept["deduped_transcribed_segments"]),
        "final_coverage_pct": pct(final_summary_kept["final_validated_segments"], transcribed_summary_kept["deduped_transcribed_segments"]),
    }

    # --- Tiny rollup tables fed back into DuckDB for export_rollups() --------
    # Built from the Python summary dicts via parameterized UNION ALL SELECTs.
    con.execute("""
        CREATE OR REPLACE TEMP TABLE final_source_rollup_all AS
        SELECT * FROM (
            SELECT 'historical' AS source, ?::BIGINT AS segments
            UNION ALL
            SELECT 'recover_v1' AS source, ?::BIGINT AS segments
            UNION ALL
            SELECT 'recover_v2' AS source, ?::BIGINT AS segments
            UNION ALL
            SELECT 'missing' AS source, ?::BIGINT AS segments
        )
    """, [
        all_summary["historical_segments"],
        all_summary["recover_v1_segments"],
        all_summary["recover_v2_segments"],
        all_summary["final_missing_segments"],
    ])
    con.execute("""
        CREATE OR REPLACE TEMP TABLE final_source_rollup_kept AS
        SELECT * FROM (
            SELECT 'historical' AS source, ?::BIGINT AS segments
            UNION ALL
            SELECT 'recover_v1' AS source, ?::BIGINT AS segments
            UNION ALL
            SELECT 'recover_v2' AS source, ?::BIGINT AS segments
            UNION ALL
            SELECT 'missing' AS source, ?::BIGINT AS segments
        )
    """, [
        kept_summary["historical_segments"],
        kept_summary["recover_v1_segments"],
        kept_summary["recover_v2_segments"],
        kept_summary["final_missing_segments"],
    ])
    con.execute("""
        CREATE OR REPLACE TEMP TABLE final_bucket_rollup_all AS
        SELECT * FROM (
            SELECT 'golden' AS bucket, ?::BIGINT AS segments
            UNION ALL
            SELECT 'redo' AS bucket, ?::BIGINT AS segments
            UNION ALL
            SELECT 'dispose' AS bucket, ?::BIGINT AS segments
            UNION ALL
            SELECT 'missing' AS bucket, ?::BIGINT AS segments
        )
    """, [
        all_summary["golden_segments"],
        all_summary["redo_segments"],
        all_summary["dispose_segments"],
        all_summary["final_missing_segments"],
    ])
    con.execute("""
        CREATE OR REPLACE TEMP TABLE final_bucket_rollup_kept AS
        SELECT * FROM (
            SELECT 'golden' AS bucket, ?::BIGINT AS segments
            UNION ALL
            SELECT 'redo' AS bucket, ?::BIGINT AS segments
            UNION ALL
            SELECT 'dispose' AS bucket, ?::BIGINT AS segments
            UNION ALL
            SELECT 'missing' AS bucket, ?::BIGINT AS segments
        )
    """, [
        kept_summary["golden_segments"],
        kept_summary["redo_segments"],
        kept_summary["dispose_segments"],
        kept_summary["final_missing_segments"],
    ])

    # --- Export CSV/Parquet artifacts ----------------------------------------
    export_rollups(con)

    # --- Dataset inventory (provenance + hand-audited reality checks) --------
    # The boolean flags under reality_checks are hard-coded audit findings,
    # not computed here.
    inventory = {
        "generated_at_epoch_s": round(time.time(), 3),
        "elapsed_s": round(time.time() - started, 2),
        "source_artifacts": {
            "deduped_transcript_backbone": "data/phase1_incremental/segment_map_v1/",
            "phase1_video_rollup_v1": "data/phase1_incremental/analytics_v1/video_rollup.parquet",
            "recover_v2_consolidated": "data/recover_v2_consolidated.parquet",
            "final_video_selection": "data/video_tts_classification_final.csv",
            "channel_dropped_videos": "data/video_tts_dropped_by_channel.csv",
        },
        "reality_checks": {
            "transcribed_tars_persist_polished_child_audio": False,
            "local_replay_ledgers_present_under_data": False,
            "split_lineage_present_in_segment_backbone": True,
            "millisecond_cutpoint_columns_exist_but_are_empty": True,
            "cutpoint_fields": [
                "original_start_ms",
                "original_end_ms",
                "trimmed_start_ms",
                "trimmed_end_ms",
                "leading_pad_ms",
                "trailing_pad_ms",
                "parent_segment_file",
                "is_split_segment",
                "split_index_from_id",
            ],
        },
        "working_outputs": {
            "video_rollup_final": "final_data/video_rollup_final.parquet",
            "video_rollup_final_kept_subset": "final_data/video_rollup_final_kept_subset.parquet",
            "videos_missing_final_validation": "final_data/videos_missing_final_validation.csv",
            "videos_missing_final_validation_kept_subset": "final_data/videos_missing_final_validation_kept_subset.csv",
            "source_breakdown_all": "final_data/source_breakdown_all.csv",
            "source_breakdown_kept_subset": "final_data/source_breakdown_kept_subset.csv",
            "bucket_breakdown_all": "final_data/bucket_breakdown_all.csv",
            "bucket_breakdown_kept_subset": "final_data/bucket_breakdown_kept_subset.csv",
            "action_rollup": "final_data/action_rollup.csv",
        },
    }

    # --- Persist JSON summaries and echo everything to stdout ----------------
    write_json(OUTPUT_DIR / "summary_all.json", all_summary)
    write_json(OUTPUT_DIR / "summary_kept_subset.json", kept_summary)
    write_json(OUTPUT_DIR / "cutpoint_coverage.json", cutpoint_coverage)
    write_json(OUTPUT_DIR / "dataset_inventory.json", inventory)

    print(json.dumps({
        "summary_all": all_summary,
        "summary_kept_subset": kept_summary,
        "cutpoint_coverage": cutpoint_coverage,
        "inventory": inventory,
    }, indent=2, sort_keys=True))


# Script entry point: run the consolidation pipeline when invoked directly.
if __name__ == "__main__":
    main()
