o
    liT                     @  s   d Z ddlmZ ddlZddlZddlZddlZddlZddlZddl	Z	ddl
mZ ddlZd'd(d
dZd)ddZd)ddZd)ddZd)ddZd)ddZd)ddZd)ddZd)ddZd)ddZd)d d!Zd)d"d#Zd$d% Zed&krxe  dS dS )*u  CLI for the encoding pipeline.

Commands:
  ingest        — Upload video metadata from CSVs to Supabase
  run           — Start the pipeline worker (pretraining)
  setup-db      — Create Supabase tables + RPC functions only
  stats         — Show pipeline progress stats
  bench         — Profile encoding on this GPU for optimal config
  sft-snapshot  — Snapshot R2 SFT shard paths into Supabase
  sft-run       — Start SFT shard processing worker
  sft-stats     — Show SFT processing stats
    )annotationsN)PathFverboseboolreturnNonec                 C  s$   | rt jnt j}t j|ddd d S )Nz1%(asctime)s [%(levelname)s] %(name)s: %(message)sz%Y-%m-%d %H:%M:%S)levelformatdatefmt)loggingDEBUGINFObasicConfig)r   r    r   4/home/ubuntu/bench-codecs/codecbench/pipeline/cli.pysetup_logging   s   
r   argsargparse.Namespacec                 C  sV  ddl m} ddlm} | }||j}|  |  |  | j	D ]}t
|}| s5td| q$td| g }t|dB}t|}|D ]2}	|	dd}
|
d	krXd
nd}||	d |	ddt|	dd|	dd|	dd|
|dd qJW d   n1 sw   Y  tdt||j |j|| jd tdt||j q$dS )z.Ingest video metadata from CSVs into Supabase.r   PipelineConfigSupabaseOrchestratorzCSV not found: %szReading %s...rlanguageunknownenglish
pt-englishpt-indicvideo_idtitle duration_minclassificationchannelPENDINGr   r   r!   r"   r#   r   source_bucketstatusNzIngesting %d videos from %s
batch_sizez Done: %d videos ingested from %s)codecbench.pipeline.configr   #codecbench.pipeline.supabase_clientr   from_envsupabaseensure_tablescreate_claim_rpccreate_release_stale_rpc	csv_filesr   existsr   errorinfoopencsv
DictReadergetappendfloatlennameingest_videosr)   )r   r   r   cfgorchcsv_pathrowsfreaderrowlangbucketr   r   r   
cmd_ingest%   sF   






rG   c                 C  s  ddl m} ddlm} ddlm} ddl}| }||j}||j	}|
  |  |  |jj}| jr]|jj|dd}	|	dg D ]}
td	|
d
 dd	|
d d dd qEdS | j}|sltd td d}|D ]}td|| |jj||d}	|	d  d}t||}g }|D ]2}|dd}|dkrdnd}||d |ddt |dd|dd|d d||d!d" qi }|D ]}|||d < qt!|" }td#t#|| |j$|| j%d$}||7 }td%|| qptd&|t#| dS )'z<Pull CSVs from R2 metafiles bucket and ingest into Supabase.r   r   r   )R2ClientNd   )BucketMaxKeysContents  Key50sSize    .A.1f MBz9No CSV keys specified. Use --list to see available files.   zDownloading %s/%s ...)rJ   rN   Bodyzutf-8r   r   r   r   r   r   r   r    r!   r"   r#   r$   r%   z"Ingesting %d unique videos from %sr(   zDone: %d videos from %sz&Total ingested: %d videos from %d CSVs)&r*   r   r+   r   codecbench.pipeline.r2_clientrH   ior,   r2r-   r.   r/   r0   metafiles_bucket	list_csvs_clientlist_objects_v2r8   printcsv_keysr   r3   sysexitr4   
get_objectreaddecoder6   r7   StringIOr9   r:   listvaluesr;   r=   r)   )r   r   r   rH   rW   r>   rX   r?   rF   respobjr^   total_ingestedkeycontentrC   rA   rD   rE   bucket_nameseenr   countr   r   r   cmd_syncM   sb   

(





ro   c                 C  sP   ddl m} ddlm} | }||j}|  |  |  t	
d dS )z)Create Supabase tables and RPC functions.r   r   r   zDatabase setup completeN)r*   r   r+   r   r,   r-   r.   r/   r0   r   r4   )r   r   r   r>   r?   r   r   r   cmd_setup_db   s   
rp   c                 C  s   ddl m} ddlm} | }| jr	 | jr| j|j_| j	r#d|j
_| jr+| j|j
_| jr3| j|j
_| jr;| j|j
_| jrC| j|j_| jrK| j|j
_| jdurXt| jd|j
_| jr_d|j
_| jrg| j|j
_||}|  |j| j| jd dS )	zStart the pipeline worker.r   r   )PipelineWorkerFNrT   T)r   
max_videos)r*   r   codecbench.pipeline.workerrq   r,   r   r)   codecxcodec_batch_sizeno_parallelworkerparallel_encodeshard_countshard_pack_countprefetchprefetch_videosoffer_idcustom_ckptxcodec2_custom_ckpttmp_dirlocal_tmp_diroom_segment_thresholdmax	use_asyncuse_async_pipelineextract_workerssetuprunrr   )r   r   rq   r>   rw   r   r   r   cmd_run   s8   







