o
    }oiM                 	   @   s"  d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZmZ d dl	m
Z
 d dlmZ d dlmZmZmZmZmZmZmZmZmZmZmZ d dlmZ d dlZd dlmZ d dlm Z  d dl!m"Z" d d	l#m$Z$ d d
l%m&Z& d dl'm(Z( d dl)m*Z* d dl+m,Z, d dl-m.Z. d dl/m0Z0 d dl1m2Z2 d dl3m4Z4 d dl5m6Z6m7Z7 d dl8m9Z9 d dl:m;Z; d dl<m=Z= d dl>m?Z? d dl@mAZA d dlBmCZCmDZD d dlBmEZF d dlBmGZGmHZHmIZImJZJmKZK d dlLmMZMmNZN d dlOmPZP d dlQmRZR d dlSmTZT zd dlUmVZV W n eWy   d dlXmVZV Y nw d d lYmZZZ d d!l[m\Z\m]Z] d d"l^m_Z_ d d#l`maZa d d$lbmcZc d d%ldmeZe d d&lfmgZgmhZh d d'limjZjmkZkmlZl zd d(lmmnZn d d)lompZp d*ZqW n eWerfyc   d+ZqY nw zd dlsZsd*ZtW n eWerfyz   d+ZtY nw z@d d,lumvZvmwZw d d-lxmyZy d d.lzm{Z{ d d/l|m}Z} d d0l~mZmZmZ d d1lmZ d d lmZZ d d2lmZ d d3lmZ d*ZW n eWerfy   d+ZY nw zd d4lmZ W n eWerfy   ehd5 d d4lmZ Y nw zd d6lmZmZ d*ZW n ey   d+ZY nw d7Zd8Zd9e d:Z	;d]d<ed=ed>ed?dfd@dAZG dBdC dCe6ZG dDdE dEeZd^dFejjdGed?edH fdIdJZd^dFejjdGed?edH fdKdLZG dMdN dNe7ZG dOdP dPeaZG dQdR dRe2ZG dSdT dTe4ZG dUdV dVejjjZG dWdX dXe2ZG dYdZ dZe*ZG d[d\ d\e$ZdS )_    N)OrderedDictdefaultdict)contextmanager)Path)AnyCallableDict	GeneratorIteratorListLiteralMappingOptionalSizedUnion)TorchCheckpointIO)get_filesystem)_optimizer_to_device)TQDMProgressBar)	_update_n)LightningOptimizer)_DataFetcher)ClusterEnvironment)CheckpointIO)_WrappingCheckpointIO)MixedPrecisionPlugin)FSDPPrecision)DDPStrategyFSDPStrategy)	TrainerFn)Trainer)	OmegaConf)ReduceOp)	noop_hook)BackwardPrefetchFullStateDictConfig)FullyShardedDataParallel)MixedPrecisionOptimStateKeyTypeShardedStateDictConfigShardingStrategyStateDictType)FullOptimStateDictConfigShardedOptimStateDictConfig)transformer_auto_wrap_policy)DistributedDataParallel)is_global_rank_zero)_refresh_per_optimizer_state)Float16Module)AutocastTransformerLayerParallelTransformerLayer)utils_funcs)SaveRestoreConnector)MainParamsOptimizerWrapper)init_optimizer_states)AppStatelogging)ckpt_to_dirinject_model_parallel_rankuninject_model_parallel_rank)MegatronDistributedFusedAdam)McoreDistributedOptimizerTF)dist_checkpointingparallel_state)CheckpointingException)dict_list_map_outplace)LocalNonpersistentObject)!get_param_id_to_sharded_param_mapmake_sharded_optimizer_tensoroptim_state_to_sharding_state)&param_is_not_tensor_parallel_duplicate)TransformerLayer)DistributedCheckpointIO)get_num_microbatcheszCMegatron num_microbatches_calculator not found, using Apex version.)restore_sharded_modelopt_statesave_sharded_modelopt_state.NEMO_MEGATRON_MODEL_PARALLEL_APPSTATE_OVERRIDEzIhttps://docs.nvidia.com/nemo-framework/user-guide/latest/knownissues.htmlz
    (1) To resolve this issue, try to set `model.dist_ckpt_load_strictness` to `log_all`. This setting enables loading older checkpoints.
    (2) For more details and troubleshooting guidance, please refer to the framework documentation: z.
   sharpnccl_communicator_config_pathdistributed_timeout_minutesreturnc                 C   s  t  }|jdurt  tj rtj|j|j	|j
|j|j|j|| |j|jr(dnd|j|d |jt ks8J |jt ksAJ t |_t |_t |_t |_t |_|jrddl m!} ddl"}|j!#|d}||j!#dk rtjj$d	d
 dS dS dS dS dS )zInitializes Megatron-LM model parallel if using model parallelism.

    Args:
        sharp: Apply SHARP to NCCL data-parallel communication.
        nccl_communicator_config_path: Path to the yaml NCCL communication process group config file.
    Nztp-cp-ep-pp-dpztp-cp-ep-dp-pp)tensor_model_parallel_sizepipeline_model_parallel_size$virtual_pipeline_model_parallel_size"pipeline_model_parallel_split_rank$pipeline_model_parallel_comm_backendcontext_parallel_sizerQ   	use_sharpexpert_model_parallel_sizeorder#num_distributed_optimizer_instancesrR   r   )versiontransformer_enginez1.9mpi)backend)%r9   model_parallel_sizerA   destroy_model_paralleltorchdistributedis_initializedinitialize_model_parallelrT   rU   rV   rW   rX   rY   r[   use_tp_pp_dp_mappingr]   tensor_model_parallel_rankget_tensor_model_parallel_rankpipeline_model_parallel_rank get_pipeline_model_parallel_rankget_tensor_model_parallel_grouptensor_model_parallel_groupget_data_parallel_groupdata_parallel_groupget_data_parallel_rankdata_parallel_rankget_data_parallel_world_sizedata_parallel_size!get_pipeline_model_parallel_grouppipeline_model_parallel_groupinit_mpi_proc_groupimportlib.metadatar^   	packagingVersion	new_group)rP   rQ   rR   	app_stater^   ry   
te_version r~   \/home/ubuntu/.local/lib/python3.10/site-packages/nemo/collections/nlp/parts/nlp_overrides.pyinit_model_parallel   sF   	






