o
    lQi;                     @  s   d Z ddlmZ ddlZddlZddlZddlZddlmZm	Z	 ddl
mZmZ eeZeG dd dZeG dd	 d	ZG d
d dZdS )z
Database layer for transcript-variant shard workers.

Uses direct PostgreSQL via asyncpg, mirroring the main transcription pipeline's
claim/heartbeat style but with dedicated tables.
    )annotationsN)	dataclassfield)AnyOptionalc                   @  sX   e Zd ZU ded< ded< ded< ded< ded< ded< ded	< eed
Zded< dS )
VariantJobstrshard_idinput_bucketinput_r2_keyinput_formatoutput_bucketoutput_prefixint
total_rows)default_factorydict[str, Any]metadata_jsonN)__name__
__module____qualname____annotations__r   dictr    r   r   src/variant_db.pyr      s   
 r   c                   @  s   e Zd ZU dZded< dZded< dZded< dZded< dZded< dZ	ded< dZ
ded	< dZded
< dZded< dZded< dZded< dZded< dZded< dZded< dZded< dZded< dZded< dS )VariantWorkerStatsr   r   jobs_claimedjobs_completedjobs_failedrows_processedrows_skippedrows_geminirequests_sentrequests_succeededrequests_failed
cache_hitspacks_uploadedNzOptional[str]current_shard_idrows_remainingg        float
active_rpmtotal_input_tokenstotal_output_tokenstotal_cached_tokens)r   r   r   r   r   r   r   r   r    r!   r"   r#   r$   r%   r&   r'   r(   r*   r+   r,   r-   r   r   r   r   r       s$   
 r   c                   @  s   e Zd Zd:ddZdd Zdd Zd	d
 Zd;ddZd<ddZddd=ddZ	ddd=dd Z
