o
    Tiq                     @   s*  d Z ddlmZ ddlmZ ddlmZmZ ddl	m
Z
 ddlmZ i ai adadadadai adZdrd	d
Zdd ZdadadadadadrddZdd Zdd Zdd Zdd Z dd Z!dd Z"dd Z#dd Z$dd  Z%d!d" Z&d#d$ Z'd%d& Z(d'd( Z)dsd*d+Z*		)dtd,d-Z+dsd.d/Z,d0d1 Z-d2d3 Z.d4d5 Z/d6d7 Z0d8d9 Z1d:d; Z2d<d= Z3d>d? Z4d@dA Z5dBdC Z6dDdE Z7dFdG Z8dHdI Z9dJdK Z:dLdM Z;dNdO Z<dPdQ Z=dRdS Z>dTdU Z?dVdW Z@dXdY ZAdZd[ ZBd\d] ZCd^d_ ZDd`da ZEdbdc ZFddde ZGdfdg ZHdhdi ZIdjdk ZJdldm ZKdndo ZLdpdq ZMdS )ual  
 Support different forms of parallelism in DeepSpeed using multiple process groups.
 Given that there are multiple scenarios and use-cases, this file is going to be updated
 frequently. For now, the group creation needed for the training scenario is being implemented.
 For inference and other new scenarios, the code will be either reused or added to this file.
    )comm)log_dist)$bwc_tensor_model_parallel_world_size bwc_pipeline_parallel_world_sizeDeprecatedException)get_acceleratorN   c                 C   s   t d)z3 Deprecated function. Retained to inform the users.zPlease do not use the groups.initialize() API as it is deprecated. Instead, pass the desired ep_size to deepspeed.moe.layer.MoE(..,ep_size,..)r   )ep_sizempu r   J/home/ubuntu/.local/lib/python3.10/site-packages/deepspeed/utils/groups.py
