o
    }oi                    @   sp  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlmZ d dl	m
Z
mZ d dlmZ d dlmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZ d dl Z d dl!Z d dl"m#Z# d dl$m%Z% d dl&m'Z' d d	l(m)Z* d d
l(m+Z+ d dl,m-Z- d dl.m/Z/ d dl m0Z0m1Z1 d dl2m3Z3 z
d dl4m5Z5 dZ6W n e7y   dZ6Y nw ede0ee8e0f ee0 Z9ede1j:dZ;edZ<eee0ee8ef f  Z=erd dl>m?Z@ eG dd dee9 ZAdee9 de9fddZBde1j:de j0fddZCdd  ZDG d!d" d"e1jEee; ZFG d#d$ d$ZGd%edefd&d'ZHG d(d) d)e*ZIG d*d+ d+ZJeG d,d- d-ee;e9f ZKG d.d/ d/ZLed0ZMG d1d2 d2e1j:ee9eMf ZNeG d3d4 d4eZOeG d5d6 d6eZPdee1j: deQfd7d8ZRdee1j: deQfd9d:ZSdeTfd;d<ZUd=edee fd>d?ZVG d@dA dAeNZWG dBdC dCeWZXdDe0dEe0fdFdGZYe
dHdI ZZe [ dMdKdLZ\dS )N    N)defaultdict)contextmanagernullcontext)	dataclass)TYPE_CHECKINGAnyCallableDictGenericIterableIteratorListMappingOptionalProtocolSequenceTupleTypeVarUnioncastruntime_checkable)	TrainerFn)move_data_to_deviceparallel_state)DistributedDataParallel)DistributedDataParallelConfig)OptimizerConfig)TransformerConfig)Tensornn)override)FullyShardedDataParallelTFDataTModelT)boundTc                   @   s4   e Zd ZdedefddZdejdejfddZdS )	PrecisionPluginProtocoldatareturnc                 C      d S N selfr(   r,   r,   T/home/ubuntu/.local/lib/python3.10/site-packages/nemo/lightning/megatron_parallel.pyconvert_inputN       z%PrecisionPluginProtocol.convert_inputoutputc                 C   r*   r+   r,   )r.   r2   r,   r,   r/   convert_outputP   r1   z&PrecisionPluginProtocol.convert_outputN)__name__
__module____qualname__r#   r0   torchr   r3   r,   r,   r,   r/   r'   L   s    r'   dataloader_iterr)   c                 C   sJ   t  dkr
tdt| }t|trt|dkr|d }t|tj	
 S )ai  
    Moves the data to a device.

    In this case we unpack the dataloader iterator. There may be a wrapper on the dataloader
    iter from here: https://github.com/NVIDIA/NeMo/blob/main/nemo/lightning/fabric/strategies.py#L441.

    This will not subset the data for your with context parallel so please override this function if you
    want to use context parallel.

    Examples:
        If the dataloader_iter returns: [Tuple[<tensor>, <int>, <int>]] -> move to device
        If the dataloader_iter returns: [<tensor>, <tensor>] -> move to device

    Returns:
        DataT: The data moved to the device.
       zDefault data step is being used in a context parallel environment.Please define your own data step that appropriately slices the data for context parallel.   r   )r   get_context_parallel_world_size
ValueErrornext
isinstancetuplelenr   r7   cudacurrent_device)r8   batchr,   r,   r/   default_data_stepS   s   rD   modelc                 O   s   | |g|R i |S r+   r,   )rE   rC   argskwargsr,   r,   r/   default_forward_steps   s   rH   c                 C   st   d\}}t | ddr6dd |D }t|dkr|d n|}t | ddr6d	d |D }t|dkr4|d n|}||fS )
N)NNoverlap_grad_reduceFc                 S      g | ]}|j qS r,   )no_sync.0model_chunkr,   r,   r/   
<listcomp>{       z%extract_ddp_funcs.<locals>.<listcomp>r9   r   align_grad_reducec                 S   rJ   r,   )start_grad_syncrL   r,   r,   r/   rO   ~   rP   )getattrr@   )
ddp_configpipelineno_sync_funcgrad_sync_funcr,   r,   r/   extract_ddp_funcsw   s   rX   c                       s  e Zd ZdZ										dLdeeee f dee ded dee	e
e gef  d	ee	eegef  d
ee	egdf  dee dee dee dedee	egejf  ddf fddZ									dMdeee
e ee
e  f dedee	e
e gef  d	ee	eegef  d
ed dee dee dee dee dedejfddZ						dNdedee	e
e gef  d	ee	eegef  d
ed dee dee dee defdd Z							dOdedee	e
e gef  d	ee	eegef  d
ed dee dee dee dee defd!d"Z							dOdedee	e
e gef  d	ee	eegef  d
ed dee dee dee dee defd#d$Z							dOdedee	e
e gef  d	ee	eegef  d
ed dee dee dee dee defd%d&Z								dPd'ededee	e
e gef  d	ee	eegef  d
ed dee dee dee dedee defd(d)Zde	ejegeejd*f f fd+d,Zd-d. Zd/d0 Z d1d2 Z!d3d4 Z"dQd5d6Z#dejfd7d8Z$dRd:ede%ee&f fd;d<Z'de%ee&f fd=d>Z(d?d@ Z)dAdB Z*dCdD Z+e,deeee f fdEdFZ-e,defdGdHZ.e/dIe&de&f fdJdKZ0  Z1S )SMegatronParallelas	  Implements distributed model parallelism that is based on Megatron-LM.

    This supports various forms of parallelism:
    - tensor-parallelism
    - pipeline-parallelism
    - virtual pipeline parallelism
    - expert parallelism
    - sequence parallelism

    Attributes
    ----------
        pipeline (Union[nn.Module, Iterable[nn.Module]]): The sequence of modules that
            constitute the pipeline.
        precision_plugin (Optional[PrecisionPluginProtocol]): An optional plugin for
            managing precision-specific operations.
        callbacks (CallbackConnector): A connector for managing and invoking callbacks.
        data_step (Callable[[Iterator[DataT]], DataT]): A function that takes an iterator
            over the data and returns the next batch.
        forward_step (Callable[[nn.Module, DataT], Tensor]): A function that defines the
            forward pass of a model.
        loss_reduction (Optional[Callable[[nn.Module], MegatronLossReduction]]): An optional
            function that defines how the loss is reduced.
        vp_size (Optional[int]): Virtual pipeline parallel size.
        ddp_config (Optional[DistributedDataParallelConfig]): An instance of Megatron core's
            DistributedDataParallelConfig which controls the Megatron DDP configuration.
        fsdp (Optional[str]): Whether model should run Torch FSDP2 instead of DDP, select from
            ["megatron", "torch"]. Defaults to None.
        cpu (bool): Whether model should reside on CPU.
        convert_module_fn (Optional[Callable[[ModelT], nn.Module]]): An optional function to
            apply to the model parameters after initialization.

    Examples
    --------
        >>> from torch import nn
        >>> from megatron_ext.megatron_parallel import MegatronParallel
        >>> model = nn.Sequential(nn.Linear(10, 10), nn.ReLU(), nn.Linear(10, 5))
        >>> megatron_model = MegatronParallel(model)
        >>> print(megatron_model)
        MegatronParallel(
          (0): Linear(in_features=10, out_features=10, bias=True)
          (1): ReLU()
          (2): Linear(in_features=10, out_features=5, bias=True)
        )

    References
    ----------
        Shoeybi, M., Patwary, M., Puri, R., LeGresley, P., Casper, J., & Catanzaro, B. (2019).
        Efficient Large-Scale Language Model Training on GPU Clusters Using Megatron-LM.
        arXiv preprint arXiv:1909.08053.
    NFrU   precision_plugin	callbacksCallbackConnector	data_stepforward_steploss_reductionMegatronLossReductionvp_sizerT   fsdpcpuconvert_module_fnr)   c                    s   ddl m} t|tjrt|}nt|tjr|g}n|}|d urRt|dkrR| dkrRddl	m
} td|D ]}||d }t|drL|j|d || q8t | || _|
| _|pbt | _|pgt| _|plt| _|| _|| _|	| _|| _|| _d S )Nr   r   r9   )ioconfigure_model)vp_stage)megatron.corer   r>   r    
ModuleListlistModuler@   &get_pipeline_model_parallel_world_sizenemo.lightningre   rangereinithasattrrf   appendsuper__init__rZ   _cpur\   r[   rD   r]   rH   r^   r_   rT   rb   rd   ra   )r.   rU   rZ   r[   r]   r^   r_   ra   rT   rb   rc   rd   r   	_pipelinere   i_model	__class__r,   r/   rs      s2   




