o
    i.r                     @  sJ  d Z ddlmZ ddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlZddlmZmZ ddlmZmZ ddlmZ ddlZddlZddlmZ ddlZddlZddlmZ ddlm Z m!Z! dd	l"m#Z# dd
l$m%Z% e&e'Z(dZ)dZ*eG dd dZ+eG dd dZ,d'ddZ-d(d d!Z.d)d#d$Z/G d%d& d&Z0dS )*uH  SFT shard worker: processes pre-segmented audio shards into XCodec2 tokens.

SFT data is already segmented FLAC in tar shards. No VAD needed. Flow:

  1. Claim shard from Supabase (atomic, FOR UPDATE SKIP LOCKED)
  2. Download audio.tar from R2 (multi-threaded range GET)
  3. Extract FLACs in memory → decode to 16kHz waveforms
  4. XCodec2 encode (same 6s overlap chunking + center-cut stitch as pretraining)
  5. Upload xcodec2_tokens.parquet back to the same R2 shard folder
  6. Mark shard DONE in Supabase

Uses the custom 198k-step fine-tuned XCodec2 checkpoint (downloaded from R2
xcodec bucket on first run), NOT the base HuggingFace model.

Resilience features:
  - Prefetch: next shard downloads in background while current one encodes.
  - Checkpointing: every CKPT_INTERVAL segments, flush to local parquet.
    On OOM/crash/restart, resume from last checkpoint — only re-encode
    the segments after the checkpoint, not the whole shard.
  - Self-recovery: OOM triggers CUDA cache clear + resume from checkpoint.
    Transient errors retry with backoff, not full shard restart.
    )annotationsN)ThreadPoolExecutorFuture)	dataclassfield)Path)PipelineConfig)
HotEncoderEncodedSegment)SegmentSFTOrchestrator2   i  c                   @  sR   e Zd ZU dZded< ded< ded< ded	< d
ed< d
ed< ded< ded< dS )ShardResultzEncoding results for one shard.str	shard_key	list[str]segment_idslist[np.ndarray]xcodec2_tokens	list[int]token_countsfloattotal_audio_stotal_encode_msintsegments_encodedsegments_failedN)__name__
__module____qualname____doc____annotations__ r#   r#   ;/home/ubuntu/bench-codecs/codecbench/pipeline/sft_worker.pyr   8   s   
 r   c                   @  sN   e Zd ZU dZded< ded< ded< dZd	ed
< dZded< dZded< dS )PreparedShardz?A shard whose tar has been downloaded and may later be decoded.dict	shard_rowr   r   bytes	tar_bytesNz list[tuple[str, Segment]] | Nonedecoded        r   
download_sdecode_s)r   r   r    r!   r"   r*   r,   r-   r#   r#   r#   r$   r%   E   s   
 r%   tmp_dirr   r   returnr   c                 C  s.   t |  dd }t| d| d S )z0Deterministic local checkpoint path for a shard.N   	sft_ckpt_z.parquet)hashlibmd5encode	hexdigestr   )r.   r   hr#   r#   r$   
_ckpt_pathR   s   r7   pathr   r   tokensr   r   r   Nonec                 C  s   dd |D }t t j|t  dt j|t  dt j|t  dd}| jjddd tj	|| dd t
d	t|| j|  jd
  d S )Nc                 S     g | ]}|  qS r#   tobytes.0tr#   r#   r$   
<listcomp>^       z$_save_checkpoint.<locals>.<listcomp>type
segment_idr   token_countTparentsexist_okzstdcompressionu.   Checkpoint saved: %d segments → %s (%.1f MB)    .A)patablearraystringbinaryint32parentmkdirpqwrite_tableloggerinfolennamestatst_size)r8   r   r9   r   token_bytesrP   r#   r#   r$   _save_checkpointX   s   r`   7tuple[set[str], list[str], list[np.ndarray], list[int]]c                 C  sf   t | }|d }|d }|d }dd |D }tdt|| j t||||fS )zELoad checkpoint. Returns (done_ids_set, segment_ids, tokens, counts).rF   rG   r   c                 S  s    g | ]}t j|t jd  qS ))dtype)np
frombufferuint16copy)r?   br#   r#   r$   rA   p        z$_load_checkpoint.<locals>.<listcomp>z&Checkpoint loaded: %d segments from %s)	rW   
read_tablecolumn	to_pylistrY   rZ   r[   r\   set)r8   rP   seg_idscounts	raw_bytesr9   r#   r#   r$   _load_checkpointj   s   
rp   c                   @  s~   e Zd ZdZd5ddZd6dd	Zd7ddZ	d8d9ddZd:ddZd;ddZ	d<d d!Z
d=d$d%Z	&	'd>d?d/d0Zd@d3d4Zd&S )A	SFTWorkeruD   Processes SFT audio shards: R2 tar → XCodec2 tokens → R2 upload.cfgr   c                 C  s   || _ t|j| _d | _d | _d| _d| _d| _d| _	d| _
d| _|jjsMd}tj r6tjddd}||j_d|jj d| dt  |j_d S d S )	N   Fr   r+   unknown _sft_)rr   r	   codecencoderorch_s3_dl_workers_running_total_shards_total_audio_s_total_segments_consecutive_errorsworker	worker_idtorchcudais_availableget_device_namereplacegpu_nameoffer_idosgetpid)selfrr   r   r#   r#   r$   __init__z   s$   

