o
    TÃiÃø  ã                   @   s`  d dl mZ d dlmZ d dlmZ d dlmZ d dlZd dl	m
Z d dlmZ d dlmZ d d	lmZ d d
lmZ ddlmZmZ d dlmZmZmZmZmZmZmZmZmZm Z  ddl!m"Z" ddl#m$Z$ ddl%m&Z& ddl'm(Z) ddl*m+Z+m,Z, ddl-m.Z. ddl-m/Z/ dZ0dZ1dZ2dZ3dZ4dZ5dZ6dZ7dZ8dZ9dd„ Z:d a;d a<d d!„ Z=G d"d#„ d#eƒZ>dS )$é    )Ú
MethodType)ÚOrderedDict)Úreduce)ÚmulN)Úcomm)Úlogger)ÚThroughputTimer)Úget_accelerator)ÚBF16_Optimizeré   )ÚDeepSpeedEngineÚMEMORY_OPT_ALLREDUCE_SIZE)
ÚFORWARD_MICRO_TIMERÚFORWARD_GLOBAL_TIMERÚBACKWARD_MICRO_TIMERÚBACKWARD_GLOBAL_TIMERÚBACKWARD_INNER_MICRO_TIMERÚBACKWARD_INNER_GLOBAL_TIMERÚBACKWARD_REDUCE_MICRO_TIMERÚBACKWARD_REDUCE_GLOBAL_TIMERÚSTEP_MICRO_TIMERÚSTEP_GLOBAL_TIMER)ÚPartitionedTensor)ÚRepeatingLoader)ÚZeroStageEnum)Úcheckpointingé   )ÚPipelineModuleÚPipelineError)Úp2p)ÚscheduleéþÿÿÿÚbatch_inputÚtrain_batchÚpipe_send_outputÚpipe_send_gradÚpipe_recv_inputÚpipe_recv_gradé   c                 C   s   | d dkS )Nr   r   © )Únumberr)   r)   úQ/home/ubuntu/.local/lib/python3.10/site-packages/deepspeed/runtime/pipe/engine.pyÚis_even1   s   r,   c                 C   s   |   ¡ |  ¡  S ©N)ÚnumelÚelement_size)Útensorr)   r)   r+   Ú_tensor_bytes9   s   r1   c                       s2  e Zd ZdZejejejejej	ej
ejejejejejejgZdd„ eeƒD ƒZda‡ fdd„	Zdd„ Zd	d
„ Zdd„ Zdd„ Zdd„ Zdd„ Zdd„ Zdbdd„Z					dcdd„Z‡ fdd„Zdd„ Zd d!„ Z ddd"d#„Z!dejfd$d%„Z"d&d'„ Z#d(d)„ Z$d*d+„ Z%d,d-„ Z&d.d/„ Z'd0d1„ Z(d2d3„ Z)d4d5„ Z*‡ fd6d7„Z+‡ fd8d9„Z,d:d;„ Z-d<d=„ Z.d>d?„ Z/d@dA„ Z0dBdC„ Z1dDdE„ Z2dFdG„ Z3dbdHdI„Z4dJdK„ Z5dedMdN„Z6dOdP„ Z7dQdR„ Z8dSdT„ Z9dUdV„ Z:dfdWdX„Z;dadYdZ„Z<dg‡ fd[d\„	Z=e>j?e4e>j@ee>jAee>jBe-e>jCe+e>jDe,e>jEe0e>jFe2e>jGe1e>jHe3i
ZId]d^„ ZJd_d`„ ZK‡  ZLS )hÚPipelineEnginezµ A training engine hybrid pipeline, data, and model parallel training.

    This engine is created by ``deepspeed.initialize()`` when a :class:`PipelineModule`
    is provided.
    c                 C   s   i | ]\}}||“qS r)   r)   )Ú.0Úid_Údtyper)   r)   r+   Ú
<dictcomp>G   ó    zPipelineEngine.<dictcomp>Fc                    s.  t ƒ j|i |¤Ž t| jtƒsJ dƒ‚|  ¡ tjk sJ dƒ‚d| _|| _	d| _
d | _t| jƒtk| _d| _|  ¡ rE|  ¡ sE|  ¡ rEJ dƒ‚d| _|  ¡ | _|  ¡ | _| jj| _| j ¡ dkrkt d| j› d	| j› ¡ | j ¡ | _| j| jjkszJ ‚|   ¡ | j| j | jj ksŠJ ‚| jj!| _"| j #¡ | _$| j$d
 | _%| j$d
 | _&d | _'d | _(d| _)t*| j+j,|   ¡ | j-d|  .¡ d| _/| j0rÄ|  1| j0¡ | jj!d
k| _2| jjd
k| _3| jj4d
k| _5t| j+j6d t7ƒsäJ ‚t| j+j6d t7ƒsïJ ‚| j5o÷| j+j6d | _8| j5o| j+j6d | _9t d| j8› d| j9› ¡ t:dd„ | j ;¡ ƒ}t<dd„ |D ƒƒ}|}| jj=rVd}| jj= >¡ D ]\}}	| jt?|	d ƒkrP|t<dd„ |	d  ;¡ D ƒƒ7 }q3||8 }t@jA||gd B| jC¡}
tDjE|
| j F¡ d |
 G¡ }
|
d }|
d
 }| jjHdkr»t d| j› d| j$› d| jjI| jjJ › d| jjJ› d| jjI› d|› d |d! d"›d#|› d |d! d"›d$|› d |d! d"›d%¡ | j2rÅtK L| j¡ d| _Mg g g g d&œ| _Nd | _Od | _Pg | _Qd | _Rd| _Sd| _Td | _Ud | _Vd | _Wd | _Xt@ Yd'¡ B| jC¡| _Zd | _[d | _\t@jYd'dd( B| jC¡| _]t@jYd'dd( B| jC¡| _^d | __d | _`| j+j6d) dkrX| j+j6d) | j_a| j+j6 bd*¡d u r=d| j+j6d*< | j+j6d* du rXtcjd| j_e| j ¡ dkrXt d+¡ | jjadkrd| j f¡  | j+jg| j_g|  h¡ rt| jji| _j| jjkjld,k| _mtn| j$ƒr|  h¡ stK o| jZ| j&¡ |  p¡ sœtK q| jZ| j%¡ n|  p¡ sªtK q| jZ| j%¡ |  h¡ s·tK o| jZ| j&¡ |  r¡ r|  stt¡ u¡  |  stt¡ v¡  |  stw¡ u¡  |  stw¡ v¡  |  stx¡ u¡  |  stx¡ v¡  |  sty¡ u¡  |  sty¡ v¡  |  stz¡ u¡  |  stz¡ v¡  |  st{¡ u¡  |  st{¡ v¡  | jj|| _|d S )-Nzmodel must base PipelineModulez<ZeRO-2 and ZeRO-3 are incompatible with pipeline parallelismFTz@Elasticity is not currently supported with pipeline parallelism.éÿÿÿÿr   zCONFIG: micro_batches=z micro_batch_size=r   )Ú
