o
    پi%                     @  s   d dl mZ d dlmZ d dlmZmZmZ d dlZd dl	m
Z
 d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ erRd dlmZ d dlmZ ej ZeG dd dZ	d)d*ddZd+d%d&ZG d'd( d(Z dS ),    )annotations)	dataclass)TYPE_CHECKINGCallableOptionalN)TboDPAttentionPreparer)get_tp_group)envs)ScheduleBatch)DPCooperationInfo)ForwardMode)require_mlp_tp_gather)GroupCoordinator)	Schedulerc                   @  s   e Zd ZU ded< ded< ded< ded< ded< ded< ded	< ded
< ded< dZded< dZded< dZded< dZded< dZded< dZ	ded< e
jfdddZe
jfdddZd ddZdS )!MLPSyncBatchInfointdp_sizetp_sizecp_size
num_tokensnum_tokens_for_logprobboolcan_cuda_graphis_extend_in_batchlocal_can_run_tbolocal_forward_modeNtorch.Tensortp0_infoz	list[int]global_num_tokensglobal_num_tokens_for_logprobtbo_split_seq_indexglobal_forward_modezOptional[DPCooperationInfo]dp_cooperation_inforeturnc                 C  s4   t j| j| jt| jt| jt| j| jg||dS )Ndevicedtype)	torchtensorr   r   r   r   r   r   r   selfr%   r&    r+   _/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/managers/scheduler_dp_attn_mixin.py_get_local_tensor-   s   z"MLPSyncBatchInfo._get_local_tensorc                 C  s    t jdddddtjjg||dS )Nr      r$   )r'   r(   r   IDLEvaluer)   r+   r+   r,   _get_fallback_tensor;   s   z%MLPSyncBatchInfo._get_fallback_tensorgrouptorch.distributed.ProcessGroupc                 C  s8  | j |d}tj| j| j| j dftj|d}tjj|	 ||d |dkr+t
 j}nt
 j}|| j| j | j d}| j|d||dk< |d d dd d f }|| _|d d df  | _|d d df  | _t|d d df   | _t|d d d	f   | _trt|d d d
f  | _d S d S )N)r%      )r&   r%   )r2   cpur   r.            )r-   r'   emptyr   r   r   int64distributedall_gather_into_tensorflattenr   active_ranks_cpuactive_ranksviewr1   r   tolistr   r   r   minitemr   maxr   _ENABLE_METRICS_DP_ATTENTIONr   creater"   )r*   r%   r2   local_info_tensorglobal_info_tensortp_active_rankstp_infor   r+   r+   r,   
all_gatherI   s2   
 zMLPSyncBatchInfo.all_gather)r#   r   )r2   r3   )__name__
__module____qualname____annotations__r   r   r   r    r!   r"   r'   r:   r-   r1   rK   r+   r+   r+   r,   r      s&   
 r   Fbatchr
   mlp_sync_infor   r   c                 C  sR   |s|j g| _|jg| _n|j| _|j| _|s#|j| _|j| _|j| _|j| _d S N)	r   r   r   r   r   r    r!   r   can_run_dp_cuda_graph)rP   rQ   r   skip_all_gatherr+   r+   r,   _update_gather_batchi   s   
rU   local_batchr   r   attn_tp_sizeattn_cp_sizetp_groupr   get_idle_batchCallable[[], ScheduleBatch]disable_cuda_graphdisable_overlap_scheduleoffload_tagsset[str]c
                 C  s  | d u s	| j  rd}
d}n(| j  r|  }
|
}n| j}
tdd t| j| jD }| j	s6||  ks6J t
j }| d u pH| j  pH| j  oK| }| rS| j  nd}| d ur\|| _t }t|	dkrs|slt
j rs|j}|j}n|j}d}|| \}}t||||
|||||d	}|s|j||d ||jd d dd	f \|_|_|pt|jdk}|r| }| d u r|  }} n| j  r|  }| _t |||| t!r| d ur|j"| _"| S )
Nr   c                 s  s"    | ]\}}t || d V  qdS )r.   N)rD   ).0logprob_start_len
extend_lenr+   r+   r,   	<genexpr>   s
    
z-prepare_mlp_sync_batch_raw.<locals>.<genexpr>Fr5   )	r   r   r   r   r   r   r   r   r   )r%   r2      r4   )#forward_modeis_prebuilt	is_decode
batch_sizeextend_num_tokenssumzipextend_logprob_start_lensextend_lensreturn_logprobr	    SGLANG_SCHEDULER_SKIP_ALL_GATHERgetis_decode_or_idle	is_extendr   r   len6SGLANG_NCCL_ALL_GATHER_IN_OVERLAP_SCHEDULER_SYNC_BATCHdevice_groupr%   	cpu_groupprepare_all_gatherr   rK   compute_outputr   r    r!   rD   r   inner_idle_batchrU   rE   r"   )rV   r   rW   rX   rY   rZ   r\   r   r]   r^   r   r   rT   r   r   tbo_preparerr2   r%   r   r   rQ   need_idle_batchbatch_to_gatherr+   r+   r,   prepare_mlp_sync_batch_raw   s   
	


r}   c                   @  s.   e Zd ZdddZ	ddddZdddZdS )SchedulerDPAttnMixinr*   r   rV   r
   c                 C  s8   t || jj| j| j| j| j| jjt| j| jj	| j
d
S )N)	r   rW   rX   rY   rZ   r\   r   r]   r^   )r}   server_argsr   rW   rX   rY   rZ   r\   r   r]   r^   )r*   rV   r+   r+   r,   prepare_mlp_sync_batch   s   z+SchedulerDPAttnMixin.prepare_mlp_sync_batchNrP   Optional[ScheduleBatch]	need_syncOptional[bool]r#   c                 C  s"   |dur|rn| j r| |}|S )a  
        Helper to prepare MLP sync batch for DP attention.
        Should be called after get_new_batch_prefill().

        Args:
            batch: The batch to process
            need_sync: If specified, overrides self.require_mlp_sync for prepare_mlp_sync_batch decision
        N)require_mlp_syncr   )r*   rP   r   r+   r+   r,   maybe_prepare_mlp_sync_batch   s   
z1SchedulerDPAttnMixin.maybe_prepare_mlp_sync_batchc              	   C  s.   t g | j| j| j| j| j| j}|  |S rR   )	r
   init_newreq_to_token_pooltoken_to_kv_pool_allocator
tree_cachemodel_configenable_overlapspec_algorithmprepare_for_idle)r*   
idle_batchr+   r+   r,   rZ     s   	z#SchedulerDPAttnMixin.get_idle_batch)r*   r   rV   r
   rR   )r*   r   rP   r   r   r   r#   r   )r*   r   r#   r
   )rL   rM   rN   r   r   rZ   r+   r+   r+   r,   r~      s
    
r~   )F)rP   r
   rQ   r   r   r   )rV   r
   r   r   rW   r   rX   r   rY   r   rZ   r[   r\   r   r   r   r]   r   r^   r_   )!
__future__r   dataclassesr   typingr   r   r   r'   *sglang.srt.batch_overlap.two_batch_overlapr   %sglang.srt.distributed.parallel_stater   sglang.srt.environr	   "sglang.srt.managers.schedule_batchr
   sglang.srt.metrics.collectorr   ,sglang.srt.model_executor.forward_batch_infor   sglang.srt.utils.commonr   r   sglang.srt.managers.schedulerr   "SGLANG_ENABLE_METRICS_DP_ATTENTIONrp   rE   r   rU   r}   r~   r+   r+   r+   r,   <module>   s*    
T
a