o
    6i&                     @  s   d Z ddlmZ ddlZddlZddlZddlZddlmZ ddl	m
Z
 ddlmZ ddlmZmZmZmZmZmZmZmZmZ dd	lmZmZmZmZmZ dd
lmZ ddlm Z  ddl!m"Z" ddl#m$Z$ e%e&Z'G dd dZ(dS )z{
Worker lifecycle: register, heartbeat loop, main loop, graceful shutdown.
Each Docker container runs one Worker instance.
    )annotationsN)Path)Optional   )CacheManager)		EnvConfigHEARTBEAT_INTERVAL_S
MAX_VIDEOSPROMPT_VERSIONSCHEMA_VERSIONTRIMMER_VERSIONVALIDATOR_VERSIONTEMPERATURETHINKING_LEVEL)MockDB
SupabaseDB	VideoTaskWorkerStatsget_db)Pipeline)AIStudioProvider)BaseProvider)R2Clientc                   @  sb   e Zd ZdddZdddZd	d
 Zdd Zdd Zdd Zdd Z	dddZ
dd Zdd ZdS )Workerconfigr   c              	   C  s   || _ |j| _t | _t | _d | _t|| _	t
|| _t|j|jd| _|  | _|j}t|j}t|j}td| d| d| d d S )Napi_key	mock_modezProvider pool: key[z] primary, z fallback(s) (z total keys))r   	worker_idr   statsasyncioEvent_shutdown_event_heartbeat_taskr   dbr   r2r   primary_gemini_keyr   primary_provider_create_fallbackfallback_providergemini_key_indexlengemini_keysfallback_gemini_keysloggerinfo)selfr   key_idxn_keysn_fallbacks r4   &/home/ubuntu/transcripts/src/worker.py__init__   s    





"zWorker.__init__returnOptional[BaseProvider]c                 C  s$   | j j}|sdS t|d | j jdS )zEFirst fallback key from the pool (different GCP project, same quota).Nr   r   )r   r-   r   r   )r0   	fallbacksr4   r4   r5   r(   5   s   zWorker._create_fallbackc              
     s  t  }tjtjfD ]}|||f fdd	 qzaz#  I dH   jjs. 	 I dH  t 
   _  I dH  W n) tyg } ztjd| dd  j jt|I dH  W Y d}~nd}~ww W   I dH  dS W   I dH  dS   I dH  w )zDMain entry: register, set up cache, start heartbeat, process videos.c                   s   t  | S )N)r    create_task_handle_shutdown)sr0   r4   r5   <lambda>C   s    zWorker.start.<locals>.<lambda>NzWorker fatal error: T)exc_info)r    get_running_loopsignalSIGTERMSIGINTadd_signal_handler	_registerr   r   _setup_cachesr:   _heartbeat_loopr#   
_main_loop	Exceptionr.   errorr$   set_worker_errorr   str_cleanup)r0   loopsiger4   r=   r5   start?   s&   &"zWorker.startc           
        s   d| j | jjfg}| jr| jjr|d| j| jjd f |D ][\}}}z9t|}| I dH }t|t	r9||_
| I dH }|rL|di ddnd}td| d	| d
| d W q  ty{ }	 ztd| d|	  W Y d}	~	q d}	~	ww dS )zQSet up V2 cache for primary + fallback keys (each GCP project has its own cache).primaryfallbackr   NusageMetadatatotalTokenCount?zCache ready (z): z (z tokens)zCache setup failed for z, falling back to V1 uncached: )r'   r   r&   r)   r-   appendr   ensure_cache
isinstancer   cached_content_nameget_cache_infogetr.   r/   rI   warning)
r0   providers_to_cachelabelproviderr   cm
cache_namer/   tokensrP   r4   r4   r5   rF   T   s$   
""zWorker._setup_cachesc                   sn   t tttttd| jjt| jj	| jj
d
}td| j d| jj d | jj| jd| jj|dI d H  d S )Naistudio)
prompt_versionschema_versiontrimmer_versionvalidator_versiontemperaturethinking_levelr`   r*   
total_keysr   zRegistering worker z (key_index=))r   r`   gpu_typeconfig_json)r
   r   r   r   r   r   r   r*   r+   r,   r   r.   r/   r   r$   register_workerrm   )r0   rn   r4   r4   r5   rE   f   s&   