batch_sizeÚ
logging_fnÚmonitor_memoryÚsteps_per_outputÚpipe_partitionedÚgrad_partitionedzis_pipe_partitioned= z is_grad_partitioned= c                 S   ó   | j S r-   ©Úrequires_grad)Úpr)   r)   r+   Ú<lambda>•   s    z)PipelineEngine.__init__.<locals>.<lambda>c                 S   ó   g | ]}|  ¡ ‘qS r)   ©r.   ©r3   rB   r)   r)   r+   Ú
<listcomp>–   ó    z+PipelineEngine.__init__.<locals>.<listcomp>Úranksc                 s   ó    | ]}|  ¡ V  qd S r-   rE   rF   r)   r)   r+   Ú	<genexpr>   ó   € z*PipelineEngine.__init__.<locals>.<genexpr>Úmodule)Údata©ÚgroupúRANK=z STAGE=z LAYERS=z [z, z) STAGE_PARAMS=z (g    €„.Aú0.3fzM) TOTAL_PARAMS=zM) UNIQUE_PARAMS=zM))ÚinputsÚlabelsÚoutputsÚoutput_tensorsç        r@   Úactivation_checkpoint_intervalÚuse_reentrantz;CONFIG: activation_checkpoint_func=non_reentrant_checkpointÚGPT2ModelPipe)}ÚsuperÚ__init__Ú
isinstancerM   r   Úzero_optimization_stager   Ú	gradientsÚenable_backward_allreduceÚhas_bool_tensorsÚeval_return_logitsrU   ÚtypeÚ	optimizerr
   Úusing_bf16_optimizerÚ"pipeline_enable_backward_allreduceÚelasticity_enabledÚ#is_elastic_model_parallel_supportedÚlog_batch_step_idÚtrain_micro_batch_size_per_gpuÚmicro_batch_sizeÚgradient_accumulation_stepsÚmicro_batchesÚ_gridÚgridÚget_global_rankr   ÚinfoÚglobal_rankÚdp_world_sizeÚdata_parallel_sizeÚtrain_batch_sizeÚpipe_parallel_sizeÚ
num_stagesÚget_stage_idÚstage_idÚ
prev_stageÚ
next_stageÚdata_iteratorÚbatch_fnÚ_force_grad_boundaryr   Ú_configÚtimers_configÚtput_logÚsteps_per_printÚbatch_timerÚtraining_dataÚ_build_data_iterÚis_pipe_parallelÚis_data_parallelÚmodel_parallel_sizeÚis_model_parallelÚpipelineÚboolÚis_pipe_partitionedÚis_grad_partitionedÚfilterÚ
parametersÚsumÚ
tied_commsÚitemsÚminÚtorchÚ
LongTensorÚtoÚdeviceÚdistÚ
all_reduceÚget_model_parallel_groupÚtolistÚdata_parallel_idÚ_local_stopÚ_local_startr   Úinit_process_groupsÚnum_pipe_buffersÚpipe_buffersÚpipe_recv_bufÚ
grad_layerÚ_grad_layer_bufÚmeta_bufferÚfirst_output_sendÚfirst_gradient_sendÚpipe_partition_input_meta_cacheÚ pipe_partition_output_meta_cacheÚpipe_partition_grad_meta_cacheÚ$grad_partition_grad_layer_meta_cacher0   ÚlossÚ
total_lossÚtotal_additional_lossesÚagg_lossÚdp_group_lossÚagg_train_lossÚagg_additional_lossesrX   ÚgetÚds_checkpointingÚnon_reentrant_checkpointÚactivation_checkpoint_funcÚ!_precompute_checkpointable_valuesÚ"checkpoint_parallel_write_pipelineÚis_last_stageÚloss_fnÚ
loss_modelÚ	__class__Ú__name__Úhas_attention_maskr,   ÚsendÚis_first_stageÚrecvÚwall_clock_breakdownÚtimersr   ÚstartÚstopr   r   r   r   r   Údynamic_shape)Úselfra   Ú
super_argsÚsuper_kwargsÚmodel_parametersÚ
num_paramsÚunique_paramsÚtied_paramsÚkeyÚdÚparams_tensorÚtotal_params©r¼   r)   r+   r\   I   s4  ÿÿ


ÿÿ
üÿ
€ÿþýýüüûûú
ú	ü






€


zPipelineEngine.__init__c                 C   s   t |tƒsJ ‚|| _d S r-   )r]   r‹   r¾   )rÇ   Úvaluer)   r)   r+   Úset_has_attention_mask  s   
z%PipelineEngine.set_has_attention_maskc                 C   sD   t jjjj|| j| j ¡ dd}| j||d}t	|ƒ}|  
|¡ d S )NF)Únum_replicasÚrankÚshuffle)Údata_sampler)r”   ÚutilsrN   ÚdistributedÚDistributedSamplerrs   ÚmpuÚget_data_parallel_rankÚdeepspeed_ior   Úset_dataloader)rÇ   ÚdatasetÚsamplerÚpipe_dataloaderr)   r)   r+   r…   	  s   ýzPipelineEngine._build_data_iterc                 C   sV   |   ¡ r	| j ¡  | j ¡ }|D ]\}}| jr|jn|j}|d ur(tj	||d qd S )NrO   )
Ú%zero_optimization_partition_gradientsrd   Ú/overlapping_partition_gradients_reduce_epiloguerM   Úget_tied_weights_and_groupsre   Ú_hp_gradÚgradr˜   r™   )rÇ   Úweight_group_listÚweightrP   rç   r)   r)   r+   Ú_exec_reduce_tied_grads  s   	

