o
    پi                     @  s  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlmZ d dl	m
Z
 d dlmZmZmZmZmZmZmZmZmZ d dlZd dlZd dlmZmZmZmZmZmZmZmZm Z m!Z!m"Z"m#Z#m$Z$m%Z%m&Z&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z-m.Z.m/Z/m0Z0m1Z1m2Z2m3Z3m4Z4m5Z5m6Z6m7Z7m8Z8m9Z9m:Z:m;Z;m<Z<m=Z=m>Z>m?Z?m@Z@mAZAmBZBmCZCmDZDmEZEmFZFmGZGmHZHmIZImJZJmKZKmLZLmMZMmNZNmOZOmPZP d dlQmRZRmSZS d dlTmUZU d d	lVmWZW erd d
lXmYZY edZZe[e\Z]G dd deeZ Z^G dd dZ_dS )    )annotationsN)deque)nullcontext)	TYPE_CHECKINGAnyDequeDictGenericListOptionalTupleTypeVar)9AttachHiCacheStorageReqInputAttachHiCacheStorageReqOutputCheckWeightsReqInputCheckWeightsReqOutputClearHiCacheReqInputClearHiCacheReqOutputCloseSessionReqInput!DestroyWeightsUpdateGroupReqInput"DestroyWeightsUpdateGroupReqOutputDetachHiCacheStorageReqInputDetachHiCacheStorageReqOutputDumperControlReqInputDumperControlReqOutputExpertDistributionReqExpertDistributionReqOutputExpertDistributionReqTypeFlushCacheReqInputFlushCacheReqOutputGetInternalStateReqGetInternalStateReqOutputGetLoadReqInputGetLoadReqOutputGetLoadsReqInputGetLoadsReqOutputGetWeightsByNameReqInputGetWeightsByNameReqOutput-InitWeightsSendGroupForRemoteInstanceReqInput.InitWeightsSendGroupForRemoteInstanceReqOutputInitWeightsUpdateGroupReqInputInitWeightsUpdateGroupReqOutput"LoadLoRAAdapterFromTensorsReqInput#LoadLoRAAdapterFromTensorsReqOutputLoadLoRAAdapterReqInputLoadLoRAAdapterReqOutputLoRAUpdateOutputOpenSessionReqInput
ProfileReqProfileReqOutputProfileReqTypeReleaseMemoryOccupationReqInput ReleaseMemoryOccupationReqOutputResumeMemoryOccupationReqInputResumeMemoryOccupationReqOutput#SendWeightsToRemoteInstanceReqInput$SendWeightsToRemoteInstanceReqOutputSetInternalStateReqSetInternalStateReqOutputSlowDownReqInputSlowDownReqOutputUnloadLoRAAdapterReqInputUnloadLoRAAdapterReqOutput$UpdateWeightsFromDistributedReqInput%UpdateWeightsFromDistributedReqOutputUpdateWeightsFromIPCReqInputUpdateWeightsFromIPCReqOutputUpdateWeightsFromTensorReqInput UpdateWeightsFromTensorReqOutput)LoRARef
ServerArgs)get_bool_env_var)TypeBasedDispatcher)TokenizerManagerTc                   @  sL   e Zd ZdZddddZdddZdd Zdd ZdddZe	dd Z
dS )_CommunicatorzJNote: The communicator now only run up to 1 in-flight request at any time.queueingsender
zmq.Socketfan_outintc                 C  s6   || _ || _|| _d | _d | _t | _|dv sJ d S )N)rN   watching)_sender_fan_out_mode_result_event_result_valuesr   _ready_queue)selfrO   rQ   mode r\   d/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/managers/tokenizer_communicator_mixin.py__init__c   s   z_Communicator.__init__objrL   c                   s   t  }| jd ust| jdkr,| j| | I d H  | jd u s%J | jd u s,J |r4| j	| t  | _g | _| j I d H  | j}d  | _| _t| jdkr[| j
   |S Nr   )asyncioEventrW   lenrY   appendwaitrX   rT   
