o
    lQi?                  	   @  s   d Z ddlmZ ddlZddlZddlZddlZddlZddlZddl	m
Z
mZ ddlmZmZ ddlmZ ddlmZmZ eeZdddddddddZd	d
 Ze
G dd dZe
G dd dZG dd dZG dd dZdddZdS )z
Database operations: video_queue claim/release, worker heartbeat, results insert, flags.
Uses asyncpg for direct PostgreSQL (bypasses Supabase REST/PostgREST bottleneck).
Supports mock mode for local testing.
    )annotationsN)	dataclassfield)datetimetimezone)Path)AnyOptional)	claims_okclaims_fail
inserts_okinserts_failinserts_rowsheartbeats_okheartbeats_failretriesc                  C  sr   t } td| d  d| d | d   d| d  d| d  d	| d
  d| d  d| d | d   d| d   d S )Nz[DB-STATS] claims=r
   /r   z	 inserts=r   zok/r   zfail (r   z rows) heartbeats=r   r   z	 retries=r   )	_db_statsloggerinfo)s r   "/home/ubuntu/transcripts/src/db.pylog_db_stats   s    r   c                   @  sB   e Zd ZU ded< ded< dZded< dZded< d	Zd
ed< d	S )	VideoTaskstrvideo_idlanguager   intsegment_countpendingstatusNzOptional[Path]prefetch_dir)__name__
__module____qualname____annotations__r   r!   r"   r   r   r   r   r   (   s   
 r   c                   @  s   e Zd ZU dZded< dZded< dZded< dZded< dZded< dZ	ded< d	Z
d
ed< dZded< dZded< d	Zd
ed< d	Zd
ed< dZded< dZded< dZded< eedZded< dS )WorkerStatsr   r   segments_sentsegments_completedsegments_failedsegments_429
cache_hitsbatches_completedg        floatavg_batch_latency_msNzOptional[str]current_video_idsegments_remaining
active_rpm
active_tpmtotal_input_tokenstotal_output_tokenstotal_cached_tokens)default_factoryz	list[str]errors)r#   r$   r%   r(   r&   r)   r*   r+   r,   r-   r/   r0   r1   r2   r3   r4   r5   r6   r   listr8   r   r   r   r   r'   1   s    
 r'   c                   @  s   e Zd ZdZdd Zd0ddZdd	 Zd
d Zd1ddZd2ddZ	d2ddZ
d3ddZd4ddZd5d"d#Zd6d$d%Zd7d&d'Zd8d*d+Zd9d-d.Zd/S ):MockDBz,In-memory mock for testing without Supabase.c                 C  s   g | _ i | _g | _g | _d S N)video_queueworkersresultsflagsselfr   r   r   __init__G   s   
zMockDB.__init__videoslist[VideoTask]c                 C  s
   || _ d S r;   )r<   )rA   rC   r   r   r   seed_videosM   s   
zMockDB.seed_videosc                      d S r;   r   r@   r   r   r   connectP       zMockDB.connectc                   rF   r;   r   r@   r   r   r   closeQ   rH   zMockDB.close	worker_idr   returnOptional[VideoTask]c                   s*   | j D ]}|jdkrd|_|  S qd S )Nr    claimed)r<   r!   )rA   rJ   vr   r   r   claim_videoS   s   

zMockDB.claim_videor   c                   "   | j D ]
}|j|krd|_qd S )Nr    r<   r   r!   rA   r   rN   r   r   r   release_videoZ      