€ýz&PipelineEngine._exec_reduce_tied_gradsc                 C   s2   d| _ | jr| jr|  ¡  n| jtd d| _ d S )NT)Úbucket_sizeF)r~   rf   re   Ú_bf16_reduce_gradsÚallreduce_gradientsr   ©rÇ   r)   r)   r+   Ú_exec_reduce_grads%  s   

z!PipelineEngine._exec_reduce_gradsc                 C   s   | j d td d S )N)ÚgradsÚelements_per_buffer)Úbuffered_allreduce_fallbackr   rî   r)   r)   r+   rì   /  s   z!PipelineEngine._bf16_reduce_gradsc                 C   sD   | j |krdS || j  }| jD ]}| j|  dg| ¡ q|| _ dS )zåEnsure that each pipeline buffer has at least ``num_buffers`` slots.

        This method only reserves slots and does not allocate tensors.

        Args:
            num_buffers (int): The number of buffers to reserve.
        N)r    r¡   Úextend)rÇ   Únum_buffersÚ	num_addedrÎ   r)   r)   r+   Ú_reserve_pipe_buffers2  s   



z$PipelineEngine._reserve_pipe_buffersc                 C   s:   d| _ d| _d| _g | _d| _d| _d| _d| _d| _dS )zæReset the buffers when the shape of activation and gradient change.
        For example, for curriculum learning that changes the seqlen of each
        sample, we need to call this whenever the seqlen is going to change.
        TN)	r¦   r¢   r£   r¤   r¥   r¨   r©   rª   r«   rî   r)   r)   r+   Úreset_activation_shapeB  s   
z%PipelineEngine.reset_activation_shapeNc           
      C   sd  t j ¡ s	tdƒ‚|  ¡ r5| j | jd ¡}| jdks| jjr(|  	¡  d| j_n|| j 
| j¡kr5|  	¡  |dur>|  |¡ | j ¡  d| _d| _d| _|  t¡ ¡  tj| j| j| jd}|  |¡ t  ¡  |  ¡ | _W d  ƒ n1 sww   Y  |  t¡ ¡  |  ¡ durð| j|  ¡  dkrð| jdkrç|  t¡j ddd	 }||  ¡  }|  !¡ | }d
| j› d| jd›d}| j"durÖ| j" #¡ D ]\}}	||› d|	 $¡ d›d7 }qÄ|d|d›d|d›7 }t%|ƒ n	|  t¡j dd | jdkr| j&j'rd| j (¡  $¡ | j)fg| _*| j& +| j*¡ |  ¡ dur/|  ,¡ r/| j|  ¡  dkr/| j -t.t/t0t1g¡ | jS )añ  Progress the pipeline to train the next batch of data. The engine will ingest
        ``self.train_batch_size()`` total samples collectively across all workers.


        An iterator that over training data should be provided as an argument
        unless ``deepspeed.initialize()`` was provided a training set. In that event,
        the training data will automatically be read.


        .. warning::
            A total of ``self.gradient_accumulation_steps()`` entries will be pulled
            from ``data_iter`` by each pipeline. There must be sufficient
            data left in ``data_iter`` or else a ``StopIteration`` will halt training.

            DeepSpeed provides a convenience class :class:`deepspeed.utils.RepeatingLoader`
            that wraps data loaders to automatically restart upon a ``StopIteration``.

        Args:
            data_iter (Iterator, optional): Iterator of training data.

        Returns:
            The arithmetic mean of the losses computed this batch.
        zCtrain_batch() requires gradients enabled. Use eval_batch() instead.r   r   FNT©rm   Ústagesry   )Úresetg     @@zsteps: z loss: z0.4fú z: ziter time (s): rR   z samples/sec: zTrain/Samples/train_loss)2r”   Ú_CÚis_grad_enabledÚRuntimeErrorÚcurriculum_enabled_legacyÚcurriculum_scheduler_legacyÚupdate_difficultyÚglobal_stepsÚ
first_stepr÷   Úget_difficultyÚset_dataiteratorrM   Útrainr­   r®   Ú_compute_lossrÃ   ÚTRAIN_BATCH_TIMERrÄ   r    ÚTrainSchedulerm   rw   ry   Ú_exec_scheduleÚno_gradÚ_aggregate_total_lossr±   rÅ   r‚   rr   Úelapsedru   r²   r’   ÚitemÚprintÚmonitorÚenabledÚmeanÚglobal_samplesÚsummary_eventsÚwrite_eventsrÂ   ÚlogÚPIPE_SEND_OUTPUT_TIMERÚPIPE_SEND_GRAD_TIMERÚPIPE_RECV_INPUT_TIMERÚPIPE_RECV_GRAD_TIMER)
rÇ   Ú	data_iterÚnew_difficultyÚschedr  Ú	iter_timeÚtputÚlog_strÚ	loss_nameÚ
loss_valuer)   r)   r+   r#   R  sn   
ÿ
ÿ

þ

ÿ


ÿüzPipelineEngine.train_batchTÚavgc                 C   sz  || _ | j ¡  |  ¡ r4| j | jd ¡}| jdks| jjr'|  ¡  d| j_n|| j 	| j¡kr4|  ¡  d}|| _
| j}	|  |¡ |du rH| jn|}
tj|
| j| jd}t ¡  t ¡  |  |¡ W d  ƒ n1 slw   Y  |  ¡ r~| j| j||
d}|r‹|s†| jjr‹|  |¡}| jdkr§| jjr§d| ¡  ¡ | j fg| _!| j "| j!¡ |  |	¡ d| _ |r»| j#}d| _#||fS |S )aÈ  Evaluate the pipeline on a batch of data from ``data_iter``. The
        engine will evaluate ``self.train_batch_size()`` total samples
        collectively across all workers.

        This method is equivalent to:

        .. code-block:: python

            module.eval()
            with torch.no_grad():
                output = module(batch)

        .. warning::
            A total of ``self.gradient_accumulation_steps()`` entries will be pulled
            from ``data_iter`` by each pipeline. There must be sufficient
            data left in ``data_iter`` or else a ``StopIteration`` will halt training.

            DeepSpeed provides a convenience class :class:`deepspeed.utils.RepeatingLoader`
            that wraps data loaders to automatically restart upon a ``StopIteration``.

        Args:
            data_iter (Iterator): Iterator of data to evaluate.

        Returns:
            The arithmetic mean of the losses computed this batch.
        r   r   FNrø   )r   rm   zTrain/Samples/eval_loss)$rb   rM   Úevalrÿ   r   r  r  r  r÷   r  r  r|   r  rm   r    ÚInferenceSchedulerw   ry   r˜   Úbarrierr”   r  r
  r¹   Ú_reduce_outputsÚfwd_outputsr  r  Ú_bcast_pipe_scalarrr   r  r  r  r  r  rU   )rÇ   r  Úreturn_logitsÚcompute_lossÚreduce_outputÚ
