o
    پiw*                     @   sf  d Z ddlZddlZddlZddlZddlZddlZddlZddlm	Z	 ddl
mZ ddlmZ ddlmZ ddlZddlZddlmZ ddlmZ zddlmZ dd	lmZ W n eyl   dZddlZeeZY nw ed
efddZ 	d2dede!dedB fddZ"dede!de!de#e fddZ$dede!de!de%eej&f fddZ'			d3dede!de(dedB dedB dee#e)eef  gdf fddZ*			d4d ed!e#e d"e%eej&f d#ee#e)eef  gdf de!ded$edB d%ed& dedB fd'd(Z+	d2d)ed ed*ed#ee#e)eef  gdf de!dededB fd+d,Z,d-d. Z-d/d0 Z.ed1kr1e.  dS dS )5a  
Usage:
1) Launch the server with wait-for-initial-weights option in one terminal:
   python -m sglang.launch_server --model-path /workspace/Qwen/Qwen3-4B/ --tensor-parallel-size 2 --port 19730 --load-format dummy --checkpoint-engine-wait-weights-before-ready --mem-fraction-static 0.7

2) Torchrun this script in another terminal:
    torchrun --nproc-per-node 2 update.py --update-method broadcast --checkpoint-path /workspace/Qwen/Qwen3-4B/  --inference-parallel-size 2

Or use the integrated entry point:
    python -m sglang.srt.checkpoint_engine.update --update-method broadcast --checkpoint-path /workspace/Qwen/Qwen3-4B/  --inference-parallel-size 2
    N)defaultdict)Callable)contextmanager)Literal)	safe_open)ParameterServer)loggermsgc                 c   s8    t  }d V  t  }t|  d|| dd d S )Nz duration: z.2fz seconds)timeperf_counterr   info)r	   startend r   W/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/checkpoint_engine/update.pytimer)   s
    r   endpointinference_parallel_sizeudsc           	      C   s   t tdd}||| | krd S d}d }|d ur tj|d}tj|dL}	 z|j|  ddd}|  W n1 tjtj	fyh } z |d dkrUt
d	| d
|  |d7 }td W Y d }~nd }~ww q(W d    d S 1 suw   Y  d S )NRANKr   r   	transportTz/ping
   )timeoutz"fail to check sglang ready, retry z times, error:    g?)intosgetenvhttpxHTTPTransportClientgetraise_for_statusConnectErrorHTTPStatusErrorr   warningr
   sleep)	r   r   r   rank	retry_numr   clientresponseer   r   r   check_sglang_ready1   s2   "r-   checkpoint_pathr(   
world_sizereturnc                    sN    fddt dd t D }t|| d | }||| |d |  S )Nc                    s   g | ]	}t j |qS r   )r   pathjoin).0fr.   r   r   
<listcomp>M   s    z*split_checkpoint_files.<locals>.<listcomp>c                 S   s
   |  dS )Nz.safetensors)endswith)xr   r   r   <lambda>P   s   
 z(split_checkpoint_files.<locals>.<lambda>r   )filterr   listdirlen)r.   r(   r/   checkpoint_filesfiles_per_rankr   r5   r   split_checkpoint_filesJ   s   
r?   c              	   C   s  t j| d}t|}t|d }W d    n1 sw   Y  t|| d | }tt}t|	 }||| |d |  D ]\}	}
||
 
|	 qBi }|	 D ]+\}
}tt j| |
dd}|D ]	}	||	||	< qfW d    n1 szw   Y  qT|S )Nmodel.safetensors.index.json
weight_mapr   pt)	framework)r   r1   r2   openjsonloadr<   r   listitemsappendr   
get_tensor)r.   r(   r/   index_fnr4   rA   weights_per_rank
fn_tensorsweight_keysnamefilenamed_tensorsnamesr   r   r   split_tensorsW   s(   
rS        r@r   weight_versionc                    sJ   t tdd  dttttf  f fdd}|S )Nr   r   socket_pathsc                    s|   kr<t jt jdd$}|j  dt|   ddd}|  W d    d S 1 s5w   Y  d S d S )Nr   r   z/update_weights_from_ipcT)zmq_handlesflush_cacherU   )rE   r   )r   r!   r    postdictr#   )rV   r*   respr   r   r(   srcr   r   rU   r   r   req_funcv   s   
"zreq_inference.<locals>.req_func)r   r   r   rG   tuplestr)r   r   r   r   rU   r^   r   r\   r   req_inferencel   s   *ra   	broadcastcheckpoint_namer=   rQ   r^   save_metas_fileupdate_method)rb   p2pallc
                 C   sZ  | j |||d |   t|||	 t  td | | W d    n1 s*w   Y  |rWtt	ddkrWt