r   c                       s&  e Zd ZdZ							d?deeej  dedee	 de
dee d	e
d
e
deeeeef f ddf fddZd@ fddZdAdededdf fddZ fddZdBddZ	dCdeeef deeef dee ddfddZdDdeeef d!e
ddfd"d#Zd$edefd%d&Zd'eeef fd(d)Zd*eeef d+eeef fd,d-Zd*eeef d+eeef deeef fd.d/ZdDd*eeef d0e
deeef fd1d2Zdeeef deeef fd3d4Zdeeef ddfd5d6Ze d7d8 Z!e  fd9d:Z"e de
fd;d<Z#e de	fd=d>Z$  Z%S )ENLPDDPStrategya}  DDP plugin for Pytorch Lightning. Needed to customize DDP for model parallel models.

    Args:
        no_ddp_communication_hook: Disable DDP communication hook when using AMP-O2
        with FP32 gradient accumulation.
        nccl_communicator_config_path: Path to the yaml file with NCCL communicator options
        sharp: Apply SHARP to NCCL data-parallel communication.
    NFparallel_devicescluster_environmentcheckpoint_iono_ddp_communication_hookrQ   rP   dist_ckpt_parallel_savekwargsrS   c           	         sN   t stdtstdt jd|||d| || _|| _|| _|| _d S )NzApex was not found. Please see the NeMo README for installation instructions: https://github.com/NVIDIA/NeMo#megatron-gpt.megatron-core was not found. Please see the NeMo README for installation instructions: https://github.com/NVIDIA/NeMo#megatron-gpt.)r   r   r   r~   )		HAVE_APEXImportErrorHAVE_MEGATRON_COREsuper__init__r   rQ   rP   _dist_ckpt_parallel_save)	selfr   r   r   r   rQ   rP   r   r   	__class__r~   r   r      s&   
zNLPDDPStrategy.__init__trainer
pl.Trainerc                    s^   |j j}|tjkrt | dS | jdusJ | j| |   |   | j	dus-J dS )z
        Override setup() of DDPStrategy to avoid _sync_module_states(self.model) during eval
        as it can cause PP > 1 to hang due to assumption in DDPStrategy class that the same
        model is replicated across GPUs
        N)
statefnr   FITTINGr   setupacceleratormodel_to_devicesetup_precision_pluginmodel)r   r   
trainer_fnr   r~   r   r      s   
zNLPDDPStrategy.setupglobal_rank
world_sizec                    sJ   t    t s!t }|jdur#t| j| j| j	
 d d dS dS dS )zSet up distributed environment.N<   )rR   )r   setup_distributedrA   model_parallel_is_initializedr9   rb   r   rP   rQ   _timeouttotal_seconds)r   r   r   r|   r   r~   r   r     s   


z NLPDDPStrategy.setup_distributedc                    s   t | jdr
| jjst | jdr#| jjr#| jjr| j  | j| _dS t }|jdurSt	
d t| jfdtjddi| j| _| jrQd| j_| jdt dS dS t   dS )	zOverride LightningModule ddp if using model parallel.
        Sets find_unused_parameters to False to use activation-checkpoint-recomputation.
        megatron_amp_O2with_distributed_adamNz&Configuring DDP for model parallelism.process_groupTwith_context_parallelF)hasattrr   r   r   use_mcore_dist_optim setup_mcore_distributed_parallel_modelr9   rb   r:   infor/   rA   ro   _ddp_kwargsr   require_backward_grad_syncregister_comm_hookr#   r   configure_ddp)r   r|   r   r~   r   r     s.   





	zNLPDDPStrategy.configure_ddpc                    s>  | j jdd}| j  }dd | D }t|tr$|j|||| jdS t|tr/|||S t|tsSt	| |
 }t|tjdd |jD dt| |S t	|j |
 }t|tjd	d |jD dt|d
 t|d d ks|J fdd  fddt|d
 |d d D |d
< t|d  |S )ab  
        Sharded state dictionary for an MainParamsOptimizerWrapper.
        Used to save and load the optimizer state when training with distributed_checkpoint.
        Returns:
            dict: The sharded state dictionary for the optimizer
        Raises:
            ValueError: If a parameter ID does not match any model sharded parameter.
        F)use_pl_optimizerc                 S   s    i | ]\}}| d s||qS )_extra_state)endswith).0keyvaluer~   r~   r   
<dictcomp>O  s
    z?NLPDDPStrategy.optimizer_sharded_state_dict.<locals>.<dictcomp>)
is_loadingr   c                 s   s    | ]}|d  V  qdS )paramsNr~   r   gr~   r~   r   	<genexpr>a  s    z>NLPDDPStrategy.optimizer_sharded_state_dict.<locals>.<genexpr>)model_sharded_state_dictoptim_params_iterc                 s   s    | ]}|V  qd S Nr~   r   r~   r~   r   r   m  s    fp32_from_fp16_params	optimizerparam_groupsc              
      s6   z |  W S  t y } z	td|  d|d }~ww )Nz	Param id z' does not match any model sharded param)KeyError
ValueError)param_ide)id_to_sharded_param_mapr~   r   get_safeu  s   
z=NLPDDPStrategy.optimizer_sharded_state_dict.<locals>.get_safec                    s,   g | ]\}} fd dt |d |D qS )c                    s"   g | ]\}}t  ||d dqS )zoptimizer.state.fp32_paramprefix)rF   )r   r   
fp32_paramr   r~   r   
<listcomp>|  s    zJNLPDDPStrategy.optimizer_sharded_state_dict.<locals>.<listcomp>.<listcomp>r   )zip)r   
fp32_groupstate_groupr   r~   r   r   {  s    
z?NLPDDPStrategy.optimizer_sharded_state_dict.<locals>.<listcomp>)lightning_module
optimizerssharded_state_dictitems
isinstancer?   r   r>   r7   r8   
state_dictrE   	itertoolschainfrom_iterabler   rG   r   float16_groupslenr   )r   unsharded_optim_stater   r   r   optimizer_state_dictr~   )r   r   r   optimizer_sharded_state_dict@  sP   










