o
    ier                     @   s  d Z ddlZddlZddlZddlmZmZmZ ddlm	Z	m
Z
 ddlmZ ddlmZmZmZ ddlmZ ddlmZ dd	lmZmZmZ dd
lmZ ddlmZmZ ddlmZ ddlm Z  ddl!m"Z"m#Z#m$Z% e	G dd dZ&e	G dd dZ'e	G dd dZ(G dd dZ)										djde*d ee* d!e*d"e*d#e+d$ee, d%e+d&e+d'e+d(e*d)e-d*e'fd+d,Z.d-gd.fde*d e*d/ee* d0ee* d$e,d*ee*e'f fd1d2Z/e0d3krddl1Z1e1j2d4d5Z3e3j4dd6d7 e3j4d8d9d:d7 e3j4d;d<dd=d> e3j4d?d@dg dAdBdC e3j4dDe+ddEdF e3j4dGdHdIdJ e3j4dKdLe,dMdN e3j4dOdPe+ddQdF e3j4dRe+ddSdF e3j4dTe+ddUdF e3j4dVdWddXd> e35 Z6e.e6j7e6j8e6j9e6j:e6j;e6j<e6j=e6j>e6j?e6j@e6jA dYZBeCdZd[  eCd\ eCd[  eCd]eBj7  eCd^eBj8  eCd_eBj9 d`eBjD da eCdbeBjE  eCdceBjF  eCddeBjGdedf eCdgeBjHdedf eCdheBjI  eBjJreCdieBjJ  dS dS dS )ka  
Main transcription pipeline orchestrating the full workflow:
1. Download audio segments from R2
2. Fetch language info from Supabase
3. Process/split audio segments
4. Transcribe using Gemini
5. Output results

Full control over all parameters for testing and production use.
    N)OptionalListDict)	dataclassfield)datetime)DEFAULT_SETTINGSGEMINI_MODELSget_model_name)download_video_segments)get_video_language)AudioProcessor
AudioChunkget_segment_stats)AudioPolisher)GeminiTranscriberTranscriptionConfig)TranscriptionResult)derive_romanized_text)validate_transcriptionquick_validatecleanupc                   @   sP  e Zd ZU dZeed< dZee ed< dZeed< dZ	e
ed< d	Ze
ed
< dZe
ed< dZe
ed< dZee ed< dZeed< dZeed< dZeed< dZee ed< dZe
ed< dZeed< dZeed< dZeed< dZeed< dZe
ed < d!Zeed"< dZeed#< d$Zeed%< dZeed&< ed'd( d)Z eed*< d+Z!eed,< dZ"eed-< dS ).PipelineConfigzq
    Complete configuration for the transcription pipeline.
    All parameters are exposed for full control.
    video_idNlanguageTelugudefault_language      $@max_segment_duration_sec       @min_segment_duration_sec        chunk_overlap_sec      T@split_boundary_padding_msmax_segmentsr   segment_start_indexTskip_short_segmentsgemini-3-flash-previewmodellowthinking_leveltemperaturevalidate_transcriptionsretryvalidation_actionzgemini-3-pro-previewretry_modelretry_thinking_levelg      @
min_snr_db./transcriptions
output_dirsave_intermediate
   
batch_sizepolish_audioc                   C   s   t d S )Nwork_dir)r    r:   r:   pipeline.py<lambda>K   s    zPipelineConfig.<lambda>)default_factoryr9   Fcleanup_afteruse_regex_fallback)#__name__
__module____qualname____doc__str__annotations__r   r   r   r   floatr    r"   r$   r%   intr&   r'   boolr)   r+   r,   r-   r/   r0   r1   r2   r4   r5   r7   r8   r   r9   r>   r?   r:   r:   r:   r;   r       s6   
 r   c                   @   s   e Zd ZU dZeed< eed< eed< ee ed< eed< eed< eed< eed	< eed
< eed< eed< e	e ed< eed< eed< dS )PipelineResultzResults from a pipeline run.r   r   r)   r+   total_segments_foundsegments_processedchunks_createdtranscriptions_completedtotal_audio_duration_sectotal_processing_time_secoutput_fileerrors
started_atcompleted_atN)
r@   rA   rB   rC   rD   rE   r   rG   rF   r   r:   r:   r:   r;   rI   P   s    
 rI   c                   @   s   e Zd ZU dZdZeed< dZeed< dZeed< dZ	eed< dZ