zSFTWorker.__init__r/   r:   c                 C  s  ddl }ddlm} ddlm} |  td td| jjj	 t
 p&d}t|d d d| _|jd	t
jd
 t
jd t
jd d|| jd ddddd| _t | _| j  | j  | jjjr| jjjst| jjj}|jddd |d }| std| jjj| jjj | j| jjj| jjjt| td|  j!d  ntd| t|| jj_tdt"| jjj | j#$  t%j&' d }td||| j dS )z>One-time setup: DB, custom checkpoint download, model loading.r   N)Config)load_dotenvz=== SFT Worker Setup ===zWorker ID: %srs      s3R2_ENDPOINT_URLR2_ACCESS_KEY_IDR2_SECRET_ACCESS_KEYauto   adaptive)max_attemptsmode)max_pool_connectionsretries)endpoint_urlaws_access_key_idaws_secret_access_keyregion_nameconfigTrH   zxcodec2_custom.ckptz,Downloading custom XCodec2 checkpoint: %s/%szCustom checkpoint: %.1f MBrN   zCustom checkpoint cached: %sz#Loading XCodec2 (custom_ckpt=%s)...zB=== Setup Complete. VRAM: %.0f MB | vCPUs: %d | dl_workers: %d ===)(boto3botocore.configr   dotenvr   rY   rZ   rr   r   r   r   	cpu_countmaxr|   clientenvironr{   r   rz   ensure_tablescreate_claim_rpcr2xcodec_ckpt_keyrx   xcodec2_custom_ckptr   local_tmp_dirrV   existsxcodec_bucketdownload_filer   r]   r^   boolry   loadr   r   memory_allocated)r   r   
BotoConfigr   cpusr.   
ckpt_localvramr#   r#   r$   setup   s\   



zSFTWorker.setupr   r   r(   c           
        s  t jdd | dt }jj d}|d ddkra d  }d	g| d fdd}tjd}t	|
|t| W d	   n1 sVw   Y  d}njj d}|d  }t | }	tdt|d |	t||	 d j |S )zEDownload audio.tar with parallel range GETs scaled to instance vCPUs.R2_BUCKET_DESTINATIONfinalsftdataz	audio.tar)BucketKeyContentLengthi   i      Nidxr   c                   sP   |  }t | d d }jj d| d| d}|d  | < d S )Nr   zbytes=-)r   r   RangeBody)minr{   
get_objectread)r   startendrespbucket
chunk_sizepartsr   tar_key
total_sizer#   r$   _dl_part   s   z)SFTWorker._download_tar.<locals>._dl_part)max_workers    r   z7Downloaded %s: %.1f MB in %.1fs (%.0f MB/s, %d workers)rN   )r   r   )r   r   gettimeperf_counterr{   head_objectr   r|   listmaprangejoinr   r   rY   rZ   r[   )
r   r   t0headn_partsr   poolbodyr   elapsedr#   r   r$   _download_tar   s.   

	zSFTWorker._download_tar>  r)   	target_srr   list[tuple[str, Segment]]c                 C  s  t  }i }tjt|dd'}| D ]}| r/|j	dr/|
