o
    پiJ                    @   s  U d Z ddlZddlZddlZddlZddlZddlZddlmZ ddlm	Z	m
Z
 ddlmZ ddlmZ ddlmZ ddlmZmZmZmZmZmZmZ dd	lmZ ddlZddlZdd
lmZmZ ddlm Z  ddl!m"Z" ddl#m$Z$ ddl%m&Z&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z-m.Z.m/Z/m0Z0 ddl1m2Z2 e. Z3e* Z4e0 Z5e- Z6edg dZ7e8ej9j:j;Z<eG dd dZ=eG dd dZ>dee?eej@ef f deeee?ef  eej@ f fddZAi ZBee?e8f eCd< de?de?fddZDi ZEee?eg ed f f eCd< dd!d"ZFe2d#gd$e  d#ej@d%e?ddfd&d'ZGe2d#d(d#ej@d%e?d)e?dej@fd*d+ZHe2d,gd$d,ej@d-ej@d%e?ddfd.d/ZIe2d,gd$d,ej@d-ej@d%e?ddfd0d1ZJG d2d dZKdaLeeK eCd3< deKfd4d5ZMd6ee8 d7e8d8e?deKfd9d:ZN			;			<	dd=eee8  d7e8d8e?d>eeO d?eeO d@eOd%ee? dAeeO dBeOdCeeO deKfdDdEZPdaQeeK eCdF< daReeK eCdG< daSeeK eCdH< daTeeK eCdI< d;aUeOeCdJ< dKeOfdLdMZVdeKfdNdOZWdeKfdPdQZXdeKfdRdSZYdaZeeK eCdT< da[eeK eCdU< da\eeK eCdV< deKfdWdXZ]deKfdYdZZ^deKfd[d\Z_eWZ`daaeeK eCd]< deKfd^d_ZbebZcd`da Zde	ddbeejejf fdcddZgeheiZjd<akd;ald;amdeeOfdfdgZndeeOfdhdiZodeeOfdjdkZp	l	l	m	l	n	ddoe8dpe8dqe?d7e8d8e?dree8 fdsdtZq	u	u	u	u	u	u		;ddve8dwe8dxe8dye8dze8d{e8d8ee? d|eOddfd}d~Zr	dd=ee8 d8e?deej9j fddZs	ddve8dwe8dxe8d8ee? ddf
ddZtdd Zud;ave	deKfddZwdd Zxdd Zydd Zzdd Z{dd Z|dd Z}dd Z~dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd ZddeOfddZddede8deeO fddZdadadaddeOfddZdS )a  Distributed state.
It takes over the control of the distributed environment from PyTorch.
The typical workflow is:

- call `init_distributed_environment` to initialize the distributed environment.
- call `initialize_model_parallel` or `ensure_model_parallel_initialized` to
 initialize the model parallel groups.

- any code dealing with the distributed stuff

- call `destroy_model_parallel` to destroy the model parallel groups.
- call `destroy_distributed_environment` to destroy the distributed environment.

If you only need to use the distributed environment without model/pipeline
 parallelism, you can skip the model parallel initialization and destruction
 steps.
    N)
namedtuple)contextmanagernullcontext)	dataclass)	timedelta)shared_memory)AnyCallableDictListOptionalTupleUnion)patch)BackendProcessGroup)register_split_op)is_in_piecewise_cuda_graph)envs)get_bool_env_varget_current_device_stream_fastget_int_env_varget_local_ip_autois_cpuis_cuda_alikeis_hipis_musais_npuis_shm_availableis_xpu)register_custom_opTensorMetadata)devicedtypesizec                   @   s   e Zd ZU e jed< dS )GraphCaptureContextstreamN)__name__
__module____qualname__torchget_device_moduleStream__annotations__ r.   r.   Y/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/distributed/parallel_state.pyr%   H   s   
 r%   c                   @   s,   e Zd ZU eejj ed< eej ed< dS )P2PWorkworkpayloadN)	r'   r(   r)   r   r*   distributedWorkr-   Tensorr.   r.   r.   r/   r0   M   s   
 r0   tensor_dictreturnc              	   C   sl   g }g }|   D ])\}}t|tjr*|jj}||t||j|	 f || q|||f q||fS )zSplit the tensor dictionary into two parts:
    1. A list of (key, value) pairs. If the value is a tensor, it is replaced
         by its metadata.
    2. A list of tensors.
    )
items
isinstancer*   r5   r"   typeappendr!   r#   r$   )r6   metadata_listtensor_listkeyvaluer"   r.   r.   r/   _split_tensor_dictS   s   r@   _group_name_counternamec                 C   s6   | t vrdt | < |  dt |   }t |   d7  < |S )z|Get a unique name for the group.
    Example:
    _get_unique_name("tp") -> "tp:0"
    _get_unique_name("tp") -> "tp:1"
    r   :   )rA   )rB   newnamer.   r.   r/   _get_unique_namep   s
   rF   GroupCoordinator_groupsgroupc                 C   s   t | t| j< d S N)weakrefrefrH   unique_namerI   r.   r.   r/   _register_group   s   rO   tensor)mutates_args
group_namec                 C   sH   |t v sJ d| dt |  }|d u rtd| d||  d S NzGroup z is not found.z is destroyed.)rH   
ValueError_all_reduce_in_place)rP   rR   rI   r.   r.   r/   inplace_all_reduce   s
   
rV   )	out_shapeoutplace_all_reduce_methodc                 C   sF   |t v sJ d| dt |  }|d u rtd| d|| |S rS   )rH   rT   _all_reduce_out_place)rP   rR   rX   rI   r.   r.   r/   outplace_all_reduce   s
   