send_pyobjpopleftset)rZ   r_   ready_eventresult_valuesr\   r\   r]   queueing_callm   s"   
z_Communicator.queueing_callc                   sf   | j d u r| jd u sJ g | _t | _ |r| j| | j  I d H  t| j}d  | _ | _|S N)	rW   rX   ra   rb   rT   rf   re   copydeepcopy)rZ   r_   rj   r\   r\   r]   watching_call   s   

z_Communicator.watching_callc                   s,   | j dkr| |I d H S | |I d H S )NrN   )rV   rk   ro   rZ   r_   r\   r\   r]   __call__   s   
z_Communicator.__call__recv_objc                 C  s.   | j | t| j | jkr| j  d S d S rl   )rX   rd   rc   rU   rW   rh   )rZ   rr   r\   r\   r]   handle_recv   s   z_Communicator.handle_recvc                 C  s2   t dd | D }dd | D }d|}||fS )Nc                 S     g | ]}|j qS r\   )success.0rr\   r\   r]   
<listcomp>       z/_Communicator.merge_results.<locals>.<listcomp>c                 S  rt   r\   )messagerv   r\   r\   r]   ry      rz   z | )alljoin)resultsall_successall_messager\   r\   r]   merge_results   s   
z_Communicator.merge_resultsN)rN   )rO   rP   rQ   rR   )r_   rL   )rr   rL   )__name__
__module____qualname____doc__r^   rk   ro   rq   rs   staticmethodr   r\   r\   r\   r]   rM   `   s    


rM   c                   @  s  e Zd ZdZdddZddd	ZdddZdddZ			ddddZdddZ											ddd-d.Z
dd/d0Zdd3d4Zdd5d6Zdd7d8Zdd9d:Z	ddd@dAZ	dddCdDZ	dddFdGZ	dddIdJZ	dddLdMZ	dddOdPZ	dddRdSZddVdWZ	ddd[d\Z	ddd_d`Z	dddadbZ	dddddeZ	dddgdhZ	dddjdkZ	dddndoZ	dddqdrZddtduZ ddxdyZ!dd|d}Z"dddZ#		ddddZ$	ddddZ%	ddddZ&dddZ'dS )TokenizerCommunicatorMixinzLMixin class for TokenizerManager to handle communication with the scheduler.rZ   rK   server_argsrH   c                 C  s  t | j|j| _t | j|j| _t | j|j| _t | j|j| _t | j|j| _t | j|j| _t | j|j| _	t | j|j| _
t | j|j| _t | j|j| _t | j|j| _t | j|j| _t | j|j| _t | j|j| _t | j|j| _t | j|j| _t | j|j| _t | j|j| _t | j|j| _t | j|j| _t | j|j| _t | j|jdd| _t | j|j| _t | j|j| _|  j|  7  _d S )NrS   )r[   )rM   send_to_schedulerdp_size&init_weights_update_group_communicator)destroy_weights_update_group_communicator,update_weights_from_distributed_communicator8init_weights_send_group_for_remote_instance_communicator,send_weights_to_remote_instance_communicator'update_weights_from_tensor_communicator$update_weights_from_ipc_communicator get_weights_by_name_communicator&release_memory_occupation_communicator%resume_memory_occupation_communicatorcheck_weights_communicatorslow_down_communicatorflush_cache_communicator"clear_hicache_storage_communicator#attach_hicache_storage_communicator#detach_hicache_storage_communicatorprofile_communicatorget_internal_state_communicatorset_internal_state_communicator expert_distribution_communicator update_lora_adapter_communicatorget_load_communicatorget_loads_communicatordumper_control_communicator_result_dispatcher_get_communicator_dispatcher)rZ   r   r\   r\   r]   init_communicators   s   
z-TokenizerCommunicatorMixin.init_communicatorsc                 C  s   t t| jjft| jjft| jjft| j	jft
| jjft| jjft| jjft| jjft| jjft| jjft| jjft| jjft| jjft| jjft| jjft | j!jft"| j#jft$| j%jft&| j'jft(| j)jft*| j+jft,| j-jft.| j/jft0| j1jfgS rl   )2rJ   r+   r   rs   r   r   rB   r   r)   r   r:   r   rF   r   rD   r   r'   r   r6   r   r8   r   r   r   r>   r   r   r   r   r   r   r   r   r   r3   r   r!   r   r<   r   r   r   r0   r   r#   r   r%   r   r   r   rZ   r\   r\   r]   r      s   z7TokenizerCommunicatorMixin._get_communicator_dispatcherreturnr   c                   s   |  t I d H d S r`   )r   r   r   r\   r\   r]   flush_cacheZ  s   z&TokenizerCommunicatorMixin.flush_cacher   c                   s   |  t I dH d S )z%Clear the hierarchical cache storage.Nr   )r   r   r   r\   r\   r]   clear_hicache_storage]  s   z0TokenizerCommunicatorMixin.clear_hicache_storageNhicache_storage_backendstr)hicache_storage_backend_extra_config_jsonOptional[str]hicache_storage_prefetch_policyhicache_write_policyr   c           	        sx   |  t||||dI dH }t|\}}t||d}|r:|| j_|dur*|| j_|dur2|| j_|dur:|| j_	|S )z3Attach (enable) HiCache storage backend at runtime.)r   r   r   r   Nru   r{   )
r   r   rM   r   r   r   r   $hicache_storage_backend_extra_configr   r   )	rZ   r   r   r   r   r~   r   r   outr\   r\   r]   attach_hicache_storaged  s,   
	z1TokenizerCommunicatorMixin.attach_hicache_storager   c                   sF   |  t I dH }t|\}}t||d}|r!d| j_d| j_|S )z4Detach (disable) HiCache storage backend at runtime.Nr   )r   r   rM   r   r   r   r   r   )rZ   r~   r   r   r   r\   r\   r]   detach_hicache_storage  s   
