o
    پi!                     @   s   d dl Z d dlmZmZmZ d dlZd dlZd dlm	Z
 d dlmZ 		 	ddee dedeej	j ded	ef
d
dZdedee dee deee  fddZG dd deZdS )    N)AnyListOptional)current_platformTdatarank
dist_groupsrcforce_cpu_devicec           
      C   s:  t |stjnd}||kra| du st| dkr,t jdgt j|d}tj|||d | S t	
| }t|}t tj|tjd |}	t j|gt j|d}tj|||d tj|	||d | S t jdgt j|d}tj|||d | }|dkr}g S t j|t j|d}	tj|	||d t|	  }t	|} | S )zBroadcast inputs from src rank to all other ranks with torch.dist backend.
    The `rank` here refer to the source rank on global process group (regardless
    of dist_group argument).
    cpuNr   )dtypedevice)r	   group)r   )torchr   r   device_typelentensorlongdist	broadcastpickledumps
ByteTensornp
frombufferuint8copytoitememptybytesr   numpyloads)
r   r   r   r	   r
   r   tensor_sizeserialized_datasizetensor_data r'   c/home/ubuntu/.local/lib/python3.10/site-packages/sglang/multimodal_gen/runtime/utils/distributed.pybroadcast_pyobj   s:   

r)   
world_sizeparallel_sizemaskreturnc                    s  ddt t dt t fdd dt t dt t dtfdd}d fd
d	}dd t||D }dd t||D } |}dd t||D }dd t||D }	 |d }
| |
 }g }t|D ]'}|||}g }t|