bcast_lossÚnum_micro_batchesr  Úeval_outputÚtrain_iteratorrm   r  rU   r)   r)   r+   Ú
eval_batch¬  sJ   !
ÿ
ÿ

ÿ

zPipelineEngine.eval_batchc                    s   t ƒ  |¡ |  ¡ | _dS )aÞ  Adjust the global batch size by increasing or decreasing the number of
        micro-batches (i.e., gradient accumulation steps). The size of each micro-batch
        (i.e., ``train_micro_batch_size_per_gpu``) is not changed.
        Args:
            train_batch_size (int): The new global batch size for training.
        Raises:
            ValueError: if ``train_batch_size`` is not divisible by the
                configured micro-batch size and data parallelism.
        N)r[   Úset_train_batch_sizerl   rm   )rÇ   ru   rÒ   r)   r+   r2    s   
z#PipelineEngine.set_train_batch_sizec                 C   s
   | j dkS )z;True if this process is in the first stage in the pipeline.r   )ry   rî   r)   r)   r+   rÀ     s   
zPipelineEngine.is_first_stagec                 C   s   | j | jd kS )z:True if this process is in the last stage in the pipeline.r   )ry   rw   rî   r)   r)   r+   r¹     s   zPipelineEngine.is_last_stagec                 C   s  |d u r|S |  ¡ dkr{t |d ¡rt|ƒ}n!t|ttfƒs!J ‚dd„ |d D ƒ}|D ]\}}||  |7  < q,| j||d}|ry| jryt |¡r[t	j
|| j ¡ d || j }|S tt|ƒƒD ]}t	j
|| | j ¡ d ||  | j  < qa|S td|› dƒ‚)	Nr#  r   c                 S   ó   g | ]}t  |¡‘qS r)   ©r”   Ú
zeros_like)r3   Úor)   r)   r+   rG   $  r7   z2PipelineEngine._reduce_outputs.<locals>.<listcomp>)Úeval_micro_batchesrO   zreduction type z not supported.)Úlowerr”   Ú	is_tensorr   r]   ÚlistÚtupleÚ_scale_loss_by_gasr‡   r˜   r™   rÜ   Úget_data_parallel_grouprs   ÚrangeÚlenÚNotImplementedError)rÇ   rU   r   Ú	reduce_dprm   ÚreducedÚidxÚoutr)   r)   r+   r'    s(   



üzPipelineEngine._reduce_outputsc                 C   s„   |d u r| j  | jd ¡}|| j jv sJ ‚| j|kr(| ¡  ¡  |¡ | j	¡}nt
 dg¡ |¡ | j	¡}tj||| j ¡ d |S )Nr   rW   ©r0   ÚsrcrP   )ro   Ústage_to_globalrw   Úpp_grouprr   ÚcloneÚdetachrc   r–   r—   r”   ÚTensorr˜   Ú	broadcastrÜ   Úget_pipe_parallel_group)rÇ   rN   Úsrc_rankr5   Úresultr)   r)   r+   r)  9  s   
z!PipelineEngine._bcast_pipe_scalarc                    s”  ˆ  ¡ rèˆ ˆj¡}ˆjˆ_ˆjd ur"t‡fdd„ˆj ¡ D ƒƒˆ_| ¡  ¡ ˆ_	ˆj	 ¡  ¡ }ˆj
r´ˆjd u rHtj|ˆj ¡ d |ˆj }nldˆj ¡ vsQJ ‚td|iƒ}| ˆj ¡ ¡ t dd„ | ¡ D ƒ¡}tj|ˆj ¡ d |ˆj }d}i ‰| ¡ D ]\}}| ¡ }|||| …  ¡  ¡  |j¡ˆ|< ||7 }q‚ˆd }t‡fdd„ˆj ¡ D ƒƒˆ_ˆjˆjjv s½J ‚ˆj	|g‰ ˆjd urÐˆ tˆj ¡ ƒ7 ‰ t ˆ ¡ ¡ ‰ ˆjrætjˆ ˆjˆj  ¡ d	 |S ˆj !ˆj"d
 ¡}	|	ˆjjv sùJ ‚ˆj# $¡ }
|
d u rdnt%|
ƒ}t &dgd|  ¡ 'ˆj(¡‰ tjˆ |	ˆj  ¡ d	 ˆ d  ¡  ¡ ˆ_	ˆ d
  ¡  ¡ }|
d urHt‡ fdd„t)|
 ¡ ƒD ƒƒˆ_|S )Nc                    s$   i | ]\}}|ˆ   | ¡  ¡ ¡“qS r)   )r<  rI  rJ  )r3   r!  Ú_lossrî   r)   r+   r6   O  s    ÿÿz8PipelineEngine._aggregate_total_loss.<locals>.<dictcomp>rO   Ú__train_loss__c                 S   s   g | ]}|  ¡  d ¡ ¡ ‘qS ©r8   )rI  ÚreshaperJ  ©r3   Útr)   r)   r+   rG   b  s    z8PipelineEngine._aggregate_total_loss.<locals>.<listcomp>r   c                    s   i | ]}|ˆ | “qS r)   r)   )r3   Úname)Úreduced_tensorr)   r+   r6   m  s    ÿrE  r   rW   r   c                    s&   i | ]\}}|ˆ d |    ¡  ¡ “qS )r   )rI  rJ  )r3   ÚirV  )Úlossesr)   r+   r6   „  s    ÿÿ)*r¹   r<  r­   r®   r²   r   r’   rI  rJ  r°   r‡   r˜   r™   rÜ   r=  rs   ÚkeysÚupdater”   ÚcatÚvaluesr.   rS  Úshaperr   ro   rH  r:  ÚstackÚfloatr†   rL  rM  rG  rw   rM   Úget_additional_lossesr?  rK  r–   r—   Ú	enumerate)rÇ   r¬   r¯   ÚtensorsÚflat_tensorÚoffsetrV  rU  Ún_elemrN  Úadditional_lossesÚn_additional_lossesr)   )rY  rW  rÇ   r+   r  H  sh   