|}|r/| ||j< qW d   n1 s:w   Y  t  | }t  }	g }
t| D ][}t|j}z;tt|| \}}||krqtj|||}|jd dkr|jddd}|jd	 | }|
|td
||df W qQ ty } ztd|| W Y d}~qQd}~ww t  |	 }tdt||t|
| |
S )z;Extract FLACs from tar, decode to sorted waveform Segments.r)fileobjr   z.flacNr   r   T)dimkeepdimr+   )start_send_saudiozFailed to decode FLAC %s: %sz0Extracted %d FLACs in %.1fs, decoded %d in %.1fs)r   r   tarfileopenioBytesIO
getmembersisfiler\   endswithextractfiler   sortedkeysr   stem
torchaudior   
functionalresampleshapemeanappendr   	ExceptionrY   warningrZ   r[   )r   r)   r   r   flac_maptarmemberf	extract_st1resultsr\   seg_idwavsr
duration_ser-   r#   r#   r$   _extract_and_decode   sH   

zSFTWorker._extract_and_decoder'   r&   PreparedShard | Nonec              
   C  sn   |d }zt  }| |}t  | }t||||dW S  ty6 } ztd|| W Y d}~dS d}~ww )zDownload only. Keep decode off the prefetch thread.

        Decoding the next shard concurrently with GPU encode cut throughput
        roughly in half in local isolation tests. We only overlap download.
        r   )r'   r   r)   r,   zFailed to prepare shard %s: %sN)r   r   r   r%   r   rY   error)r   r'   r   r   r)   r,   r  r#   r#   r$   _prepare_shard  s   
zSFTWorker._prepare_shardpreparedr%   c              
   C  s   z*t  }| |j| jjj}t  | |_d|_||_|s(t	
d|j W dS |W S  tyE } zt	d|j| W Y d}~dS d}~ww )zCDecode the current shard on the main thread after download overlap.r   zNo segments decoded from %sNzFailed to decode shard %s: %s)r   r   r  r)   rr   rx   r   r-   r*   rY   r   r   r   r  )r   r  r   r*   r  r#   r#   r$   _decode_prepared+  s   zSFTWorker._decode_preparedShardResult | Nonec                   s  t  }| jjj}t||j}t  g }g }g }d}d}	| r?t	|\ }}}|D ]
}
|t
|
t 7 }q*td|jt
  |jdusHJ d fdd|jD }d}d}d}td|jt
|t
  d	}d}|t
|k r||||  }d
d |D }dd |D }z| jj|| jjjd}W n tjjtfy } ztdt| vr tdt
|t
|t
  t
| |rt|||| tj  t  t d z
| jj|dd}W n1 ty } z$t d| |t
|7 }tj  t  ||7 }W Y d}~W Y d}~qid}~ww W Y d}~nd}~ww d}t!||D ]K\}}|t
|k rf|| }|d7 }|j"#d$ %t&j'}|(| |(| |(t
| ||j)7 }|	|j*7 }	t+||j)}|d7 }q |d7 }q |t,krzt|||| d}|| d dkrtj  ||7 }|t
|k sp~g |_t  | r|-  t  | }tj. d }tj/ d }td|jt
|t
|| |||t+|d |||
 |sdS t0|j|||||	t
||dS )u   Encode a prepared shard. Checkpoints every CKPT_INTERVAL segments.

        On OOM or transient error: saves checkpoint, clears CUDA, resumes
        from the checkpoint — only re-encodes segments AFTER the failure point.
        r+   z;Resuming shard %s from checkpoint: %d segments already doneNzCall _decode_prepared() firstc                   s    g | ]\}}| vr||fqS r#   r#   )r?   sidsegdone_idsr#   r$   rA   Y  rh   z.SFTWorker._encode_prepared.<locals>.<listcomp>r   z3Encoding %s: %d remaining (%d already checkpointed)   c                 S  s   g | ]\}}|qS r#   r#   )r?   r  rv   r#   r#   r$   rA   g  rB   c                 S  s   g | ]\}}|qS r#   r#   )r?   rv   r  r#   r#   r$   rA   h  rB   )xcodec_batch_size_overrideout of memoryzDOOM at segment %d/%d (batch=%d). Saving checkpoint, clearing CUDA...   r   zB=1 retry also failed: %srs   r   rN   zmEncoded shard %s: %d/%d segs, %.0fs audio, %.1fs wall, RTF=%.0fx, max_seg=%.1fs, VRAM=%.0f/%.0f MB (cur/peak)MbP?)r   r   r   r   r   r   r   r   )1r   r   rr   r   r   r7   r   rl   r   rp   r[   TOKENS_PER_SECrY   rZ   r*   ry   encode_segmentsrx   xcodec_batch_sizer   r   OutOfMemoryErrorRuntimeErrorr   lowerr   r`   empty_cachegccollectsleepr   r  zipr   squeezenumpyastyperc   re   r   r
  encode_time_msr   CKPT_INTERVALunlinkr   max_memory_allocatedr   )r   r  r   r.   	ckpt_filer   
