o
    پiS                     @  sZ  d Z ddlmZ ddlZddlmZmZ ddlmZm	Z	m
Z
 ddlZddlmZmZ ddlmZmZmZmZmZmZmZmZmZmZmZmZ ddlmZmZ dd	lm Z  dd
l!m"Z" ddl#m$Z$ ddl%m&Z&m'Z' ddl(m)Z) ddl*m+Z+m,Z,m-Z- ddl.m/Z/m0Z0m1Z1 ddl2m3Z3 erddl4m5Z5 ddl6m7Z7 e8e9Z:G dd deZ;G dd de;Z<dS )zA tensor parallel worker.    )annotationsN)ABCabstractmethod)TYPE_CHECKINGListOptional)get_pp_groupget_world_group)!DestroyWeightsUpdateGroupReqInputGetWeightsByNameReqInput-InitWeightsSendGroupForRemoteInstanceReqInputInitWeightsUpdateGroupReqInput"LoadLoRAAdapterFromTensorsReqInputLoadLoRAAdapterReqInput#SendWeightsToRemoteInstanceReqInputUnloadLoRAAdapterReqInputUpdateWeightFromDiskReqInput$UpdateWeightsFromDistributedReqInputUpdateWeightsFromIPCReqInputUpdateWeightsFromTensorReqInput)ModelWorkerBatchScheduleBatch)GenerationBatchResult)BaseTokenToKVPoolAllocator)ReqToTokenPool)ForwardBatchPPProxyTensors)
ServerArgs)MultiprocessingSerializerbroadcast_pyobjset_random_seed)get_processorget_tokenizerget_tokenizer_from_processor)monkey_patch_torch_reductions)LayerDoneCounterModelRunnerc                   @  s   e Zd Zed?ddZeed@ddZedAd
dZedBddZdd Z	dd Z
dd ZdCddZdDddZdEddZdFd d!ZdGd#d$ZdHd&d'ZdId)d*ZdJd,d-ZdKd/d0ZdLd2d3ZdMd5d6ZdNd8d9ZdOd<d=Zd>S )PBaseTpWorkerforward_batchr   c                 C     d S N )selfr)   r,   r,   Q/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/managers/tp_worker.pyforward_batch_generation=   s   z%BaseTpWorker.forward_batch_generationreturn'ModelRunner'c                 C  r*   r+   r,   r-   r,   r,   r.   model_runnerA   s   zBaseTpWorker.model_runnerOptional[int]c                 C     | j jS r+   )r3   sliding_window_sizer2   r,   r,   r.   r6   F      z BaseTpWorker.sliding_window_sizeboolc                 C  r5   r+   )r3   is_hybrid_swar2   r,   r,   r.   r9   J   r7   zBaseTpWorker.is_hybrid_swac                 C     | j j| j jfS r+   )r3   full_max_total_num_tokensswa_max_total_num_tokensr2   r,   r,   r.   get_tokens_per_layer_infoN      z&BaseTpWorker.get_tokens_per_layer_infoc                 C  s   t | jjdd S )Npad_input_ids)getattrr3   modelr2   r,   r,   r.   get_pad_input_ids_funcT   s   z#BaseTpWorker.get_pad_input_ids_funcc                 C  r:   r+   )r3   req_to_token_pooltoken_to_kv_pool_allocatorr2   r,   r,   r.   get_memory_poolW   r>   zBaseTpWorker.get_memory_poolrecv_reqr   c                 C  s$   | j j|j|j|jd\}}||fS )N)recapture_cuda_graph)r3   update_weights_from_disk
model_pathload_formatrG   r-   rF   successmessager,   r,   r.   rH   ]   s   
z%BaseTpWorker.update_weights_from_diskr   c                 C  .   | j |j|j|j|j|j|j\}}||fS r+   )r3   init_weights_update_groupmaster_addressmaster_portrank_offset
world_size
group_namebackendrK   r,   r,   r.   rO   e   s   z&BaseTpWorker.init_weights_update_groupr
   c                 C  s   | j |j\}}||fS r+   )r3   destroy_weights_update_grouprT   rK   r,   r,   r.   rV   p   s   z)BaseTpWorker.destroy_weights_update_groupr   c                 C  rN   r+   )r3   +init_weights_send_group_for_remote_instancerP   ports
