o
    ۷i |                     @   sX  U d Z ddlZddlZddlm  mZ ddlmZ ddlm	Z	 ddl
mZ ddlmZ ddlmZmZmZ ej Zed	 Ze	eZdaedB ed
< daedB ed< daedB ed< daedB ed< daedB ed< daedB ed< daedB ed< de de!e  de!e" de!e!e   fddZ#G dd dZ$defddZ%defddZ&dd Z'dd  Z(d!d" Z)d#d$ Z*d%d& Z+d'd( Z,defd)d*Z-d+d, Z.d-d. Z/d/d0 Z0d1d2 Z1defd3d4Z2d5d6 Z3d7d8 Z4defd9d:Z5d;d< Z6d=d> Z7defd?d@Z8dAdB Z9dCdD Z:dEdF Z;dGdH Z<dIe!e  dJe dKe=defdLdMZ>	N	N	O	N	dvde dPe dQe=dJe dKe=dB f
dRdSZ?dTdU Z@dVe!e!e   dJe dKe=dWe=def
dXdYZAdZe dKe=fd[d\ZBd]d^ ZC	_	dwd`e dae dPe de dbe"dce!e!e   dB deDejjEejjEf fdddeZF									dxdfe dge dhe dB die dje dke dle dme dKe=dB ddfdndoZGdpdq ZHdrds ZIdtdu ZJdS )ya  vLLM-Omni distributed state.

It takes over the control of the distributed environment from PyTorch.
The typical workflow is:

- call `init_distributed_environment` to initialize the distributed environment.
- call `initialize_model_parallel` or `ensure_model_parallel_initialized` to
 initialize the model parallel groups.

- any code dealing with the distributed stuff

- call `destroy_model_parallel` to destroy the model parallel groups.
- call `destroy_distributed_environment` to destroy the distributed environment.

If you only need to use the distributed environment without model parallelism,
 you can skip the model parallel initialization and destruction steps.
    N)$get_tensor_model_parallel_world_size)init_logger)envs)current_omni_platform   )GroupCoordinatorPipelineGroupCoordinator SequenceParallelGroupCoordinatorhas_flash_attn_WORLD_SP_PP_CFG_DP_FS_DIT
world_sizeparallel_sizemaskreturnc                    s  ddt t dt t fdd dt t dt t dtfdd}d fd
d	}dd t||D }dd t||D } |}dd t||D }dd t||D }	 |d }
| |
 }g }t|D ]'}|||}g }t|
D ]}|||}|||||||	  qm|| q`|S )aL	  Generate orthogonal parallel groups based on the parallel size and mask.

    Arguments:
        world_size (int): world size

        parallel_size (list[int]):
            The parallel size of each orthogonal parallel type. For example, if
            tensor_parallel_size = 2, pipeline_model_parallel_group = 3, data_parallel_size = 4,
            and the parallel mapping order is tp-pp-dp, then the parallel_size = [2, 3, 4].

        mask (list[bool]):
            The mask controls which parallel methods the generated groups represent. If mask[i] is
            True, it means the generated group contains the i-th parallelism method. For example,
            if parallel_size = [tp_size, pp_size, dp_size], and mask = [True, False , True], then
            the generated group is the `tp-dp` group, if the mask = [False, True, False], then the
            generated group is the `pp` group.

    Algorithm:
        For orthogonal parallelism, such as tp/dp/pp/cp, the global_rank and
        local_rank satisfy the following equation:
            global_rank = tp_rank + dp_rank * tp_size + pp_rank * tp_size * dp_size (1)
                tp_rank \in [0, tp_size)
                dp_rank \in [0, dp_size)
                pp_rank \in [0, pp_size)

        If we want to get the `dp_group` (tp_size * pp_size groups of dp_size ranks each.
        For example,  if the gpu size is 8 and order is 'tp-pp-dp', size is '2-2-2', and the
        dp_group here is [[0, 4], [1, 5], [2, 6], [3, 7]].)
        The tp_rank and pp_rank will be combined to form the `dp_group_index`.
            dp_group_index = tp_rank + pp_rank * tp_size (2)

        So, Given that tp_rank and pp_rank satisfy equation (2), and dp_rank in
        range(0, dp_size), the ranks in dp_group[dp_group_index] satisfies the
        equation (1).

        This function solve this math problem.

    For example, if the parallel_size = [tp_size, dp_size, pp_size] = [2, 3, 4],
    and the mask = [False, True, False]. Then,
        dp_group_index(0) = tp_rank(0) + pp_rank(0) * 2
        dp_group_index(1) = tp_rank(1) + pp_rank(0) * 2
        ...
        dp_group_index(7) = tp_rank(1) + pp_rank(3) * 2

        dp_group[0] = 0 + range(0, 3) * 2 + 0 = [0, 2, 4]
        dp_group[1] = 1 + range(0, 3) * 2 + 0 = [1, 3, 5]
        ...
        dp_group[7] = 1 + range(0, 3) * 2 + 3 * 2 * 3 = [19, 21, 23]
    r   ar   c                 S   s&   |g}| D ]}|| }| | q|S N)append)r   initrv r   d/home/ubuntu/vllm_env/lib/python3.10/site-packages/vllm_omni/diffusion/distributed/parallel_state.pyprefix_productt   s
   z>generate_masked_orthogonal_rank_groups.<locals>.prefix_productbc                 S   s   t dd t| |D S )Nc                 S      g | ]\}}|| qS r   r   .0xyr   r   r   