z1TokenizerCommunicatorMixin.detach_hicache_storageF
output_dir
start_stepOptional[int]	num_steps
activitiesOptional[List[str]]
with_stackOptional[bool]record_shapesprofile_by_stageboolmerge_profilesprofile_prefixprofile_stagesc                   s|   |    tdd}|du s|du rdnd}tdd}|duo |}ttj|||||||tt ||	|
d}| |I d H S )NSGLANG_PROFILE_WITH_STACKtrueFTSGLANG_PROFILE_RECORD_SHAPES)typer   r   r   r   r   r   r   
profile_idr   r   r   )auto_create_handle_looprI   r2   r4   START_PROFILEr   time_execute_profile)rZ   r   r   r   r   r   r   r   r   r   r   env_with_stackenv_record_shapesreqr\   r\   r]   start_profile  s.   

z(TokenizerCommunicatorMixin.start_profilec                   s&   |    ttjd}| |I d H S )N)r   )r   r2   r4   STOP_PROFILEr   rZ   r   r\   r\   r]   stop_profile  s   z'TokenizerCommunicatorMixin.stop_profiler   r2   c                   s*   |  |I d H d }|jst|j|S r`   )r   ru   RuntimeErrorr{   )rZ   r   resultr\   r\   r]   r     s
   