zWorker._registerc              
     s   | j  sQz| j| j| jI d H  W n ty. } ztd|  W Y d }~nd }~ww zt	j
| j  tdI d H  W d S  t	jyI   Y nw | j  rd S d S )NzHeartbeat update failed: timeout)r"   is_setr$   update_heartbeatr   r   rI   r.   r]   r    wait_forwaitr   TimeoutError)r0   rP   r4   r4   r5   rG   {   s$   
zWorker._heartbeat_loopc           	   	     s>  t | j| j| j| j| j| j| jd}d}d}d }d }tdkr+t	
dt dt d | j stdkrC|tkrCt	
dt d n|rS|}d }t	
d|j  n
| j| jI d H }|d u r|d	7 }|d
krot	
d nt	
d| d ztj| j ddI d H  W n tjy   Y q+w d}t	
d|j d|j d tdkrt| d	 nd	}|dkrt|  }||I d H }|d	7 }|st	d|j d |r| sztj|ddI d H }W n) tjtfy   d }Y nw |r| rz| }W n ty   d }Y nw d }| j r1t	
d| d d S )N)r   r$   r%   r'   r)   r   r   r   zMAX_VIDEOS=u    — will stop after z	 video(s)zReached MAX_VIDEOS=z, shutting down.zUsing prefetched video: r      z-No more videos to process. Worker going idle.z(No pending videos, waiting 30s (attempt z/3)...   rp   zClaimed video: z (lang=rl   zVideo z failed, continuing to next...   zMain loop ended: z videos processed)r   r   r$   r%   r'   r)   r   r   r	   r.   r/   r"   rr   video_idclaim_videor    rt   ru   rv   languager:   _prefetch_nextprocess_videorJ   donerI   result)	r0   pipelineconsecutive_emptyvideos_processedprefetch_task
prefetchedtask	remainingsuccessr4   r4   r5   rH      sz   


2zWorker._main_loopOptional[VideoTask]c              
     s   zF| j | jI dH }|du rW dS td|j d ddl}t|jd| j dd}| j	
|j| ||_td|j d	|  |W S  tyb } ztd
|  W Y d}~dS d}~ww )zBClaim and pre-download the next video while current one processes.Nz[prefetch] Claimed next: z, downloading tar...r   	prefetch__)prefixz[prefetch] z tar ready at z[prefetch] Failed: )r$   r{   r   r.   r/   rz   tempfiler   mkdtempr%   download_tarprefetch_dirrI   r]   )r0   r   r   work_dirrP   r4   r4   r5   r}      s"   zWorker._prefetch_nextc                   s$   t d|j d | j  d S )Nz	Received z!, initiating graceful shutdown...)r.   r/   namer"   set)r0   rO   r4   r4   r5   r;      s   zWorker._handle_shutdownc                   s   t d | jr!| j  z| jI d H  W n
 tjy    Y nw | jjr:t d| jj  | j	| jjI d H  z| j
| j| jI d H  W n	 tyQ   Y nw | j| jI d H  t d| j d| jj d| jj d| jj d	 d S )NzCleaning up worker...zReleasing video zWorker z shutdown complete. Stats: z completed, z	 failed, z batches)r.   r/   r#   cancelr    CancelledErrorr   current_video_idr$   release_videors   r   rI   set_worker_offlinesegments_completedsegments_failedbatches_completedr=   r4   r4   r5   rM      s2   

zWorker._cleanupN)r   r   )r7   r8   )r7   r   )__name__
__module____qualname__r6   r(   rQ   rF   rE   rG   rH   r}   r;   rM   r4   r4   r4   r5   r      s    



Gr   ))__doc__
__future__r   r    loggingrA   syspathlibr   typingr   cache_managerr   r   r   r   r	   r
   r   r   r   r   r   r$   r   r   r   r   r   r   r   providers.aistudior   providers.baser   	r2_clientr   	getLoggerr   r.   r   r4   r4   r4   r5   <module>   s"    ,