rZ   outputinputc                 C   J   |t v sJ d| dt |  }|d u rtd| d|| | d S rS   )rH   rT   _all_gather_into_tensorr[   r\   rR   rI   r.   r.   r/   reg_all_gather_into_tensor   
   
r`   c                 C   r]   rS   )rH   rT   _reduce_scatter_tensorr_   r.   r.   r/   reg_reduce_scatter_tensor   ra   rc   c                   @   s  e Zd ZU dZeed< ee ed< eed< eed< eed< eed< eed< eed	< eed
< eed< eed< eed< e	e
 ed< e	e
 ed< e	e
 ed< e	e
 ed< dddeddfdeee  dedeeef d	ed
ededededededede	e dedefddZd d! Zed"d# Zed$d% Zed&d' Zed(d) Zed*d+ Zed,d- Ze		d}d.e	e d/e	ejj fd0d1Zd2ejd3ejfd4d5Zd2ejd6ed3ejfd7d8Z d2ejd3dfd9d:Z!d;ejd<ejd3ejfd=d>Z"d;ejd<ejfd?d@Z#d;ejdAeej d3dfdBdCZ$		d}d2ejd;e	ej dDe	ee  d3ejfdEdFZ%d;ejd<ejfdGdHZ&d;ejd<ejfdIdJZ'	d~d;ejd<ejfdKdLZ(	M	dd2ejdNedOe	eej  d3ejfdPdQZ)	d~d2eejeej f dDe	ee  d3eejeej f fdRdSZ*	Mdd2ejdUedNed3e	ej fdVdWZ+dd2ejdXefdYdZZ,dd[e	e
 dXefd\d]Z-	dd^ee
 dXed_e	e fd`daZ.d[e
d3ee
 fdbdcZ/	dd[e
dUedded3ee0 fdedfZ1dXed3e
fdgdhZ2		T		ddie	e3eeeje
f f  dXed_e	e dje	e d3e	e3eeeje
f f  f
dkdlZ4			ddie3eeeje
f f dUe	e dme	d  dded3e	ee0  f
dndoZ5		d}dXe	e dme	d  d3e	e3eeeje
f f  fdpdqZ6drds Z7d~dtejdUe	e d3dfdudvZ8	d~dwej9dxej:dXe	e d3ejfdydzZ;d{d| Z<dS )rG   a  
    PyTorch ProcessGroup wrapper for a group of processes.
    PyTorch ProcessGroup is bound to one specific communication backend,
        e.g. NCCL, Gloo, MPI, etc.
    GroupCoordinator takes charge of all the communication operations among
        the processes in the group. It can route the communication to
        a specific implementation (e.g. switch allreduce implementation
        based on the tensor size and cuda graph mode).
    rankranks
world_size
local_rankrank_in_group	cpu_groupdevice_group
use_pynccluse_pymscclppuse_custom_allreduceuse_torch_symm_mem_all_reduceuse_message_queue_broadcasterpynccl_commca_commtorch_symm_mem_commmq_broadcasterFNi   secondsgroup_rankstorch_distributed_backenduse_hpu_communicatoruse_xpu_communicatoruse_npu_communicatorrR   pynccl_use_current_streamgloo_timeoutc           %   
   C   s  |pd}t || _t|  tj | _|| _d | _d | _	t
dd| _t r8tj r,dn|}td| | _n*trDtd| | _ntrPtd| | _ntr\td| | _ntd| _t| j| _|D ]j}tjt|tj| jd	}tjt|tjd
}d|v rddlm} tjj|d||d}tjj|d||d}ntjj||d}tjj|d|d}| j|v r|| _t|| _|| j| _|| _|| _	|| _ || _!qk| j	d usJ | jd usJ || _"|| _#|| _$|| _%|| _&|| _'|	| _(|
| _)|| _*ddl+m,} ddl-m.} ddl/m0} ddl1m2}m3} ddl4m5} ddl6m7} || _2|| _3|| _7t8 r:ddl9m:}m;} d | _<|rP| jdkrP|| j	| j|d| _<d | _=|re| jdkre|| j	| jd| _=d | _>d | _?|r| jdkrz| }|| j	| jd| _>W n t@y }  ztABd|  d W Y d } ~ nd } ~ ww t8 rz| r|| j	| jd| _?W n+ t@y }  ztABd|   W Y d } ~ nd } ~ ww n| jdkrt8 rtACd d | _D| j&r| jdkr|| j	| jd| _Ddd lEmF}! dd!lGmH}" dd"lImJ}# d | _K|r| jdkr|!| jd#| _Kd | _L|	r,| jdkr,|#| jd#| _Ld | _M|
r?| jdkr?|"| jd#| _Mdd$lNmO}$ d | _P|r\| jdkr^|$Q| j	d%d&| _Pd S d S d S )'N	anonymous
LOCAL_SIZEr   zcuda:znpu:zxpu:zmusa:cpur#   r"   r#   mooncake)MooncakeBackendOptions)backend
pg_optionszmooncake-cpu)r   gloo)r   timeout)dispatch_custom_allreduce)PyMscclppCommunicator)PyNcclCommunicator)is_symmetric_memory_enableduse_symmetric_memory)TorchSymmMemCommunicator)is_allocation_symmetric)QuickAllReduceqr_rocm_arch_availablerD   )rI   r"   use_current_stream)rI   r"   z#Setup Custom allreduce failed with zJ. To silence this warning, specify --disable-custom-all-reduce explicitly.z%Failed to initialize QuickAllReduce: z4[AR] All-reduce call path: NCCL (custom AR disabled))HpuCommunicator)NpuCommunicator)XpuCommunicatorrN   )MessageQueuei  @    )RrF   rM   rO   r*   r3   get_rankrd   rg   rj   ri   r   
local_sizer   r   %SGLANG_ONE_VISIBLE_DEVICE_PER_PROCESSgetr"   _is_npu_is_xpu_is_musar+   device_moduleoneslenint32mooncake.epr   	new_groupre   rf   indexrh   active_ranksactive_ranks_cpurk   r{   rl   rm   rn   rx   ry   rz   ro   =sglang.srt.distributed.device_communicators.custom_all_reducer   5sglang.srt.distributed.device_communicators.pymscclppr   2sglang.srt.distributed.device_communicators.pyncclr   <sglang.srt.distributed.device_communicators.pynccl_allocatorr   r   :sglang.srt.distributed.device_communicators.torch_symm_memr   sglang.srt.layers.dp_attentionr   r   <sglang.srt.distributed.device_communicators.quick_all_reducer   r   rp   pymscclpp_commrq   qr_comm	Exceptionloggerwarninginforr   <sglang.srt.distributed.device_communicators.hpu_communicatorr   <sglang.srt.distributed.device_communicators.npu_communicatorr   <sglang.srt.distributed.device_communicators.xpu_communicatorr   hpu_communicatorxpu_communicatornpu_communicator9sglang.srt.distributed.device_communicators.shm_broadcastr   rs   create_from_process_group)%selfrv   rg   rw   rk   rl   rm   rn   rx   ry   rz   ro   rR   r{   r|   	device_idre   r   r   r   rj   ri   r   r   r   r   r   r   r   r   r   CAClasser   r   r   r   r.   r.   r/   __init__   s  