z+TokenizerCommunicatorMixin._execute_profilec                   *   |    ttjd}| |I d H  d S N)action)r   r   r   START_RECORDr   r   r\   r\   r]    start_expert_distribution_record     z;TokenizerCommunicatorMixin.start_expert_distribution_recordc                   r   r   )r   r   r   STOP_RECORDr   r   r\   r\   r]   stop_expert_distribution_record  r   z:TokenizerCommunicatorMixin.stop_expert_distribution_recordc                   r   r   )r   r   r   DUMP_RECORDr   r   r\   r\   r]   dump_expert_distribution_record  r   z:TokenizerCommunicatorMixin.dump_expert_distribution_recordr_   r*   requestOptional[fastapi.Request]Tuple[bool, str]c                   @   |    | jjdks| jjsJ d| |I d H }t|S )N   Udp_size must be 1 or dp attention must be enabled for update weights from distributed)r   r   r   enable_dp_attentionr   rM   r   rZ   r_   r   r~   r\   r\   r]   init_weights_update_group     
z4TokenizerCommunicatorMixin.init_weights_update_groupr   c                   r   )Nr   zTdp_size must be 1 or dp attention must be enabled for destroy parameter update group)r   r   r   r   r   rM   r   r   r\   r\   r]   destroy_weights_update_group  r   z7TokenizerCommunicatorMixin.destroy_weights_update_grouprA   c              	       |    | jjdks| jjsJ d|jr| jdd | j4 I d H  | j}W d   I d H  n1 I d H s7w   Y  |sB| jj	nt
 }|4 I d H  | |I d H }W d   I d H  n1 I d H sdw   Y  t|\}}|r|jd ur| |j |d|j d7 }||fS )Nr   r   T	abort_all Weight version updated to .)r   r   r   r   abort_all_requestsabort_requestis_pause_condis_pausemodel_update_lockwriter_lockr   r   rM   r   weight_version"_update_weight_version_if_providedrZ   r_   r   	is_pausedlock_contextr~   ru   r{   r\   r\   r]   update_weights_from_distributed  (   ((z:TokenizerCommunicatorMixin.update_weights_from_distributedr(   c                   >   |    | jjdksJ d| |I d H d }|j|jfS )Nr   zAdp_size must be 1 for init_weights_send_group_for_remote_instancer   )r   r   r   r   ru   r{   rZ   r_   r   r   r\   r\   r]   +init_weights_send_group_for_remote_instance  s   zFTokenizerCommunicatorMixin.init_weights_send_group_for_remote_instancer9   c                   r   )Nr   z5dp_size must be 1 for send_weights_to_remote_instancer   )r   r   r   r   ru   r{   r   r\   r\   r]   send_weights_to_remote_instance  s   z:TokenizerCommunicatorMixin.send_weights_to_remote_instancerE   c              	     r   )Nr   zPdp_size must be 1 or dp attention must be enabled for update weights from tensorTr   r   r   )r   r   r   r   r   r   r   r   r   r   r   r   rM   r   r   r   r   r\   r\   r]   update_weights_from_tensor)  r   z5TokenizerCommunicatorMixin.update_weights_from_tensorrC   c              
     s  |    zD| jjdks| jjsJ dtd | jj4 I dH  | |I dH d }|j	|j
}}W d  I dH  n1 I dH sCw   Y  W n# tyl } zdt| }t| d|}}W Y d}~nd}~ww |r|jdur| |j |d|j d	7 }||fS )
z9Update weights via IPC for checkpoint-engine integration.r   zMdp_size must be 1 or dp attention must be enabled for update weights from IPCzStarting IPC weight updateNr   zIPC weight update failed: Fr   r   )r   r   r   r   loggerinfor   r   r   ru   r{   	Exceptionr   errorr   r   )rZ   r_   r   r   ru   r{   e	error_msgr\   r\   r]   update_weights_from_ipcG  s,   
(
z2TokenizerCommunicatorMixin.update_weights_from_ipcr?   r@   c                   sX   | j  s
J d| j|jI d H }||_| j|I d H  | |I d H d }|S )Nzaself.lora_update_lock must be locked in order for self._unload_lora_adapter_locked() to be calledr   )lora_update_locklockedlora_registry
unregister	lora_namelora_idwait_for_unloadr   )rZ   r_   r  r   r\   r\   r]   _unload_lora_adapter_lockedc  s   z6TokenizerCommunicatorMixin._unload_lora_adapter_lockedr.   _r/   c              
     s  |    z| jjstd| jjdksJ dtd|j|j | j	4 I d H  t
|j|j|jd}|j|_| |I d H d }|jrS| j|I d H  || j|j< | jjd ur| jj| jjkr| jjddI d H }|d u rxtd	| jj td
| d| jj d| jj d | t|dI d H }|jstd| d|j |j|= | jj| jjksa|W  d   I d H  W S 1 I d H sw   Y  W d S  ty } ztdt|dW  Y d }~S d }~ww )N?LoRA is not enabled. Please set `--enable-lora` to enable LoRA.r   *dp_size must be 1 for dynamic lora loadingz.Start load Lora adapter. Lora name=%s, path=%sr  	lora_pathpinnedr   Texclude_pinnedWDidn't find any LoRA adapters when trying to evict LRU LoRA adapter. LoRA registry is: ,Unloading least recently used LoRA adapter '' (current number of adapters: , max allowed: )r  (Error while unloading LRU LoRA adapter '': Fru   error_message)r   r   enable_lora
ValueErrorr   r  r  r  r  r  rG   r  r  r   ru   r
  registerlora_ref_cachemax_loaded_lorasnum_registered_loraslru_lora_name	_registryr  r?   r!  loaded_adaptersr/   r   rZ   r_   r  new_adapterr   r(  unload_resultr  r\   r\   r]   load_lora_adapterw  s   
