o
    پiSI                     @   s*  U d dl Z d dlZd dlmZ d dlmZmZmZmZ d dl	Z	d dl
mZ d dlmZmZmZ d dlmZ d dlmZ d dlmZmZ d d	lmZmZmZ e eZee	jjge	jjf Z ee	jjgee! f Z"G d
d deZ#G dd de#Z$e$ a%ee# e&d< dd Z'de#fddZ(dede)fddZ*G dd de#Z+G dd de#Z,dd Z-dd Z.G d d! d!eZ/G d"d# d#eZ0G d$d% d%e0Z1G d&d' d'e0Z2G d(d) d)e0Z3d*d+ Z4d,e5fd-d.Z6d,e5fd/d0Z7d1d2 Z8d@d4e	j9fd5d6Z:G d7d8 d8e0Z;d4e	j9d9e)fd:d;Z<d<e	j9d=ee	j9 fd>d?Z=dS )A    N)ABC)Callable	GeneratorListOptional)functional_call)NaiveDistributedget_naive_distributedset_naive_distributed)ModelWeightParameter)
ServerArgs)MultiprocessingSerializeris_pin_memory_available)HostSharedMemoryManagerget_host_shared_memory_managerset_host_shared_memory_managerc                   @   sR   e Zd Z		ddeejjddf dee dee	 fddZ
dd Zed	d
 ZdS )BaseOffloaderNall_modules_generatorsubmodule_accessorwhitelist_param_names_creatorc                 C   s   t |S N)listselfr   r   r    r   N/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/utils/offloader.pywrap_modules   s   zBaseOffloader.wrap_modulesc                 C      d S r   r   r   r   r   r   	post_init&      zBaseOffloader.post_initc                 C   s   dS )NFr   r   r   r   r   forbid_copy_engine_usage)   s   z&BaseOffloader.forbid_copy_engine_usageNN)__name__
__module____qualname__r   torchnnModuler   _SubmoduleAccessor_WhitelistParamNamesCreatorr   r   propertyr!   r   r   r   r   r      s    
r   c                   @   s   e Zd ZdS )NoopOffloaderN)r#   r$   r%   r   r   r   r   r,   .   s    r,   	_instancec                   C   s   t d usJ t S r   r-   r   r   r   r   get_offloader6   s   r/   instancec                 C   s   | a d S r   r.   )r0   r   r   r   set_offloader;   s   r1   server_argsdp_rankc                 C   s^   | j dkrtt| j d dS | jdkr,| j dksJ dt| j| j| j| j|| jdS t	 S )Nr   i   @)cpu_offload_max_bytesz.V2 offload does not support cpu_offload_gb yet)
group_sizenum_in_groupprefetch_stepmoder3   dp_size)
cpu_offload_gbOffloaderV1intoffload_group_sizeOffloaderV2offload_num_in_groupoffload_prefetch_stepoffload_moder9   r,   )r2   r3   r   r   r   !create_offloader_from_server_args@   s"   

rB   c                   @   sf   e Zd ZdefddZ		ddeejjddf de	e
 de	e fdd	Zd
ejjdejjfddZdS )r;   r4   c                 C   s   d| _ || _d S )Nr   )_cpu_offload_bytes_cpu_offload_max_bytes)r   r4   r   r   r   __init__U      
zOffloaderV1.__init__Nr   r   r   c                    s    fdd|D S )Nc                    s   g | ]}  |qS r   )maybe_offload_to_cpu).0moduler   r   r   
<listcomp>_   s    z,OffloaderV1.wrap_modules.<locals>.<listcomp>r   r   r   r   r   r   Y   s   zOffloaderV1.wrap_modulesrI   returnc              	      s   t  d  }d u rS |j  tdkrS | j| jkr!S t }d} D ]9}| j| jkr4 n0tj|j	 |j
 |jj|jjd|d}||j ||_|  j|j |j  7  _d}q*|ruj fdd_S )NcpuFsizestridedtypelayoutdevice
pin_memoryTc                     s:   _  fdd  D }t|| |d}_ |S )Nc                    s    i | ]\}}||j  d dqS )Tnon_blocking)torH   kvrR   r   r   
<dictcomp>   s    zEOffloaderV1.maybe_offload_to_cpu.<locals>.forward.<locals>.<dictcomp>argskwargs)forward
state_dictitemsr   )r]   r^   device_stateoutputrR   r_   rI   original_forwardr   r   r_      s   