z+NLPDDPStrategy.optimizer_sharded_state_dict
checkpointfilepathstorage_optionsc                 C   s   t  }| jr`|s
dn|dd}|r.t|d dksJ d| j|d d d}|g|d< nd|d< tg |d	< | jj|t||d
 t	r\t
| jdr^t| j t|| jjdd dS dS dS t|}| jsl|jdkrw| jj|||d
 dS dS )a(  PTL method which we override to accomodate distributed checkpoints and
        the legacy model parallel checkpoints.

        When using megatron core, the distributed checkpointing library expects save functions to be
        called on every rank and internally does the rank checking.
        Tinclude_optimizeroptimizer_states   zFCurrently only support checkpointing 1 distributed optimizer per time!r   )r   Nr   r   get_model_module_listmodel.r   )r9   use_distributed_checkpointinggetr   r   r   r   save_checkpointr;   HAVE_MODELOPTr   r   rM   r   unwrapped_checkpoint_iosave_sharded_strategyr<   is_global_zerorr   )r   r   r   r   r|   r   sharded_optim_stater~   r~   r   r     s4   	

	zNLPDDPStrategy.save_checkpointTstrictc                 C   s   | j rdS d}d}t| jdrd}| jj}nt| jdr"d}| jj}|durTt|ts0t|trTi }|d  D ]}|	| d| dd}|d | ||< q8||d< | jj
|d |d dS )	z!Load lightning module state dict.Nr   enc_dec_modelr   ..module.r   )r   )r   r   r   r   r   r   r2   MCoreFloat16Modulekeysreplaceload_state_dictr   r   r   	model_key
model_attrnew_state_dictr   new_keyr~   r~   r   load_model_state_dict  s$   
z$NLPDDPStrategy.load_model_state_dictckptc                    sL   t j sJ t j t j ft jdt j d  fdd}t||S )z4Ensure checkpoint tensors are on the correct device.cuda)indexc                    s*   t | tjr| jr| j kr|  } | S r   )r   rd   Tensoris_cudadeviceto)tcur_devr~   r   _fix_device  s   
z7NLPDDPStrategy._fix_tensors_device.<locals>._fix_device)rd   r  rf   is_availabler  current_devicerC   )r   r   r
  r~   r  r   _fix_tensors_device  s   "
z"NLPDDPStrategy._fix_tensors_devicer   c                 C   s4   d|d d vr|d d d S |d d d d S )z)Return the param groups in the state dictr   r   r   r   r~   r   r   r~   r~   r   _get_param_group  s
   zNLPDDPStrategy._get_param_groupcheckpoint_pathr   c                 C   s\   t |}|ddu rdS |d d dddu rdS | |}| |}t|t|kS )z
        Check if the number of param groups in the checkpoint not match with the sharded_state_dict
        Returns:
            bool: True if the number of param groups does not match
        r   NFr   r   )r@   load_common_state_dictr   r  r   )r   r  r   common_state_dictmodel_param_groupscheckpoint_param_groupsr~   r~   r   _check_param_groups_mismatch  s   


z+NLPDDPStrategy._check_param_groups_mismatchc           
      C   s*  t |}| |}| |}tdd |D }tdd |D }d}|re|setd tdd t|D d}|ra||t	g dd d	|d
 d v rX||d
 d d	 d< n||d
 d d< nt
d| jj||d}	|durd	|	d
 d v r|	d
 d d	 d | |	S |	d
 d d | |	S )ad  
        Try to fix the param groups in the checkpoint.
        This is to fix the bug that in 24.03, all checkpoints store EP param group regardless of using EP or not.
        This function makes sure all checkpoints are compatible for loading.
        Returns:
            sharded_state_dict: Loaded dictionary for the distributed load function
        c                 s       | ]	}| d dV  qdS 	is_expertFNr   r   paramr~   r~   r   r         z3NLPDDPStrategy._fix_param_groups.<locals>.<genexpr>c                 s   r  r  r  r  r~   r~   r   r     r  NzCurrently training the model without expert parallelism while restored checkpoint has EP params. Ignoring the EP params for restoring.c                 s   s$    | ]\}}| d dr|V  qdS r  r  )r   r  entryr~   r~   r   r     s   " T)r   r  r   r   r   r   z+Cannot find expert param in the checkpoint.r   )r@   r  r  anyr:   warningnext	enumerateinsertrD   r   r   load_checkpointpop)
r   r  r   r  r  r  model_has_expert_paramcheckpoint_has_expert_paramexpert_indexloaded_state_dictr~   r~   r   _fix_param_groups  s6   



z NLPDDPStrategy._fix_param_groupsload_optimizer_statesc              
   C   s  t |}t }| jr|||std| dtr)t| jdr)t| j	 |dd | j
 }i }||d< |r?| jddg|d	< | ||rL| ||}n"z
| jj||d
}W n tym } z| dt }t|d}~ww t| jddrz| |}|S t|}d}	td|	d D ]}
||r ntd| d|
 d|	 d t|
d  qtd| dtj  t }| j|}t }|| }t d|j! d|dd|dd |S )a}  PTL method which we override to integrate distributed checkpoints for model parallel models.
        In order to load distributed checkpoints we need to provide the sharded_state_dict to
        the distributed load function. We get the sharded_state_dict from self.lightning_module
        which makes it convenient to have the loading logic happen at the strategy level.
        z6Distributed checkpoints should be a directory. Found: r   r   r   r   r   T)r   r   r  
Ncontinue_trainingF
   r   Checkpoint at z not found. Fail count z out of     not found. Aborting training.z Global Checkpoint Load : Rank : z : Start time : z.3fz#s : Time spent in load_checkpoint: s)"r   r9   r   isdirr   r   r   r   rL   r   r   r   r  r*  r   r$  rB   
LOAD_ERRORgetattr#_integrate_original_checkpoint_datar<   rangeexistsr:   r   timesleepFileNotFoundErrorrd   r  empty_cache	monotonicr   r   )r   r  r+  fsr|   r   r   r   error_messagemax_error_counterror_count
start_timeend_timedurationr~   r~   r   r$  '  sf   




zNLPDDPStrategy.load_checkpointc                 C   s   | j jj }|D ]}|dvr|| ||< q	d|d d v r2|d d d d |d d d d< |S |d d d |d d d< |S )z
        Ensures that model and optimizer weights are loaded from the checkpoint.
        All other metadata are reinitialized.
        )r   r   r   r   r   r   )r   r   _checkpoint_connectordump_checkpoint)r   r   original_checkpointr   r~   r~   r   r6  n  s$   
z2NLPDDPStrategy._integrate_original_checkpoint_datac                 C   sf   | j r| jr| jt| dS dS t }t|}| js!|jdkr1t	d|  | j| dS dS )z$Delete checkpoint saved at filepath.r   Removing checkpoint: N)
r   r   r   remove_checkpointr;   r9   r<   rr   r:   r   r   r   r|   r~   r~   r   rI    s   z NLPDDPStrategy.remove_checkpointc                 C   sj   t ot| jt}t| jdo| j du}|r&|s&td t	| jj
| _|s3|r3td t | _|S )z<Whether to use distributed checkpointing from megatron core.r   NzeDistributed checkpoints requires DistributedCheckpointIO plugin to be used. Setting up a default now.z^DistributedCheckpointIO configured but should not be used. Reverting back to TorchCheckpointIO)r   r   r   rJ   r   r   r   r:   r   from_configcfgr   r   )r   has_dist_ckpt_iohas_sharded_state_dictr~   r~   r   r     s   z,NLPDDPStrategy.use_distributed_checkpointingc                    s0   t  }|jdurt|j|jd}|S tt| jS )z#Provide distributed sampler kwargs.N)num_replicasrank)r9   rb   dictrt   rr   r   r   distributed_sampler_kwargs)r   r|   rR  r   r~   r   rR    s   
z)NLPDDPStrategy.distributed_sampler_kwargsc                 C      dS )zThis needs to be True for distributed checkpointing because
        we require the model to have configured the optimizer before
        deserializing the checkpoint.
        Tr~   r   r~   r~   r   restore_checkpoint_after_setup     z-NLPDDPStrategy.restore_checkpoint_after_setupc                 C   s$   | j }t|tr|j }t|ts|S )zFReturns CheckpointIO unwrapped from any _WrappedCheckpointIO wrappers.)r   r   r   )r   r   r~   r~   r   r     s
   

z&NLPDDPStrategy.unwrapped_checkpoint_io)NNNFNFF)r   r   rS   N)NN)NFr   )T)&__name__
__module____qualname____doc__r   r   rd   r  r   r   boolstrr   r   r   r   r   intr   r   r   r   r   r   r   r  r  r  r*  r$  r6  rI  propertyr   rR  rU  r   __classcell__r~   r~   r   r   r      s    	
"
,K


 -"



(."G
r   c                   @   s   e Zd ZdZdd ZdS )NLPDDPStrategyNotebooka  Version of NLPDDPStrategy to be used in a Jupyter Notebook
    A large portion of Megatron code has DDP dependency, so it has been necessary to use NLPDDPStrategy even for
    single-GPU training (e.g. in a Jupyter notebook)
    A PTL 2.0 changes has prevented DDPStrategy to be used in a notebook.
    This version of NLPDDPStrategy enables megatron training in a notebook in PTL 2.0.
    c                 C   s
   d | _ d S r   )	_launcherrT  r~   r~   r   _configure_launcher  s   
z*NLPDDPStrategyNotebook._configure_launcherN)rW  rX  rY  rZ  rb  r~   r~   r~   r   r`    s    r`  module
rank0_onlyNNNc                 C   s,   t dd}tdd}tj| tj||d}|S )NT)offload_to_cpurc  state_dict_typestate_dict_configoptim_state_dict_config)r)   r-   FSDPrh  r+   SHARDED_STATE_DICT)rc  rd  ri  rj  state_dict_type_contextr~   r~   r   _get_sharded_state_dict_context  s   

rn  c                 C   s0   t d|d}td|d}tj| tj||d}|S )NT)rf  rd  rg  )r,   r%   rk  rh  r+   FULL_STATE_DICT)rc  rd  rj  ri  rm  r~   r~   r   _get_full_state_dict_context  s   rp  c                       s  e Zd ZdZ								d+dedeeef ded	eeef d
ee dedee dee	 dee
eee
f f ddf fddZd	eeef deeef deeef defddZd, fddZdeee
f fddZdejjdeeejf fddZd-deee
f ddfddZdeee
f ddfddZ	d-deee
f d eeef d!ee
 ddfd"d#Zd$eeef deee
f fd%d&Zd eeef ddfd'd(Zedefd)d*Z  ZS ).NLPFSDPStrategya^  FSDP plugin for Pytorch Lightning with the support for tensor-parallelism.

    Args:
        sharding_strategy: FSDP parameter sharding strategy.
        grad_reduce_dtype: Data type for FSDP gradient shard ReduceScatter.
        sharded_checkpoint: Store/load FSDP-sharded checkpoints.
        precision: Precision recipe to be used with FSDP.
    fullNF
bf16-mixedsharding_strategygrad_reduce_dtypesharded_checkpoint	precisionrQ   rP   set_buffer_dtypeextra_fsdp_wrap_moduler   rS   c	                    s   t stdtstd| j|||d|	d< tj|	d< ddlm}
 tt	t
|
h| _|d ur2| j| tjt| jd|	d	< tjtjtjd
}|t| v sQJ d|dksYJ d|| |	d< || _|rftnt| _|| _|| _|| _t jdi |	 d S )Nr   r   )rx  mixed_precisionbackward_prefetchr   )BasicTransformerBlock)transformer_layer_clsauto_wrap_policy)rr  hybridgradz"Not a supported sharding strategy.r  z,Hybrid sharding is currrently not supported.rt  r~   )r   r   r   _set_mixed_precision_reciper$   BACKWARD_PRE>nemo.collections.multimodal.modules.stable_diffusion.attentionr|  MCoreTransformerLayerr3   r4   fsdp_wrap_moduleupdate	functoolspartialr.   r*   
FULL_SHARDHYBRID_SHARDSHARD_GRAD_OPlistr   rv  rn  rp  state_dict_contextrQ   rP   rt  r   r   )r   rt  ru  rv  rw  rQ   rP   rx  ry  r   r|  fsdp_sharding_strategyr   r~   r   r     sJ   



zNLPFSDPStrategy.__init__c                 C   s   |dv rt j } }}n |dv rt j } }}n|dkr$t j } }}ntd|d|dur6t|d}|dur@t|d}t|||dS )a  
        Set FSDP mixed precision recipe.
        `param_dtype` sets the data type for computation in forward and backpropagation, and the parameter
        data type for optimizer execution is maintained in the full precision.
        `buffer_dtype` is only valid when a module has buffers by `register_buffer` method, which is not
        shared by FSDP.
        `reduce_dtype` sets gradient reduction data type.
        )z16-true16-mixed   )z	bf16-truers  bf16    z-Was unable to infer precision type, received r   N)param_dtypereduce_dtypebuffer_dtype)rd   float16bfloat16floatr   r5   torch_dtype_from_precisionr'   )r   rw  ru  rx  r  r  r  r~   r~   r   r  >  s    z+NLPFSDPStrategy._set_mixed_precision_recipec                    s  t    t s+t }|jdksJ d| jtjkr$|j	dks$J dt
| j| j tjdd| jd< g | jd< t dkr~| j D ];}t|dd	rU| jd | qDtjtt|tjtj d
}tjj|tjt d |dkr}| jd | qDdS dS )z4
        Overriding to set parallel states.
        r   z*FSDP does not support pipeline parallelismzHFSDP hybrid sharding cannot be used when tensor_model_parallel_size > 1.Tr   r   ignored_statessequence_parallelFdtyper  opgroupr   N) r   setup_environmentrA   r   r9   rU   rt  r*   r  rT   r   rP   rQ   ro   r   $get_tensor_model_parallel_world_sizer   
parametersr5  appendrd   tensorr]  rH   int8r  r  re   
all_reducer"   MINrm   )r   r|   pis_not_tp_duplicater   r~   r   r  \  s8   


z!NLPFSDPStrategy.setup_environmentc                 C   sh   | j dusJ | jst dkrdnd}| j| j|d | j  }W d   |S 1 s-w   Y  |S )zN
        Store the model state dict in one of full or sharded format.
        Nr   TF)rd  )r   rv  rA   r  r  r   r   )r   rd  r   r~   r~   r   lightning_module_state_dict  s   
z+NLPFSDPStrategy.lightning_module_state_dictr   c                 C   sP   t |tr|j}| | j t| j|}W d   |S 1 s!w   Y  |S )zW
        Store the full optimizer state dict in one of full or sharded format.
        N)r   r   
_optimizerr  r   rk  optim_state_dict)r   r   r  r~   r~   r   optimizer_state  s   

zNLPFSDPStrategy.optimizer_stater   c                 C   s   d}d}t | jdrd}| jj}nt | jdrd}| jj}|durOt|ts+t|trOi }|d  D ]}|| d| dd}|d | ||< q3||d< | 	| j | j
|d  W d   dS 1 siw   Y  dS )z-Load lightning module states from checkpoint.Nr   r   r   r   r   r   )r   r   r   r   r   r2   r   r   r   r  r   r   r~   r~   r   r     s$   
"z%NLPFSDPStrategy.load_model_state_dictc           
      C   s2  dd }|d }t | j|D ]\}}| | jv ||}tt|d  d trsz+tj	| jddd t
|tj| j}W d	   n1 sHw   Y  t|| j}W n tyr } ztd
|  td W Y d	}~nd	}~ww tj|| j|d}	||	 t|| j W d	   n1 sw   Y  qd	S )zV
        Re-key the full optimizer state dict to sharded optimizer state dict
        c                 S   s:   | }	 d|v r	|S t |tsJ d|t| d  }q)NTr   z"Fail to find optimizer state dict.r   )r   rQ  r  r   )	opt_statetemp_opt_stater~   r~   r   _get_osd  s   z;NLPFSDPStrategy.load_optimizer_state_dict.<locals>._get_osdr   r   r   TF)	writebackrd  Nz3Failed to load optimzier state dicts. Errored with r   )r  r   optim)r   r   r  r   r   r  r   r]  rk  summon_full_paramsrekey_optim_state_dictr(   
PARAM_NAMEshard_full_optim_state_dict	Exceptionprintexitoptim_state_dict_to_loadr   r   root_device)
r   r   r  r   r   r  temp_osdrekeyed_osdr   sharded_osdr~   r~   r   load_optimizer_state_dict  s<   
z)NLPFSDPStrategy.load_optimizer_state_dictr   r   c                 C   sT   t  }t|| jd}| js|jdkr| jj|||d dS dS | jj|||d dS )zStore checkpoints
        1. In case of sharded checkpoint, all ranks store unique checkpoints.
        2. In case of non-sharded checkpoint, all data-parallel rank 0 store checkpoints.
        fsdp_sharded_ckptr   r   N)r9   r<   rv  rr   r   r   )r   r   r   r   r|   r~   r~   r   r     s   
zNLPFSDPStrategy.save_checkpointr  c                 C   s   t |}t|| jd}||std| dtj  ddlm	} |t
 d | j|}W d   |S 1 s<w   Y  |S )zLoad checkpointsr  r/  r1  r   )load_with_process_group)r   N)r   r<   rv  r8  r;  rd   r  r<  torch.distributed._shard.apir  rA   ro   r   r$  )r   r  r>  r  r   r~   r~   r   r$    s   


zNLPFSDPStrategy.load_checkpointc                 C   sh   t  }t|| jd}| jrtd|  | j| dS |jdkr2td|  | j| dS dS )zRemove checkpointsr  rH  r   N)r9   r<   rv  r:   r   r   rI  rr   rJ  r~   r~   r   rI    s   
z!NLPFSDPStrategy.remove_checkpointc                 C   rS  )zWhen loading FSDP-sharded checkpoint, need to restore checkpoint after configuring
        FSDP sharding to match FSDP-sharded format between the checkpoint and the current
        model and optimizer.
        Tr~   rT  r~   r~   r   rU  
  rV  z.NLPFSDPStrategy.restore_checkpoint_after_setup)rr  NFrs  NFNNrS   Nr   ) rW  rX  rY  rZ  r\  r   r]  r[  r   setr   r   r   r'   r  r  r  rd   r  	Optimizerr  r  r   r   r  r   r   r$  rI  r^  rU  r_  r~   r~   r   r   rq    sv    

	
G



& 
*


"rq  c                       s   e Zd ZdZd fddZdef fddZd	d
 Zd fdd	Z							ddede	e
eef  de	ej dedededede	e f fddZ  ZS )NLPSaveRestoreConnectorz8Custom connector to support saving and restoring states.rS   Nc                    s*   t std tstd t   d S )NzApex was not found. Please see the NeMo README for installation instructions: https://github.com/NVIDIA/apex
Megatron-based models require Apex to function correctly.r   r   r:   r   r   r   r   rT  r   r~   r   r     s   	z NLPSaveRestoreConnector.__init__	save_pathc                    s  t  }|jddrd}nt|do| du}d}|jdur$|jdks'|rtj|}|rt	tj
|| j}t sYdd }|jjjdurS|jjjj||jd |jj  | }tj|jdd	}	|	|| trt|d
rt|	tr|	j}	t|	tswt| ||	jdd n8|jdkr|jdkrtj
|d|jdd| j }
ntj
|d|jdd|j dd| j }
| !|" |
 t#j$ rt#j$%  |rt& }n|j dko|jdko|jdk}|rt'( }|rt)*t+|| n|jdkr3t,|j-D ]1}t.tj
|d|d tj
|d|dd| j }
t)*|
tj
|d|d| j q nLt/0t,|j-t,|jD ]?\}}t.tj
|d|dd|d tj
|d|dd|dd| j }
t)*|
tj
|d|dd|d| j q?tj
|| j1}|j2|d t|dr|j3dur| j4||d | j5||d | j6r| 7|| ntj|}t8|D ]}t)*tj
||| qW d   n	1 sw   Y  t#j$ rt#j$%  dS dS t9 :||S )zSave model to save path.fsdpFr   Nr   c                   S      d S r   r~   r~   r~   r~   r   dummy?     z.NLPSaveRestoreConnector.save_to.<locals>.dummyr   )
async_saver   r   r   r   mp_rank_02d_tp_rank_	_pp_rank_03d)path2yaml_file	artifacts)nemo_file_folder);r9   rL  r   r   r   rb   ospathdirnamer;   joinmodel_weights_ckptrA   rf   r   strategylauncherlaunchr  rJ   rK  r   r   r   r   r   rM   r   r   rr   rU   ri   rk   _save_state_dict_to_diskr   rd   re   barrierr0   tempfileTemporaryDirectoryshutilmover\  r7  rT   makedirsr   productmodel_config_yamlto_config_filer  _handle_artifacts_update_artifact_pathspack_nemo_file_make_nemo_file_from_folderlistdirr   save_to)r   r   r  r|   	dist_ckptdist_ckpt_dirdir_namer  r   r   mp_model_weightsshould_move_datatmpdirtp_rankpp_rankconfig_yamlfolder_pathfiler   r~   r   r  (  s   








$0zNLPSaveRestoreConnector.save_toc                 C   s  | ddr)i }| D ]}|dd}|dd}|dd}|| ||< q|}| d	drGi }| D ]}|d
dd}|| ||< q5|}i }| D ]}|ddd}|| ||< qM|}| dds}i }| D ]}|ddd}|| ||< qk|}| ddkri }| D ] }|dd}|dd}|dd}|dd}|| ||< q|}| }d|v ri }dd }| D ]:}|dkrq||r|d}	t|	d  }
d|	d!d  tt|
d g |	d" g }|| ||< q|| ||< q|}| d#rD| d# d$dkrDi }| D ].}d%|v rq|d&d'}|d(d)}|d*d+}|d,d-}|d.d/}|| ||< q|}|S )0zRemap keys in state dict.megatron_legacyFzbert_model.language_modelzbert_model.model.language_modeltransformerencoderz.attention.z.self_attention.r   r   zmodel.module.r   z6word_embeddings.adapter_layer.mm_linear_adapter.linearz?word_embeddings.adapter_layer.mm_projector_adapter.mm_projectorinductorz
._orig_mod targetzbnemo.collections.multimodal.models.text_to_image.stable_diffusion.ldm.ddpm.MegatronLatentDiffusionunetzmodel.diffusion_modelvaefirst_stage_modeltext_encodercond_stage_modelz.noise_schedulerz?model.model.diffusion_model.input_blocks.1.0.in_layers.2.weightc           	      S   s   d}g d}|D ]\}dD ]W}dD ]R}dD ]M}|dkr=dD ]}| | d| d| d| d| 
}| |kr;     d	S qqt d
D ]}| | d| d| d| d| 
}| |kr`     d	S qAqqqqdS )Nzmodel.model.diffusion_model.)input_blocksmiddle_blockoutput_blocks)	in_layers
out_layers)      )weightbiasr	  )r   r  r   T   z.0.F)r7  )	r   base_strblocksblock
layer_typer  r  numtemplater~   r~   r   should_process  s.   $$zANLPSaveRestoreConnector.modify_state_dict.<locals>.should_processzEmodel.cond_stage_model.transformer.text_model.embeddings.position_idsr   Nunet_config
use_te_fp8extra_stateznorm_to_q.layer_norm_znorm.znorm_to_q.weightzto_q.weightzff.net.layer_norm_z	ff.net.0.zff.net.fc1_zff.net.1.proj.zff.net.fc2_z	ff.net.3.)r   r   r   splitr]  r  r\  )r   confr   r   r   r   loaded_keysr  key_r2  idxnew_key_r~   r~   r   modify_state_dict  s   
."
z)NLPSaveRestoreConnector.modify_state_dictc                    sN   t |}tj|rt ||S tjtj|d rd S td| d)Nr   	Expected z to be a file or directory.)	r=   r  r  isfiler   _load_state_dict_from_diskr3  splitextr   )r   model_weightsmap_locationuninject_model_weightsr   r~   r   r'    s   z2NLPSaveRestoreConnector._load_state_dict_from_diskTFrestore_pathoverride_config_pathr*  r   return_configr   validate_access_integrityreplace_sharded_tensor_keyc
              
      s  t  ||||||||}
