o
    %io                     @  sD  d Z ddlmZ ddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlmZmZ ddlmZmZ ddlmZ ddlmZ ddlmZ dd	lmZ d
dlmZmZ d
dlmZ d
dlmZ d
dlm Z m!Z! d
dl"m#Z#m$Z$ d
dl%m&Z& e'e(Z)eG dd dZ*eG dd dZ+G dd dZ,G dd de-Z.dS )a  
Recovery validation worker.

Workflow per video:
  - claim from `validation_recover_queue`
  - download raw tar from `1-cleaned-data`
  - replay `audio_polish` deterministically
  - match replayed child IDs to historical `transcription_results`
  - validate all historical transcription rows
  - flush+upload shard BEFORE marking video recovered (crash-safe)
  - prefetch next video's raw tar + replay during GPU inference
    )annotationsN)	dataclassfield)datetimetimezone)Path)Optional)	EnvConfig)R2Client   )HEARTBEAT_INTERVAL_SValidationConfig)ParquetPacker)ValidationPipeline)RecoveryLoadResultload_recover_segments)ReplayLedgerWriterbuild_replay_ledger_payload)RecoverReferenceStorec                   @  s   e Zd ZU dZded< dZded< dZded< dZded< dZded	< d
Z	ded< dZ
ded< d
Zded< dZded< dZded< eedZded< dS )RecoverStatsNOptional[str]current_video_idr   intvideos_processedvideos_failedvideos_quarantinedsegments_processed        floatavg_segs_per_secondshards_writtentotal_parquet_mbzOptional[datetime]last_video_completed_at
last_error)default_factoryzlist[float]_recent_speeds)__name__
__module____qualname__r   __annotations__r   r   r   r   r   r    r!   r"   r#   r   listr%    r+   r+   6/home/ubuntu/transcripts/validations/recover_worker.pyr   )   s   
 r   c                   @  sF   e Zd ZU ded< ded< ded< ded< d	ed
< ded< ded< dS )PrefetchedVideostrvideo_id
list[dict]tx_rowsset[str]validated_segment_idsr   recoverdictflag_summaryr   work_dirr   prefetch_elapsedN)r&   r'   r(   r)   r+   r+   r+   r,   r-   8   s   
 r-   c                   @  s   e Zd ZdGddZdd Zdd ZdHddZdIdJddZdd Zdd Z	dd Z
dKddZdLddZdMd d!ZdNd%d&Zdd'dOd4d5ZdPd7d8ZdPd9d:Zd;d< Zd=d> ZdQdRdAdBZdCdD ZdEdF ZdS )SRecoverValidationWorkerconfigr   c                 C  st   || _ t|| _d | _t | _d | _t | _	d | _
ttjdd| _d| _tt | _t|| j| _t|| _d S )Nrecover_validation_)prefixr   )r:   r   pipelinepackerr   stats_dbasyncioEvent_shutdown_event_heartbeat_taskr   tempfilemkdtemp	_work_dir_start_timer
   r	   _raw_r2r   _reference_storer   _replay_ledger_writer)selfr:   r+   r+   r,   __init__D   s   

z RecoverValidationWorker.__init__c              
     sl  t    _t }tjtjfD ]}|||f fdd	 qzzD  I d H   	 I d H   j
 I d H    I d H  td  j   jd }t j| _t   _  I d H  W n5 ty } z)tjd| dd t|d d  j_ d	t|d d I d H  W Y d }~nd }~ww W   I d H  d S W   I d H  d S   I d H  w )
Nc                   s   t  | S N)rA   create_task_handle_shutdown)srL   r+   r,   <lambda>V   s    z/RecoverValidationWorker.start.<locals>.<lambda>z/Loading validation models for recover worker...recover_parquet_shardszRecover worker fatal error: Texc_info  error) timerH   rA   get_running_loopsignalSIGTERMSIGINTadd_signal_handler_connect_db_recover_dead_videosrJ   start	_registerloggerinfor=   load_modelsrG   r   r:   r>   rO   _heartbeat_looprD   
_main_loop	ExceptionrX   r.   r?   r#   _update_worker_status_cleanup)rL   loopsig
output_direr+   rR   r,   ra   R   s4   



*"zRecoverValidationWorker.startc              
     sV  d}d}d}d}| j j}|dkrtd| d| d d }d }| j s|dkr<| jj|kr<td| d n|rJ|j}td	|  nK| 	 I d H }	|	s|d
