o
    TÃiº<  ã                   @   s.  d dl mZ ddlmZmZ G dd„ deƒZG dd„ deƒZG dd	„ d	eƒZG d
d„ deƒZG dd„ dƒZ	G dd„ de	ƒZ
G dd„ de	ƒZG dd„ de	ƒZG dd„ de	ƒZG dd„ deƒZG dd„ deƒZG dd„ deƒZG dd„ deƒZG dd„ deƒZG d d!„ d!eƒZG d"d#„ d#eƒZd$d%„ Zd&d'„ Zd(S ))é   )Úcall_to_stré    )ÚABCÚabstractmethodc                       s˜   e Zd ZdZ‡ fdd„Zedd„ ƒZdd„ Zdd	„ Zd
d„ Z	e
dd„ ƒZe
dd„ ƒZe
dd„ ƒZe
dd„ ƒZe
dd„ ƒZdd„ Zdd„ Zdd„ Z‡  ZS )ÚPipeScheduleaZ  Directs the execution of a pipeline engine by generating sequences of
    :class:`PipeInstruction`.

    Schedules are generators that yield sequences of
    :class:`PipeInstruction` to process the micro-batches in one batch.
    Each yielded step is atomic in the sense that a barrier
    synchronization can be placed between successive steps without
    deadlock.

    Below is an example schedule that implements data parallelism with gradient accumulation:

    .. code-block:: python

        class DataParallelSchedule(PipeSchedule):
            def steps(self):
                for step_id in range(self.micro_batches):
                    cmds = [
                        LoadMicroBatch(buffer_id=0),
                        ForwardPass(buffer_id=0),
                        BackwardPass(buffer_id=0),
                    ]
                    if step_id == self.micro_batches - 1:
                        cmds.extend([
                            ReduceGrads(),
                            OptimizerStep(),
                        ])
                    yield cmds

            def num_pipe_buffers(self):
                return 1

    Args:
        micro_batches (int): The number of micro-batches that comprise a batch.
        stages (int): The number of pipeline stages.
        stage_id (int): The pipe stage that will execute the generated schedule.
    c                    s8   t ƒ  ¡  || _|| _|| _| jd | _| jd | _d S )Né   )ÚsuperÚ__init__Úmicro_batchesÚstagesÚstage_idÚ
prev_stageÚ
next_stage)Úselfr
   r   r   ©Ú	__class__© úS/home/ubuntu/.local/lib/python3.10/site-packages/deepspeed/runtime/pipe/schedule.pyr	   1   s   
zPipeSchedule.__init__c                 C   ó   dS )a  Yield a list of :class:`PipeInstruction` for each step in the schedule.

        .. note::
            Schedules must implement ``steps()`` to define the schedule.

        Returns:
            Instructions to be executed as one step of the pipeline
        Nr   ©r   r   r   r   Ústeps9   s   
zPipeSchedule.stepsc                 C   ó   | j S )a  The number of pipeline buffers that will be used by this stage.

        .. note::
            Schedules should specialize ``num_pipe_buffers()`` for memory savings at scale.

        Returns:
            The number of buffers for the engine to allocate.
        ©r
   r   r   r   r   Únum_pipe_buffersE   s   	zPipeSchedule.num_pipe_buffersc                 C   ó   d|  ko