zMegatronParallel.__init__Tr(   forward_onlyz!MegatronLossReduction[DataT, Any]
seq_lengthmicro_batch_sizenum_microbatchesstep_iwrap_forward_stepc              
   C   s  |p| j }|p	| j}i }|
r|p| j}| j||||d}n|}tj| |||||||	d}||d< | jd|}| jjd|d | }| jjd||d |rq| jjd	||d t	|t
r`|| j}||}| jjd
||||d n
tjdtj d}| jjd|||d |S )a  The method performs the forward pass of the model.

        This method is responsible for executing the forward pass of the model. If `forward_only` is set to False,

        During the execution, it invokes various callbacks at different stages of the operation.
        For more info about that see [CallbackConnector].

        Args:
            data (Union[DataT, Iterator[DataT], List[Iterator[DataT]]]): The input data for the model.
            forward_only (bool, optional): If True, only perform the forward pass. Defaults to True.
            data_step (Optional[Callable[[Iterator[DataT]], DataT]], optional): Function to process the data.
                Defaults to None.
            forward_step (Optional[Callable[[nn.Module, DataT], Tensor]], optional): Function to perform the
                forward pass. Defaults to None.
            loss_reduction (Optional[MegatronLossReduction[DataT, Any]], optional): Function to reduce the
                loss. Defaults to None.
            seq_length (Optional[int], optional): Sequence length for the model. Defaults to None.
            micro_batch_size (Optional[int], optional): Size of the micro batch. Defaults to None.
            num_microbatches (Optional[int], optional): Number of microbatches. Defaults to None.
            wrap_forward_step (bool, optional): If True, wrap the forward step function. Defaults to True.

        Returns
        -------
            torch.Tensor: The output tensor from the forward pass.
        )r^   r]   r_   context)rz   r|   r}   r{   r~   stepon_megatron_step_starton_megatron_microbatches_start)r   on_megatron_microbatches_end)r   microbatch_outputs%on_megatron_reduce_microbatches_start#on_megatron_reduce_microbatches_end)r   r_   r   reduced        deviceon_megatron_step_end)r   r   r   )r^   r_   r]   wrapped_forward_stepMegatronStepinferr[   transform_eventeventr>   _ModuleStepFunctionmodulereducer7   tensorrA   rB   )r.   r(   rz   r]   r^   r_   r{   r|   r}   r~   r   _forward_step_loss_reduction_forward_context
_data_stepforward_step_funcr   r   r   r,   r,   r/   forward   sX   
&





	zMegatronParallel.forwardc           	   
   K   s$   | j d|f||||||dd|S )NtrainingF)r]   r^   r_   r{   r|   r}   rz   _step)	r.   r(   r]   r^   r_   r{   r|   r}   rG   r,   r,   r/   training_stepD  s   
zMegatronParallel.training_stepc	           
      K   &   | j d|f|||||||dd|	S )N
validationTr]   r^   r_   r{   r|   r}   r~   rz   r   
r.   r(   r]   r^   r_   r{   r|   r}   r~   rG   r,   r,   r/   validation_step\     z MegatronParallel.validation_stepc	           
      K   r   )NtestTr   r   r   r,   r,   r/   	test_stepv  r   zMegatronParallel.test_stepc	           
      K   r   )NpredictTr   r   r   r,   r,   r/   predict_step  r   zMegatronParallel.predict_step	step_typec                 K   s|   t | j| dstd| d|pt| j|}|p"t| j|}|p+t| j|}| jd||||||||	|
d	|S )Nr   zself.module must have a `z_step` method)	r(   r]   r^   r_   r{   r|   r}   rz   r~   r,   )rp   r   AttributeErrorr   from_data_stepfrom_forward_stepfrom_loss_reductionr   )r.   r   r(   r]   r^   r_   r{   r|   r}   rz   r~   rG   r   r   r   r,   r,   r/   r     s$   
zMegatronParallel._stepMegatronCallbackProtocolc                    s0   ddl m t fdd}|S )a@  The method wraps the forward step function and returns a callable.

        The output is a forward_step function in the form of:
        https://github.com/NVIDIA/Megatron-LM/blob/main/pretrain_gpt.py#L129

        Args:
            forward_step (Callable): The forward step function to be wrapped.
            loss_reduction (Callable): The loss reduction function.
            context (Dict): The context dictionary.
            data_step (Callable): The data step function.

        Returns
        -------
            Callable: The wrapped forward step function.
        r   r   c                    s   t tr
|}n}|| } d }t tr|}n}t tr*|}n}jjd|||d jrKjdt|jdd drKj|}|||}j	||||d jroj
dt|jdd droj|}jjd||||d	 ||fS )
Nr   on_megatron_microbatch_start)r   rC   forward_callbackFrg   ignore_virtualrg   )rC   rE   forward_moduler   on_megatron_microbatch_end)r   rC   r2   r   )r>   r   r[   r   rZ   is_pipeline_first_stagerS   r   r0   _setup_moduleis_pipeline_last_stager3   )r8   rE   r   rC   r   r   r   output_tensorr   r]   r^   r_   r   r.   r,   r/   wrapped_forward_step_func  sR   








zHMegatronParallel.wrapped_forward_step.<locals>.wrapped_forward_step_func)rh   r   	functoolswraps)r.   r^   r_   r]   r   r   r,   r   r/   r     s   9z%MegatronParallel.wrapped_forward_stepc           
   
   C   sz  ddl m} ddlm} | D ]y}| js"tr| jdkr"|tj	  |
 D ]}|| q&t|drDt|dsDt|jdrC|jj|_n	 | r| dkrddlm} tt| }tt| }d|  d	|  d
| }|| ||kr|d| d|| dd q| jr|   z| jjjtjks|   W d S W d S  t y }	 zdt!|	vr|	W Y d }	~	d S d }	~	ww )Nr   r   )8set_defaults_if_not_set_tensor_model_parallel_attributesmegatronrf   set_input_tensorloggingzC > number of parameters on (tensor, pipeline) model parallel rank (z ,z): z# > number of trainable parameters: z (z.2%z
 of total)zis not attached to a `Trainer`)"rh   r   $megatron.core.tensor_parallel.layersr   rt   HAVE_CUSTOM_FSDPrb   rA   r7   rB   
parametersrp   r   r   model_parallel_is_initializedget_data_parallel_rank
nemo.utilsr   _calc_number_of_paramsrj    _calc_number_of_trainable_paramsget_tensor_model_parallel_rank get_pipeline_model_parallel_rankinford   apply_convert_module_fntrainerstatefnr   TESTINGinit_ddpRuntimeErrorstr)
r.   r   r   model_moduleparamr   
num_paramsnum_trainable_paramsmsger,   r,   r/   init_model_parallel  sX   



z$MegatronParallel.init_model_parallelc                 C   s(   t t| D ]}| | | | |< qd S r+   )rn   r@   rd   )r.   rv   r,   r,   r/   r   P  s   z(MegatronParallel.apply_convert_module_fnc              
   C   s,  t | jtsd S ddlm} ddlm} ddlm} t	| D ]\}}|j
}tdd | D r2tntj}d}t| drHt | jjtrH| jjj}|dkpM|}	| z |||}
tr| jd	krt |
tsdd
lm} t|jdds|t|jdd |d t|jddrt|jdd |d |jjsJ d| jjsJ dt|j| j||	d}nt |
tst|j| j||jdd|  |	d}n|
}W d    n1 sw   Y  ||_
|j!|_!t"#|j$j%|j$j&|j$j'|j$j(|j$j)}||_*|j*j+,|j$j+ t-|j._$qt/| j| \}}| D ]}||j_0||j_1qd S )Nr   r   )Float16Module)unwrap_modelc                 s   s    | ]}|j V  qd S r+   )requires_gradrM   xr,   r,   r/   	<genexpr>b  s    z,MegatronParallel.init_ddp.<locals>.<genexpr>Foptimr   r   use_custom_fsdpTz=Setting module.config.use_custom_fsdp to True for MCore FSDP.gradient_accumulation_fusionzKSetting module.config.gradient_accumulation_fusion to False for MCore FSDP.z,Custom FSDP is not enabled in module.config.z)Custom FSDP is not enabled in ddp_config.)disable_bucketingwith_context_parallel)data_parallel_groupexpert_data_parallel_groupr   )2r>   rT   r   rh   r    megatron.core.transformer.moduler   nemo.utils.model_utilsr   	enumerater   allr   r   r7   enable_gradrp   r   configr   (overlap_param_gather_with_optimizer_stepr   rb   r"   r   r   rS   setattrwarningr   DDPget_data_parallel_group%get_data_modulo_expert_parallel_groupbufferstypesFunctionType__getattr____code____globals__r4   __defaults____closure__original_getattr__dict__updategetattr_proxyry   rX   rV   rW   )r.   r   r   r   model_chunk_idxrN   r   init_ddp_contextr   r   unwrapped_moduler   dist_moduler   rV   rW   r,   r,   r/   r   T  s    






	&
zMegatronParallel.init_ddpc                 C   s"   | D ]}t |dr|j|j_qd S )Nr   )rp   r   ry   r   )r.   rN   r,   r,   r/   teardown_ddp  s
   

zMegatronParallel.teardown_ddpc                    sF   t |dr!t|jj  fdd| D }|jdi | d S d S )Nsetupc                       i | ]\}}| v r||qS r,   r,   rM   kv
setup_argsr,   r/   
<dictcomp>      z2MegatronParallel._setup_module.<locals>.<dictcomp>r,   )rp   inspectgetfullargspecr  rF   items)r.   functionrG   setup_kwargsr,   r  r/   r     s
   
zMegatronParallel._setup_modulec                    sF   | j |fi | t|j  fdd| D }||i |}|S )Nc                    r	  r,   r,   r
  	call_argsr,   r/   r    r  z1MegatronParallel._call_module.<locals>.<dictcomp>)r   r  r  rF   r  )r.   r  rF   rG   call_kwargsr   r,   r  r/   _call_module  s
   zMegatronParallel._call_module prefixc                 C   sR   i }t | D ] \}}| jdur| |}||d| < q| |}|| q|S )aI  
        Creates the sharded state dict which is used by dist_checkpoint to save the sharded tensors to disk.
        When given the sharded_stated_dict, dist_checkpoint.load will load the tensors corresponding to
        self.state_dict().
        The sharded tensor mapping is defined in the GPTModel class from mcore.
        Nmodel_)r   ra   _module_sharded_state_dictr  )r.   r  sharded_state_dictindexr   module_sharded_state_dictr,   r,   r/   r    s   


z#MegatronParallel.sharded_state_dictc                 O   s`   t |dr|j|i |S t |dr,d|dddg}| j|jg|R d|i|S td)Nr  rf   r  r  zmodule.z!Could not find sharded state dict)rp   r  joinpopr  r   r<   )r.   r   rF   rG   r  r,   r,   r/   r    s   