zGroupCoordinator.__init__c                 C   sL   d| j  d| j d| j d| j d| j d| j d| j d| j d	| j S )
Nzranks=z rank=z local_rank=z use_pynccl=z device_group=z cpu_group=z unique_name=z world_size=z rank_in_group=)	re   rd   rg   rk   rj   ri   rM   rf   rh   r   r.   r.   r/   __repr__  s   "zGroupCoordinator.__repr__c                 C   
   | j d S )z8Return the global rank of the first process in the groupr   re   r   r.   r.   r/   
first_rank     
zGroupCoordinator.first_rankc                 C   r   )z7Return the global rank of the last process in the groupr   r   r.   r.   r/   	last_rank  r   zGroupCoordinator.last_rankc                 C      | j | jkS )z;Return whether the caller is the first process in the group)rd   r   r   r.   r.   r/   is_first_rank     zGroupCoordinator.is_first_rankc                 C   r   )z:Return whether the caller is the last process in the group)rd   r   r   r.   r.   r/   is_last_rank  r   zGroupCoordinator.is_last_rankc                 C   s   | j }| j}| j|d |  S )z=Return the global rank of the process that follows the callerrD   rh   rf   re   r   rh   rf   r.   r.   r/   	next_rank     zGroupCoordinator.next_rankc                 C   s   | j }| j}| j|d |  S )z>Return the global rank of the process that precedes the callerrD   r   r   r.   r.   r/   	prev_rank  r   zGroupCoordinator.prev_rankgraph_capture_contextr&   c           
      c   sT   |d u r|d u r| j  }t|}n|j}| j}|d u r t n| }t }||kr0|| | j |j |O | j	}|sCt }n|j
dt d}| j}|sTt }	n|j
dd}	| |	 |V  W d    n1 smw   Y  W d    n1 s|w   Y  W d    n1 sw   Y  W d    d S W d    d S 1 sw   Y  d S )NTenabler&   r   )r   r,   r%   r&   rq   r   capturer   wait_streamrp   change_stater   )
r   r   r&   rq   maybe_ca_contextcurr_streamrp   maybe_pynccl_contextr   maybe_pymscclpp_contextr.   r.   r/   graph_capture  s:   


 PzGroupCoordinator.graph_captureinput_r7   c                 C   sV  | j dkr|S tj rtj }ntj }t o|}|rJ|jsJ| jdurJ|	 |
  }|| jjk r<| jj|ddS | j| | jj|ddS |jrkt|j| j | jr`tjj|t |S tjj|| jd |S | jdurz| jjsz| j|S | jdur| jjs| j|S | jdur| jjs| j|S | jdur|  r| jjdt  d | j| |W  d   S 1 sw   Y  d}| jdur| jjs| j!|rd}n?| j"dur| j"js| j"#|rd	}n-| j$dur| j$js| j$%|rd
}n| j&dur| j&js| j&'|rd}nt( rd}|dur"t)|| j*|dS t+|| j*d |S )as  
        User-facing all-reduce function before we actually call the
        all-reduce operation.

        We need this because Dynamo does not support passing an arbitrary
        object (`self` in this case) to a custom op. We need to pass the
         group name as a string, and then look up the group coordinator from
         the group name, dispatch the all-reduce operation to the group
         coordinator.

        In addition, PyTorch custom ops do not support mutation or returning
        a new tensor in the same op. So we need to figure out if the op is
        in-place or out-of-place ahead of time.
        rD   NF)
registeredTrN   r   caqr	pymscclpptorch_symm_mempynccl)rR   rX   rR   ),rf   r   SGLANG_USE_1STAGE_ALLREDUCEis_setr   %SGLANG_ENABLE_DETERMINISTIC_INFERENCEr   r   rq   numelelement_sizemax_sizedeterministic_all_reduceregister_bufferr   r#   r   r*   ops
sgl_kernelshm_allreduceREDUCE_OP_SUMr3   
all_reducerj   r   disabledr   r   rp   r   r   r   should_custom_arr   should_quick_allreducer   should_mscclpp_allreducerr   should_torch_symm_mem_allreducer   rZ   rM   rV   )r   r   use_1stage_aruse_deterministic_arinp_sizerX   r.   r.   r/   r     s   



 







zGroupCoordinator.all_reducerX   c           	      C   s   | j }| j}| j}| j}| j}t|||||gsJ |dkr)|jr#J ||}nN|dkr8|jr2J ||}n?|dkrG|jrAJ |	|}n0|dkrV|jrPJ |	|}n!|dkrw|j
dt d ||}W d    n1 srw   Y  |d us}J |S )Nr   r   r   r   r   Tr   )rq   r   r   rr   rp   anyr   custom_all_reducequick_all_reducer   r   r   rZ   )	r   r   rX   rq   r   r   rr   rp   outr.   r.   r/   rY   u  s4   



z&GroupCoordinator._all_reduce_out_placec                 C   sZ   | j }| j}|d ur|js|| d S |d ur"|js"|| d S tjj|| jd d S NrN   )rp   rr   r   r   r*   r3   rj   )r   r   rp   rr   r.   r.   r/   rU     s   z%GroupCoordinator._all_reduce_in_placer[   r\   c                 C   sv   | j }|d ur/|jr|  r/|jdt d ||| W d    |S 1 s(w   Y  |S tjj||| j	d |S NTr   rN   )
rp   r   r   r   r   reduce_scatterr*   r3   reduce_scatter_tensorrj   r   r[   r\   rp   r.   r.   r/   rb     s$   
z'GroupCoordinator._reduce_scatter_tensorc                 C   s(   t r
| || d S t||| jd d S Nr   )r   rb   rc   rM   r   r[   r\   r.   r.   r/   r    s   z&GroupCoordinator.reduce_scatter_tensor
input_listc                 C   s   t jj||| jd |S r   )r*   r3   r  rj   )r   r[   r  r.   r.   r/   r    s   zGroupCoordinator.reduce_scattersizesc                 C   s  | j }| j}|jdt dl |d ur|jrJ d|d ur7t||ks&J |jd t|ks1J || j }n|jd | dksBJ |jd | }|f|jdd   }|d u rbt	j
||j|jd}n|j|ksiJ |j|||d |W  d    S 1 s}w   Y  d S )NTr   z&pynccl is required for reduce_scattervr   rD   r   r  )rf   rp   r   r   r   r   shapesumrh   r*   emptyr#   r"   r  )r   r   r[   r  rf   rp   
chunk_sizeoutput_shaper.   r.   r/   reduce_scatterv  s.   
$z GroupCoordinator.reduce_scattervc                 C   sv   | j }|d ur/|jr|  r/|jdt d ||| W d    d S 1 s(w   Y  d S tjj||| j	d d S r   )
rp   r   r   r   r   
all_gatherr*   r3   all_gather_into_tensorrj   r  r.   r.   r/   r^     s   "
z(GroupCoordinator._all_gather_into_tensorc                 C   s,   t str| || d S t||| jd d S r  )r   r   r^   r`   rM   r  r.   r.   r/   r    s   z'GroupCoordinator.all_gather_into_tensorc                 C   sP   |dusJ d| d| j }|du s|jr| || dS |j|||d dS )a  
        Implement an asynchronous `allgather` operation on a specified stream.
        (the default `torch.distributed.all_gather_into_tensor` will trigger event synchronization),
        eliminating the CPU-side launch-kernel blocking issue caused by synchronization problems.
        The specific implementation uses the interface provided by pynccl to remove the synchronization logic of events.
        NzInvalid params stream (zQ, Please specify the stream to use when calling cp_all_gather_into_tensor_async.)r&   )rp   r   r  cp_all_gather_into_tensor)r   r[   r\   r&   rp   r.   r.   r/   cp_all_gather_into_tensor_async  s   


z0GroupCoordinator.cp_all_gather_into_tensor_asyncr   dimoutput_tensor_listc           
      C   s  | j }|dkr|d urtd |d | d S |S |d ur)tjj||| jdS |  |  kr8| k sEn J d| d|	  | j
}|d urU|jsU|||S | j}|d ure|jse|||S |dk ro|| 7 }|	 }|d | f|dd   }| j| |   d tj||j|jd}	W d    n1 sw   Y  |jrt|j| j | jrtjj||S tjj|	|| jd n| |	| |	|f| }	|	d|}	|	|d | |||  f ||d d   }	|	S )	NrD   z|Performing in-place all-gather with a group size of 1. This may be unnecessary; consider bypassing it for better efficiency.r   rN   Invalid dim () for input tensor with shape r   r   )rf   r   r   copy_r*   r3   r  rj   r  r$   r   r   r   r   r   r  r#   r"   r   r   r   r   r   shm_allgatherr  reshapemovedim)
r   r   r  r  rf   hpu_commnpu_comm
input_sizeoutput_sizeoutput_tensorr.   r.   r/   r    s\   "

