o
    TiC                     @   sl   d dl mZ d dlmZ d dlmZ G dd dZdd Z	G dd	 d	eZ
G d
d deZG dd dZdS )    )comm)
namedtuple)productc                   @   st   e Zd ZdZdd Zdd Zdd Zdd	gd
dfddZdd Zdd Z	dd Z
dd Zdd Zdd Zdd ZdS )ProcessTopologyai   Manages the mapping of n-dimensional Cartesian coordinates to linear
    indices. This mapping is used to map the rank of processes to the grid
    for various forms of parallelism.

    Each axis of the tensor is accessed by its name. The provided ordering
    of the axes defines the layout of the topology. ProcessTopology uses a "row-major"
    layout of the tensor axes, and so axes=['x', 'y'] would map coordinates (x,y) and
    (x,y+1) to adjacent linear indices. If instead axes=['y', 'x'] was used, coordinates
    (x,y) and (x+1,y) would be adjacent.

    Some methods return ProcessCoord namedtuples.
    c                    sv   |_ |_td|_i _dd |D }tt| D ]\}  fddj D }jdi |}|j|< qdS )zCreate a mapping of n-dimensional tensor coordinates to linear indices.

        Arguments:
            axes (list): the names of the tensor axes
            dims (list): the dimension (length) of each axis of the topology tensor
        ProcessCoordc                 S   s   g | ]}t |qS  )range).0dr   r   S/home/ubuntu/.local/lib/python3.10/site-packages/deepspeed/runtime/pipe/topology.py
<listcomp>)   s    z,ProcessTopology.__init__.<locals>.<listcomp>c                    s   i | ]}| j | qS r   )axesindex)r	   axiscoordselfr   r   
<dictcomp>,   s    z,ProcessTopology.__init__.<locals>.<dictcomp>Nr   )r   dimsr   r   mapping	enumeratecartesian_product)r   r   r   rangesglobal_rankkeyr   r   r   __init__   s   zProcessTopology.__init__c                 K   sN   t |t | jkrtd| jdi |}|| jv s"J d| d| j| S )zReturn the global rank of a process via its coordinates.

        Coordinates are specified as kwargs. For example:

            >>> X = ProcessTopology(axes=['x', 'y'], dims=[2,3])
            >>> X.get_rank(x=0, y=1)
            1
        z7get_rank() does not support slices. Use filter_match())zkey z invalidNr   )lenr   
ValueErrorr   r   )r   coord_kwargsr   r   r   r   get_rank1   s
   	