eed< dZeed< dZeed	< dZeed
< dZeed< dZeed< dZeed< dZdZdZdZdd Zdd ZdS )_BatchHealthz>Tracks validator health per batch. Catches silent degradation.r   	validatedctc_successmms_successdual_successone_sided_acceptacceptreviewr.   rejectretry_improvedr!   total_combinedg?g333333?g?c                 C   s   |  j d7  _ |jdk}|jdk}|r|  jd7  _|r#|  jd7  _|r.|r.|  jd7  _|jdkr>|r7|s>|  jd7  _|j}t| |rQt	| |t
| |d  |  j|j7  _dS )zRecord one validation result.   r   rZ   N)rU   native_ctc_scoreroman_mms_scorerV   rW   rX   statusrY   hasattrsetattrgetattrr^   combined_score)self
validationctc_okmms_okstatus_attrr:   r:   r;   record|   s   


z_BatchHealth.recordc                 C   s  t | jd}| j| }| j| }| j| }| j| }| j| }| jt | jd }|d| j d| j d| j	 d| j
 d| j d|d |d	|d
d|d
d|d
d| j  | jdkre|d| j  g }	|d| j k rz|	dd| d
d |d| j k r|	dd| d
d || jkr|	d|d
d| jd
 | jdkr|| jkr|	d|d
d| jd
 |	D ]
}
|d|
 d q|	S )z5Log batch health summary. Warn on threshold breaches.r_   z
  Health: z validated | accept=z review=z retry=z reject=z	 | avg_S=z.3fz  Validators: CTC=z.0%z MMS=z dual=z | one-sided accepts=r   z  Retries improved: zCTC failing on z of segmentszMMS failing on zReject rate z	 exceeds zOne-sided accept rate z	  ALERT: WARN)maxrU   r^   rV   rW   rX   r\   rY   rZ   r[   r.   r]   CTC_FAIL_THRESHOLDappendMMS_FAIL_THRESHOLDREJECT_THRESHOLDONE_SIDED_THRESHOLD)rg   log_fnnavg_sctc_ratemms_rate	dual_raterej_rateone_sided_ratealertsalertr:   r:   r;   report   sR   







z_BatchHealth.reportN)r@   rA   rB   rC   rU   rG   rE   rV   rW   rX   rY   rZ   r[   r.   r\   r]   r^   rF   ro   rq   rr   rs   rl   r~   r:   r:   r:   r;   rT   g   s&   
 rT   c                   @   sf   e Zd ZdZdefddZddedefdd	Zd
edefddZde	fddZ
ddedefddZdS )TranscriptionPipelinezF
    Main pipeline orchestrating the full transcription workflow.
    configc                 C   s"   || _ g | _g | _d| _g | _dS )z'Initialize pipeline with configuration.N)r   resultsrQ   
start_time_health_alerts)rg   r   r:   r:   r;   __init__   s
   
zTranscriptionPipeline.__init__INFOmessagelevelc                 C   s,   t  d}td| d| d|  dS )zLog a message with timestamp.z%H:%M:%S[z] [z] N)r   nowstrftimeprint)rg   r   r   	timestampr:   r:   r;   _log   s   zTranscriptionPipeline._logr   returnc                 C   s*   ddddddddd	d
ddd}| |dS )z-Convert language name to code for validation.tehitaknmlbnmrgupaorasen)r   HindiTamilKannada	MalayalamBengaliMarathiGujaratiPunjabiOdiaAssameseEnglish)get)rg   r   lang_mapr:   r:   r;   _get_language_code   s   z(TranscriptionPipeline._get_language_codec           (         s  t   _t  }djj  djj djj	  d zt
jjjjjjd\}}W n tyY } zjd|  d| d  d	}~ww t|}d
|dd d|dddd |dddkrd|d  d |dddkrd|d  dd d jjrjj}d|  ntjjjjd}d|  d tjjjjjjjjd}|j|jjjjdjjdkrjjd	 djj dt d  jjrd! t  }t!j"#|d"}	d}
d}g }D ]:}|j$|j%|	d#}|j&r4|j'|_%|
d$7 }
|j(jj)k rB|d$7 }q|j*d% jjk rR|d$7 }q|| qd&|
 d't d(t|
  d) |dkrd*| d+jj) d, |d-t d. t+ }t,jjjj	jj-|d/}t.dtjj/D ]t0jj/ t}|  d0d$  d1| d2t  |j1 | fd3d4d5}jj2r/3|}t4 }t5|D ]2\}}|j6r!t7|j6||j8j9pd6d7}|j:|j8_9t; | j%|j6|j:|jjd8}|j<|_=|j>|_?|j<d9v rjj@d:krd;|jA d<jjB d=|j>d>d?d t,jjBjjCjj-|d/}|D | j%|}|d@sddAlEmF}mG} |dB}tH|tIr{|d^i |nd	}||dCd6|dDd6|dEd6|dFd6||dGd6dH}t7|j8||j9pd6d7}|j:|_9t; | j%|j8|j:|jjd8} | j>|j>kr||_8|j|_J|j	|_	| }dI| j>d>dJ| j< d? | jKd$7  _Kn