d>d"d#Zd?d(d)Zd@d,d-ZdAd.d/ZdBd0d1ZdCd4d5ZdDd7d8Zd9S )EVariantPostgresDBdatabase_urlr   c                 C  s   || _ d | _d S N)_dsn_pool)selfr/   r   r   r   __init__6   s   
zVariantPostgresDB.__init__c              	     sT   dd l }dd }d| jv }|j| jddd|rdndd	|d
I d H | _td d S )Nr   c                   s>   | j dtjtjddI d H  | j dtjtjddI d H  d S )Njsonb
pg_catalog)encoderdecoderschemajson)set_type_codecr:   dumpsloads)connr   r   r   
_init_conn=   s   z-VariantPostgresDB.connect.<locals>._init_connz:6543         d   require)dsnmin_sizemax_sizecommand_timeoutstatement_cache_sizesslinitzvariant asyncpg pool ready)asyncpgr1   create_poolr2   loggerinfo)r3   rL   r?   	is_poolerr   r   r   connect:   s   

	zVariantPostgresDB.connectc                   s*   | j r| j  I d H  td d S d S )Nzvariant asyncpg pool closed)r2   closerN   rO   )r3   r   r   r   rR   Q   s
   zVariantPostgresDB.closec              	     sb   d}| j  4 I d H }||I d H  W d   I d H  n1 I d H s%w   Y  td d S )Na  
        CREATE TABLE IF NOT EXISTS transcript_variant_job_queue (
            shard_id TEXT PRIMARY KEY,
            status TEXT NOT NULL DEFAULT 'pending',
            input_bucket TEXT NOT NULL,
            input_r2_key TEXT NOT NULL,
            input_format TEXT NOT NULL DEFAULT 'parquet',
            output_bucket TEXT NOT NULL,
            output_prefix TEXT NOT NULL,
            total_rows INTEGER NOT NULL DEFAULT 0,
            rows_processed INTEGER NOT NULL DEFAULT 0,
            rows_skipped INTEGER NOT NULL DEFAULT 0,
            rows_gemini INTEGER NOT NULL DEFAULT 0,
            packs_uploaded INTEGER NOT NULL DEFAULT 0,
            last_pack_key TEXT,
            claimed_by TEXT,
            claimed_at TIMESTAMPTZ,
            completed_at TIMESTAMPTZ,
            error_message TEXT,
            attempt_count INTEGER NOT NULL DEFAULT 0,
            metadata_json JSONB NOT NULL DEFAULT '{}'::jsonb,
            created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
            updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
        );

        CREATE INDEX IF NOT EXISTS idx_tvjq_claim
            ON transcript_variant_job_queue (status, shard_id);

        CREATE TABLE IF NOT EXISTS transcript_variant_workers (
            worker_id TEXT PRIMARY KEY,
            status TEXT NOT NULL DEFAULT 'online',
            provider TEXT NOT NULL,
            gpu_type TEXT NOT NULL DEFAULT 'unknown',
            config_json JSONB NOT NULL DEFAULT '{}'::jsonb,
            started_at TIMESTAMPTZ NOT NULL DEFAULT now(),
            last_heartbeat_at TIMESTAMPTZ NOT NULL DEFAULT now(),
            jobs_claimed INTEGER NOT NULL DEFAULT 0,
            jobs_completed INTEGER NOT NULL DEFAULT 0,
            jobs_failed INTEGER NOT NULL DEFAULT 0,
            rows_processed BIGINT NOT NULL DEFAULT 0,
            rows_skipped BIGINT NOT NULL DEFAULT 0,
            rows_gemini BIGINT NOT NULL DEFAULT 0,
            requests_sent BIGINT NOT NULL DEFAULT 0,
            requests_succeeded BIGINT NOT NULL DEFAULT 0,
            requests_failed BIGINT NOT NULL DEFAULT 0,
            total_cache_hits BIGINT NOT NULL DEFAULT 0,
            packs_uploaded BIGINT NOT NULL DEFAULT 0,
            current_shard_id TEXT,
            rows_remaining BIGINT NOT NULL DEFAULT 0,
            active_rpm DOUBLE PRECISION NOT NULL DEFAULT 0,
            total_input_tokens BIGINT NOT NULL DEFAULT 0,
            total_output_tokens BIGINT NOT NULL DEFAULT 0,
            total_cached_tokens BIGINT NOT NULL DEFAULT 0,
            last_error TEXT
        );

        CREATE TABLE IF NOT EXISTS transcript_variant_pack_manifests (
            pack_id TEXT PRIMARY KEY,
            shard_id TEXT NOT NULL,
            worker_id TEXT NOT NULL,
            output_bucket TEXT NOT NULL,
            output_key TEXT NOT NULL,
            row_count INTEGER NOT NULL DEFAULT 0,
            gemini_row_count INTEGER NOT NULL DEFAULT 0,
            skipped_row_count INTEGER NOT NULL DEFAULT 0,
            distinct_video_count INTEGER NOT NULL DEFAULT 0,
            byte_size BIGINT NOT NULL DEFAULT 0,
            metadata_json JSONB NOT NULL DEFAULT '{}'::jsonb,
            created_at TIMESTAMPTZ NOT NULL DEFAULT now()
        );

        CREATE INDEX IF NOT EXISTS idx_tvpm_shard
            ON transcript_variant_pack_manifests (shard_id, created_at);
        ztranscript variant schema ready)r2   acquireexecuterN   rO   )r3   ddlr>   r   r   r   init_schemaV   s   J(zVariantPostgresDB.init_schemajobslist[dict[str, Any]]c              	     s   |sd S t |d   ddd tt D }d }ddd  D }d| d| d| d	} fd