7 }||kret| d nt
d| d}
td|
dd ztj| j |
dI d H  W n tjy   Y q!w |	d }d}|| j_|dkr|| jj d
 nd
}|dkr| j st|  }zzA| ||I d H  | j jd
7  _ttj| j_d}z	|  I d H  W n ty } ztd|  W Y d }~nd }~ww W n ty7 } z.| j jd
7  _t|d d | j_td| d|  | |t|I d H  W Y d }~nd }~w ty } z{| j j d
7  _ t|d d | j_tj!d| d| dd | "|t|I d H  t|}d|v pzd|v pzd|v }|rdd l#}|j$%  |d
7 }td| d| d ||krt!| d  W Y d }~W d | j_d }ncnd}W Y d }~nd }~ww W d | j_d }nd | j_d }w |r|& sztj|d!dI d H }W n* tjtfy   d }Y nw |r|& rz|' }W n ty   d }Y nw d }| j r'td"| jj d#| jj d$| jj  d% d S )&Nr         zRECOVER MAX_VIDEOS=u    — will stop after z	 video(s)zReached RECOVER MAX_VIDEOS=z, shutting down.zUsing prefetched video: r   z* consecutive empty recover claims, exitingg       @g      >@z#No pending recover videos, waiting z.0fzs...timeoutr/   zPost-video heartbeat failed: rW   [z] Quarantined: z
] Failed: TrU   CUDAzdevice-sidecuFFTzGPU error (/z), cleared CUDA cachez8 consecutive GPU failures (GPU likely poisoned), exiting   zRecover main loop ended: z processed, z quarantined, z failed)(r:   
max_videosrc   rd   rC   is_setr?   r   r/   _claim_videominrA   wait_forwaitTimeoutErrorr   rO   _prefetch_next_process_one_videor   nowr   utcr"   _send_heartbeatrh   warningRecoverQuarantineErrorr   r.   r#   _mark_video_quarantinedr   rX   _mark_video_failedtorchcudaempty_cachedoneresult)rL   consecutive_empty	max_emptyconsecutive_gpu_failuresmax_gpu_failuresrx   prefetch_task
prefetchedr/   claimedr}   	remainingrn   err_stris_gpu_fatalr   r+   r+   r,   rg   m   s   "


Tz"RecoverValidationWorker._main_loopreturnOptional[PrefetchedVideo]c                   s  dz|   I dH }|sW dS |d t }| j }|jdd t }| I dH sD| dI dH  tj	|dd W dS | 
I dH }dd D | |d| jj|I dH }|d| jj|I dH  |d fd	d
I dH }| |jI dH }t | }	td dt|j dt| dt|j d|	dd t|||||	dW S  ty }
 z#td|
  r| dt|
dd  I dH  W Y d}
~
dS d}
~
ww )zGClaim + download + replay the next video while current one runs on GPU.Nr/   Texist_okzNo transcription rows foundignore_errorsc                 S     h | ]}|d  qS segment_filer+   .0rowr+   r+   r,   	<setcomp>       z9RecoverValidationWorker._prefetch_next.<locals>.<setcomp>c                     &   t  jkrddS d ddS NT)target_segment_idsreplay_all_tx_parentsr   r7   r+   	extracted
target_idstx_idsr1   r/   r+   r,   rS          
z8RecoverValidationWorker._prefetch_next.<locals>.<lambda>z[prefetch] z: z to validate (z already validated, z extras) ready in .1frQ   )r/   r1   r3   r4   r6   r7   r8   z[prefetch] Failed: zprefetch error:    )rz   rY   rG   mkdirrA   rZ   _fetch_tx_rowsr   shutilrmtree_fetch_validated_segment_idsrun_in_executorrI   download_tarextract_tar_fetch_flag_summaryextra_regen_idsrc   rd   lensegmentsr-   rh   r   r.   )rL   r   t0
video_workrk   validated_idstar_pathr4   r6   elapsedrn   r+   r   r,   r      sh   

	$z&RecoverValidationWorker._prefetch_nextNr/   r.   r   c                   s  t   }t }|r'|jkr'|j|j}|j}|j}|j}d}	d}
|j	}n~| j
 }|jdd t   }| I d H sDtdt   | }	t   }|d | jj|I d H }|d | jj|I d H  t   | }