D ]}|||}|||||||	  qm|| q`|S )aF  Generate orthogonal parallel groups based on the parallel size and mask.

    Arguments:
        world_size (int): world size

        parallel_size (List[int]):
            The parallel size of each orthogonal parallel type. For example, if
            tensor_parallel_size = 2, pipeline_model_parallel_group = 3, data_parallel_size = 4,
            and the parallel mapping order is tp-pp-dp, then the parallel_size = [2, 3, 4].

        mask (List[bool]):
            The mask controls which parallel methods the generated groups represent. If mask[i] is
            True, it means the generated group contains the i-th parallelism method. For example,
            if parallel_size = [tp_size, pp_size, dp_size], and mask = [True, False , True], then
            the generated group is the `tp-dp` group, if the mask = [False, True, False], then the
            generated group is the `pp` group.

    Algorithm:
        For orthogonal parallelism, such as tp/dp/pp/cp, the global_rank and

        If we want to get the `dp_group` (tp_size * pp_size groups of dp_size ranks each.
        For example,  if the gpu size is 8 and order is 'tp-pp-dp', size is '2-2-2', and the
        dp_group here is [[0, 4], [1, 5], [2, 6], [3, 7]].)
        The tp_rank and pp_rank will be combined to form the `dp_group_index`.
            dp_group_index = tp_rank + pp_rank * tp_size (2)

        So, Given that tp_rank and pp_rank satisfy equation (2), and dp_rank in
        range(0, dp_size), the ranks in dp_group[dp_group_index] satisfies the
        equation (1).

        This function solve this math problem.

    For example, if the parallel_size = [tp_size, dp_size, pp_size] = [2, 3, 4],
    and the mask = [False, True, False]. Then,
        dp_group_index(0) = tp_rank(0) + pp_rank(0) * 2
        dp_group_index(1) = tp_rank(1) + pp_rank(0) * 2
        ...
        dp_group_index(7) = tp_rank(1) + pp_rank(3) * 2

        dp_group[0] = 0 + range(0, 3) * 2 + 0 = [0, 2, 4]
        dp_group[1] = 1 + range(0, 3) * 2 + 0 = [1, 3, 5]
        ...
        dp_group[7] = 1 + range(0, 3) * 2 + 3 * 2 * 3 = [19, 21, 23]
       ar-   c                 S   s&   |g}| D ]}|| }| | q|S N)append)r/   initrvr'   r'   r(   prefix_productm   s
   z>generate_masked_orthogonal_rank_groups.<locals>.prefix_productbc                 S   s   t dd t| |D S )Nc                 S      g | ]\}}|| qS r'   r'   .0xyr'   r'   r(   
<listcomp>u       zQgenerate_masked_orthogonal_rank_groups.<locals>.inner_product.<locals>.<listcomp>)sumzip)r/   r6   r'   r'   r(   inner_productt   s   z=generate_masked_orthogonal_rank_groups.<locals>.inner_productNc                    sb   |du r|} fddt ||D }tdd t ||dd D  ks/J d |||S )aB  
        This function solve the math problem below:
            There is an equation:
                index = sum(idx[i] * stride[i])
            And given the value of index, stride.
            Return the idx.
        This function will used to get the pp/dp/pp_rank
        from group_index and rank_in_group.
        Nc                    s   g | ]
\}} | | qS r'   r'   )r9   sdindexr'   r(   r<      s    zMgenerate_masked_orthogonal_rank_groups.<locals>.decompose.<locals>.<listcomp>c                 S   r7   r'   r'   r8   r'   r'   r(   r<      r=   z/idx {} with shape {} mismatch the return idx {})r?   r>   format)rD   shapestrideidxr5   rC   r(   	decomposew   s   
&z9generate_masked_orthogonal_rank_groups.<locals>.decomposec                 S      g | ]\}}|r|qS r'   r'   r9   rA   mr'   r'   r(   r<      r=   z:generate_masked_orthogonal_rank_groups.<locals>.<listcomp>c                 S      g | ]\}}|s|qS r'   r'   rM   r'   r'   r(   r<      r=   c                 S   rL   r'   r'   r9   rB   rN   r'   r'   r(   r<      r=   c                 S   rO   r'   r'   rP   r'   r'   r(   r<      r=   rE   )r.   r0   )r   intr?   ranger1   )r*   r+   r,   r@   rK   masked_shapeunmasked_shapeglobal_stridemasked_strideunmasked_stride
group_sizenum_of_groupranksgroup_indexdecomposed_group_idxr   rank_in_groupdecomposed_rank_idxr'   rJ   r(   &generate_masked_orthogonal_rank_groups=   s.   0

r_   c                   @   sT   e Zd Z	ddededededededed	d
fddZdedefddZdd Zd
S )RankGeneratorr   tpspppcfgdporderrank_offsetr-   Nc           
   	   C   s   || _ || _|| _|| _|| _|| _|| | | | | _| j | j| j| j| jd| _| }| j	 D ])}||vrS| j| dkrSt
d| d| j|  d| j d||vr]|d | }q4|| _g | _|dD ]}	| j| j|	  qid S )N)ra   rb   rc   rd   re   r.   zThe size of (z) is (z(), but you haven't specified the order (z).-)ra   rb   rc   rd   re   rg   r*   name_to_sizelowerkeysRuntimeErrorrf   ordered_sizesplitr1   )
selfra   rb   rc   rd   re   rf   rg   nametokenr'   r'   r(   __init__   s6   
zRankGenerator.__init__rq   c                 C   s>   | d}| d}dgt| }|D ]	}d|||< q|S )Nrh   FT)rn   r   rD   )ro   rf   rq   ordered_tokenr,   tr'   r'   r(   get_mask   s   

zRankGenerator.get_maskc                 C   sZ   |  | j|}t| j| j|}| jdkr+|D ]}tt|D ]}||  | j7  < qq|S )af  Get rank group by input token.

        Arguments:
            token (str):
                Specify the ranks type that want to get. If we want
                to obtain multiple parallel types, we can use a hyphen
                '-' to separate them. For example, if we want to obtain
                the TP_DP group, the token should be 'tp-dp'.

        r   )ru   rf   r_   r*   rm   rg   rR   r   )ro   rq   r,   rZ   
rank_groupir'   r'   r(   	get_ranks   s   

zRankGenerator.get_ranks)r   )__name__
__module____qualname__rQ   strrr   ru   rx   r'   r'   r'   r(   r`      s*    		
)r`   )Nr   T)r   typingr   r   r   r!   r   r   torch.distributeddistributedr   'sglang.multimodal_gen.runtime.platformsr   rQ   ProcessGroupboolr)   listr_   objectr`   r'   r'   r'   r(   <module>   s<   

0

h