| j k S   S ©Nr   r   ©r   Úmicro_batch_idr   r   r   Ú_valid_micro_batchP   ó   zPipeSchedule._valid_micro_batchc                 C   r   r   ©r   )r   r   r   r   r   Ú_valid_stageS   r   zPipeSchedule._valid_stagec                 C   r   )z,Stage index used to configure this schedule.©r   r   r   r   r   ÚstageV   ó   zPipeSchedule.stagec                 C   r   )zDThe number of total pipeline stages used to configure this schedule.r    r   r   r   r   Ú
num_stages[   r$   zPipeSchedule.num_stagesc                 C   r   )zBThe number of total micro_batches used to configure this schedule.r   r   r   r   r   Únum_micro_batches`   r$   zPipeSchedule.num_micro_batchesc                 C   s
   | j dkS )zGTrue if the configured ``stage_id`` is the first stage in the pipeline.r   r"   r   r   r   r   Úis_first_stagee   s   
zPipeSchedule.is_first_stagec                 C   s   | j | jd kS )zFTrue if the configured ``stage_id`` is the last stage in the pipeline.r   )r   r   r   r   r   r   Úis_last_stagej   s   zPipeSchedule.is_last_stagec                 C   s   |   |¡sJ ‚||  ¡  S )a9  Map a micro-batch index to a pipeline buffer index.

        This method uses a cyclic allocation strategy.

        Args:
            micro_batch_id (int): The micro-batch index relative to the beginning of the schedule.

        Returns:
            int: The index of the buffer that should store data.
        )r   r   r   r   r   r   Ú_buffer_idxo   s   zPipeSchedule._buffer_idxc                 C   s
   d | _ | S ©N)Úitr   r   r   r   Ú__iter__}   s   zPipeSchedule.__iter__c                 C   s   | j d u r
|  ¡ | _ t| j ƒS r*   )r+   r   Únextr   r   r   r   Ú__next__   s   


zPipeSchedule.__next__)Ú__name__Ú
__module__Ú__qualname__Ú__doc__r	   r   r   r   r   r!   Úpropertyr#   r%   r&   r'   r(   r)   r,   r.   Ú__classcell__r   r   r   r   r      s*    %





r   c                   @   ó    e Zd ZdZdd„ Zdd„ ZdS )ÚInferenceSchedulezCA schedule for inferencing batches using pipeline parallelism.
    c                 c   sP   d}| j | j d }t|ƒD ]–}g }|| j }t| jƒr(|d }|d d }n
|d d }|d }| js8| jrD|  |¡rD| t	|ƒ¡ t| jƒrp|  
| j¡r]|  |d ¡r]| t|ƒ¡ |  
| j¡ro|  |¡ro| t|ƒ¡ n&|  
| j¡r‚|  |¡r‚| t|ƒ¡ |  
| j¡r–|  |d ¡r–| t|ƒ¡ |  |¡r¢| t|ƒ¡ |V  qdS )Ú éÿÿÿÿr   r   N)r
   r   Úranger   Ú_is_evenr'   r(   r   ÚappendÚLoadMicroBatchr!   r   ÚSendActivationr   ÚRecvActivationÚForwardPass)r   Úprev_micro_batch_idÚtotal_stepsÚstep_idÚcmdsr   Úrecv_bufÚsend_bufr   r   r   r   ‹   s@   €




€

ÝzInferenceSchedule.stepsc                 C   r   )zdOnly two pipeline buffers are required for inferencing.

        Returns:
            ``2``
        r   r   r   r   r   r   r   ´   s   z"InferenceSchedule.num_pipe_buffersN©r/   r0   r1   r2   r   r   r   r   r   r   r6   ‡   s    )r6   c                   @   sH   e Zd ZdZdd„ Zdd„ Zdd„ Zdd	„ Zd
d„ Zdd„ Z	dd„ Z
dS )ÚTrainSchedulezãA schedule for training a batch using hybrid parallelism.

    Pipeline parallelism is extracted through gradient accumulation and thus
    convergence follows that of a data parallel approach with the same batch
    size.
    c           	      c   s˜   d}d| j | j d  }t|ƒD ]¸}|  |¡\}}|  |¡r$|  |¡}|  |¡r.|  |¡}g }|rW|  |¡rD|  | j¡rD| t	|ƒ¡ |  |¡rV|  | j¡rV| t
|ƒ¡ n$|  |¡ri|  | j¡ri| t|ƒ¡ |  |¡r{|  | j¡r{| t|ƒ¡ | jdksˆ| j| jd kr–|r–|  |¡r–| t|ƒ¡ |  |¡r¬|r¥| t|ƒ¡ n| t|ƒ¡ ||d krÄ| tƒ ¡ | tƒ ¡ | tƒ ¡ |}|V  qdS )r7   r8   r   r   r   N)r
   r   r9   Ú_step_to_micro_batchr   r)   r!   r   r;   ÚSendGradr>   r   ÚRecvGradr=   r   r<   r?   ÚBackwardPassÚReduceTiedGradsÚReduceGradsÚOptimizerStep)	r   r@   rA   rB   r   Ú
is_forwardÚprev_bufferÚcurr_bufferrC   r   r   r   r   Å   sD   €



€
ÔzTrainSchedule.stepsc                 C   s   t | j| j | jƒ}td|ƒS )as  Return the number of pipeline buffers required for this stage.

        This is equivalent to the maximum number of in-flight forward passes,
        since we need to remember the activations of forward passes in order
        to run backpropagation. For synchronous 1F1B, this is equivalent to
        the index difference between this stage and the last stage.
        r   )Úminr   r   r
   Úmax)r   Úbuffersr   r   r   r   ÷   s   
zTrainSchedule.num_pipe_buffersc                 C   s¤   t |ƒrt | jƒr|  |¡}d}||fS t|ƒr(t| jƒr(|  |¡}d}||fS t |ƒr<t| jƒr<|  |¡}d}||fS t|ƒrPt | jƒrP|  |¡}d}||fS J ‚)NTF)r:   r   Ú_even_step_forward_idÚ_is_oddÚ_odd_step_forward_idÚ_even_step_backward_idÚ_odd_step_backward_id)r   rB   r   rO   r   r   r   rH     s"   
ñ
õ
	ù
þz"TrainSchedule._step_to_micro_batchc                 C   s   |d }t || jd  ƒ}|S )Nr   ©Úintr   ©r   rB   Úbaser   r   r   r   rU     s   z#TrainSchedule._even_step_forward_idc                 C   s"   |d d }t || jd  ƒ}|S ©Nr   r   rZ   r\   r   r   r   rW     s   z"TrainSchedule._odd_step_forward_idc                 C   s(   |d }t || j | jd d  ƒ}|S )Nr   r   )r[   r   r   r\   r   r   r   rX   "  s   z$TrainSchedule._even_step_backward_idc                 C   s,   |d d | j  d }t|| jd  ƒ}|S r^   )r   r[   r   r\   r   r   r   rY   '  s   z#TrainSchedule._odd_step_backward_idN)r/   r0   r1   r2   r   r   rH   rU   rW   rX   rY   r   r   r   r   rG   ½   s    2rG   c                   @   r5   )ÚDataParallelSchedulezgAn example schedule that trains using traditional data parallelism with gradient
    accumulation.
    c                 c   sX    t | jƒD ]#}tddtddtddg}|| jd kr&| tƒ tƒ g¡ |V  qdS )r7   r   )Ú	buffer_idr   N)r9   r
   r<   r?   rK   ÚextendrM   rN   )r   rB   rC   r   r   r   r   2  s   €ýþõzDataParallelSchedule.stepsc                 C   r   )z)Only one pipeline buffer needed.
        r   r   r   r   r   r   r   A  s   z%DataParallelSchedule.num_pipe_buffersNrF   r   r   r   r   r_   -  s    r_   c                   @   r5   )ÚPipeInstructiona0  Base class for all instructions to be executed by the pipeline engine.

    All keyword arguments are stored as members similar to a ``namedtuple``. These are
    then accessible to the :class:`PipeEngine` during execution.

    Args:
        kwargs (optional): keyword arguments to store as members
    c                 K   s2   | j j| _|| _| ¡ D ]
\}}t| ||ƒ qd S r*   )r   r/   ÚnameÚkwargsÚitemsÚsetattr)r   rd   ÚkeyÚvalr   r   r   r	   Q  s
   
ÿzPipeInstruction.__init__c                 C   s   t | jfi | j¤ŽS r*   )r   rc   rd   r   r   r   r   Ú__repr__W  s   zPipeInstruction.__repr__N)r/   r0   r1   r2   r	   ri   r   r   r   r   rb   G  s    	rb   c                   @   ó   e Zd ZdZdS )rN   zàPerforms one step with the optimizer and zeros gradients.

    .. note:: Should be issued after :class:`ReduceGrads` and :class:`ReduceTiedGrads`.

    .. note:: Can be a synchronization point among data-parallel ranks.
    N©r/   r0   r1   r2   r   r   r   r   rN   [  s    rN   c                   @   rj   )rM   zRReduce the computed gradients among data-parallel processes within the stage.
    Nrk   r   r   r   r   rM   e  s    rM   c                   @   rj   )rL   as  Reduce the computed gradients of tied modules within a pipeline-parallel group.

    .. warning::
        The stages included in this synchronization point are not known until
        the model is partitioned among pipeline stages. In the worst case, it
        includes all pipeline stages. This instruction should be scheduled
        carefully to avoid deadlocks.
    Nrk   r   r   r   r   rL   k  ó    rL   c                       s    e Zd ZdZ‡ fdd„Z‡  ZS )ÚBufferOpInstructionz’A pipeline instruction that operates on pipeline buffer(s).

    Args:
        buffer_id (int): the index of the pipeline buffer() to modify.
    c                    s   t ƒ jdd|i|¤Ž d S )Nr`   r   )r   r	   )r   r`   rd   r   r   r   r	   ~  r   zBufferOpInstruction.__init__)r/   r0   r1   r2   r	   r4   r   r   r   r   rm   w  s    rm   c                   @   rj   )r<   zˆLoad a micro-batch into a buffer.

    Roughly:

    .. code-block:: python

        buffers['inputs'][buffer_id] = next(data_iter)
    Nrk   r   r   r   r   r<   ƒ  rl   r<   c                   @   rj   )r?   z•Compute a forward pass.

    Roughly:

    .. code-block:: python

        buffers['outputs'][buffer_id] = forward(buffers['inputs'][buffer_id])
    Nrk   r   r   r   r   r?     rl   r?   c                   @   rj   )rK   a.  Compute a backward pass and accumulate gradients.

    Roughly:

    .. code-block:: python

        outputs = buffers['outputs'][buffer_id]
        gradients = buffers['gradients'][buffer_id]
        torch.autograd.backward(tensors=outputs,
                                grad_tensors=gradients)
    Nrk   r   r   r   r   rK   œ  ó    rK   c                   @   rj   )r=   a,  Send activations to the next stage in the pipeline.

    Roughly:

    .. code-block:: python

        send(buffers['outputs'][buffer_id])

    .. note::
        The communication is blocking and must be paired with a :class:`RecvActivation`
        on the next pipeline stage to avoid deadlock.
    Nrk   r   r   r   r   r=   ¬  ó    r=   c                   @   rj   )r>   a;  Receive activations from the previous stage in the pipeline.

    Roughly:

    .. code-block:: python

        buffers['inputs'][buffer_id] = recv()

    .. note::
        The communication is blocking and must be paired with a :class:`SendActivation`
        on the previous pipeline stage to avoid deadlock.
    Nrk   r   r   r   r   r>   ¼  ro   r>   c                   @   rj   )rI   a·  Send computed gradients to the previous pipeline stage.
    with respect to the received activations

    .. note::
        Only received tensors with ``requires_grad==True`` will produce gradients.
        Missing gradients will be replaced with ``None`` on the receiving stage.

    .. note::
        The communication is blocking and must be paired with a :class:`RecvGrad`
        on the previous pipeline stage to avoid deadlock.
    Nrk   r   r   r   r   rI   Ì  rn   rI   c                   @   rj   )rJ   af  Receive computed gradients the next pipeline stage.

    .. note::
        Only activations with ``requires_grad==True`` will produce gradients.
        Missing gradients will be replaced with ``None``.

    .. note::
        The communication is blocking and must be paired with a :class:`SendGrad`
        on the next pipeline stage to avoid deadlock.
    Nrk   r   r   r   r   rJ   Û  s    
rJ   c                 C   s   | d dkS ©Nr   r   r   ©Úxr   r   r   r:   é  ó   r:   c                 C   s   | d dkS rp   r   rq   r   r   r   rV   í  rs   rV   N)Úutilsr   Úabcr   r   r   r6   rG   r_   rb   rN   rM   rL   rm   r<   r?   rK   r=   r>   rI   rJ   r:   rV   r   r   r   r   Ú<module>   s(   |6p