| I d H }dd D | t   }|d  fddI d H }t   | }| |jI d H }z=|jrtd	t|j |jrtd
t|j |jrt   }| j|j}t   | }| j| | j  | jj}| j j t|7  _ |d | j_!|"dd| j_#ng }d}t$%d d t   | }|dkrt|| nd}| jj&'| | jj&dd  | j_&t(| jj&t| jj& | j_)|jrNt*+t,|jd d nd }t-|j.|j/||j|| j0j1|j|jd
}| j23|}| j4t|t|j.t|jdd|d |d |d t5t|j|d  d|dI d H  t$%d dt dt| dt|j d|d  dt5t|j|d  d d|j6 d|	dd|
dd |dd!|dd"|dd# W t7j8|dd$ d S t7j8|dd$ w )%Nr   Tr   z-No transcription rows found for recover videoc                 S  r   r   r+   r   r+   r+   r,   r   *  r   z=RecoverValidationWorker._process_one_video.<locals>.<setcomp>c                     r   r   r   r+   r   r+   r,   rS   0  r   z<RecoverValidationWorker._process_one_video.<locals>.<lambda>z%missing historical IDs after replay: zmissing raw parent files: r    total_mbrs   z8] All segments already validated, replay-only for extrasr   ii  )
r/   r1   replayed_segment_idsmatched_tx_idsr3   r   r6   	worker_idmissing_tx_idsmissing_parent_filesrr   rX   flagged_total)
validated_segmentsreplayed_segmentsextras_countmissing_tx_segmentsr   extra_timeout_segmentsextra_error_segmentsextra_flagged_segmentsextra_unflagged_segmentsextra_regen_ids_jsonz] Recover complete: tx=z, validated=z	, extras=z
 (flagged=z, unflagged=z
), ledger=z, timings tx=r   zs raw=z	s replay=zs validate=zs total=rQ   r   )9rY   rA   rZ   r/   r1   r3   r4   r6   r7   r8   rG   r   r   RuntimeErrorr   rI   r   r   r   r   r   r   r   r   r   r   r=   process_videor>   add_video_resultsflushr?   r   r    getr!   rc   rd   r%   appendsumr   jsondumpssortedr   replayed_regen_idsr   r:   r   rK   upload_mark_video_donemaxkeyr   r   )rL   r/   r   r   rk   r   r4   r6   r   
tx_elapsedraw_elapsedreplay_elapsedtx_t0raw_t0r   	replay_t0val_t0resultsval_elapsedpacker_statsr   segs_per_secextras_jsonledger_payloadledger_artifactr+   r   r,   r     s   