þ

$

ÿÿ

ñ



þz$PipelineEngine._aggregate_total_lossc                 C   s*   |   ¡ s|  ¡ r|| _t| jƒ| _dS dS )Ú N)rÀ   r¹   Útraining_dataloaderÚiterr|   )rÇ   Úloaderr)   r)   r+   rß   Š  s   þzPipelineEngine.set_dataloaderc                 C   s$   |   ¡ s|  ¡ rd| _|| _dS dS )z0 Store an iterator to sample for training data. N)rÀ   r¹   rj  r|   )rÇ   Úiteratorr)   r)   r+   r    s   
þzPipelineEngine.set_dataiteratorc                 C   s
   || _ dS )zyExecute a post-processing function on input data.

        Args:
            fn (function): The function to run.
        N)r}   )rÇ   Úfnr)   r)   r+   Úset_batch_fn–  s   
zPipelineEngine.set_batch_fnc                 C   r?   )aF  True if the engine is executing a gradient reduction or optimizer step instruction.

        This is overridden from :class:`DeepSpeedEngine` to force reductions
        and steps when the pipeline engine is instructed to do so.

        Returns:
            bool: whether reductions and optimizer steps should occur.
        )r~   rî   r)   r)   r+   Ú!is_gradient_accumulation_boundaryž  s   	z0PipelineEngine.is_gradient_accumulation_boundaryc                 G   sv   t | jks	t dkr7t| jjkstdkr9tdt ¡ › d| j› d| jj› d| j› d| j	› dg|¢R dd	iŽ d S d S d S )
Nr8   rQ   z	 PIPE-ID=z	 DATA-ID=z MBATCH-ID=z	 STEP-ID=z ::ÚflushT)
Ú	LOG_STAGEry   ÚDATA_PARALLEL_IDro   rœ   r  r˜   Úget_rankÚmicrobatch_idri   ©rÇ   Úmsgr)   r)   r+   Úlog_for_device©  s&   ÿþýüÿù
øþzPipelineEngine.log_for_devicec                 G   s0   | j dkr| j|  ¡  dkrt|Ž  d S d S d S )Nr   )rr   r  r‚   r  rv  r)   r)   r+   r   ¶  s   ÿzPipelineEngine.tput_logc                 C   s,   d }| j d urt| j ƒ}| jr|  |¡}|S r-   )r|   Únextr}   )rÇ   Úbatchr)   r)   r+   Ú_next_batchº  s   


zPipelineEngine._next_batchc                    sæ  | j  ¡  | jddd t| jd | tƒr%tdd„ | jd | D ƒƒ}n	| jd |  ¡ }| jrv|  ¡ sv| j	d u rB|d  
d¡| _	tj| j	|d	 | j ¡ d
}| ¡ g|dd … ¢R }d|d _d }t|ƒd	krm|d n|}|| jd |< tƒ  |¡}| jjs„t ¡  | jrß|  ¡ sßt|tƒrª|d }tdd„ |d	d … D ƒƒs£J ‚|d	d … }nt |¡r´|}g }ntdƒ‚t|| j ¡ d}tjd	|jjd|_|| jd |< |  ¡ | ¡ g|¢R }d }|| jd |< |  ¡ rm| j!r| jj"d ur| jd | }| j "||¡| _#n|| _#| j$r|| _%t| j#tj&ƒr"| j' (| j# )¡ ¡ n| j' (dd„ | j#D ƒ¡ dd„ }	|	| j*| j#ƒ| _*| j +¡ }
|
d uro| j,d u rNt-ƒ | _,|
 .¡ D ]\}}|| j,v ra| j,| nd }|	||ƒ| j,|< qRd S d S d S )Nz
BEFORE FWDT©Ú	reset_maxrS   c                 s   rJ   r-   )rI  rT  r)   r)   r+   rK   Ë  rL   z4PipelineEngine._exec_forward_pass.<locals>.<genexpr>r   Úcpur   ©ÚmetaÚ
local_partrP   r   c                 S   s    g | ]}t  |¡o|jd u ‘qS ©F)r”   r9  rA   ©r3   Úeltr)   r)   r+   rG   ì  s     z5PipelineEngine._exec_forward_pass.<locals>.<listcomp>ú(expecting a tensor or a tuple of tensors©r0   rP   )r—   rV   rU   rT   c                 S   rD   r)   )rJ  )r3   Úlr)   r)   r+   rG     rH   c                 S   sn   t |tjƒr| d u rt |¡} | | ¡ 7 } | S | d u r"dd„ |D ƒ} t|ƒD ]\}}| |  | ¡ 7  < q&| S )Nc                 S   r3  r)   r4  )r3   Ú_lr)   r)   r+   rG     r7   zPPipelineEngine._exec_forward_pass.<locals>.add_to_total_loss.<locals>.<listcomp>)r]   r”   rK  r5  rJ  rb  )Ú_total_lossrP  Ú_idxrˆ  r)   r)   r+   Úadd_to_total_loss  s   
üz<PipelineEngine._exec_forward_pass.<locals>.add_to_total_loss)/Ú
tput_timerrÄ   Ú
mem_statusr]   r¡   r;  rI  rŒ   rÀ   r¨   r–   r   Ú	from_metaro   Úget_slice_parallel_groupÚfullrA   r?  r[   ÚforwardrM   Útrainingr´   rú   r¹   Úallr”   r9  Ú
ValueErrorÚzerosrN   r—   Úto_metar  rº   r¬   rb   rU   rK  r(  ÚappendrJ  r­   ra  r®   r   r’   )rÇ   Ú	buffer_idrS   Ú
part_inputrU   Úfirst_outputÚoutputs_tailÚpartrT   r‹  rg  rV  r¬   ÚtotalrÒ   r)   r+   Ú_exec_forward_passÆ  sr   

þ