(zGroupCoordinator.all_gatherc                    s   j  j}|jdt di |dur|jrJ d	d
dtjdttt	  f fdd}t
|tjr6|g}g }g }|D ]}|||d	\}}	|| ||	 q<|  t|D ]\}
}|j||
 |||
 d	 qY|  |W  d   S 1 szw   Y  dS )z
        Supports varying sizes per rank and input tensor list.
        `sizes`: a list of len(world_size) with the number of items per rank to gather.
        Tr   Nz"pynccl is required for all_gathervr   r  c                    s   |   } d ur5t ksJ | jd  j ksJ t f|dd   }t fdd D r4d  n|d  f|dd   }j d ud tj|| j	| j
d}W d    | fS 1 scw   Y  | fS )Nr   rD   c                 3   s    | ]	}| d  kV  qdS r   Nr.   ).0sr  r.   r/   	<genexpr>m  s    zTGroupCoordinator.all_gatherv.<locals>._all_gather_allocate_output.<locals>.<genexpr>r  r   )r$   r   r	  rh   r
  allr   r*   r  r#   r"   )r   r  r  r   r!  r   rf   r  r/   _all_gather_allocate_outputd  s"   

zAGroupCoordinator.all_gatherv.<locals>._all_gather_allocate_outputr  rJ   )rf   rp   r   r   r   r*   r5   r   r   intr9   r;   group_start	enumerater  	group_end)r   r   r  rp   r(  output_list	size_listinpr!  r$  ir.   r'  r/   all_gathervQ  s8   	

$zGroupCoordinator.all_gathervr   dstc                    s   | j }|dkr	 S    |  kr  k s%n J d| d   |dk r/|  7 }| jdurB| jjsB| j | j||S | j|krS fddt|D }nd}tj	j || j
| | jd | j|krqtj||d	}|S d}|S )
z
        NOTE: We assume that the input tensor is on the same device across
        all the ranks.
        NOTE: `dst` is the local rank of the destination rank.
        rD   r  r  r   Nc                    s   g | ]}t  qS r.   )r*   
empty_liker#  _r   r.   r/   
<listcomp>  s    z+GroupCoordinator.gather.<locals>.<listcomp>)r2  rI   r  )rf   r  r$   r   r   gatherrh   ranger*   r3   re   rj   cat)r   r   r2  r  rf   gather_listr!  r.   r6  r/   r9    s*   "

zGroupCoordinator.gathersrcc                 C   F   || j k sJ d| d| j dkr|S tjj|| j| | jd |S )z^Broadcast the input tensor.
        NOTE: `src` is the local rank of the source rank.
        Invalid src rank ()rD   r=  rI   )rf   r*   r3   	broadcastre   rj   )r   r   r=  r.   r.   r/   rB    s   
zGroupCoordinator.broadcastobjc                 C   s   || j k sJ d| d| j dkr|S | jdur'|dks!J d| j|S | j|kr<tjj|g| j| | jd |S dg}tjj|| j| | jd |d S )z^Broadcast the input object.
        NOTE: `src` is the local rank of the source rank.
        r?  r@  rD   Nr   z-Message queue broadcaster only supports src=0rA  )	rf   rs   broadcast_objectrh   r*   r3   broadcast_object_listre   ri   )r   rC  r=  recvr.   r.   r/   rD    s    


z!GroupCoordinator.broadcast_objectobj_listrI   c                 C   r>  )zcBroadcast the input object list.
        NOTE: `src` is the local rank of the source rank.
        r?  r@  rD   rA  )rf   r*   r3   rE  re   rj   )r   rG  r=  rI   r.   r.   r/   rE    s   
z&GroupCoordinator.broadcast_object_listc                 C   s$   d g| j  }tjj||| jd |S r   )rf   r*   r3   all_gather_objectri   )r   rC  objsr.   r.   r/   rH    s   z"GroupCoordinator.all_gather_object
async_sendc           
      C   s   || j k sJ d| d|| jksJ d|rtjjntjj}tjt|tj	d}tj
| gtjdd}g }||| j| | jd}|rN|t|| ||| j| | jd}	|rc|t|	| |S )a  
        Send the input object list to the destination rank.
        This function uses the CPU group for all communications.

        TODO: If you want to use GPU communication, please add a new argument (e.g., data_group, group),
        use other functions (e.g., send), or implement a new function (e.g., send_object_device).

        NOTE: `dst` is the local rank of the destination rank.
        Invalid dst rank (r@  zKInvalid destination rank. Destination rank is the same as the current rank.r   r   r   rN   )rf   rh   r*   r3   isendsend