$"	z*RecoverValidationWorker._process_one_videoc                   sf   | j js	tddd l}d| j jv }|j| j jddd|rdnddd	I d H | _td
| d d S )Nz(DATABASE_URL required for recover workerr   z:6543r      <   d   require)dsnmin_sizemax_sizecommand_timeoutstatement_cache_sizesslz"Recover DB pool connected (pooler=))r:   database_urlr   asyncpgcreate_poolr@   rc   rd   )rL   r   	is_poolerr+   r+   r,   r_     s   
z#RecoverValidationWorker._connect_dbc              	     sb   | j  4 I d H }|d| jj dI d H  W d   I d H  d S 1 I d H s*w   Y  d S )N
                UPDATE z
                SET status = 'pending', claimed_by = NULL, claimed_at = NULL, updated_at = now()
                WHERE status = 'recovering'
                  AND claimed_at < now() - interval '15 minutes'
            r@   acquireexecuter:   recover_queue_tablerL   connr+   r+   r,   r`     s   .z,RecoverValidationWorker._recover_dead_videosc                   s   d| j j| j j| j j| j jd| j j| j j| j j| j j| j j	| j j
| j j| j jd
}| j 4 I d H }|d| j j| j jt|I d H  W d   I d H  d S 1 I d H sXw   Y  d S )Nr4   )mms_lid	voxlinguaconformer_multiwav2vec_lang)
modemodelsreference_modereference_buckettx_parquet_keyflags_parquet_keyreplay_ledger_prefixmodel_bucketoutput_bucketr  a  
                INSERT INTO worker_validators (
                    worker_id, status, gpu_type, config_json,
                    started_at, last_heartbeat_at,
                    videos_processed, videos_failed, segments_processed,
                    shards_written, shards_uploaded, total_parquet_mb
                ) VALUES ($1, 'online', $2, $3::jsonb, now(), now(), 0, 0, 0, 0, 0, 0)
                ON CONFLICT (worker_id) DO UPDATE SET
                    status = 'online',
                    gpu_type = $2,
                    config_json = $3::jsonb,
                    started_at = now(),
                    last_heartbeat_at = now(),
                    videos_processed = 0,
                    videos_failed = 0,
                    segments_processed = 0,
                    shards_written = 0,
                    shards_uploaded = 0,
                    total_parquet_mb = 0,
                    last_error = NULL,
                    current_video_id = NULL
            )r:   enable_mms_lidenable_voxlinguaenable_conformer_multienable_wav2vec_langrecover_reference_moder2_reference_bucketrecover_tx_parquet_keyrecover_flags_parquet_keyrecover_replay_ledger_prefixr2_model_bucketr2_bucket_outputr  r@   r   r  r   gpu_typer   r   )rL   config_jsonr  r+   r+   r,   rb     s*   .z!RecoverValidationWorker._registerOptional[dict]c              	     s|   | j  4 I d H !}|d| jj d| jj d| jjI d H }W d   I d H  n1 I d H s1w   Y  |r<t|S d S )Nr   a   
                SET status = 'recovering',
                    claimed_by = $1,
                    claimed_at = now(),
                    updated_at = now()
                WHERE video_id = (
                    SELECT video_id
                    FROM z
                    WHERE status = 'pending'
                    ORDER BY tx_segments DESC, video_id
                    LIMIT 1
                    FOR UPDATE SKIP LOCKED
                )
                RETURNING video_id, tx_segments
            )r@   r   fetchrowr:   r  r   r5   )rL   r  r   r+   r+   r,   rz     s   (z$RecoverValidationWorker._claim_videor2   c                   s"   | j jr| j |I d H S t S rN   )rJ   enabledfetch_validated_segment_idsset)rL   r/   r+   r+   r,   r     s   z4RecoverValidationWorker._fetch_validated_segment_idsr0   c              	     sz   | j jr| j |I d H S | j 4 I d H }|d|I d H }W d   I d H  n1 I d H s1w   Y  dd |D S )Na  
                SELECT
                    segment_file,
                    expected_language_hint,
                    detected_language,
                    transcription,
                    tagged,
                    quality_score,
                    speaker_emotion,
                    speaker_style,
                    speaker_pace,
                    speaker_accent
                FROM transcription_results
                WHERE video_id = $1
                ORDER BY segment_file
            c                 S  s   g | ]}t |qS r+   )r5   r   r+   r+   r,   
<listcomp>  r   z:RecoverValidationWorker._fetch_tx_rows.<locals>.<listcomp>)rJ   r!  fetch_tx_rowsr@   r   fetch)rL   r/   r  rowsr+   r+   r,   r     s   (z&RecoverValidationWorker._fetch_tx_rowssegment_ids	list[str]r5   c              	     s   |s
dddddS | j jr| j |I d H S | j 4 I d H }|d|I d H }|d|I d H }W d   I d H  n1 I d H sCw   Y  dddt|pOdd}|D ]}|d }||v rgt|d ||< qU|S )Nr   )rr   rX   rate_limitedr   z
                SELECT count(DISTINCT segment_id)
                FROM transcription_flags
                WHERE segment_id = ANY($1::text[])
            z
                SELECT flag_type, count(DISTINCT segment_id) AS cnt
                FROM transcription_flags
                WHERE segment_id = ANY($1::text[])
                GROUP BY flag_type
            	flag_typecnt)rJ   r!  fetch_flag_summaryr@   r   fetchvalr&  r   )rL   r(  r  r   r'  summaryr   r+  r+   r+   r,   r     s(   
(z+RecoverValidationWorker._fetch_flag_summary)r   r   r   r   r   r   r   r   r   r   r   r   r   c       
           sx   | j  4 I d H %}|d| jj d|||||||||	|
|I d H  W d   I d H  d S 1 I d H s5w   Y  d S )Nr   a  
                SET status = 'recovered',
                    recovered_segments = $2,
                    replayed_segments = $3,
                    extra_regen_segments = $4,
                    missing_tx_segments = $5,
                    missing_parent_files = $6,
                    extra_timeout_segments = $7,
                    extra_error_segments = $8,
                    extra_flagged_segments = $9,
                    extra_unflagged_segments = $10,
                    extra_regen_ids_json = $11,
                    completed_at = now(),
                    updated_at = now(),
                    error_message = NULL
                WHERE video_id = $1
            r   )rL   r/   r   r   r   r   r   r   r   r   r   r   r  r+   r+   r,   r     s   
.z(RecoverValidationWorker._mark_video_donerX   c              	     n   | j  4 I d H  }|d| jj d||d d I d H  W d   I d H  d S 1 I d H s0w   Y  d S )Nr   z
                SET status = 'quarantined',
                    error_message = $2,
                    updated_at = now()
                WHERE video_id = $1
            rW   r   rL   r/   rX   r  r+   r+   r,   r   5     .z/RecoverValidationWorker._mark_video_quarantinedc              	     r0  )Nr   z
                SET status = 'failed',
                    error_message = $2,
                    updated_at = now()
                WHERE video_id = $1
            rW   r   r1  r+   r+   r,   r   ?  r2  z*RecoverValidationWorker._mark_video_failedc              
     s   | j  sLz	|  I d H  W n ty) } ztd|  W Y d }~nd }~ww ztj| j  t	dI d H  W d S  tj
yD   Y nw | j  rd S d S )NzRecover heartbeat failed: rq   )rC   ry   r   rh   rc   r   rA   r|   r}   r   r~   )rL   rn   r+   r+   r,   rf   K  s   
z'RecoverValidationWorker._heartbeat_loopc                   s   | j  4 I d H <}|d| jj| jj| jj| jj| jj	 | jj
t| jjd| jjt| jjd| jj| jjI d H  W d   I d H  d S 1 I d H sLw   Y  d S )Na  
                UPDATE worker_validators SET
                    current_video_id = $2,
                    videos_processed = $3,
                    videos_failed = $4,
                    segments_processed = $5,
                    avg_segs_per_second = $6,
                    shards_written = $7,
                    total_parquet_mb = $8,
                    last_heartbeat_at = now(),
                    last_video_completed_at = $9,
                    last_error = $10
                WHERE worker_id = $1
               )r@   r   r  r:   r   r?   r   r   r   r   r   roundr   r    r!   r"   r#   r  r+   r+   r,   r   W  s   .z'RecoverValidationWorker._send_heartbeat statusc              	     sz   | j sd S | j  4 I d H !}|d| jj||r|d d nd I d H  W d   I d H  d S 1 I d H s6w   Y  d S )Nz
                UPDATE worker_validators
                SET status = $2, last_error = $3, last_heartbeat_at = now()
                WHERE worker_id = $1
            rW   )r@   r   r  r:   r   )rL   r6  rX   r  r+   r+   r,   ri   s  s   .z-RecoverValidationWorker._update_worker_statusc                   s$   t d|j d | j  d S )Nz	Received z', initiating recover worker shutdown...)rc   rd   namerC   r#  )rL   rl   r+   r+   r,   rP     s   z(RecoverValidationWorker._handle_shutdownc              
     sH  | j r| j   z| j I d H  W n
 tjy   Y nw | jr6| j  | jj}|d | j_|dd| j_	z| j
  W n	 tyF   Y nw z
| j I d H  W n	 tyZ   Y nw z	|  I d H  W n	 tym   Y nw | dI d H  | jr| j I d H  tj| jdd td| jj d| jj d	| jj d
| jj  d S )Nr    r   r   offlineTr   z,Recover worker shutdown complete. processed=z quarantined=z failed=z
 segments=)rD   cancelrA   CancelledErrorr>   r   r?   r    r   r!   r=   unload_modelsrh   rJ   closer   ri   r@   r   r   rG   rc   rd   r   r   r   r   )rL   r   r+   r+   r,   rj     sR   

z RecoverValidationWorker._cleanup)r:   r   )r   r   rN   )r/   r.   r   r   )r   r  )r/   r.   r   r2   )r/   r.   r   r0   )r(  r)  r   r5   )r/   r.   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   )r/   r.   rX   r.   )r5  )r6  r.   rX   r.   )r&   r'   r(   rM   ra   rg   r   r   r_   r`   rb   rz   r   r   r   r   r   r   rf   r   ri   rP   rj   r+   r+   r+   r,   r9   C   s,    

f:w	
*


'
%

r9   c                   @  s   e Zd ZdZdS )r   zCRaised when replay cannot faithfully recover the historical tx set.N)r&   r'   r(   __doc__r+   r+   r+   r,   r     s    r   )/r=  
__future__r   rA   r   loggingrandomr   r[   rE   rY   dataclassesr   r   r   r   pathlibr   typingr   
src.configr	   src.r2_clientr
   r:   r   r   r>   r   r=   r   recover_loaderr   r   recover_replay_ledgerr   r   recover_reference_storer   	getLoggerr&   rc   r   r-   r9   r   r   r+   r+   r+   r,   <module>   sB    

    m