z1OffloaderV1.maybe_offload_to_cpu.<locals>.forward)next
parametersrR   r&   rC   rD   r   empty_strideddatarN   rO   rP   rQ   copy_numelelement_sizer_   )r   rI   paramsrS   offloaded_parameterspcpu_datar   rd   r   rG   a   s:   z OffloaderV1.maybe_offload_to_cpur"   )r#   r$   r%   r<   rE   r   r&   r'   r(   r   r)   r*   r   rG   r   r   r   r   r;   T   s    
r;   c                   @   st   e Zd ZdedededededefddZ				dd
eejj	d	d	f de
e de
e fddZdd Zedd Zd	S )r>   r5   r6   r7   r8   r3   r9   c           	      C   s   || _ || _|| _|| _tjd }| jdv r1ddlm} | dks%J dtt	||d| d | jd	v r=t
t|d
 g | _d S )NSGLANG_RUN_ID>   shm_cpusharded_gpur   $get_tensor_model_parallel_world_size   not yet support tp_size!=1z/tmp/)rank
world_size
rendezvous>   rr   )	base_name)r5   r6   r7   r8   osenvironsglang.srt.distributedru   r
   r   r   r   
offloaders)	r   r5   r6   r7   r8   r3   r9   run_idru   r   r   r   rE      s0   	



zOffloaderV2.__init__Nr   r   r   c                 C   s   t | jdksJ dtj }g }g }t|D ]D\}}|| || j | j| j kr\||}	||	}
t	
d|dt|	 d|
 dtj   ||	 | jt| j|	||
d qt|D ]\}}t||| j| jd qa|S )	Nr   z"should only call wrap_modules oncez![offloader] offload module_index=z submodule=z params=z memory_allocated=)r8   rI   
alt_streamwhitelist_param_names)indexrI   r   r7   )lenr   r&   cudaStream	enumerateappendr5   r6   loggerinfotypememory_allocated_ModuleOffloaderr8   "_hook_module_forward_for_offloaderr7   )r   r   r   r   r   all_modulesoffload_submodulesmodule_indexrI   	submoduler   r   r   r   r   r      s<   

$
	zOffloaderV2.wrap_modulesc                 C   s6   | j D ]}|  qt| jD ]	}| j |   qd S r   )r   r   ranger7   start_onload)r   	offloaderir   r   r   r      s
   

zOffloaderV2.post_initc                 C   s
   | j dkS )NrL   )r8   r   r   r   r   r!      s   
z$OffloaderV2.forbid_copy_engine_usager"   )r#   r$   r%   r<   strrE   r   r&   r'   r(   r   r)   r*   r   r   r+   r!   r   r   r   r   r>      s4    
*
(r>   c                    s,    fdd}t || fddd d S )Nc                      s(     t         d S r   )r   r   offloadr   r   r   r7   r   r   _on_forward_end   s   z;_hook_module_forward_for_offloader.<locals>._on_forward_endc                      s       S r   )wait_and_get_device_tensorsr   )r   r   r   r   <lambda>   s    z4_hook_module_forward_for_offloader.<locals>.<lambda>)on_forward_endget_parameter_and_buffer_dicts)_hook_module_forward_raw)r   rI   r   r7   r   r   r   r   r      s   
r   c                    s$   j  fdd  _ d S )Nc                     s(   _ t | |d}   _ |S )Nr\   )r_   r   )r]   r^   rc   r_   r   rI   r   re   r   r   r_     s   
z)_hook_module_forward_raw.<locals>.forward)r_   )rI   r   r   r   r   r   r     s   
	r   c                	   @   sZ   e Zd Zdedejjdejjde	e fddZ
dd Zd	d
 Zdd Zdd Zdd ZdS )r   r8   rI   r   r   c                    s    | _ | _t j| _|| _| jtdksJ dd | _d | _t	| j
 tfdd|D sCJ d|dt  fdd|D | _d S )	NrL   z9not handled device=cpu case yet (should skip this tensor)c                 3   s    | ]}| v V  qd S r   r   rH   name)
param_dictr   r   	<genexpr>'  s    
z,_ModuleOffloader.__init__.<locals>.<genexpr>zwhitelist_param_names=z list(param_dict.keys())=c                    s   i | ]}|t j |d qS ))rI   
param_name)_BaseParamOffloadercreater   )r8   rI   r   r   r[   +  s    z-_ModuleOffloader.__init__.<locals>.<dictcomp>)r8   rI   rf   rg   rR   r   r&   _device_tensors_load_eventdictnamed_parametersallr   keys_param_offloaders)r   r8   rI   r   r   r   )r8   rI   r   r   rE     s(   
z_ModuleOffloader.__init__c                 C   s    | j  D ]\}}|  qd S r   )r   ra   r   )r   r   param_offloaderr   r   r   r   0  s   
z_ModuleOffloader.post_initc                 C   sf   | j tj  tj| j  |  | _tj | _	| j	
  W d    d S 1 s,w   Y  d S r   )r   wait_streamr&   r   current_streamstream_create_device_tensorsr   Eventr   recordr   r   r   r   r   4  s   