r   c           
      C  s   ddl m} ddlm} | }||j}| }tdd | D }t	dddd	d
dd	dd t	d t
| D ]\}}d| t|d }	t	|dd	|dd	|	dd q>t	d t	ddd	|d t	  dS )zShow pipeline progress.r   r   r   c                 s  s    | ]	}|d kr|V  qdS )r   Nr   ).0vr   r   r   	<genexpr>   s    zcmd_stats.<locals>.<genexpr>
Statusz<15 Count>10%>7z#-----------------------------------rI   rT   z>10,>6.1fTOTALN)r*   r   r+   r   r,   r-   	get_statssumrf   r]   sorteditemsr   )
r   r   r   r>   r?   statstotalr'   rn   pctr   r   r   	cmd_stats   s   
 "
r   c                 C  s  ddl m} ddlm} | }||j}|j|jj	dj
ddd }|jp+g }|s4td d	S d
d |D }dd |D }dd |D }	tdd  tdt| dt| dt|	 dt| d	 td  tddddddddddddddddd d!dd"d!dd#d$dd%d$dd&  tdd'  dd(lm}
m} |D ]}|d)d*d	d+ }|dd*}|d,pd*d	d- }|d.pd}|d/pd}|d0p|d1pd}|d2pd}|d3pdd4 }|d5pdd4 }|d6pd7d	d8 }|d9pd:}d;d<d=d>d?|d*}td|dd| d|d@d|dd|dd|dd|dAdB|dAd|dCdD|dCdD| d|  qtdEdF |D }tdGdF |D }tdHdF |D d4 }tdIdF |D }tdd'  tddJddd:dKdd:dd|dd|ddd:d!d|dAd|dCdL |dkrdM| }|| }tdN|dOdP|dQ dRdS|dTdU t  d	S )Vz*Show all workers and their current status.r   r   r   *r'   F)descz
No workers registered.
Nc                 S     g | ]}| d dkr|qS )r'   ALIVEr8   r   wr   r   r   
<listcomp>       zcmd_fleet.<locals>.<listcomp>c                 S  r   )r'   LOADING_MODELSr   r   r   r   r   r      r   c                 S  s   g | ]}| d dv r|qS )r'   )STOPPEDDEADr   r   r   r   r   r      r   r   zx========================================================================================================================u     WORKER FLEET — z alive, z
 loading, z stopped  |  z totalrM   z	Worker IDz<40r   r   <12GPUz<22Videosr   ShardsRTFz>6zV/hrAudio>8UptimeCurrentzv----------------------------------------------------------------------------------------------------------------------)datetimetimezone	worker_id?&   gpu_name   total_videos_donetotal_shards_producedavg_encode_rtfrtfvideos_per_hourtotal_audio_processed_s  uptime_scurrent_video_idu   —   current_stager    u   ●u   ◐u   ○u   ✗)r   r   r   r   z<10z>5.0fzx >7.1fzh c                 s       | ]}| d dpdV  qdS )r   r   Nr   r   r   r   r   r         zcmd_fleet.<locals>.<genexpr>c                 s  r   )r   r   Nr   r   r   r   r   r     r   c                 s  r   )r   r   Nr   r   r   r   r   r     r   c                 s  r   )r   r   Nr   r   r   r   r   r     r   r   12hi@- z
  ETA at current rate: z,.0fzh (   z,.1fz days) for ,z remaining videos)r*   r   r+   r   r,   r-   r[   tableworkers_tableselectorderexecutedatar]   r;   r   r   r8   r   )r   r   r   r>   r?   resultworkersaliveloadingstoppedr   r   r   widr'   gpuvidsshardsr   vphaudio_huptime_hcur_vidstagestatus_icon
total_vidstotal_shardstotal_audio_h	total_vph	remainingeta_hr   r   r   	cmd_fleet   s   

2
,2
,

&
r   c                 C  sf  ddl m} ddlm} ddlm} | }| jr| j|j_	t
j r%dnd}t
j r2t
jdnd}t
j rBt
jdjd nd}td	 td
|  td|dd t  g }dD ]}	|	|j_||j|d}
|
  t|	d d}g }t|D ]}t