d|D }| j 4 I d H }|||I d H  W d   I d H  n1 I d H sbw   Y  t	dt| d S )Nr   , c                 s      | ]
}d |d  V  qdS $r@   Nr   .0ir   r   r   	<genexpr>       z.VariantPostgresDB.seed_jobs.<locals>.<genexpr>c                 s  s&    | ]}|d kr| d| V  qdS )r	   z
=EXCLUDED.Nr   r^   colr   r   r   r`      s    z*INSERT INTO transcript_variant_job_queue (
) VALUES (z') ON CONFLICT (shard_id) DO UPDATE SET z, updated_at = now()c                   s"   g | ] t  fd dD qS )c                 3  s    | ]}  |V  qd S r0   getrb   jobr   r   r`      s    z9VariantPostgresDB.seed_jobs.<locals>.<listcomp>.<genexpr>)tuple)r^   columnsrg   r   
<listcomp>   s   " z/VariantPostgresDB.seed_jobs.<locals>.<listcomp>z!seeded %s transcript variant jobs)
listkeysjoinrangelenr2   rS   executemanyrN   rO   )r3   rW   placeholders	col_namesupdate_colssqlvaluesr>   r   rj   r   	seed_jobs   s$   

(zVariantPostgresDB.seed_jobs	worker_idreturnOptional[VariantJob]c                   sP  t dD ]}zQ| j 4 I d H }|d|I d H }W d   I d H  n1 I d H s+w   Y  |rUt|d |d |d |d |d |d |d	 pJd
|d pOi dW   S W  d S  ty } zA|dk rd| tdd }t	d|d t
|d d | t|I d H  ntdt
|d d  W Y d }~ d S W Y d }~qd }~ww d S )N   ai  
                        UPDATE transcript_variant_job_queue
                        SET status = 'claimed',
                            claimed_by = $1,
                            claimed_at = now(),
                            attempt_count = attempt_count + 1,
                            updated_at = now()
                        WHERE shard_id = (
                            SELECT shard_id
                            FROM transcript_variant_job_queue
                            WHERE status = 'pending'
                            ORDER BY shard_id
                            LIMIT 1
                            FOR UPDATE SKIP LOCKED
                        )
                        RETURNING shard_id, input_bucket, input_r2_key, input_format,
                                  output_bucket, output_prefix, total_rows, metadata_json
                        r	   r
   r   r   r   r   r   r   r   )r	   r
   r   r   r   r   r   r      g      ?g      ?z5variant claim failed (attempt %s): %s, retry in %.1fsr@   x   z&variant claim failed after retries: %s)rp   r2   rS   fetchrowr   	ExceptionrandomuniformrN   warningr   asynciosleeperror)r3   ry   attemptr>   rowewaitr   r   r   	claim_job   sJ   (


zVariantPostgresDB.claim_job )last_pack_keyr	   r   r   r    r!   r&   r   c             	     "   |  d||||||I d H  d S )Naa  
            UPDATE transcript_variant_job_queue
            SET rows_processed = $2,
                rows_skipped = $3,
                rows_gemini = $4,
                packs_uploaded = $5,
                last_pack_key = CASE WHEN $6 = '' THEN last_pack_key ELSE $6 END,
                updated_at = now()
            WHERE shard_id = $1
            _execr3   r	   r   r    r!   r&   r   r   r   r   update_job_progress   s   

z%VariantPostgresDB.update_job_progressc             	     r   )Na  
            UPDATE transcript_variant_job_queue
            SET status = 'done',
                rows_processed = $2,
                rows_skipped = $3,
                rows_gemini = $4,
                packs_uploaded = $5,
                last_pack_key = CASE WHEN $6 = '' THEN last_pack_key ELSE $6 END,
                completed_at = now(),
                updated_at = now()
            WHERE shard_id = $1
            r   r   r   r   r   complete_job  s   
zVariantPostgresDB.complete_joberror_messagec                   "   |  d||d d I d H  d S )Nz
            UPDATE transcript_variant_job_queue
            SET status = 'failed',
                error_message = $2,
                updated_at = now()
            WHERE shard_id = $1
              r   )r3   r	   r   r   r   r   fail_job&     
zVariantPostgresDB.fail_jobprovidergpu_typeconfig_jsonr   c                   s   |  d||||I d H  d S )Naw  
            INSERT INTO transcript_variant_workers (
                worker_id, status, provider, gpu_type, config_json, started_at, last_heartbeat_at
            ) VALUES ($1, 'online', $2, $3, $4, now(), now())
            ON CONFLICT (worker_id) DO UPDATE SET
                status = 'online',
                provider = $2,
                gpu_type = $3,
                config_json = $4,
                started_at = now(),
                last_heartbeat_at = now(),
                jobs_claimed = 0,
                jobs_completed = 0,
                jobs_failed = 0,
                rows_processed = 0,
                rows_skipped = 0,
                rows_gemini = 0,
                requests_sent = 0,
                requests_succeeded = 0,
                requests_failed = 0,
                total_cache_hits = 0,
                packs_uploaded = 0,
                current_shard_id = NULL,
                rows_remaining = 0,
                active_rpm = 0,
                total_input_tokens = 0,
                total_output_tokens = 0,
                total_cached_tokens = 0,
                last_error = NULL
            r   )r3   ry   r   r   r   r   r   r   register_worker3  s   z!VariantPostgresDB.register_workerstatsr   c                   s\   |  d||j|j|j|j|j|j|j|j|j	|j
|j|j|j|j|j|j|jI d H  d S )Na  
            UPDATE transcript_variant_workers
            SET jobs_claimed = $2,
                jobs_completed = $3,
                jobs_failed = $4,
                rows_processed = $5,
                rows_skipped = $6,
                rows_gemini = $7,
                requests_sent = $8,
                requests_succeeded = $9,
                requests_failed = $10,
                total_cache_hits = $11,
                packs_uploaded = $12,
                current_shard_id = $13,
                rows_remaining = $14,
                active_rpm = $15,
                total_input_tokens = $16,
                total_output_tokens = $17,
                total_cached_tokens = $18,
                last_heartbeat_at = now()
            WHERE worker_id = $1
            )r   r   r   r   r   r    r!   r"   r#   r$   r%   r&   r'   r(   r*   r+   r,   r-   )r3   ry   r   r   r   r   update_heartbeatY  s,   z"VariantPostgresDB.update_heartbeatc                   s   |  d|I d H  d S )Nz
            UPDATE transcript_variant_workers
            SET status = 'offline', last_heartbeat_at = now()
            WHERE worker_id = $1
            r   )r3   ry   r   r   r   set_worker_offline  s
   z$VariantPostgresDB.set_worker_offlinec                   r   )Nz
            UPDATE transcript_variant_workers
            SET status = 'error',
                last_error = $2,
                last_heartbeat_at = now()
            WHERE worker_id = $1
            r   r   )r3   ry   r   r   r   r   set_worker_error  r   z"VariantPostgresDB.set_worker_errormanifestr   c                   sp   t   }ddd tt|D }d|}d| d| d}| j|g fdd|D R  I d H  d S )	NrY   c                 s  rZ   r[   r   r]   r   r   r   r`     ra   z9VariantPostgresDB.insert_pack_manifest.<locals>.<genexpr>z/INSERT INTO transcript_variant_pack_manifests (rd   z") ON CONFLICT (pack_id) DO NOTHINGc                   s   g | ]}  |qS r   re   rb   r   r   r   rl     s    z:VariantPostgresDB.insert_pack_manifest.<locals>.<listcomp>)rm   rn   ro   rp   rq   r   )r3   r   rk   rs   rt   rv   r   r   r   insert_pack_manifest  s   
*z&VariantPostgresDB.insert_pack_manifestrv   c              	     s\   | j  4 I d H }|j|g|R  I d H W  d   I d H  S 1 I d H s'w   Y  d S r0   )r2   rS   rT   )r3   rv   argsr>   r   r   r   r     s   0zVariantPostgresDB._execN)r/   r   )rW   rX   )ry   r   rz   r{   )r	   r   r   r   r    r   r!   r   r&   r   r   r   )r	   r   r   r   )ry   r   r   r   r   r   r   r   )ry   r   r   r   )ry   r   )ry   r   r   r   )r   r   )rv   r   )r   r   r   r4   rQ   rR   rV   rx   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r.   5   s$    

O
;%


&
,



r.   )__doc__
__future__r   r   r:   loggingr   dataclassesr   r   typingr   r   	getLoggerr   rN   r   r   r.   r   r   r   r   <module>   s    
