o
    Tiv                     @   s   d dl Z d dlZd dlZd dlmZ d dlZd dlmZ d dl	m
Z d dlmZ ddlmZ ddlmZ dd	lmZmZ d d
lmZ d dlmZ d dlmZ G dd deZG dd dZG dd deZG dd dej Z!dS )    N)partial)comm)logger   )utils)checkpointing   )PipeDataParallelTopologyPipelineParallelGrid)SDLoaderFactory)get_accelerator)clone_tensors_for_torch_savec                   @   s   e Zd ZdZdS )PipelineErrorz6Errors related to the use of deepspeed.PipelineModule N)__name__
__module____qualname____doc__ r   r   Q/home/ubuntu/.local/lib/python3.10/site-packages/deepspeed/runtime/pipe/module.pyr      s    r   c                   @   s*   e Zd ZdZdd Zdd Zd
ddZd	S )	LayerSpecaU  Building block for specifying pipeline-parallel modules.

    LayerSpec stores the type information and parameters for each stage in a
    PipelineModule. For example:

    .. code-block:: python

        nn.Sequence(
            torch.nn.Linear(self.in_dim, self.hidden_dim, bias=False),
            torch.nn.Linear(self.hidden_hidden, self.out_dim)
        )

    becomes

    .. code-block:: python

        layer_specs = [
            LayerSpec(torch.nn.Linear, self.in_dim, self.hidden_dim, bias=False),
            LayerSpec(torch.nn.Linear, self.hidden_hidden, self.out_dim)]
        ]
    c                 O   sF   || _ || _|| _t|tjstdt rt	 | _
d S d| _
d S )Nz.LayerSpec only supports torch.nn.Module types.)typenamemodule_argsmodule_kwargs
issubclassnnModuleRuntimeErrordistis_initializedget_rankglobal_rank)selfr   r   r   r   r   r   __init__5   s   
zLayerSpec.__init__c                 C   s   t | jj| j| jS N)ds_utilscall_to_strr   r   r   r   r"   r   r   r   __repr__B   s   zLayerSpec.__repr__Fc                 C   s4   |rt d| j dt|   | j| ji | jS )zBuild the stored specification.zRANK=z
 building )r   infor!   reprr   r   r   )r"   logr   r   r   buildE   s   zLayerSpec.buildNF)r   r   r   r   r#   r(   r,   r   r   r   r   r      s
    r   c                       s&   e Zd Zddgd fdd
Z  ZS )TiedLayerSpecNweight)
forward_fntied_weight_attrc                   sH   t  j|g|R i | || _|| _t|tkr|g| _d S || _d S r$   )superr#   keyr0   typestrr1   )r"   r3   r   r0   r1   r   r   	__class__r   r   r#   O   s   "zTiedLayerSpec.__init__)r   r   r   r#   __classcell__r   r   r6   r   r.   M   s    r.   c                       s2  e Zd ZdZddddddddejddf fdd	Zd	d
 Zdd Zdd Z	dd Z
dd Zdd ZdDddZedejjdedejfddZdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'd( ZdEd)d*Zd+d, Zd-d. Zd/d0 Zd1d2 Zd3d4 Z d5d6 Z!d7d8 Z"dFd9d:Z#dGd<d=Z$d>d? Z%d@dA Z&dBdC Z'  Z(S )HPipelineModuleaK	  Modules to be parallelized with pipeline parallelism.

    The key constraint that enables pipeline parallelism is the
    representation of the forward pass as a sequence of layers
    and the enforcement of a simple interface between them. The
    forward pass is implicitly defined by the module ``layers``. The key
    assumption is that the output of each layer can be directly fed as
    input to the next, like a ``torch.nn.Sequence``. The forward pass is
    implicitly:

    .. code-block:: python

        def forward(self, inputs):
            x = inputs
            for layer in self.layers:
                x = layer(x)
            return x

    .. note::
        Pipeline parallelism is not compatible with ZeRO-2 and ZeRO-3.

    Args:
        layers (Iterable): A sequence of layers defining pipeline structure. Can be a ``torch.nn.Sequential`` module.
        num_stages (int, optional): The degree of pipeline parallelism. If not specified, ``topology`` must be provided.
        topology (``deepspeed.runtime.pipe.ProcessTopology``, optional): Defines the axes of parallelism axes for training. Must be provided if ``num_stages`` is ``None``.
        loss_fn (callable, optional): Loss is computed ``loss = loss_fn(outputs, label)``
        seed_layers(bool, optional): Use a different seed for each layer. Defaults to False.
        seed_fn(type, optional): The custom seed generating function. Defaults to random seed generator.
        base_seed (int, optional): The starting seed. Defaults to 1234.
        partition_method (str, optional): The method upon which the layers are partitioned. Defaults to 'parameters'.
        activation_checkpoint_interval (int, optional): The granularity activation checkpointing in terms of number of layers. 0 disables activation checkpointing.
        activation_checkpoint_func (callable, optional): The function to use for activation checkpointing. Defaults to ``deepspeed.checkpointing.checkpoint``.
        checkpointable_layers (list[str], optional): List of layer class names that are eligible for checkpointing. For GPT models,
            ParallelTransformerLayerPipe is always checkpointed regardless of this list. If None, all layers with parameters are
            considered checkpointable. Defaults to None.
        dynamic_shape: Allows dynamic shapes of inputs. This might have a performance impact.
    NFi  