dd}||dd|d q}|
|dd  t
j  t }|
|}t
j  t | }|d t| }dt| | }t
j d }||	|||d td|	 d|dd|dd|d d!	 ~
t
j  t  q_t |d"d# d$}td%d&  td'|d(   td)|d* dd|d+ dd|d, d-d! td.d/|d+  d-d0 t  dS )1z@Profile XCodec2 encoding on this GPU to find optimal batch size.r   r   )
HotEncoder)SegmentcudacpuCPUg    eAz.
=== GPU Encoding Benchmark (XCodec2 only) ===zGPU: zVRAM: rR   z GB)rT            )devicer   r   rT   i w g      @)start_send_saudioNi  rQ   )	xcodec_bs
ms_per_segr   vram_mbz
XCodec BS=z: r   z ms/seg, RTF=r   zx, VRAM=z>6.0frS   c                 S  s   | d S )Nr   r   )r   r   r   r   <lambda>H  s    zcmd_bench.<locals>.<lambda>)rj   r   z<============================================================zBest config: XCodec BS=r   rM   r   r   r   z.0fz  Estimated 1hr audio in: r   z s)!r*   r   codecbench.pipeline.encoderr   codecbench.pipeline.vadr   r,   r~   rt   r   torchr   is_availableget_device_nameget_device_propertiestotal_memoryr]   ru   loadr   rangerandnr9   encode_segmentssynchronizetimeperf_counterr;   memory_allocatedempty_cachegccollectmin)r   r   r   r   r>   r   r   
vram_totalresultsxbsencoder
n_segmentssegmentsiwavt0encodedelapsedr   r   	vram_usedbestr   r   r   	cmd_bench  s`   
 


(

.
r  c                   sn  ddl }ddlm} ddlm} |  |jdtjd tjd tjd d	d
}tjdd}dgdd dD dd dD dd dD dd dD d}g }|	 D ]C\ }|D ]<\}	|
d}
|
j||	ddD ]}|dg D ]}|d }|| d qoqgtd t fdd |D  qVqPtd!t| | }|  |  |j|| jd"}td#| dS )$z>Snapshot all SFT shard paths from R2 and ingest into Supabase.r   N)load_dotenvSFTOrchestrators3R2_ENDPOINT_URLR2_ACCESS_KEY_IDR2_SECRET_ACCESS_KEYauto)endpoint_urlaws_access_key_idaws_secret_access_keyregion_nameR2_BUCKET_DESTINATIONfinalsftdata)zhifitts2/lang=en/enc                 S     g | ]
}d | d|fqS )zindicvoices-r/lang=/r   r   lr   r   r   r   d      z$cmd_sft_snapshot.<locals>.<listcomp>)asbnguhiknmlmrorpatatec                 S  r(  )z
josh/lang=r)  r   r*  r   r   r   r   f  r,  )r.  r'  r/  r0  r3  r6  r7  c                 S  r(  )zjoshdelivery/lang=r)  r   r*  r   r   r   r   h  r,  )r.  r'  r/  r0  r7  c                 S  r(  )z$final-export/production/shards/lang=r)  r   r*  r   r   r   r   j  r,  )r-  r.  r'  r/  r0  r1  r2  r3  r4  r5  r6  r7  )hifitts2zindicvoices-rjoshjoshdeliveryzfinal-exportr\   r)  )rJ   Prefix	DelimiterCommonPrefixesr;  )	shard_keydatasetr   z  %s %s: found %d shards so farc                 3  s,    | ]}|d   kr|d krdV  qdS )r?  r   rT   Nr   r   sds_namerE   r   r   r   {  s   * z#cmd_sft_snapshot.<locals>.<genexpr>zTotal shards discovered: %dr(   z Ingested %d shards into Supabase)boto3dotenvr   codecbench.pipeline.sft_supabaser  clientosenvironr8   r   get_paginatorpaginater9   r   r4   r   r;   r.   r/   ingest_shardsr)   )r   rD  r  r  r  rF   datasets_config
all_shardsprefixesprefix	paginatorpagecpr>  r?   rn   r   rB  r   cmd_sft_snapshotR  sb   


rT  c                 C  sJ   ddl m} dd | jdD }| }|j|| j d}td| dS )	z@Reset shard statuses back to PENDING for a fresh production run.r   r  c                 S  s    g | ]}|  r|   qS r   )stripupperr@  r   r   r   r     s     z!cmd_sft_reset.<locals>.<listcomp>r   )statusesclear_outputszReset %d shard rows to PENDINGN)rF  r  rW  splitreset_pendingkeep_outputsr   r4   )r   r  rW  r?   rn   r   r   r   cmd_sft_reset  s
   r\  c                 C  s   ddl m} ddlm} | }| jr| j|j_| jr | j|j	_| j
r(| j
|j_||}|  |j| j| j| jt| ddd dS )z&Start the SFT shard processing worker.r   r   )	SFTWorker	benchmarkF)
max_shardsr?  r   r^  N)r*   r   codecbench.pipeline.sft_workerr]  r,   r)   rt   ru   r}   rw   r~   r   r   r   r_  r?  r   getattr)r   r   r]  r>   rw   r   r   r   cmd_sft_run  s"   




rb  c           
   
   C  s   ddl m} | }| }tdddddddd	d
ddd td i }|D ]3}|d }|d }|d }|d p<dd }	t|dd|dd|d
d|	d ||d| ||< q)td t| D ]\}}tddd|dd|d
 qgt  dS )z Show SFT shard processing stats.r   r  r   Datasetz<20r   r   r   r   r   z	Audio (h)r   z7-------------------------------------------------------r?  r'   cntaudio_sr   z>9.1fr   N)rF  r  r   r]   r8   r   r   )
r   r  r?   rA   totalsr   dsstrd  r   r   r   r   cmd_sft_stats  s"   (& 
ri  c                  C  s  t jdt jd} | jdddd | jddd	}|jd
dd}|jdddd |jdtdd |jddd}|jdddd |jddddd |jdtdd |jddd |jdd d}|jd!td d"d# |jd$td d%d# |jdtd d&d# |jd'dd(d) |jd*td d+d# |jd,td d-d# |jd.td d/d# |jd0td d1d# |jd2td d3d# |jd4td d5d# |jd6d7dd8d |jd9td d:d# |jd;d<d |jd=d>d |jd?d@d}|jd0td d |jdAdBd}|jdtdd |jdCdDd}|jdEtdFdGd# |jdHddId) |jdJdKd}|jdLtd dMd# |jdNtd dOd# |jd!td dPd# |jdtd dQd# |jd.td dRd# |jd0td dSd# |jdTddUd) |jdVdWd}	|  }
t	|
j
 tttttttttttdX}||
j |
 d S )YNzAudio codec encoding pipeline)descriptionformatter_classz-vz	--verbose
store_true)actioncommandT)destrequiredingestz+Upload video metadata from CSVs to Supabase)helpr1   +zCSV file paths)nargsrr  z--batch-sizei  )typedefaultsyncu/   Pull CSVs from R2 metafiles bucket → Supabaser^   r   z6R2 keys (e.g. indic_podcasts.csv english_podcasts.csv)z--listrZ   z'List available CSVs in metafiles bucket)ro  rm  rr  setup-dbz&Create Supabase tables + RPC functionsr   zStart pipeline workerz
--languagez$Filter videos by language (None=any))ru  rv  rr  z--max-videoszStop after N videoszXCodec2 batch size overridez--no-parallelz%Disable parallel dual-stream encoding)rm  rr  z--shard-countzVideos per shard packz
--prefetchzNumber of videos to prefetchz
--offer-idz"Vast.ai offer ID for this instancez--custom-ckptz!Path to custom XCodec2 checkpointz	--tmp-dirz-Local temp directory for downloads/processingz--oom-segment-thresholdz>Force safer OOM-resistant encode mode above this segment countz--asyncr   z@Use async 3-stage pipeline (overlapping download+extract+encode)z--extract-workersz4Number of parallel ffmpeg+VAD workers for async moder   zShow pipeline progress statsfleetz'Show worker fleet status and throughputbenchz#Profile encoding for optimal configsft-snapshotu#   Snapshot R2 SFT shards → Supabase	sft-resetz(Reset SFT shard statuses back to PENDINGz
--statuseszCLAIMED,PROCESSING,DONE,FAILEDz!Comma-separated statuses to resetz--keep-outputsz7Keep output bookkeeping fields instead of clearing themsft-runz!Start SFT shard processing workerz--max-shardszStop after N shardsz	--datasetzFilter by dataset namezFilter by languagezXCodec2 batch sizezVast.ai offer IDzCustom XCodec2 checkpointz--benchmarkzBBenchmark mode: process 1 shard, confirm next download, report ETA	sft-statszShow SFT processing stats)rq  rw  rx  r   r   ry  rz  r{  r|  r}  r~  )argparseArgumentParserRawDescriptionHelpFormatteradd_argumentadd_subparsers
add_parserintstr
parse_argsr   r   rG   ro   rp   r   r   r   r  rT  r\  rb  ri  rn  )parsersubp_ingestp_syncp_runp_bench
p_sft_snapp_sft_reset	p_sft_runp_sft_statsr   cmd_mapr   r   r   main  s   












r  __main__)F)r   r   r   r   )r   r   r   r   )__doc__
__future__r   r  r6   r  r   rH  r_   r  pathlibr   r   r   rG   ro   rp   r   r   r   r  rT  r\  rb  ri  r  __name__r   r   r   r   <module>   s6    
	
(
?

$

?
B
4


l