zMockDB.release_videoc                   rP   )NdonerQ   rR   r   r   r   mark_video_done_   rT   zMockDB.mark_video_doneerrorc                   s"   | j D ]
}|j|krd|_qd S )NfailedrQ   )rA   r   rW   rN   r   r   r   mark_video_failedd   rT   zMockDB.mark_video_failedprovidergpu_typeconfig_jsondictc                   s*   |d|||t tj d| j|< d S )Nonline)rJ   r!   rZ   r[   r\   
started_at)r   nowr   utc	isoformatr=   )rA   rJ   rZ   r[   r\   r   r   r   register_workeri   s
   zMockDB.register_workerstatsr'   c                   sl   || j v r4| j | |j|j|j|j|j|j|j|j	|j
|j|j|j|j|jttj d d S d S )N)total_segments_senttotal_segments_completedtotal_segments_failedtotal_segments_429total_cache_hitsr-   r/   r0   r1   r2   r3   r4   r5   r6   last_heartbeat_at)r=   updater(   r)   r*   r+   r,   r-   r/   r0   r1   r2   r3   r4   r5   r6   r   r`   r   ra   rb   )rA   rJ   rd   r   r   r   update_heartbeatp   s(   

zMockDB.update_heartbeatc                   s"   || j v rd| j | d< d S d S )Noffliner!   r=   )rA   rJ   r   r   r   set_worker_offline   s   
zMockDB.set_worker_offlinec                   s0   || j v rd| j | d< || j | d< d S d S )NrW   r!   
last_errorrn   )rA   rJ   rW   r   r   r   set_worker_error   s
   
zMockDB.set_worker_errorr>   
list[dict]c                      | j | d S r;   )r>   extendrA   r>   r   r   r   insert_results      zMockDB.insert_resultsr?   c                   rs   r;   )r?   rt   rA   r?   r   r   r   insert_flags   rw   zMockDB.insert_flagsN)rC   rD   rJ   r   rK   rL   r   r   r   r   rW   r   rJ   r   rZ   r   r[   r   r\   r]   rJ   r   rd   r'   rJ   r   rJ   r   rW   r   r>   rr   r?   rr   )r#   r$   r%   __doc__rB   rE   rG   rI   rO   rS   rV   rY   rc   rl   ro   rq   rv   ry   r   r   r   r   r:   D   s     









r:   c                   @  s   e Zd ZdZd;ddZdd Zdd	 Zd<ddZd=ddZd>ddZ	d>ddZ
d?ddZd@dd ZdAd#d$ZdBd%d&ZdCd'd(ZdDd+d,ZdEd.d/ZdFdGd5d6ZdHd8d9Zd:S )I
PostgresDBuL   Direct PostgreSQL via asyncpg — bypasses Supabase REST/PostgREST entirely.database_urlr   c                 C  s   || _ d | _d S r;   )_dsn_pool)rA   r   r   r   r   rB      s   
zPostgresDB.__init__c              	     s\   dd l }dd }d| jv }|j| jddd|rdndd	|d
I d H | _td| d d S )Nr   c                   s>   | j dtjtjddI d H  | j dtjtjddI d H  d S )Njsonb
pg_catalog)encoderdecoderschemajson)set_type_codecr   dumpsloads)connr   r   r   
_init_conn   s   z&PostgresDB.connect.<locals>._init_connz:6543         d   require)dsnmin_sizemax_sizecommand_timeoutstatement_cache_sizesslinitz)asyncpg pool ready (min=2, max=8, pooler=))asyncpgr   create_poolr   r   r   )rA   r   r   	is_poolerr   r   r   rG      s   

	zPostgresDB.connectc                   s*   | j r| j  I d H  td d S d S )Nzasyncpg pool closed)r   rI   r   r   r@   r   r   r   rI      s
   zPostgresDB.close   db_opc                   s   t |D ]]}z	| I dH W   S  tyb } zFtd  d7  < ||d k rWd| tdd }td| d|d  d	| d
t|dd  d|dd t	|I dH  n W Y d}~qd}~ww dS )z>Retry an async DB operation with exponential backoff + jitter.Nr      r         ?       @[z
] attempt r   z	 failed: r   , retry in .1fr   )
range	Exceptionr   randomuniformr   warningr   asynciosleep)rA   coro_fnmax_retriesop_nameattemptewaitr   r   r   _retry   s   <zPostgresDB._retryrJ   rK   rL   c                   sh  t dD ]}zK| j 4 I dH }|d|I dH }W d  I dH  n1 I dH s+w   Y  |rOtd  d7  < t|d |d pCd|d	 pHd
ddW   S W  dS  ty } zStd  d7  < |dk rd| tdd }t	
d|d  dt|dd  d|dd t|I dH  nt	dt|dd   W Y d}~ dS W Y d}~qd}~ww dS )uJ   Atomic claim: FOR UPDATE SKIP LOCKED — no race conditions, no CAS retry.r   Na  
                        UPDATE video_queue
                        SET status = 'claimed',
                            claimed_by = $1,
                            claimed_at = now()
                        WHERE video_id = (
                            SELECT video_id FROM video_queue
                            WHERE status = 'pending'
                            LIMIT 1
                            FOR UPDATE SKIP LOCKED
                        )
                        RETURNING video_id, language, segment_count
                    r
   r   r   r   enr   r   rM   )r   r   r   r!   r   r   r   g      ?zClaim failed (attempt z): r   r   r   r   zClaim failed after 3 attempts: )r   r   acquirefetchrowr   r   r   r   r   r   r   r   r   r   rW   )rA   rJ   r   r   rowr   r   r   r   r   rO      s:   (

0zPostgresDB.claim_videor   c                   s   z-| j  4 I d H }|d|I d H  W d   I d H  W d S 1 I d H s'w   Y  W d S  tyR } ztd| dt|d d   W Y d }~d S d }~ww )Nz
                    UPDATE video_queue
                    SET status = 'pending', claimed_by = NULL, claimed_at = NULL
                    WHERE video_id = $1
                zrelease_video failed for z: P   )r   r   executer   r   rW   r   )rA   r   r   r   r   r   r   rS      s   20zPostgresDB.release_videoc                   sp   z j  fddddI d H  W d S  ty7 } ztd dt|d d   W Y d }~d S d }~ww )Nc                     s     dS )NzPUPDATE video_queue SET status = 'done', completed_at = now() WHERE video_id = $1_execr   rA   r   r   r   <lambda>   s    z,PostgresDB.mark_video_done.<locals>.<lambda>	mark_doner   zmark_video_done failed for  (non-fatal): r   r   r   r   rW   r   )rA   r   r   r   r   r   rV      s   0zPostgresDB.mark_video_donerW   c                   sr   zj  fddddI d H  W d S  ty8 } ztd dt|d d   W Y d }~d S d }~ww )Nc                     s    d d d S )NzPUPDATE video_queue SET status = 'failed', error_message = $2 WHERE video_id = $1  r   r   rW   rA   r   r   r   r     s    z.PostgresDB.mark_video_failed.<locals>.<lambda>mark_failedr   zmark_video_failed failed for r   r   r   )rA   r   rW   r   r   r   r   rY     s   0zPostgresDB.mark_video_failedrZ   r[   r\   r]   c           	        s
  t dD ]x}z8| j 4 I d H }|d||||I d H  W d   I d H  n1 I d H s.w   Y  td| d W  d S  ty} } z2d|d  tdd }t	d	|d  d
t
|d d  d|dd t|I d H  W Y d }~qd }~ww td d S )N   a  
                        INSERT INTO workers (
                            worker_id, status, provider, gpu_type, config_json,
                            started_at, last_heartbeat_at,
                            total_segments_sent, total_segments_completed,
                            total_segments_failed, total_segments_429,
                            total_cache_hits, batches_completed
                        ) VALUES ($1, 'online', $2, $3, $4, now(), now(), 0, 0, 0, 0, 0, 0)
                        ON CONFLICT (worker_id) DO UPDATE SET
                            status = 'online', provider = $2, gpu_type = $3,
                            config_json = $4, started_at = now(), last_heartbeat_at = now(),
                            total_segments_sent = 0, total_segments_completed = 0,
                            total_segments_failed = 0, total_segments_429 = 0,
                            total_cache_hits = 0, batches_completed = 0
                    zWorker z registered via direct PGr   r   r   r   z Register worker failed (attempt z/5): r   r   z.0fr   z:Register worker failed after 5 attempts, continuing anyway)r   r   r   r   r   r   r   r   r   r   r   r   r   rW   )	rA   rJ   rZ   r[   r\   r   r   r   r   r   r   r   rc     s"   (0zPostgresDB.register_workerrd   r'   c                   s   zO| j  4 I d H 0}|d||j|j|j|j|j|j|j	|j
|j|j|j|j|j|jI d H  W d   I d H  n1 I d H sAw   Y  td  d7  < W d S  tyy } ztd  d7  < tdt|d d   W Y d }~d S d }~ww )Na  
                    UPDATE workers SET
                        total_segments_sent = $2, total_segments_completed = $3,
                        total_segments_failed = $4, total_segments_429 = $5,
                        total_cache_hits = $6, batches_completed = $7,
                        avg_batch_latency_ms = $8, current_video_id = $9,
                        segments_remaining = $10, active_rpm = $11, active_tpm = $12,
                        total_input_tokens = $13, total_output_tokens = $14,
                        total_cached_tokens = $15, last_heartbeat_at = now()
                    WHERE worker_id = $1
                r   r   r   z%Heartbeat update failed (non-fatal): r   )r   r   r   r(   r)   r*   r+   r,   r-   r/   r0   r1   r2   r3   r4   r5   r6   r   r   r   r   r   )rA   rJ   rd   r   r   r   r   r   rl   )  s(   (*zPostgresDB.update_heartbeatc              
     s^   z|  d|I d H  W d S  ty. } ztdt|d d   W Y d }~d S d }~ww )NzUUPDATE workers SET status = 'offline', last_heartbeat_at = now() WHERE worker_id = $1zset_worker_offline failed: r   r   r   r   r   r   )rA   rJ   r   r   r   r   ro   E  s   *zPostgresDB.set_worker_offlinec              
     sh   z|  d||d d I d H  W d S  ty3 } ztdt|d d   W Y d }~d S d }~ww )NzdUPDATE workers SET status = 'error', last_error = $2, last_heartbeat_at = now() WHERE worker_id = $1r   zset_worker_error failed: r   r   )rA   rJ   rW   r   r   r   r   rq   N  s   *zPostgresDB.set_worker_errorr>   rr   c                       |sd S |  d|I d H  d S )Ntranscription_results_batch_insertru   r   r   r   rv   W     zPostgresDB.insert_resultsr?   c                   r   )Ntranscription_flagsr   rx   r   r   r   ry   \  r   zPostgresDB.insert_flags2   tablerows
chunk_sizer   c                   s  |sdS t |d   ddd tt D }d }d| d| d| d	}tdt||D ]}||||  } fd
d|D }	tdD ]}
z=| j 4 I dH }|||	I dH  W d  I dH  n1 I dH sqw   Y  td  d7  < td  t|7  < W  nt t	y } zhtd  d7  < |
dk rd|
 t
dd }td| d||  d|
d  dt|dd  d|dd t|I dH  n#td  d7  < td| d||  dt| dt|dd   W Y d}~qKd}~ww q4dS ) uW   Insert rows in chunks via executemany — far more efficient than REST chunked inserts.Nr   z, c                 s  s    | ]
}d |d  V  qdS )$r   Nr   ).0ir   r   r   	<genexpr>g  s    z+PostgresDB._batch_insert.<locals>.<genexpr>zINSERT INTO z (z
) VALUES (z) ON CONFLICT DO NOTHINGc                   s"   g | ] t  fd dD qS )c                 3  s    | ]}  |V  qd S r;   )get)r   colr   r   r   r   m  s    z6PostgresDB._batch_insert.<locals>.<listcomp>.<genexpr>)tuple)r   columnsr   r   
<listcomp>m  s   " z,PostgresDB._batch_insert.<locals>.<listcomp>   r   r   r   r   r   r   r   r   zInsert z chunk z failed (attempt z/4): r   r   r   r   r   z# failed after 4 attempts, skipping z rows: )r9   keysjoinr   lenr   r   executemanyr   r   r   r   r   r   r   r   r   rW   )rA   r   r   r   placeholders	col_namessqlr   chunkvaluesr   r   r   r   r   r   r   r   a  sX   
(zPostgresDB._batch_insertr   c              	     s\   | j  4 I d H }|j|g|R  I d H W  d   I d H  S 1 I d H s'w   Y  d S r;   )r   r   r   )rA   r   argsr   r   r   r   r     s   0zPostgresDB._execN)r   r   )r   r   rz   r{   r|   r}   r~   r   r   r   r   )r   )r   r   r   rr   r   r   )r   r   )r#   r$   r%   r   rB   rG   rI   r   rO   rS   rV   rY   rc   rl   ro   rq   rv   ry   r   r   r   r   r   r   r      s$    



&





	
	
%r   rK   MockDB | PostgresDBc                 C  s$   | j rt S | jstdt| jS )Nu`   DATABASE_URL required — direct PostgreSQL replaces Supabase REST to avoid PostgREST bottleneck)	mock_moder:   r   RuntimeErrorr   )configr   r   r   get_db  s
   
r   )rK   r   )r   
__future__r   r   r   loggingr   timeuuiddataclassesr   r   r   r   pathlibr   typingr   r	   	getLoggerr#   r   r   r   r   r'   r:   r   r   r   r   r   r   <module>   s6    

P x