parametersr   c                    s2  t    |d u r|d u rtdd| _|| _|| _|d ur't|ts'J d|| _|| _	|| _
t dkrWz| j	j}W n tyG   d }Y nw td| j d| j
 d|  tjtt d| _tj| jd| _tj| jd| _ttjd	d | _| jd usJ |r|| _| jd
| _n*|| _|d u r| j| j dkrtd| j d| j d| j| }t||d}|| _t| j| jd| _ | j!| jj"| _#t|| _$t%| j$| _&d| _'d | _(| j)|d g | _*i | _+t,- | _.i | _/|	| _0|
| _1g | _2d | _3| 4  | 5t6 7| j | 8 | _9| :  || _;d S )Nz#must provide num_stages or topologyr   z3param `checkpointable_layers` must be type of list.zSEED_LAYERS=z BASE_SEED=z	 SEED_FN=ranksgroup
LOCAL_RANKpipeznum_stages (z&) must divide distributed world size ())num_ppnum_dp)process_grouptopology)method)<r2   r#   r   micro_offsetloss_fncheckpointable_layers
isinstancelistseed_layersseed_fn	base_seedr   r    r   AttributeErrorprint	new_grouprangeget_world_sizeworld_groupr!   
world_sizeintosenvironget
local_rank_topoget_dim
num_stagesr	   r
   _grid	get_coordr@   stage_id_layer_specslen_num_layers_local_start_local_stop_partition_layersforward_funcsfwd_mapr   
ModuleDicttied_modulestied_weight_attrsactivation_checkpoint_intervalactivation_checkpoint_funcis_checkpointable_results"is_checkpointable_results_interval_buildtor   device_name_index_tied_modules
tied_comms_synchronize_tied_weightsdynamic_shape)r"   layersr]   rE   rH   rL   rM   rN   partition_methodrl   rm   rI   rv   seed_strdpr6   r   r   r#   }   sn   





zPipelineModule.__init__c                 C   s~   | j dkr;| j| j kr=t| j}d| _td|| j D ]}t|| j  |}| j|| }| j| 	| q| j | _d S d S d S )Nr   F)
rl   ro   rb   rg   interval_was_zerorR   minrn   append_is_checkpointable)r"   
num_layers	start_idxend_idxfuncsr   r   r   !_precompute_checkpointable_values   s   
z0PipelineModule._precompute_checkpointable_valuesc                 C   s  | j }t|| j| j D ]\}}|| j }| jr-| jr%| | j|  nt| j|  t	|t
r6tdt	|tjrZt|}| j| | j|t| jd i | || qt	|tr|j| jvrt| | j|j< |j| j|j< |jd u r| j| j|j  q| jt|j| j|j  qt	|tr| }t|}| j| | j|t| jd i | || q| j| q|  D ]}d|_qd S )Nz#RECURSIVE BUILD NOT YET IMPLEMENTEDr   F) ra   	enumeraterd   re   rL   rM   rN   r%   set_random_seedrJ   r9   NotImplementedErrorr   r   r5   rg   r}   rh   updaterb   
add_moduler.   r3   rj   r,   r1   rk   r0   r   r   r:   ds_pipe_replicated)r"   specs	local_idxlayer	layer_idxnamemodulepr   r   r   rp      s>   




zPipelineModule._buildc                 C   sF   t |tr| }dd | D S t |tjr!dd | D S g S )z} Get names of frozen parameters in the layer.

            Returns:
                A list of frozen parameter names
        c                 S      g | ]	\}}|j s|qS r   requires_grad.0nr   r   r   r   
