o
    "Ůi`                     @  s   U d Z ddlmZ ddlZddlZddlZddlZddlZddlm	Z	 ddl
Z
dZdZdZdZd	Zd
Zg dZded< dd Zd:ddZd;ddZd<ddZd=d>dd Zd?d#d$Zd@d(d)ZdAd,d-ZdBd2d3ZdCd5d6Zd7d8 Zed9krze  dS dS )Da  
Incremental phase-1 metadata build.

This version avoids the monolithic validation merge that can OOM on the full
recover dataset. Instead it:
  1. builds lightweight tx / flag helper tables once
  2. ingests recover validation shards in prefix batches
  3. ingests historical validation shards in prefix batches, skipping keys
     already covered by recover
  4. writes the final segment map one queue language at a time
  5. computes analytics from the on-disk segment map parquet dataset
    )annotationsN)Path   gffffff?g      ?g       @g333333?g      ?))video_idVARCHAR)segment_filer   )
duration_sDOUBLE)gemini_langr   )gemini_transcriptionr   )gemini_taggedr   )gemini_quality_scorer	   )speaker_infor   )mms_lang_iso3r   )mms_lang_iso1r   )mms_confidencer	   )mms_top3r   )vox_langr   )vox_lang_iso1r   )vox_confidencer	   )vox_top3r   )conformer_multi_transcriptionr   )conformer_multi_ctc_rawr	   )conformer_multi_ctc_normalizedr	   )wav2vec_transcriptionr   )wav2vec_ctc_rawr	   )wav2vec_ctc_normalizedr	   )wav2vec_model_usedr   )lid_consensusBOOLEAN)lid_agree_countINTEGER)consensus_langr   zlist[tuple[str, str]]VALIDATION_COLUMNSc                  C  s   t jdd} | jddd | jddd | jdd	d | jd
dd | jddd | jddd | jddd | jdtdd | jddd | jdtdd | jdtdd | jddd |  S )Nz9Incremental phase-1 transcript + validation metadata lake)descriptionz--txz"data/transcription_results.parquet)defaultz--flagsz data/transcription_flags.parquetz--queuezdata/video_queue.csv.gzz--historical-shardszdata/validation_shardsz--recover-shardszdata/recover_shards_rawz--output-dirzdata/phase1_incrementalz	--db-path z	--threads   )typer%   z--memory-limit24GBz--recover-batch-dirs   z--historical-batch-dirsz--overwrite
store_true)action)argparseArgumentParseradd_argumentint
parse_args)p r3   #scripts/build_phase1_incremental.pyr1   =   s   r1   pathr   c                 C  s.   |   sd S |  rt|  d S |   d S N)existsis_dirshutilrmtreeunlinkr5   r3   r3   r4   remove_pathN   s
   r=   strreturnc                 C  s   |  ddS )N'z'')replacer<   r3   r3   r4   	sql_quoteW   s   rB   paths	list[str]c                 C  s    d dd | D }d| dS )N, c                 s  s     | ]}d t | d V  qdS )r@   N)rB   .0r2   r3   r3   r4   	<genexpr>\   s    z(parquet_glob_list_sql.<locals>.<genexpr>[])join)rC   quotedr3   r3   r4   parquet_glob_list_sql[   s   rM   Tinclude_bucketboolc                 C  s0   dd t D }|d | r|d d|S )Nc                 S  s   g | ]\}}| d | qS ) r3   )rG   namesql_typer3   r3   r4   
<listcomp>a   s    z)validation_schema_sql.<locals>.<listcomp>zvalidation_source VARCHARzprovisional_bucket VARCHARrE   )r#   appendrK   )rN   colsr3   r3   r4   validation_schema_sql`   s
   