frombufferpickledumpsuint8rP   r   longre   ri   r;   r0   )
r   rC  r2  rJ  	send_funcobject_tensorsize_tensorp2p_work	size_workobject_workr.   r.   r/   send_object  s2   zGroupCoordinator.send_objectc                 C   s   	 || j k sJ d| d|| jksJ dtjdtjdd}tjj|| j| | jd}|	  tj|
 tjdd}tjj|| j| | jd}|	  t| }|S )z3Receive the input object list from the source rank.r?  r@  zAInvalid source rank. Source rank is the same as the current rank.rD   r   r   rA  )rf   rh   r*   r  rR  r3   irecvre   ri   waititemrQ  rO  loadsnumpy)r   r=  rU  r1   rT  rC  r.   r.   r/   recv_object  s*   zGroupCoordinator.recv_objectr6   metadata_groupc                 C   s  t j r
| jdkr|S | j}| j}|| jk sJ d| d| j}||krg }t|ts6J dt	| t
|\}}| j||d g }|D ],}	|	 dkrPqG|	jrat jj|	| j| |dd}
nt jj|	| j| |dd}
||
 qG|D ]}|  qv|S | jd	|d}i }g }|D ]K\}}t|trt j|j|j|jd
}	|	 dkr|	||< q|	jrt jj|	| j| |dd}
nt jj|	| j| |dd}
||
 |	||< q|||< q|D ]}|  q|S )ziBroadcast the input tensor dictionary.
        NOTE: `src` is the local rank of the source rank.
        rD   r?  r@  Expecting a dictionary, got r=  r   T)r=  rI   async_opNr   )r*   r3   is_initializedrf   rj   ri   rh   r9   dictr:   r@   rD  r   r   rB  re   r;   r[  r!   r  r$   r#   r"   )r   r6   r=  rI   r`  rh   r<   r=   async_handlesrP   handleasync_handler>   r?   r.   r.   r/   broadcast_tensor_dict>  sr   
"




z&GroupCoordinator.broadcast_tensor_dictall_gather_groupc                 C   s6  | j dkr|S |du rdn|j }|du rdn|j}| j}| j}|du r+| jd | j  }|| j k s8J d| dt|tsFJ dt| t|\}	}
|rRtj	j
ntj	j}| j|	||d}|
D ]8}| dkriq`|dur}| | dkr}||d| }|jr|n|}||| j| |d	}|r|t|| q`|S )
zdSend the input tensor dictionary.
        NOTE: `dst` is the local rank of the source rank.
        rD   Nr   rK  r@  ra  )r2  rJ  r   rN   )rf   rh   rj   ri   r9   re  r:   r@   r*   r3   rL  rM  rY  r   r  r   re   r;   r0   )r   r6   r2  rj  rJ  all_gather_sizeall_gather_rankrI   r`  r<   r=   rS  	p2p_worksrP   
comm_groupr1   r.   r.   r/   send_tensor_dict  s:   
z!GroupCoordinator.send_tensor_dictc                 C   s`  t j r
| jdkrdS |du rdn|j}|du rdn|j}| j}| j}|du r0| jd | j }|| jk s=J d| d| j|d}i }|D ]f\}	}
t|
t	rt j
|
j|
j|
jd}| dkrf|||	< qG|duoq| | dk}|r|j}||d| }|jr|n|}t jj|| j| |d	}|  |r|j|dd
}||}|||	< qG|
||	< qG|S )zdRecv the input tensor dictionary.
        NOTE: `src` is the local rank of the source rank.
        rD   Nr   r?  r@  rb  r   r   rA  r8  )r*   r3   rd  rf   rh   rj   ri   r_  r9   r!   r  r$   r#   r"   r   r	  r  r   rZ  re   r[  r  )r   r=  rj  rk  rl  rI   r`  recv_metadata_listr6   r>   r?   rP   use_all_gather
orig_shapern  r1   r.   r.   r/   recv_tensor_dict  sF   	



z!GroupCoordinator.recv_tensor_dictc                 C   s   t jj| jd dS )a+  Barrier synchronization among the group.
        NOTE: don't use `device_group` here! `barrier` in NCCL is
        terrible because it is internally a broadcast operation with
        secretly created GPU tensors. It is easy to mess up the current
        device. Use the CPU group instead.
        rN   N)r*   r3   barrierri   r   r.   r.   r/   rt    s   zGroupCoordinator.barrierrP   c                 C   sZ   	 |du r| j d | j }| j}|dur|js||| dS tj|| j| | j dS )z<Sends a tensor to the destination rank in a non-blocking wayNrD   )	rh   rf   rp   r   rM  r*   r3   re   rj   )r   rP   r2  rp   r.   r.   r/   rM    s   zGroupCoordinator.sendr$   r#   c                 C   sl   	 |du r| j d | j }tj||| jd}| j}|dur(|js(||| |S tj|| j	| | j
 |S )z'Receives a tensor from the source rank.NrD   r   )rh   rf   r*   r  r"   rp   r   rF  r3   re   rj   )r   r$   r#   r=  rP   rp   r.   r.   r/   rF    s   zGroupCoordinator.recvc                 C   st   | j d urtj| j  d | _ | jd urtj| j d | _| jd ur&d | _| jd ur.d | _| jd ur8d | _d S d S rJ   )rj   r*   r3   destroy_process_groupri   rp   rq   rs   r   r.   r.   r/   destroy(  s   





zGroupCoordinator.destroy)NNrJ   )r   N)r   r   r   Nr   r"  F)Nr   NN)NNF)=r'   r(   r)   __doc__r)  r-   r   r   boolr   r   r   r   strr   r   r   propertyr   r   r   r   r   r   r   r%   r*   cudar,   r   r5   r   rY   rU   rb   r  r  r  r^   r  r  r  r1  r9  rB  rD  rE  rH  r0   rY  r_  r
   ri  ro  rs  rt  rM  Sizer#   rF  rv  r.   r.   r.   r/   rG      s  
 

	

 P






@d





#

M

9
#
	
1
&
U