<listcomp>&      z>PipelineModule._get_frozen_parameter_names.<locals>.<listcomp>c                 S   r   r   r   r   r   r   r   r   (  r   )rJ   r   r,   named_parametersr   r   )r"   r   lr   r   r   _get_frozen_parameter_names  s   
z*PipelineModule._get_frozen_parameter_namesc                 C   s   dgt | j }t| jD ]<\}}t|tr/| }tdd | }tdd |D ||< qt|t	j
rItdd | }tdd |D ||< q|S )zCount the trainable parameters in individual layers.

        This routine will only build one layer at a time.

        Returns:
            A list of the number of parameters in each layer.
        r   c                 S      | j S r$   r   r   r   r   r   <lambda>8      z4PipelineModule._count_layer_params.<locals>.<lambda>c                 s       | ]}|  V  qd S r$   numelr   r   r   r   r   	<genexpr>9      z5PipelineModule._count_layer_params.<locals>.<genexpr>c                 S   r   r$   r   r   r   r   r   r   ;  r   c                 s   r   r$   r   r   r   r   r   r   <  r   )rb   ra   r   rJ   r   r,   filterr:   sumr   r   )r"   param_countsidxr   r   paramsr   r   r   _count_layer_params,  s   
z"PipelineModule._count_layer_paramsc              	   C   s   g }t |t j}t| jD ]4\}}d }t|tr|jj}nt|t	j
r)|jj}nz|j}W n	 ty7   Y qw ||rB|| qt|dkrQtd| d|S )Nr   zPartitioning 'z%' found no valid layers to partition.)regexcompile
IGNORECASEr   ra   rJ   r   r   r   r   r   r7   rO   searchr}   rb   r   )r"   	layernameidxs	typeregexr   r   r   r   r   r   _find_layer_type?  s&   





zPipelineModule._find_layer_typec           
         s     j d7  _  fdd} jdkr |dt j}||}|S t j}|}ttd| j jD ]1\}}t| j |} j|| }	t|t	sM|f}|r\ j
|||g|R  }q2|||| }q2|S )Nr   c                    s    j d  fdd}|S )z Helper function to be used with checkpoint()
            Adapted from torch.utils.checkpoint:checkpoint_sequential()
            r   c                     sz   t | dkr
| d } tj  D ]'\}}|j _jr6j j }jr1| nt	| || } q| S )Nr   r   )
rb   r   rg   rd   
curr_layerrL   rN   rM   r%   r   )inputsr   r   new_seed)endlocal_micro_offsetr"   startr   r   	exec_func`  s   

zBPipelineModule.forward.<locals>.exec_range_func.<locals>.exec_func)rG   )r   r   r   r'   )r   r   r   r   exec_range_funcZ  s   
z/PipelineModule.forward.<locals>.exec_range_funcr   )rG   rl   rb   rg   ziprR   rn   r|   rJ   tuplerm   )
r"   forward_inputr   funcxr   r   is_checkpointable_resultr   r   r   r'   r   forwardT  s$   



zPipelineModule.forwarduniformc              
   C   s6  | j d}| j | jj}| jdkrtd|  | }|dkr1t| j	}t
j||d| _nN|dkrB|  }t
j||d| _n=|drk|d	d
 }dgt| j	 }| |D ]}d
||< q[t
j||d| _n|dkrwtd| dtd| d| jdkr
t|D ]\}	| j|	 }
| j|	d
  }td|	 d||
   t| j	|
| D ]8\}}t|}t|tr|jj}t|tjr|jj}nz|j}W n	 ty   Y nw td||
 dd|  qq| jr
ztd| jj  W n ty	   td| jjj  Y nw | j | j| | j|d
  d d S )Nr@   r   z)Partitioning pipeline stages with method r   )	num_items	num_partsr:   )weightsr   ztype::r   profilezPartitioning method z not implemented.zstage=z layers=z    2dz: z  loss: )r   stop)!r[   r\   r_   r!   r@   r   r)   lowerrb   ra   r%   partition_uniformpartsr   partition_balanced
startswithsplitr   r   rR   rP   r   r5   rJ   r   r   r   r   r   r7   rO   rH   _set_bounds)r"   rF   r]   r`   r   r   	layertypebinary_weightsr   stager   r   r   r   r   r   r   rf     sZ   