"z_ModuleOffloader.start_onloadc                 C   s   d | _ d | _d S r   )r   r   r   r   r   r   r   ;  rF   z_ModuleOffloader.offloadc                 C   s   | j d usJ | j  | j S r   )r   r   waitr   r   r   r   r   ?  s   
z,_ModuleOffloader.wait_and_get_device_tensorsc                 C   s   dd | j  D S )Nc                 S   s   i | ]	\}}||  qS r   )create_device_tensorrW   r   r   r   r[   E  s    z;_ModuleOffloader._create_device_tensors.<locals>.<dictcomp>)r   ra   r   r   r   r   r   D     z'_ModuleOffloader._create_device_tensorsN)r#   r$   r%   r   r&   r'   r(   r   r   r   rE   r   r   r   r   r   r   r   r   r   r     s    
r   c                   @   sF   e Zd Zededd fddZdd Zedd Zd	d
 Z	dd Z
dS )r   r8   rK   c                 K   s   t tttd|  di |S )N)metarL   rr   rs   r   )_MetaParamOffloader_CpuParamOffloader_ShmCpuParamOffloader_ShardedGpuParamOffloader)r8   r^   r   r   r   r   I  s   z_BaseParamOffloader.createc                 C   s   || _ || _d S r   )_module_param_namer   rI   r   r   r   r   rE   R  rF   z_BaseParamOffloader.__init__c                 C   s   t | j| jS r   )getattrr   r   r   r   r   r   _paramV  s   z_BaseParamOffloader._paramc                 C   r   r   r   r   r   r   r   r   Z  r    z_BaseParamOffloader.post_initc                 C   s   t r   )NotImplementedErrorr   r   r   r   r   ]  r    z(_BaseParamOffloader.create_device_tensorN)r#   r$   r%   staticmethodr   r   rE   r+   r   r   r   r   r   r   r   r   H  s    
r   c                       s(   e Zd ZdZ fddZdd Z  ZS )r   zUsually used for debugging.c                    s   t  || t|| d S r   )superrE   _move_param_to_metar   	__class__r   r   rE   d  s   z_MetaParamOffloader.__init__c                 C   s   t j| jjddS Nr   rZ   )r&   
empty_liker   ri   r   r   r   r   r   h  s   z(_MetaParamOffloader.create_device_tensor)r#   r$   r%   __doc__rE   r   __classcell__r   r   r   r   r   a  s    r   c                       s$   e Zd Z fddZdd Z  ZS )r   c                    s    t  || t| jdd d S )NTrS   )r   rE   _move_param_to_cpur   r   r   r   r   rE   m  s   z_CpuParamOffloader.__init__c                 C      | j jdddS Nr   TrT   )r   rV   r   r   r   r   r   q     z'_CpuParamOffloader.create_device_tensor)r#   r$   r%   rE   r   r   r   r   r   r   r   l  s    r   c                       ,   e Zd Z fddZdd Zdd Z  ZS )r   c                    s   t  || t  | _t  | _ddlm} | dks"J d| j	j
 s7J d| j	jd| j	 t j| j	j| j	jd| _| jdkrZ| j| j	j
d | j| j	_
nt| j| j t   d S )	Nr   rt   rv   rw   8not yet support non-contiguous tensor self._param.shape= self._param.stride()=)shaperP   rL   )r   rE   r	   get_rank_rankget_world_size_world_sizer~   ru   r   ri   is_contiguousr   rO   r   mallocrP   shm_cpu_datarj   rV   r   r   r   barrierr   rI   r   ru   r   r   r   rE   v  s"   

z_ShmCpuParamOffloader.__init__c              	   C   sf   | j dkr*| j | jj ks*J d| j d| jj d| jd| jjt| j| j d S )Nr   zself.shm_cpu_data.data_ptr()=z self._param.data.data_ptr()=z self.shm_cpu_data=z self._param.data=)r   r   data_ptrr   ri   r   r   r   r   r   r   r   r     s
   