Üz!PipelineEngine._exec_forward_passc                    s   | j d us	J dƒ‚| jddd |  ¡ r"tƒ  | j¡ |  d¡ d S | jd | }|  ¡ rI|  t	¡ 
¡  |  t¡ 
¡  |  t¡ 
¡  |  t¡ 
¡  | jr| jr„| jd u r\|d  d¡| _tj| j|d	 | j ¡ d
}| ¡ | jd | _| jd | g|dd … ¢R }n|d | jd | _| jd | g|d	d … ¢R }| j}| jrÎ| jd u r±| jd  d¡| _tj| j| jd	 | j ¡ d
}| ¡ g|dd … ¢R }d }| jrÚ|  ¡ sÚ| j  ¡  t|tƒrùdd„ |D ƒ}t|ƒt|ƒksðJ ‚tj j||d n
tj j|f|fd | jr|  ¡ s| j!j"s| j j#dd d | jd |< d | jd |< d }|  ¡ rI|  t¡ $¡  |  t¡ $¡  |  t	¡ $¡  |  t¡ $¡  |  d¡ d S )Nz;must provide optimizer during init in order to use backwardz
BEFORE BWDTr|  z	AFTER BWDrU   r   r~  r   r  rV   r   c                 S   s   g | ]}|  ¡ r|‘qS r)   )Úis_floating_pointrT  r)   r)   r+   rG   [  s    z6PipelineEngine._exec_backward_pass.<locals>.<listcomp>)rc  Úgrad_tensorsF)Úclear_lp_grads)%rd   r  r¹   r[   Úbackwardr¬   r¡   rÂ   rÃ   r   rÄ   r   r   r   rŒ   r   r©   r–   r   rŽ  ro   r  r  rN   r£   r«   re   r¡  r]   r;  r?  r”   Úautogradr   Úbfloat16_immediate_grad_updateÚupdate_hp_gradsrÅ   )rÇ   r˜  rU   Úpart_outputr   Ú	part_gradÚout_tensorsrÒ   r)   r+   Ú_exec_backward_pass$  sj   

þ 
þ



z"PipelineEngine._exec_backward_passc                 C   s¸  |   ¡ r|  t¡ ¡  |  ¡ }|  ¡ r„d }t |d ¡r<|d  ¡  	| j
¡ ¡ }| jjd dkr;| jjd r;| ¡ |_nAt|d ttfƒsGJ ‚g }|d D ]+}t |¡sVJ ‚| ¡  ¡  	| j
¡}| jjd dkrs| jjd rs| ¡ |_| |¡ qMt|ƒ}|| jd |< |  ¡ rÍ|d }t |d ¡rœ|d  	| j
¡}n*t|d ttfƒrÆg }|d D ]}t |¡s´J ‚| 	| j
¡ ¡ }| |¡ q«t|ƒ}|| jd |< |   ¡ rÚ|  t¡ ¡  d S d S )Nr   rX   rY   rS   r   rT   )rÂ   rÃ   ÚBATCH_INPUT_TIMERrÄ   r{  rÀ   r”   r9  rI  r–   r—   rJ  r   rŠ   rŸ  rA   r]   r;  r:  r—  r¡   r¹   rÅ   )rÇ   r˜  rz  ÚloadedÚxÚminer)   r)   r+   Ú_exec_load_micro_batchs  sN   ÿ
€ÿ
ÿz%PipelineEngine._exec_load_micro_batchc                 C   sh  t jtt j| jd}t|t jƒrOd| j|j t	| 
¡ ƒg}| | 
¡ ¡ t	|ƒtks6J dt› dt	|ƒ› ƒ‚|dt	|ƒ…  t j|t jd¡ t ||¡ dS t|tƒr«dt	|ƒg}|D ]#}t|t jƒsfJ ‚| | j|j ¡ | t	| 
¡ ƒ¡ | | 
¡ ¡ q\t	|ƒtks’J dt› dt	|ƒ› ƒ‚|dt	|ƒ…  t j|t jd¡ t ||¡ dS tdt|ƒ› ƒ‚)	a   Communicate metadata about upcoming p2p transfers.

        Metadata is communicated in this order:
            * type (0: tensor, 1: list)
            * num_tensors if type=list
            foreach tensor in buffer:
                * ndims
                * shape
        ©r5   r—   r   z7Buffer for metadata is too small. Current buffer size: z but required N)r5   r   zCould not send meta type )r”   ÚemptyÚTENSOR_META_SIZEÚint32r—   r]   rK  ÚDTYPE_TO_IDr5   r?  Úsizeró   Úcopy_r0   r   r¿   r;  r—  r@  rc   )rÇ   ÚbufferÚ
recv_stager¥   Úmeta_buf_listr0   r)   r)   r+   Ú_send_tensor_meta¡  sJ   


ýÿþþ"
êþÿþþ"ýz PipelineEngine._send_tensor_metac                 C   s*  t jtt j| jd}t ||¡ |d  ¡ }|dkr:| j|d  ¡  }|d  ¡ }|dd| …  	¡ }|  
d||¡S |dksB|dkrŒ|d  ¡ }g }d}	t|ƒD ]1}
| j||	  ¡  }||	d   ¡ }||	d |	d | …  	¡ }|	d| 7 }	| |  
|
||¡¡ qP|dkrŠt|ƒ}|S tdt|ƒ› ƒ‚)z¢Receive metadata about upcoming p2p transfers and return allocated buffers.

        Returns:
            Allocated buffer for receiving from send_stage.
        r¯  r   r   r   é   zCould not receive type )r”   r°  r±  r²  r—   r   rÁ   r  ÚID_TO_DTYPEr›   Ú_allocate_or_extend_buffersr>  r—  r;  r@  rc   )rÇ   Ú