initialize5   s   r   c                 C   s    | | dksJ d | |dS )z6Ensure that numerator is divisible by the denominator.r   z{} is not divisible by {}N)format)	numeratordenominatorr   r   r   _ensure_divisibility<   s    r   c                 C   sT   t durdS |du rt |  }t|| fd}|jdda |jddat attfS )z&Initialize model data parallel groups.N)data_paralleltensor_parallelr   mesh_dimr   )_TENSOR_MODEL_PARALLEL_GROUPdistget_world_sizeinitialize_mesh_device	get_group_DATA_PARALLEL_GROUP_MODEL_PARALLEL_GROUP)tensor_model_parallel_sizedata_parallel_sizemesh_devicer   r   r   _init_tp_mesh_deviceP   s   
r!   c                   C      t dusJ dt S )z?Get the tensor model parallel group the caller rank belongs to.Nz3intra_layer_model parallel group is not initialized)r   r   r   r   r   get_tensor_model_parallel_groupj      
r#   c                   C   r"   )z8Get the model parallel group the caller rank belongs to.Nz'model parallel group is not initialized)r   r   r   r   r   get_model_parallel_groupr   r$   r%   c                   C   r"   )7Get the data parallel group the caller rank belongs to.Nz&data parallel group is not initialized)r   r   r   r   r   get_data_parallel_groupz   s   
r'   c                 C      | a dS )z"Set the tensor model parallel sizeN)%_MPU_TENSOR_MODEL_PARALLEL_WORLD_SIZE)
world_sizer   r   r   $set_tensor_model_parallel_world_size      r+   c                   C      t durt S tjt dS )z6Return world size for the tensor model parallel group.Ngroup)r)   r   r   r#   r   r   r   r   $get_tensor_model_parallel_world_size      r0   c                   C      t  S N)r0   r   r   r   r   get_model_parallel_world_size      r4   c                 C   r(   )zSet tensor model parallel rank.N)_MPU_TENSOR_MODEL_PARALLEL_RANK)rankr   r   r   set_tensor_model_parallel_rank   r,   r8   c                   C   r-   )z3Return my rank for the tensor model parallel group.Nr.   )r6   r   get_rankr#   r   r   r   r   get_tensor_model_parallel_rank   r1   r:   c                   C   r2   r3   )r:   r   r   r   r   get_model_parallel_rank   r5   r;   c                  C   s   t  } t }| | | S )zgCalculate the global rank corresponding to the first local rank
    in the tensor model parallel group.)r   r9   r0   )global_ranklocal_world_sizer   r   r   "get_tensor_model_parallel_src_rank   s   r>   c                   C      t jt dS ).Return world size for the data parallel group.r.   )r   r   r'   r   r   r   r   get_data_parallel_world_size      rA   c                   C   r?   +Return my rank for the data parallel group.r.   )r   r9   r'   r   r   r   r   get_data_parallel_rank   rB   rE   c           	      C   s   t d|  dgd t sJ t }t| |}t|| t }d}d}t|D ]}t|||}t|}||| kr?|}q*t|| D ]}t|| |d | }t|}||| kr`|}qF||fS )aA  
    Initialize model data parallel groups.

    Arguments:
        model_parallel_size: number of GPUs used to parallelize model.

    Returns:
        Tuple of data parallel group and model parallel group

    Let's say we have a total of 8 GPUs denoted by g0 ... g7 and we
    use 2 GPUs to parallelize the model. The present function will
    create 4 model parallel groups and 2 data parallel groups as:
        4 model parallel groups:
            [g0, g1], [g2, g3], [g4, g5], [g6, g7]
        2 data parallel groups:
            [g0, g2, g4, g6], [g1, g3, g5, g7]
    Note that for efficiency, the caller should make sure adjacent ranks
    are on the same DGX box. For example if we are using 2 DGX-1 boxes
    with a total of 16 GPUs, rank 0 to 7 belong to the first box and
    ranks 8 to 15 belong to the second box.
    z(Creating model parallel group with size r   ranksNr	   )	r   r   is_initializedr   minr   r9   range	new_group)	model_parallel_size_r*   model_parallel_sizer7   r   r   irG   r/   r   r   r   _create_model_parallel   s*   



rO   Fc                 C   s  t  sJ td|  dgd t  }tdu rdntt}t  }|| }t||  d|  }||  }|tvrt	d||D ]@}t	| D ]9}	|rXt	||	|  ||	d |  }
n
t	||	 || | }
t 
|
}td| dt|
 dg ||
v r}|t|< qDq>|tvr|rt	d||D ].}t	|D ]'}	t	||	 || |}
t 
|
}td	| dt|
 dg ||
v r|t|< qqdS t	||  D ]*}	t	|	|  |	d |  }
t 
|
}td	| dt|
 dg ||
v r|t|< qdS dS )
a  
        Create expert and data parallel groups.

        Note: Caller of this function is responsible to check if the groups already exist.

        Example - E + D parallel
        world_size = 16
        expert_parallel_size = 2 # number of experts in same group
        expert_data_parallel_group = [0,2,4,6,8,10,12,14], [1,3,5,7,9,11,13,15] - all reduce is only on MoE params
        expert_parallel_group = [0, 1], [2,3], [4,5], [6,7], [8,9] - no all reduce, but all to all
        data_parallel_group = [0,1,...,15] - all reduce is only on non-MoE
        use_data_before_expert_parallel_ (bool): Use the D + E instead of E + D topology
    z3Creating expert and data parallel groups with size r   rF   Nr	   ep_size_z2Creating expert data parallel process group named z with ranks: z-creating expert parallel process group named )r   rH   r   r   r   r   r9   r   _EXPERT_DATA_PARALLEL_GROUPrJ   rK   list_EXPERT_PARALLEL_GROUP)expert_parallel_size_ use_data_before_expert_parallel_r*   pp_world_sizer7   	pp_stride
group_name	ep_stridepp_stage_startrN   rG   r/   r   r   r    _create_expert_and_data_parallel   sp   

 