<listcomp>|       zQgenerate_masked_orthogonal_rank_groups.<locals>.inner_product.<locals>.<listcomp>)sumzip)r   r   r   r   r   inner_product{   s   z=generate_masked_orthogonal_rank_groups.<locals>.inner_productNc                    sj   |du r|} fddt ||D }tdd t ||dd D  ks3J d  d| d| |S )	aB  
        This function solve the math problem below:
            There is an equation:
                index = sum(idx[i] * stride[i])
            And given the value of index, stride.
            Return the idx.
        This function will used to get the pp/dp/pp_rank
        from group_index and rank_in_group.
        Nc                    s   g | ]
\}} | | qS r   r   )r"   sdindexr   r   r%      s    zMgenerate_masked_orthogonal_rank_groups.<locals>.decompose.<locals>.<listcomp>c                 S   r    r   r   r!   r   r   r   r%      r&   zidx z with shape z mismatch the return idx )r(   r'   )r-   shapestrideidxr   r,   r   	decompose~   s   
&z9generate_masked_orthogonal_rank_groups.<locals>.decomposec                 S      g | ]\}}|r|qS r   r   r"   r*   mr   r   r   r%      r&   z:generate_masked_orthogonal_rank_groups.<locals>.<listcomp>c                 S      g | ]\}}|s|qS r   r   r5   r   r   r   r%      r&   c                 S   r4   r   r   r"   r+   r6   r   r   r   r%      r&   c                 S   r7   r   r   r8   r   r   r   r%      r&   r.   )r   r   )listintr(   ranger   )r   r   r   r)   r3   masked_shapeunmasked_shapeglobal_stridemasked_strideunmasked_stride
group_sizenum_of_groupranksgroup_indexdecomposed_group_idxrankrank_in_groupdecomposed_rank_idxr   r2   r   &generate_masked_orthogonal_rank_groups?   s*   5

rI   c                   @   sd   e Zd Z			ddededededed	ed
ededdfddZd
edefddZddefddZdS )RankGeneratorr   tp-sp-pp-cfg-dpr   tpspppcfgdpfsorderrank_offsetr   Nc	              	   C   s   || _ || _|| _|| _|| _|| _|| _|| | | | | _| j | j| j| j| j| jd| _|	 }| j
 D ]-}	|	dkr@q9|	|vr\| j|	 dkr\td|	 d| j|	  d| d|	|vrf|d |	 }q9|| _g | _|dD ]}