t|
tr|du r|
S |
\}}}|du rt s=dd }|jjdur8|jjj||d |j	  t
 }| jdur\tj| jr\td| j d | j}nd}|rddd	 }| j||d
}| j|||d tj|| j}tj|d }tj|sJ d| dtrt|drt| |dd i }| }||d< |	r|d  D ]}t|dr|jd|	|_qt |}|j!||||d}|"| t|dr|#  W d   n1 sw   Y  n| $||}t  %||| td|j&j' d| d |S )ay  
        Restores model instance (weights and configuration) into .nemo file

        Args:
            restore_path: path to .nemo file from which model should be instantiated
            override_config_path: path to a yaml config that will override the internal
                config file or an OmegaConf / DictConfig object representing the model config.
            map_location: Optional torch.device() to map the instantiated model to a device.
                By default (None), it will select a GPU if available, falling back to CPU otherwise.
            strict: Passed to load_state_dict. By default True
            return_config: If set to true, will return just the underlying config of the restored
                model as an OmegaConf DictConfig object without instantiating the model.

        Example:
            ```
            model = nemo.collections.nlp.models.TextClassification.restore_from('asr.nemo')
            assert isinstance(model, nemo.collections.nlp.models.TextClassification)
            ```

        Returns:
            An instance of type cls or its underlying config (if return_config is set).
        TNc                   S   r  r   r~   r~   r~   r~   r   r  a  r  z3NLPSaveRestoreConnector.restore_from.<locals>.dummyr  z9Restoration will occur within pre-extracted directory : `z`.c                 S   s   d| v S )Nz.yamlr~   )namer~   r~   r   <lambda>w  s    z6NLPSaveRestoreConnector.restore_from.<locals>.<lambda>)	filter_fn)	path2file
