o
    }oiUd                     @   s  d dl Z d dlZd dlZd dlZd dlmZ d dlmZ d dlm	Z	 d dl
mZmZmZmZmZmZmZmZmZmZmZ d dlmZ d dlZd dlZd dlZd dlZd dlmZ d dl m!Z!m"Z" d dl#m$Z$ d d	l%m&Z& d d
l'm(Z( d dlm)Z)m*Z* d dl+m,Z, d dl-m.Z. d dl/m0Z0 d dl1m2Z3 d dl4m5Z5 d dl6m7Z7m8Z8m9Z9 d dl:m;Z;m<Z<m=Z= d dl>m?Z@ d dlAmBZB d dlCmDZD eZE	 edZFeG dd de!ZGG dd de<ZHdeeeF  deFfddZIdeejJejKf fdd ZLd!eeee= ee= f  dee= fd"d#ZMG d$d% d%e<ZNG d&d' d'ejOe8jPe8jQZRG d(d) d)e(ZSG d*d+ d+eZTG d,d- d-e0ZUG d.d/ d/ejVZWG d0d1 d1ejXZYd2d3 ZZeded fd4d5Z[ej\]d6ej\j^ej\j_d7d8 Z`d9d: Zaebd;krtea  dS dS )<    N)contextmanager)	dataclass)Path)AnyDictIterableIteratorListOptionalSequenceTuple	TypedDictTypeVarUnion)TensorBoardLogger)ModelParallelConfigparallel_state)OptimizerConfig)	ModelType)MegatronModule)Tensornn)
DataLoader)
transforms)MNIST)	lightning)llm)
NeMoLoggerioresume)DataTMegatronLossReduction
ReductionT)	callbacks)MegatronOptimizerModule)MegatronDataSamplerTc                   @   s:   e Zd ZU dZdZeed< dZeed< dej	fddZ
dS )	ExampleConfigzExampleConfig is a dataclass that is used to configure the model.

    Timers from ModelParallelConfig are required for megatron forward compatibility.
    Fcalculate_per_token_lossfp8returnc                 C   s   t | S )zThis function is called by the strategy to construct the model.

        Note: Must pass self into Model since model requires having a config object.

        Returns:
            The model object.
        )ExampleModelself r.   `/home/ubuntu/.local/lib/python3.10/site-packages/tests/collections/llm/test_mnist_model_nemo2.pyconfigure_modelC   s   zExampleConfig.configure_modelN)__name__
__module____qualname____doc__r(   bool__annotations__r)   r   Moduler0   r.   r.   r.   r/   r'   9   s
   
 r'   c                   @   sD   e Zd ZdZdededeeef fddZde	e defdd	Z
d
S )MSELossReductionz]A class used for calculating the loss, and for logging the reduced loss across micro batches.batchforward_outr*   c                 C   s@   |d }|}|d }| |dd}tj||}|d|ifS )a-  Calculates the loss within a micro-batch. A micro-batch is a batch of data on a single GPU.

        Args:
            batch: A batch of data that gets passed to the original forward inside LitAutoEncoder.
            forward_out: the output of the forward method inside LitAutoEncoder.

        Returns:
            A tuple containing [<loss_tensor>, ReductionT] where the loss tensor will be used for
                backpropagation and the ReductionT will be passed to the reduce method
                (which currently only works for logging.).
        datax_hatr   avg)viewsizer   
functionalmse_loss)r-   r9   r:   xoutputsr<   xviewlossr.   r.   r/   forwardQ   s   zMSELossReduction.forwardlosses_reduced_per_micro_batchc                 C   s   t dd |D }| S )aZ  Works across micro-batches. (data on single gpu).

        Note: This currently only works for logging and this loss will not be used for backpropagation.

        Args:
            losses_reduced_per_micro_batch: a list of the outputs of forward

        Returns:
            A tensor that is the mean of the losses. (used for logging).
        c                 S   s   g | ]}|d  qS )r>   r.   ).0rF   r.   r.   r/   
<listcomp>q       z+MSELossReduction.reduce.<locals>.<listcomp>)torchstackmean)r-   rH   
mse_lossesr.   r.   r/   reducef   s   zMSELossReduction.reduceN)r1   r2   r3   r4   r    r   r   r"   rG   r   rP   r.   r.   r.   r/   r8   N   s    r8   seqr*   c                 C   s"   | D ]
}|dur|  S qt d)z;Returns the first non-None value from the sequence or failsNznon-None value not found)
ValueError)rQ   sr.   r.   r/   
some_firstu   s
   rT   c           
   
   C   s(  |    r dkr  t d   r#d! r! "  }|s$ t d    tjd r: d  d  }} ||fS    tjjd rf } zt| }W n ty_ } zt d|d }~ww |j|j	fS    t
d r} d  } t| }t|S   td r }	t|	}t|S   	 td	)
Nr   z!Looking up dtype on an empty listz!Looking up dtype on an empty dict)dtypedevice   r.   z6Cannot get dtype on a torch module with no parameters.)keysvalueszGot something we didnt expect)rR   rL   r   r   r7   next
parametersStopIterationrU   rV   dictrT   get_dtype_devicelist	TypeError)
torch_objectr;   rU   rV   mperY   vallr.   r.   r/   r^   }   s:   $

r^   batchesc                    s0      r dkr d t jd r   t j ddS      r9 dkr9 d td r8    fdd d D S      r] dkr] d td r\   t fddtt d D S      r dkr d td r~    fd	d
tt d D S    du r dS   r dkr td 	 td)aI  Takes a sequence of batches and collates them into a single batch.
        This is distinct from the standard pytorch default_collator since it does
        not add the batch dimension, it's assumed the batch
        dimension is already present in the input, as would be the case when
        parallelizing across minibatches.

    IMPORTANT: The underlying data primitive _must_ be a torch Tensor. The input to this function is a recurisve type,
    there can be any amount of nesting between dictionaries, tuples, and lists, as long as the inner type is a n-d torch.Tensor.

    Examples:
        Outer container = Dict:
            [{'a': torch.tensor([1]), 'b': torch.tensor([2])}, {'a': torch.tensor([2]), 'b': torch.tensor([3])}] -> {'a': torch.tensor([1, 2]), 'b': torch.tensor([2, 3])}
        Outer container = List:
            [[torch.tensor([1]), torch.tensor([2])], [torch.tensor([2]), torch.tensor([3])]] -> [torch.tensor([1, 2]), torch.tensor([2, 3])]
        Outer container = Tuple:
            ([torch.tensor([1]), torch.tensor([2])], [torch.tensor([2]), torch.tensor([3])]) -> (torch.tensor([1, 2]), torch.tensor([2, 3]))

    Args:
        batches (Optional[Sequence[ReductionT]]): sequence of batches to collate into a single batch.

    Returns:
        A single batch of the same type as the elements of your input sequence.
    rW   r   r.   )dimc                    s$   i | ]  t  fd dD qS )c                       g | ]}|  qS r.   r.   rI   r9   keyr.   r/   rJ      rK   z-batch_collator.<locals>.<dictcomp>.<listcomp>batch_collatorrI   rg   rk   r/   
<dictcomp>   s   $ z"batch_collator.<locals>.<dictcomp>c                 3   s&    | ] t  fd dD V  qdS )c                    ri   r.   r.   rj   ir.   r/   rJ      rK   z,batch_collator.<locals>.<genexpr>.<listcomp>Nrm   ro   rp   rr   r/   	<genexpr>   s   $ z!batch_collator.<locals>.<genexpr>c                    s"   g | ] t  fd dD qS )c                    ri   r.   r.   rj   rr   r.   r/   rJ      rK   z-batch_collator.<locals>.<listcomp>.<listcomp>rm   ro   rp   rr   r/   rJ      s   " z"batch_collator.<locals>.<listcomp>Nz Cannot process an empty sequencez-Unsupported input structure in batch_collator)	rL   r   catr]   tuplerangelenr_   rR   rp   r.   rp   r/   rn      s(   $""""
rn   c                   @   sF   e Zd ZdZdededeejef fddZde	e defddZ
d	S )
PassthroughLossReductiona  Internally in NeMo2.0 the forward step is always expected to return a loss reduction class, and forward is expected to return a loss.
    This class hijacks that mechanism to instead pass through the forward output unperturbed as the loss (to enable inference in the predict step), and then the
    reduce method is used to collate the batch of forward outputs into a single batch. This supports the model forward output being a tensor, dict, tuple,
    or list of tensors. The inner type _must always be a torch.Tensor_.
    r9   r:   r*   c                 C   s    t |\}}tjd||d|fS )as  _summary_

        Args:
            batch (DataT): The batch of data that was passed through the model to generate output.
            forward_out (torch.Tensor): The output from your model's forward pass.

        Returns:
            Tuple[torch.Tensor, ReductionT]: A tuple containing the loss tensor (dummy in this case) and the forward output (unmodified).
        rW   )rV   rU   )r^   rL   zeros)r-   r9   r:   rU   rV   r.   r.   r/   rG      s   
z PassthroughLossReduction.forwardc                 C   s   t |S )a:  This overrides the standard reduce with a simplified version that just takes a list of your model's forward outputs
            and collates them togehter into a single output.

        Args:
            forward_out (List[ReductionT]): _description_

        Returns:
            ReductionT: _description_
        rm   )r-   r:   r.   r.   r/   rP      s   
zPassthroughLossReduction.reduceN)r1   r2   r3   r4   r    r   rL   r   rG   r	   rP   r.   r.   r.   r/   ry      s     ry   c                       s   e Zd ZdZ fddZddedee defdd	Z	ddee fd
dZ
defddZdefddZdefddZdefddZdddZ  ZS )LitAutoEncoderzhA very basic lightning module for testing the megatron strategy and the megatron-nemo2-bionemo contract.c                    s6   t    || _ttddddd| _| j|  dS )zInitializes the model.

        Args:
            config: a Config object necessary to construct the actual nn.Module (the thing that has the parameters).
        g-C6?adamT)lr	optimizeruse_distributed_optimizerconfigN)super__init__r   r$   r   optimconnectr-   r   	__class__r.   r/   r      s   
zLitAutoEncoder.__init__Nr9   	batch_idxr*   c                 C   s   |d }|  |S )ar  This forward will be called by the megatron scheduler and it will be wrapped.

        !!! note

            The `training_step` defines the training loop and is independent of the `forward` method here.

        Args:
            batch: A dictionary of data.
            batch_idx: The index of the batch.

        Returns:
            The output of the model.
        r;   )module)r-   r9   r   rC   r.   r.   r/   rG      s   
zLitAutoEncoder.forwardc                 C   s
   | ||S )aR  The training step is where the loss is calculated and the backpropagation is done.

        Background:
        - NeMo's Strategy overrides this method.
        - The strategies' training step will call the forward method of the model.
        - That forward method then calls the wrapped forward step of MegatronParallel which wraps the forward method of the model.
        - That wrapped forward step is then executed inside the Mcore scheduler, which calls the `_forward_step` method from the
            MegatronParallel class.
        - Which then calls the training_step function here.

        In this particular use case, we simply call the forward method of this class, the lightning module.

        Args:
            batch: A dictionary of data. requires `batch_idx` as default None.
            batch_idx: The index of the batch.
        r.   )r-   r9   r   r.   r.   r/   training_step  s   
zLitAutoEncoder.training_stepc                 C      t  S Nr8   r,   r.   r.   r/   training_loss_reduction     z&LitAutoEncoder.training_loss_reductionc                 C   r   r   r   r,   r.   r.   r/   validation_loss_reduction     z(LitAutoEncoder.validation_loss_reductionc                 C   r   r   r   r,   r.   r.   r/   test_loss_reduction  r   z"LitAutoEncoder.test_loss_reductionc                 C   r   r   )ry   r,   r.   r.   r/   predict_loss_reduction  r   z%LitAutoEncoder.predict_loss_reductionc                 C   s   | j  | _d S r   )r   r0   r   r,   r.   r.   r/   r0   #  s   zLitAutoEncoder.configure_modelr   )r*   N)r1   r2   r3   r4   r   r   r
   intr   rG   r   r!   r   r   r   r   r0   __classcell__r.   r.   r   r/   r{      s    r{   c                       sV   e Zd Zdeddf fddZdedeeef fddZd	e	e ddfd
dZ
  ZS )r+   r   r*   Nc                    sd   t  | tj| _tdd| _t | _	tdd| _
tdd| _t | _tdd| _dS )zConstructor of the model.

        Args:
            config: The config object is responsible for telling the strategy what model to create.
          @      N)r   r   r   encoder_or_decoder
model_typer   Linearlinear1ReLUrelulinear2linear3relu2linear4r   r   r.   r/   r   (  s   

zExampleModel.__init__rC   c                 C   sX   | |dd}| |}| |}| |}| |}| |}| |}||dS )zForward pass of the model.

        Args:
            x: The input data.

        Returns:
            x_hat: The result of the last linear layer of the network.
        r   r=   )r<   z)r?   r@   r   r   r   r   r   r   )r-   rC   r   r<   r.   r.   r/   rG   7  s   	






zExampleModel.forwardinput_tensorc                 C   s   dS )a'  This is needed because it is a megatron convention. Even if it is a no-op for single GPU testing.

        See megatron.model.transformer.set_input_tensor()

        Note: Currently this is a no-op just to get by an mcore function.

        Args:
            input_tensor: Input tensor.
        Nr.   )r-   r   r.   r.   r/   set_input_tensorI  s   
zExampleModel.set_input_tensor)r1   r2   r3   r   r   r   r   strrG   r
   r   r   r.   r.   r   r/   r+   '  s    r+   c                   @   s&   e Zd ZU eed< eed< eed< dS )	MnistItemr;   labelidxN)r1   r2   r3   r   r6   r   r.   r.   r.   r/   r   V  s   
 r   c                       s&   e Zd Zdedef fddZ  ZS )MNISTCustomindexr*   c                    s   t  |\}}|||dS )a  Wraps the getitem method of the MNIST dataset such that we return a Dict
        instead of a Tuple or tensor.

        Args:
            index: The index we want to grab, an int.

        Returns:
            A dict containing the data ("x"), label ("y"), and index ("idx").
        )r;   r   r   )r   __getitem__)r-   r   rC   yr   r.   r/   r   ]  s
   
zMNISTCustom.__getitem__)r1   r2   r3   r   r   r   r   r.   r.   r   r/   r   \  s    r   c                   @   s@   e Zd Zdd Zdd ZdddZddd	Zd
d Zdd ZdS )LossLoggingCallbackc                 C   s   g | _ g | _dS )zqLog the loss at the end of each batch. For training do not reduce across the epoch but do so for validation/test.N)
val_lossestest_lossesr,   r.   r.   r/   r   r  s   
zLossLoggingCallback.__init__c                 C   sN   t j dkr#t r%t|tr|d }|}|jd|ddddd d S d S d S )Nr   rF   
train_lossT)on_stepprog_barloggerrank_zero_only)rL   distributedget_rankr   is_pipeline_last_stage
isinstancer]   log)r-   trainer	pl_modulerD   r9   r   rF   r.   r.   r/   on_train_batch_endw  s   
z&LossLoggingCallback.on_train_batch_endr   c                 C   D   t j dkrt r t|tr|d }|}| j| d S d S d S Nr   rF   )	rL   r   r   r   r   r   r]   r   appendr-   r   r   rD   r9   r   dataloader_idxrF   r.   r.   r/   on_test_batch_end  s   
z%LossLoggingCallback.on_test_batch_endc                 C   r   r   )	rL   r   r   r   r   r   r]   r   r   r   r.   r.   r/   on_validation_batch_end  s   
z+LossLoggingCallback.on_validation_batch_endc                 C   b   t j dkr+t r-t| jdkr/t | j }|j	d|dddd | j
  d S d S d S d S )Nr   val_lossTr   r   r   )rL   r   r   r   r   rx   r   rM   rN   r   clear)r-   r   r   avg_val_lossr.   r.   r/   on_validation_epoch_end     z+LossLoggingCallback.on_validation_epoch_endc                 C   r   )Nr   	test_lossTr   )rL   r   r   r   r   rx   r   rM   rN   r   r   )r-   r   r   avg_test_lossr.   r.   r/   on_test_epoch_end  r   z%LossLoggingCallback.on_test_epoch_endN)r   )	r1   r2   r3   r   r   r   r   r   r   r.   r.   r.   r/   r   q  s    

r   c                       sh   e Zd Zddededdf fddZd	eddfd
dZdefddZdefddZ	defddZ
  ZS )MNISTDataModule./    data_dir
batch_sizer*   Nc                    sL   t    || _|| _d| _d| _d| _d | _t| j| j| j| jd| _	d S )N   d   )seq_lenmicro_batch_sizeglobal_batch_sizerampup_batch_size)
r   r   r   r   r   r   max_lenr   r%   data_sampler)r-   r   r   r   r.   r/   r     s   
zMNISTDataModule.__init__stagec                 C   st   t | jdt dd| _t | jdt dd| _t | jdt dd}tjjj	|ddgt
 dd\| _| _dS )	zeSets up the datasets

        Args:
            stage: can be one of train / test / predict.
        TF)download	transformtraini  i  *   )	generatorN)r   r   r   ToTensor
mnist_testmnist_predictrL   utilsr;   random_split	Generatormanual_seedmnist_train	mnist_val)r-   r   
mnist_fullr.   r.   r/   setup  s   zMNISTDataModule.setupc                 C      t | j| jddS Nr   )r   num_workers)r   r   r   r,   r.   r.   r/   train_dataloader     z MNISTDataModule.train_dataloaderc                 C   r   r   )r   r   r   r,   r.   r.   r/   val_dataloader  r   zMNISTDataModule.val_dataloaderc                 C   r   r   )r   r   r   r,   r.   r.   r/   test_dataloader  r   zMNISTDataModule.test_dataloader)r   r   )r1   r2   r3   r   r   r   r   r   r   r   r   r   r.   r.   r   r/   r     s    r   c                   C   s@   dt jj_tj  t rt	  tj
 rtj
  dS dS )zResets _GLOBAL_NUM_MICROBATCHES_CALCULATOR in megatron which is used in NeMo to initialized model parallel in
    nemo.collections.nlp.modules.common.megatron.megatron_init.initialize_model_parallel_for_nemo
    N)megatroncorenum_microbatches_calculator#_GLOBAL_NUM_MICROBATCHES_CALCULATORrL   cudaempty_cacher   is_initializeddestroy_model_parallelr   destroy_process_groupr.   r.   r.   r/   _reset_megatron_parallel_state  s   


r   c                   c   s$    zt   dV  W t   dS t   w )zIPuts you into a clean parallel state, and again tears it down at the end.N)r   r.   r.   r.   r/   reset_megatron_parallel_state  s
   r   GPUc                  C   s0   t jt} d|  }tj|dtjtjd d S )Nzpython T)shellstdoutstderr)ospathabspath__file__
subprocess
check_callsysr  )r  callr.   r.   r/   Atest_train_mnist_litautoencoder_with_megatron_strategy_single_gpu  s   
r  c                  C   sr  t  )} t| }| sJ | sJ t h d}tjddddddd}|}|| }tt	||d}t
t	||||d	}tt d
}tjdddddd}	tjdd|	dddddtt gd	}
t|d}tj|||
|tjdddd |
  W d   n1 sw   Y  t  tjdddddtdddddd}tjdd|t	|d}|jdd}t|d }t| sJ d| dtt|j |
jj }t!|j"||# |d}t$|% d d!hksJ d"| |d! j&t'|j(dfksJ |d  j&t'|j(d#fks
J |  W d   n1 sw   Y  W d   dS W d   dS 1 s2w   Y  dS )$znThis is the actual test that will get run in a subprocess so it does not contaminate the state of other tests.test_experimentTr   rW      z6{model_name}--{val_loss:.2f}-{step}-{consumed_samples})	save_lastmonitor
save_top_kevery_n_train_stepsfilenamealways_save_context)save_dirname)log_dirr  tensorboardckptr   r   )tensor_model_parallel_sizepipeline_model_parallel_sizeddpfind_unused_parametersenable_nemo_ckpt_iogpu   )	acceleratordevicesstrategylimit_val_batchesval_check_interval	max_steps	num_nodeslog_every_n_stepsr#   )r   )resume_if_existsresume_ignore_no_checkpoint)modelr;   r   r   r   Nr      F)r   r   r   
output_log)r  r  r  r  r  r   )r!  r"  r#  default_root_dirz.ckpt weightszcheckpoint z not found in )dataloaders	ckpt_pathr   r<   z>We expect forward output from predit_step, not the loss, got: r   ))tempfileTemporaryDirectoryr   existsis_dirr   nl_callbacksModelCheckpointr   r   r   r{   r'   nlMegatronStrategyTrainerr   track_ior   r   r   r   r   
AutoResume	_teardownr%   last_model_pathreplacer  listdirparentr+  r   rn   predictr   setrX   shaperx   r   )
tmpdir_strtmpdirr  checkpoint_callbackroot_dirr  	tb_loggernemo_loggerr+  r#  r   data_modulepred_strategypredict_trainerr2  unwrapped_trained_modelforward_outputr.   r.   r/   @run_train_mnist_litautoencoder_with_megatron_strategy_single_gpu  s   		


8



 <$rQ  __main__)cr  r  r
  r3  
contextlibr   dataclassesr   pathlibr   typingr   r   r   r   r	   r
   r   r   r   r   r   lightning.pytorchpytorchpl)megatron.core.num_microbatches_calculatorr   pytestrL   torch.distributedlightning.pytorch.loggersr   megatron.corer   r   megatron.core.optimizerr   megatron.core.transformer.enumsr    megatron.core.transformer.moduler   r   r   torch.utils.datar   torchvisionr   torchvision.datasetsr   nemor   r9  nemo.collectionsr   nemo.lightningr   r   r    nemo.lightning.megatron_parallelr    r!   r"   nemo.lightning.pytorchr#   r7  nemo.lightning.pytorch.optimr$   nemo.lightning.pytorch.pluginsr%   TokenizerTyper&   r'   r8   rT   rU   rV   r^   rn   ry   LightningModuleIOMixinConnectorMixinr{   r+   r   r   Callbackr   LightningDataModuler   r   r   markrun_only_onintegrationpleasefixmer  rQ  r1   r.   r.   r.   r/   <module>   sp   4'*)!G/,-
	
m
