o
    ⽮im                     @  s   U d Z ddlmZ ddlZddlZddlZddlZddlmZ ddl	Z	dZ
dZdZdZd	Zd
Zg dZded< dd Zd'ddZd(ddZd)ddZd*dd Zd+d"d#Zd$d% Zed&krae  dS dS ),a  
Build the phase-1 metadata lake for transcript + validation analytics.

Phase 1 intentionally stays metadata-only:
  - dedupe the transcription corpus
  - dedupe and merge historical + recover validation shards
  - build one canonical per-segment parquet map
  - emit analytics rollups and provisional bucket assignments

This avoids replaying raw audio or materializing child audio segments while
still giving us a nearly complete final corpus for thresholding and redo work.
    )annotationsN)Path   gffffff?g      ?g       @g333333?g      ?))video_idVARCHAR)segment_filer   )
duration_sDOUBLE)gemini_langr   )gemini_transcriptionr   )gemini_taggedr   )gemini_quality_scorer	   )speaker_infor   )mms_lang_iso3r   )mms_lang_iso1r   )mms_confidencer	   )mms_top3r   )vox_langr   )vox_lang_iso1r   )vox_confidencer	   )vox_top3r   )conformer_multi_transcriptionr   )conformer_multi_ctc_rawr	   )conformer_multi_ctc_normalizedr	   )wav2vec_transcriptionr   )wav2vec_ctc_rawr	   )wav2vec_ctc_normalizedr	   )wav2vec_model_usedr   )lid_consensusBOOLEAN)lid_agree_countINTEGER)consensus_langr   zlist[tuple[str, str]]VALIDATION_COLUMNSc                  C  s   t jdd} | jddd | jddd | jdd	d | jd
dd | jddd | jddd | jddd | jdtdd | jddd | jddd |  S )Nz3Build phase-1 transcript + validation metadata lake)descriptionz--txz"data/transcription_results.parquet)defaultz--flagsz data/transcription_flags.parquetz--queuezdata/video_queue.csv.gzz--historical-shardszdata/validation_shardsz--recover-shardszdata/recover_validation_shardsz--output-dirzdata/phase1z	--db-path z	--threads   )typer%   z--memory-limit24GBz--overwrite
store_true)action)argparseArgumentParseradd_argumentint
parse_args)p r2   scripts/build_phase1_dataset.pyr0   <   s   r0   pathr   c                 C  s.   |   sd S |  rt|  d S |   d S N)existsis_dirshutilrmtreeunlinkr4   r2   r2   r3   remove_pathK   s
   r<   returnboolc                 C  s   |   o
t| dS )N	*.parquet)r6   anyrglobr;   r2   r2   r3   has_parquet_filesT   s   rB   strc                  C  s    d dd tD } d|  dS )N, c                 s  s$    | ]\}}d | d| V  qdS )zCAST(NULL AS z) AS Nr2   ).0namesql_typer2   r2   r3   	<genexpr>Y   s   " z*empty_validation_select.<locals>.<genexpr>SELECT z WHERE 1 = 0)joinr#   )colsr2   r2   r3   empty_validation_selectX   s   rL   conduckdb.DuckDBPyConnectionquerydictc                 C  s<   |  |}| }|d u ri S dd |jD }tt||S )Nc                 S  s   g | ]}|d  qS )r   r2   )rE   dr2   r2   r3   
<listcomp>b   s    z!fetchone_dict.<locals>.<listcomp>)executefetchoner$   rP   zip)rM   rO   relrowrK   r2   r2   r3   fetchone_dict]   s   
rX   payloadc                 C  s   |  tj|ddd d S )N   Tindent	sort_keys)
write_textjsondumps)r4   rY   r2   r2   r3   
write_jsonf   s   ra   c            #      C  s  t  } t| j}t| j}t| j}t| j}t| j}t| j}|jddd |	 s3t
d| |	 s>t
d| |	 sIt
d| | jrQt| jn|d }|d }|d }	| jrlt| t| t|	 |	jddd t|}
t|}|d	 d
  }|d	 d
  }tt|}|d| j  |d| j d |d |d }|jddd |d|  d |d t }td | dd}| dd}| dd}|d| d |d| d |d| d |
