o
    پi                  
   @   s   d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZ d dl	Z	d dl
mZ d dlmZ eeZddd	Zd
ededefddZ	dde	jdededee	j fddZe jG dd dZdS )    N)deque)Sequence)Any)TCPStore)init_loggerreturnc                 C   s    | | dksJ d | |dS )z6Ensure that numerator is divisible by the denominator.r   z{} is not divisible by {}N)format	numeratordenominator r   c/home/ubuntu/.local/lib/python3.10/site-packages/sglang/multimodal_gen/runtime/distributed/utils.pyensure_divisibility   s   
r   r
   r   c                 C   s   t | | | | S )zXEnsure that numerator is divisible by the denominator and return
    the division value.)r   r	   r   r   r   divide    s   
r   Ftensornum_partitionscontiguous_split_chunksc                 C   sL   |   d }t|  | |}tj| ||d}|r"tdd |D S t|S )a5  Split a tensor along its last dimension.

    Arguments:
        tensor: input tensor.
        num_partitions: number of partitions to split the tensor
        contiguous_split_chunks: If True, make each chunk contiguous
                                 in memory.

    Returns:
        A list of Tensors
       )dimc                 s   s    | ]}|  V  qd S )N)
contiguous).0chunkr   r   r   	<genexpr>>   s    z.split_tensor_along_last_dim.<locals>.<genexpr>)r   r   sizetorchsplittuple)r   r   r   last_dimlast_dim_sizetensor_listr   r   r   split_tensor_along_last_dim'   s   r    c                   @   sT  e Zd ZU dZeed< eed< ejjj	ed< dZ
eed< ejedZeeef ed< ejedZeeef ed	< d
Zeed< ejedZeeef ed< ejedZeeeef  ed< dd ZdedefddZd%ddZdedefddZdedB dedefddZdedee fddZdd  Ze 	d&d!ed"ededededd fd#d$Z!dS )'StatelessProcessGroupzA dataclass to hold a metadata store, and the rank, world_size of the
    group. Only use it to communicate metadata between processes.
    For data-plane communication, create NCCL-related objects.
    rank
world_sizestore  data_expiration_seconds)default_factorysend_dst_counterrecv_src_counterr   broadcast_send_counterbroadcast_recv_src_counterentriesc                 C   sV   | j | jk sJ dd t| jD | _dd t| jD | _dd t| jD | _d S )Nc                 S      i | ]}|d qS r   r   r   ir   r   r   
<dictcomp>[       z7StatelessProcessGroup.__post_init__.<locals>.<dictcomp>c                 S   r-   r.   r   r/   r   r   r   r1   \   r2   c                 S   r-   r.   r   r/   r   r   r   r1   ]   r2   )r"   r#   ranger(   r)   r+   )selfr   r   r   __post_init__Y   s   z#StatelessProcessGroup.__post_init__objdstc                 C   s\   |    d| d| j|  }| j|t| | j|  d7  < | j|t	 f dS )z%Send an object to a destination rank.send_to//r   N)
expire_datar(   r$   setpickledumpsr,   appendtimeperf_counter)r4   r6   r7   keyr   r   r   send_obj_   s
   zStatelessProcessGroup.send_objr   Nc                 C   sP   | j r&| j d \}}t | | jkr| j| | j   ndS | j sdS dS )zAExpire data that is older than `data_expiration_seconds` seconds.r   N)r,   r?   r@   r&   r$   
delete_keypopleft)r4   rA   	timestampr   r   r   r:   g   s   z!StatelessProcessGroup.expire_datasrcc              	   C   s<   t | jd| j d| j|  }| j|  d7  < |S )z%Receive an object from a source rank.r8   r9   r   )r<   loadsr$   getr"   r)   )r4   rF   r6   r   r   r   recv_objr   s
   zStatelessProcessGroup.recv_objc                 C   s   | j |kr/|   d| d| j }| j|t| |  jd7  _| j|t	
 f |S d| d| j|  }t| j|}| j|  d7  < |S )zBroadcast an object from a source rank to all other ranks.
        It does not clean up after all ranks have received the object.
        Use it for limited times, e.g., for initialization.
        zbroadcast_from/r9   r   )r"   r:   r*   r$   r;   r<   r=   r,   r>   r?   r@   r+   rG   rH   )r4   r6   rF   rA   rI   r   r   r   broadcast_objz   s   
z#StatelessProcessGroup.broadcast_objc                 C   sV   g }t | jD ]!}|| jkr|| | j|| jd q| jd|d}|| q|S )z$All gather an object from all ranks.rF   N)r3   r#   r"   r>   rJ   )r4   r6   gathered_objsr0   rI   r   r   r   all_gather_obj   s   

z$StatelessProcessGroup.all_gather_objc                 C   s>   t | jD ]}|| jkr| jd| jd q| jd|d qdS )z#A barrier to synchronize all ranks.NrK   )r3   r#   r"   rJ   )r4   r0   r   r   r   barrier   s
   
zStatelessProcessGroup.barrierhostportc                 C   s$   t | |||dkd}t||||dS )a  A replacement for `torch.distributed.init_process_group` that does not
        pollute the global state.

        If we have process A and process B called `torch.distributed.init_process_group`
        to form a group, and then we want to form another group with process A, B, C,
        D, it is not possible in PyTorch, because process A and process B have already
        formed a group, and process C and process D cannot join that group. This
        function is a workaround for this issue.

        `torch.distributed.init_process_group` is a global call, while this function
        is a stateless call. It will return a `StatelessProcessGroup` object that can be
        used for exchanging metadata. With this function, process A and process B
        can call `StatelessProcessGroup.create` to form a group, and then process A, B,
        C, and D can call `StatelessProcessGroup.create` to form another group.
        r   )	host_namerP   r#   	is_master)r"   r#   r$   r&   )r   r!   )rO   rP   r"   r#   r&   r$   r   r   r   create   s   zStatelessProcessGroup.creater   N)r%   )"__name__
__module____qualname____doc__int__annotations__r   _C_distributed_c10dStorer&   dataclassesfielddictr(   r)   r*   r+   r   r,   r   strfloatr5   r   rB   r:   rI   rJ   listrM   rN   staticmethodrS   r   r   r   r   r!   C   sB   
  
r!   rT   )F)r^   r<   r?   collectionsr   collections.abcr   typingr   r   torch.distributedr   1sglang.multimodal_gen.runtime.utils.logging_utilsr   rU   loggerr   rY   r   Tensorboolr    	dataclassr!   r   r   r   r   <module>   s0   	