group_rankrS   rT   rU   rK   r,   r,   r.   rW   v   s   
z8BaseTpWorker.init_weights_send_group_for_remote_instancer   c                 C  s"   | j |j|j|j\}}||fS r+   )r3   send_weights_to_remote_instancerP   rX   rT   rK   r,   r,   r.   rZ      s   z,BaseTpWorker.send_weights_to_remote_instancer   c                 C  s*   | j |j|j|j|j|j\}}||fS r+   )r3   update_weights_from_distributednamesdtypesshapesrT   rJ   rK   r,   r,   r.   r[      s   z,BaseTpWorker.update_weights_from_distributedr   c                 C  s2   t   | jjt|j| j |jd\}}||fS )N)named_tensorsrJ   )r$   r3   update_weights_from_tensorr   deserializeserialized_named_tensorstp_rankrJ   rK   r,   r,   r.   r`      s   

z'BaseTpWorker.update_weights_from_tensorr   c                 C  s   | j |\}}||fS )z:Update weights from IPC for checkpoint-engine integration.)r3   update_weights_from_ipcrK   r,   r,   r.   rd      s   z$BaseTpWorker.update_weights_from_ipcr   c                 C  s   | j |j|j}|S r+   )r3   get_weights_by_namenametruncate_size)r-   rF   	parameterr,   r,   r.   re      s   z BaseTpWorker.get_weights_by_namer   c                 C     | j | }|S r+   )r3   load_lora_adapterto_refr-   rF   resultr,   r,   r.   rj         zBaseTpWorker.load_lora_adapterr   c                 C  ri   r+   )r3   unload_lora_adapterrk   rl   r,   r,   r.   ro      rn   z BaseTpWorker.unload_lora_adapterr   c                 C  s*   t |j}| j| ||j|j}|S r+   )r   ra   serialized_tensorsr3   load_lora_adapter_from_tensorsrk   config_dictadded_tokens_config)r-   rF   tensorsrm   r,   r,   r.   rq      s   z+BaseTpWorker.load_lora_adapter_from_tensorsmodel_worker_batchr   c                 C  s&   t || j}| j|j}|j}|S r+   )r   init_newr3   forwardlogits_output
embeddings)r-   ru   r)   rx   ry   r,   r,   r.   forward_batch_embedding   s   z$BaseTpWorker.forward_batch_embeddingN)r)   r   r0   r1   )r0   r4   )r0   r8   )rF   r   )rF   r   )rF   r
   )rF   r   )rF   r   )rF   r   )rF   r   )rF   r   )rF   r   )rF   r   )rF   r   )rF   r   )ru   r   )__name__