out_foldermembersr   r%  z to be a directory.r   r   r   r   r   r   )r   r   r/  "setup_transformer_engine_tp_groupszModel z  was successfully restored from r   )(r   load_config_and_state_dictr   tuplerA   rf   r  r  r  r  r  r  model_extracted_dirr  r  r3  r:   r   _filtered_tar_info_unpack_nemo_filer  r  r(  r   r   rL   r   r   valuesr   r   rJ   rK  r$  on_load_checkpointr7  r$  load_instance_with_state_dictr   rW  )r   calling_clsr,  r-  r*  r   r.  r   r/  r0  loaded_paramsr  instancer   r  r  r3  r6  tmp_model_weights_ckpttmp_model_weights_dirr   r   vr   r   r~   r   restore_from)  s   %








4z$NLPSaveRestoreConnector.restore_fromr  r   )NNTFNTN)rW  rX  rY  rZ  r   r\  r  r$  r'  r   r   r!   rd   r  r[  r    rF  r_  r~   r~   r   r   r    s>     r	
r  c                	       sZ   e Zd ZdZ	dded dedeejj	j
 ddf fdd	Zeded
 fddZ  ZS )PipelineMixedPrecisionPlugint  Overrides PTL autocasting to not wrap training/val/test_step.
    We do this because we have the megatron-core fwd/bwd functions in training_step.
    This means .backward is being called in training_step so we do not want the whole
    step wrapped in autocast.

    We instead wrap the fwd_output_and_loss_func that is passed to the megatron-core fwd/bwd functions.
    Nrw  r  rs  16r  r  r  scalerrS   c                    sn   |dv rd}n|dv rd}nt d| dt j|||d d }|dv r)tj}n|dv r0tj}t| d S )Nr  rJ  r  r  rs  r  rs  Rprecision expected to be one of: ['16-mixed', '16', 16, 'bf16-mixed', 'bf16'] but  found)rK  )RuntimeErrorr   r   rd   r  r  set_autocast_gpu_dtype)r   rw  r  rK  plugin_precisionr  r   r~   r   r     s    z%PipelineMixedPrecisionPlugin.__init__re  c                 c       dV  dS z(Have the PTL context manager do nothing.Nr~   rT  r~   r~   r   forward_context     
z,PipelineMixedPrecisionPlugin.forward_contextr   )rW  rX  rY  rZ  r   r\  r   rd   r  amp
GradScalerr   r   r	   rU  r_  r~   r~   r   r   rG    s    rG  c                       sP   e Zd ZdZ	dded ded ddf fdd	Zeded
 fddZ	  Z