all_tokensr   r   r   t_arr	remainingr   max_seg_durationsince_last_ckpt
batch_sizeibatchseg_ids_batchsegments_batchencodedr  inner_eenc_idxr  r  es	tokens_npr   vram_mbvram_peak_mbr#   r  r$   _encode_prepared=  s   












@


	zSFTWorker._encode_preparedresultr   c                 C  s   dd |j D }ttj|jt dtj|t dtj|jt dd}t	
 }tj||dd | }tjdd}|j d	}| jj|||d
d td|t|d t|j |S )z=Pack tokens into parquet and upload alongside original shard.c                 S  r;   r#   r<   r>   r#   r#   r$   rA     rB   z,SFTWorker._upload_tokens.<locals>.<listcomp>rC   rE   rK   rL   r   r   zxcodec2_tokens.parquetzapplication/octet-stream)r   r   r   ContentTypez!Uploaded %s: %.1f MB, %d segmentsrN   )r   rO   rP   rQ   r   rR   rS   r   rT   r   r   rW   rX   getvaluer   r   r   r   r{   
put_objectrY   rZ   r[   )r   rA  token_bytes_listrP   bufparquet_bytesr   
upload_keyr#   r#   r$   _upload_tokens  s*   zSFTWorker._upload_tokensNF
max_shards
int | Nonedataset
str | Nonelanguage	benchmarkr   c                   sR  d_ d}g }|rd}tj  fdd}ttj| ttj| td| | d}t	ddd	}	d) fdd}
|	
|
}j r |rU||krUtd| n|du r[n| }d}|du rltd n|pw|du pw|d |k }|rj r|	
|
}td|j z|}|du r jd7  _W qC|dur| rtd|j t }|}t | }|du r҈j|jdddi  jd7  _W qCt }|}t | }j|jd|j|jt|jdt|jd|d  jd7  _ j|j7  _ j|j7  _d_|d7 }|j|j|j|||j|j|jt |d d}|!| td||j|j|||j|j|d 	 |rd}d}|durd| s\td | }|du}||d< |r|durtd|j j|jddddd  W nzW n] t"y } zPt#|j$ d!| }t%d"|j|t&'  zj|jdd|dd# i W n
 t"y   Y nw  jd7  _d$t(|) v rtj*  t+,  W Y d}~nd}~ww jj-j.j/krt%d%j t0d& d_j sG|r| s|1  |	j2dd' |r|r3| td(|jj dS )*a  Main loop with prefetch: download shard N+1 while encoding shard N.

        benchmark=True: process exactly one shard, but also prefetch the next
        shard's download so we can confirm overlap is working without paying
        the cost of encoding a second shard.
        Tr   r   c                   s   t d|  d _d S )Nz$Received signal %s, shutting down...F)rY   rZ   r}   )sigframe)r   r#   r$   	_shutdown  s   
z SFTWorker.run.<locals>._shutdownzG=== SFT Worker Starting (max=%s, dataset=%s, lang=%s, benchmark=%s) ===Nprefetch)r   thread_name_prefixr/   r  c                    sL   j jjjj d} | d u rd S j | d ddjjji | S )N)rL  rN  r   
PROCESSING
claimed_by)rz   claim_shardrr   r   r   update_shard_statusr  )shardrL  rN  r   r#   r$   _claim_and_prepare  s   

