o
    %i-                     @  s   d Z ddlmZ ddlZddlZddlZddlZddlZddlm	Z	 ej
ejddd edZd	d
 Zdd Zd#ddZd#ddZd$ddZd%ddZd&ddZd d! Zed"kr^e  dS dS )'z
Entry point for the validation pipeline.
Usage:
  python -m validations.main [--max-videos N] [--mock] [--worker-id ID]
  python -m validations.main --test-local VIDEO_ID  # test on a single video
    )annotationsN)Pathz1%(asctime)s [%(levelname)s] %(name)s: %(message)sz%H:%M:%S)levelformatdatefmtvalidationsc                  C  s   t jdd} | jdtddd | jddd	d
 | jdddd
 | jdtddd | jdtddd | jdtddd | jdtddd | jdtddd | jdtdd | jdtdd | jdtdd d |  S )!Nz&Validation pipeline: LID + CTC scoring)descriptionz--max-videosr   z#Max videos to process (0=unlimited))typedefaulthelpz--mock
store_truezMock mode (no R2/DB))actionr   z	--recoverz0Run recover worker over validation_recover_queuez--worker-id z	Worker IDz--test-localzTest single video from R2z--test-recover-localzURecover + validate a single video from raw 1-cleaned-data using transcription_resultsz--modelsallz>Comma-separated model list: mms,vox,conformer,wav2vec or 'all'z--shard-size2   zVideos per parquet shardz--tx-parquetz"data/transcription_results.parquet)r	   r
   z--validation-dirdataz--recover-limitz-Max missing segments to recover in local test)argparseArgumentParseradd_argumentintstr
parse_args)p r   ,/home/ubuntu/transcripts/validations/main.pyr      s*   r   c           	        s   ddl m} ddlm} ddlm} || j| j| jd}t	|| j
 | }|r>| js>|D ]
}td|  q.td | jrC|n|}||}| I d H  d S )N   ValidationConfig)ValidationWorker)RecoverValidationWorker)	mock_mode
max_videos	worker_idzConfig error: )configr   workerr   recover_workerr   mockr!   r"   _apply_model_togglesmodelsvalidateloggererrorsysexitrecoverstart)	argsr   r   r   r#   errorse
worker_clsr$   r   r   r   
run_worker0   s$   

r4   video_idr   c           !        s  ddl }ddl}ddl}ddl}ddlm} ddlm} ddlm	} ddl
m}	 | }
t|
|j td|  d	 td
|
j d|
j d|
j d|
j  ddl}|jd|
j|
j|
jdd}t|jd|  dd}||  d }|  d}t }d}|
j|fdd| ffD ]-\}}z |j||d td| d|  |||t| d}W  n	 t y   Y qw |st!d|   dS t | }|" j#d }td|dd |dd! |$|d"}|j%|d#d$ W d   n1 sw   Y  |&  ||| \}}td%t'| d&|(d'd(  |s(t!d) |)| dS ||
}|*  t }|+| |}t | }td*d+  td,t'| d-|dd! td+  t,d.d/ |D }td0| dt'| d1d2| t'| d3d4 |dd5 D ]*}td6|j- d7|j. d8|j/ d9|j0d:d;|j1 d9|j2d:d<|j3 d=|j4  qt'|d5krtd>t'|d5  d? |	|
|d@ }|5| | |6 } | rtdA|   |7  tdBt | ddC|ddD|ddE tdF|  dS )GuG   Test pipeline on a single video — download, process, inspect results.r   Nr   r   )load_video_segmentsValidationPipelineParquetPackerz=== Test local:  ===Models: MMS=, Vox=, Conformer=
, Wav2Vec=s3auto)endpoint_urlaws_access_key_idaws_secret_access_keyregion_name	val_test__prefixz_transcribed.tarFz1-cleaned-dataztranscribed/)BucketKeyzDownloading s3:///Tz%Tar not found in any R2 location for g    .AzDownloaded: z.1fzMB in szr:*r   )filterzLoaded z segments, language=language?zNo segments found!
z<============================================================z	Results: z segments in c                 s  s    | ]}|j rd V  qdS )r   N)lid_consensus).0rr   r   r   	<genexpr>   s    ztest_local.<locals>.<genexpr>zLID consensus: z (d   z.0fz%)   z  z	: gemini=z mms=(z.2fz) vox=z) consensus=z
 ctc_norm=z
  ... and z moreoutputzTest parquet: zTotal time: zs (download=zs, process=zs)zWork dir (inspect results): )8jsonshutiltarfiletempfiler#   r   audio_loaderr6   pipeliner8   packerr:   r'   r(   r*   infoenable_mms_lidenable_voxlinguaenable_conformer_multienable_wav2vec_langboto3clientr2_endpoint_urlr2_access_key_idr2_secret_access_keyr   mkdtemptimer2_bucket_sourcehead_objectdownload_filer   	Exceptionr+   statst_sizeopen