"z PipelineModule._partition_layersr   	attr_namereturnc                 C   s"   | }| dD ]}t||}q|S )z/Allow getting an attribute like "linear.weight".)r   getattr)r   r   r/   itemr   r   r   _recursive_getattr  s   z!PipelineModule._recursive_getattrc                 C   sL   | j  D ]\}}|d D ]}| | j| |}tj|j|d d qqdS )z@All reduce the gradients of the tied weights between tied stagesweight_attrr>   r=   N)rt   itemsr   rj   r   
all_reducegrad)r"   r3   r   r   r/   r   r   r   allreduce_tied_weight_gradients  s   z.PipelineModule.allreduce_tied_weight_gradientsc                 C   sN   g }| j  D ]\}}|d D ]}| | j| |}|||d f qq|S )Nr   r>   )rt   r   r   rj   r}   )r"   weight_group_listr3   r   r   r/   r   r   r   get_tied_weights_and_groups  s   z*PipelineModule.get_tied_weights_and_groupsc                 C   sN   | j  D ]\}}|d D ]}tj| |d |t|d |d d qqd S )Nr   r   r<   r>   )srcr>   )rt   r   r   	broadcastr   r|   )r"   r3   r   r   r   r   r   ru     s   
z(PipelineModule._synchronize_tied_weightsc                    sx  i } j ddkr|S  j}tdd |D }t|D ]}g }t|D ]\}}t|tr7|j|kr7|	| q$t fdd|D }t
 jjD ]n}	t
 j D ]d}
g }t|D ]!} j dkrp|	 jj||	|
d qZ|	 jj||	d qZtj|d} j|v r| jv sJ | jv r|| j|  j| d	||<  j|d
 kr j|  D ]}d|_qqRqIq	 |S )z2 Build communication structures for tied modules. r@   r   c                 s   s     | ]}t |tr|jV  qd S r$   )rJ   r.   r3   )r   sr   r   r   r         z5PipelineModule._index_tied_modules.<locals>.<genexpr>c                 3   s    | ]}  |V  qd S r$   )stage_owner)r   r   r'   r   r   r     s    )r`   datamodel)r`   r   r;   )r<   r>   r   r   r   T)r[   r\   ra   setsortedr   rJ   r.   r3   r}   rR   r^   data_parallel_sizeget_slice_parallel_world_sizestage_to_globalr   rQ   r!   rj   rk   r:   r   )r"   rt   r   tie_keysr3   tied_layersr   r   tied_stagesrz   mp
tied_ranksr   r>   r   r   r'   r   rs     sH   



z"PipelineModule._index_tied_modulesc                 C   r   r$   )r   r'   r   r   r   
partitions     zPipelineModule.partitionsc                 C   sx   d|  kr| j k sJ  J t| jdD ]}| j| |  kr.| j|d  k r0|  S  qqtd| d| j )Nr   r@   r   zLayer z not owned? parts=)rc   rR   r[   r\   r   r   )r"   r   r   r   r   r   r     s   "zPipelineModule.stage_ownerc                 C   s   || _ || _dS )a  Manually define the range of layers that will be built on this process.

        These boundaries are treated as list slices and so start is inclusive and stop is
        exclusive. The default of None for both results in all layers being built
        locally.
        N)rd   re   )r"   r   r   r   r   r   r     s   
zPipelineModule._set_boundsc                 C   s   |dksJ || _ d S )Nr   )checkpoint_interval)r"   intervalr   r   r   set_checkpoint_interval(  s   
z&PipelineModule.set_checkpoint_intervalc                 C   r   )z3 ProcessTopology object to query process mappings. )r[   r'   r   r   r   rE   ,  s   zPipelineModule.topologyc                 C   r   r$   )r^   r'   r   r   r   mpu0  r   zPipelineModule.mpuc                 C   s   | j dS )Nr@   )r[   r\   r'   r   r   r   num_pipeline_stages3  s   z"PipelineModule.num_pipeline_stagesc                    sx   d}t dg  fdd| jj D }|D ]}t| jjj| jd|}|d| d|d7 }qtj	|t
||}|S )	z@Build a prefix for all checkpoint files written by this module. r   r   c                    s   g | ]}| vr|qS r   r   )r   a	omit_dimsr   r   r   >  s    z.PipelineModule.ckpt_prefix.<locals>.<listcomp>rank-_02d)	frozensetr^   r[   get_axis_namesr   r_   r!   rW   pathjoinr5   )r"   checkpoints_pathtag	rank_nameaxesdimr   	ckpt_namer   r   r   ckpt_prefix6  s   
zPipelineModule.ckpt_prefixc                 C   sT   || j  }tj|d|d}| jjj| jd}|dkr$|d| 7 }|d7 }|S )z9Customize a prefix for a specific pipeline module layer. layer_r  r    r   z-model_states.pt)rd   rW   r  r  r^   r[   get_rank_reprr!   )r"   ckpt_dirlocal_layer_idxr   layer_ckpt_path	rank_reprr   r   r   ckpt_layer_pathF  s   
zPipelineModule.ckpt_layer_pathc                 C   s@   || j  }tj|d|dd}|d7 }t|}|  |S )z=Get all ckpt file list for a specific pipeline module layer. r  r  r   z*model_states.pt)rd   rW   r  r  globsort)r"   r  r  r   r  
ckpt_filesr   r   r   ckpt_layer_path_listP  s   