r[   c                 C   sd  t | ||  | ||  }t || g }|}| | }|r[| | | | }	td| |D ]/}
|
| }t|D ]$}|t  t|	D ]}|d tt|
| ||  ||	|  q@q4q*n!td| |D ]}
|
| }t|D ]}|tt|
| || qkqag }g }|D ]+}g }td||D ]}|||||   q|| t| D ]	}|t| qq||fS )a  Generate expert parallel and expert data parallel group ranks list.

        Example - E + M + D parallel
        world_size = 16
        model_degree = 2
        expert_degree = 4 # number of experts in same group
        mp_group = [0, 1], [2,3], [4,5] ...
        data_parallel_group =[0,2,4,6,8,10, 12,14],                 [1,3,5,7,9,11,13,15]
        expert_parallel_group = [0,2,4,6], [8,10,12,14]             [1,3,5,7], [9,11,13,15]
        expert_data_parallel_group = [0,8],[2,10],[4,12],[6,14],    [1,9],[3,11],[5,13],[7,15]

    Args:
        world_size (int): Distributed world size.
        tensor_parallel_size_ (int): Tensor parallel group size.
        expert_parallel_size_ (int): Expert parallel group size.
        pipeline_parallel_size_ (int): Pipeline parallel group size
        use_data_before_expert_parallel_ (bool): Use the D + E instead of E + D topology
    Returns:
        Expert parallel group ranks and Expert data parallel group ranks list.
    r   )r   rJ   appendrR   extendzip)r*   tensor_parallel_size_rT   pipeline_parallel_size_rU   dp_world_sizedata_parallel_groupsdp_group_sizerW   	dp_striderZ   pp_stage_nextrN   dsexpert_parallel_groupsexpert_data_parallel_groupsdp_rankspart_ep_groupsexpert_dp_ranksr   r   r   _get_expert_parallel_ranks3  sN   

rm   c                 C   s  t  sJ dt|}|at  }t  }| }|du r dnt|}t|| t||  t	d| d| d|  d| d| 
d	g d
|  }|t
vr|tvrt||| ||\}	}
|	D ]}t |}|t|v rm|t|< q\|
D ]}t |}|t|v r|t
|< qpdS dS dS )a  
        Create expert and data parallel groups based on MPU (model parallel) group.

        Note: Caller of this function is responsible to check if the groups already exist.

        Example - E + M + D parallel
        world_size = 16
        model_degree = 2
        expert_degree = 4 # number of experts in same group
        mp_group = [0, 1], [2,3], [4,5] ...
        data_parallel_group =[0,2,4,6,8,10, 12,14],                 [1,3,5,7,9,11,13,15]
        expert_parallel_group = [0,2,4,6], [8,10,12,14]             [1,3,5,7], [9,11,13,15]
        expert_data_parallel_group = [0,8],[2,10],[4,12],[6,14],    [1,9],[3,11],[5,13],[7,15]
    dist is not initializedNr	   z3Creating deepspeed groups with model parallel size z, pipeline parallel size z, expert parallel size z, world size z, dp world size r   rP   )r   rH   r   !expert_tensor_parallel_world_sizer   r9   rA   r   r   r   rQ   rS   rm   rK   rR   )rT   r   rU   r`   r*   r7   rb   rV   rX   rh   ri   rG   r/   r   r   r   &_create_expert_data_and_model_parallelx  sL   





rp   c                  C   sV   t dusJ dg } t  D ]}d}| t|d|  qt| dkr)t| S dS )z4Get the maximum ep_size from all the created groups.Nz&Warning! Process group not initialized   _r   )rS   keysr]   intsplitlenmax)keylistkeyindexr   r   r   _get_max_expert_size  s   r{   c                   C   s   dt   S )z+Get the name of the group with max. ep_sizerP   )r{   r   r   r   r   _get_max_expert_size_name  s   r|   c                   C   s
   t t S )z!Get the max expert parallel size.)_get_expert_parallel_groupr|   r   r   r   r   _get_max_expert_parallel_group  s   
r~   c                 C      | t v sJ dt |  S )z9Get the expert parallel group the caller rank belongs to.z(expert parallel group is not initializedrS   rX   r   r   r   r}        
r}   c                   C      t S )z#Get the expert parallel group dict.r   r   r   r   r   _get_expert_parallel_group_dict     r   c                 C   r   )z>Get the expert data parallel group the caller rank belongs to.z-expert data parallel group is not initializedrQ   r   r   r   r   _get_expert_data_parallel_group  r   r   c                   C   r   )z(Get the expert data parallel group dict.r   r   r   r   r   $_get_expert_data_parallel_group_dict  r   r   c                   C   s0   t  sJ dtdu rt jtt  datS )a  Create a clone of the world group
        Note: We need to clone the dist world group because we
        use dist.get_global_rank() utility function in DeepSpeed at many places.
        As that function does not work on dist.group.WORLD, we
        need to keep a clone of it.
    rn   NrF   )r   rH   _WORLD_GROUPrK   rJ   r   r   r   r   r   _clone_world_group  s   r   c                     s\  t  sJ dt   t    } | dkrAt  dkrAt  dks'J dg }tt  D ]| q/t j|dtd< tS | dkr`t   ksOJ dt jdd	 t D dtd< tS t   ksjJ d