z+MegatronParallel._module_sharded_state_dictc                 C   4   | D ]}|j }t|tst|tsJ |  qd S r+   )r   r>   r   r"   enable_forward_pre_hookr.   rE   rN   r,   r,   r/   r$    
   
z(MegatronParallel.enable_forward_pre_hookc                 C   r#  r+   )r   r>   r   r"   disable_forward_pre_hookr%  r,   r,   r/   r'    r&  z)MegatronParallel.disable_forward_pre_hookc                 C   s8   | D ]}|j }t|tst|tsJ |jdd qd S )NT)
force_sync)r   r>   r   r"   start_param_syncr%  r,   r,   r/   force_param_sync  s
   z!MegatronParallel.force_param_syncc                 C   s   t | dkr
| d S t| S )Nr9   r   )r@   rj   r.   r,   r,   r/   rU     s   zMegatronParallel.pipelinec                 C   s   | d S )Nr   r,   r+  r,   r,   r/   r        zMegatronParallel.moduleitemc                    s   zt  |W S  tyD   t| dkr!td| jj d| dzt| j| d |W  Y S  tyC   td| jj d| dw w )Nr   '' object has no attribute 'z' and contains no modules)	rr   r   r   r@   ry   r4   rS   _modules_get_abs_string_indexr.   r-  rx   r,   r/   r     s   zMegatronParallel.__getattr__)
NNNNNNNNFN)	TNNNNNNNT)NNNNNN)NNNNNNN)NNNNNNTNr)   N)r  )2r4   r5   r6   __doc__r   r$   r   r   r'   r   r   r#   r   intr   r   boolr    rk   rs   r   r7   r   STEP_OUTPUTr   r   r   r   r   r   r   r   r   r   r  r   r  r	   r   r  r  r$  r'  r*  propertyrU   r   r!   r   __classcell__r,   r,   rx   r/   rY      s   6	
1	

a

	
	
	
	

"
P4X
		 rY   c                   @   s   e Zd ZdZddededefddZedd	d