rV   payloaddictc                 C  s   |  tj|ddd d S )N   Tindent	sort_keys)
write_textjsondumps)r5   rW   r3   r3   r4   
write_jsonh   s   r`   conduckdb.DuckDBPyConnectionqueryc                 C  s<   |  |}| }|d u ri S dd |jD }tt||S )Nc                 S     g | ]}|d  qS r   r3   rG   dr3   r3   r4   rS   q       z!fetchone_dict.<locals>.<listcomp>)executefetchoner$   rX   zip)ra   rc   relrowrU   r3   r3   r4   fetchone_dictl   s   
rn   root
list[Path]c                 C  s&   |   sg S dd t|  D }|S )Nc                 S  s&   g | ]}|  rt|d r|qS 	*.parquet)r8   anyglobrF   r3   r3   r4   rS   x   s   & z%shard_prefix_dirs.<locals>.<listcomp>)r7   sortediterdir)ro   dirsr3   r3   r4   shard_prefix_dirsu   s   rx   itemssizer0   list[list[Path]]c                   s     fddt dt D S )Nc                   s   g | ]
} ||  qS r3   r3   )rG   iry   rz   r3   r4   rS   }   s    zbatched.<locals>.<listcomp>r   )rangelenr}   r3   r}   r4   batched|   s    r   rw   c                 C  s   dd | D }dt | dS )Nc                 S  s   g | ]}|d    qS rq   )as_posixrf   r3   r3   r4   rS      s    z"batch_read_sql.<locals>.<listcomp>zread_parquet(z., hive_partitioning=false, union_by_name=true))rM   )rw   globsr3   r3   r4   batch_read_sql   s   r   c            +      C  sj  t  } t| j}t| j}t| j}t| j}t| j}t| j}|jddd |	 s3t
d| |	 s>t
d| |	 sIt
d| | jrQt| jn|d }|d }|d }	|d	 }
| jrtt| t| t|	 t|
 |	jddd |
jddd t|}t|}|st
d
| |st
d| tt|}|d| j  |d| j d |d |d|
  d |d t }td t| }t| }t| }|d| d |d| d |d| d td |d |d d }td|d td |d td  |d! |d" d#d$d% tD }d&t d't d(t d)t d*t  d+t! d,}|d-t"  d. |d/t"  d. td0 t#|| j$}t%|d1d2D ]P\}}t&|}|d3 |d4| d5| d6| d7| d8	 |d9' }|d: d }td;| d<t(| d=t(| d>t(|dd?|d
 |d3 qt|d@ tdA t#|| j)}t%|d1d2D ]_\}}t&|}|d3 |dB |d4| d5| d6| dC| d8	 |dD |dE' }|dF d }tdG| d<t(| d=t(| d>t(|dd?|d
 |d3 |dB q|dH |dI |dJ d }tdK|d tdL |jddd dMdN |dO' D }|D ].}|dP|  }|jddd |dQ } |dRt| dSt|   dT tdU|  qnt|dV dW  }!|dX|! dY tdZ |	d[ }"|	d\ }#|	d] }$|	d^ }%|	d_ }&|	d` }'|dat|"  db |dct|#  db |ddt|$  db |det|%  db |dft|&  db |dgt|'  db t*|dh}(t*|di})dj|(dk< t+t dl|(dm< t||(dn< t||(do< djt|t|t|t|t|t(|t(|dpt|t|t|	t|"t|#t|$t|%t|&t|'dq	ttt t!ttdr|(|)t+t | dsdt}*t,|	du |( t,|	dv |) t,|	dw |* tt-j.|(dsddx tdyt | dzd{ d S )|NT)parentsexist_okzMissing transcription parquet: z%Missing transcription flags parquet: zMissing queue snapshot: zphase1_incremental.duckdbsegment_map_v1analytics_v1
duckdb_tmpz#No recover shard prefixes found in z&No historical shard prefixes found in zSET threads = zSET memory_limit = 'r@   z$SET preserve_insertion_order = falsezSET temp_directory = 'zPRAGMA enable_progress_barz0Building phase-1 metadata lake incrementally ...z=CREATE OR REPLACE VIEW tx_raw AS SELECT * FROM read_parquet('z')z@CREATE OR REPLACE VIEW flags_raw AS SELECT * FROM read_parquet('zACREATE OR REPLACE VIEW queue_raw AS SELECT * FROM read_csv_auto('z', header=true)z(  Step 1a: Finding duplicate tx keys ...z
        CREATE OR REPLACE TABLE tx_dup_keys AS
        SELECT video_id, segment_file, max(id) AS keep_id
        FROM tx_raw
        GROUP BY video_id, segment_file
        HAVING count(*) > 1
    z SELECT count(*) FROM tx_dup_keysr   z    duplicate keys: ,z)  Step 1b: Building tx_canonical view ...a]  
        CREATE OR REPLACE VIEW tx_canonical AS
        SELECT
            tx_raw.*,
            regexp_matches(segment_file, '_split[0-9]+$') AS is_split_segment,
            regexp_replace(segment_file, '_split[0-9]+$', '') AS parent_segment_file,
            TRY_CAST(regexp_extract(segment_file, '_split([0-9]+)$', 1) AS INTEGER) AS split_index_from_id
        FROM tx_raw
        WHERE NOT EXISTS (
            SELECT 1 FROM tx_dup_keys dk
            WHERE dk.video_id = tx_raw.video_id
              AND dk.segment_file = tx_raw.segment_file
              AND dk.keep_id != tx_raw.id
        )
    z'  Step 2: Building tx helper tables ...z
        CREATE OR REPLACE TABLE segment_name_uniqueness AS
        SELECT
            segment_file,
            count(DISTINCT video_id) AS segment_name_video_count
        FROM tx_canonical
        GROUP BY segment_file
    a[  
        CREATE OR REPLACE TABLE flag_summary_by_segment_name AS
        SELECT
            segment_id AS segment_file,
            count(*) AS flag_rows_total,
            count(DISTINCT flag_type) AS flag_types_distinct,
            string_agg(DISTINCT flag_type, ',' ORDER BY flag_type) AS flag_types_csv,
            count(*) FILTER (WHERE flag_type = 'timeout') AS timeout_flag_rows,
            count(*) FILTER (WHERE flag_type = 'error') AS error_flag_rows,
            count(*) FILTER (WHERE flag_type = 'rate_limited') AS rate_limited_flag_rows,
            count(*) FILTER (WHERE flag_type = 'lang_mismatch') AS lang_mismatch_flag_rows,
            count(*) FILTER (WHERE flag_type = 'tag_text_mismatch') AS tag_text_mismatch_flag_rows,
            count(*) FILTER (WHERE flag_type = 'suspicious_length_ratio') AS suspicious_length_ratio_flag_rows,
            count(*) FILTER (WHERE flag_type = 'high_unk_density') AS high_unk_density_flag_rows,
            count(*) FILTER (WHERE flag_type = 'empty_transcription') AS empty_transcription_flag_rows
        FROM flags_raw
        GROUP BY segment_id
    rE   c                 s  s    | ]\}}|V  qd S r6   r3   )rG   rQ   _r3   r3   r4   rH      s    zmain.<locals>.<genexpr>z
        CASE
            WHEN lid_consensus = false AND COALESCE(lid_agree_count, 0) < 2 THEN 'dispose'
            WHEN conformer_multi_ctc_normalized IS NOT NULL
                 AND conformer_multi_ctc_normalized < z. THEN 'dispose'
            WHEN duration_s < zA THEN 'dispose'
            WHEN COALESCE(lid_agree_count, 0) >= z9
                 AND (conformer_multi_ctc_normalized >= zp
                      OR conformer_multi_ctc_normalized IS NULL)
                 AND (gemini_quality_score >= z
                      OR gemini_quality_score = 0
                      OR gemini_quality_score IS NULL)
                 AND duration_s >= z7 THEN 'golden'
            ELSE 'redo'
        END
    z3CREATE OR REPLACE TABLE validation_recover_unique ()z6CREATE OR REPLACE TABLE validation_historical_unique (z3  Step 3: Ingesting recover validation prefixes ...   )startzDROP TABLE IF EXISTS val_batchzt
            CREATE TEMP TABLE val_batch AS
            WITH ranked AS (
                SELECT
                    a  ,
                    ROW_NUMBER() OVER (
                        PARTITION BY video_id, segment_file
                        ORDER BY
                            CASE WHEN conformer_multi_ctc_normalized IS NULL THEN 1 ELSE 0 END ASC,
                            conformer_multi_ctc_normalized DESC NULLS LAST,
                            mms_confidence DESC NULLS LAST
                    ) AS rn
                FROM z2
            )
            SELECT
                zB,
                'recover' AS validation_source,
                zP AS provisional_bucket
            FROM ranked
            WHERE rn = 1
        ab  
            INSERT INTO validation_recover_unique
            SELECT b.*
            FROM val_batch b
            WHERE NOT EXISTS (
                SELECT 1
                FROM validation_recover_unique t
                WHERE t.video_id = b.video_id
                  AND t.segment_file = b.segment_file
            )
            RETURNING 1
        z.SELECT count(*) FROM validation_recover_uniquez    recover batch /z: z dirs, inserted z, total zjCREATE INDEX IF NOT EXISTS idx_validation_recover_key ON validation_recover_unique(video_id, segment_file)z6  Step 4: Ingesting historical validation prefixes ...z'DROP TABLE IF EXISTS val_batch_filteredzE,
                'historical' AS validation_source,
                aL  
            CREATE TEMP TABLE val_batch_filtered AS
            SELECT b.*
            FROM val_batch b
            WHERE NOT EXISTS (
                SELECT 1
                FROM validation_recover_unique r
                WHERE r.video_id = b.video_id
                  AND r.segment_file = b.segment_file
            )
        aq  
            INSERT INTO validation_historical_unique
            SELECT b.*
            FROM val_batch_filtered b
            WHERE NOT EXISTS (
                SELECT 1
                FROM validation_historical_unique t
                WHERE t.video_id = b.video_id
                  AND t.segment_file = b.segment_file
            )
            RETURNING 1
        z1SELECT count(*) FROM validation_historical_uniquez    historical batch zjCREATE INDEX IF NOT EXISTS idx_validation_hist_key ON validation_historical_unique(video_id, segment_file)z
        CREATE OR REPLACE VIEW validation_final AS
        SELECT * FROM validation_recover_unique
        UNION ALL
        SELECT * FROM validation_historical_unique
    z%SELECT count(*) FROM validation_finalz    validation_final rows: z3  Step 5: Writing segment map by queue language ...c                 S  rd   re   r3   )rG   rm   r3   r3   r4   rS     rh   zmain.<locals>.<listcomp>zp
        SELECT DISTINCT COALESCE(language, 'unknown') AS lang
        FROM queue_raw
        ORDER BY lang
    zqueue_language=zpart-000.parqueta"  
            COPY (
                SELECT
                    tx.video_id,
                    COALESCE(q.language, 'unknown') AS queue_language,
                    tx.segment_file,
                    tx.parent_segment_file,
                    tx.is_split_segment,
                    tx.split_index_from_id,
                    tx.speaker_id,
                    tx.original_start_ms,
                    tx.original_end_ms,
                    tx.trimmed_start_ms,
                    tx.trimmed_end_ms,
                    tx.leading_pad_ms,
                    tx.trailing_pad_ms,
                    tx.expected_language_hint,
                    tx.detected_language AS tx_detected_language,
                    tx.lang_mismatch_flag,
                    tx.transcription,
                    tx.tagged,
                    tx.speaker_emotion,
                    tx.speaker_style,
                    tx.speaker_pace,
                    tx.speaker_accent,
                    tx.num_unk,
                    tx.num_inaudible,
                    tx.num_event_tags,
                    tx.boundary_score,
                    tx.text_length_per_sec,
                    tx.overlap_suspected,
                    tx.quality_score AS tx_quality_score,
                    tx.alignment_score,
                    tx.asr_eligible,
                    tx.tts_clean_eligible,
                    tx.tts_expressive_eligible,
                    tx.prompt_version,
                    tx.schema_version,
                    tx.trimmer_version,
                    tx.validator_version,
                    tx.model_id,
                    tx.temperature,
                    tx.thinking_level,
                    tx.provider,
                    tx.worker_id,
                    tx.cache_hit,
                    tx.token_usage_json,
                    tx.created_at,
                    uniq.segment_name_video_count,
                    uniq.segment_name_video_count = 1 AS flag_join_safe,
                    CASE WHEN uniq.segment_name_video_count = 1 THEN flags.flag_rows_total END AS flag_rows_total,
                    CASE WHEN uniq.segment_name_video_count = 1 THEN flags.flag_types_distinct END AS flag_types_distinct,
                    CASE WHEN uniq.segment_name_video_count = 1 THEN flags.flag_types_csv END AS flag_types_csv,
                    CASE WHEN uniq.segment_name_video_count = 1 THEN flags.timeout_flag_rows END AS timeout_flag_rows,
                    CASE WHEN uniq.segment_name_video_count = 1 THEN flags.error_flag_rows END AS error_flag_rows,
                    CASE WHEN uniq.segment_name_video_count = 1 THEN flags.rate_limited_flag_rows END AS rate_limited_flag_rows,
                    CASE WHEN uniq.segment_name_video_count = 1 THEN flags.lang_mismatch_flag_rows END AS lang_mismatch_flag_rows,
                    CASE WHEN uniq.segment_name_video_count = 1 THEN flags.tag_text_mismatch_flag_rows END AS tag_text_mismatch_flag_rows,
                    CASE WHEN uniq.segment_name_video_count = 1 THEN flags.suspicious_length_ratio_flag_rows END AS suspicious_length_ratio_flag_rows,
                    CASE WHEN uniq.segment_name_video_count = 1 THEN flags.high_unk_density_flag_rows END AS high_unk_density_flag_rows,
                    CASE WHEN uniq.segment_name_video_count = 1 THEN flags.empty_transcription_flag_rows END AS empty_transcription_flag_rows,
                    val.validation_source,
                    val.validation_source IS NOT NULL AS has_validation,
                    val.duration_s,
                    val.gemini_lang,
                    val.gemini_transcription,
                    val.gemini_tagged,
                    val.gemini_quality_score,
                    val.speaker_info,
                    val.mms_lang_iso3,
                    val.mms_lang_iso1,
                    val.mms_confidence,
                    val.mms_top3,
                    val.vox_lang,
                    val.vox_lang_iso1,
                    val.vox_confidence,
                    val.vox_top3,
                    val.conformer_multi_transcription,
                    val.conformer_multi_ctc_raw,
                    val.conformer_multi_ctc_normalized,
                    val.wav2vec_transcription,
                    val.wav2vec_ctc_raw,
                    val.wav2vec_ctc_normalized,
                    val.wav2vec_model_used,
                    val.lid_consensus,
                    val.lid_agree_count,
                    val.consensus_lang,
                    COALESCE(val.provisional_bucket, 'missing') AS provisional_bucket
                FROM tx_canonical tx
                LEFT JOIN queue_raw q USING (video_id)
                LEFT JOIN segment_name_uniqueness uniq
                    ON tx.segment_file = uniq.segment_file
                LEFT JOIN flag_summary_by_segment_name flags
                    ON tx.segment_file = flags.segment_file
                LEFT JOIN validation_final val
                    ON tx.video_id = val.video_id
                   AND tx.segment_file = val.segment_file
                WHERE COALESCE(q.language, 'unknown') = 'z'
            ) TO 'z-' (FORMAT PARQUET, COMPRESSION ZSTD)
        z    wrote queue_language=z**rr   zV
        CREATE OR REPLACE VIEW segment_map_v1 AS
        SELECT * FROM read_parquet('z3', hive_partitioning=true, union_by_name=true)
    z'  Step 6: Writing analytics outputs ...zlanguage_rollup.parquetzvideo_rollup.parquetzbucket_rollup.parquetzmodel_rollup.parquetzdisagreement_rollup.parquetz!missing_validation_videos.parqueta  
        COPY (
            SELECT
                queue_language,
                count(*) AS total_segments,
                count(DISTINCT video_id) AS total_videos,
                count(*) FILTER (WHERE has_validation) AS validated_segments,
                count(*) FILTER (WHERE NOT has_validation) AS missing_validation_segments,
                count(*) FILTER (WHERE provisional_bucket = 'golden') AS golden_segments,
                count(*) FILTER (WHERE provisional_bucket = 'redo') AS redo_segments,
                count(*) FILTER (WHERE provisional_bucket = 'dispose') AS dispose_segments,
                round(100.0 * count(*) FILTER (WHERE has_validation) / count(*), 4) AS validation_coverage_pct,
                round(avg(tx_quality_score), 6) AS avg_tx_quality_score,
                round(avg(gemini_quality_score), 6) AS avg_validation_gemini_quality,
                round(avg(mms_confidence), 6) AS avg_mms_confidence,
                round(avg(vox_confidence), 6) AS avg_vox_confidence,
                round(avg(conformer_multi_ctc_normalized), 6) AS avg_conformer_ctc,
                round(avg(wav2vec_ctc_normalized), 6) AS avg_wav2vec_ctc,
                round(100.0 * count(*) FILTER (WHERE lid_consensus) / NULLIF(count(*) FILTER (WHERE has_validation), 0), 4) AS lid_consensus_pct
            FROM segment_map_v1
            GROUP BY queue_language
            ORDER BY total_segments DESC
        ) TO 'z)' (FORMAT PARQUET, COMPRESSION ZSTD)
    au  
        COPY (
            SELECT
                video_id,
                any_value(queue_language) AS queue_language,
                count(*) AS total_segments,
                count(*) FILTER (WHERE has_validation) AS validated_segments,
                count(*) FILTER (WHERE NOT has_validation) AS missing_validation_segments,
                count(*) FILTER (WHERE provisional_bucket = 'golden') AS golden_segments,
                count(*) FILTER (WHERE provisional_bucket = 'redo') AS redo_segments,
                count(*) FILTER (WHERE provisional_bucket = 'dispose') AS dispose_segments,
                round(avg(tx_quality_score), 6) AS avg_tx_quality_score,
                round(avg(gemini_quality_score), 6) AS avg_validation_gemini_quality,
                round(avg(conformer_multi_ctc_normalized), 6) AS avg_conformer_ctc,
                round(avg(wav2vec_ctc_normalized), 6) AS avg_wav2vec_ctc,
                round(100.0 * count(*) FILTER (WHERE lid_consensus) / NULLIF(count(*) FILTER (WHERE has_validation), 0), 4) AS lid_consensus_pct
            FROM segment_map_v1
            GROUP BY video_id
        ) TO 'a  
        COPY (
            SELECT
                queue_language,
                provisional_bucket,
                count(*) AS segments,
                count(DISTINCT video_id) AS videos,
                round(sum(duration_s) / 3600, 4) AS hours
            FROM segment_map_v1
            WHERE has_validation
            GROUP BY queue_language, provisional_bucket
            ORDER BY queue_language, provisional_bucket
        ) TO 'a  
        COPY (
            SELECT
                queue_language,
                count(*) FILTER (WHERE has_validation) AS validated_segments,
                round(avg(mms_confidence), 6) AS avg_mms_confidence,
                round(percentile_cont(0.1) WITHIN GROUP (ORDER BY mms_confidence), 6) AS p10_mms_confidence,
                round(percentile_cont(0.5) WITHIN GROUP (ORDER BY mms_confidence), 6) AS p50_mms_confidence,
                round(percentile_cont(0.9) WITHIN GROUP (ORDER BY mms_confidence), 6) AS p90_mms_confidence,
                round(avg(vox_confidence), 6) AS avg_vox_confidence,
                round(percentile_cont(0.1) WITHIN GROUP (ORDER BY vox_confidence), 6) AS p10_vox_confidence,
                round(percentile_cont(0.5) WITHIN GROUP (ORDER BY vox_confidence), 6) AS p50_vox_confidence,
                round(percentile_cont(0.9) WITHIN GROUP (ORDER BY vox_confidence), 6) AS p90_vox_confidence,
                round(avg(conformer_multi_ctc_normalized), 6) AS avg_conformer_ctc,
                round(percentile_cont(0.1) WITHIN GROUP (ORDER BY conformer_multi_ctc_normalized), 6) AS p10_conformer_ctc,
                round(percentile_cont(0.5) WITHIN GROUP (ORDER BY conformer_multi_ctc_normalized), 6) AS p50_conformer_ctc,
                round(percentile_cont(0.9) WITHIN GROUP (ORDER BY conformer_multi_ctc_normalized), 6) AS p90_conformer_ctc,
                round(avg(wav2vec_ctc_normalized), 6) AS avg_wav2vec_ctc,
                round(percentile_cont(0.1) WITHIN GROUP (ORDER BY wav2vec_ctc_normalized), 6) AS p10_wav2vec_ctc,
                round(percentile_cont(0.5) WITHIN GROUP (ORDER BY wav2vec_ctc_normalized), 6) AS p50_wav2vec_ctc,
                round(percentile_cont(0.9) WITHIN GROUP (ORDER BY wav2vec_ctc_normalized), 6) AS p90_wav2vec_ctc
            FROM segment_map_v1
            WHERE has_validation
            GROUP BY queue_language
            ORDER BY validated_segments DESC
        ) TO 'a  
        COPY (
            SELECT
                queue_language,
                count(*) FILTER (WHERE has_validation) AS validated_segments,
                count(*) FILTER (WHERE has_validation AND gemini_lang = mms_lang_iso1) AS gemini_mms_match,
                count(*) FILTER (WHERE has_validation AND gemini_lang = vox_lang_iso1) AS gemini_vox_match,
                count(*) FILTER (WHERE has_validation AND mms_lang_iso1 = vox_lang_iso1) AS mms_vox_match,
                count(*) FILTER (
                    WHERE has_validation
                      AND gemini_lang = mms_lang_iso1
                      AND gemini_lang = vox_lang_iso1
                ) AS all_three_agree,
                count(*) FILTER (
                    WHERE has_validation
                      AND (
                          gemini_lang IS NULL OR gemini_lang = ''
                          OR mms_lang_iso1 IS NULL OR mms_lang_iso1 = ''
                          OR vox_lang_iso1 IS NULL OR vox_lang_iso1 = ''
                      )
                ) AS missing_any_lid
            FROM segment_map_v1
            GROUP BY queue_language
            ORDER BY validated_segments DESC
        ) TO 'ac  
        COPY (
            SELECT
                video_id,
                any_value(queue_language) AS queue_language,
                count(*) AS missing_validation_segments
            FROM segment_map_v1
            WHERE NOT has_validation
            GROUP BY video_id
            ORDER BY missing_validation_segments DESC, video_id
        ) TO 'a   
        SELECT
            count(*) AS total_segments,
            count(DISTINCT video_id) AS total_videos,
            count(*) FILTER (WHERE has_validation) AS validated_segments,
            count(*) FILTER (WHERE NOT has_validation) AS missing_validation_segments,
            count(DISTINCT CASE WHEN NOT has_validation THEN video_id END) AS videos_with_missing_validation,
            count(*) FILTER (WHERE validation_source = 'historical') AS historical_validation_segments,
            count(*) FILTER (WHERE validation_source = 'recover') AS recover_validation_segments,
            count(*) FILTER (WHERE provisional_bucket = 'golden') AS golden_segments,
            count(*) FILTER (WHERE provisional_bucket = 'redo') AS redo_segments,
            count(*) FILTER (WHERE provisional_bucket = 'dispose') AS dispose_segments,
            count(*) FILTER (WHERE provisional_bucket = 'missing') AS missing_bucket_segments,
            count(*) FILTER (WHERE is_split_segment) AS split_segments,
            count(*) FILTER (WHERE flag_join_safe) AS flag_join_safe_segments,
            count(*) FILTER (WHERE NOT flag_join_safe) AS flag_join_ambiguous_segments,
            round(100.0 * count(*) FILTER (WHERE has_validation) / count(*), 6) AS validation_coverage_pct
        FROM segment_map_v1
    ai  
        SELECT
            count(*) AS total_segments,
            count(*) FILTER (WHERE flag_join_safe) AS safe_join_segments,
            count(*) FILTER (WHERE NOT flag_join_safe) AS ambiguous_join_segments,
            count(*) FILTER (WHERE flag_join_safe AND flag_rows_total IS NOT NULL) AS safe_join_segments_with_flags
        FROM segment_map_v1
    zphase-1-incrementalphaser   generated_at_epoch_sduckdb_pathsegment_map_path)txflagsqueuehistorical_shardsrecover_shardsrecover_prefixeshistorical_prefixes)	r   r   analytics_dirlanguage_rollupvideo_rollupbucket_rollupmodel_rollupdisagreement_rollupmissing_validation_videos)golden_lid_agreegolden_ctc_mingolden_quality_mingolden_duration_mindispose_ctc_maxdispose_duration_maxrY   )r   inputsoutputs