t| D ] fdd	t D }t j|dtd < qnt D ]g }t| D ]}||    qt j|dtd < qtS )Nrn   r   r	   z/num_gpus must >=1, cannot initialize All-To-AllrF   local_0zCnum_gpus not equal to device per node, cannot initialize All-To-Allc                 S   s   g | ]}|qS r   r   ).0rN   r   r   r   
<listcomp>  s    z/_get_local_all_to_all_group.<locals>.<listcomp>z(num_nodes<2 cannot initialize All-To-Allc                    s   g | ]}|   qS r   r   )r   jdevice_per_noderN   r   r   r     s    local_global_)	r   rH   r   device_countr   rJ   r]   rK   _ALL_TO_ALL_GROUP)	num_localcur_rank
local_rankr   r   r   r   _get_local_all_to_all_group  s8   
r   c                   C   s:   t  sJ dtdurtjddS tdurt S t S )r&   rn   Nr   r   )r   rH   r    r   r   r'   r   r   r   r   r   _get_data_parallel_group  s   r   c                   C   s   t t dS Nr   )r   get_global_rank!_get_sequence_data_parallel_groupr   r   r   r   _get_broadcast_src_rank  s   r   c                 C   s   t t| dS r   )r   r   r   r   r   r   r   _get_expert_broadcast_src_rank  s   r   c                 C      t jt| dS )z0Return world size for the expert parallel group.r.   )r   r   r}   r   r   r   r   _get_expert_parallel_world_size     r   c                 C   r   )z5Return world size for the expert data parallel group.r.   )r   r   r   r   r   r   r   $_get_expert_data_parallel_world_size!  r   r   c                 C   r   )z-Return my rank for the expert parallel group.r.   )r   r9   r}   r   r   r   r   _get_expert_parallel_rank&  r   r   c                 C   s   t  }t| }|| | S )z^Calculate the global rank corresponding to a local rank zero
    in the expert parallel group.)r   r9   r   )rX   r<   r=   r   r   r   _get_expert_parallel_src_rank+  s   r   c                 C   r   )z2Return my rank for the expert data parallel group.r.   )r   r9   r   r   r   r   r   _get_expert_data_parallel_rank3  r   r   c                   C   s8   t durtt jddS tdurt S tjt dS )r@   Nr   r   r.   )r    r   r   r   r   rA   r   r   r   r   r   _get_data_parallel_world_size8  s
   r   c                   C   s   t durt  S dS )/Return world size for the model parallel group.Nr	   )r   r4   r   r   r   r   _get_model_parallel_world_sizeB  s   r   c                   C   r?   rC   )r   r9   r   r   r   r   r   _get_data_parallel_rankJ  rB   r   c                   C   s8   t durtt jddS tdurttdrt S dS )z2Return world size for the sequence parallel group.Nsequence_parallelr    get_sequence_parallel_world_sizer	   )r    r   r   r   r   hasattrr   r   r   r   r   !_get_sequence_parallel_world_sizeO  s
   r   c                   C   s8   t durtt drt  S tdurttjddS dS )z/Return my rank for the sequence parallel group.Nget_sequence_parallel_rankr   r   r   )r   r   r   r    r   r9   r   r   r   r   r   _get_sequence_parallel_rankY  s
   r   c                   C   s6   t d u s	tt dstd u rtdtjddS t  S )Nget_sequence_parallel_groupz No sequence parallel group foundr   r   )r   r   r    KeyErrorr   r   r   r   r   r   _get_sequence_parallel_groupc  s
   r   c                   C       t durtt drt  S t S )r   N%get_sequence_data_parallel_world_size)r   r   r   r   r   r   r   r   &_get_sequence_data_parallel_world_sizel     r   c                   C   r   )rD   Nget_sequence_data_parallel_rank)r   r   r   r   r   r   r   r    _get_sequence_data_parallel_rankt  r   r   c                   C   s    t d urtt drt  S t S )N get_sequence_data_parallel_group)r   r   r   r   r   r   r   r   r   |  s   r   c                   C   r   r3   )ro   r   r   r   r   %_get_expert_model_parallel_world_size  r   r   c                 C   s   t  sJ tdu sJ dt  }t  }t| |}t|| t|| D ]}t|| |d | }t |}||| kr@|aq&dS )a  
        Create parameter partitioning group within ZeRO data parallel groups.

        Example - ZP + D parallel
        world_size = 16
        zero_hpz_partition_size = 2 # number of ranks with replicated params (dual partitioning)
        zero_param_intra_parallel_group = [0, 1], [2,3], [4,5], [6,7], [8,9] - segmented (subgroup) with rep partition
        data_parallel_group = [0,1,...,15] - all reduce is on ZeRO model
    Nz:ZeRO parameter intra parallel group is already initializedr	   )	r   rH    _ZERO_PARAM_INTRA_PARALLEL_GROUPr   r9   rI   r   rJ   rK   )