eded  fddZ	edd	d
eded  fddZ
edd	d
eded  fddZdejfddZdS )r   z
    This class acts as a bridge between Megatron core's lower-level functional API and PTL's object-oriented API,
        making it possible to use PTL-compatible functions in Megatron core.
    Fnameis_propertyincludes_selfc                 C   s   || _ || _|| _d S r+   )r:  r;  r<  )r.   r:  r;  r<  r,   r,   r/   rs     s   
z_ModuleStepFunction.__init__r   pl.LightningModuler   r)   c                 C   s.   | ddfD ]}t ||rt|  S qd S )Nr   r]   rp   r   clsr   r   fn_namer,   r,   r/   r     s
   
z"_ModuleStepFunction.from_data_stepc                 C   s   ddl m} |jdt|dd dr*t|| ds!td| dt| dd	d
S | ddfD ]}t||r@t|d	d
  S q1d S )Nr   r   Frg   r   r   zLightningModule does not have z_step methodT)r<  r   r^   )rh   r   r   rS   rp   r<   r   )r@  r   r   r   rA  r,   r,   r/   r     s   
z%_ModuleStepFunction.from_forward_stepc                 C   s2   | ddfD ]}t ||rt|dd  S qd S )Nr   r_   T)r;  r>  r?  r,   r,   r/   r   .  s
   
z'_ModuleStepFunction.from_loss_reductionc                    sL   t || j | jrtt t|| jtr S   S | jr$ fdd}|S  S )Nc                    s    | S r+   r,   )r.   rF   attrr,   r/   wrappedB     z-_ModuleStepFunction.__call__.<locals>.wrapped)rS   r:  r;  r>   typer8  r<  )r.   r   rD  r,   rB  r/   __call__6  s   z_ModuleStepFunction.__call__N)FF)r4   r5   r6   r4  r   r6  rs   classmethodr   r   r   r   r    rk   rG  r,   r,   r,   r/   r     s    r   r-  c                 C   s~   z
t | j| |W S  ty> } z(|dkr|zt| j|W W  Y d }~S  ty9   td| jj d| dw d }~ww )Nr   r.  r/  )rr   ry   r   r   rS   r   r4   )r.   r-  r   r,   r,   r/   r  J  s   r  c                	       sR   e Zd Z	ddededejjdef fddZ	dd	d
Z
dedefddZ  ZS )r   Fr   rT   r   r   c                    sD   t tjj  fdd| D }t jd||||d| d S )Nc                    r	  r,   r,   r
  init_parametersr,   r/   r  c  r  z DDP.__init__.<locals>.<dictcomp>)r   rT   r   r   r,   )r  	signatureMcoreDDPrs   r   r  rr   )r.   r   rT   r   r   rG   filtered_kwargsrx   rI  r/   rs   W  s   
zDDP.__init__r  c                 K   s   | j jd||d| d S )N)r  	keep_varsr,   )r   
state_dict)r.   r  rN  rG   r,   r,   r/   rO  l  s   zDDP.state_dictr-  r)   c                 C   s
   t | |S r+   )r  r2  r,   r,   r/   r   o     
zDDP.__getattr__)F)r  F)r4   r5   r6   r   r   r7   r    rk   r6  rs   rO  r   r   r9  r,   r,   rx   r/   r   V  s    
r   c                   @   sp   e Zd ZdZddddZdddZdeddfd	d
ZdededefddZ	dddZ
dddZdefddZdS )r\   a  
    A connector for managing and invoking callbacks.

    The CallbackConnector class in the MegatronParallel module
    is used to manage and invoke callbacks during the execution of the model.
    Callbacks are functions that are called at specific stages of the model
    execution, allowing you to hook into the model's operation for logging, debugging, or other purposes.

    The CallbackMethods class defines the names of the callback methods that can be used.

    These methods are:
    - `on_megatron_step_start`
    - `on_megatron_microbatch_start`
    - `on_megatron_microbatch_callback`
    - `on_megatron_microbatch_end`
    - `on_megatron_reduce_microbatches_start`
    - `on_megatron_reduce_microbatches_end`
    - `on_megatron_log_step_end`
    - `on_megatron_step_end`

    Each of these methods corresponds to a specific stage in the model's operation.
    You can define these methods in your callback functions to perform specific actions at these stages.
    There is no need for the class to be a subclass of a specific parent class.
    As long as the class contains the methods outlined above, it can be used as a callback.
    Nr)   c                 C   s    t t| _|r| j|  d S d S r+   )r   rj   r[   add)r.   r[   r,   r,   r/   rs     s   
zCallbackConnector.__init__c                    s   d zddl m} |j W n	 ty   Y nw  fddttD }|D ]3}t|tr>|j	 D ]\}}| j| 
| q0q$|D ]}t||rVtt||rV| j| | q@q$| S )a4  
        Adds callback functions to the connector.

        Parameters
        ----------
        *callbacks : CallbackT
            One or more callback functions to add.

        Returns
        -------
        CallbackConnector
            The CallbackConnector instance to allow method chaining.
        Nr   c                    s$   h | ]}| d rt |s|qS )on)
startswithrp   )rM   m_pl_callbackr,   r/   	<setcomp>  s   $ z(CallbackConnector.add.<locals>.<setcomp>)lightning.pytorchpytorchCallbackImportErrordirCallbackMethodsr>   r\   r[   r  extendrp   callablerS   rq   )r.   r[   plmegatron_methodscallback
event_nameevent_callbacksmethodr,   rU  r/   rQ    s&   

zCallbackConnector.addr:  c                    s  | j |g D ]y}t||d}t|rt|  j }tdd |D }tdd |D }|r;|r;||i | q|rP fdd|	 D }	||i |	 q|rddd t
||D }
||
i | qd	d t
||D }
 fd
d|	 D }	||
i |	 qdS )aM  
        Triggers an event and calls all associated callbacks.

        Parameters
        ----------
        name : str
            The name of the event to trigger.
        *args : Any
            Positional arguments to pass to the callbacks.
        **kwargs : Any
            Keyword arguments to pass to the callbacks.
        Nc                 s       | ]	}|j |jkV  qd S r+   )kindVAR_POSITIONALrM   pr,   r,   r/   r         z*CallbackConnector.event.<locals>.<genexpr>c                 s   rf  r+   )rg  VAR_KEYWORDri  r,   r,   r/   r     rk  c                        i | ]\}}| j v r||qS r,   r   r
  sigr,   r/   r         z+CallbackConnector.event.<locals>.<dictcomp>c                 S   &   g | ]\}}|j |j|jfv r|qS r,   rg  POSITIONAL_ONLYPOSITIONAL_OR_KEYWORDrM   argr   r,   r,   r/   rO     
    z+CallbackConnector.event.<locals>.<listcomp>c                 S   rr  r,   rs  rv  r,   r,   r/   rO     rx  c                    rm  r,   rn  r
  ro  r,   r/   r    rq  )r[   getrS   r_  r  rK  r   valuesanyr  zip)r.   r:  rF   rG   rb  callback_methodparamsaccepts_var_argsaccepts_var_kwargsrM  filtered_argsr,   ro  r/   r     s0   

