from __future__ import annotations

import json
import time
from pathlib import Path

import duckdb


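# Final cleaning pass over the strict transcript set:
#   1. Build DuckDB views over the strict segment map, the selection CSVs, the
#      download queue, YouTube metadata, and the TTS classification output.
#   2. Backfill missing validation from the consolidated recover_v2 parquet.
#   3. Drop whole videos (high foreign-language share, non-target or unsound
#      YouTube audio language, approved theme families), then drop individual
#      segments (foreign language, ta-to-te residuals, blank/[UNK]/[INAUDIBLE]).
#   4. Write the cleaned CSV/Parquet outputs plus a JSON summary to final_data/.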
ROOT = Path("/home/ubuntu/transcripts")
DATA_DIR = ROOT / "data"
FINAL_DIR = ROOT / "final_data"

SEGMENT_MAP_GLOB = (DATA_DIR / "phase1_incremental" / "segment_map_v1" / "**" / "*.parquet").as_posix()
RECOVER_V2 = (DATA_DIR / "recover_v2_consolidated.parquet").as_posix()
VIDEO_QUEUE = (DATA_DIR / "video_queue.csv.gz").as_posix()
YOUTUBE_META = (DATA_DIR / "youtube_video_metadata_all.csv").as_posix()
CLASSIFICATION_ALL = (DATA_DIR / "video_tts_classification_all.csv").as_posix()
SELECTED_VIDEO_SET = (DATA_DIR / "video_tts_classification_final.csv").as_posix()
VIDEO_ROLLUP_ALL = (FINAL_DIR / "video_rollup_final.parquet").as_posix()
VIDEO_ROLLUP_SELECTED = (FINAL_DIR / "video_rollup_final_kept_subset.parquet").as_posix()
VIDEO_ROLLUP_STRICT = (FINAL_DIR / "video_rollup_gemini_refined_strict.parquet").as_posix()

TARGET_LANGS = ("en", "hi", "te", "ta", "kn", "ml", "gu", "pa", "bn", "or", "mr", "as")
APPROVED_THEME_PATTERNS = {
    "language_learning": (
        r"language-learning|language learning|language lesson|language lessons|learn english|learn hindi|"
        r"learn tamil|learn telugu|learn kannada|learn malayalam|learn gujarati|learn punjabi|learn bengali|"
        r"learn assamese|learn odia|phrasebook|phrases|daily use sentences|spoken english|spoken hindi|"
        r"spoken tamil|spoken telugu|spoken kannada|spoken malayalam|spoken gujarati|spoken punjabi|"
        r"spoken bengali|spoken odia|translation"
    ),
    "pakistan_urdu": r"(^|[^a-z])(pakistan|pakistani|urdu)([^a-z]|$)",
    "meeting_zoom_group": (
        r"zoom meeting|group meeting|team meeting|meeting recording|weekly meeting|committee meeting|"
        r"board meeting|panel discussion|roundtable|group discussion|discussion session|question answer|"
        r"q\s*&\s*a|q and a|multi speaker|multilingual"
    ),
    "startup_interview": r"startup interview|startup interview podcast|founder interview|co-founder interview|startup podcast",
}


def write_json(path: Path, payload: dict) -> None:
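    """Write payload as pretty-printed, key-sorted JSON with a trailing newline."""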
    path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n")


def fetchone_dict(con: duckdb.DuckDBPyConnection, query: str) -> dict:
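    """Execute a query and return its single result row as a column-to-value dict (empty if no rows)."""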
    rel = con.execute(query)
    row = rel.fetchone()
    if row is None:
        return {}
    cols = [d[0] for d in rel.description]
    return dict(zip(cols, row))


def pct(numerator: int | None, denominator: int | None) -> float:
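    """Return numerator as a percentage of denominator, or 0.0 when either is missing or zero."""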
    if not numerator or not denominator:
        return 0.0
    return round((100.0 * numerator) / denominator, 6)


def sql_list(items: tuple[str, ...]) -> str:
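    """Render a tuple of trusted constant strings as a quoted, comma-separated SQL list (no escaping)."""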
    return ", ".join(f"'{item}'" for item in items)


def bucket_case(alias: str = "") -> str:
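    """Return a SQL CASE expression that buckets a validated segment into
    'dispose' (failed LID consensus, very low CTC score, or under 1 s),
    'golden' (strong LID agreement, acceptable or missing CTC/Gemini scores,
    at least 2 s), or 'redo' (everything else). alias optionally qualifies
    the column names."""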
    prefix = f"{alias}." if alias else ""
    return f"""
        CASE
            WHEN {prefix}lid_consensus = false AND COALESCE({prefix}lid_agree_count, 0) < 2 THEN 'dispose'
            WHEN {prefix}conformer_multi_ctc_normalized IS NOT NULL
                 AND {prefix}conformer_multi_ctc_normalized < 0.3 THEN 'dispose'
            WHEN {prefix}duration_s < 1.0 THEN 'dispose'
            WHEN COALESCE({prefix}lid_agree_count, 0) >= 3
                 AND ({prefix}conformer_multi_ctc_normalized >= 0.7
                      OR {prefix}conformer_multi_ctc_normalized IS NULL)
                 AND ({prefix}gemini_quality_score >= 0.5
                      OR {prefix}gemini_quality_score = 0
                      OR {prefix}gemini_quality_score IS NULL)
                 AND {prefix}duration_s >= 2.0 THEN 'golden'
            ELSE 'redo'
        END
    """


def clean_text_expr(column: str) -> str:
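    """Build a SQL expression that strips bracketed tags such as [UNK] and
    [INAUDIBLE] from the column and trims the result. DuckDB string literals
    do not process backslash escapes, so the regexes carry a single level of
    escaping in the emitted SQL."""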
    return (
        f"trim("
        f"regexp_replace("
        f"regexp_replace("
        f"regexp_replace(coalesce({column}, ''), '\\[[^\\]]+\\]', '', 'g'), "
        f"'\\[UNK\\]', '', 'gi'), "
        f"'\\[INAUDIBLE\\]', '', 'gi')"
        f")"
    )


def main() -> None:
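    """Run the full cleaning pipeline and write outputs and a JSON summary under FINAL_DIR."""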
    FINAL_DIR.mkdir(parents=True, exist_ok=True)
    temp_dir = FINAL_DIR / "duckdb_tmp"
    temp_dir.mkdir(parents=True, exist_ok=True)

    con = duckdb.connect()
    con.execute("SET threads = 8")
    con.execute("SET memory_limit = '24GB'")
    con.execute("SET preserve_insertion_order = false")
    con.execute(f"SET temp_directory = '{temp_dir.as_posix()}'")

    started = time.time()
    target_sql = sql_list(TARGET_LANGS)

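    # Base lookup views: the curated video selection, the strict rollup's video
    # ids, the download-queue languages, and lower-cased YouTube / classifier
    # metadata used later for theme-based filtering.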
    con.execute(f"""
        CREATE OR REPLACE VIEW selected_videos AS
        SELECT DISTINCT video_id, recommended_action
        FROM read_csv_auto('{SELECTED_VIDEO_SET}', header=true)
    """)
    con.execute(f"""
        CREATE OR REPLACE VIEW strict_videos AS
        SELECT DISTINCT video_id
        FROM read_parquet('{VIDEO_ROLLUP_STRICT}')
    """)
    con.execute(f"""
        CREATE OR REPLACE VIEW queue_videos AS
        SELECT video_id, language AS queue_language
        FROM read_csv_auto('{VIDEO_QUEUE}', header=true)
    """)
    con.execute(f"""
        CREATE OR REPLACE VIEW youtube_meta AS
        SELECT
            video_id,
            regexp_extract(lower(coalesce(default_audio_language, '')), '^([a-z]+)', 1) AS youtube_audio_language,
            regexp_extract(lower(coalesce(default_language, '')), '^([a-z]+)', 1) AS youtube_default_language,
            channel_id,
            channel_title,
            title,
            description,
            tags,
            lower(coalesce(channel_title, '')) AS channel_title_lc,
            lower(coalesce(title, '')) AS title_lc,
            lower(coalesce(description, '')) AS description_lc,
            lower(coalesce(tags, '')) AS tags_lc
        FROM read_csv_auto('{YOUTUBE_META}', header=true)
    """)
    con.execute(f"""
        CREATE OR REPLACE VIEW classifier_meta AS
        SELECT
            video_id,
            lower(coalesce(likely_content_type, '')) AS likely_content_type_lc,
            lower(coalesce(risk_signals, '')) AS risk_signals_lc,
            lower(coalesce(hard_reject_reasons, '')) AS hard_reject_reasons_lc,
            lower(coalesce(short_rationale, '')) AS short_rationale_lc
        FROM read_csv_auto('{CLASSIFICATION_ALL}', header=true)
    """)
    con.execute(f"""
        CREATE OR REPLACE VIEW strict_segments_raw AS
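        -- Segments restricted to the strict video set, joined with the selection
        -- decision and queue language; gemini_lang falls back from the detected
        -- language to the language hint to the queue language.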
        SELECT
            s.video_id,
            sv.recommended_action,
            coalesce(q.queue_language, s.queue_language) AS queue_language,
            s.segment_file,
            s.parent_segment_file,
            s.is_split_segment,
            s.split_index_from_id,
            s.speaker_id,
            s.original_start_ms,
            s.original_end_ms,
            s.trimmed_start_ms,
            s.trimmed_end_ms,
            s.leading_pad_ms,
            s.trailing_pad_ms,
            s.expected_language_hint,
            s.tx_detected_language,
            coalesce(nullif(s.tx_detected_language, ''), nullif(s.expected_language_hint, ''), coalesce(q.queue_language, s.queue_language)) AS gemini_lang,
            s.lang_mismatch_flag,
            s.transcription,
            s.tagged,
            s.num_unk,
            s.num_inaudible,
            s.num_event_tags,
            s.text_length_per_sec,
            s.tx_quality_score,
            s.asr_eligible,
            s.tts_clean_eligible,
            s.tts_expressive_eligible,
            s.validation_source,
            s.has_validation,
            s.duration_s,
            s.provisional_bucket
        FROM read_parquet('{SEGMENT_MAP_GLOB}', hive_partitioning=true, union_by_name=true) s
        JOIN strict_videos st USING (video_id)
        JOIN selected_videos sv USING (video_id)
        LEFT JOIN queue_videos q USING (video_id)
    """)
    con.execute("""
        CREATE OR REPLACE TEMP TABLE strict_missing_segments_v1 AS
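        -- Strict segments that still lack validation and are candidates for
        -- recover_v2 backfill.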
        SELECT video_id, segment_file
        FROM strict_segments_raw
        WHERE NOT has_validation
    """)
    con.execute(f"""
        CREATE OR REPLACE TEMP TABLE recover_v2_dedup AS
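        -- Best-scoring recover_v2 row per (video_id, segment_file), bucketed via
        -- the shared dispose/golden/redo CASE expression.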
        WITH ranked AS (
            SELECT
                *,
                ROW_NUMBER() OVER (
                    PARTITION BY video_id, segment_file
                    ORDER BY
                        CASE WHEN conformer_multi_ctc_normalized IS NULL THEN 1 ELSE 0 END ASC,
                        conformer_multi_ctc_normalized DESC NULLS LAST,
                        mms_confidence DESC NULLS LAST
                ) AS rn
            FROM read_parquet('{RECOVER_V2}')
        )
        SELECT
            video_id,
            segment_file,
            {bucket_case()} AS provisional_bucket
        FROM ranked
        WHERE rn = 1
    """)
    con.execute("""
        CREATE OR REPLACE TEMP TABLE recover_v2_matched AS
        SELECT
            m.video_id,
            m.segment_file,
            v.provisional_bucket
        FROM strict_missing_segments_v1 m
        JOIN recover_v2_dedup v USING (video_id, segment_file)
    """)
    con.execute("""
        CREATE OR REPLACE VIEW strict_segments_final AS
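        -- Merge the original validation with the recover_v2 backfill into the
        -- final validation source, validation flag, and provisional bucket.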
        SELECT
            s.*,
            CASE
                WHEN s.has_validation THEN s.validation_source
                WHEN r.segment_file IS NOT NULL THEN 'recover_v2'
                ELSE 'missing'
            END AS final_validation_source,
            (s.has_validation OR r.segment_file IS NOT NULL) AS final_has_validation,
            CASE
                WHEN s.has_validation THEN s.provisional_bucket
                ELSE r.provisional_bucket
            END AS final_bucket
        FROM strict_segments_raw s
        LEFT JOIN recover_v2_matched r USING (video_id, segment_file)
    """)
    con.execute("""
        CREATE OR REPLACE VIEW strict_video_lang_counts AS
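        -- Segment counts per (video, detected language); ranked below to find the
        -- dominant language of each video.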
        SELECT
            video_id,
            gemini_lang,
            count(*) AS segments
        FROM strict_segments_final
        GROUP BY video_id, gemini_lang
    """)
    con.execute("""
        CREATE OR REPLACE VIEW strict_video_lang_ranked AS
        SELECT
            *,
            row_number() OVER (PARTITION BY video_id ORDER BY segments DESC, gemini_lang) AS rn
        FROM strict_video_lang_counts
    """)
    con.execute(f"""
        CREATE OR REPLACE VIEW strict_video_profile AS
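        -- Per-video profile: dominant detected language, share of segments outside
        -- the target languages, and a corrected language that trusts the YouTube
        -- audio language only when it matches the dominant detected language.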
        WITH totals AS (
            SELECT
                s.video_id,
                any_value(s.recommended_action) AS recommended_action,
                any_value(s.queue_language) AS queue_language,
                any_value(y.youtube_audio_language) AS youtube_audio_language,
                any_value(y.youtube_default_language) AS youtube_default_language,
                any_value(y.channel_id) AS channel_id,
                any_value(y.channel_title) AS channel_title,
                any_value(y.title) AS title,
                count(*) AS total_segments,
                count(DISTINCT s.gemini_lang) FILTER (WHERE s.gemini_lang <> '') AS distinct_detected_languages,
                count(*) FILTER (WHERE s.gemini_lang NOT IN ({target_sql}) AND s.gemini_lang <> '') AS foreign_segments
            FROM strict_segments_final s
            LEFT JOIN youtube_meta y USING (video_id)
            GROUP BY s.video_id
        ),
        dominant AS (
            SELECT
                video_id,
                gemini_lang AS dominant_gemini_language,
                segments AS dominant_gemini_segments
            FROM strict_video_lang_ranked
            WHERE rn = 1
        )
        SELECT
            t.*,
            d.dominant_gemini_language,
            d.dominant_gemini_segments,
            round(100.0 * t.foreign_segments / t.total_segments, 6) AS foreign_share_pct,
            round(100.0 * d.dominant_gemini_segments / t.total_segments, 6) AS dominant_share_pct,
            CASE
                WHEN t.youtube_audio_language IN ({target_sql})
                     AND t.youtube_audio_language = d.dominant_gemini_language
                THEN t.youtube_audio_language
                ELSE t.queue_language
            END AS corrected_language
        FROM totals t
        JOIN dominant d USING (video_id)
    """)
    text_expr = (
        "coalesce(c.likely_content_type_lc,'') || ' ' || "
        "coalesce(c.risk_signals_lc,'') || ' ' || "
        "coalesce(c.hard_reject_reasons_lc,'') || ' ' || "
        "coalesce(c.short_rationale_lc,'') || ' ' || "
        "coalesce(y.channel_title_lc,'') || ' ' || "
        "coalesce(y.title_lc,'') || ' ' || "
        "coalesce(y.description_lc,'') || ' ' || "
        "coalesce(y.tags_lc,'')"
    )
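    # One flag view per approved theme family, matched by regex over the
    # concatenated classifier and YouTube metadata text fields.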
    for family, pattern in APPROVED_THEME_PATTERNS.items():
        con.execute(f"""
            CREATE OR REPLACE VIEW flag_{family} AS
            SELECT DISTINCT s.video_id
            FROM strict_videos s
            LEFT JOIN classifier_meta c USING (video_id)
            LEFT JOIN youtube_meta y USING (video_id)
            WHERE regexp_matches({text_expr}, '{pattern}')
        """)
    flag_union = " UNION ALL ".join(
        f"SELECT video_id, '{family}' AS family FROM flag_{family}" for family in APPROVED_THEME_PATTERNS
    )
    con.execute(f"""
        CREATE OR REPLACE VIEW approved_theme_flags AS
        {flag_union}
    """)
    con.execute(f"""
        CREATE OR REPLACE VIEW final_video_filters AS
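        -- Video-level drop flags: high foreign-segment share, a declared
        -- non-target YouTube audio language, an und declaration that is not
        -- corroborated by the detected languages, and the approved theme families.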
        SELECT
            p.*,
            p.foreign_share_pct >= 5.0 AS drop_foreign_share_ge5,
            -- coalesce() keeps these flags FALSE rather than NULL for videos with
            -- no YouTube metadata row, so such videos cannot vanish from both the
            -- kept and excluded outputs through three-valued logic.
            coalesce(p.youtube_audio_language, '') <> ''
                AND p.youtube_audio_language <> 'und'
                AND p.youtube_audio_language NOT IN ({target_sql}) AS drop_audio_non_target,
            coalesce(p.youtube_audio_language, '') = 'und'
                AND NOT (p.distinct_detected_languages <= 3 AND p.foreign_segments = 0) AS drop_und_unsound,
            EXISTS (SELECT 1 FROM flag_language_learning f WHERE f.video_id = p.video_id) AS drop_language_learning,
            EXISTS (SELECT 1 FROM flag_pakistan_urdu f WHERE f.video_id = p.video_id) AS drop_pakistan_urdu,
            EXISTS (SELECT 1 FROM flag_meeting_zoom_group f WHERE f.video_id = p.video_id) AS drop_meeting_zoom_group,
            EXISTS (SELECT 1 FROM flag_startup_interview f WHERE f.video_id = p.video_id) AS drop_startup_interview
        FROM strict_video_profile p
    """)
    con.execute("""
        CREATE OR REPLACE VIEW final_video_decisions AS
        SELECT
            *,
            (
                drop_language_learning
                OR drop_pakistan_urdu
                OR drop_meeting_zoom_group
                OR drop_startup_interview
            ) AS drop_approved_theme_family,
            (
                drop_foreign_share_ge5
                OR drop_audio_non_target
                OR drop_und_unsound
                OR drop_language_learning
                OR drop_pakistan_urdu
                OR drop_meeting_zoom_group
                OR drop_startup_interview
            ) AS drop_any
        FROM final_video_filters
    """)
    con.execute("""
        CREATE OR REPLACE VIEW kept_videos_after_video_filters AS
        SELECT *
        FROM final_video_decisions
        WHERE NOT drop_any
    """)
    clean_text = clean_text_expr("s.transcription")
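    # Segment-level cleaning on the kept videos: drop foreign-language segments,
    # residual ta-to-te mislabels, and blank / [UNK] / [INAUDIBLE] / [NO_SPEECH]
    # transcriptions.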
    con.execute(f"""
        CREATE OR REPLACE VIEW cleaned_segment_candidates AS
        SELECT
            s.*,
            k.corrected_language,
            {clean_text} AS clean_text,
            CASE
                WHEN s.gemini_lang NOT IN ({target_sql}) AND s.gemini_lang <> '' THEN 'foreign_segment'
                WHEN s.queue_language = 'ta' AND k.corrected_language = 'te' AND s.gemini_lang = 'ta' THEN 'ta_to_te_residual'
                WHEN (
                    {clean_text} = ''
                    OR coalesce(s.num_unk, 0) > 0
                    OR coalesce(s.num_inaudible, 0) > 0
                    OR regexp_matches(coalesce(s.transcription, ''), '\\[UNK\\]', 'i')
                    OR regexp_matches(coalesce(s.transcription, ''), '\\[INAUDIBLE\\]', 'i')
                    OR regexp_matches(coalesce(s.transcription, ''), '\\[NO_SPEECH\\]', 'i')
                ) THEN 'blank_unk_or_inaudible'
                ELSE 'kept'
            END AS segment_decision
        FROM strict_segments_final s
        JOIN kept_videos_after_video_filters k USING (video_id)
    """)
    con.execute(f"""
        COPY (
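            -- Videos dropped by the video-level filters, with per-filter flags.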
            SELECT
                video_id,
                recommended_action,
                queue_language,
                youtube_audio_language,
                youtube_default_language,
                dominant_gemini_language,
                corrected_language,
                total_segments,
                distinct_detected_languages,
                foreign_segments,
                foreign_share_pct,
                drop_foreign_share_ge5,
                drop_audio_non_target,
                drop_und_unsound,
                drop_language_learning,
                drop_pakistan_urdu,
                drop_meeting_zoom_group,
                drop_startup_interview
            FROM final_video_decisions
            WHERE drop_any
            ORDER BY video_id
        ) TO '{(FINAL_DIR / "final_cleaned_excluded_videos.csv").as_posix()}' (HEADER, DELIMITER ',')
    """)
    con.execute(f"""
        COPY (
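            -- Videos kept after the video-level filters.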
            SELECT
                video_id,
                recommended_action,
                queue_language,
                youtube_audio_language,
                youtube_default_language,
                dominant_gemini_language,
                corrected_language,
                total_segments,
                distinct_detected_languages,
                foreign_segments,
                foreign_share_pct
            FROM kept_videos_after_video_filters
            ORDER BY video_id
        ) TO '{(FINAL_DIR / "final_cleaned_keep_videos.csv").as_posix()}' (HEADER, DELIMITER ',')
    """)
    con.execute(f"""
        COPY (
            SELECT
                segment_decision,
                count(*) AS segments
            FROM cleaned_segment_candidates
            GROUP BY segment_decision
            ORDER BY segments DESC, segment_decision
        ) TO '{(FINAL_DIR / "final_cleaned_segment_removal_summary.csv").as_posix()}' (HEADER, DELIMITER ',')
    """)
    con.execute(f"""
        COPY (
            SELECT
                corrected_language,
                gemini_lang,
                count(*) AS segments,
                count(DISTINCT video_id) AS videos
            FROM cleaned_segment_candidates
            WHERE segment_decision = 'kept'
              AND gemini_lang IN ({target_sql})
              AND corrected_language IN ({target_sql})
              AND gemini_lang <> corrected_language
            GROUP BY corrected_language, gemini_lang
            ORDER BY segments DESC, corrected_language, gemini_lang
        ) TO '{(FINAL_DIR / "final_cleaned_remaining_target_mismatch_pairs.csv").as_posix()}' (HEADER, DELIMITER ',')
    """)
    con.execute(f"""
        COPY (
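            -- Final cleaned segment-level dataset (kept segments only).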
            SELECT
                video_id,
                recommended_action,
                queue_language,
                corrected_language,
                segment_file,
                speaker_id,
                parent_segment_file,
                is_split_segment,
                split_index_from_id,
                original_start_ms,
                original_end_ms,
                trimmed_start_ms,
                trimmed_end_ms,
                leading_pad_ms,
                trailing_pad_ms,
                expected_language_hint,
                tx_detected_language,
                gemini_lang,
                transcription,
                tagged,
                clean_text,
                num_unk,
                num_inaudible,
                num_event_tags,
                text_length_per_sec,
                tx_quality_score,
                asr_eligible,
                tts_clean_eligible,
                tts_expressive_eligible,
                lang_mismatch_flag,
                duration_s,
                final_validation_source,
                final_has_validation,
                final_bucket
            FROM cleaned_segment_candidates
            WHERE segment_decision = 'kept'
            ORDER BY video_id, segment_file
        ) TO '{(FINAL_DIR / "final_cleaned_segments.parquet").as_posix()}' (FORMAT PARQUET, COMPRESSION ZSTD)
    """)
    con.execute(f"""
        COPY (
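            -- Per-video rollup of the kept, cleaned segments.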
            SELECT
                video_id,
                any_value(recommended_action) AS recommended_action,
                any_value(queue_language) AS queue_language,
                any_value(corrected_language) AS corrected_language,
                any_value(gemini_lang) FILTER (WHERE gemini_lang = corrected_language) AS matched_language_example,
                count(*) AS total_segments,
                count(*) FILTER (WHERE final_has_validation) AS final_validated_segments,
                count(*) FILTER (WHERE NOT final_has_validation) AS final_missing_segments,
                count(*) FILTER (WHERE final_bucket = 'golden') AS golden_segments,
                count(*) FILTER (WHERE final_bucket = 'redo') AS redo_segments,
                count(*) FILTER (WHERE final_bucket = 'dispose') AS dispose_segments,
                count(*) FILTER (WHERE gemini_lang <> corrected_language) AS target_language_mismatch_segments,
                count(*) FILTER (WHERE lang_mismatch_flag) AS lang_mismatch_flag_segments
            FROM cleaned_segment_candidates
            WHERE segment_decision = 'kept'
            GROUP BY video_id
            ORDER BY video_id
        ) TO '{(FINAL_DIR / "final_cleaned_video_rollup.parquet").as_posix()}' (FORMAT PARQUET, COMPRESSION ZSTD)
    """)

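    # Collect baseline, filter, and post-cleaning statistics for the JSON summary.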
    baseline_all = fetchone_dict(
        con,
        f"""
        SELECT
            count(*) AS videos,
            sum(total_segments) AS total_segments,
            sum(final_validated_segments) AS final_validated_segments,
            sum(final_missing_segments) AS final_missing_segments
        FROM read_parquet('{VIDEO_ROLLUP_ALL}')
        """,
    )
    baseline_selected = fetchone_dict(
        con,
        f"""
        SELECT
            count(*) AS videos,
            sum(total_segments) AS total_segments,
            sum(final_validated_segments) AS final_validated_segments,
            sum(final_missing_segments) AS final_missing_segments
        FROM read_parquet('{VIDEO_ROLLUP_SELECTED}')
        """,
    )
    baseline_strict = fetchone_dict(
        con,
        f"""
        SELECT
            count(*) AS videos,
            sum(total_segments) AS total_segments,
            sum(final_validated_segments) AS final_validated_segments,
            sum(final_missing_segments) AS final_missing_segments
        FROM read_parquet('{VIDEO_ROLLUP_STRICT}')
        """,
    )
    video_filter_counts = fetchone_dict(
        con,
        """
        SELECT
            count(*) AS strict_videos,
            count(*) FILTER (WHERE drop_foreign_share_ge5) AS drop_foreign_share_ge5_videos,
            count(*) FILTER (WHERE drop_audio_non_target) AS drop_audio_non_target_videos,
            count(*) FILTER (WHERE drop_und_unsound) AS drop_und_unsound_videos,
            count(*) FILTER (WHERE drop_language_learning) AS drop_language_learning_videos,
            count(*) FILTER (WHERE drop_pakistan_urdu) AS drop_pakistan_urdu_videos,
            count(*) FILTER (WHERE drop_meeting_zoom_group) AS drop_meeting_zoom_group_videos,
            count(*) FILTER (WHERE drop_startup_interview) AS drop_startup_interview_videos,
            count(*) FILTER (WHERE drop_approved_theme_family) AS drop_approved_theme_family_videos,
            count(*) FILTER (WHERE drop_any) AS drop_any_videos,
            count(*) FILTER (WHERE NOT drop_any) AS kept_after_video_filters_videos,
            sum(total_segments) FILTER (WHERE NOT drop_any) AS kept_after_video_filters_segments
        FROM final_video_decisions
        """,
    )
    strict_foreign_summary = fetchone_dict(
        con,
        f"""
        SELECT
            count(*) FILTER (WHERE gemini_lang NOT IN ({target_sql}) AND gemini_lang <> '') AS strict_foreign_segments,
            count(*) FILTER (WHERE lang_mismatch_flag) AS strict_lang_mismatch_flag_segments
        FROM strict_segments_final
        """,
    )
    candidate_summary = fetchone_dict(
        con,
        """
        SELECT
            count(*) AS candidate_segments,
            count(*) FILTER (WHERE segment_decision = 'kept') AS kept_segments,
            count(*) FILTER (WHERE segment_decision = 'foreign_segment') AS removed_foreign_segments,
            count(*) FILTER (WHERE segment_decision = 'ta_to_te_residual') AS removed_ta_to_te_residual_segments,
            count(*) FILTER (WHERE segment_decision = 'blank_unk_or_inaudible') AS removed_blank_unk_or_inaudible_segments
        FROM cleaned_segment_candidates
        """,
    )
    final_rollup_summary = fetchone_dict(
        con,
        f"""
        SELECT
            count(*) AS videos,
            sum(total_segments) AS total_segments,
            sum(final_validated_segments) AS final_validated_segments,
            sum(final_missing_segments) AS final_missing_segments,
            sum(golden_segments) AS golden_segments,
            sum(redo_segments) AS redo_segments,
            sum(dispose_segments) AS dispose_segments,
            sum(target_language_mismatch_segments) AS remaining_target_language_mismatch_segments,
            sum(lang_mismatch_flag_segments) AS remaining_lang_mismatch_flag_segments,
            count(*) FILTER (WHERE target_language_mismatch_segments > 0) AS videos_with_remaining_target_language_mismatch_segments,
            count(*) FILTER (WHERE lang_mismatch_flag_segments > 0) AS videos_with_remaining_lang_mismatch_flag_segments
        FROM read_parquet('{(FINAL_DIR / "final_cleaned_video_rollup.parquet").as_posix()}')
        """,
    )
    final_source_breakdown = fetchone_dict(
        con,
        """
        SELECT
            count(*) FILTER (WHERE final_validation_source = 'historical') AS historical_segments,
            count(*) FILTER (WHERE final_validation_source = 'recover') AS recover_v1_segments,
            count(*) FILTER (WHERE final_validation_source = 'recover_v2') AS recover_v2_segments,
            count(*) FILTER (WHERE final_validation_source = 'missing') AS missing_segments
        FROM cleaned_segment_candidates
        WHERE segment_decision = 'kept'
        """,
    )
    final_sanity = fetchone_dict(
        con,
        f"""
        SELECT
            count(*) FILTER (WHERE segment_decision = 'kept' AND gemini_lang NOT IN ({target_sql}) AND gemini_lang <> '') AS remaining_foreign_segments,
            count(*) FILTER (WHERE segment_decision = 'kept' AND queue_language = 'ta' AND corrected_language = 'te' AND gemini_lang = 'ta') AS remaining_ta_to_te_residual_segments,
            count(*) FILTER (WHERE segment_decision = 'kept' AND (coalesce(num_unk, 0) > 0 OR regexp_matches(coalesce(transcription, ''), '\\[UNK\\]', 'i'))) AS remaining_unk_segments,
            count(*) FILTER (WHERE segment_decision = 'kept' AND (coalesce(num_inaudible, 0) > 0 OR regexp_matches(coalesce(transcription, ''), '\\[INAUDIBLE\\]', 'i'))) AS remaining_inaudible_segments,
            count(*) FILTER (WHERE segment_decision = 'kept' AND regexp_matches(coalesce(transcription, ''), '\\[NO_SPEECH\\]', 'i')) AS remaining_no_speech_segments
        FROM cleaned_segment_candidates
        """,
    )
    empty_video_drop = fetchone_dict(
        con,
        """
        SELECT
            (SELECT count(*) FROM kept_videos_after_video_filters) AS videos_after_video_filters,
            (SELECT count(DISTINCT video_id) FROM cleaned_segment_candidates WHERE segment_decision = 'kept') AS videos_after_segment_filters
        """,
    )

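    # Assemble the run summary: rule configuration, baselines, filter counts,
    # and pointers to the generated output files.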
    summary = {
        "generated_at_epoch_s": round(time.time(), 3),
        "elapsed_s": round(time.time() - started, 2),
        "rule_config": {
            "target_languages": list(TARGET_LANGS),
            "foreign_share_drop_threshold_pct": 5.0,
            "drop_known_non_target_youtube_audio_language": True,
            "und_keep_rule": {
                "max_distinct_detected_languages": 3,
                "required_foreign_segments": 0,
            },
            "approved_theme_families": list(APPROVED_THEME_PATTERNS.keys()),
            "drop_ta_to_te_residual_segments": True,
            "drop_blank_unk_inaudible_segments": True,
        },
        "baselines": {
            "all_transcribed": {
                **baseline_all,
                "coverage_pct": pct(
                    baseline_all.get("final_validated_segments"),
                    baseline_all.get("total_segments"),
                ),
            },
            "selected_set": {
                **baseline_selected,
                "coverage_pct": pct(
                    baseline_selected.get("final_validated_segments"),
                    baseline_selected.get("total_segments"),
                ),
            },
            "strict_set": {
                **baseline_strict,
                "coverage_pct": pct(
                    baseline_strict.get("final_validated_segments"),
                    baseline_strict.get("total_segments"),
                ),
                **strict_foreign_summary,
            },
        },
        "video_filter_counts": {
            **video_filter_counts,
            "drop_any_pct_of_strict": pct(
                video_filter_counts.get("drop_any_videos"),
                video_filter_counts.get("strict_videos"),
            ),
            "kept_after_video_filters_pct_of_strict": pct(
                video_filter_counts.get("kept_after_video_filters_videos"),
                video_filter_counts.get("strict_videos"),
            ),
        },
        "segment_filter_counts": {
            **candidate_summary,
            "kept_segment_pct_after_video_filters": pct(
                candidate_summary.get("kept_segments"),
                candidate_summary.get("candidate_segments"),
            ),
            "videos_removed_to_zero_segments": (
                empty_video_drop.get("videos_after_video_filters", 0)
                - empty_video_drop.get("videos_after_segment_filters", 0)
            ),
        },
        "final_cleaned": {
            **final_rollup_summary,
            **final_source_breakdown,
            **final_sanity,
            "coverage_pct": pct(
                final_rollup_summary.get("final_validated_segments"),
                final_rollup_summary.get("total_segments"),
            ),
        },
        "outputs": {
            "excluded_videos_csv": "final_data/final_cleaned_excluded_videos.csv",
            "keep_videos_csv": "final_data/final_cleaned_keep_videos.csv",
            "segment_removal_summary_csv": "final_data/final_cleaned_segment_removal_summary.csv",
            "remaining_target_mismatch_pairs_csv": "final_data/final_cleaned_remaining_target_mismatch_pairs.csv",
            "video_rollup_parquet": "final_data/final_cleaned_video_rollup.parquet",
            "segments_parquet": "final_data/final_cleaned_segments.parquet",
        },
    }

    write_json(FINAL_DIR / "final_cleaned_summary.json", summary)
    print(json.dumps(summary, indent=2, sort_keys=True))


if __name__ == "__main__":
    main()