__module____qualname__r   r/   propertyr3   r6   r9   r=   rB   rE   rH   rO   rV   rW   rZ   r[   r`   rd   re   rj   ro   rq   rz   r,   r,   r,   r.   r(   <   s4    












r(   c                   @  s   e Zd ZdZ				dCdDddZdd Zdd Zdd Zdd  Ze	dEd#d$Z
dFd'd(ZdGd*d+Zd,d- Zd.d/ ZdHd3d4Zd5d6 Z				dIdJd=d>ZdKdAdBZdS )LTpModelWorkerzA tensor parallel model worker.FNserver_argsr   gpu_idintrc   moe_ep_rankpp_rankattn_cp_rankmoe_dp_rankdp_rankr4   	nccl_portis_draft_workerr8   rC   Optional[ReqToTokenPool]rD   $Optional[BaseTokenToKVPoolAllocator]is_multi_layer_eaglec                 C  s  || _ |j| _|j| _|j| _|| _|| _|| _|| _|| _|	| _	|
| _
|| _|| _|| _|| _|| _g | _|   |   |rD|   |   |jrRd  | _| _n%| jjrjt|j|j|j|jd| _t| j| _nt |j|j|j|jd| _| j!j"| _"t# | _$t% | _&| j!j'| _'|j(| _(| j!j)| _)| j)dksJ d|j*| _*| j*d u s| j*dksJ dt+| jj,d | j!j-d | _.| j.d | _/| j.dkr| j/dksJ dt0|j1g| j| j | | j&j2| j&j3d dd | _1t4| j1 |j5 | _6|j7d u| _8d | _9d S )	N)tokenizer_modetrust_remote_coderevisionr   zmax_running_request is zero   zSIf configured, max_queued_requests must be at least 1 for any work to be scheduled.   zMemory pool size is too small)src):r   tp_sizeep_sizepp_sizerc   r   r   r   r   r   r   r   rC   rD   r   r   model_runner_list_init_model_config_init_model_runner%_init_multi_layer_eagle_model_runners_init_dllm_algorithmskip_tokenizer_init	tokenizer	processormodel_configis_multimodalr!   tokenizer_pathr   r   r   r#   r"   r3   devicer   pp_groupr	   world_groupmax_total_num_tokensmax_prefill_tokensmax_running_requestsmax_queued_requestsmincontext_lenmax_token_pool_sizemax_req_lenmax_req_input_lenr   random_seed	cpu_groupranksr    disable_overlap_scheduleenable_overlapspeculative_algorithmenable_spechicache_layer_transfer_counter)r-   r   r   rc   r   r   r   r   r   r   r   rC   rD   r   r,   r,   r.   __init__   s   








zTpModelWorker.__init__c                 C  sL   ddl m} |j| j| js| jjn| jj| js| jjn| jj| jd| _	d S )Nr   )ModelConfig)rI   model_revisionis_draft_model)
sglang.srt.configs.model_configr   from_server_argsr   r   rI   speculative_draft_model_pathr    speculative_draft_model_revisionr   )r-   r   r,   r,   r.   r   5  s   

z TpModelWorker._init_model_configc                 C  s   ddl m} |di d| jd| jjd| jd| jd| jd| jd	| j	d
| j
d| jd| jd| jd| jd| jd| jd| jd| jrLdnd | _d S | _d S )Nr   r&   r   mem_fraction_staticr   rc   r   r   moe_ep_sizer   r   r   r   r   r   rC   rD   draft_model_idxr,   )&sglang.srt.model_executor.model_runnerr'   r   r   r   r   rc   r   r   r   r   r   r   r   r   rC   rD   r   _model_runner)r-   r'   r,   r,   r.   r   G  sH   	


z TpModelWorker._init_model_runnerc                 C  s   ddl m} | j| j td| jjD ]K}| j|di d| jd| jj	d| j
d| jd| jd	| jd
| jd| jd| jd| jd| jd| jd| jd| jd| jd| qd S )Nr   r&   r   r   r   r   rc   r   r   r   r   r   r   r   r   r   rC   rD   r   r,   )r   r'   r   appendr3   ranger   speculative_num_stepsr   r   r   rc   r   r   r   r   r   r   r   r   rC   rD   )r-   r'   ir,   r,   r.   r   ]  sN   	
z3TpModelWorker._init_multi_layer_eagle_model_runnersc                 C  s4   ddl m} | jjd ur|| j| _d S d | _d S )Nr   )DllmAlgorithm)sglang.srt.dllm.algorithm.baser   r   dllm_algorithmr   )r-   r   r,   r,   r.   r   w  s   
z"TpModelWorker._init_dllm_algorithmr0   r1   c                 C  s   | j S r+   )r   r2   r,   r,   r.   r3     s   zTpModelWorker.model_runnercounterr%   c                 C  s
   || _ d S r+   )r   )r-   r   r,   r,   r.   'register_hicache_layer_transfer_counter     
z5TpModelWorker.register_hicache_layer_transfer_counterconsumer_indexc                 C  s   | j d ur| j | d S d S r+   )r   set_consumer)r-   r   r,   r,   r.   set_hicache_consumer  s   
z"TpModelWorker.set_hicache_consumerc                 C  sB   | j | j| j| j| j| j| j| j| jj	| jj
j| jj
j| jjjfS r+   )r   r   r   r   r   r   r   r   r3   forward_streamrC   sizemax_context_lentoken_to_kv_poolr2   r,   r,   r.   get_worker_info  s   zTpModelWorker.get_worker_infoc                 C  s
   | j d uS r+   )r   r2   r,   r,   r.   is_dllm  r   zTpModelWorker.is_dllmr)   r   r   c                 C  s$   | j | j|\}}}t|||dS )N)rx   next_token_idscan_run_cuda_graph)r   runr3   r   )r-   r)   rx   r   r   r,   r,   r.   _forward_batch_generation_dllm  s   
z,TpModelWorker._forward_batch_generation_dllmc                 C  r:   r+   )r3   *remote_instance_transfer_engine_session_id+remote_instance_transfer_engine_weight_infor2   r,   r,   r.   (get_remote_instance_transfer_engine_info  r>   z6TpModelWorker.get_remote_instance_transfer_engine_inforu   r   Optional[ForwardBatch]pp_proxy_tensorsOptional[PPProxyTensors]	is_verifyc           	        s@  |d ur |j t|jnd usJ  r!S jjrjj	||d}|j
|j}t||jd |rA S jr[js[|jjd ur[ fdd}| _ S |jshj _ S tjt|jtj|jjd _|jrjd urj|  S jj	||d}|j
|j}}t|||jdS )N)r   skip_attn_backend_initrx   r   expert_distribution_metricsc                     s   j  _ S r+   )r3   sampler   r,   batch_resultr)   rx   r-   r,   r.   sample_batch_func  s   zATpModelWorker.forward_batch_generation.<locals>.sample_batch_func)dtyper   )pp_hidden_states_proxy_tensorsr   r   ) r   hicache_consumer_indexr   rv   r3   r   r   r   is_last_rankrw   rx   can_run_graphr   r   r   r   sampling_infogrammarsdelay_sample_funcis_prefill_onlyr   r   torchzeroslenseq_lenslong	input_idsr   return_logprobnext_token_logitscompute_logprobs_only)	r-   ru   r)   r   r   r   outr   r   r,   r   r.   r/     sp   

z&TpModelWorker.forward_batch_generationbatchr   c           	      C  s   |j dkr| }t|| j}||_|j|_n||j}| jj|j|j	d}|j
|j}}|r9| j||}nd }t|||jd}||_|S )Nr   )split_forward_countr   )split_indexget_model_worker_batchr   rv   r3   split_forward_batchseq_lens_cpuseq_lens_cpu_cacherw   r   rx   r   r   r   r   r   )	r-   r   ru   r)   r   rx   r   r   r   r,   r,   r.   forward_batch_split_prefill  s(   

z)TpModelWorker.forward_batch_split_prefill)FNNF)r   r   r   r   rc   r   r   r   r   r   r   r   r   r   r   r4   r   r   r   r8   rC   r   rD   r   r   r8   r{   )r   r%   )r   r   )r)   r   r0   r   )NNFF)
ru   r   r)   r   r   r   r   r8   r0   r   )r   r   )r|   r}   r~   __doc__r   r   r   r   r   r   r3   r   r   r   r   r   r   r/   r  r,   r,   r,   r.   r      s2    d


	\r   )=r  
__future__r   loggingabcr   r   typingr   r   r   r   sglang.srt.distributedr   r	   sglang.srt.managers.io_structr
   r   r   r   r   r   r   r   r   r   r   r   "sglang.srt.managers.schedule_batchr   r   sglang.srt.managers.schedulerr   sglang.srt.mem_cache.allocatorr    sglang.srt.mem_cache.memory_poolr   ,sglang.srt.model_executor.forward_batch_infor   r   sglang.srt.server_argsr   sglang.srt.utilsr   r   r    &sglang.srt.utils.hf_transformers_utilsr!   r"   r#   sglang.srt.utils.patch_torchr$   $sglang.srt.managers.cache_controllerr%   r   r'   	getLoggerr|   loggerr(   r   r,   r,   r,   r.   <module>   s0   8
 