.z_ShmCpuParamOffloader.post_initc                 C   r   r   )r   rV   r   r   r   r   r     r   z*_ShmCpuParamOffloader.create_device_tensorr#   r$   r%   rE   r   r   r   r   r   r   r   r   u  s    r   c                 C   sP   | j |j kr|| _dS | j t dksJ d| j d|j t|dd| _dS )zXUpdate parameter while keeping properties needed by Offloader (e.g. pinned host memory).rL   zparam.device=z new_tensor.device=Tr   N)rR   ri   r&   _create_cpu_data)param
new_tensorr   r   r   update_param  s   
r   rS   c                 C   s   t | j|d| _d S )Nr   )r   ri   )r   rS   r   r   r   r     r   r   c                 C   s   t | d|d}||  |S )NrL   )rR   rS   )_empty_strided_likerj   )ri   rS   rp   r   r   r   r     s   
r   c                    s   t | | t } jd}|tkr$td
d|i fdddD }n|tjjkr3tjj|dd}n
td|d	 t	| || d S )Nr   ri   c                    s   i | ]}|t  |qS r   )r   )rH   rX   	old_paramr   r   r[     s    
z'_move_param_to_meta.<locals>.<dictcomp>)	input_dim
output_dimweight_loaderF)ri   requires_gradzUnknown old_param_type=z old_param=r   )
r   r   ri   rV   r   r&   r'   	Parameter
ValueErrorsetattr)rI   r   old_param_typenew_data	new_paramr   r   r   r     s$   

r   Fxc                 C   s"   t j|  |  | j| j||dS )NrM   )r&   rh   rN   rO   rP   rQ   )r   rR   rS   r   r   r   r     s   r   c                       r   )r   c                    s   t  || t  | _t  | _ddlm} | dks"J d| j	j
 s7J d| j	jd| j	 | jdkrDt| j	dd nt| j| j d | _d S )	Nr   rt   rv   rw   r   r   Tr   )r   rE   r	   r   r   r   r   r~   ru   r   ri   r   r   rO   r   r   r   r   sharded_param_handlesr   r   r   r   rE     s   


z"_ShardedGpuParamOffloader.__init__c                 C   s   | j j sJ d| j jd| j  | j j}td|jd|jd|jdt	j
  | jdkr:|d}t|| j}t	j|d j|d jdd	}t|d
| _t || jdkr_|nd  t| j| j d S )Nr   r   z)[offloader] post_init scatter_src.nbytes=z scatter_src.dtype=z scatter_src.shape=z torch.cuda.memory_allocated()=r   r   )rP   rR   )local_tensor)r   ri   r   r   rO   r   r   nbytesrP   r&   r   r   r   rV   _even_chunkr   empty_create_shared_buffer_tensorsr   r	   scatterr   r   r   )r   scatter_srcscatter_listsharded_paramr   r   r   r     s*   
&

z#_ShardedGpuParamOffloader.post_initc                 C   sV   t | jdd}|| j}t| jD ]}| j| | j }| j| }|| | q|S r   )r   r   chunkr   r   r   r   rj   )r   rc   output_chunksr   src_ranksrc_bufr   r   r   r     s   
z._ShardedGpuParamOffloader.create_device_tensorr   r   r   r   r   r     s    r   chunksc                 C   s6   | j d | dksJ d| j d|t| |S )Nr   zx.shape=z chunks=)r   r   r   )r   r  r   r   r   r     s   (r   r   rK   c                    s   t   t   }t  t fddt|D d}g }t|D ]"}|| d  }|kr=|d u s7J |  q#|t| q#|S )Nc                    s"   g | ]}|kr
d nt  qS r   )r   	serialize)rH   interesting_rankr   	self_rankr   r   rJ   $  s    z1_create_shared_buffer_tensors.<locals>.<listcomp>)dup_serialized_local_tensorr  )	r	   r   r   all_gather_objectr   r   r   r   deserialize)r   ry   object_listoutput_tensorsoutput_rankremote_serialized_tensorr   r  r   r     s.   

r   )F)>loggingr|   abcr   typingr   r   r   r   r&   
torch.funcr   (sglang.srt.distributed.naive_distributedr   r	   r
   sglang.srt.layers.parameterr   sglang.srt.server_argsr   sglang.srt.utilsr   r   #sglang.srt.utils.host_shared_memoryr   r   r   	getLoggerr#   r   r'   r(   r)   r   r*   r   r,   r-   __annotations__r/   r1   r<   rB   r;   r>   r   r   r   r   r   r   r   r   boolr   r   r   Tensorr   r   r   r   r   r   r   r   <module>   sJ   
 
E\6	$
=