zCallbackConnector.eventobjc                 K   sV   | j |g D ]!}t||d}t|r(||fi |}|dur(t|t|r(|}q|S )a  
        Triggers an event that allows callbacks to transform and return an object.

        This method applies a series of potential transformations to the input object
        by calling registered callbacks. Each callback has the opportunity to modify
        and return a new version of the object.

        Parameters
        ----------
        name : str
            The name of the event to trigger.
        obj : T
            The object to be potentially transformed by callbacks.
        **kwargs : Any
            Additional keyword arguments to pass to the callbacks.

        Returns
        -------
        T
            The potentially transformed object.
        N)r[   ry  rS   r_  r>   rF  )r.   r:  r  rG   rb  r}  resultr,   r,   r/   r     s   z!CallbackConnector.transform_eventc                 C   s4   t |ts	tdt }tti | j|j|_|S )a  
        Adds another CallbackConnector's callbacks to this one.

        Parameters
        ----------
        other : CallbackConnector
            Another CallbackConnector instance to add.

        Returns
        -------
        CallbackConnector
            A new CallbackConnector instance with combined callbacks.

        Raises
        ------
        ValueError
            If `other` is not an instance of CallbackConnector.
        (Can only add CallbackConnector instances)r>   r\   r<   r   rj   r[   )r.   othernew_connectorr,   r,   r/   __add__	  s
   
zCallbackConnector.__add__c                 C   s:   t |ts	td|j D ]\}}| j| | q| S )a  
        In-place addition of another CallbackConnector's callbacks.

        Parameters
        ----------
        other : CallbackConnector
            Another CallbackConnector instance to add.

        Returns
        -------
        CallbackConnector
            The same CallbackConnector instance with combined callbacks.

        Raises
        ------
        ValueError
            If `other` is not an instance of CallbackConnector.
        r  )r>   r\   r<   r[   r  r^  )r.   r  rc  rd  r,   r,   r/   __iadd__"  s
   
zCallbackConnector.__iadd__c                    sR   dd t tD }t fdd|D }|sdS | j D ]	} |v r& dS qdS )a  
        Check if the given callback object is registered in the CallbackConnector.
        If the object has none of the methods of CallbackMethods, it returns True.
        If it has at least one of the methods, it checks if it's inside the CallbackConnector object.

        Args:
            callback_object: The object to check for callback methods.

        Returns
        -------
            bool: True if the callback object is registered, False otherwise.
        c                 S   s(   g | ]}t tt|r|d s|qS )__)r_  rS   r]  rS  )rM   funcr,   r,   r/   rO   I  s    z2CallbackConnector.__contains__.<locals>.<listcomp>c                 3   s    | ]}t  |V  qd S r+   )rp   )rM   re  callback_objectr,   r/   r   P      z1CallbackConnector.__contains__.<locals>.<genexpr>TF)r\  r]  r{  r[   rz  )r.   r  callback_methodshas_any_callback_methodrd  r,   r  r/   __contains__;  s   zCallbackConnector.__contains__r+   r3  )r)   r\   )r4   r5   r6   r4  rs   rQ  r   r   r&   r   r  r  r6  r  r,   r,   r,   r/   r\   s  s    
$1
!
r\   c                   @   s  e Zd ZU dZee ed< eee	e e
e	e  f ed< eed< eed< dZee ed< dZee ed< dZee ed	< dZee ed
< dZee ed< e				d+dee dedededee dee d	ee d
ee ddfddZde
e fddZdeee	e e
e	e  f de
e	e  fddZededee fddZededee fddZededee fddZedeee
e f fddZed,ddZed-d d!Ze j!d.d#d$Z"edeedf fd%d&Z#de$e
e	e  ee f fd'd(Z%e j!defd)d*Z&dS )/r   a  
    Represents a single step in the Megatron model's training or inference process.

    This class encapsulates all the necessary information and logic for executing
    a single step (forward pass, and optionally backward pass) in the Megatron model.
    It handles data preparation, model execution, and provides utilities for inferring
    batch sizes and sequence lengths.

    Attributes:
        pipeline (MegatronParallel[ModelT]): The Megatron parallel model pipeline.
        data (Union[DataT, Iterator[DataT], List[Iterator[DataT]]]): Input data for the step.
        forward_step_func (Callable): Function to perform the forward step.
        forward_only (bool): If True, only perform forward pass (no backward pass).
        micro_batch_size (Optional[int]): Size of each micro-batch.
        seq_length (Optional[int]): Sequence length for the current step.
        num_microbatches (Optional[int]): Number of micro-batches in this step.
        decoder_seq_length (Optional[int]): Sequence length of decoder (used only in
            encoder-decoder style models) for the current step.

    Type Parameters:
        ModelT: The type of the model being used.
        DataT: The type of the input data.
    rU   r(   r   rz   Nr|   r{   r}   r~   decoder_seq_lengthr)   zMegatronStep[ModelT, DataT]c	           	   
   C   sL   |du r|j r|j j}| |||||p| ||p| ||p"| ||dS )a  
        Creates a MegatronStep instance, inferring missing parameters if possible.

        This method attempts to infer the micro_batch_size, seq_length, and num_microbatches
        from the provided data if they are not explicitly specified.

        Args:
            pipeline (MegatronParallel[ModelT]): The Megatron parallel model pipeline.
            data (DataT): Input data for the step.
            forward_step_func (Callable): Function to perform the forward step.
            forward_only (bool): If True, only perform forward pass (no backward pass).
            micro_batch_size (Optional[int]): Size of each micro-batch.
            seq_length (Optional[int]): Sequence length for the current step.
            num_microbatches (Optional[int]): Number of micro-batches in this step.
            step_i (Optional[int]): Step index for the current step.
        Returns:
            MegatronStep[ModelT, DataT]: An instance of MegatronStep with inferred parameters.
        N)rU   r(   r   rz   r|   r{   r}   r~   )r   global_stepinfer_micro_batch_sizeinfer_seq_lengthinfer_num_microbatches)	r@  rU   r(   r   rz   r|   r{   r}   r~   r,   r,   r/   r     s   zMegatronStep.inferc                 C   sv   | j du r	td| jdu rtd| jdu rtd|  \}}|p%| j}| j| j|| j| j || j| j| j	| j
d	S )a  
        Executes the Megatron step.

        This method performs the forward (and optionally backward) pass using the
        configured forward_backward_func. It ensures all necessary parameters are set
        before execution.

        Returns:
            List[Any]: The output of the forward_backward_func, typically containing
                       loss values and other relevant information.

        Raises:
            ValueError: If any of num_microbatches, seq_length, or micro_batch_size is not set.
        Nznum_microbatches is not setzseq_length is not setzmicro_batch_size is not set)	r   data_iteratorrE   r}   r{   r|   rz   r  adjust_tensor_shapes_fn)r}   r<   r{   r|    get_data_iterator_and_seq_lengthforward_backward_funcr   rE   rz   r  r  )r.   r  r{   r,   r,   r/   rG    s&   



zMegatronStep.__call__c                 C   s^   t |trt| j|S t |tr"tdd |D r"tttt  |S tttt  t	|ggS )a9  
        Converts the provided data into a list of iterators.

        This method is used to convert the input data into a list of iterators that can be used
        for data parallelism in the Megatron model. The input data can be a single data item,
        an iterator, or a list of iterators.

        Args:
            data (Union[DataT, Iterator[DataT], List[Iterator[DataT]]]): The input data to be
                converted into a list of iterators.

        Returns:
            List[Iterator[DataT]]: A list of iterators created from the input data.
        c                 s   s    | ]}t |tV  qd S r+   )r>   r   rM   r-  r,   r,   r/   r     r  z5MegatronStep.to_data_iterator_list.<locals>.<genexpr>)
