o
    %i*l                     @  s4  d Z ddlmZ ddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlZddlmZ ddlmZmZ ddlmZmZ ddlmZ ddlmZ d	d
lmZmZmZ d	dlmZ d	dlmZmZ d	dl m!Z! e"e#Z$ddddddddZ%dd Z&eG dd dZ'eG dd dZ(G dd dZ)dS )uG  
Production validation worker: claims done videos, runs LID+CTC pipeline,
packs parquet shards. Modeled on the battle-tested src/worker.py.

Key features vs the basic version:
  - Registers in worker_validators table for fleet monitoring
  - Async heartbeat loop (30s + jitter) with live throughput stats
  - Prefetch: claim + download next video while current one processes
  - Graceful shutdown: SIGTERM → finish current → flush parquet → release → offline
  - Dead video recovery: on startup, reset stale validating claims
  - Rolling throughput tracking (last 10 videos)
    )annotationsNdeque)	dataclassfield)datetimetimezone)Path)Optional   )ValidationConfigPARQUET_SHARD_SIZEHEARTBEAT_INTERVAL_S)load_video_segments)ValidationPipelineSegmentResult)ParquetPacker)	claims_okclaims_failheartbeats_okheartbeats_failmarks_ok
marks_failrecovery_resetc                  C  sh   t } td| d  d| d | d   d| d  d| d | d   d| d	  d| d
  d| d   d S )Nz[DB-STATS] claims=r   /r   z heartbeats=r   r   z marks=r   r   z recovery_reset=r   )	_db_statsloggerinfos r    ./home/ubuntu/transcripts/validations/worker.py_log_db_stats.   s   r"   c                   @  s   e Zd ZU dZdZded< dZded< dZded< dZded	< d
Z	ded< dZ
ded< dZded< d
Zded< dZded< dZded< edd dZded< dS )ValidationStatsz.Mutable stats updated during worker lifecycle.NzOptional[str]current_video_idr   intvideos_processedvideos_failedsegments_processed        floatavg_segs_per_secondshards_writtenshards_uploadedtotal_parquet_mbzOptional[datetime]last_video_completed_at
last_errorc                   C  s
   t ddS )N
   )maxlenr   r    r    r    r!   <lambda>E   s   
 zValidationStats.<lambda>)default_factoryr   _recent_speeds)__name__
__module____qualname____doc__r$   __annotations__r&   r'   r(   r+   r,   r-   r.   r/   r0   r   r5   r    r    r    r!   r#   8   s   
 r#   c                   @  s*   e Zd ZU dZded< ded< ded< dS )PrefetchResultz&Prefetched video ready for processing.strvideo_idlanguager	   work_dirN)r6   r7   r8   r9   r:   r    r    r    r!   r;   H   s
   
 r;   c                   @  s   e Zd ZdZd?ddZdd Zdd	 Zd@dAddZdBddZdCddZ	dDddZ
dd Zdd  Zd!d" Zd#d$ Zd%d& ZdEd(d)ZdFd,d-ZdGd/d0ZdHd1d2Zd3d4 Zd5d6 ZdIdJd9d:Zd;d< Zd=d> Zd
S )KValidationWorkerzv
    Production worker with heartbeats, prefetch, graceful shutdown.
    Each Docker container runs one instance.
    configr   c                 C  sV   || _ t|| _d | _t | _d | _d | _t	 | _
d | _ttjdd| _d| _d S )Nvalidation_prefixr)   )rA   r   pipelinepackerr#   stats_db_s3asyncioEvent_shutdown_event_heartbeat_taskr	   tempfilemkdtemp	_work_dir_start_time)selfrA   r    r    r!   __init__V   s   


zValidationWorker.__init__c              
     s  t    _t }tjtjfD ]}|||f fdd	 qzzO  I dH   	   
 I dH    I dH     td  j   jd }t j| _t   _td jj d   I dH  W n9 ty } z-tjd| d	d
 t|dd  j_ jr j dt|dd dI dH  W Y d}~nd}~ww W  ! I dH  dS W  ! I dH  dS  ! I dH  w )uT   Main entry: connect → recover → register → load models → heartbeat → loop.c                   s   t  | S N)rJ   create_task_handle_shutdownr   rR   r    r!   r3   h   s    z(ValidationWorker.start.<locals>.<lambda>Nz4Loading validation models (downloading if needed)...parquet_shardsWorker z entering main loopzWorker fatal error: Texc_info  error)r]   )"timerQ   rJ   get_running_loopsignalSIGTERMSIGINTadd_signal_handler_connect_db_init_s3_recover_dead_videos	_register	_hf_loginr   r   rE   load_modelsrP   r   rA   rF   rU   _heartbeat_looprM   	worker_id