| j| j|
  qrd S )	N)rL   rM   rN   rO   rP   rQ   rQ   r   zThe size of (z) is (z(), but you haven't specified the order ().-)rL   rM   rN   rO   rP   rQ   rS   r   name_to_sizelowerkeysRuntimeErrorrR   ordered_sizesplitr   )selfrL   rM   rN   rO   rP   rQ   rR   rS   nametokenr   r   r   __init__   sB   zRankGenerator.__init__r^   c                 C   s>   | d}| d}dgt| }|D ]	}d|||< q|S )NrU   FT)r[   lenr-   )r\   rR   r^   ordered_tokenr   tr   r   r   get_mask   s   

zRankGenerator.get_maskFindependent_ranksc           	      C   s   |r1|dkr1g }| j | j }t|D ]}tt|| j | j |d | j | j }|| q|S | | j|}t| j | j	|}| jdkr\|D ]}tt
|D ]}||  | j7  < qOqG|S )a  Get rank group by input token.

        Arguments:
            token (str):
                Specify the ranks type that want to get. If we want
                to obtain multiple parallel types, we can use a hyphen
                '-' to separate them. For example, if we want to obtain
                the TP_DP group, the token should be 'tp-dp'.
            independent_ranks (bool):
                If True, generate independent rank groups that divide the world
                into groups of the specified size. Used for FS (fully shard) groups
                which operate independently from the main parallelism hierarchy.
        rQ   r   r   )r   rQ   r;   r9   rS   r   rc   rR   rI   rZ   r`   )	r\   r^   rd   rC   
num_groupsigroupr   
rank_groupr   r   r   	get_ranks   s   *
zRankGenerator.get_ranks)r   rK   r   )F)	__name__
__module____qualname__r:   strr_   rc   boolri   r   r   r   r   rJ      s2    	

1rJ   c                   C      t d usJ dt S )Nzworld group is not initialized)r   r   r   r   r   get_world_group     rp   c                   C   ro   Nz0pipeline model parallel group is not initialized)r   r   r   r   r   get_sp_group  rq   rs   c                   C      t  jS )z2Return world size for the sequence parallel group.)rs   r   r   r   r   r    get_sequence_parallel_world_size     ru   c                   C   rt   )z/Return my rank for the sequence parallel group.)rs   rG   r   r   r   r   get_sequence_parallel_rank  rv   rw   c                   C   rt   r   )rs   ulysses_world_sizer   r   r   r   get_ulysses_parallel_world_size     ry   c                   C   rt   r   )rs   ulysses_rankr   r   r   r   get_ulysses_parallel_rank   rz   r|   c                   C   rt   r   )rs   ring_world_sizer   r   r   r   get_ring_parallel_world_size$  rz   r~   c                   C   rt   r   )rs   	ring_rankr   r   r   r   get_ring_parallel_rank(  rz   r   c                   C   ro   rr   )r   r   r   r   r   get_pp_group-  rq   r   c                   C   rt   )z8Return world size for the pipeline model parallel group.)r   r   r   r   r   r    get_pipeline_parallel_world_size2  rv   r   c                   C   rt   )z5Return my rank for the pipeline model parallel group.)r   rG   r   r   r   r   get_pipeline_parallel_rank7  rv   r   c                   C   s
   t  dkS )zKReturn True if in the first pipeline model parallel stage, False otherwise.r   )r   r   r   r   r   is_pipeline_first_stage<  s   
r   c                   C   s   t  t d kS )zJReturn True if in the last pipeline model parallel stage, False otherwise.r   )r   r   r   r   r   r   is_pipeline_last_stageA  s   r   c                   C   ro   )Nz:classifier_free_guidance parallel group is not initialized)r   r   r   r   r   get_cfg_groupG  rq   r   c                   C   rt   )zBReturn world size for the classifier_free_guidance parallel group.)r   r   r   r   r   r   'get_classifier_free_guidance_world_sizeL  rv   r   c                   C   rt   )z?Return my rank for the classifier_free_guidance parallel group.)r   rG   r   r   r   r   !get_classifier_free_guidance_rankQ  rv   r   c                   C   ro   rr   )r   r   r   r   r   get_dp_groupW  rq   r   c                   C   rt   )z.Return world size for the data parallel group.)r   r   r   r   r   r   get_data_parallel_world_size\  rv   r   c                   C   rt   )z+Return my rank for the data parallel group.)r   rG   r   r   r   r   get_data_parallel_ranka  rv   r   c                   C   ro   )Nz$fully shard group is not initialized)r   r   r   r   r   get_fs_groupg  rq   r   c                   C   rt   )z,Return world size for the fully shard group.)r   r   r   r   r   r   get_fully_shard_world_sizel  rv   r   c                   C   rt   )z)Return my rank for the fully shard group.)r   rG   r   r   r   r   get_fully_shard_rankq  rv   r   c                   C   s0   t  t d kot t d kot t d kS )z@Return True if in the last data parallel group, False otherwise.r   )rw   ru   r   r   r   r   r   r   r   r   is_dp_last_groupv  s
   r   c                   C   s   t  t  t  t  t  S )z$Return world size for the DiT model.)r   r   ru   r   r   r   r   r   r   get_dit_world_size  s   r   rC   
local_rankbackendc                 C   s   t | g||dS )Ngroup_ranksr   torch_distributed_backend)r   )rC   r   r   r   r   r   init_world_group  s
   r   r.   env://rF   distributed_init_methodc                 C   s   |d u rt j}td| |||| tj s9|d usJ dtjj||| |d tj t 	  }t 
t | |dkrG|dkrEtj}n|}td u r\tttj }t|||ad S tjtj kshJ dd S )NzIworld_size=%d rank=%d local_rank=%d distributed_init_method=%s backend=%szRdistributed_init_method must be provided when initializing distributed environment)r   init_methodr   rF   r.   r   z;world group already initialized with a different world size)r   dist_backendloggerdebugtorchdistributedis_initializedinit_process_groupget_rankget_device_count
set_deviceget_torch_devicer   
LOCAL_RANKr   r9   r;   get_world_sizer   r   )r   rF   r   r   r   	device_idrC   r   r   r   init_distributed_environment  s@   

r   c                   C   s*   t duotduotduotduotjduS )z=Check if tensor and pipeline parallel groups are initialized.N)r   r   r   r   vllm_parallel_state_TPr   r   r   r   model_parallel_is_initialized  s   r   r   parallel_modec                 K   sZ   |dv sJ d| d|dkrt | ||dS |dkr&td| ||d|S t| ||dS )N)datapipelinetensorsequenceclassifier_free_guidancefully_shardzparallel_mode z is not supportedr   r   r   r   )r   r	   r   )r   r   r   r   kwargsr   r   r   init_model_parallel_group  s,   

r   dit_parallel_sizec                 C   s   t jjtt| |dad S )N)rC   r   )r   r   	new_groupr9   r;   r   )r   r   r   r   r   init_dit_group  s   r   c                   C   ro   )NzDIT group is not initialized)r   r   r   r   r   get_dit_group  rq   r   Tsp_ulysses_degreesp_ring_degreeuse_ulysses_lowsp_group_ranksc                 C   sx  ||  }|| }|| dksJ d| d|  d|}| }	|durt || |kr<td||  d| dt | d	td
| d|  d| d| d	 d}
d}d}|D ]}t ||krktd| dt | d||v rst|}
|rt|D ]}|||  |d |   }tj|}||v r|}t|}qyt|	D ]}||d|	 }tj|}||v r|}t|}qqVt|	D ]}||| |d |  }tj|}||v r|}t|}qt|D ]}||d| }tj|}||v r|}t|}qqV|
durtd||
|| ||fS |ret|D ]N}|| }t|D ]!}tt||  | |d |  | }tj|}||v r<|}qt|	D ]}tt|| || |	}tj|}||v r]|}qBq||fS t|D ]N}|| }t|	D ]!}tt|| | |d | | }tj|}||v r|}qst|D ]}tt|| || |}tj|}||v r|}qqi||fS )aa  
    Initialize sequence-parallel Ulysses and Ring process groups.

    This builds sequence-parallel (SP) subgroups inside each data-parallel (DP)
    slice. The SP group size is sp_ulysses_degree * sp_ring_degree, and
    world_size must be divisible by that size.

    Args:
        sp_ulysses_degree: Size of each Ulysses subgroup.
        sp_ring_degree: Size of each Ring subgroup.
        rank: Global rank of the current process.
        world_size: Total number of processes.
        use_ulysses_low: If True, Ulysses groups are contiguous chunks and Ring
            groups are strided within each SP group. If False, the opposite.
        sp_group_ranks: Optional explicit SP groups. Each entry must be a list
            of length sp_ulysses_degree * sp_ring_degree. When provided, groups
            are built from these ranks instead of auto-generated contiguous
            ranges.

    Returns:
        ulyssess_pg (torch.distributed.ProcessGroup): The Ulysses process group
            for this rank.
        ring_pg (torch.distributed.ProcessGroup): The Ring process group for
            this rank.

    Raises:
        ValueError: If sp_group_ranks length does not match world_size or any
            entry has the wrong size.
        AssertionError: If world_size is not divisible by sp_size.

    Behavior:
        - If sp_group_ranks is provided, groups are built per entry and each
          entry is further split into Ulysses/Ring groups according to
          use_ulysses_low.
        - If sp_group_ranks is None, groups are auto-generated within each DP
          slice using offsets of size sp_size.
    r   zworld_size z % sp_size z == 0Nz!Invalid sp_group_ranks: expected z groups of size z
, but got z groups.z<Building SP subgroups from explicit sp_group_ranks (sp_size=z
, ulysses=z, ring=z, use_ulysses_low=rT   z,Invalid sp_group_ranks entry: expected size z, got .r   zJSP group details for rank %d: sp_group=%s, ulysses_group=%s, ring_group=%s)	r`   
ValueErrorr   infor9   r;   r   r   r   )r   r   rF   r   r   r   sp_sizedp_sizenum_ulysses_pgsnum_ring_pgslocal_sp_grouplocal_ulysses
local_ringr   rf   ulysses_ranksrg   ulyssess_pg
ring_ranksring_pgdp_rankoffsetr   r   r   set_seq_parallel_pg  s   -"

