o
    پi#                     @  s   d Z ddlmZ ddlZddlmZmZ ddlZddlm	Z
 ddlmZ ddlmZ ddlmZ ddlmZmZmZmZmZmZ erNdd	lmZ dd
lmZ eeZG dd dZdS )z5
Mixin class providing multiplexing scheduling logic
    )annotationsN)TYPE_CHECKINGOptional)ExternalStream)set_pdmux_status)ForwardMode)get_current_stream_idxget_sm_countsget_stream_groupsinitialize_stream_groupsload_pdmux_configset_current_stream_idx)ScheduleBatch)	Schedulerc                   @  s<   e Zd ZdddZdddZdddZe dddZdS )SchedulerMultiplexMixinselfr   c                 C  s\   d | _ t| jj| _t| j| j t | _t	 | _
t| j| _td| j d| j
  d S )NzPD-Multiplexing enabled with z3 stream groups, sm_counts (prefill_sm, decode_sm): )split_prefill_batchr   server_argspdmux_config_pathpdmux_configr   gpu_idr
   stream_groupsr	   	sm_countslenreal_sm_group_numloggerinfo)r    r   [/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/multiplex/multiplexing_mixin.py
init_pdmux"   s   z"SchedulerMultiplexMixin.init_pdmuxreturn1tuple[int, tuple[ExternalStream, ExternalStream]]c                 C  s   | j  sE| jrE| j  }| jj}|r,tt|D ]}|| \}}}||kr*|d }qntdt	| j
d || j
d  | jj }t| n| j  sRt| j
d  ntd t }| jj| || j| fS )N      r   )running_batchis_emptyr   
batch_sizer   manual_divisionsranger   maxminr   decode_bs_divisorr   r   	tp_workermodel_runnerupdate_decode_attn_backendr   )r   	decode_bsr'   i_	threshold
stream_idxr   r   r   adjust_stream_groups1   s8   

	
z,SchedulerMultiplexMixin.adjust_stream_groupssm_countintboolc                 C  s4   | j rdS |  }|r| stj|_|| _ dS dS )NFT)r   get_new_batch_prefillr%   r   SPLIT_PREFILLforward_mode)r   r5   batchr   r   r   update_split_prefill_batchQ   s   z2SchedulerMultiplexMixin.update_split_prefill_batchc              	   C  s\  d}d}d}d}t  }| j| }|d }|d }tj  td 	 tj| td | 	 }	| 
|	 W d   n1 sAw   Y  tj| td | j| d }
|sa| |
p`|}W d   n1 skw   Y  tj|8 td | | j| _|p|dko| j }| j r| jdu r|   |   | j| _|   W d   n1 sw   Y  |r|  |  |  \}}|d }|d }d}td| d| j| d  d	| j| d   tj|! td | jr| j s| | j}d}nd}W d   n	1 sw   Y  tj|h td | jrw| j sw|swd}| jjdkrDtd| jj| jj n| jj}t| jj | | jj}|| jj  }|| j_!| | j}|| jjkrrd| j_"|# }|| j_ n|r}d}nd}W d   n	1 sw   Y  tj| td |  |r| $| j| W d   n	1 sw   Y  tj|f td |r| jj"rd}|% }|rtj&dd
tj'dntj(dd
tj'd}| j)*|t+j,j-.  |/ | j0kr| $| j| | jr| j s| j1| j n| j| _d| _d}d}W d   n	1 s(w   Y  q#)z%A scheduler loop for pd multiplexing.Fr   r"   z*Starting event loop for pd multiplexing...TNzAdjusting stream groups: z, prefill sm: z, decode sm: cpu)devicedtype)2r   r   torchcudaempty_cacher   debugstreamr   recv_requestsprocess_input_requestsr   r<   update_running_batchr$   r%   r   check_memorycheck_tree_cacheinit_new_token_rationew_token_ratiomaybe_sleep_on_idlesynchronizer4   	run_batchextend_num_tokensr)   r   split_forward_token_budgetmodel_confignum_hidden_layersr*   split_indexsplit_forward_countsplit_prefill_finishedrecord_eventprocess_batch_resultqueryonesint32zerostp_cpu_group	allreducedistReduceOpSUMwaititemtp_sizemerge_batch)r   decode_doneprefill_donewait_prefill_kernel_doneadjust_stream_groupr3   stream_groupprefill_streamdecode_stream	recv_reqsr5   decode_resultforward_countnext_split_indexprefill_resultprefill_exe_doneprefill_exe_done_flagflagsr   r   r   event_loop_pdmux_   s   


(	


%z(SchedulerMultiplexMixin.event_loop_pdmuxN)r   r   )r   r   r    r!   )r   r   r5   r6   r    r7   )	__name__
__module____qualname__r   r4   r<   r@   inference_modert   r   r   r   r   r       s    


 r   ) __doc__
__future__r   loggingtypingr   r   r@   torch.distributeddistributedr^   torch.cuda.streamsr   %sglang.srt.distributed.parallel_stater   ,sglang.srt.model_executor.forward_batch_infor   "sglang.srt.multiplex.pdmux_contextr   r	   r
   r   r   r   "sglang.srt.managers.schedule_batchr   sglang.srt.managers.schedulerr   	getLoggerru   r   r   r   r   r   r   <module>   s     	