z#PipelineModule.ckpt_layer_path_listc                 C   s   | j j}| j j}t| j}| jr"t||}|| ||d  }}	n|dkr(d S d|}}	| j||	 }
|j|dd t	|
D ]-\}}| 
||| }t|dsQq?| }|rb| |D ]}||= q\t|}||| q?d S )Nr   r   T)exist_ok
state_dict)r^   data_parallel_idr   rb   rg   "checkpoint_parallel_write_pipeliner%   r   makedirsr   r  hasattrr  r   r   save)r"   save_dircheckpoint_engineexclude_frozen_paramsdp_rankdp_sizer   offsetsr   r   
layer_listr   r   model_ckpt_pathorig_state_dictr   final_state_dictr   r   r   save_state_dictY  s,   


zPipelineModule.save_state_dictTc                 C   s   t | jD ]5\}}t|dsq| ||}| j }| j }tj|d|d}	|	j	||d dd\}
}}|j
||d q|   d S )Nload_state_dictg       @)versionr!  T)
module_keyis_pipe_parallel)strict)r   rg   r  r  r^   get_slice_parallel_rankr   r   get_sd_loaderloadr+  ru   )r"   load_dirr!  r/  r   r   model_ckpt_listmp_rankmp_world_size	sd_loader	load_path
checkpointr   r   r   r   load_state_dirz  s   


zPipelineModule.load_state_dirc                    sn    j tjur jjdv rt fdd|D S  jd ur't fdd|D S dd |D }tdd |D S )N)GPTModelPipeGPT2ModelPipec                 3   s4    | ]}d |j jv p jduo|j j jv V  qdS )ParallelTransformerLayerPipeNr7   r   rI   r   fr'   r   r   r     s    
z4PipelineModule._is_checkpointable.<locals>.<genexpr>c                 3   s    | ]
}|j j jv V  qd S r$   r>  r?  r'   r   r   r     s    c                 S   s"   g | ]}t |tjjr| qS r   )rJ   torchr   r   r:   r?  r   r   r   r     s   " z5PipelineModule._is_checkpointable.<locals>.<listcomp>c                 s   s     | ]}t t|d kV  qdS )r   N)rb   rK   r   r   r   r   r     r   )rm   r   non_reentrant_checkpointr7   r   allrI   any)r"   r   r   r   r'   r   r~     s   
z!PipelineModule._is_checkpointablec                 C   s   dS )z Returns model specific additional losses for reporting

         Return a dictionary of {"loss name": loss_value} or None if no additional losses.
        Nr   r'   r   r   r   get_additional_losses  s   z$PipelineModule.get_additional_lossesc                 O   sX   t | jD ]$\}}t|tjr|j|i | qtj|g|R i |}|| j|< qd S r$   )r   rg   rJ   r   r   r   rA  )r"   argskwargsr   r   	new_layerr   r   r   r     s   zPipelineModule.compile)r   )NNr-   )T))r   r   r   r   r   r9  r#   r   rp   r   r   r   r   rf   staticmethodrA  r   r   r5   Tensorr   r   r   ru   rs   r   r   r   r   rE   r   r   r  r  r  r*  r:  r~   rE  r   r8   r   r   r6   r   r9   V   sT    (a
6
55	6



	
!r9   )"rW   r  rer   	functoolsr   rA  torch.nnr   	deepspeedr   r   deepspeed.utilsr   r  r   r%   activation_checkpointingr   rE   r	   r
   $deepspeed.runtime.state_dict_factoryr   deepspeed.acceleratorr   deepspeed.checkpoint.utilsr   	Exceptionr   r   r.   r   r9   r   r   r   r   <module>   s$   /	