o
    پi"                  
   @   s   d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlmZm	Z	m
Z
mZmZmZ d dlZd dlmZ eeZdd Zdd Z		dd
ejdededeej fddZdedededeeef fddZe jG dd dZdS )    N)deque)AnyDequeDictOptionalSequenceTuple)TCPStorec                 C   s    | | dksJ d | |dS )z6Ensure that numerator is divisible by the denominator.r   z{} is not divisible by {}N)format	numeratordenominator r   P/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/distributed/utils.pyensure_divisibility   s   
r   c                 C   s   t | | | | S )zXEnsure that numerator is divisible by the denominator and return
    the division value.)r   r   r   r   r   divide   s   
r   Ftensornum_partitionscontiguous_split_chunksreturnc                 C   sH   |   d }t|  | |}tj| ||d}|r"tdd |D S |S )a5  Split a tensor along its last dimension.

    Arguments:
        tensor: input tensor.
        num_partitions: number of partitions to split the tensor
        contiguous_split_chunks: If True, make each chunk contiguous
                                 in memory.

    Returns:
        A list of Tensors
       )dimc                 s   s    | ]}|  V  qd S )N)
contiguous).0chunkr   r   r   	<genexpr>:   s    z.split_tensor_along_last_dim.<locals>.<genexpr>)r   r   sizetorchsplittuple)r   r   r   last_dimlast_dim_sizetensor_listr   r   r   split_tensor_along_last_dim#   s   r#   num_hidden_layerspp_rankpp_sizec              
   C   s&  t dd}|durczdd |dD }W n ty* } ztd||d}~ww t||kr>tdt|d|d	t|| krQtd
t|d| d	t|d| }|||  }||fS | | }| | }	|||	 kr||	 }
||d  |
 }||d  }||fS || }|| }||fS )zTry to evenly distribute layers across partitions.
    If the number of layers is not divisible by the number of partitions,
    the last N partitions will have one extra layer, where N = remainder.
    SGLANG_PP_LAYER_PARTITIONNc                 S   s   g | ]}t |qS r   )int)r   layerr   r   r   
<listcomp>J   s    z"get_pp_indices.<locals>.<listcomp>,zInvalid partition string: {}zlen(partitions)=z does not match pp_size=.zsum(partitions)=z" does not match num_hidden_layers=r   )osgetenvr   
ValueErrorr
   lensum)r$   r%   r&   partition_list_str
partitionserrstart_layer	end_layerbase_layers	remainderpartitions_without_extra_layerr   r   r   get_pp_indices?   s:   r:   c                   @   sR  e Zd ZU dZeed< eed< ejjj	ed< dZ
eed< ejedZeeef ed< ejedZeeef ed	< d
Zeed< ejedZeeef ed< ejedZeeeef  ed< dd ZdedefddZdd ZdedefddZdee dedefddZdede e fddZ!dd Z"e#	d%d ed!ededededd fd"d#Z$d$S )&StatelessProcessGroupzA dataclass to hold a metadata store, and the rank, world_size of the
    group. Only use it to communicate metadata between processes.
    For data-plane communication, create NCCL-related objects.
    rank
world_sizestore  data_expiration_seconds)default_factorysend_dst_counterrecv_src_counterr   broadcast_send_counterbroadcast_recv_src_counterentriesc                 C   sV   | j | jk sJ dd t| jD | _dd t| jD | _dd t| jD | _d S )Nc                 S      i | ]}|d qS r   r   r   ir   r   r   
<dictcomp>~       z7StatelessProcessGroup.__post_init__.<locals>.<dictcomp>c                 S   rG   rH   r   rI   r   r   r   rK      rL   c                 S   rG   rH   r   rI   r   r   r   rK      rL   )r<   r=   rangerB   rC   rE   )selfr   r   r   __post_init__|   s   z#StatelessProcessGroup.__post_init__objdstc                 C   s\   |    d| d| j|  }| j|t| | j|  d7  < | j|t	 f dS )z%Send an object to a destination rank.send_to//r   N)
expire_datarB   r>   setpickledumpsrF   appendtimeperf_counter)rN   rP   rQ   keyr   r   r   send_obj   s
   zStatelessProcessGroup.send_objc                 C   sP   | j r&| j d \}}t | | jkr| j| | j   ndS | j sdS dS )zAExpire data that is older than `data_expiration_seconds` seconds.r   N)rF   rY   rZ   r@   r>   
delete_keypopleft)rN   r[   	timestampr   r   r   rT      s   z!StatelessProcessGroup.expire_datasrcr   c              	   C   s<   t | jd| j d| j|  }| j|  d7  < |S )z%Receive an object from a source rank.rR   rS   r   )rV   loadsr>   getr<   rC   )rN   r`   rP   r   r   r   recv_obj   s
   zStatelessProcessGroup.recv_objc                 C   s   | j |kr/|   d| d| j }| j|t| |  jd7  _| j|t	
 f |S d| d| j|  }t| j|}| j|  d7  < |S )zBroadcast an object from a source rank to all other ranks.
        It does not clean up after all ranks have received the object.
        Use it for limited times, e.g., for initialization.
        zbroadcast_from/rS   r   )r<   rT   rD   r>   rU   rV   rW   rF   rX   rY   rZ   rE   ra   rb   )rN   rP   r`   r[   rc   r   r   r   broadcast_obj   s   
z#StatelessProcessGroup.broadcast_objc                 C   sV   g }t | jD ]!}|| jkr|| | j|| jd q| jd|d}|| q|S )z$All gather an object from all ranks.r`   N)rM   r=   r<   rX   rd   )rN   rP   gathered_objsrJ   rc   r   r   r   all_gather_obj   s   

z$StatelessProcessGroup.all_gather_objc                 C   s>   t | jD ]}|| jkr| jd| jd q| jd|d qdS )z#A barrier to synchronize all ranks.Nre   )rM   r=   r<   rd   )rN   rJ   r   r   r   barrier   s
   
zStatelessProcessGroup.barrierhostportc                 C   s$   t | |||dkd}t||||dS )a  A replacement for `torch.distributed.init_process_group` that does not
        pollute the global state.

        If we have process A and process B called `torch.distributed.init_process_group`
        to form a group, and then we want to form another group with process A, B, C,
        D, it is not possible in PyTorch, because process A and process B have already
        formed a group, and process C and process D cannot join that group. This
        function is a workaround for this issue.

        `torch.distributed.init_process_group` is a global call, while this function
        is a stateless call. It will return a `StatelessProcessGroup` object that can be
        used for exchanging metadata. With this function, process A and process B
        can call `StatelessProcessGroup.create` to form a group, and then process A, B,
        C, and D can call `StatelessProcessGroup.create` to form another group.
        r   )	host_namerj   r=   	is_master)r<   r=   r>   r@   )r	   r;   )ri   rj   r<   r=   r@   r>   r   r   r   create   s   zStatelessProcessGroup.createN)r?   )%__name__
__module____qualname____doc__r(   __annotations__r   _C_distributed_c10dStorer@   dataclassesfielddictrB   r   rC   rD   rE   r   rF   r   r   strfloatrO   r   r\   rT   rc   r   rd   listrg   rh   staticmethodrm   r   r   r   r   r;   f   sB   
  r;   )F)rv   loggingr-   rV   rY   collectionsr   typingr   r   r   r   r   r   r   torch.distributedr	   	getLoggerrn   loggerr   r   Tensorr(   boolr#   r:   	dataclassr;   r   r   r   r   <module>   sB    




'