|d}
t|  |
 W d    n1 sRw   Y  |dks_|dkrytd | || W d    n1 stw   Y  |d	ks|dkr|rtd
 td | j||tt|d W d    d S 1 sw   Y  d S d S )N)filesrQ   zGather metasr   r   wbrb   rg   z$Update weights without setting ranksrf      z!Update weights with setting ranksranks)register_checkpointinit_process_groupr-   distbarrierr   gather_metasr   r   r   rD   pickledump	get_metasupdater
   r'   rG   range)psrc   r=   rQ   r^   r   r   rd   re   r   r4   r   r   r   update_weights   s4   



"rx   rw   load_metas_filec           	      C   s   |sJ dt |d}t|}W d    n1 sw   Y  |   t||| t  td | | W d    n1 sBw   Y  | 	| td| d | j
||tt|d W d    d S 1 slw   Y  d S )Nzload_metas_file is requiredrbzGather metas before joinz.Update weights with setting ranks as range(0, z) by using p2prk   )rD   rr   rF   rn   r-   ro   rp   r   rq   
load_metasru   rG   rv   )	rw   rc   ry   r^   r   r   r   r4   metasr   r   r   r2      s    	


"r2   c               	   C   sH  d} t jdd }t|D ]C\}}|dkr3|d t|k r3z
t||d  } W n	 ty0   Y nw  n|drPzt|ddd } W n	 tyM   Y nw  nqdd|  tg| }t	d	d

| t jd ztj|dd}t |j W dS  ty   t	dt jd t d Y dS  ty   t	dt jd t d Y dS w )z2Run the update script with torchrun automatically.   r   N--inference-parallel-sizez--inference-parallel-size==torchrunz--nproc-per-node=z	Running:  rP   F)checkzFError: torchrun command not found. Please ensure PyTorch is installed.z
Interrupted by user   )sysargv	enumerater<   r   
ValueError
startswithsplit__file__printr2   stderr
subprocessrunexit
returncodeFileNotFoundErrorKeyboardInterrupt)r   argsiargcmdresultr   r   r   run_with_torchrun   sB   
r   c                  C   s  t dd u rt  d S tjdd} | jdtd d | jdtd d | jdtd d | jdtd	d | jd
tdd | jdtdd | jdtdd | jdtdd | jdtd d | jdtd d |  }tt dd	}tt dd}t	|j
|j|j|jd}td u rtdtjd td tdd}d |_|jrt||j|j||j|j
|j n:|jrt jt j|jdrt|j||}g }n|jrt|j||ng }i }t||j||||j|j
|j|j|j
 t |j! d S )Nr   zUpdate weights example)descriptionz--checkpoint-path)typedefaultz--save-metas-filez--load-metas-filez--sleep-timer   z
--endpointzhttp://localhost:19730r~   r}   z--checkpoint-namezmy-checkpoint-iter-0z--update-methodrb   z--udsz--weight-version
WORLD_SIZEr   )r   rU   z.Error: checkpoint_engine package not availabler   T)auto_pgr@   )"r   r   r   argparseArgumentParseradd_argumentr`   r   
parse_argsra   r   r   r   rU   r   r   r   r   r   
_p2p_storery   r2   rc   r.   r1   existsrS   r?   rx   rd   re   r
   r'   
sleep_time)parserr   r(   r/   r^   rw   rQ   r=   r   r   r   main   sz   


r   __main__)N)rT   NN)Nrb   N)/__doc__r   rE   r   rr   r   r   r
   collectionsr   collections.abcr   
contextlibr   typingr   r   torchtorch.distributeddistributedro   safetensorsr   checkpoint_engine.psr   logurur   ImportErrorlogging	getLogger__name__r`   r   r   r-   rG   r?   rZ   TensorrS   floatr_   ra   rx   r2   r   r   r   r   r   r   <module>   s    



%	

-
)
L