z)SFTWorker.run.<locals>._claim_and_preparezReached max_shards=%d, stoppingz*No more shards or prepare failed, stoppingz8Prefetch: next shard download queued while processing %sz6Prefetch download completed before encode start for %sFAILEDerror_detailzEncode returned no resultsDONE)r   r   r   r   output_r2_keyr  )rY  r,   r-   encode_supload_saudio_ssegmentsrtfzVShard %d done: dl=%.1fs dec=%.1fs enc=%.1fs up=%.1fs | %.0fs audio, %d segs, RTF=%.0fxrd  Fz@Waiting for prefetched shard download to finish for benchmark...next_download_readyzPrefetch confirmed ready: %sPENDING)rV  
claimed_at
started_atz: zShard %s failed: %s
%si  r  z!Too many errors (%d), pausing 60s<   )waitz<=== SFT Worker Done. %d shards, %d segments, %.0fs audio ===)r/   r  )4r}   r   r   reset_peak_memory_statssignalSIGTERMSIGINTrY   rZ   r   submitrA  r   r  r   doner   r   r@  rz   rX  rI  r   r   roundr   r   r~   r   r   r,   r-   r   r   r   rD   r   r  	traceback
format_excr   r!  r"  r#  r$  rr   r   max_retriesr%  cancelshutdown_print_benchmark_report)r   rJ  rL  rN  rO  shards_done
timing_logrR  prefetch_futureprefetch_poolr[  r  
needs_moreshard_t0rA  r`  	upload_t0r2_keyra  shard_timingprefetched_ready
prefetchedr  	error_msgr#   rZ  r$   run  s   



















s

zSFTWorker.runry  
list[dict]c                 C  s  t j d }t j rt jdnd}t j r"t jdjd nd}t|dkr.|d n|d }t|	dd}|rI|d |d	  |d
  }n|d |d  |d	  |d
  }ddl
m} | }	|	 }
tdd |
D }tdd  td td  td|  td|dd|dd td| jjj  td| j dt  d tdt d t  t|D ]\}}|dkrdnd}|dkrdnd}t| d |d  d!| d" td#|d d$d% td&|d d$d% td'|d	 d$d(|d) dd* td+|d
 d$d% td,|d- d.d/|d- d0 d1d2 td3|d4   td5t|	dd  t  qtd6|d1d% |rMtd7 ntd8 t  || d0 }d9D ]}|| }td:|d;d<|d$d=|d> d1d?| d@	 q\td d dAS )Bz6Detailed benchmark report for fleet scaling estimates.rN   r   CPUr   r   re  Fr-   r`  ra  r,   r   c                 s  s$    | ]}|d  dkr|d V  qdS )statusrf  cntNr#   )r?   r   r#   r#   r$   	<genexpr>  s   " z4SFTWorker._print_benchmark_report.<locals>.<genexpr>
zF======================================================================z  SFT BENCHMARK REPORTz  GPU: z  VRAM: z.0fz / z MB (peak / total)z  Batch size: z  Download workers: z (of z vCPUs)z  Checkpoint interval: z	 segmentsz  z(benchmark shard)z(extra)zShard ru   :z    Download:  z>7.1fsz    Decode:    z    Encode:    zs (RTF=rd  zx)z    Upload:    z    Audio:     rb  z>7.0fzs (i  z.1fzh)z    Segments:  rc  z*    Next download ready by upload finish: z  Effective per shard: z?  Download is hidden by overlap; decode+encode+upload dominate.z<  Download was not fully hidden; full pipeline time is used.)r   
   r   d      z  ETA z>3z GPUs: zh (   z days) for z shardsN)r   r   r-  r   r   get_device_properties	total_memr[   r   r    codecbench.pipeline.sft_supabaser   	get_statssumprintrr   rx   r  r|   r   r   r+  	enumerate)r   ry  	vram_peakr   
vram_totalrepre  	effectiver   rz   statstotal_pendingr5  r@   pfxlabel
eta_1gpu_hneta_hr#   r#   r$   rw    sV     
"&
0z!SFTWorker._print_benchmark_report)rr   r   )r/   r:   )r   r   r/   r(   )r   )r)   r(   r   r   r/   r   )r'   r&   r/   r  )r  r%   r/   r  )r  r%   r/   r  )rA  r   r/   r   )NNNF)
rJ  rK  rL  rM  rN  rM  rO  r   r/   r:   )ry  r  r/   r:   )r   r   r    r!   r   r   r   r  r  r  r@  rI  r  rw  r#   r#   r#   r$   rq   w   s"    


<'
&

 
 +rq   )r.   r   r   r   r/   r   )
r8   r   r   r   r9   r   r   r   r/   r:   )r8   r   r/   ra   )1r!   
__future__r   r#  r2   r   loggingr   rl  r   r   rr  concurrent.futuresr   r   dataclassesr   r   pathlibr   r(  rc   pyarrowrO   pyarrow.parquetparquetrW   r   r   codecbench.pipeline.configr   codecbench.pipeline.encoderr	   r
   codecbench.pipeline.vadr   r  r   	getLoggerr   rY   r  r+  r   r%   r7   r`   rp   rq   r#   r#   r#   r$   <module>   sD    