+


"

r   data_parallel_sizecfg_parallel_sizesequence_parallel_sizeulysses_degreering_degreetensor_parallel_sizepipeline_parallel_sizefully_shard_degreec	                 C   s<  |d u rt j}	 tj sJ tj }	|ptjt j}|d u r.|| }t	
d|  ||| krAtd| d| d| t  rM|dksMJ d| | | | | }
|	|
k rrtd|	 d| d	| d
| d| d|  dt||||| |dd}|d}td u sJ dt|dt j|ddatt_td u sJ dt|dt j|ddatd u sJ dt|dt j|ddatt_td u sJ dt||t j|
|d\}}t|t j|d||datjd u sJ dt|d t j|d!dt_td u sJ d"t|jd#d$d%t j|d&dat|
| d S )'NzMsequence_parallel_size is not provided, using ring_degree * ulysses_degree = zMsequence_parallel_size is not equal to ring_degree * ulysses_degree, but got z != z * r   z'Current pipefusion is not ready for NPUzworld_size (z%) is less than tensor_parallel_size (z) x pipeline_parallel_size (z) xsequence_parallel_size (z) xcfg_parallel_size (z) xdata_parallel_size ()rK   )rQ   rR   rM   z*data parallel group is already initializedrP   r   )r   r   r   r   z5classifier_free_guidance group is already initializedrO   r   z4pipeline model parallel group is already initializedrN   r   z.sequence parallel group is already initialized)r   r   rF   r   r   r   )r   r   r   r   ulysses_group
ring_groupz,Tensor parallel group is already initializedrL   r   z(fully shard group is already initializedrQ   T)rd   r   )r   r   r   r   r   r   get_backendrp   device_groupr   r   r   is_npurY   rJ   ri   r   r   r   r   r   r   r   r   rG   r   r   r   )r   r   r   r   r   r   r   r   r   r   r   rank_generatorr   
ulysses_pgr   r   r   r   initialize_model_parallel  s   *


	
	r   c                   C   sj   t rt   da trt  datrt  datjr tj  dt_tr)t  datr1t  dadS )z(Set the groups to none and destroy them.N)r   destroyr   r   r   r   r   r   r   r   r   r   destroy_model_parallel>  s$   
r   c                   C   s,   t rt   d a tj rtj  d S d S r   )r   r   r   r   r   destroy_process_groupr   r   r   r   destroy_distributed_environment^  s   
r   c                   C   s   t  rt  t  d S r   )r   r   r   r   r   r   r   destroy_distributed_envg  s   
r   )r.   r.   r   r.   N)TN)	r   r   Nr   r   r   r   r   N)K__doc__r   torch.distributedvllm.distributed.parallel_stater   parallel_stater   r   vllm.loggerr   vllm_omni.diffusionr   vllm_omni.platformsr   group_coordinatorr   r   r	   PACKAGES_CHECKERget_packages_infoenv_infoHAS_FLASH_ATTNrj   r   r   __annotations__r   r   r   r   r   r   r:   r9   rn   rI   rJ   rp   rs   ru   rw   ry   r|   r~   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rm   r   r   r   r   r   r   tupleProcessGroupr   r   r   r   r   r   r   r   r   <module>   s  


l\		
2

$

 	

 ( 	