9
<	
_WORLDc                   C      t d usJ dt S )Nzworld group is not initialized)r  r.   r.   r.   r/   get_world_group:     r  re   rg   r   c                 C   s    t | g||dddddddddS )NFworld)rv   rg   rw   rk   rl   rm   rn   rx   ry   rz   rR   )rG   )re   rg   r   r.   r.   r/   init_world_group?  s   r  FTrv   rk   rm   ro   use_mscclpp_allreducer{   use_torch_symm_mem_allreducec
           
      C   s`   |d u rt }|d u rt}|	d u rt}	t| |||d u r#tp!tp!|dk n||||	ddd|||dS )Nr   T)rv   rg   rw   rk   rl   rm   rn   rx   ry   rz   ro   rR   r{   )_ENABLE_CUSTOM_ALL_REDUCE_ENABLE_MSCCLPP_ALL_REDUCE!_ENABLE_TORCH_SYMM_MEM_ALL_REDUCErG   r   r   )
rv   rg   r   rk   rm   ro   rR   r  r{   r  r.   r.   r/   init_model_parallel_groupQ  s.   r  _TP_ATTN_TP_ATTN_CP_PDMUX_PREFILL_TP_GROUP_ENABLE_PDMUX_P_TPenable_prefill_multiplexingc                 C      | a d S rJ   )r  )r  r.   r.   r/   set_pdmux_status     r  c                   C   s,   t rtd us
J dtS td usJ dtS )NzJtensor model parallel group for PD-Multiplexing Prefill is not initializedz.tensor model parallel group is not initialized)r  r  r  r.   r.   r.   r/   get_tp_group  s   
r  c                   C   r  )Nz8attention tensor model parallel group is not initialized)r  r.   r.   r.   r/   get_attn_tp_group     
r  c                   C   r  )Nz9attention context model parallel group is not initialized)r  r.   r.   r.   r/   get_attn_cp_group  r  r  _MOE_DP_MOE_EP_MOE_TPc                   C   r  )Nz*moe data parallel group is not initialized)r  r.   r.   r.   r/   get_moe_dp_group  r  r  c                   C   r  Nz.expert model parallel group is not initialized)r  r.   r.   r.   r/   get_moe_ep_group  r  r  c                   C   r  r  )r  r.   r.   r.   r/   get_moe_tp_group  r  r  _PPc                   C   r  )Nz0pipeline model parallel group is not initialized)r  r.   r.   r.   r/   get_pp_group  r  r  c                  C   s   ddl m}  |  S )z
    Return the shared MooncakeTransferEngine if initialized in device_communicators,
    else None. Used by disaggregation mooncake backend and mem_cache mooncake_store.
    r   )get_mooncake_transfer_engine)Dsglang.srt.distributed.device_communicators.mooncake_transfer_enginer  )_get_enginer.   r.   r/   r    s   r  r&   c              	   c   sx    t  j| d*}t | |V  W d   n1 sw   Y  W d   dS W d   dS 1 s5w   Y  dS )aA  
    `graph_capture` is a context manager which should surround the code that
    is capturing the CUDA graph. Its main purpose is to ensure that the
    some operations will be run after the graph is captured, before the graph
    is replayed. It returns a `GraphCaptureContext` object which contains the
    necessary data for the graph capture. Currently, it only contains the
    stream that the graph capture is running on. This stream is set to the
    current CUDA stream when the context manager is entered and reset to the
    default stream when the context manager is exited. This is to ensure that
    the graph capture is running on a separate stream from the default stream,
    in order to explicitly distinguish the kernels to capture
    from other kernels possibly launched on background in the default stream.
    r  N)r  r   r  )r&   contextr.   r.   r/   r     s   Pr   r   c                 C   r  rJ   )r  r   r.   r.   r/   set_custom_all_reduce  r  r  c                 C   r  rJ   )r  r   r.   r.   r/   set_mscclpp_all_reduce  r  r  c                 C   r  rJ   )r  r   r.   r.   r/   set_torch_symm_mem_all_reduce  r  r  r   env://ncclrf   rd   distributed_init_methodr   c           	   
   C   s.  t d| |||| d|v r.zddlm} W n ty' } ztd|d }~ww |t  tj	 s`|d us;J d|d urUt
|tsHJ d|dksPJ dt|d	}tjj||| ||d
 |dkrt|dkrrttjdd}n|}td u rtttj }t|||ad S tjtj ksJ dd S )NzIworld_size=%d rank=%d local_rank=%d distributed_init_method=%s backend=%sr   r   )epzPlease install mooncake by following the instructions at https://github.com/kvcache-ai/Mooncake/blob/main/doc/en/build.md to run SGLang with Mooncake Backend.zRdistributed_init_method must be provided when initializing distributed environmentztimeout must be a numberztimeout must be positivert   )r   init_methodrf   rd   r   r   r  
LOCAL_RANK0z;world group already initialized with a different world size)r   debugr   r  ImportErrorset_host_ipr   r*   r3   rd  r9   r)  r   init_process_grouposenvironr   r  listr:  get_world_sizer  rf   )	rf   rd   r  rg   r   r   mooncake_epr   re   r.   r.   r/   init_distributed_environment  sZ   


r  rD   tensor_model_parallel_sizeexpert_model_parallel_sizepipeline_model_parallel_sizeattention_data_parallel_size%attention_context_model_parallel_sizemoe_data_model_parallel_sizeduplicate_tp_groupc              
   C   s  t j sJ t j }|pt jt j}|| | kr*td| d|  d| d||  }	tdu s6J dg }