40z,TokenizerCommunicatorMixin.load_lora_adapterr,   r-   c              
     s  |    z| jjstd| jjdksJ dtd|j | j4 I d H  t	|jd|j
d}|j|_| |I d H d }|jrP| j|I d H  || j|j< | jjd ur| jj| jjkr| jjdd	I d H }|d u rutd
| jj td| d| jj d| jj d | t|dI d H }|jstd| d|j |j|= | jj| jjks^|W  d   I d H  W S 1 I d H sw   Y  W d S  ty } ztdt|dW  Y d }~S d }~ww )Nr  r   r  z2Start load Lora adapter from tensors. Lora name=%s
__tensor__r  r   Tr  r  r  r  r  r  r  r  r  Fr   )r   r   r"  r#  r   r  r  r  r  rG   r  r  r   ru   r
  r$  r%  r&  r'  r(  r)  r  r?   r!  r*  r-   r   r+  r\   r\   r]   load_lora_adapter_from_tensors  s   
4+z9TokenizerCommunicatorMixin.load_lora_adapter_from_tensorsc              
     s   |    zK| jjstd|jd usJ d| jjdks!J dtd|j | j4 I d H  | 	|I d H W  d   I d H  W S 1 I d H sIw   Y  W d S  tyj } zt
dt|dW  Y d }~S d }~ww )Nr  z1lora_name must be provided to unload LoRA adapterr   r  z'Start unload Lora adapter. Lora name=%sFr   )r   r   r"  r#  r  r   r  r  r  r  r@   r   )rZ   r_   r  r  r\   r\   r]   unload_lora_adapter  s0   4z.TokenizerCommunicatorMixin.unload_lora_adapterr&   c                   s@   |    | |I d H }dd |D }| jjdkr|d S |S )Nc                 S  rt   r\   )	parameterrv   r\   r\   r]   ry   2  rz   zBTokenizerCommunicatorMixin.get_weights_by_name.<locals>.<listcomp>r   r   )r   r   r   r   )rZ   r_   r   r~   all_parametersr\   r\   r]   get_weights_by_name+  s   z.TokenizerCommunicatorMixin.get_weights_by_namer5   c                      |    | |I d H  d S rl   )r   r   rZ   r_   r   r\   r\   r]   release_memory_occupation8     z4TokenizerCommunicatorMixin.release_memory_occupationr7   c                   r5  rl   )r   r   r6  r\   r\   r]   resume_memory_occupation@  r8  z3TokenizerCommunicatorMixin.resume_memory_occupationr   r   c                   s$   |    | |I d H }t|S rl   )r   r   rM   r   r   r\   r\   r]   check_weightsH  s   
z(TokenizerCommunicatorMixin.check_weightsr=   c                   r5  rl   )r   r   r6  r\   r\   r]   	slow_downQ  r8  z$TokenizerCommunicatorMixin.slow_downList[Dict[Any, Any]]c                   s&   t  }| |I d H }dd |D S )Nc                 S  rt   r\   )internal_staterw   resr\   r\   r]   ry   _  rz   zATokenizerCommunicatorMixin.get_internal_state.<locals>.<listcomp>)r    r   )rZ   r   	responsesr\   r\   r]   get_internal_stateY  s
   z-TokenizerCommunicatorMixin.get_internal_stater;   