thresholdsglobal_summaryflag_join_summary	elapsed_szglobal_summary.jsonzflag_join_summary.jsonzphase1_run_manifest.jsonrZ   z&Phase-1 incremental build complete in z.1fs)/r1   r   r   r   r   r   r   
output_dirmkdirr7   
SystemExitdb_path	overwriter=   rx   duckdbconnectr>   ri   threadsmemory_limitr   timeprintrB   rj   rK   r#   DISPOSE_CTC_MAXDISPOSE_DURATION_MAXGOLDEN_LID_AGREEGOLDEN_CTC_MINGOLDEN_QUALITY_MINGOLDEN_DURATION_MINrV   r   recover_batch_dirs	enumerater   fetchallr   historical_batch_dirsrn   roundr`   r^   r_   )+argstx_path
flags_path
queue_pathhistorical_dirrecover_dirr   r   segment_map_dirr   temp_dirr   r   ra   t0tx_sql	flags_sql	queue_sql	dup_countval_colsbucket_caserecover_batches	batch_idxrw   	batch_sqlinserted
total_rowshistorical_batchesvalidation_final_rows	languageslangout_dirout_pathsegment_map_globlanguage_rollup_pathvideo_rollup_pathbucket_rollup_pathmodel_rollup_pathdisagreement_rollup_pathmissing_validation_videos_pathr   r   run_manifestr3   r3   r4   main   s  











	









a
b
d















	
#r   __main__)r5   r   )r5   r>   r?   r>   )rC   rD   r?   r>   )T)rN   rO   r?   r>   )r5   r   rW   rX   )ra   rb   rc   r>   r?   rX   )ro   r   r?   rp   )ry   rp   rz   r0   r?   r{   )rw   rp   r?   r>   )__doc__
__future__r   r-   r^   mathr9   r   pathlibr   r   r   r   r   r   r   r   r#   __annotations__r1   r=   rB   rM   rV   r`   rn   rx   r   r   r   __name__r3   r3   r3   r4   <module>   sD    

	



	

    C