rdddd tD  d| dnt }|r+dddd tD  d| dnt }td  |d! |d" d# }td$|d% td& |d' |d( |d) dd*d tD }d+t d,t d-t d.t  d/t! d0t" d1}td2 |r|d3| d4| d5 n
|d6t  d7 td8|d9 d# d% td: |
r|d;| d<| d5 n
|d=t  d7 td>|d? d# d% td@ |dA| dB| dC |dD |dE tdF|dG d# d% |dH | dd}|dI| dJ |	dK }|	dL }|	dM }|	dN }|	dO }|	dP }|dQ|  dR |dS|  dR |dT|  dR |dU|  dR |dV|  dR |dW|  dR t#|dX} |
| dY< || dZ< d[| d\< t$t d]| d^< t|| d_< t|| d`< t#|da}!d[t|t|t|t|t||
|dbt|t|t|	t|t|t|t|t|t|dc	tt t!t"ttdd| |!t$t | dedf}"t%|	dg |  t%|	dh |! t%|	di |" tt&j'| deddj tdkt | dldm d S )nNT)parentsexist_okzMissing transcription parquet: z%Missing transcription flags parquet: zMissing queue snapshot: zphase1.duckdbsegment_map_v1analytics_v1z**r?   zSET threads = zSET memory_limit = ''z$SET preserve_insertion_order = false
duckdb_tmpzSET temp_directory = 'zPRAGMA enable_progress_barz"Building phase-1 metadata lake ...z''zN
        CREATE OR REPLACE VIEW tx_raw AS
        SELECT * FROM read_parquet('z')
    zQ
        CREATE OR REPLACE VIEW flags_raw AS
        SELECT * FROM read_parquet('zR
        CREATE OR REPLACE VIEW queue_raw AS
        SELECT * FROM read_csv_auto('z', header=true)
    rI   rD   c                 s      | ]\}}|V  qd S r5   r2   rE   rF   _r2   r2   r3   rH          zmain.<locals>.<genexpr>z FROM read_parquet('z', hive_partitioning=false)c                 s  rh   r5   r2   ri   r2   r2   r3   rH      rk   z6  Step 1a: Finding duplicate tx keys (lightweight) ...z
        CREATE OR REPLACE TABLE tx_dup_keys AS
        SELECT video_id, segment_file, max(id) AS keep_id
        FROM tx_raw
        GROUP BY video_id, segment_file
        HAVING count(*) > 1
    z SELECT count(*) FROM tx_dup_keysr   z    duplicate keys: ,z>  Step 1b: Building tx_canonical (streaming, no full sort) ...a]  
        CREATE OR REPLACE VIEW tx_canonical AS
        SELECT
            tx_raw.*,
            regexp_matches(segment_file, '_split[0-9]+$') AS is_split_segment,
            regexp_replace(segment_file, '_split[0-9]+$', '') AS parent_segment_file,
            TRY_CAST(regexp_extract(segment_file, '_split([0-9]+)$', 1) AS INTEGER) AS split_index_from_id
        FROM tx_raw
        WHERE NOT EXISTS (
            SELECT 1 FROM tx_dup_keys dk
            WHERE dk.video_id = tx_raw.video_id
              AND dk.segment_file = tx_raw.segment_file
              AND dk.keep_id != tx_raw.id
        )
    z
        CREATE OR REPLACE TABLE segment_name_uniqueness AS
        SELECT
            segment_file,
            count(DISTINCT video_id) AS segment_name_video_count
        FROM tx_canonical
        GROUP BY segment_file
    a[  
        CREATE OR REPLACE TABLE flag_summary_by_segment_name AS
        SELECT
            segment_id AS segment_file,
            count(*) AS flag_rows_total,
            count(DISTINCT flag_type) AS flag_types_distinct,
            string_agg(DISTINCT flag_type, ',' ORDER BY flag_type) AS flag_types_csv,
            count(*) FILTER (WHERE flag_type = 'timeout') AS timeout_flag_rows,
            count(*) FILTER (WHERE flag_type = 'error') AS error_flag_rows,
            count(*) FILTER (WHERE flag_type = 'rate_limited') AS rate_limited_flag_rows,
            count(*) FILTER (WHERE flag_type = 'lang_mismatch') AS lang_mismatch_flag_rows,
            count(*) FILTER (WHERE flag_type = 'tag_text_mismatch') AS tag_text_mismatch_flag_rows,
            count(*) FILTER (WHERE flag_type = 'suspicious_length_ratio') AS suspicious_length_ratio_flag_rows,
            count(*) FILTER (WHERE flag_type = 'high_unk_density') AS high_unk_density_flag_rows,
            count(*) FILTER (WHERE flag_type = 'empty_transcription') AS empty_transcription_flag_rows
        FROM flags_raw
        GROUP BY segment_id
    c                 s  rh   r5   r2   ri   r2   r2   r3   rH      rk   z
        CASE
            WHEN lid_consensus = false AND COALESCE(lid_agree_count, 0) < 2 THEN 'dispose'
            WHEN conformer_multi_ctc_normalized IS NOT NULL
                 AND conformer_multi_ctc_normalized < z. THEN 'dispose'
            WHEN duration_s < zA THEN 'dispose'
            WHEN COALESCE(lid_agree_count, 0) >= z9
                 AND (conformer_multi_ctc_normalized >= zp
                      OR conformer_multi_ctc_normalized IS NULL)
                 AND (gemini_quality_score >= z
                      OR gemini_quality_score = 0
                      OR gemini_quality_score IS NULL)
                 AND duration_s >= z7 THEN 'golden'
            ELSE 'redo'
        END
    z0  Step 4a: Loading recover validation shards ...zG
            CREATE OR REPLACE TABLE val_recover AS
            SELECT z@, 'recover' AS validation_source
            FROM read_parquet('z8', hive_partitioning=false, union_by_name=true)
        z@
            CREATE OR REPLACE TABLE val_recover AS
            z5, CAST(NULL AS VARCHAR) AS validation_source
        z    recover rows: z SELECT count(*) FROM val_recoverz3  Step 4b: Loading historical validation shards ...zJ
            CREATE OR REPLACE TABLE val_historical AS
            SELECT zC, 'historical' AS validation_source
            FROM read_parquet('zC
            CREATE OR REPLACE TABLE val_historical AS
            z    historical rows: z#SELECT count(*) FROM val_historicalz;  Step 4c: Merging and deduplicating validation sources ...a  
        CREATE OR REPLACE TABLE validation_final AS
        WITH unioned AS (
            SELECT *, 0 AS source_rank FROM val_recover
            UNION ALL
            SELECT *, 1 AS source_rank FROM val_historical
        ),
        ranked AS (
            SELECT
                unioned.*,
                ROW_NUMBER() OVER (
                    PARTITION BY video_id, segment_file
                    ORDER BY
                        source_rank ASC,
                        CASE WHEN conformer_multi_ctc_normalized IS NULL THEN 1 ELSE 0 END ASC,
                        conformer_multi_ctc_normalized DESC NULLS LAST,
                        mms_confidence DESC NULLS LAST
                ) AS rn
            FROM unioned
        )
        SELECT
            z-,
            validation_source,
            zD AS provisional_bucket
        FROM ranked
        WHERE rn = 1
    z DROP TABLE IF EXISTS val_recoverz#DROP TABLE IF EXISTS val_historicalz    validation_final rows: z%SELECT count(*) FROM validation_finala  
        CREATE OR REPLACE TABLE segment_map_v1 AS
        SELECT
            tx.video_id,
            COALESCE(q.language, 'unknown') AS queue_language,
            tx.segment_file,
            tx.parent_segment_file,
            tx.is_split_segment,
            tx.split_index_from_id,
            tx.speaker_id,
            tx.original_start_ms,
            tx.original_end_ms,
            tx.trimmed_start_ms,
            tx.trimmed_end_ms,
            tx.leading_pad_ms,
            tx.trailing_pad_ms,
            tx.expected_language_hint,
            tx.detected_language AS tx_detected_language,
            tx.lang_mismatch_flag,
            tx.transcription,
            tx.tagged,
            tx.speaker_emotion,
            tx.speaker_style,
            tx.speaker_pace,
            tx.speaker_accent,
            tx.num_unk,
            tx.num_inaudible,
            tx.num_event_tags,
            tx.boundary_score,
            tx.text_length_per_sec,
            tx.overlap_suspected,
            tx.quality_score AS tx_quality_score,
            tx.alignment_score,
            tx.asr_eligible,
            tx.tts_clean_eligible,
            tx.tts_expressive_eligible,
            tx.prompt_version,
            tx.schema_version,
            tx.trimmer_version,
            tx.validator_version,
            tx.model_id,
            tx.temperature,
            tx.thinking_level,
            tx.provider,
            tx.worker_id,
            tx.cache_hit,
            tx.token_usage_json,
            tx.created_at,
            uniq.segment_name_video_count,
            uniq.segment_name_video_count = 1 AS flag_join_safe,
            CASE WHEN uniq.segment_name_video_count = 1 THEN flags.flag_rows_total END AS flag_rows_total,
            CASE WHEN uniq.segment_name_video_count = 1 THEN flags.flag_types_distinct END AS flag_types_distinct,
            CASE WHEN uniq.segment_name_video_count = 1 THEN flags.flag_types_csv END AS flag_types_csv,
            CASE WHEN uniq.segment_name_video_count = 1 THEN flags.timeout_flag_rows END AS timeout_flag_rows,
            CASE WHEN uniq.segment_name_video_count = 1 THEN flags.error_flag_rows END AS error_flag_rows,
            CASE WHEN uniq.segment_name_video_count = 1 THEN flags.rate_limited_flag_rows END AS rate_limited_flag_rows,
            CASE WHEN uniq.segment_name_video_count = 1 THEN flags.lang_mismatch_flag_rows END AS lang_mismatch_flag_rows,
            CASE WHEN uniq.segment_name_video_count = 1 THEN flags.tag_text_mismatch_flag_rows END AS tag_text_mismatch_flag_rows,
            CASE WHEN uniq.segment_name_video_count = 1 THEN flags.suspicious_length_ratio_flag_rows END AS suspicious_length_ratio_flag_rows,
            CASE WHEN uniq.segment_name_video_count = 1 THEN flags.high_unk_density_flag_rows END AS high_unk_density_flag_rows,
            CASE WHEN uniq.segment_name_video_count = 1 THEN flags.empty_transcription_flag_rows END AS empty_transcription_flag_rows,
            val.validation_source,
            val.validation_source IS NOT NULL AS has_validation,
            val.duration_s,
            val.gemini_lang,
            val.gemini_transcription,
            val.gemini_tagged,
            val.gemini_quality_score,
            val.speaker_info,
            val.mms_lang_iso3,
            val.mms_lang_iso1,
            val.mms_confidence,
            val.mms_top3,
            val.vox_lang,
            val.vox_lang_iso1,
            val.vox_confidence,
            val.vox_top3,
            val.conformer_multi_transcription,
            val.conformer_multi_ctc_raw,
            val.conformer_multi_ctc_normalized,
            val.wav2vec_transcription,
            val.wav2vec_ctc_raw,
            val.wav2vec_ctc_normalized,
            val.wav2vec_model_used,
            val.lid_consensus,
            val.lid_agree_count,
            val.consensus_lang,
            COALESCE(val.provisional_bucket, 'missing') AS provisional_bucket
        FROM tx_canonical tx
        LEFT JOIN queue_raw q USING (video_id)
        LEFT JOIN segment_name_uniqueness uniq
            ON tx.segment_file = uniq.segment_file
        LEFT JOIN flag_summary_by_segment_name flags
            ON tx.segment_file = flags.segment_file
        LEFT JOIN validation_final val
            ON tx.video_id = val.video_id
           AND tx.segment_file = val.segment_file
    z)
        COPY segment_map_v1
        TO 'zP'
        (FORMAT PARQUET, COMPRESSION ZSTD, PARTITION_BY (queue_language))
    zlanguage_rollup.parquetzvideo_rollup.parquetzbucket_rollup.parquetzmodel_rollup.parquetzdisagreement_rollup.parquetz!missing_validation_videos.parqueta  
        COPY (
            SELECT
                queue_language,
                count(*) AS total_segments,
                count(DISTINCT video_id) AS total_videos,
                count(*) FILTER (WHERE has_validation) AS validated_segments,
                count(*) FILTER (WHERE NOT has_validation) AS missing_validation_segments,
                count(*) FILTER (WHERE provisional_bucket = 'golden') AS golden_segments,
                count(*) FILTER (WHERE provisional_bucket = 'redo') AS redo_segments,
                count(*) FILTER (WHERE provisional_bucket = 'dispose') AS dispose_segments,
                round(100.0 * count(*) FILTER (WHERE has_validation) / count(*), 4) AS validation_coverage_pct,
                round(avg(tx_quality_score), 6) AS avg_tx_quality_score,
                round(avg(gemini_quality_score), 6) AS avg_validation_gemini_quality,
                round(avg(mms_confidence), 6) AS avg_mms_confidence,
                round(avg(vox_confidence), 6) AS avg_vox_confidence,
                round(avg(conformer_multi_ctc_normalized), 6) AS avg_conformer_ctc,
                round(avg(wav2vec_ctc_normalized), 6) AS avg_wav2vec_ctc,
                round(100.0 * count(*) FILTER (WHERE lid_consensus) / NULLIF(count(*) FILTER (WHERE has_validation), 0), 4) AS lid_consensus_pct
            FROM segment_map_v1
            GROUP BY queue_language
            ORDER BY total_segments DESC
        ) TO 'z)' (FORMAT PARQUET, COMPRESSION ZSTD)
    au  
        COPY (
            SELECT
                video_id,
                any_value(queue_language) AS queue_language,
                count(*) AS total_segments,
                count(*) FILTER (WHERE has_validation) AS validated_segments,
                count(*) FILTER (WHERE NOT has_validation) AS missing_validation_segments,
                count(*) FILTER (WHERE provisional_bucket = 'golden') AS golden_segments,
                count(*) FILTER (WHERE provisional_bucket = 'redo') AS redo_segments,
                count(*) FILTER (WHERE provisional_bucket = 'dispose') AS dispose_segments,
                round(avg(tx_quality_score), 6) AS avg_tx_quality_score,
                round(avg(gemini_quality_score), 6) AS avg_validation_gemini_quality,
                round(avg(conformer_multi_ctc_normalized), 6) AS avg_conformer_ctc,
                round(avg(wav2vec_ctc_normalized), 6) AS avg_wav2vec_ctc,
                round(100.0 * count(*) FILTER (WHERE lid_consensus) / NULLIF(count(*) FILTER (WHERE has_validation), 0), 4) AS lid_consensus_pct
            FROM segment_map_v1
            GROUP BY video_id
        ) TO 'a  
        COPY (
            SELECT
                queue_language,
                provisional_bucket,
                count(*) AS segments,
                count(DISTINCT video_id) AS videos,
                round(sum(duration_s) / 3600, 4) AS hours
            FROM segment_map_v1
            WHERE has_validation
            GROUP BY queue_language, provisional_bucket
            ORDER BY queue_language, provisional_bucket
        ) TO 'a  
        COPY (
            SELECT
                queue_language,
                count(*) FILTER (WHERE has_validation) AS validated_segments,
                round(avg(mms_confidence), 6) AS avg_mms_confidence,
                round(percentile_cont(0.1) WITHIN GROUP (ORDER BY mms_confidence), 6) AS p10_mms_confidence,
                round(percentile_cont(0.5) WITHIN GROUP (ORDER BY mms_confidence), 6) AS p50_mms_confidence,
                round(percentile_cont(0.9) WITHIN GROUP (ORDER BY mms_confidence), 6) AS p90_mms_confidence,
                round(avg(vox_confidence), 6) AS avg_vox_confidence,
                round(percentile_cont(0.1) WITHIN GROUP (ORDER BY vox_confidence), 6) AS p10_vox_confidence,
                round(percentile_cont(0.5) WITHIN GROUP (ORDER BY vox_confidence), 6) AS p50_vox_confidence,
                round(percentile_cont(0.9) WITHIN GROUP (ORDER BY vox_confidence), 6) AS p90_vox_confidence,
                round(avg(conformer_multi_ctc_normalized), 6) AS avg_conformer_ctc,
                round(percentile_cont(0.1) WITHIN GROUP (ORDER BY conformer_multi_ctc_normalized), 6) AS p10_conformer_ctc,
                round(percentile_cont(0.5) WITHIN GROUP (ORDER BY conformer_multi_ctc_normalized), 6) AS p50_conformer_ctc,
                round(percentile_cont(0.9) WITHIN GROUP (ORDER BY conformer_multi_ctc_normalized), 6) AS p90_conformer_ctc,
                round(avg(wav2vec_ctc_normalized), 6) AS avg_wav2vec_ctc,
                round(percentile_cont(0.1) WITHIN GROUP (ORDER BY wav2vec_ctc_normalized), 6) AS p10_wav2vec_ctc,
                round(percentile_cont(0.5) WITHIN GROUP (ORDER BY wav2vec_ctc_normalized), 6) AS p50_wav2vec_ctc,
                round(percentile_cont(0.9) WITHIN GROUP (ORDER BY wav2vec_ctc_normalized), 6) AS p90_wav2vec_ctc
            FROM segment_map_v1
            WHERE has_validation
            GROUP BY queue_language
            ORDER BY validated_segments DESC
        ) TO 'a  
        COPY (
            SELECT
                queue_language,
                count(*) FILTER (WHERE has_validation) AS validated_segments,
                count(*) FILTER (WHERE has_validation AND gemini_lang = mms_lang_iso1) AS gemini_mms_match,
                count(*) FILTER (WHERE has_validation AND gemini_lang = vox_lang_iso1) AS gemini_vox_match,
                count(*) FILTER (WHERE has_validation AND mms_lang_iso1 = vox_lang_iso1) AS mms_vox_match,
                count(*) FILTER (
                    WHERE has_validation
                      AND gemini_lang = mms_lang_iso1
                      AND gemini_lang = vox_lang_iso1
                ) AS all_three_agree,
                count(*) FILTER (
                    WHERE has_validation
                      AND (
                          gemini_lang IS NULL OR gemini_lang = ''
                          OR mms_lang_iso1 IS NULL OR mms_lang_iso1 = ''
                          OR vox_lang_iso1 IS NULL OR vox_lang_iso1 = ''
                      )
                ) AS missing_any_lid
            FROM segment_map_v1
            GROUP BY queue_language
            ORDER BY validated_segments DESC
        ) TO 'ac  
        COPY (
            SELECT
                video_id,
                any_value(queue_language) AS queue_language,
                count(*) AS missing_validation_segments
            FROM segment_map_v1
            WHERE NOT has_validation
            GROUP BY video_id
            ORDER BY missing_validation_segments DESC, video_id
        ) TO 'a   
        SELECT
            count(*) AS total_segments,
            count(DISTINCT video_id) AS total_videos,
            count(*) FILTER (WHERE has_validation) AS validated_segments,
            count(*) FILTER (WHERE NOT has_validation) AS missing_validation_segments,
            count(DISTINCT CASE WHEN NOT has_validation THEN video_id END) AS videos_with_missing_validation,
            count(*) FILTER (WHERE validation_source = 'historical') AS historical_validation_segments,
            count(*) FILTER (WHERE validation_source = 'recover') AS recover_validation_segments,
            count(*) FILTER (WHERE provisional_bucket = 'golden') AS golden_segments,
            count(*) FILTER (WHERE provisional_bucket = 'redo') AS redo_segments,
            count(*) FILTER (WHERE provisional_bucket = 'dispose') AS dispose_segments,
            count(*) FILTER (WHERE provisional_bucket = 'missing') AS missing_bucket_segments,
            count(*) FILTER (WHERE is_split_segment) AS split_segments,
            count(*) FILTER (WHERE flag_join_safe) AS flag_join_safe_segments,
            count(*) FILTER (WHERE NOT flag_join_safe) AS flag_join_ambiguous_segments,
            round(100.0 * count(*) FILTER (WHERE has_validation) / count(*), 6) AS validation_coverage_pct
        FROM segment_map_v1
    historical_shards_availablerecover_shards_availablezphase-1phaser   generated_at_epoch_sduckdb_pathsegment_map_pathai  
        SELECT
            count(*) AS total_segments,
            count(*) FILTER (WHERE flag_join_safe) AS safe_join_segments,
            count(*) FILTER (WHERE NOT flag_join_safe) AS ambiguous_join_segments,
            count(*) FILTER (WHERE flag_join_safe AND flag_rows_total IS NOT NULL) AS safe_join_segments_with_flags
        FROM segment_map_v1
    )txflagsqueuehistorical_shardsrecover_shardsrm   rn   )	rq   rd   analytics_dirlanguage_rollupvideo_rollupbucket_rollupmodel_rollupdisagreement_rollupmissing_validation_videos)golden_lid_agreegolden_ctc_mingolden_quality_mingolden_duration_mindispose_ctc_maxdispose_duration_maxrZ   )ro   inputsoutputs
thresholdsglobal_summaryflag_join_summary	elapsed_szglobal_summary.jsonzflag_join_summary.jsonzphase1_run_manifest.jsonr[   zPhase-1 build complete in z.1fs)(r0   r   rs   rt   ru   rv   rw   
output_dirmkdirr6   
SystemExitdb_path	overwriter<   rB   as_posixduckdbconnectrC   rS   threadsmemory_limittimeprintreplacerJ   r#   rL   rT   DISPOSE_CTC_MAXDISPOSE_DURATION_MAXGOLDEN_LID_AGREEGOLDEN_CTC_MINGOLDEN_QUALITY_MINGOLDEN_DURATION_MINrX   roundra   r_   r`   )#argstx_path
flags_path
queue_pathhistorical_dirrecover_dirr   r   segment_map_dirrx   historical_availablerecover_availablehistorical_globrecover_globrM   temp_dirt0tx_sql	flags_sql	queue_sqlhistorical_selectrecover_select	dup_countval_colsbucket_casesegment_map_sqllanguage_rollup_pathvideo_rollup_pathbucket_rollup_pathmodel_rollup_pathdisagreement_rollup_pathmissing_validation_videos_pathr   r   run_manifestr2   r2   r3   mainj   sr  










	



		





c











#r   __main__)r4   r   )r4   r   r=   r>   )r=   rC   )rM   rN   rO   rC   r=   rP   )r4   r   rY   rP   )__doc__
__future__r   r,   r_   r8   r   pathlibr   r   r   r   r   r   r   r   r#   __annotations__r0   r<   rB   rL   rX   ra   r   __name__r2   r2   r2   r3   <module>   s:    

	


	    