t	|	D ]}t
t	||  |d |  }|
| q<t|
t j|tdd	d
|da|rtdu sjJ dt|
t j|tdd	dddatjrdtj_dtj_|}|}| | | }tdu sJ d|| krtanHg }
t	|	D ]7}t	|D ]0}t	|D ])}||  || |  | }||  |d | |  | }t
t	|||}|
| qqqt|
t j|ddaddlm} tdu sJ d|| krtan@g }
t	|	D ]+}t	|| D ]!}||  ||  }||  |d |  }t
t	||}|
| qq t|
t j||ddddda|}|}| | | }tdu sMJ d|| krUtan;g }
t	|	D ]*}t	|| D ] }||  | }|d |  | }t
t	|||| }|
| qcq[t|
t j|ddatdu sJ d|| krtanCg }
t	|	D ]2}t	|D ]*}t	|D ]"}||  || |  | }|||  }t
t	|||}|
| qqqt|
t j|ddatdu sJ d|| krtan<g }
t	|	D ]+}t	|| D ]!}||  ||  }||  |d |  }t
t	||}|
| qqt|
t j|dda|| }tdu s>J dg }
t	|D ]}t
t	|||}|
| qDt|
t j|dddadS ) a  
    Initialize model parallel groups.

    Arguments:
        tensor_model_parallel_size: number of GPUs used for tensor model
            parallelism.
        expert_model_parallel_size: number of GPUs used for expert model
            parallelism.
        pipeline_model_parallel_size: number of GPUs used for pipeline model
            parallelism.
        attention_data_parallel_size: number of GPUs used for attention data
            parallelism.
        attention_context_model_parallel_size: number of GPUs used for attention context
            parallelism.
        moe_data_model_parallel_size: number of GPUs used for moe data
            parallelism.

    Let's say we have a total of 8 GPUs denoted by g0 ... g7 and we
    use 2 GPUs to parallelize the model tensor, and 4 GPUs to parallelize
    the model pipeline. The present function will
    create 4 tensor model-parallel groups and 2 pipeline model-parallel groups:
        4 tensor model-parallel groups:
            [g0, g1], [g2, g3], [g4, g5], [g6, g7]
        2 pipeline model-parallel groups:
            [g0, g2, g4, g6], [g1, g3, g5, g7]

    Let's say we use 2 GPUs for attention context parallelism (attn_cp_size=2) and 4 GPUs for
    attention tensor parallelism (attn_tp_size=4). As for MoE part, we use 2 GPUs for moe data
    parallelism (moe_dp_size=2) and 4 GPUs for moe expert parallelism (moe_ep_size=4). The present
    function will create the following groups:
        2 tensor model-parallel groups:
            [g0, g1, g2, g3], [g4, g5, g6, g7]
        4 attention context-parallel groups:
            [g0, g4], [g1, g5], [g2, g6], [g3, g7]
        2 moe expert-parallel groups:
            [g0, g1, g2, g3], [g4, g5, g6, g7]
        4 moe data-parallel groups:
            [g0, g4], [g1, g5], [g2, g6], [g3, g7]

    Note that for efficiency, the caller should make sure adjacent ranks
    are on the same DGX box. For example if we are using 2 DGX-1 boxes
    with a total of 16 GPUs, rank 0 to 7 belong to the first box and
    ranks 8 to 15 belong to the second box.
    zworld_size (z.) is not equal to tensor_model_parallel_size (z") x pipeline_model_parallel_size (r@  Nz2tensor model parallel group is already initializedrD   $SGLANG_USE_MESSAGE_QUEUE_BROADCASTERtruetp)ro   rR   r{   zNtensor model parallel group for PD-Multiplexing Prefill is already initializedpdmux_prefill_tpTFz=attention context model parallel group is already initializedattn_cpr   r   )SYNC_TOKEN_IDS_ACROSS_TPz<attention tensor model parallel group is already initializedattention_tp)rk   r  rm   r  rR   z.moe data parallel group is already initializedmoe_dpz2expert model parallel group is already initializedmoe_epmoe_tpz4pipeline model parallel group is already initializedpp)rm   rR   )r*   r3   rd  r  get_backendr  rj   RuntimeErrorr  r:  r  r;   r  rg   r   r  rp   r   r  sglang.srt.layers.samplerr  r  r  r  r  r  )r  r  r  r  r  r  r   r  rf    num_tensor_model_parallel_groupsrv   tp_group_idxre   attn_dp_sizeattn_cp_sizeattn_tp_sizedp_idxattn_tp_idxstenr  cp_dp_combined_idxmoe_ep_sizemoe_dp_sizemoe_tp_sizetp_ep_combined_idx
moe_dp_idx
moe_tp_idxep_dp_combined_idx"num_pipeline_model_parallel_groupspp_group_idxr.   r.   r/   initialize_model_parallel;  s  7















r  r   c                 C   s   t j sJ t j }t j }ttt| }dd t|D }t j	|| g }t }|D ]}t
|}	|	|vrF||	 |t|	 q0|jdd d d}
|D ]}t jj||d}t|t|krr|}
td| d	|  qS|
S )
a  
    Create a custom parallel group based on the provided ranks.

    Args:
        group_ranks: The list of ranks that the CURRENT process wants to join.
                     (e.g., Rank 0 passes [0...7], Rank 8 passes [8...15])
        backend: The communication backend (default: "gloo").

    Returns:
        The ProcessGroup if the current rank is in group_ranks, else None.
    c                 S   s   g | ]}d qS rJ   r.   r4  r.   r.   r/   r7  b  s    z0create_custom_parallel_group.<locals>.<listcomp>c                 S   s   | d S rx  r.   )xr.   r.   r/   <lambda>o  s    z.create_custom_parallel_group.<locals>.<lambda>)r>   N)re   r   zRank z+ successfully created/joined custom group: )r*   r3   rd  r  r   sortedr  setr:  rH  tupleaddr;   sortr   r   r  )rv   r   rf   rd   local_configgathered_configsunique_groupsseen_signaturesconfigconfig_tuplemy_new_groupg_ranksrI   r.   r.   r/   create_custom_parallel_groupN  s2   


r  c                 C   st   |p	t jt j}t st| ||| dS t | ks&J dt d| t j	}||ks8J d|d|dS )zHelper to initialize model parallel groups if they are not initialized,
    or ensure tensor-parallel and pipeline-parallel sizes are equal to expected
    values if the model parallel groups are initialized.
    Nzjtensor parallel group already initialized, but of unexpected size: get_tensor_model_parallel_world_size()=z  vs. tensor_model_parallel_size=zSpipeline parallel group already initialized, but of unexpected size: pp_world_size=z" vs. pipeline_model_parallel_size=)
r*   r3   r  r  rj   model_parallel_is_initializedr  $get_tensor_model_parallel_world_sizer  rf   )r  r  r  r   pp_world_sizer.   r.   r/   !ensure_model_parallel_initialized  s0   