dK| j>d> |j<|_=|j>|_?|j<dLv rdM|jA dN|j< d=|j>d>dO|jLd>dP|jMd>d?d |N| q|Oj}!jPQ|! jRQ| jjSr@jTdQdR qdS jTdTdR}"jj2rUtU  jjVrzdU dd	lW}#t!j"#jjjj}$t!j"X|$rz|#Y|$ t   j }%t  }&tZdVdW jRD }'t[jj|jjjj	|ddtt\dXdW jRD ttjR|'|%|"j||&dY}dZ|%dd d[|'dd\ d]|"  |S )_z}
        Execute the full pipeline.
        
        Returns:
            PipelineResult with summary and statistics
        zStarting pipeline for video: Model: z, Thinking: z-Step 1: Downloading audio segments from R2...)r9   r?   zR2 download failed: zERROR: ERRORNzFound total_filesr   z audio files, total duration: total_duration_sec.1fsover_10s_countz  -> z  segments over 10s will be splitunder_2s_countz! segments under 2s may be skippedrm   z!Step 2: Fetching language info...zUsing specified language: )defaultzLanguage from Supabase: z$Step 3: Processing audio segments...)max_duration_secmin_duration_secoverlap_secboundary_padding_ms)r%   
skip_shortzStarting from index z, z chunks remainingz'Step 3.5: Polishing audio boundaries...polished)r4   r_   g     @@z	Polished /z	 chunks (z already clean)zSNR gate: skipped z chunks (SNR < zdB)zStep 4: Transcribing z
 chunks...)r)   r+   r,   r   zProcessing batch -z of c              
      s.    d|   dt d | d  j S )Nz  Chunk r   : r_   )r   lenoriginal_segment)ctbatchbatch_startchunksrg   r:   r;   r<   I  s
    z+TranscriptionPipeline.run.<locals>.<lambda>)progress_callback )native_textr   fallback_romanized)romanized_textr   r   )r.   r\   r.   z  Retrying z with z (S=z.2f)error)TranscriptionOutputSpeakerMetaspeakertranscriptioncode_switch	romanizedtaggeddetected_language)r   r   r   r   r   r   z  Retry improved: S=z (z  Retry did not improve: S=)r\   r.   z  Validation: z -> z, CTC=z, MMS=T)partialzStep 5: Saving results...FzCleaning up downloaded files...c                 s       | ]}|j V  qd S Nduration_sec.0rr:   r:   r;   	<genexpr>      z,TranscriptionPipeline.run.<locals>.<genexpr>c                 s   r   r   )