S )FSDPMixedPrecisionPluginrH  Nrw  rI  rK  rX  rS   c                    s@   |dv rd}n|dv rd}nt d| dt j||d d S )NrL  r  rM  rs  rN  rO  )rw  rK  )rP  r   r   )r   rw  rK  rR  r   r~   r   r     s   z!FSDPMixedPrecisionPlugin.__init__re  c                 c   rS  rT  r~   rT  r~   r~   r   rU    rV  z(FSDPMixedPrecisionPlugin.forward_contextr   )rW  rX  rY  rZ  r   r   r   r   r	   rU  r_  r~   r~   r   r   rY    s    rY  c                       sh   e Zd ZdZ						d fdd		Z fd
dZ fddZdd ZdddZdd Z	dd Z
  ZS )rX  z
    Gradient sclaer for model-parallel inf check. The inf in gradients are checked across tensor-parallel
    ranks in (1) executing optimizer step and (2) gradient scaler update.

          @       @      ?  Tr   c                    s&   t  j|||||d d | _|| _d S )N)
init_scalegrowth_factorbackoff_factorgrowth_intervalenabled)r   r   optimizer_update_skipped
hysteresis)r   r^  r_  r`  ra  rb  rd  r   r~   r   r     s   	
zGradScaler.__init__c                    s8   t  | trtj| jgtj|d| _d S | j| _d S )Nr  )r   _lazy_init_scale_growth_tracker
HAVE_AMP_Crd   r  rd  int32_hysteresis_tracker)r   devr   r~   r   re    s   z*GradScaler._lazy_init_scale_growth_trackerc                    s*   t |ddr|j| S t j|g|R  S )N_custom_amp_unscale_gradsF)r5  unscale_gradsr   _unscale_grads_)r   r   argsr   r~   r   rl    s   
zGradScaler._unscale_grads_c                 O   s   d }t |d  }t|jddd}tjj|tjjjt	
 d | | _|| _| jdkr<|j|i |}d| _|S d| _|S )Nfound_inf_per_devicer   T)dimkeepdimr  F)r9  r=  rd   stacksumre   r  r"   MAXrA   get_model_parallel_groupitem_found_infs_cpu_found_infs_cudasteprc  )r   r   r  rm  r   retval
found_infs	found_infr~   r~   r   _maybe_opt_step  s   

zGradScaler._maybe_opt_stepNc              
      s  | j sdS | d\ }|durCt|tr| j| nd}t|tjjs)J ||	 dks3J ||j
du s<J || j| n fdd| j D }t|dksYJ d	|d }tjj|tjjjt d
 t|dkrtdt|D ]}|| }tjj|tjjjt d
 ||7 }qxtrt || j|| j| j| j| j n5|dkr|  jd8  _| jdkrt ||| j| j| j n|d nt ||| j| j| j | j| _tt | _dS )z
        Updates to native grad scaler update function.
        1. Check inf across model-parallel ranks.
        2. Update hysteresis tracker.
        3. Apply hysteresis to grad scale update.
        Nr  z[new_scale should be a float or a 1-element torch.cuda.FloatTensor with requires_grad=False.r   Fc                    s.   g | ]}|d    D ]
}|j jddq
qS )rn  T)r  non_blocking)r=  r  r  )r   r   r{  _scaler~   r   r   >  s    
z%GradScaler.update.<locals>.<listcomp>r   z,No inf checks were recorded prior to update.r  g        )!_enabled_check_scale_growth_trackerr   r  r  fill_rd   r  FloatTensornumelrequires_gradcopy__per_optimizer_statesr=  r   re   r  r"   rs  rA   rt  r7  rf  amp_Cupdate_scale_hysteresisrh  _growth_factor_backoff_factor_growth_intervalrd  _amp_update_scale_r   r1   )r   	new_scale_growth_trackerreasonrz  found_inf_combinedir{  r~   r~  r   r  %  sv   




zGradScaler.updatec                 C   s,   | j r|  | j| j| j|  | jdS i S )zL
        Add hysteresis_tracker to the native functions' state_dict
        )scaler_  r`  ra  r  rh  )r  	get_scaler  r  r  _get_growth_trackerrh  rT  r~   r~   r   r     s   