send_stager¶  Ú	recv_typeÚ
recv_dtypeÚ
recv_ndimsÚ
recv_shapeÚnum_tensorsÚbuffersre  rC  r)   r)   r+   Ú_recv_tensor_metaÔ  s,   z PipelineEngine._recv_tensor_metac                 C   s  |   ¡ r|  t¡ ¡  | jd | }| js| jr(t|ƒ}|d  ¡ |d< t	|ƒ}| j
s.| jr8d| _|  || j¡ t|tjƒrFt || j¡ nt|t	ƒr\t|ƒD ]\}}t || j¡ qOn	tdt|ƒ› ƒ‚| jsk| jr{t|ƒ}|d  ¡ |d< t	|ƒ}|   ¡ rˆ|  t¡ ¡  d S d S )NrU   r8   FzCould not send output of type )rÂ   rÃ   r  rÄ   r¡   r¾   ra   r:  Úhalfr;  rÆ   r¦   r¹  r{   r]   r”   rK  r   r¿   rb  r@  rc   r‹   rÅ   )rÇ   r˜  rU   rC  r¶  r)   r)   r+   Ú_exec_send_activationsü  s4   
ÿÿÿz%PipelineEngine._exec_send_activationsc                 C   s¶  |   ¡ r|  t¡ ¡  | jd | }| jrct|tƒr9|d }tdd„ |dd … D ƒƒs-J ‚dd„ |dd … D ƒ}nt	 
|¡rC|}g }ntdƒ‚t	 
|¡sNJ ‚t|j| j ¡ d}| ¡ | ¡ g|¢R }| jsi| jrut|ƒ}| ¡  t|ƒ}t|t	jƒr‹|jd us‚J ‚t |j| j¡ n:| jr¡t |d | j¡ t |d | j¡ n$t|ƒD ]\}}| ¡ sµ|jd u s´J ‚q¥|jd us¼J ‚t |j| j¡ q¥d | jd |< |   ¡ rÙ|  t¡ ¡  d S d S )	NrS   r   c                 S   r3  r)   )r”   r9  rƒ  r)   r)   r+   rG   *  r7   z3PipelineEngine._exec_send_grads.<locals>.<listcomp>r   c                 S   s   g | ]}|j ‘qS r)   )rç   rƒ  r)   r)   r+   rG   +  s    r…  r†  )rÂ   rÃ   r  rÄ   r¡   r   r]   r;  r“  r”   r9  r”  r   rç   ro   r  r–  rN   r¾   ra   r:  ÚpoprK  r   r¿   rz   rb  rŸ  rÅ   )rÇ   r˜  rS   Úfirst_inputÚinputs_grad_tailrœ  rC  r¶  r)   r)   r+   Ú_exec_send_grads   sF   

ÿzPipelineEngine._exec_send_gradsc                 C   sx  |   ¡ r|  t¡ ¡  d }| js| jd u r|  | j¡| _t| jt	j
ƒr8t | j| j¡ | j ¡  ¡ }| ¡ |_nnt| jtƒs@J ‚d gt| jƒ }t| jƒD ]<\}}t	 |¡sXJ ‚| jrz|dkrz|jt	jkrz| jd u rwt	j| ¡ t	j| jd| _| j}t || j¡ | ¡  ¡ ||< qM| js| jr˜|d  ¡ |d< t|ƒ}|D ]}| ¡ |_qž|| jd |< |   ¡ rº|  t¡  ¡  d S d S )Nr   r¯  r8   rS   )!rÂ   rÃ   r  rÄ   rÆ   r¢   rÄ  rz   r]   r”   rK  r   rÁ   rI  rJ  rŸ  rA   r;  r?  rb  r9  rŒ   r5   Úlongr¥   r•  r´  r—   r¾   ra   r‹   r¡   rÅ   )rÇ   r˜  ÚrecvdrC  r¶  r)   r)   r+   Ú_exec_recv_activationsX  s8   
ÿz%PipelineEngine._exec_recv_activationsc                    sÎ  ˆ   ¡ rˆ  t¡ ¡  ˆ jd | }ˆ jrMˆ jsMˆ jd u r%|d  d¡ˆ _t	j
ˆ j|d ˆ j ¡ d}| ¡ |d _|d g|dd … ¢R }|ˆ jd |< ˆ jsUˆ jd u r–t|tjƒriˆ  dt| ¡ ƒ|j¡ˆ _n-ˆ jrƒdd„ |d d… D ƒd	d„ |dd … D ƒ }nd
d„ |D ƒ}‡ fdd„t|ƒD ƒˆ _tˆ jtjƒr¦t ˆ jˆ j¡ n2t|tƒs­J ‚tˆ jƒD ]%\}}ˆ jrÐ|dkrÐ|jtjkrÐtj| ¡ tjˆ jd|_t |ˆ j¡ q²ˆ   ¡ råˆ  t¡  ¡  d S d S )NrU   r   r~  r   r  r   c                 S   s   g | ]}t | ¡ ƒ|jf‘qS r)   )r:  r´  r5   rT  r)   r)   r+   rG   ©  s    ÿz3PipelineEngine._exec_recv_grads.<locals>.<listcomp>c                 S   ó&   g | ]}|  ¡ rt| ¡ ƒ|jf‘qS r)   ©rŸ  r:  r´  r5   rT  r)   r)   r+   rG   ª  s    ÿc                 S   rÎ  r)   rÏ  rT  r)   r)   r+   rG   ­  s   & c                    s"   g | ]\}\}}ˆ   |||¡‘qS r)   )r¼  )r3   rX  r´  r5   rî   r)   r+   rG   ¯  s    
ÿÿr¯  )!rÂ   rÃ   r  rÄ   r¡   rŒ   r   rª   r–   r   rŽ  ro   r  r  rN   rÆ   r£   r]   r”   rK  r¼  r:  r´  r5   rb  r   rÁ   r{   r;  rË  r•  r—   rÅ   )rÇ   r˜  rU   r¦  Úsizes_and_dtypesrC  r¶  r)   rî   r+   Ú_exec_recv_gradsƒ  sL   
þ
ÿ
ÿÿ
þÿzPipelineEngine._exec_recv_gradsc                 C   s8  |   ¡ r|  t¡ ¡  |  t¡ ¡  | jddd d| _|  |¡ d| _|  d¡ | jdkr[| j	j
r[d|  ¡ d | jfg| _|  ¡ rTt| jdƒrT| j d	| jj| jf¡ | j	 | j¡ |   ¡ r˜|  t¡ ¡  |  t¡ ¡  | j|  ¡  dkr‚| j ttttttg¡ | j|  ¡  dkrš| j tttt tg¡ d S d S d S )
NzBEFORE STEPTr|  Fz
AFTER STEPr   zTrain/Samples/lrÚ	cur_scalezTrain/Samples/loss_scale)!rÂ   rÃ   r   rÄ   r   r  r~   Ú_take_model_steprr   r  r  Úget_lrr  r  Úfp16_enabledÚhasattrrd   r—  rÒ  r  rÅ   r  r‚   r  rª  r   r   r   r   r   r   r   r   )rÇ   Ú	lr_kwargsr)   r)   r+   Ú_exec_optimizer_stepÁ  sJ   

ÿú
ûóz#PipelineEngine._exec_optimizer_stepc                 K   sD   d|vr|   ¡ rtj|d< |  ¡ rtj|d< tj|fd| ji|¤ŽS )a   Allocate a tensor of zeros on the engine's device.

        Arguments:
            shape: the shape of the tensor to allocate
            kwargs: passed to torch.zeros()

        Returns:
            A tensor from torch.zeros() allocated on self.device.
        r5   r—   )rÕ  r”   rÅ  Úbfloat16_enabledÚbfloat16r•  r—   )rÇ   r^  Úkwargsr)   r)   r+   Ú_allocate_zerosé  s   