r>   r   _make_data_iterator_listrE   rj   r   r   r   r#   iterr-   r,   r,   r/   to_data_iterator_list  s
   
z"MegatronStep.to_data_iterator_listc                 C   sd   t |tr
|dS t |tr| tt| S t |tt	fr0t
|dkr0|d }| |S dS )a  
        Infers the micro-batch size from the input data.

        This method attempts to determine the micro-batch size by examining the first
        dimension of the input data. It handles various data types including Tensors,
        dictionaries, lists, and tuples.

        Args:
            data (DataT): The input data from which to infer the micro-batch size.

        Returns:
            Optional[int]: The inferred micro-batch size, or None if it cannot be determined.
        r   N)r>   r   sizedictr  r=   r  rz  rj   r?   r@   r@  r(   _tensorr,   r,   r/   r    s   



z#MegatronStep.infer_micro_batch_sizec                 C   sd   t |tr
|dS t |tr| tt| S t |tt	fr0t
|dkr0|d }| |S dS )a  
        Infers the sequence length from the input data.

        This method attempts to determine the sequence length by examining the second
        dimension of the input data. It handles various data types including Tensors,
        dictionaries, lists, and tuples.

        Args:
            data (DataT): The input data from which to infer the sequence length.

        Returns:
            Optional[int]: The inferred sequence length, or None if it cannot be determined.
        r9   r   N)r>   r   r  r  r  r=   r  rz  rj   r?   r@   r  r,   r,   r/   r    s   



zMegatronStep.infer_seq_lengthc                 C   s   t |ttttfrdS dS )a  
        Infers the number of micro-batches from the input data.

        Currently, this method assumes a single micro-batch for common data types.
        It may need to be extended for more complex data structures or use cases.

        Args:
            data (DataT): The input data from which to infer the number of micro-batches.

        Returns:
            Optional[int]: The inferred number of micro-batches, or None if it cannot be determined.
        r9   N)r>   r  r?   rj   r   )r@  r(   r,   r,   r/   r  "  s   z#MegatronStep.infer_num_microbatchesc                 C   s   | j j S )z
        Retrieves the model or list of models from the pipeline.

        Returns:
            Union[ModelT, List[ModelT]]: The model or list of models in the pipeline.
        )rU   r+  r,   r,   r/   rE   5     zMegatronStep.modelr=  c                 C      | j jS )z
        Retrieves the PyTorch Lightning module from the pipeline.

        Returns:
            pl.LightningModule: The PyTorch Lightning module.
        )rU   r   r+  r,   r,   r/   	pl_module?  r  zMegatronStep.pl_module
pl.Trainerc                 C   r  )z
        Retrieves the PyTorch Lightning trainer from the pipeline.

        Returns:
            pl.Trainer: The PyTorch Lightning trainer.
        )rU   r   r+  r,   r,   r/   r   I  r  zMegatronStep.trainerMegatronStepProtocolc                 C   s   ddl m} | S )aI  
        Retrieves the forward-backward function for the Megatron model.

        This property uses Megatron's scheduling to get the appropriate
        forward-backward function based on the current configuration.

        Returns:
            MegatronStepProtocol: The function to perform forward and backward passes.
        r   )get_forward_backward_func))megatron.core.pipeline_parallel.schedulesr  )r.   r  r,   r,   r/   r  S  s   z"MegatronStep.forward_backward_funcc                 C   s&   ddl m} || j| j| j| j| jS )a|  
        Retrieves the function to adjust send and receive tensor shapes in Megatron-Core's forward pass.

        Currently only used during non-interleaved pipelining for Distillation.

        Returns:
            Union[Callable, None]: The function which takes in tensor shapes and returns updated shapes,
                                   or None if not applicable.
        r   ),get_tensor_shapes_adjust_fn_for_distillation)+nemo.collections.llm.modelopt.distill.utilsr  rE   r{   r|   r  rz   )r.   r  r,   r,   r/   r  b  s   z$MegatronStep.adjust_tensor_shapes_fnc                    s   d}| j rSt| j}t|trt|dkr|\} d}n
|d dd} d|v r2|d dnd}ddlm} ||| j	d}|rR fd	d
|D }t
|}n| j}d}| |}||fS )aD  
        Converts the provided data into a list of iterators.

        For finetuning, where sequence length is different for each step, this function also outputs the
        sequence length of the current batch.

        Returns:
            List[Iterator[DataT]]: A list of iterators created from the input data.
        Fr:   Tr   Ntokensr9   )get_iterator_k_splitc                    s   g | ]}| fqS r,   r,   )rM   d	batch_idxdataloader_idxr,   r/   rO     s    zAMegatronStep.get_data_iterator_and_seq_length.<locals>.<listcomp>)has_global_batch_samplerr=   r(   r>   r?   r@   r  2nemo.collections.nlp.modules.common.megatron.utilsr  r}   	itertoolschainr  )r.   has_dataloader_idx