zGradScaler.state_dictc                 C   s   | j sdS t|dkrtd|d | _| jdur!| j|d  |d | _|d | _|d | _|d | _	| j
durB| j
|d  d	|v rM|d	 | _dS tr\tjd
gtjdd| _dS d
| _dS )z^
        Load hysteresis_tracker in addition to the state dict of the native function
        Nr   zeThe source state dict is empty, possibly because it was saved from a disabled instance of GradScaler.r  r_  r`  ra  r  _hysterisis_trackerr   r  r  )r  r   rP  _init_scaler  r  r  r  r  _init_growth_trackerr  rh  rf  rd   r  rg  r  r~   r~   r   r     s(   







zGradScaler.load_state_dict)rZ  r[  r\  r]  Tr   r   )rW  rX  rY  rZ  r   re  rl  r|  r  r   r   r_  r~   r~   r   r   rX    s    
^rX  c                
       s   e Zd ZdZ	ddeeef dedeej	j
j ddf fddZd	ejjd
edejjf deg ef deddf
ddZeded fddZ  ZS )MegatronHalfPrecisionPlugina  
    Plugin for Half (FP16 and BF16) precision training.
    This plugin assumes the use of the optimizer with master parameters (fp32).
    This plugin uses half-precision at all operators in the model so need of input precision
    at each layer operator.

    Args:
        precision: Whether to use ``torch.float16`` (``16``) or ``torch.bfloat16`` (``'bf16'``).
        device: The device for ``torch.autocast``.
        scaler: An optional :class:`torch.cuda.amp.GradScaler` to use.
    Nrw  r  rK  rS   c                    s@   t  ||| d }|dkrtj}n|dkrtj}t| d S )Nr  rs  )r   r   rd   r  r  rQ  )r   rw  r  rK  r  r   r~   r   r     s   z$MegatronHalfPrecisionPlugin.__init__r   r   zpl.LightningModuleclosurer   c                 K   s   t |ts	J d| jdu r&|jsJ d| }| || |jdi |S |jr-J d| }|  | j| | || |du }t |tj	rO|j
rO|s`| jj|fi | | j  dS dS )z5Run optimizer step and scale gradients, if necessary.zNMegatronHalfPrecisionPlugin supports only the optimizer with master parametersNz BF16 uses FP32 grad accumulationz FP16 uses FP16 grad accumulationr~   )r   r7   rK  fp32_grad_accumulation_after_closurerx  copy_model_grads_to_main_gradsunscale_plLightningModuleautomatic_optimizationr  )r   r   r   r  r   r  closure_resultskipped_backwardr~   r~   r   optimizer_step  s(   
z*MegatronHalfPrecisionPlugin.optimizer_stepre  c                 c   s    zdV  W dS w )zHNo explicit precision casting. Inputs are supposed to be manually castedNr~   rT  r~   r~   r   rU    s   z+MegatronHalfPrecisionPlugin.forward_contextr   )rW  rX  rY  rZ  r   r\  r]  r   rd   r  rW  rX  r   r  r  nnModuler   r   r  r   r	   rU  r_  r~   r~   r   r   r    s2    