_main_loop	Exceptionr]   r<   rG   r0   rH   _update_worker_status_cleanup)rR   loopsig
output_direr    rW   r!   startb   s:   



 " zValidationWorker.startc              
     s`  d}d}d}d}d }d }| j j}|dkr!td| d| d | j s|dkr<| jj|kr<td| d nb|rT|j|j	|j
}}	}
d }td	|  n|  I d H \}}	d }
|s|d
7 }||krst| d n+td| d}td|dd| d| d ztj| j |dI d H  W n tjy   Y q!w d}|| j_|dkr|| jj d
 nd
}|dkr| j st|  }z| ||	|
I d H  | j jd
7  _ttj| j_d}W nl tyS } z_tjd| d| dd | j jd
7  _t|d d | j_| |t|I d H  t|}d|v p+d|v p+d|v }|rG|d
7 }||krFt| d W Y d }~nXnd}W Y d }~nd }~ww d | j_|r||  s|ztj|ddI d H }W n* tjtfy{   d }Y nw |r|  rz|! }W n ty   d }Y nw d }| j r'td| jj d| jj d  d S )!Nr         zMAX_VIDEOS=u    — will stop after z	 video(s)zReached MAX_VIDEOS=z, shutting down.zUsing prefetched video: r   z" consecutive empty claims, exitingg       @g      >@zNo pending videos, waiting .0fzs (attempt r   z)...timeoutzVideo z	 failed: TrZ   r\   CUDAzdevice-sidecuFFTz8 consecutive GPU failures (GPU likely poisoned), exitingr1   zMain loop ended:  processed, z failed)"rA   
max_videosr   r   rL   is_setrG   r&   r=   r>   r?   _claim_videominrJ   wait_forwaitTimeoutErrorr$   rU   _prefetch_next_process_one_videor   nowr   utcr/   rm   r]   r'   r<   r0   _mark_video_faileddoneresult)rR   consecutive_empty	max_emptyconsecutive_failuresmax_consecutive_failuresprefetch_task
prefetchedr}   r=   r>   
video_workr   	remainingrs   err_stris_gpu_fatalr    r    r!   rl      s    

$JzValidationWorker._main_loopNr=   r<   r>   prefetch_dirOptional[Path]c                   s  t   }|p| j| }|jdd t }z4|| d }| s_|d | j||I d H }|s_| |dI d H  W | rL||krLt	j
|dd d S |r[| r]t	j
|dd d S d S d S |d | j||I d H  t||\}}	|	std| d | |dI d H  W | r||krt	j
|dd d S |r| rt	j
|dd d S d S d S | j||	}
| j||
 | j jt|
7  _| jj}|d	 | j_|d
d| j_| |t|
I d H  t   | }|dkrt|
| nd}| jj| t| jjt| jj | j_td| dt|
 d|dd|dd| jjdd W | r7||kr7t	j
|dd d S |rH| rJt	j
|dd d S d S d S | r^||kr^t	j
|dd w |rn| rot	j
|dd w w w )NT)exist_ok_transcribed.tarztar download failedignore_errors[z] No valid segmentsr   r,   total_mbr)   z] Complete: z segments in .1fzs (z segs/s, rolling avg ))r^   rP   mkdirrJ   r_   existsrun_in_executor_download_tarr   shutilrmtree_extract_tarr   r   warning_mark_video_donerE   process_videorF   add_video_resultsrG   r(   lenr,   getr.   r5   appendsumr+   r   )rR   r=   r>   r   t0r   rp   tar_pathmetadatasegmentsresultspacker_statselapsedspeedr    r    r!   r      sp   &z#ValidationWorker._process_one_videoreturnOptional[PrefetchResult]c              
     s  d}zT|   I dH \}}|sW dS ttjd| jj dd}t }|d| j	||I dH }|sEt
j|dd | |dI dH  W dS td| d	|  t|||d
W S  ty } z#td|  |r{| |dt|dd  I dH  W Y d}~dS d}~ww )zBClaim and pre-download the next video while current one processes.N	prefetch__rC   Tr   ztar download failed (prefetch)z[prefetch] z tar ready at )r=   r>   r?   z[prefetch] Failed: zprefetch error:    )r   r	   rN   rO   rA   rk   rJ   r_   r   r   r   r   r   r   r   r;   rm   r   r<   )rR   r=   r>   r?   rp   r   rs   r    r    r!   r     s,   $zValidationWorker._prefetch_nextr?   r	   c           	   	   C  s   || d }| j jrdS | d}| j j|fdd| fg}|D ]C\}}z6| jj||d td| d|  | j||t| |	 j
d }td	| d
|dd |W   S  tyd   Y q!w td| d dS )zDownload transcribed tar from R2. Checks both bucket locations
        (tars split across 'transcribed' bucket and '1-cleaned-data/transcribed/' prefix).r   Nz1-cleaned-dataztranscribed/)BucketKeyzDownloading s3://r   g    .AzDownloaded : r   MBzDownload failed for z": tar not found in any R2 location)rA   	mock_moder2_bucket_sourcerI   head_objectr   r   download_filer<   statst_sizerm   r]   )	rR   r=   r?   r   key	locationsbucketobj_keysize_mbr    r    r!   r   5  s(   


zValidationWorker._download_tarr   c                 C  sJ   t |d}|j|dd W d    n1 sw   Y  |jdd d S )Nzr:*data)filterT)
missing_ok)tarfileopen
extractallunlink)rR   r   r?   tfr    r    r!   r   P  s   zValidationWorker._extract_tarc                 C  s:   | j jrd S dd l}|jd| j j| j j| j jdd| _d S )Nr   s3auto)endpoint_urlaws_access_key_idaws_secret_access_keyregion_name)rA   r   boto3clientr2_endpoint_urlr2_access_key_idr2_secret_access_keyrI   )rR   r   r    r    r!   re   U  s   zValidationWorker._init_s3c                 C  s$   ddl }|jdd td dS )uJ   Remove HF_TOKEN from env — all 4 models load from R2, no HF auth needed.r   NHF_TOKENzFHF_TOKEN cleared from env (all models load from R2, no HF auth needed))osenvironpopr   r   )rR   r   r    r    r!   rh   a  s   zValidationWorker._hf_loginc                   st   | j js	| j jstd d S dd l}d| j jv }|j| j jddd|r&dnddd	I d H | _td
| d d S )Nz1Running without DB (mock mode or no DATABASE_URL)r   z:6543r   rv   <   d   require)dsnmin_sizemax_sizecommand_timeoutstatement_cache_sizesslz(DB pool connected (min=1, max=3, pooler=r   )rA   r   database_urlr   r   asyncpgcreate_poolrH   )rR   r   	is_poolerr    r    r!   rd   i  s   

zValidationWorker._connect_dbc                   sZ  | j sdS | jj| jj| jj| jjd| jj| jjt| jj	d}t
dD ]}z@| j  4 I dH }|d| jj| jj	t|I dH  W d  I dH  n1 I dH sTw   Y  td| jj d W  dS  ty } z2d|d	  td
d }td|d	  dt|dd  d|dd t|I dH  W Y d}~q%d}~ww td dS )z+Insert/upsert into worker_validators table.N)mms_lid	voxlinguaconformer_multiwav2vec_lang)modelsmodel_bucketoutput_bucket
shard_sizegpu_typeru   a  
                        INSERT INTO worker_validators (
                            worker_id, status, gpu_type, config_json,
                            started_at, last_heartbeat_at,
                            videos_processed, videos_failed, segments_processed,
                            shards_written, shards_uploaded, total_parquet_mb
                        ) VALUES ($1, 'online', $2, $3::jsonb, now(), now(), 0, 0, 0, 0, 0, 0)
                        ON CONFLICT (worker_id) DO UPDATE SET
                            status = 'online', gpu_type = $2, config_json = $3::jsonb,
                            started_at = now(), last_heartbeat_at = now(),
                            videos_processed = 0, videos_failed = 0, segments_processed = 0,
                            shards_written = 0, shards_uploaded = 0, total_parquet_mb = 0,
                            last_error = NULL, current_video_id = NULL
                    zRegistered worker z in worker_validatorsrv   r   r      zRegister failed (attempt z/5): r   , retry in rw   r   z3Register failed after 5 attempts, continuing anyway)rH   rA   enable_mms_lidenable_voxlinguaenable_conformer_multienable_wav2vec_langr2_model_bucketr2_bucket_outputr   r   rangeacquireexecuterk   jsondumpsr   r   rm   randomuniformr   r<   rJ   sleepr]   )rR   config_jsonattemptconnrs   r   r    r    r!   rg   y  s:   (0zValidationWorker._registerc              
     s  | j sdS zk| j  4 I dH 6}|d| jjI dH }|r&t| d nd}|dI dH }|r:t| d nd}W d  I dH  n1 I dH sLw   Y  || }|dkro|td< t	d| d| d	| d
 W dS W dS  t
y } ztd|  W Y d}~dS d}~ww )zCOn startup, reset stale validating claims (handles crash recovery).Nz
                    UPDATE video_queue
                    SET validation_status = 'pending', claimed_by = NULL, claimed_at = NULL
                    WHERE validation_status = 'validating' AND claimed_by = $1
                r   a  
                    UPDATE video_queue
                    SET validation_status = 'pending', claimed_by = NULL, claimed_at = NULL
                    WHERE validation_status = 'validating'
                      AND claimed_at < now() - interval '15 minutes'
                r   z
Recovered z dead videos (own=z, stale=r   z(Dead video recovery failed (non-fatal): )rH   r   r  rA   rk   r%   splitr   r   r   rm   r   )rR   r	  r   	own_countstale_counttotalrs   r    r    r!   rf     s*   
($z%ValidationWorker._recover_dead_videostuple[Optional[str], str]c                   s  | j sdS tdD ]}zW| j  4 I dH }|d| jjI dH }W d  I dH  n1 I dH s2w   Y  |r`td  d7  < td|d  d	|d
  d |d |	d
