o
    lQi'                     @  s   d Z ddlmZ ddlZddlZddlZddlZddlmZ ddl	m
Z
 ddlmZmZ ddlmZmZ dd	lmZmZmZ dd
lmZmZmZmZ ddlmZmZ ddlmZmZ e e!Z"G dd dZ#dS )z
Pipeline orchestrator per worker:
claim video -> download tar -> polish audio -> batch-cycle through segments -> pack results -> upload -> mark done.
    )annotationsN)Path)Optional   )polish_all_segmentsPolishedSegment)BatchCycleEngineBatchResult)WORKER_BATCH_SIZEBATCH_INTERVAL_SECONDS	EnvConfig)MockDB
PostgresDB	VideoTaskWorkerStats)BaseProviderTranscriptionRequest)R2ClientExtractedVideoc                   @  s    e Zd ZdddZdddZdS )Pipelineconfigr   dbMockDB | PostgresDBr2r   primary_providerr   fallback_providerOptional[BaseProvider]	worker_idstrstatsr   c                 C  s.   || _ || _|| _|| _|| _|| _|| _d S )N)r   r   r   primaryfallbackr   r   )selfr   r   r   r   r   r   r    r#   (/home/ubuntu/transcripts/src/pipeline.py__init__   s   

zPipeline.__init__taskr   returnboolc           "        s  |j }|j}|jr|j r|j}td| d|  nttjd| j	 dd}z+ztd| d|  || j
_t }|| d }| rWtd| d ntd| d	 |d
| jj||I d
H }|d
| jj||I d
H }td| dt|j d |jstd| d | j|I d
H  W W | j| dS |jp|}td| dt|j d |d
t|jI d
H }dd |D }	t|t|	 }
td| dt|	 d|
 d |	std| d | j|I d
H  W W | j| dS t|	| j
_t| j| j| j	|d}i }t|	t d t }t|D ]l}t  }|t }t!|t t|	}|	|| }g }i }i }|D ];}|j"j#}|j"j$r_|j"j# d|j"j% }|&t'||j(||j"j#d |j"j)d ||< |j"j*|j"j+|j"j$d||< qItd| d|d  d| dt| d	 |j,||||d I d
H }|j-D ]}|j.r|j.||j/< q|j0r| j1|j0I d
H  |j2r| j3|j2I d
H  | j
 j4|j47  _4| j
 j5|j67  _5| j
 j7|j87  _7| j
 j9|j97  _9| j
 j:|j:7  _:| j
 j;d7  _;t|	| | j
_|j<| j
_=|j-D ](}|j>j?d!kr@| j
 j@|j>jA7  _@| j
 jB|j>jC7  _B| j
 jD|j>jE7  _Dqt  | }|d!krh|j4| d" | j
_FtGd#d$ |j-D }|| d" | j
_Ht  | }tI| }|d!kr||d k rtJd| d%|d&d' tK|I d
H  q'td| d( d)d* |jD   fd+d|	D }t }|d
| jjL||jM||i |jNd,t|t|	|
t|| j	d-iI d
H } td| d. |d
| jjO| |I d
H  | j|I d
H  d
| j
_td| d/ W W | j| dS  tPyW }! z=tjQd| d0|! dd1 | jR|tS|!I d
H  | jT| j	tS|!I d
H  | j
jU&tS|! W Y d
}!~!W | j| d2S d
}!~!ww | j| w )3z;Process a single video end-to-end. Returns True on success.[z] Using prefetched download at worker__)prefixz] Starting pipeline, language=z.tarz2] Tar already downloaded (prefetch), extracting...z] Downloading tar from R2...Nz] Extracted z	 segmentsz!] No segments found, marking doneTz] Polishing z segments...c                 S  s   g | ]}|j js|qS r#   )	trim_meta	discarded.0pr#   r#   r$   
<listcomp>V   s    z*Pipeline.process_video.<locals>.<listcomp>z] Polished: z valid, z
 discardedz(] All segments discarded after polishing)r   r   r   video_idr   _split)
segment_idaudio_base64language_codeoriginal_filei  )abrupt_start
abrupt_end	was_splitz] Batch /z: )requestsexpected_languageaudio_durations
trim_metasr   <   c                 s  s$    | ]}|j jd kr|j jV  qdS )r   N)token_usagetotal_tokens)r0   rr#   r#   r$   	<genexpr>   s    z)Pipeline.process_video.<locals>.<genexpr>z
] Waiting z.1fzs for next batchz] Packing results tar...c                 S  s   i | ]}|j |qS r#   )namer/   r#   r#   r$   
<dictcomp>   s    z*Pipeline.process_video.<locals>.<dictcomp>c                   s$   g | ]}|j j v r |j j qS r#   )r-   r8   )r0   s
path_indexr#   r$   r2      s    
transcription_summary)total_segmentsvalid_segmentsdiscarded_segmentstranscribed_segmentsr   z] Uploading result tar to R2...z] Pipeline complete!z] Pipeline failed: )exc_infoF)Vr3   languageprefetch_direxistsloggerinfor   tempfilemkdtempr   r   current_video_idasyncioget_running_looprun_in_executorr   download_tarextract_tarlensegment_pathswarningr   mark_video_donecleanupr   segments_remainingr   r    r!   r
   rangetime	monotonicminr-   r8   r;   split_indexappendr   base64_audiofinal_duration_msr9   r:   	run_batch	responsestranscription_datar5   transcription_recordsinsert_resultsflag_recordsinsert_flagssegments_sentsegments_completedsegments_returnedsegments_failedsegments_errorsegments_429
cache_hitsbatches_completedavg_latency_msavg_batch_latency_msrB   rC   total_input_tokensinput_tokenstotal_output_tokensoutput_tokenstotal_cached_tokenscached_tokens
active_rpmsum
active_tpmr   debugsleeppack_results_tarwork_dirmetadata
upload_tar	Exceptionerrormark_video_failedr   set_worker_errorerrors)"r"   r&   r3   rQ   r   looptar_path	extractedpolishedrM   r.   batch_engineall_transcription_jsonstotal_batches	batch_idxbatch_start_timestartendbatch_segmentsr=   r?   r@   segseg_idbatch_resultresprD   batch_elapsed_srC   elapsedremaining_waitvalid_paths
result_tarer#   rI   r$   process_video,   s2  

  

"  
,








zPipeline.process_videoN)r   r   r   r   r   r   r   r   r   r   r   r   r   r   )r&   r   r'   r(   )__name__
__module____qualname__r%   r   r#   r#   r#   r$   r      s    
r   )$__doc__
__future__r   rY   loggingrV   re   pathlibr   typingr   audio_polishr   r   batch_cycler   r	   r   r
   r   r   r   r   r   r   r   providers.baser   r   	r2_clientr   r   	getLoggerr   rT   r   r#   r#   r#   r$   <module>   s     