segment_idr   r:   r:   r;   r     r   )r   r   r)   r+   rJ   rK   rL   rM   rN   rO   rP   rQ   rR   rS   zPipeline completed in zTranscribed z
s of audiozOutput saved to: r:   )]timer   r   r   	isoformatr   r   r   r)   r+   r   r9   r?   	ExceptionrQ   rp   r   r   r   r   r   r   r   r    r"   r$   process_segments_directoryr%   r'   r&   r   r8   r   ospathjoinpolish	file_pathwas_modifiedoutput_pathsnr_dbr2   polished_duration_msr   r   r,   ranger7   mintranscribe_batchr-   r   rT   	enumeratenativer   r   r   textr   rb   validation_statusrf   validation_scorer/   r   r0   r1   transcribe_audio src.backend.transcription_schemar   r   
isinstancedict
model_usedr]   r`   ra   rl   r~   r   extendr   r5   _save_resultscleanup_validatorr>   shutilexistsrmtreesumrI   set)(rg   rR   segments_dirmetadataestatsr   	processorpolisherpolished_dirpolished_countsnr_skippedsurviving_chunkschunkresulttranscribertranscription_config	batch_endbatch_results	lang_codebatch_healthiromanized_resultrh   retry_config	retry_rawr   r   speaker_datar   retry_outputretry_romanized	retry_valr|   rP   r   video_work_dir
total_timerS   total_audior:   r   r;   run   s  











$






















zTranscriptionPipeline.runFr   c              	   C   s"  t j| jjdd t d}|rdnd}| jj d| jj	dd d| | d}t j
| jj|}| jj| jj| jj| jrF| jd	 jn| jj| jj| jj| jj| jjd
t| jtdd | jD dd | jD d}t|ddd}tj||ddd W d   |S 1 sw   Y  |S )zSave results to JSON file.T)exist_okz%Y%m%d_%H%M%S_partialr   _r   z.jsonr   )r   r    r$   r%   c                 s   r   r   r   r   r:   r:   r;   r     r   z6TranscriptionPipeline._save_results.<locals>.<genexpr>c                 S   s   g | ]}|  qS r:   )
model_dumpr   r:   r:   r;   
<listcomp>  s    z7TranscriptionPipeline._save_results.<locals>.<listcomp>)r   r)   r+   r   r   results_countr   r   wzutf-8)encodingF   )ensure_asciiindentN)r   makedirsr   r4   r   r   r   r   r)   replacer   r   r+   r   r   r   r   r    r$   r%   r   r   openjsondump)rg   r   r   suffixfilenamer   output_datafr:   r:   r;   r     s0   *
z#TranscriptionPipeline._save_resultsN)r   )F)r@   rA   rB   rC   r   r   rD   r   r   rI   r  rH   r   r:   r:   r:   r;   r      s    
  r   r(   r*   r!   r   r   r#   r3   Tr   r   r)   r+   r,   r%   r   r   split_pad_msr4   validater   c                 C   s.   t | |||||||||	|
d}t|}| S )a   
    Convenience function to run the pipeline with common settings.
    
    Args:
        video_id: YouTube video ID
        language: Primary language (if None, fetched from Supabase)
        model: Gemini model to use
        thinking_level: Thinking level for Gemini 3 (default: low)
        temperature: Generation temperature (default: 0.0 deterministic)
        max_segments: Limit segments for testing
        max_duration_sec: Max segment duration (hard limit)
        min_duration_sec: Skip segments shorter than this duration
        split_pad_ms: Padding context to add around split boundaries
        output_dir: Directory for output files
        validate: Whether to validate transcriptions
        
    Returns:
        PipelineResult with summary
    )r   r   r)   r+   r,   r%   r   r    r$   r4   r-   )r   r   r  )r   r   r)   r+   r,   r%   r   r   r1  r4   r2  r   pipeliner:   r:   r;   run_pipeline  s    r4  high   modelsthinking_levelsc                 C   s   i }|D ]P}|D ]K}| d| }t dd  t d|  t d  zt| ||||d|  d}	|	||< W q tyS }
 zt d| d|
  W Y d	}
~
qd	}
~
ww q|S )
ae  
    Test multiple models on the same video for comparison.
    
    Args:
        video_id: Video ID to test
        language: Primary language
        models: List of model names to test
        thinking_levels: Thinking levels to try
        max_segments: Segments per test
        
    Returns:
        Dict mapping model_thinking to PipelineResult
    r  
<============================================================z	Testing: z./transcriptions/test_)r   r   r)   r+   r%   r4   zERROR testing r   N)r   r4  r   )r   r   r7  r8  r%   r   r)   thinkingkeyr
  r  r:   r:   r;   test_models?  s.   
 r=  __main__zTranscription Pipeline)descriptionzYouTube video ID)helpz
--languagez-lz'Primary language (default: auto-detect)z--modelz-mzGemini model to use)r   r@  z
--thinkingz-t)minimalr*   mediumr5  z*Thinking level for Gemini 3 (default: low))r   choicesr@  z--temperaturez3Generation temperature (default: 0.0 deterministic))typer   r@  z--no-validate
store_truezSkip transcription validation)actionr@  z--max-segmentsz-nzMaximum segments to process)rD  r@  z--max-durationz-dz#Maximum segment duration in secondsz--min-durationz2Minimum segment duration in seconds (default: 2.0)z--split-pad-msz8Padding (ms) added around split boundaries (default: 80)z--outputz-ozOutput directory)r   r   r)   r+   r,   r%   r   r   r1  r4   r2  r9  r:  zPipeline Summaryz
Video ID: z
Language: r   z (thinking: r   zSegments processed: zChunks transcribed: zTotal audio: r   r   zTotal time: zOutput: zErrors: )
Nr(   r*   r!   Nr   r   r#   r3   T)KrC   r   r+  r   typingr   r   r   dataclassesr   r   r   src.backend.configr   r	   r
   src.backend.r2_storager   src.backend.supabase_clientr   src.backend.audio_processorr   r   r   src.backend.audio_polisherr   src.backend.gemini_transcriberr   r   r   r   src.romanizationr   src.validatorsr   r   r   r   r   rI   rT   r   rD   rF   rG   rH   r4  r=  r@   argparseArgumentParserparseradd_argument
parse_argsargsr   r   r)   r;  r,   r%   max_durationmin_durationr1  outputno_validater
  r   r+   rK   rM   rN   rO   rP   rQ   r:   r:   r:   r;   <module>   s   
/L  ]	

6


-







5