zProcessTopology.get_rankc                 C      | j S )zAReturn a list of the axis names in the ordering of the topology. )r   r   r   r   r   get_axis_namesA      zProcessTopology.get_axis_namesdatapipe_-c           	         s`   t    fdd|  D }g }|D ]}t| j|d|}|| | |d q||S )a  Return a string representation of a rank.

        This method is primarily used for checkpointing model data.

        For example:
            >>> topo = Topo(axes=['a', 'b'], dims=[2, 2])
            >>> topo.get_rank_repr(rank=3)
            'a_01-b_01'
            >>> topo.get_rank_repr(rank=3, omit_axes=['a'])
            'b_01'

        Args:
            rank (int): A rank in the topology.
            omit_axes (list, optional): Axes that should not be in the representation. Defaults to ['data', 'pipe'].
            inner_sep (str, optional): [description]. Defaults to '_'.
            outer_sep (str, optional): [description]. Defaults to '-'.

        Returns:
            str: A string representation of the coordinate owned by ``rank``.
        c                    s   g | ]}| vr|qS r   r   r	   a	omit_axesr   r   r   [       z1ProcessTopology.get_rank_repr.<locals>.<listcomp>rank02d)	frozensetr"   getattr	get_coordappendjoin)	r   r.   r+   	inner_sep	outer_sepr   namesaxax_rankr   r*   r   get_rank_reprE   s   
zProcessTopology.get_rank_reprc                 C   s    || j vrdS | j| j | S )zReturn the number of processes along the given axis.

        For example:
            >>> X = ProcessTopology(axes=['x', 'y'], dims=[2,3])
            >>> X.get_dim('y')
            3
        r   )r   r   r   )r   r   r   r   r   get_dimb   s   
zProcessTopology.get_dimc                 C   s4   | j  D ]\}}||kr|  S qtd| d)aW  Return the coordinate owned by a process rank.

        The axes of the returned namedtuple can be directly accessed as members. For
        example:
            >>> X = ProcessTopology(axes=['x', 'y'], dims=[2,3])
            >>> coord = X.get_coord(rank=1)
            >>> coord.x
            0
            >>> coord.y
            1
        zrank z not found in topology.)r   itemsr   )r   r.   r   idxr   r   r   r2   n   s
   zProcessTopology.get_coordc                    s    j vrg S  fddj D g }fddD }t| D ]1fddD }g }t D ]}jdi | |i}|j|  q5|| q |S )a   Construct lists suitable for a communicator group along axis ``axis``.

        Example:
            >>> topo = Topo(axes=['pipe', 'data', 'model'], dims=[2, 2, 2])
            >>> topo.get_axis_comm_lists('pipe')
            [
                [0, 4], # data=0, model=0
                [1, 5], # data=0, model=1
                [2, 6], # data=1, model=0
                [3, 7], # data=1, model=1
            ]

        Returns:
            A list of lists whose coordinates match in all axes *except* ``axis``.
        c                    s   g | ]}| kr|qS r   r   r(   )r   r   r   r      r,   z7ProcessTopology.get_axis_comm_lists.<locals>.<listcomp>c                    s   g | ]	}t  |qS r   )r   r;   r(   r!   r   r   r      s    c                    s   i | ]
}|  | qS r   )r   r(   )r   
other_axesr   r   r      s    z7ProcessTopology.get_axis_comm_lists.<locals>.<dictcomp>Nr   )r   r   r   r;   r   r3   r   )r   r   listsr   
other_keyssub_listaxis_keyr   r   )r   r   r>   r   r   get_axis_comm_lists   s   
z#ProcessTopology.get_axis_comm_listsc                    s.    fdd}t |j }fdd|D S )aV  Return the list of ranks whose coordinates match the provided criteria.

        Example:
            >>> X = ProcessTopology(axes=['pipe', 'data', 'model'], dims=[2, 2, 2])
            >>> X.filter_match(pipe=0, data=1)
            [2, 3]
            >>> [X.get_coord(rank) for rank in X.filter_match(pipe=0, data=1)]
            [ProcessCoord(pipe=0, data=1, model=0), ProcessCoord(pipe=0, data=1, model=1)]

        Arguments:
            **filter_kwargs (dict): criteria used to select coordinates.

        Returns:
            The list of ranks whose coordinates match filter_kwargs.
        c                    s*      D ]\}}t| ||kr dS qdS )NFT)r<   r1   )xr   val)filter_kwargsr   r   _filter_helper   s
   z4ProcessTopology.filter_match.<locals>._filter_helperc                    s   g | ]} j | qS r   r   )r	   r   r!   r   r   r      s    z0ProcessTopology.filter_match.<locals>.<listcomp>)filterr   keys)r   rF   rG   coordsr   )rF   r   r   filter_match   s   zProcessTopology.filter_matchc                    s,   j |  fddj D }|S )a/  Returns the list of global ranks whose coordinate in an axis is idx.

        For example:
            >>> X = ProcessTopology(axes=['x', 'y'], dims=[2,3])
            >>> X.get_axis_list(axis='x', idx=0)
            [0, 1, 2]
            >>> X.get_axis_list(axis='y', idx=0)
            [0, 3]
        c                    s"   g | ]}|  krj | qS r   rH   )r	   kaxis_numr=   r   r   r   r      s   " z1ProcessTopology.get_axis_list.<locals>.<listcomp>)r   r   r   rJ   )r   r   r=   ranksr   rN   r   get_axis_list   s   zProcessTopology.get_axis_listc                 C   
   t | jS N)r   r   r!   r   r   r   
world_size      
zProcessTopology.world_sizec                 C   rR   rS   )strr   r!   r   r   r   __str__   rU   zProcessTopology.__str__N)__name__
__module____qualname____doc__r   r   r"   r:   r;   r2   rC   rL   rQ   rT   rW   r   r   r   r   r      s    (r   c                 C   s^   | dkrt dg }| dkr-td| d D ]}| | dkr(|| | | }  nq| dks|S )z8 Returns the prime factorization of positive integer N. r   z!Values must be strictly positive.      )r   r   r3   )Nprimes	candidater   r   r   _prime_factors   s   
ra   c                           e Zd ZdZ fddZ  ZS )PipeDataParallelTopologya&   A topology specialization for hybrid data and pipeline parallelism.

        Uses data parallelism on the last dimension to encourage gradient
        reductions to use high-bandwidth intra-node links and lower-volume
        pipeline communications to use low-bandwidth inter-node links.
    c                    s   t  jddg||gd d S )Nr%   r$   r   r   superr   )r   num_ppnum_dp	__class__r   r   r      s   z!PipeDataParallelTopology.__init__rX   rY   rZ   r[   r   __classcell__r   r   ri   r   rc      s    rc   c                       rb   )PipeModelDataParallelTopologyz> A topology for hybrid pipeline, model, and data parallelism. c                    s   t  jg d|||gd d S )N)r%   r$   modelrd   re   )r   rg   num_mprh   ri   r   r   r      s   z&PipeModelDataParallelTopology.__init__rk   r   r   ri   r   rm      s    rm   c                   @   s   e Zd ZdZd+ddZdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)d* ZdS ),PipelineParallelGrida  Implements a grid object that stores the data parallel ranks
    corresponding to each of the model parallel stages

    The grid object organizes the processes in a distributed pytorch job
    into a 2D grid, of stage_id and data_parallel_id.

    self.stage_id and self.data_parallel_id stores the stage id
    and the data parallel id of current process.

    self.dp_group groups the processes by stage_id.
    self.dp_group[i], is a list containing all process ranks whose
    stage_id is i.

    self.p2p_groups stores a list of tuple, where each tuple
    stores process ranks of adjacent stages for a given data_parallel_id.
    For example if num_stage is 5 then a tuple [7,8] represents stages [3, 4],
    with data_parallel id = 1. A stage wrap around will appear as non-adjacent ranks,
    for example tuple [4,0] with representing wrap-around stage 4 and 0, for
    data_parallel_id = 0, or similarly [9,5] represents wrapped around stages [4,0]
    for data_parallel_id = 1.
    Nc                 C   s  t  | _t  | _|d ur| jdkrtd| || _n&d}d}tt| jD ]\}}|d dkr6||9 }q'||9 }q't	||d| _t
| jdd| _t
| jdd| _t
| jdd| _| j| _|  slJ d	|  | _|  | _d | _d
| _t| jD ],}t| jjd|d}| jdkr	 t j|d}	| j|v r|	| _t|| _|| j| _q| jd
ksJ | jd usJ g | _| jd| _ | j D ]}
t j|
d}	| j|
v r|
| _|	| _!q| jdk| _"| j| jd k| _#| $ | _%g | _&d | _'| jd| _(| j(D ]}| jdkr
	 t j|d}	| j|v r|| _&|	| _'q| j'd us&J | jdkrNt| jD ]}|g}t j|d}|d | jkrJ|| _)|| _*q1d S g | _+| jd| _,| j,D ]}
t j|
d}	| j|
v ro|
| _)|	| _*q[d S )Nr   zUsing topology:r\   r]   )rh   rg   r$   r%   rn   zInvalid Grid)r   r=   )rP   )-distr   r   get_world_sizerT   print_topor   ra   rc   maxr;   data_parallel_sizepipe_parallel_sizemodel_parallel_sizeslice_parallel_size_is_grid_validget_stage_idstage_idget_data_parallel_iddata_parallel_idds_model_proc_groupds_model_rankr   sortedrQ   	new_groupr   ds_model_world_sizer   dp_grouprC   	dp_groupsdp_proc_groupis_first_stageis_last_stage_build_p2p_groups
p2p_groupspp_grouppp_proc_grouppipe_groupsslice_groupslice_proc_groupmp_groupmodel_groups)r   topologyprocess_grouprg   rh   r=   primedprP   
proc_groupg
group_rankgroupr   r   r   r     s   















zPipelineParallelGrid.__init__c                 C      | j j| jdjS Nr-   )ru   r2   r   r%   r!   r   r   r   r|   o     z!PipelineParallelGrid.get_stage_idc                 C   r   r   )ru   r2   r   r$   r!   r   r   r   r~   r  r   z)PipelineParallelGrid.get_data_parallel_idc                 C   s   | j d}g }t| jD ]+}|D ]&}t|| jksJ ||v r7||}||d | j  }|||g  nqqt|| jksBJ |S )ziGroups for sending and receiving activations and gradients across model
        parallel stages.
        r%   r\   )ru   rC   r   rT   r   rx   r   r3   )r   
comm_lists	p2p_listsr.   lr=   
buddy_rankr   r   r   r   u  s   
z&PipelineParallelGrid._build_p2p_groupsc                 C   s0   d}| j  D ]
}|| j |9 }q|t kS )Nr\   )ru   r"   r;   rr   rs   )r   rP   r8   r   r   r   r{     s   z#PipelineParallelGrid._is_grid_validc                 K   s8   | j | j}|jdd|i| }| j jdi |S )Nr%   r   )ru   r2   r   _replace_asdictr   )r   r}   kwargsme	transformr   r   r   stage_to_global  s   z$PipelineParallelGrid.stage_to_globalc                 C   r    rS   )ru   r!   r   r   r   r        zPipelineParallelGrid.topologyc                 C   r    rS   )r   r!   r   r   r   get_global_rank  r   z$PipelineParallelGrid.get_global_rankc                 C   s   |   S )z1 The stage of the pipeline this rank resides in. )r|   r!   r   r   r   get_pipe_parallel_rank  s   z+PipelineParallelGrid.get_pipe_parallel_rankc                 C   r    )z' The number of stages in the pipeline. )rx   r!   r   r   r   get_pipe_parallel_world_size  r#   z1PipelineParallelGrid.get_pipe_parallel_world_sizec                 C   r    )z. The group of ranks within the same pipeline. )r   r!   r   r   r   get_pipe_parallel_group  r#   z,PipelineParallelGrid.get_pipe_parallel_groupc                 C   r    )z& Which pipeline this rank resides in. )r   r!   r   r   r   get_data_parallel_rank  r#   z+PipelineParallelGrid.get_data_parallel_rankc                 C   r    )z The number of pipelines. )rw   r!   r   r   r   get_data_parallel_world_size  r#   z1PipelineParallelGrid.get_data_parallel_world_sizec                 C   r    )z< The group of ranks within the same stage of all pipelines. )r   r!   r   r   r   get_data_parallel_group  r#   z,PipelineParallelGrid.get_data_parallel_groupc                 C   r    rS   )r   r!   r   r   r   get_model_parallel_rank  r   z,PipelineParallelGrid.get_model_parallel_rankc                 C   r    rS   )r   r!   r   r   r   get_model_parallel_world_size  r   z2PipelineParallelGrid.get_model_parallel_world_sizec                 C   r    rS   )r   r!   r   r   r   get_model_parallel_group  r   z-PipelineParallelGrid.get_model_parallel_groupc                 C   s$   d| j  v r| j j| jdjS dS )Nrn   r-   r   )ru   r"   r2   r   rn   r!   r   r   r   get_slice_parallel_rank  s   z,PipelineParallelGrid.get_slice_parallel_rankc                 C   r    rS   )rz   r!   r   r   r   get_slice_parallel_world_size  r   z2PipelineParallelGrid.get_slice_parallel_world_sizec                 C   r    rS   )r   r!   r   r   r   get_slice_parallel_group  r   z-PipelineParallelGrid.get_slice_parallel_group)NN)rX   rY   rZ   r[   r   r|   r~   r   r{   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rp      s,    
]rp   N)	deepspeedr   rr   collectionsr   	itertoolsr   r   r   ra   rc   rm   rp   r   r   r   r   <module>   s    N