batch_datarC   r{   r  r(   packed_datar,   r  r/   r  w  s$   




z-MegatronStep.get_data_iterator_and_seq_lengthc                 C   s`   t | jdd d ur| jjjjdk}|S t | jdd d ur,ddlm} t| jjj	|}|S d}|S )N
datamodulerC   predict_dataloadersr   )MegatronPretrainingBatchSamplerF)
rS   r   r  data_samplerdataloader_typeLnemo.collections.nlp.data.language_modeling.megatron.megatron_batch_samplersr  r>   r  batch_sampler)r.   use_global_batch_samplerr  r,   r,   r/   r    s   
z%MegatronStep.has_global_batch_sampler)NNNN)r)   r=  )r)   r  )r)   r  )'r4   r5   r6   r4  rY   r$   __annotations__r   r#   r   r   r   r6  r|   r   r5  r{   r}   r~   r  rH  r   r   rG  r  r  r  r  r8  rE   r  r   r   cached_propertyr  r  r   r  r  r,   r,   r,   r/   r   ^  sx   
 	
+'

			"$r   c                   @   s  e Zd ZdZdedefddZdeddfddZded	ed
dddfddZded	ed
dde	ddf
ddZ
dedee	 ddfddZdedee	 ddfddZdedee	 dddeejeeejf f ddf
ddZ	ddedee	 deeejeeejf f  ddfddZdS )r]  a_  
    Defines callback methods for various stages of the Megatron model's execution.

    This class outlines the structure for callbacks that can be implemented to hook into
    different phases of the Megatron model's training or inference process. Each method
    represents a specific point in the execution where custom logic can be inserted.
    r   r)   c                 C      dS )a  
        Called at the beginning of each Megatron step.

        This method is invoked before any processing of the step begins. It allows for
        any necessary setup or initialization for the step.

        Args:
            step (MegatronStep): The MegatronStep object representing the current step.

        Returns:
            MegatronStep: The potentially modified MegatronStep object.
        Nr,   r.   r   r,   r,   r/   r     s   z&CallbackMethods.on_megatron_step_startNc                 C   r  )ab  
        Called before processing of microbatches begins.

        This method is invoked just before the model starts processing the microbatches
        within a step. It can be used for any preparations needed before microbatch processing.

        Args:
            step (MegatronStep): The MegatronStep object representing the current step.
        Nr,   r  r,   r,   r/   r     s   
z.CallbackMethods.on_megatron_microbatches_startrC   r   r`   c                 C   r  )a  
        Called at the start of processing each microbatch.

        This method is invoked before the forward pass of each microbatch. It provides
        access to the current batch data and the loss reduction callback.

        Args:
            step (MegatronStep): The MegatronStep object representing the current step.
            batch (DataT): The current microbatch of data being processed.
            forward_callback (MegatronLossReduction): The callback for loss reduction.
        Nr,   )r.   r   rC   r   r,   r,   r/   r     s   z,CallbackMethods.on_megatron_microbatch_startr2   c                 C   r  )aU  
        Called at the end of processing each microbatch.

        This method is invoked after the forward pass of each microbatch. It provides
        access to the processed batch, the loss reduction callback, and the output of the forward pass.

        Args:
            step (MegatronStep): The MegatronStep object representing the current step.
            batch (DataT): The microbatch of data that was processed.
            forward_callback (MegatronLossReduction): The callback for loss reduction.
            output (Any): The output from the forward pass for this microbatch.
        Nr,   )r.   r   rC   r   r2   r,   r,   r/   r     s   z*CallbackMethods.on_megatron_microbatch_endr   c                 C   r  )a  
        Called after all microbatches in a step have been processed.

        This method is invoked once all microbatches within a step have been processed.
        It provides access to the outputs from all microbatches.

        Args:
            step (MegatronStep): The MegatronStep object representing the current step.
            microbatch_outputs (List[Any]): A list of outputs from all processed microbatches.
        Nr,   r.   r   r   r,   r,   r/   r     s   z,CallbackMethods.on_megatron_microbatches_endc                 C   r  )a  
        Called before the reduction of microbatch outputs begins.

        This method is invoked just before the model starts reducing (e.g., averaging)
        the outputs from all microbatches. It can be used for any preparations needed
        before the reduction process.

        Args:
            step (MegatronStep): The MegatronStep object representing the current step.
            microbatch_outputs (List[Any]): A list of outputs from all processed microbatches.
        Nr,   r  r,   r,   r/   r     s   z5CallbackMethods.on_megatron_reduce_microbatches_startr_   r   c                 C   r  )a  
        Called after the reduction of microbatch outputs is complete.

        This method is invoked after the model has finished reducing the outputs from
        all microbatches. It provides access to the original microbatch outputs,
        the loss reduction object, and the final reduced output.

        Args:
            step (MegatronStep): The MegatronStep object representing the current step.
            microbatch_outputs (List[Any]): A list of outputs from all processed microbatches.
            loss_reduction (MegatronLossReduction): The object used for loss reduction.
            reduced (Union[torch.Tensor, Dict[str, torch.Tensor]]): The final reduced output.
        Nr,   )r.   r   r   r_   r   r,   r,   r/   r     s   z3CallbackMethods.on_megatron_reduce_microbatches_endc                 C   r  )a  
        Called at the end of each Megatron step.

        This method is invoked after all processing for a step is complete. It provides
        access to the outputs from all microbatches and the final reduced output (if available).

        Args:
            step (MegatronStep): The MegatronStep object representing the current step.
            microbatch_outputs (List[Any]): A list of outputs from all processed microbatches.
            reduced (Optional[Union[torch.Tensor, Dict[str, torch.Tensor]]]): The final reduced
                output, if available. This may be None for certain configurations or pipeline stages.
        Nr,   )r.   r   r   r   r,   r,   r/   r   3  s   z$CallbackMethods.on_megatron_step_endr+   )r4   r5   r6   r4  r   r   r   r#   r   r   r   r   r   r   r   r7   r   r	   r   r   r   r   r,   r,   r,   r/   r]    sj    



r]  
ReductionTc                       sp   e Zd Zd fddZdddZdd Zd	ed
ejde	eje
f fddZejdee
 dejfddZ  ZS )r`   r)   Nc                    s$   t t|   d | _| | j d S r+   )rr   r`   rs   rC   register_forward_pre_hook_pre_forward_hookr+  rx   r,   r/   rs   L  s   zMegatronLossReduction.__init__c                 C   s
   || _ d S r+   rC   )r.   rC   r,   r,   r/   r  Q  rP  zMegatronLossReduction.setupc                 C   s   | j f| S r+   r  )r.   r   r   r,   r,   r/   r  T  s   z'MegatronLossReduction._pre_forward_hookrC   forward_outc                 C      t dNz Must be implemented by subclass.NotImplementedError)r.   rC   r  r,   r,   r/   r   W  rE  zMegatronLossReduction.forwardlosses_reduced_per_micro_batchc                 C   r  r  r  )r.   r  r,   r,   r/   r   Z  r,  zMegatronLossReduction.reducer3  )r4   r5   r6   rs   r  r  r#   r7   r   r   r  r   abcabstractmethodr   r   r9  r,   r,   rx   r/   r`   K  s    