dp[dfW   S W  dS  t
y } zStd  d7  < |dk rd| tdd }td|d  dt|dd  d|dd t|I dH  ntdt|dd   W Y d}~ dS W Y d}~q
d}~ww dS )zAtomic claim using validation_status column. Leaves status='done' untouched.
        ORDER BY video_id forces planner to use idx_vq_val_claim partial index
        instead of seq-scanning 500K rows.)N rv   Naj  
                        UPDATE video_queue
                        SET validation_status = 'validating',
                            claimed_by = $1,
                            claimed_at = now()
                        WHERE video_id = (
                            SELECT video_id FROM video_queue
                            WHERE status = 'done' AND validation_status = 'pending'
                            ORDER BY video_id
                            LIMIT 1
                            FOR UPDATE SKIP LOCKED
                        )
                        RETURNING video_id, language
                    r   r   zClaimed r=   z (lang=r>   r   r  r   r   g      ?g      ?zClaim failed (attempt z/3): r   r   r   r   zClaim failed after 3 attempts: )rH   r   r   fetchrowrA   rk   r   r   r   r   rm   r  r  r   r<   rJ   r  r]   )rR   r  r	  rowrs   r   r    r    r!   r     s6   ( 0zValidationWorker._claim_videosegment_countr%   c              
     s   | j sd S z4| j  4 I d H }|d||I d H  W d   I d H  n1 I d H s+w   Y  td  d7  < W d S  ty` } ztd  d7  < td| d|  W Y d }~d S d }~ww )Nz
                    UPDATE video_queue
                    SET validation_status = 'validated',
                        validation_segments = $2,
                        validation_at = now()
                    WHERE video_id = $1
                r   r   r   zmark_done failed for r   rH   r   r  r   rm   r   r   )rR   r=   r  r	  rs   r    r    r!   r     s   ($z!ValidationWorker._mark_video_doner]   c              
     s   | j sd S z8| j  4 I d H }|d||d d I d H  W d   I d H  n1 I d H s/w   Y  td  d7  < W d S  tyd } ztd  d7  < td| d|  W Y d }~d S d }~ww )Nz
                    UPDATE video_queue
                    SET validation_status = 'validation_failed',
                        error_message = $2
                    WHERE video_id = $1
                r\   r   r   r   zmark_failed failed for r   r  )rR   r=   r]   r	  rs   r    r    r!   r     s   ($z#ValidationWorker._mark_video_failedc              
     s   | j sdS z4| j  4 I dH }|d|I dH  W d  I dH  n1 I dH s*w   Y  td| d W dS  tyX } ztd| d|  W Y d}~dS d}~ww )z?Release a claimed video back to pending (used during shutdown).Nz
                    UPDATE video_queue
                    SET validation_status = 'pending', claimed_by = NULL, claimed_at = NULL
                    WHERE video_id = $1 AND validation_status = 'validating'
                z	Released z back to pendingzRelease failed for r   )rH   r   r  r   r   rm   r   )rR   r=   r	  rs   r    r    r!   _release_video  s   ($zValidationWorker._release_videoc              
     s   d}| j  scz	|  I d H  W n ty+ } ztd|  W Y d }~nd }~ww |d7 }|d dkr9t  tdd}zt	j
| j  t| dI d H  W d S  t	jy[   Y nw | j  rd S d S )Nr   zHeartbeat failed: r   r1      rx   )rL   r~   _send_heartbeatrm   r   r   r"   r  r  rJ   r   r   r   r   )rR   hb_countrs   jitterr    r    r!   rj     s.   
z ValidationWorker._heartbeat_loopc                   s   | j sd S zR| j}| j  4 I d H 0}|d| jj|j|j|j|j	t
|jd|j|jt
|jd|j|jI d H  W d   I d H  n1 I d H sIw   Y  td  d7  < W d S  tys } ztd  d7  < W Y d }~d S d }~ww )Nau  
                    UPDATE worker_validators SET
                        current_video_id = $2,
                        videos_processed = $3,
                        videos_failed = $4,
                        segments_processed = $5,
                        avg_segs_per_second = $6,
                        shards_written = $7,
                        shards_uploaded = $8,
                        total_parquet_mb = $9,
                        last_heartbeat_at = now(),
                        last_video_completed_at = $10,
                        last_error = $11
                    WHERE worker_id = $1
                r   r   r   r   )rH   rG   r   r  rA   rk   r$   r&   r'   r(   roundr+   r,   r-   r.   r/   r0   r   rm   )rR   r   r	  rs   r    r    r!   r  4  s2   

(z ValidationWorker._send_heartbeatr  statusc              
     s   | j sd S z9| j  4 I d H "}|d| jj||r |d d nd I d H  W d   I d H  W d S 1 I d H s8w   Y  W d S  tyZ } ztd|  W Y d }~d S d }~ww )Nz
                    UPDATE worker_validators
                    SET status = $2, last_error = $3, last_heartbeat_at = now()
                    WHERE worker_id = $1
                r\   zStatus update failed: )rH   r   r  rA   rk   rm   r   r   )rR   r  r]   r	  rs   r    r    r!   rn   Y  s   2z&ValidationWorker._update_worker_statusc                   s$   t d|j d | j  d S )Nz	Received z!, initiating graceful shutdown...)r   r   namerL   set)rR   rq   r    r    r!   rV   h  s   z!ValidationWorker._handle_shutdownc                   s^  t d | jr!| j  z| jI d H  W n
 tjy    Y nw | jjr/| | jjI d H  | j	rA| j	
 }|rAt d|  z| j  W n	 tyQ   Y nw z	|  I d H  W n	 tyd   Y nw | dI d H  t  | jr{| j I d H  | j rtj| jdd t | j }t d| jj d| jj d| jj d	| jj d
|dd d S )NzCleaning up worker...zFlushed final shard: offlineTr   rY   z shutdown complete. r|   z	 failed, z segments, rw   zs total)r   r   rM   cancelrJ   CancelledErrorrG   r$   r  rF   flushrE   unload_modelsrm   r  rn   r"   rH   closerP   r   r   r   r^   rQ   rA   rk   r&   r'   r(   )rR   shardr   r    r    r!   ro   l  sV   



zValidationWorker._cleanup)rA   r   rT   )r=   r<   r>   r<   r   r   )r   r   )r=   r<   r?   r	   r   r   )r   r	   r?   r	   )r   r  )r=   r<   r  r%   )r=   r<   r]   r<   )r=   r<   )r  )r  r<   r]   r<   )r6   r7   r8   r9   rS   rt   rl   r   r   r   r   re   rh   rd   rg   rf   r   r   r   r  rj   r  rn   rV   ro   r    r    r    r!   r@   P   s.    
%Z
:

+

)

%r@   )*r9   
__future__r   rJ   r  loggingr  r   r`   r   rN   r^   collectionsr   dataclassesr   r   r   r   pathlibr	   typingr
   rA   r   r   r   audio_loaderr   rE   r   r   rF   r   	getLoggerr6   r   r   r"   r#   r;   r@   r    r    r    r!   <module>   s@    