r  c                   C   s   t duotduS )z=Check if tensor and pipeline parallel groups are initialized.N)r  r  r.   r.   r.   r/   r    s   r  tp_groupc                 c   s<    t rJ dda t }| az
dV  W da |adS da |aw )a  Patch the tp group temporarily until this function ends.

    This method is for draft workers of speculative decoding to run draft model
    with different tp degree from that of target model workers.

    Args:
        tp_group (GroupCoordinator): the tp group coordinator
    z)Should not call when it's already patchedTNF)_TP_STATE_PATCHEDr  r  )r  old_tp_groupr.   r.   r/   patch_tensor_parallel_group  s   r  c                   C      t  jS )z&Return world size for the world group.)r  rf   r.   r.   r.   r/   r    r  r  c                   C   r  )z#Return my rank for the world group.)r  rh   r.   r.   r.   r/   get_world_rank  r  r  c                   C   r  )z6Return world size for the tensor model parallel group.)r  rf   r.   r.   r.   r/   r    r  r  c                   C   r  )z3Return my rank for the tensor model parallel group.)r  rh   r.   r.   r.   r/   get_tensor_model_parallel_rank  r  r  c                   C   r  )z@Return world size for the attention tensor model parallel group.)r  rf   r.   r.   r.   r/   )get_attn_tensor_model_parallel_world_size  r  r  c                   C   r  )z=Return my rank for the attention tensor model parallel group.)r  rh   r.   r.   r.   r/   #get_attn_tensor_model_parallel_rank  r  r  c                   C   r  )zAReturn world size for the attention context model parallel group.)r  rf   r.   r.   r.   r/   *get_attn_context_model_parallel_world_size  r  r   c                   C   r  )z>Return my rank for the attention context model parallel group.)r  rh   r.   r.   r.   r/   $get_attn_context_model_parallel_rank  r  r  c                   C   r  )z8Return world size for the pipeline model parallel group.)r  rf   r.   r.   r.   r/   &get_pipeline_model_parallel_world_size  r  r  c                   C   r  )z5Return my rank for the pipeline model parallel group.)r  rh   r.   r.   r.   r/    get_pipeline_model_parallel_rank  r  r  c                   C   r  )z2Return world size for the moe data parallel group.)r  rf   r.   r.   r.   r/    get_moe_data_parallel_world_size  r  r  c                   C   r  )z/Return my rank for the moe data parallel group.)r  rh   r.   r.   r.   r/   get_moe_data_parallel_rank  r  r  c                   C   r  )z4Return world size for the moe expert parallel group.)r  rf   r.   r.   r.   r/   "get_moe_expert_parallel_world_size  r  r  c                   C   r  )z1Return my rank for the moe expert parallel group.)r  rh   r.   r.   r.   r/   get_moe_expert_parallel_rank  r  r  c                   C   r  )z4Return world size for the moe tensor parallel group.)r  rf   r.   r.   r.   r/   "get_moe_tensor_parallel_world_size  r  r  c                   C   r  )z1Return my rank for the moe tensor parallel group.)r  rh   r.   r.   r.   r/   get_moe_tensor_parallel_rank  r  r	  c                   C   st   t rt   da trt  datrt  datrt  datr&t  datr.t  datr6t  dadS )z(Set the groups to none and destroy them.N)r  rv  r  r  r  r  r  r  r.   r.   r.   r/   destroy_model_parallel  s*   r
  c                   C   s,   t rt   d a tj rtj  d S d S rJ   )r  rv  r*   r3   rd  ru  r.   r.   r.   r/   destroy_distributed_environment<  s   
r  shutdown_rayc                 C   s  t   t  tt tj  W d    n1 sw   Y  | r*dd l}|	  t
  tsttdrStj rStj  ttjdrLtj  d S td d S ttdrdtj rdtj  d S ttdrutj rutj  d S ttdrtj rtj  d S d S d S d S )Nr   r~  _host_emptyCachez;torch._C._host_emptyCache() only available in Pytorch >=2.5xpunpumusa)r
  r  
contextlibsuppressAssertionErrorr*   r3   ru  rayshutdowngccollect_is_cpuhasattrr~  is_availableempty_cache_Cr  r   r   r  r  r  )r  r  r.   r.   r/   cleanup_dist_env_and_memoryE  s2   
r  pgsource_rankc              
   C   s  t j| t jjjksJ dt jj| d}t jj| d}t jdg| t jd}t j	| }d}d}zz{t
tl ||kr`tjddd	}||jdt|< t jj|jg|| | d
 d||< n>dg}t jj||| | d
 |d }	tddd  tj|	d}W d   n1 sw   Y  |jdt| |krd||< W d   n1 sw   Y  W n ty }
 ztd|
 W Y d}
~
nd}
~
ww W |r|  n|r|  w w t jj| d t
t ||kr|r|  W d   n1 sw   Y  t jj|| d dd | D S )z
    This is a collective operation that returns if each rank is in the same node
    as the source rank. It tests if processes are attached to the same
    memory system (shared access to shared memory).
    z;in_the_same_node_as should be tested with a non-NCCL group.rN   r   r   s   magic_messageNT   )creater$   rA  rD   z)multiprocessing.resource_tracker.registerc                  _   s   d S rJ   r.   )argskwargsr.   r.   r/   r    s    z%in_the_same_node_as.<locals>.<lambda>)rB   z(Error ignored in is_in_the_same_node: %sc                 S   s   g | ]}|d kqS )rD   r.   )r#  r  r.   r.   r/   r7    s    z'in_the_same_node_as.<locals>.<listcomp>)r*   r3   r  r   NCCLr   r  rP   r   get_process_group_ranksr  r  OSErrorr   SharedMemorybufr   rE  rB   r   r   r   errorclosert  unlinkr   tolist)r  r  rd   rf   is_in_the_same_nodere   magic_messageshmrF  rB   r   r.   r.   r/   in_the_same_node_as`  sj   


r0  reversec                 C   s   zdd l m  m} W n
 ty   Y d S w td u r#|ja|ja|ja	| r9t
|dt t
|dt t
|dt	 d S t
|dt t
|dt t
|dt d S )Nr   r  r  r  )vllm.distributed.parallel_stater3   parallel_stater  vllm_get_pp_groupr  r  vllm_get_tp_groupr  vllm_get_world_groupsetattr)r1  vllm_parrlel_stater.   r.   r/    monkey_patch_vllm_parallel_state  s    r9  )rI   rG   r7   N)NNFNNTNrJ   )r   r   r  r   r  N)rD   rD   rD   rD   rD   rD   NF)r   ry  rw  )rz  r  r  loggingr  rO  rK   collectionsr   r   r   dataclassesr   datetimer   multiprocessingr   typingr   r	   r
   r   r   r   r   unittest.mockr   r*   torch.distributedr   r   )sglang.srt.compilation.compilation_configr   0sglang.srt.compilation.piecewise_context_managerr   sglang.srt.environr   sglang.srt.utilsr   r   r   r   r   r   r   r   r   r   r   sglang.srt.utils.custom_opr    r   r  r   r   r!   r)  r3   ReduceOpSUMr   r%   r0   r|  r5   r@   rA   r-   rF   rH   rO   rV   rZ   r`   rc   rG   r  r  r  r{  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  get_tensor_model_parallel_groupr  r  !get_pipeline_model_parallel_groupr  r~  r,   r   	getLoggerr'   r   r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r   r  r  r  r  r  r  r  r  r	  r
  r  r  r0  r4  r5  r6  r9  r.   r.   r.   r/   <module>   s  $4
 






         

	

'


C	
  

5
!&	B