zPipelineEngine._allocate_zerosr8   c                 K   s<   g }|dkr	| j }t|ƒD ]}| | j|fi |¤Ž¡ q|S )Nr8   )r    r>  r—  rÜ  )rÇ   r^  rô   rÛ  rÃ  Úcountr)   r)   r+   Ú_allocate_bufferû  s   zPipelineEngine._allocate_bufferc                 C   sš   t |ƒdkrtt|ƒnd}t | jƒ|ks| j|  ¡ |k r?| j||ddd }t | jƒ|kr5| j |¡ n|| j|< | j| S | j|  ¡ d |…  |¡S )Nr   r   )r5   rô   )	r?  r   r   r¤   r.   rÞ  r—  ÚflattenÚview)rÇ   rC  r^  r5   r.   Únew_bufr)   r)   r+   r¼    s    

z*PipelineEngine._allocate_or_extend_buffersc                 O   ó   t dƒ‚©z@Disabled for pipeline parallel training. See ``train_batch()``. z2Only train_batch() is accessible in pipeline mode.©r   ©rÇ   ÚargsrÛ  r)   r)   r+   r‘    ó   zPipelineEngine.forwardc                 O   râ  rã  rä  rå  r)   r)   r+   r¢    rç  zPipelineEngine.backwardc                 O   râ  rã  rä  rå  r)   r)   r+   Ústep  rç  zPipelineEngine.stepc                 C   s   d S r-   )r  rÜ   rÝ   rr   r	   ÚsynchronizeÚreset_max_memory_cachedÚreset_max_memory_allocatedÚmemory_allocatedÚmemory_cachedÚmem_allocedÚ
mem_cachedÚmax_memory_allocatedÚmax_memory_cachedr  ry   )rÇ   rw  Ú
print_rankr}  rÖ   Únew_allocedÚ
new_cachedÚdelta_allocedÚdelta_cachedÚmax_allocedÚ
max_cachedr)   r)   r+   r    s   zPipelineEngine.mem_statusc                 C   s<   t | jtƒsJ ‚| jdusJ dƒ‚| jj| j| j|d dS )aH  Override hack to save a pipe model and return the directory path of the save.

        This method should only be called by DeepSpeed's ``save_checkpoint()``. The
        recommended way of saving a ``PipelineModule`` outside of ``save_checkpoint()``
        is ``save_state_dict()``.

        Returns:
            None
        NzNPipelineEngine expects module_state_dict() to be called from save_checkpoint())Úcheckpoint_engineÚexclude_frozen_params)r]   rM   r   Ú_curr_ckpt_pathÚsave_state_dictrù  )rÇ   Úexclude_frozen_parametersr)   r)   r+   Úmodule_state_dictJ  s   
ÿ
þz PipelineEngine.module_state_dictc                    s`   |du sJ dƒ‚| j r|n|d }|dur#t|tƒs#tƒ  ||¡ dS | jj| j|| jd dS )a~  Override hack to instead use a directory path.

        This is important because pipeline models checkpoint by layer instead of rank.

        If ``state_dict`` is not ``None`` or a ``str``, we revert to ``super()`` expecting a ``dict``.

        Args:
            state_dict (str, None): unused
            strict (bool, optional): Strict state loading. Defaults to True.
        Nz4custom_load_fn not supported w. pipeline parallelismrM   )Úload_dirÚstrictrù  )	Úhas_moe_layersr]   Ústrr[   Úload_module_state_dictrM   Úload_state_dirrû  rù  )rÇ   Ú
checkpointr   Úcustom_load_fnÚfetch_z3_paramsÚ
state_dictrÒ   r)   r+   r  ]  s   

þz%PipelineEngine.load_module_state_dictc                 C   s|   |   | ¡ ¡ g | _|D ]/}|D ]*}t|ƒ| jvr&t| jj› dt|ƒ› ƒ‚t	| jt|ƒ | ƒ| _
| j
di |j¤Ž qqd S )Nz! does not understand instruction r)   )rö   r    r(  rc   Ú_INSTRUCTION_MAPrþ   r¼   r½   Úreprr   Ú_exec_instrrÛ  )rÇ   Úpipe_scheduleÚ	step_cmdsÚcmdr)   r)   r+   r
    s   úþzPipelineEngine._exec_schedulec                 C   r?   r-   )r²   rî   r)   r)   r+   ra  ‘  s   z$PipelineEngine.get_additional_lossesr‚  r-   )FTr#  TN)r#  TNrR  )r8   F)TNF)Mr½   Ú
__module__Ú__qualname__Ú__doc__r”   Úfloat32Úfloat64Ú	complex64Ú
complex128Úfloat16rÚ  Úuint8Úint8Úint16r²  Úint64r‹   r»  rb  r³  r\   rÔ   r…   rê   rï   rì   rö   r÷   r#   r1  r2  rÀ   r¹   r'  r)  r  rß   r  ro  rp  rx  r   r{  rž  r©  r®  r¹  rÄ  rÆ  rÊ  rÍ  rÑ  rØ  rÜ  rÞ  r¼  r‘  r¢  rè  r  rþ  r  r    ÚOptimizerStepÚReduceGradsÚReduceTiedGradsÚLoadMicroBatchÚForwardPassÚBackwardPassÚSendActivationÚRecvActivationÚSendGradÚRecvGradr	  r
  ra  Ú__classcell__r)   r)   rÒ   r+   r2   =   s†    þ =


\
úY
B^O.3($8+
>(


/ör2   )?Útypesr   Úcollectionsr   Ú	functoolsr   Úoperatorr   r”   Ú	deepspeedr   r˜   Údeepspeed.utilsr   Údeepspeed.utils.timerr   Údeepspeed.acceleratorr	   Ú deepspeed.runtime.bf16_optimizerr
   Úenginer   r   r   r   r   r   r   r   r   r   r   r   rÙ   r   Ú
dataloaderr   Úzero.configr   Úactivation_checkpointingr   r´   rM   r   r   ri  r   r    Ú	TARGET_IDrr  rs  rª  r  r  r  r  r  r±  r,   rî  rï  r1   r2   r)   r)   r)   r+   Ú<module>   sD   0