$r  c                       sB   e Zd ZdZddededdf fdd	Zd
eddfddZ  Z	S )GlobalBatchDataFetcherz8Overrides PTL DataFetcher. Used to fetch global batches.r   Fprefetch_batchesstore_on_devicerS   Nc                    s0   t std tstd t j||d d S )NzKApex was not found. Using model parallel or megatron models will error out.zUMegatron-core was not found. Using model parallel or megatron models will error out..)r  r  r  )r   r  r  r   r~   r   r     s
   

zGlobalBatchDataFetcher.__init__iteratorc                    sn   |   } fddtt D }|  jd7  _| js/| jr/| j}t|ts'J | jt	|k| _
| || d S )Nc                    s   g | ]}t  qS r~   )r!  )r   r  r  r~   r   r     s    z<GlobalBatchDataFetcher._fetch_next_batch.<locals>.<listcomp>r   )on_fetch_startr7  rK   fetchedr  _has_len
dataloaderr   r   r   doneon_fetch_end)r   r  start_outputbatchr  r~   r  r   _fetch_next_batch  s   z(GlobalBatchDataFetcher._fetch_next_batch)r   F)
rW  rX  rY  rZ  r]  r[  r   r
   r  r_  r~   r~   r   r   r    s    	r  c                       s8   e Zd ZdZdd Z fddZdd Zdd	 Z  ZS )
CustomProgressBarz
    Add CustomProgressBar to remove 's/it' and display progress per step instead of per microbatch
    for megatron models
    c                 C   s   |j jjjjjjjS )z7
        Get the value of step within an epoch
        )fit_loop
epoch_loopr  optim_progressr   rx  current	completed)r   r   r~   r~   r   get_current_epoch_step  s   z(CustomProgressBar.get_current_epoch_stepc                    s   t   | _d| j_| jS )z8
        Override bar_format to not have 's/it'
        zV{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}{postfix}])r   init_train_tqdmbar
bar_formatrT  r   r~   r   r    s   z!CustomProgressBar.init_train_tqdmc                 G   s2   |j }| j| d| j_| jd|j  dS )zPOverride parent class on_train_epoch_start to initialize the progress bar state.r   zEpoch N)	max_stepstrain_progress_barresetinitialset_descriptioncurrent_epoch)r   r   r  num_training_batchesr~   r~   r   on_train_epoch_start&  s   z&CustomProgressBar.on_train_epoch_startc                 O   sB   |  |}| || jjrt| j| | j| || dS dS )z|
        Override parent class on_train_batch_end to update progress bar per global batch instead of per microbatch
        N)r  _should_updater  totalr   set_postfixget_metrics)r   r   	pl_moduler  __nr~   r~   r   on_train_batch_end0  s
   
z$CustomProgressBar.on_train_batch_end)	rW  rX  rY  rZ  r  r  r  r  r_  r~   r~   r   r   r    s    
r  )NrO   )F)r  r   r  r  r  r9  collectionsr   r   
contextlibr   pathlibr   typingr   r   r   r	   r
   r   r   r   r   r   r   lightning.pytorchpytorchr  rd   lightning.fabric.pluginsr   #lightning.fabric.utilities.cloud_ior   $lightning.fabric.utilities.optimizerr   $lightning.pytorch.callbacks.progressr   2lightning.pytorch.callbacks.progress.tqdm_progressr    lightning.pytorch.core.optimizerr    lightning.pytorch.loops.fetchersr   lightning.pytorch.pluginsr   .lightning.pytorch.plugins.io.checkpoint_pluginr   $lightning.pytorch.plugins.io.wrapperr   #lightning.pytorch.plugins.precisionr   (lightning.pytorch.plugins.precision.fsdpr   lightning.pytorch.strategiesr   r    lightning.pytorch.trainer.statesr   !lightning.pytorch.trainer.trainerr    	omegaconfr!   torch._C._distributed_c10dr"   ;torch.distributed.algorithms.ddp_comm_hooks.debugging_hooksr#   torch.distributed.fsdpr$   r%   r&   rk  r'   r(   r)   r*   r+   torch.distributed.fsdp.apir,   r-   torch.distributed.fsdp.wrapr.   torch.nn.parallelr/   nemo.utils.get_rankr0   torch.cuda.amp.grad_scalerr1   r   torch.amp.grad_scaler3nemo.collections.nlp.modules.common.megatron.moduler2   8nemo.collections.nlp.modules.common.megatron.transformerr3   r4   nemo.collections.nlp.partsr5   +nemo.core.connectors.save_restore_connectorr6   nemo.core.optimr7   nemo.core.optim.optimizersr8   
nemo.utilsr9   r:   nemo.utils.model_utilsr;   r<   r=    nemo.core.optim.distributed_adamr>   nemo.core.optim.mcore_optimr?   r   ModuleNotFoundErrorr  rf  megatron.corer@   rA   %megatron.core.dist_checkpointing.corerB   +megatron.core.dist_checkpointing.dict_utilsrC   (megatron.core.dist_checkpointing.mappingrD   *megatron.core.dist_checkpointing.optimizerrE   rF   rG   $megatron.core.tensor_parallel.layersrH    megatron.core.transformer.moduler   +megatron.core.transformer.transformer_layerrI   r  !nemo.utils.callbacks.dist_ckpt_iorJ   r   )megatron.core.num_microbatches_calculatorrK   r   (apex.transformer.pipeline_parallel.utilsmodelopt.torch.opt.pluginsrL   rM   r   r  rN   URLr4  r[  r\  r]  r   r   r`  r  r  rn  rp  rq  r  rG  rY  r  rW  rX  r  r  r  r~   r~   r~   r   <module>   s   4

7        (   )  IG