""r`   c                   @   s4   e Zd Zdejdeejeeejf f fddZdS )r   r   r)   c                 C   r*   r+   r,   )r.   r   r,   r,   r/   rG  a  r1   z!MegatronCallbackProtocol.__call__N)	r4   r5   r6   r7   r   r   r	   r   rG  r,   r,   r,   r/   r   _  s    ,r   c                   @   sh   e Zd Zdddddeeee f deejjeejj f de	de	de	d	e
e	 d
ededefddZdS )r  NF)r  rz   collect_non_loss_datar  rE   r}   r{   r|   r  rz   r  r)   c       	   
      C   r*   r+   r,   )
r.   r   r  rE   r}   r{   r|   r  rz   r  r,   r,   r/   rG  f  s   zMegatronStepProtocol.__call__)r4   r5   r6   r   r   r   r7   r    rk   r5  r   r6  rj   rG  r,   r,   r,   r/   r  d  s.    	
r  c                 C       t | tsJ tdd | D S )Nc                 S   "   g | ]}t d d | D qS )c                 S   s   g | ]}|  qS r,   )nelementri  r,   r,   r/   rO   x      z5_calc_number_of_params.<locals>.<listcomp>.<listcomp>sumr   rM   r   r,   r,   r/   rO   x     " z*_calc_number_of_params.<locals>.<listcomp>r>   rj   r  rE   r,   r,   r/   r   u     r   c                 C   r  )Nc                 S   r  )c                 S   s   g | ]	}|j r| qS r,   )r   numelri  r,   r,   r/   rO   ~  s    z?_calc_number_of_trainable_params.<locals>.<listcomp>.<listcomp>r  r  r,   r,   r/   rO   ~  r  z4_calc_number_of_trainable_params.<locals>.<listcomp>r  r  r,   r,   r/   r   {  r  r   c                 C   s    t | tsdS tdd | D S )NFc                 s   s    | ]
}t |tjjV  qd S r+   )r>   collectionsr  r   r  r,   r,   r/   r     s    z'is_list_of_iterators.<locals>.<genexpr>)r>   rj   r   )varr,   r,   r/   is_list_of_iterators  s   
r  r  c                    sl   t | trt| dkr|S G  fddd  |g}t|t| k r4||d   t|t| k s#|S )a  Convert data iterator into form expected by Megatron.

    With interleaved pipeline parallelism, Megatron expects a
    list of one data iterator per model chunk. Each model
    chunk independently gets data from its data iterator, so
    we need to interact with the data iterator multiple times
    for each microbatch step. Instead of incorporating this
    logic into the data loader, we cache the iterator's output
    to the first model chunk and reuse it in the other model
    chunks.
    r9   c                       sH   e Zd ZdZG dd dZdefddZ fddZd	d
 Zdd Z	dS )z1_make_data_iterator_list.<locals>.CachingIteratorz$Iterator wrapper that caches values.c                   @   s(   e Zd ZdZdd Zdd Zdd ZdS )	z7_make_data_iterator_list.<locals>.CachingIterator.Proxyz{Returns values from caching iterator wrapper.

            Assumed to never advance past the caching iterator.
            c                 S   s   t  | _d S r+   )queueQueuecacher+  r,   r,   r/   rs     s   z@_make_data_iterator_list.<locals>.CachingIterator.Proxy.__init__c                 S      | S r+   r,   r+  r,   r,   r/   __iter__     z@_make_data_iterator_list.<locals>.CachingIterator.Proxy.__iter__c                 S   s
   | j  S r+   )r  
get_nowaitr+  r,   r,   r/   __next__  rP  z@_make_data_iterator_list.<locals>.CachingIterator.Proxy.__next__N)r4   r5   r6   r4  rs   r  r  r,   r,   r,   r/   Proxy  s
    r  iteratorc                 S   s   || _ g | _d S r+   )r  proxies)r.   r  r,   r,   r/   rs     s   
z:_make_data_iterator_list.<locals>.CachingIterator.__init__c                    s   | j    | j d S )N)r  rq   r  r+  CachingIteratorr,   r/   
make_proxy  s   
z<_make_data_iterator_list.<locals>.CachingIterator.make_proxyc                 S   r  r+   r,   r+  r,   r,   r/   r    r  z:_make_data_iterator_list.<locals>.CachingIterator.__iter__c                 S   s&   t | j}| jD ]}|j| q|S r+   )r=   r  r  r  put)r.   valproxyr,   r,   r/   r    s   

z:_make_data_iterator_list.<locals>.CachingIterator.__next__N)
r4   r5   r6   r4  r  r   rs   r  r  r  r,   r  r,   r/   r    s    r  r   )r>   rj   r@   rq   r  )rE   r  itersr,   r  r/   r    s   
$r  c                       sv   e Zd Zddededdf fddZd	eeejf d
ejde	ejejeeejf f fddZ
dejfddZ  ZS )MaskedTokenLossReductionFTr   val_drop_lastr)   Nc                    s   t    || _|| _d S r+   )rr   rs   r   r  )r.   r   r  rx   r,   r/   rs     s   

z!MaskedTokenLossReduction.__init__rC   r  c                 C   s   t |tr|\}}||d< t||d \}}| jr-| js-| r-|dks(J dt|}| 	 
tj}t| 	 d|dg}||d|ifS )zTaken from: https://github.com/NVIDIA/NeMo/blob/main
        /nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py#L951-L976 .	loss_maskr   z!Got NaN loss with non-empty inputr9   loss_sum_and_ub_size)r>   r?   masked_token_lossr   r  isnanr7   
zeros_likeclonedetachtor5  catview)r.   rC   r  r  loss_sumnum_valid_tokensr  r,   r,   r/   r     s   

"z MaskedTokenLossReduction.forwardc                 C   s   |rVd|d v rdd |D }t | }|S ddlm} dd |D }t|dkr4t |jddnt jddgt j	
 d	}t jj||jd
dd |d |d  }|S t jdt j	
 d	S )zTaken from: https://github.com/NVIDIA/NeMo/blob/main
        /nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py#L535-L552 .avgr   c                 S   s   g | ]}|d  qS )r  r,   r   r,   r,   r/   rO     r  z3MaskedTokenLossReduction.reduce.<locals>.<listcomp>r   c                 S   s$   g | ]}|d  d dkr|d  qS )r  r9   r   r,   r   r,   r,   r/   rO     s    )dimr   r   Tr   )groupr9   )r7   r  meanrh   r   r@   vstackr  r   rA   rB   distributed
all_reducer   )r.   r  r  lossr   r  r,   r,   r/   r     s(   
zMaskedTokenLossReduction.reduce)FT)r4   r5   r6   r6  rs   r	   r   r7   r   r   r   r   r9  r,   r,   rx   r/   r    s    
r  c                
       sT   e Zd Zdeeejf deejejf deejeeejf f f fddZ  Z	S )$MaskedTokenLossReductionWithLossMaskrC   r  r)   c                    s   |\}}||d< t  ||S )Nr  )rr   r   )r.   rC   r  r  rx   r,   r/   r      s   z,MaskedTokenLossReductionWithLossMask.forward)
r4   r5   r6   r	   r   r7   r   r   r   r9  r,   r,   rx   r/   r
    s    r
  r   maskc                 C   s:   |  d }| d }t|| }| }||fS )zS
    The function takes as input per-token loss and masks non-required values.
    r  )r  floatr7   r  )r   r  lossesr  r   r  r,   r,   r/   r    s
   r  c                  c   s4    ddl m} m} |  z	d V  W |   d S |   w )Nr   clear_aux_losses_tracker&reduce_aux_losses_tracker_across_ranks)'megatron.core.transformer.moe.moe_utilsr  r  r  r,   r,   r/   moe_loss_tracker_ctx  s   r        ?c                    s   t  8 t } fdd| D }i }| D ]\}}||vr%d||< ||  |  7  < q|W  d    S 1 s>w   Y  d S )Nc                    s"   i | ]\}}||d      qS )rz  )r  r
  
loss_scaler,   r/   r  *  r  z,aggregate_moe_loss_stats.<locals>.<dictcomp>r   )r  r   "get_moe_layer_wise_logging_trackerr  r  r-  )r  tracker
aux_lossestotal_loss_dictr:  	loss_listr,   r  r/   aggregate_moe_loss_stats&  s   $r  )r  )]r  collections.abcr  r   r  r  r  r   r   
contextlibr   r   dataclassesr   typingr   r   r   r	   r
   r   r   r   r   r   r   r   r   r   r   r   r   r7   torch.distributed lightning.pytorch.trainer.statesr   lightning.pytorch.utilitiesr   rh   r   megatron.core.distributedr   rL  r   megatron.core.optimizerr   ,megatron.core.transformer.transformer_configr   r   r    typing_extensionsr!   %megatron.core.distributed.custom_fsdpr"   r   r[  r   r#   rk   r$   r&   r7  rX  rY  r`  r'   rD   rH   rX   ri   rY   r   r  r   r\   r   r]  r  r`   r   r  r5  r   r   r6  r  r  r  r
  r  r  no_gradr  r,   r,   r,   r/   <module>   s   L      ? l  U =:
