o
    ~	i($                     @  s   d Z ddlmZ ddlZddlZddlmZmZ ddlmZ ddl	Z	ddl
mZ e  eeZdZg dZd	d
 ZG dd dZdS )u   Supabase orchestrator for SFT shard encoding pipeline.

Table: sft_encoding_shards — one row per audio shard to process.
Uses the same atomic FOR UPDATE SKIP LOCKED claim pattern as the
pretraining pipeline to allow hundreds of GPU workers.
    )annotationsN)datetimetimezone)Any)load_dotenvsft_encoding_shards)PENDINGCLAIMED
PROCESSINGDONEFAILEDc                  C  s"   t dd} | stdt| S )NDATABASE_URL zDATABASE_URL not set)osgetenvRuntimeErrorpsycopg2connect)db_url r   =/home/ubuntu/bench-codecs/codecbench/pipeline/sft_supabase.py_get_pg_conn   s   
r   c                   @  s   e Zd ZdZdd Zd2d3ddZd2d4ddZd5ddZd5ddZ	d6d7ddZ	d8d9d d!Z
d:d;d&d'Z		(d<d=d-d.Zd>d0d1ZdS )?SFTOrchestratorzCManages sft_encoding_shards table for distributed shard processing.c                 C  s   d S Nr   selfr   r   r   __init__'   s   zSFTOrchestrator.__init__r   sqlstrparamstuplereturnNonec                 C  sh   t  }z+d|_| }||| W d    n1 sw   Y  W |  d S W |  d S |  w NT)r   
autocommitcursorexecutecloser   r   r   conncurr   r   r   	_exec_sql*   s   
zSFTOrchestrator._exec_sql
list[dict]c                   s   t  }z9| &}||| dd |jD   fdd| D W  d    W |  S 1 s1w   Y  W |  d S |  w )Nc                 S     g | ]}|d  qS r   r   .0dr   r   r   
<listcomp>8       z*SFTOrchestrator._query.<locals>.<listcomp>c                   s   g | ]	}t t |qS r   )dictzip)r0   rowcolsr   r   r2   9   s    )r   r%   r&   descriptionfetchallr'   r(   r   r7   r   _query3   s   

zSFTOrchestrator._queryc                 C  sF   |  dt dt dt dt dt dt dt d td	t d S )
Nz$
        CREATE TABLE IF NOT EXISTS az   (
            shard_key text PRIMARY KEY,
            dataset text NOT NULL,
            language text NOT NULL,
            segment_count integer,
            status text DEFAULT 'PENDING',
            claimed_by text,
            claimed_at timestamptz,
            segments_encoded integer,
            segments_failed integer,
            total_audio_s real,
            total_encode_ms real,
            output_r2_key text,
            error_detail text,
            started_at timestamptz,
            finished_at timestamptz,
            updated_at timestamptz DEFAULT now()
        );

        CREATE INDEX IF NOT EXISTS idx_z_status ON z2 (status);
        CREATE INDEX IF NOT EXISTS idx_z_dataset ON z3 (dataset);
        CREATE INDEX IF NOT EXISTS idx_z_language ON z (language);
        zSFT table %s verified/createdr+   TABLEloggerinfor   r   r   r   ensure_tables=   s    
zSFTOrchestrator.ensure_tablesc                 C  sD   |  dt dt dt dt dt dt dt d td	 d
S )z*Atomic claim using FOR UPDATE SKIP LOCKED.z
        CREATE OR REPLACE FUNCTION claim_next_sft_shard(
            p_worker_id text,
            p_dataset text DEFAULT NULL,
            p_language text DEFAULT NULL
        )
        RETURNS SETOF z) AS $$
        DECLARE
            v_row z~;
        BEGIN
            IF p_dataset IS NOT NULL AND p_language IS NOT NULL THEN
                SELECT * INTO v_row FROM z
                WHERE status = 'PENDING' AND dataset = p_dataset AND language = p_language
                LIMIT 1 FOR UPDATE SKIP LOCKED;
            ELSIF p_dataset IS NOT NULL THEN
                SELECT * INTO v_row FROM z
                WHERE status = 'PENDING' AND dataset = p_dataset
                LIMIT 1 FOR UPDATE SKIP LOCKED;
            ELSIF p_language IS NOT NULL THEN
                SELECT * INTO v_row FROM z
                WHERE status = 'PENDING' AND language = p_language
                LIMIT 1 FOR UPDATE SKIP LOCKED;
            ELSE
                SELECT * INTO v_row FROM z
                WHERE status = 'PENDING'
                LIMIT 1 FOR UPDATE SKIP LOCKED;
            END IF;

            IF v_row IS NULL THEN RETURN; END IF;

            UPDATE ao   SET
                status = 'CLAIMED',
                claimed_by = p_worker_id,
                claimed_at = now(),
                updated_at = now()
            WHERE shard_key = v_row.shard_key;

            v_row.status := 'CLAIMED';
            v_row.claimed_by := p_worker_id;
            RETURN NEXT v_row;
        END;
        $$ LANGUAGE plpgsql;
        z Created claim_next_sft_shard RPCNr<   r   r   r   r   create_claim_rpcX   s    
+z SFTOrchestrator.create_claim_rpcN	worker_iddataset
str | Nonelanguagedict | Nonec           
   
   C  s&  t  }zz`d|_| O}|d|||f dd |jD }| }|  |rLtt||}t	
d|d |d |d  |W  d	   W W |  S 	 W d	   W W |  d	S 1 s^w   Y  W n" ty }	 z|  t	d
|	 W Y d	}	~	W |  d	S d	}	~	ww W |  d	S |  w )z(Atomically claim the next PENDING shard.Fz.SELECT * FROM claim_next_sft_shard(%s, %s, %s)c                 S  r-   r.   r   r/   r   r   r   r2      r3   z/SFTOrchestrator.claim_shard.<locals>.<listcomp>zClaimed shard %s (%s/%s)	shard_keyrC   rE   Nzclaim_shard failed: %s)r   r$   r%   r&   r9   fetchonecommitr4   r5   r>   r?   r'   	Exceptionrollbackerror)
r   rB   rC   rE   r)   r*   r8   r6   sharder   r   r   claim_shard   s<   

zSFTOrchestrator.claim_shardrG   statusextradict[str, Any] | Nonec           	      C  s   t tj }ddg}||g}|dkr|d || n|dv r-|d || |rE| D ]\}}|| d || q3|| | dt d	d
	| dt
| d S )Nzstatus = %szupdated_at = %sr
   zstarted_at = %s)r   r   zfinished_at = %sz = %szUPDATE z SET , z WHERE shard_key = %s)r   nowr   utc	isoformatappenditemsr+   r=   joinr    )	r   rG   rP   rQ   rT   setsvalskvr   r   r   update_shard_status   s$   



z#SFTOrchestrator.update_shard_status  rows
batch_sizeintc                 C  s  t  }d}zud|_| `}tdt||D ]O}||||  }g }|D ]}	||d|	d |	d |	d |	df  q$dt	 d	d

| d}
||
 |t|7 }|d dkretd|t| qW d   n1 spw   Y  W |  n|  w td| |S )z:Bulk insert shard rows. Skip conflicts (already ingested).r   Tz(%s, %s, %s, %s, 'PENDING')rG   rC   rE   segment_countzINSERT INTO z> (shard_key, dataset, language, segment_count, status) VALUES ,z# ON CONFLICT (shard_key) DO NOTHINGi  zIngested %d / %d shardsNzIngested %d shards total)r   r$   r%   rangelenrW   mogrifygetdecoder=   rY   r&   r>   r?   r'   )r   r`   ra   r)   totalr*   ibatchargsrr   r   r   r   ingest_shards   s@   

zSFTOrchestrator.ingest_shardsTstatuseslist[str] | Noneclear_outputsboolc              	   C  s   |du rg d}g d}|r| g d t }z3d|_| }|dt dd| d	|f |jp6d
}W d   n1 sAw   Y  W |  n|  w t	
d|d| |S )zReset selected statuses back to PENDING for a clean rerun.

        Keeps static shard metadata (dataset/language/segment_count) but clears
        worker/runtime fields so workers can reclaim from scratch.
        N)r	   r
   r   r   )zstatus = 'PENDING'zclaimed_by = NULLzclaimed_at = NULLzstarted_at = NULLzfinished_at = NULLzupdated_at = now()zerror_detail = NULL)zsegments_encoded = NULLzsegments_failed = NULLztotal_audio_s = NULLztotal_encode_ms = NULLzoutput_r2_key = NULLTz
                    UPDATE z
                    SET rS   z@
                    WHERE status = ANY(%s)
                    r   z-Reset %d shards back to PENDING (statuses=%s)rd   )extendr   r$   r%   r&   r=   rY   rowcountr'   r>   r?   )r   rp   rr   rZ   r)   r*   countr   r   r   reset_pending   s.   
	
zSFTOrchestrator.reset_pendingdict[str, Any]c                 C  s   |  dt d}|S )Nz
            SELECT status, dataset, count(*) as cnt,
                   coalesce(sum(total_audio_s), 0) as audio_s
            FROM zS
            GROUP BY status, dataset
            ORDER BY dataset, status
        )r;   r=   )r   r`   r   r   r   	get_stats  s   
zSFTOrchestrator.get_stats)r   )r   r   r   r    r!   r"   )r   r   r   r    r!   r,   )r!   r"   )NN)rB   r   rC   rD   rE   rD   r!   rF   r   )rG   r   rP   r   rQ   rR   r!   r"   )r_   )r`   r,   ra   rb   r!   rb   r#   )rp   rq   rr   rs   r!   rb   )r!   rx   )__name__
__module____qualname____doc__r   r+   r;   r@   rA   rO   r^   ro   rw   ry   r   r   r   r   r   $   s    	


02r   )r}   
__future__r   loggingr   r   r   typingr   r   dotenvr   	getLoggerrz   r>   r=   SHARD_STATESr   r   r   r   r   r   <module>   s    