group_sizer*   r7   zero_param_parallel_size_rN   rG   r/   r   r   r   !_create_zero_param_parallel_group  s   




r   c                   C   r   )zTGet the ZeRO parameter partitioning intra parallel group the caller rank belongs to.)r   r   r   r   r   $_get_zero_param_intra_parallel_group  s   r   c                   C   s   t du r
tdu rdS dS dS )zQCheck if ZeRO data parallel with parameter partititioning groups are initialized.NF)r   r   r   r   r   r   #_zero_param_parallel_is_initialized  s   r   c                   C   r?   )z;Return my rank for the ZeRO parameter inter parallel group.r.   )r   r9   r   r   r   r   r   ._get_zero_param_intra_parallel_rank_in_mygroup  rB   r   c                   C   r?   )z8Return world size for the ZeRO parameter parallel group.r.   )r   r   r   r   r   r   r   /_get_zero_param_intra_parallel_group_world_size  rB   r   c                   C   r?   )z=Return all ranks for the ZeRO parameter intra parallel group.r.   )r   get_all_ranks_from_groupr   r   r   r   r   *_get_zero_param_intra_parallel_group_ranks  rB   r   )r	   N)F)r	   F)N__doc__	deepspeedr   r   deepspeed.utilsr   deepspeed.utils.bwcr   r   deepspeed.utils.exceptionsr   deepspeed.acceleratorr   rS   rQ   r   r   r   ro   r   r    r   r   r   r   r   r)   r6   r!   r#   r%   r'   r+   r0   r4   r8   r:   r;   r>   rA   rE   rO   r[   rm   rp   r{   r|   r~   r}   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   <module>   s   

	
1J

E6


		