extractallunlinklengetrmtreeload_modelsprocess_videosumsegment_filegemini_langmms_lang_iso1mms_confidencevox_lang_iso1vox_confidencerR   conformer_multi_ctc_normalizedadd_video_resultsflushunload_models)!r5   r0   rZ   r[   r\   r]   r   r6   r8   r:   r#   rf   r@   work_dirtar_pathkeyt0
downloadedbucketobj_keydl_timesize_mbtfmetadatasegmentsr_   t1results	proc_timeconsensus_countrT   r`   
shard_pathr   r   r   
test_localH   s   
 "

0$,r   c                   s  ddl }ddl}ddl}ddlm} ddlm} ddlm} ddl	m
} ddlm}	 dd	lm}
 | }t||j tt|j| }|sRtd
|  d|j  dS tt|j| }dd |D }t|| }|jdkrs|d|j }td|  d tdt| dt| dt|  td|j d|j d|j d|j  |std dS | }||}t|j d|  dd}z|!| |}|"|| }|
|j#| |t$|d}tdt|j% dt|j& dt|j' dt|j( d t|j) 
 |j'rt*d!|j'dd"   |j%std# W dS |	|}|+  z|,| |j%}W |-  n|-  w td$t| d% |d&|j.d'}|||d( }|/| | |0 }|rbtd)|  td*|  W dS  t1y|   |j2|d&d+  w ),zFRecover + validate a single video from raw tar and historical tx rows.r   N)	EnvConfig)R2Clientr   r   r9   r7   )load_recover_segmentsz No transcription rows found for z in c                 S     h | ]}|d  qS )r|   r   rS   rowr   r   r   	<setcomp>       z%test_recover_local.<locals>.<setcomp>z=== Test recover local: r;   zHistorical tx rows=z, currently validated=z, missing_validation_targets=r<   r=   r>   r?   zKNothing missing for this video under the current local validation snapshot.recover_test_rG   rH   )target_segment_idszRecovered target segments=z, matched_tx_ids=z, missing_tx_ids=z, extra_regen_ids=z, missing_parent_files=z%Missing historical IDs after replay: 
   z6Replay produced no recoverable segments for validationzValidation results: z segment rowsT)r    r"   recover_outputzRecover test parquet: z#Work dir (inspect recover output): )ignore_errors)3r[   r]   duckdb
src.configr   src.r2_clientr   r#   r   r`   r:   r_   r8   recover_loaderr   r'   r(   _load_tx_rows_for_videor   
tx_parquetr*   r+   _load_validated_segment_idsvalidation_dirsortedrecover_limitra   rv   rb   rc   rd   re   rk   download_tarextract_tarr   setr   matched_tx_idsmissing_tx_idsextra_regen_idsmissing_parent_fileswarningry   rz   r   r"   r   r   rp   rx   )r5   r0   r[   r]   r   r   r   r   r:   r8   r   r#   tx_rowsvalidated_idstx_ids
target_ids
raw_configr2r   r   	extractedr.   r_   r   pack_configr`   r   r   r   r   test_recover_local   s   


r   
models_argc                 C  sL   |dkrd S dd | dD }d|v | _d|v | _d|v | _d|v | _d S )	Nr   c                 S  s   h | ]
}|  r|  qS r   )strip)rS   mr   r   r   r         z'_apply_model_toggles.<locals>.<setcomp>,mmsvox	conformerwav2vec)splitrb   rc   rd   re   )r#   r   r(   r   r   r   r'     s   


r'   r   r   return
list[dict]c                   sl   dd l }|  std|  | }|dt| |g}dd |jD   fdd| D }|  |S )Nr   zMissing tx parquet: a  
        SELECT
            segment_file,
            expected_language_hint,
            detected_language,
            transcription,
            tagged,
            quality_score,
            speaker_emotion,
            speaker_style,
            speaker_pace,
            speaker_accent
        FROM read_parquet(?)
        WHERE video_id = ?
        ORDER BY segment_file
        c                 S  s   g | ]}|d  qS r   r   )rS   dr   r   r   
<listcomp>5  r   z+_load_tx_rows_for_video.<locals>.<listcomp>c                   s   g | ]	}t t |qS r   )dictzipr   colsr   r   r   6  s    )	r   existsFileNotFoundErrorconnectexecuter   r   fetchallclose)r   r5   r   conrelrowsr   r   r   r     s   
r   r   set[str]c                 C  s   dd l }| d | d | d g}dd |D }|st S ddd |D }| }|d	| d
|g }|  dd |D S )Nr   zgolden_segments.csvzredo_segments.csvzdispose_segments.csvc                 S  s   g | ]
}|  rt|qS r   )r   r   rS   r   r   r   r   r   C  r   z/_load_validated_segment_ids.<locals>.<listcomp>z, c                 S  s   g | ]}d | d qS )'r   r   r   r   r   r   G  s    zB
        SELECT DISTINCT segment_file
        FROM read_csv_auto([zG], union_by_name=true, header=true)
        WHERE video_id = ?
        c                 S  r   r   r   r   r   r   r   r   R  r   z._load_validated_segment_ids.<locals>.<setcomp>)r   r   joinr   r   r   r   )r   r5   r   	csv_pathsexistingcsv_listr   r   r   r   r   r   ;  s(   r   c                  C  sP   t  } | jrtt| j|  d S | jrtt| j|  d S tt|  d S )N)r   r   asynciorunr   r4   )r0   r   r   r   mainU  s   r   __main__)r5   r   )r   r   )r   r   r5   r   r   r   )r   r   r5   r   r   r   )__doc__
__future__r   r   r   loggingr,   rl   pathlibr   basicConfigINFO	getLoggerr*   r   r4   r   r   r'   r   r   r   __name__r   r   r   r   <module>   s2    


l
]


 