List[bool]c                   s    |  |I d H }dd |D S )Nc                 S  rt   r\   )updatedr>  r\   r\   r]   ry   g  rz   zATokenizerCommunicatorMixin.set_internal_state.<locals>.<listcomp>)r   )rZ   r_   r@  r\   r\   r]   set_internal_statea  s   z-TokenizerCommunicatorMixin.set_internal_stater   List[DumperControlReqOutput]c                   s   |  |I d H S rl   )r   rp   r\   r\   r]   dumper_controli  s   z)TokenizerCommunicatorMixin.dumper_controlList[GetLoadReqOutput]c                   s   t  }| |I d H S rl   )r"   r   r   r\   r\   r]   get_loadn  s   z#TokenizerCommunicatorMixin.get_loadincludedp_rankList[GetLoadsReqOutput]c                   sF   t |r|ndg d}| |I dH } dur! fdd|D }|S )ah  
        Get comprehensive load metrics for /v1/loads endpoint.

        Args:
            include: List of sections to include. Options: core, memory, spec, lora, disagg, queues, all
            dp_rank: Optional filter for specific DP rank

        Returns:
            List of GetLoadsReqOutput, one per scheduler (filtered by dp_rank if specified)
        r|   )rI  rJ  Nc                   s   g | ]	}|j  kr|qS r\   rJ  rv   rL  r\   r]   ry     s    z8TokenizerCommunicatorMixin.get_loads.<locals>.<listcomp>)r$   r   )rZ   rI  rJ  r   r~   r\   rL  r]   	get_loadsr  s   z$TokenizerCommunicatorMixin.get_loadsr1   c                   sn   |    |jd u rt j|_n|j| jv rd S | j| t	 | j|j< | j|j I d H }| j|j= |S rl   )
r   
session_iduuiduuid4hexsession_futuresr   rf   ra   Future)rZ   r_   r   rN  r\   r\   r]   open_session  s   

z'TokenizerCommunicatorMixin.open_sessionr   c                   s   | j |I d H  d S rl   )r   rf   r6  r\   r\   r]   close_session  s   z(TokenizerCommunicatorMixin.close_sessionr   Nonec                 C  s   |dur
|| j _dS dS )z"Update weight version if provided.N)r   r   )rZ   r   r\   r\   r]   r     s   z=TokenizerCommunicatorMixin._update_weight_version_if_provided)rZ   rK   r   rH   )rZ   rK   )rZ   rK   r   r   )rZ   rK   r   r   )NNN)rZ   rK   r   r   r   r   r   r   r   r   r   r   )rZ   rK   r   r   )
NNNNNNFFNN)rZ   rK   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   )rZ   rK   r   r2   rl   )rZ   rK   r_   r*   r   r   r   r   )r_   r   r   r   r   r   )rZ   rK   r_   rA   r   r   r   r   )r_   r(   r   r   r   r   )r_   r9   r   r   r   r   )rZ   rK   r_   rE   r   r   r   r   )r_   rC   r   r   r   r   )rZ   rK   r_   r?   r   r@   )rZ   rK   r_   r.   r  r   r   r/   )rZ   rK   r_   r,   r  r   r   r-   )rZ   rK   r_   r?   r  r   r   r@   )rZ   rK   r_   r&   r   r   )rZ   rK   r_   r5   r   r   )rZ   rK   r_   r7   r   r   )rZ   rK   r_   r   r   r   r   r   )rZ   rK   r_   r=   r   r   )rZ   rK   r   r<  )rZ   rK   r_   r;   r   rB  )rZ   rK   r_   r   r   rE  )rZ   rK   r   rG  )NN)rZ   rK   rI  r   rJ  r   r   rK  )r_   r1   r   r   )r_   r   r   r   )r   r   r   rV  )(r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r  r  r.  r0  r1  r4  r7  r9  r:  r;  rA  rD  rF  rH  rM  rT  rU  r   r\   r\   r\   r]   r      s    

M
f


#
$



!!
QI#



r   )`
__future__r   ra   rm   loggingr   rO  collectionsr   
contextlibr   typingr   r   r   r   r	   r
   r   r   r   fastapizmqsglang.srt.managers.io_structr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r    r!   r"   r#   r$   r%   r&   r'   r(   r)   r*   r+   r,   r-   r.   r/   r0   r1   r2   r3   r4   r5   r6   r7   r8   r9   r:   r;   r<   r=   r>   r?   r@   rA   rB   rC   rD   rE   rF   sglang.srt.server_argsrG   rH   sglang.srt.utilsrI   sglang.utilsrJ   %sglang.srt.managers.tokenizer_managerrK   rL   	getLoggerr   r  rM   r   r\   r\   r\   r]   <module>   s*    ,;
D