o
    }oi                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZ d dl	m
Z
mZmZ d dlmZ d dlmZ d dlmZ d dlmZmZmZmZmZmZmZmZmZmZmZmZ d dl m!Z" d dl#Z#d dl$Z#d dl%m&Z&m'Z' d d	l(m)Z)m*Z* d d
l+m,Z, d dl-m.Z. d dl/m0Z0 d dl1m2Z2m3Z3m4Z4m5Z5 d dl6m7Z7 d dl8m9Z9 d dl:m;Z;m<Z< d dl=m>Z> d dl?m@Z@ d dlAmBZB d dlCmDZD d dlEmFZF d dl#mGZG d dlHmIZI d dlJmKZK d dlLmMZM d dlNmOZO d dlPmQZQ d dlRmSZS d dlTmUZUmVZV d dlWmXZXmYZY d dlZm[Z[ d d l\m]Z]m^Z^m_Z_m`Z`maZambZbmcZcmdZd d d!lemZ d d"lfmgZg er-d d#lhmiZi ejekZled$Zmed% Zned% Zod&Zpd'ep d(ZqeG d)d* d*ZrG d+d, d,e9eVjsZtd-d. ZuG d/d0 d0e2ZvdS )1    N)OrderedDict)	ExitStackcontextmanagernullcontext)	dataclass)	timedelta)Path)TYPE_CHECKINGAnyCallableContextManagerDictListLiteralMappingOptionalTypeVarUnioncast)CheckpointIOClusterEnvironment)_optimizer_to_device_optimizers_to_device)rank_zero_info)
reset_seed)CPUAccelerator)_AutomaticOptimizationevaluation_loopfit_loopprediction_loop)_DataLoaderIterDataFetcher)DDPStrategy)RunningStage	TrainerFn)STEP_OUTPUT)Timers)StrictHandling)DistributedDataParallelConfig)OptimizerConfig)nn)	noop_hook)CheckpointException)DistributedDataParallel)
DataLoader)override)McoreDistributedOptimizer)_strategy_libio)CallbackConnectorMegatronParallel)ModelTransform)RestoreConfig_destroy_dist_connectionckpt_to_dircreate_checkpoint_iofix_progress_barinit_model_parallelsetup_data_samplersetup_parallel_ranks)logging)AsyncFinalizerCallback)DataSamplerConfigT)megatronpytorchzIhttps://docs.nvidia.com/nemo-framework/user-guide/latest/knownissues.htmlz
    (1) To resolve this issue, try to set `trainer.strategy.ckpt_load_strictness` to False. This setting enables loading older checkpoints.
    (2) For more details and troubleshooting guidance, please refer to the framework documentation: z.
c                   @   s0  e Zd ZU dZeed< eed< ee ed< eed< eed< eed< eed< eed	< ej	ed
< dZ
eed< dZeed< dZeed< dZee ed< dZee ed< dZeed< dZeed< dZeed< dZeed< dZeed< dZeed< dZeed< dZeed< dZeed< dZeeeeee  f  ed< dS )ParallelismConfigz
    POD containing parallelism configuration.
    Parallelism configuration is passed to MegatronStrategy via constructor arguments,
    then copied to model's config during model setup.
    tensor_model_parallel_sizepipeline_model_parallel_size$virtual_pipeline_model_parallel_size"microbatch_group_size_per_vp_stagecontext_parallel_sizesequence_parallelexpert_model_parallel_sizemoe_extended_tppipeline_dtyper   "encoder_tensor_model_parallel_size$encoder_pipeline_model_parallel_sizeN$pipeline_model_parallel_comm_backend"num_layers_in_first_pipeline_stage!num_layers_in_last_pipeline_stageF'account_for_embedding_in_pipeline_split"account_for_loss_in_pipeline_splituse_te_rng_trackerexpert_tensor_parallel_sizeuse_tp_pp_dp_mapping   #num_distributed_optimizer_instancesnccl_communicator_config_path	use_sharpTuse_gloo_process_groupspipeline_model_parallel_layout)__name__
__module____qualname____doc__int__annotations__r   booltorchdtyperM   rN   rO   strrP   rQ   rR   rS   rT   rU   rV   rX   rY   rZ   r[   r\   r   r    rg   rg   g/home/ubuntu/.local/lib/python3.10/site-packages/nemo/lightning/pytorch/strategies/megatron_strategy.pyrC   f   s4   
 
$rC   c                f       s  e Zd ZU dZejed< 																																																			dd
ededede	e de	e de	e de	e dede
dede
de	e de	e de	e de
de
de	d de	eej  de
de
de
d eeef d!e	e d"e
d#e	ej d$e
d%e
d&ed'e
d(ed)e
d*e
d+e
d,e
d-e
d.e
d/e	d0 d1e
d2e
d3e
d4ed5e	e d6ed7e
d8ed9e	e d:e
d;e	eeeee  f  d<dfb fd=d>Zed?d@ ZejdAd@ ZedBejd<df fdCdDZedejd<dfdEdFZeddGdHZedIed<efdJdKZdejd<dfdLdMZdNdO Z eddPdQZ!edBe"j#d<e"j#f fdRdSZ$ed fdUdVZ%edWe&dXe&d<e'fdYdZZ(e	dd[ej)j*d\e+g e&f dBe	ed]e"j#f  dXe&d<e&f
 fd^d_Z,edWe&dXe&d<e'fd`daZ-edWe&dXe&d<e'fdbdcZ.edWe&dXe&d<e'fdddeZ/ed fdfdgZ0ed<e1f fdhdiZ2djefdkdlZ3ddmdnZ4e	ddoe5ee&f dpeee6f dqe	e& d<dfdrdsZ7ddte
d<e
fdudvZ8eddweee6f dte
d<e5ee&f fdxdyZ9ddzd{Z:eddoe;ee&f dte
d<dfd|d}Z<dpeee6f d<dfd~dZ=ddoe;ee&f de
d<dfddZ>eed<e?fddZ@e@jde?d<dfddZ@ed<efddZAed<e5ee&f f fddZBed<e
fddZCed<eDfddZEeFedde	e
 fddZG  ZHS )MegatronStrategya  Megatron plugin for Pytorch Lightning.

    This strategy implements model parallelism using NVIDIA's Megatron-LM framework. It supports
    various forms of parallelism including tensor model parallelism, pipeline model parallelism,
    sequence parallelism, and expert parallelism for efficient training of large language models.

    Args:
        tensor_model_parallel_size (int): Intra-layer model parallelism. Splits tensors across GPU ranks.
            Defaults to 1.
        pipeline_model_parallel_size (int): Inter-layer model parallelism. Splits transformer layers
            across GPU ranks. Defaults to 1.
        virtual_pipeline_model_parallel_size (Optional[int]): Interleaved pipeline parallelism used to
            improve performance by reducing the pipeline bubble. Defaults to None.
        microbatch_group_size_per_vp_stage (Optional[int]): the number of micro-batches that are executed
            at a time for a given virtual stage (both forward and backward). Defaults to None and convert
            to pipeline_parallel_size. which specifies a depth-first schedule.
        context_parallel_size (int): Splits network input along sequence dimension across GPU ranks.
            Defaults to 1.
        sequence_parallel (bool): Makes tensor parallelism more memory efficient for LLMs (20B+) by
            parallelizing layer norms and dropout sequentially. Defaults to False.
        expert_model_parallel_size (int): Distributes MoE Experts across sub data parallel dimension.
            Defaults to 1.
        expert_tensor_parallel_size (Optional[int]): Sets MoE Experts tensor parallelism size. Defaults to None.
        moe_extended_tp (bool): Alternative parallelization strategy for expert parallelism. Defaults to False.
        account_for_embedding_in_pipeline_split (bool): If set, *input* embedding layer will be treated as a standard
            transformer layer in the context of partition and placement for pipeline parallelism.
        account_for_loss_in_pipeline_split (bool): If set, loss layer will be treated as a standard transformer
            layer in the context of partition and placement for pipeline parallelism.
        data_sampler (Optional['DataSampler']): Custom data sampler for distributed training. Defaults to None.
        parallel_devices (Optional[List[torch.device]]): List of devices to use for parallelism. Defaults to None.
        cluster_environment: Cluster environment for distributed training. Defaults to None.
        checkpoint_io: Checkpoint I/O handler. Defaults to None.
        find_unused_parameters (bool): Find unused parameters in DDP. Defaults to False.
        ckpt_type (TrainerCkptProtocol): Checkpoint type. Defaults to TrainerCheckpoint.
        ckpt_load_optimizer (bool): Load optimizer state from trainer.ckpt_path. Defaults to True.
        ckpt_save_optimizer (bool): Save optimizer states in checkpoint. Defaults to True.
        ddp (Union[DDPLiteral, DistributedDataParallelConfig]): DDP configuration. Defaults to "megatron".
        fsdp (Optional[FSDPLiteral]): Option of using torch FSDP2, select from ["megatron", "pytorch"].
            Defaults to None.
        lazy_init (bool): Use lazy initialization for model parallel parameters. Defaults to False.
        pipeline_dtype (Optional[torch.dtype]): Data type for pipeline parallelism. Defaults to None.
        save_ckpt_format (str): Distributed checkpoint format to use for checkpoint saving. Should be one of
            'torch_dist' or 'zarr'. Defaults to 'torch_dist'.
        ckpt_async_save (bool): Whether to save checkpoints asynchronously to reduce checkpointing overhead.
            Defaults to True.
        ckpt_torch_dist_multiproc (int): Number of extra processes per rank used during ckpt save
            with PyTorch distributed format. Defaults to None.
        ckpt_assume_constant_structure (bool): Allows caching some computation across checkpoint saves.
            Set to True only if the state dict structure doesn't change within a single job.
        ckpt_parallel_save (bool): If true, each worker will write its own part of the dist checkpoint.
            Defaults to True.
        ckpt_parallel_save_within_dp (bool): If true, save will be parallelized only within a DP group
            (whole world otherwise), which might slightly reduce the save overhead. Defaults to False.
        ckpt_parallel_load (bool): If true, each worker will load part of the dist checkpoint
            and exchange with NCCL. Might use some extra GPU memory. Defaults to True.
        ckpt_parallel_save_optim (bool): Parallel save/load of a DistributedOptimizer. 'True'
            allows performant save and reshardable checkpoints. Set to 'False' only in order to minimize
            the number of checkpoint files.
        ckpt_load_directly_on_device (bool): if True, loads the weights directly on GPU.
            Has effect only for `zarr` based checkpoints (PyT Distributed always loads on device).
            Defaults to True.
        ckpt_load_strictness (StrictHandling, optional): defines loading strictness.
            If not None, overwrites the `strict` flag passed to `load_checkpoint`.
            Defaults to None. For a list of supported values, refer to the Megatron Core documentation:
            https://github.com/NVIDIA/Megatron-LM/blob/d4e72c0d33edc0c53aeb624f617eb77cebce6ae9/megatron/core/dist_checkpointing/validation.py#L46
        setup_optimizers (bool): Whether to call the trainer's setup_optimizers function to perform any
            necessary conversions of optimizer parameters and move optimizer parameters to the correct device.
            Defaults to True.
        init_model_parallel (bool): Whether to initialize the model parallel groups. Defaults to True.
        replace_progress_bar (bool): Whether to replace the TQDM progress bar with a megatron-style logger
            that prints the metrics to stdout. Suitable for non-interactive settings.
        progress_interval (int): How frequently to print progress to stdout. Only used when
            replace_progress_bar is True.
        megatron_log_level (int): Granularity level to measure and report timing.
            0: report only iteration time and make sure timing does not introduce extra overhead.
            1: report timing for operations that are executed very limited times (basically once) during
               each iteration (such as gradient all-reduce)
            2: report timing for operations that migh be executed numerous times during each iteration.
            Note that setting the level to 1 or 2 might cause increase in iteration time.
        use_tp_pp_dp_mapping (bool): Whether to use TP-PP-DP mapping instead of TP-DP-PP mapping.
            Defaults to False.
        num_distributed_optimizer_instances (int): The number of distributed optimizer replicas across
            the data-parallel domain.
        nccl_communicator_config_path (Optional[str]): Path to the yaml file of NCCL communicator configurations.
            `min_ctas`, `max_ctas`, and `cga_cluster_size` can be set for each communicator.
        use_sharp (bool): Whether to use SHARP. Defaults to False.
        use_gloo_process_groups (bool): Whether to use Gloo process groups. Defaults to True.
        pipeline_model_parallel_layout (Optional[Union[str, List[List[str]]]]): The layout of all layers among
            different PP and VP stages.
        **kwargs: Additional keyword arguments.

    Note:
        This strategy is designed to work with NVIDIA's Megatron-LM framework and requires
        specific model implementations that are compatible with Megatron's parallelism techniques.
    trainerrW   NFr   TrA   
torch_distrD   rE   rO   rP   rQ   rF   rG   rH   rI   rJ   rK   rU   rM   rN   rR   rS   data_samplerr?   parallel_devicesfind_unused_parametersckpt_load_optimizerckpt_save_optimizerddpfsdp	lazy_initrL   rT   rZ   save_ckpt_formatckpt_async_saveckpt_torch_dist_multiprocckpt_assume_constant_structureckpt_parallel_saveckpt_parallel_save_within_dpckpt_parallel_loadckpt_parallel_save_optimckpt_load_directly_on_deviceckpt_load_strictnessr&   setup_optimizersr:   replace_progress_barprogress_intervalrestore_configmegatron_log_levelrV   rX   rY   r[   r\   returnc3           4         s  t  jd||||d|3 t | _|| _|| _|| _|| _|| _|| _	|d ur*|n|| _
|| _|
| _|| _|| _|| _|	| _|| _|| _|| _|| _|| _|| _|| _|'| _|| _|| _|| _|(| _|)| _ttt !dd| _"ttt !dd| _#|.| _$|| _%|| _&| | _'|!| _(|"| _)|#| _*|$| _+|%| _,|&| _-|/| _.|*| _/|+| _0|0| _1|1| _2|,| _3t4|-d| _5|2| _6|| _7|dkrt8dd	| _9n#t:|t8r|| _9n|d
kr|d urt;dd | _9d| _<nt;d| d | _=|d u r| j9r| j9j>rt?@d d}|d
krtAd|dkr || _=| j9j>sd| j9_>t?@d t?Bd n|d ur-t;d| d|dkr9t8dd	| _9n't:|t8rC|| _9n|d
krY| j=d urRt;dd | _9d| _<nt;d| t:| j9t8rl| j.| j9_.tCD  d | _Ed S )N)rm   cluster_environmentcheckpoint_iorn   NEMO_LOG_TRAIN_LOSSrW   NEMO_LOG_MEMORY_USAGEr   minmaxrA   T)check_for_nan_in_gradrB   z'Please set ddp to megatron to use FSDP.FzInvalid DDP type: zeFSDP option is not set but ddp_config.use_custom_fsdp is set to true. Setting FSDP option to megatronz5PyTorch FSDP2 is not supported with MegatronParallel.z:Setting ddp_config.use_custom_fsdp to True for MCore FSDP.z>FSDP option is set to MCore. Using MCore's Custom FSDP for DP.z-, please choose from ["megatron", "pytorch"].rg   )Fsuper__init__r2   megatron_callbacksrl   rD   rE   rO   rP   rQ   rG   rH   rJ   rU   rK   rF   rI   rM   rN   rR   rS   rs   ro   rp   r}   rT   rZ   _pipeline_dtype_setup_optimizers_init_model_parallelrc   ra   osgetenvlog_train_losslog_memory_usagerV   rt   
async_savetorch_dist_multiprocassume_constant_structureparallel_saveparallel_save_within_dpparallel_loadparallel_save_optimload_directly_on_devicerX   r   r   rY   r[   r   r%   timersr\   _ddpr'   
ddp_config
isinstance
ValueErrorno_ddp_communication_hook_fsdpuse_custom_fsdpr=   warningNotImplementedErrorinfor0   enable_nvidia_optimizationsstore)4selfrD   rE   rO   rP   rQ   rF   rG   rH   rI   rJ   rK   rU   rM   rN   rR   rS   rl   rm   r   r   rn   ro   rp   rq   rr   rs   rL   rT   rZ   rt   ru   rv   rw   rx   ry   rz   r{   r|   r}   r~   r:   r   r   r   r   rV   rX   rY   r[   r\   kwargs	__class__rg   rh   r      s   6









zMegatronStrategy.__init__c                 C   s.   | j du rt| jdd}|dur|j| _ | j S ) Ndtype_config)r   getattr_precision_pluginrL   )r   r   rg   rg   rh   rL     s
   
zMegatronStrategy.pipeline_dtypec                 C   s
   || _ d S N)r   )r   valuerg   rg   rh   rL     s   
modelc           	         s4  t  | d|jvsJ dt| jdd}| jdu r!|r!|j| _t|| j}|r-|| _	|r<ddl
m} |||j|_t|drF| j|j_t|dd}|r| jrt|jdd}t|trtt|}| jsitd	tt| j}|r|||jj|j_||| j| _|j|jkrtd
 |j|_dS dS dS dS dS )zAttaches a model to strategy.is_hf_modelz7Cannot use HFAutoModelForCausalLM with MegatronParallelr   Nr   )"update_config_with_dtype_overridesconfigoptimz.PyTorch DDP is not enabled for mcore optimizerz<Fixing mis-match between ddp-config & mcore-optimizer config)r   connect__dict__r   r   rL   r0   set_model_parallel_attributesparallelism_mcore_config.nemo.lightning.pytorch.plugins.mixed_precisionr   r   hasattrr   r   r   r   r(   r   r   r   r'   use_distributed_optimizerr=   r   )	r   r   r   _maybe_mcore_configr   	has_optim
opt_configmcore_opt_configr   r   rg   rh   r     s<   






zMegatronStrategy.connectc           	      C   s
  | j dusJ | j | || _z|j| jjj_td|j d W n t	y1   t
d Y nw |jj}|tjkrM| jrM| jdusEJ | j| j| _t| j t|| j| j | | |   t| jddrxtdd |jD sxtd|jd	kr| jd	krd
|_ttt fD ]}t!|j"|_"q|tjkr| #  t$||jj%_&d
dl'm(  m)  m*  m+} t,| j-|j.r| /  n| jdusJ dd }| jD ]}|| q| j0rd}| jjD ]}t,|t1rd} nq|s| jj2t1  | j3r| jj4s| 5  dS dS dS )zSetups the strategyNzCopying Trainer's 'max_steps' (z ) to LR scheduler's 'max_steps'.zCould not copy Trainer's 'max_steps' to LR scheduler's 'max_steps'. If you are not using an LR scheduler, this warning can safely be ignored.model_transformc                 s   s    | ]}t |tV  qd S r   )r   r4   ).0cbrg   rg   rh   	<genexpr>  s    z)MegatronStrategy.setup.<locals>.<genexpr>zYou specified a model_transform function in the model, but noModelTransform callback was found in the trainer. Please initialize the trainer with `trainer = Trainer(..., callbacks=[ModelTransform()])`rW   r   c                 S   sf   ddl m} |  D ]&}t|dd }|r| }n|jddd}tjj|j	tj
|d|d q
dS )z
                Modified from
                https://github.com/NVIDIA/Megatron-LM/blob/main/megatron/core/distributed/distributed_data_parallel.py#L466-L483
                Syncs parameters across all DP ranks.
                r   parallel_state	allreduceT)with_context_parallelpartial_data_parallel)srcgroupN)megatron.corer   
parametersr   get_expert_data_parallel_groupget_data_parallel_grouprd   distributed	broadcastdataget_global_rank)moduler   paramis_expert_paralleldata_parallel_grouprg   rg   rh   broadcast_params  s   
z0MegatronStrategy.setup.<locals>.broadcast_paramsFT)6acceleratorsetuprj   	max_stepsr   r   lr_schedulerr=   r   AttributeErrorr   statefnr#   FITTING_layer_syncapplyr;   r9   r   r   setup_megatron_parallelsetup_precision_pluginr   lightning_moduleany	callbacksr   num_sanity_val_stepsrE   r   r   r   _data_fetcher_wrapper_select_data_fetcherconfigure_ddp_MegatronAutomaticOptimization
epoch_loopautomatic_optimization>torch.distributed.algorithms.ddp_comm_hooks.post_localSGD_hookr   
algorithmsddp_comm_hookspost_localSGD_hookr   _ddp_comm_statePostLocalSGDState_enable_model_averagingr   r>   appendr   	ckpt_pathselective_restore)	r   rj   
trainer_fnlooppost_localSGDr   model_modulehave_async_callbackcallbackrg   rg   rh   r     sf   






zMegatronStrategy.setupc              	   C   sV  t |  t  |   |  | _| jdusJ tj s t	dtj
 r,td dS | j }| j }| jjtjd< t| jjtjd< td }rTtt|d}td| d	|d
  d|  tjj| j||| j|d | jdkr{tt td d| j d| dd d t| j | j rt!| jt"sJ d| j #| j  dS dS )zSetups dist envNzOtorch.distributed is not available. Cannot initialize distributed process groupz7torch.distributed is already initialized. Exiting earlyMASTER_ADDRMASTER_PORT TORCH_NCCL_HEARTBEAT_TIMEOUT_SEC)secondsz'Initializing distributed: GLOBAL_RANK: z
, MEMBER: rW   /)rank
world_sizer   timeoutncclzd----------------------------------------------------------------------------------------------------z
distributed_backend=z5
All distributed processes registered. Starting with z processes

z#Cluster environment not initialized)$r<   r   set_world_ranks_get_process_group_backend_process_group_backendr   rd   r   is_availableRuntimeErroris_initialized_loggerdebugglobal_rankr  main_addressr   environrf   	main_portr   r   ra   r   init_process_groupr   atexitregisterr6   r   r:   r   rl   r   r   r   )r   r  r  r	  rg   rg   rh   setup_distributed4  sH   





 


z"MegatronStrategy.setup_distributed
dataloaderc                 C   s   | j r	| j |S |S )zSetups dataloader)rl   transform_dataloader)r   r  rg   rg   rh   process_dataloader`  s   z#MegatronStrategy.process_dataloaderc              	   C   s  | j dus	J dd}t| jdr| jj}t| j | j| jt|jt| j	| j
|d| _|| j_| jr5|   t| j j}d|jv rMtj| j j| jd| j _| jrU| | | j| _ t|dd}|rh| j jj|  | jrs| j j| j t|dd}|r| j j| dS dS )	zConfigures megatron parallelNzModel is not setconvert_module)precision_pluginvp_sizecpur   rr   convert_module_fnmegatron_parallel)r$  r   
datamodule)r   r   r   r  r3   rF   r   r   r   r   r   r$  rj   r   r:   inspect	signatureconfigure_optimizersr   	functoolspartialr   r~   r   r   addrl   )r   rj   r#  sigtrainer_callbacksr%  rg   rg   rh   r   h  sB   




z(MegatronStrategy.setup_megatron_parallelc                 C   s   | j   dS )zInitializes megatron parallelN)r$  r:   r   rg   rg   rh   r:     s   z$MegatronStrategy.init_model_parallelc                 C   s<   t | jj d | | j| _| jdu r|   dS dS )zConfigures ddpz: configuring MegatronParallelN)r=   r  r   r]   _setup_modelr   r   _register_ddp_hooksr.  rg   rg   rh   r     s
   
zMegatronStrategy.configure_ddpc                    sh   ddl m} ddlm} | }|jdur| | jd< | js2t 	|}| j
r0d|_|dt |}|S )z=Only called when we need to wrap the model for pytorch's ddp.r   r   AppStateNprocess_groupF)r   r   
nemo.utilsr2  model_parallel_sizer   _ddp_kwargsr   r   r/  r   require_backward_grad_syncregister_comm_hookr*   )r   r   r   r2  	app_statedist_data_parallelr   rg   rh   r/    s   
zMegatronStrategy._setup_model
pl.Trainerc                    sP   t  | t| jdrg | j}| j| jd |d< || _t| j| j dS )zSetups optimizersconvert_optimizerr   N)r   r~   r   r   
optimizersr<  r   root_device)r   rj   _optimizersr   rg   rh   r~     s   
z!MegatronStrategy.setup_optimizersargsr   c           
      O   sv  | j dusJ t| jtsJ | j  | jD ]}|  q| jD ]}|  q"| jj	|g|R i |}t
|r>|}nt|tsLtdt| d|vrYtd|  |d }| j jd| jjddd | j d	| jj | jrt
j }t
j }	| j jd
|ddd | j jd|	ddd | jrtj|dd | j jd|dddd |W  d   S 1 sw   Y  dS )zRuns one training stepNz4Expected dict or tensor for reduced_train_loss, got lossz$Expected 'loss' in output dict, got global_stepTrW   )prog_bar
batch_sizestepmax_memory_reservedmax_memory_allocatedF)r   reduced_train_loss)rC  rD  	sync_dist)r   r   r   r3   r   train_step_contextzero_grad_bufferr=  	zero_gradtraining_steprd   	is_tensordictr   typekeyslogrj   rB  r   cudarF  rG  r   r0   _sync_from_last_pipeline_stage)
r   dataloader_iterr@  r   model_chunkoptoutrH  rF  rG  rg   rg   rh   rM    s^   









$zMegatronStrategy.training_step	optimizerclosurezpl.LightningModulec                    sd   t  j|||fi |}t|tr0|\}}}|dur#| jjd|dd |dur0| jjd|dd |S )zRuns one optimizer stepN	grad_normrW   )rD  num_zeros_in_grad)r   optimizer_stepr   r/   r   rR  )r   rY  rZ  r   r   optimizer_outputr[  r\  r   rg   rh   r]    s   	

zMegatronStrategy.optimizer_stepc              	   O   s   | j dusJ t| jtsJ | j @ | jj|g|R i |}ddlm} |	 }|dkrA| j j
d|| dd| dd n
| j j
d|ddd |W  d   S 1 sWw   Y  dS )	zRuns one validation stepNr   r   rW   val_lossT)rC  rI  sync_dist_groupon_epoch)rC  ra  )r   r   r   r3   r   val_step_contextvalidation_stepr   r   &get_pipeline_model_parallel_world_sizerR  !get_pipeline_model_parallel_group)r   rU  r@  r   rX  r   pp_sizerg   rg   rh   rc  #  s$   	$z MegatronStrategy.validation_stepc                 O   f   | j dusJ t| jtsJ | j  | jj|g|R i |W  d   S 1 s,w   Y  dS )zRuns one test stepN)r   r   r   r3   r   test_step_context	test_stepr   rU  r@  r   rg   rg   rh   ri  ?  
   $zMegatronStrategy.test_stepc                 O   rg  )zRuns one prediction stepN)r   r   r   r3   r   predict_step_contextpredict_steprj  rg   rg   rh   rm  H  rk  zMegatronStrategy.predict_stepc                    s"   t | dr
| j  t   dS )zTearsdown the strategyr$  N)r   r$  teardown_ddpr   teardownr.  r   rg   rh   ro  Q  s   

zMegatronStrategy.teardownc                    s6   | j rt| drt }|t| j |S t  S )zModel sharded contextr   )	rs   r   r   enter_contextr0   megatron_lazy_init_contextr   r   model_sharded_context)r   stackr   rg   rh   rr  X  s
   
z&MegatronStrategy.model_sharded_context	step_namec                 C   sF   d|vr|  ||d< d|vr| ||d< d|vr!| ||d< |S )N	data_stepforward_steploss_reduction)_get_data_step_get_forward_step_get_loss_reduction)r   rU  r   rt  rg   rg   rh   _update_step_kwargsb  s   z$MegatronStrategy._update_step_kwargsc                 C   s0   | j jdd}| jrdnd}tj| j|||dS )ar  
        Sharded state dictionary for an MainParamsOptimizerWrapper.
        Used to save and load the optimizer state when training with distributed_checkpoint.

        Returns
        -------
            dict: The sharded state dictionary for the optimizer
        Raises:
            ValueError: If a parameter ID does not match any model sharded parameter.
        Fuse_pl_optimizerfully_sharded_model_spacedp_zero_gather_scatter)
is_loadingsharding_type)r   r=  r   r0   optimizer_sharded_state_dictr$  )r   r  rY  r  rg   rg   rh   r  l  s
   
z-MegatronStrategy.optimizer_sharded_state_dict
checkpointfilepathstorage_optionsc                 C   s   ddl m} t| jtr| jjr| jjr| j  t	g |d< d|vr*| j
 |d< d|v rD| jjjtjkrDi |d< | jrD|  g|d< | jj|||d || j|| j dS )	zSaves checkpointr   )save_modelopt_state
state_dictsharded_state_dictoptimizer_statesrY  )r  N))nemo.collections.llm.modelopt.model_utilsr  r   r   r'   r   overlap_param_gatherr$  force_param_syncr   r  rj   r   r   r#   r   rp   r  r   save_checkpoint)r   r  r  r  r  rg   rg   rh   r    s"   

z MegatronStrategy.save_checkpointr   c                 C   s   |r| j r	| j jS dS | jS )z5Determines whether to restore optimizer states or notF)r   load_optim_statero   )r   r   rg   rg   rh   should_restore_optimizer_states  s   z0MegatronStrategy.should_restore_optimizer_statescheckpoint_pathc              
   C   sB  ddl m} tj  | j|d}|| j|}|du s%|s%| jjj	t
jkr't}i }|  | j |d< W d   n1 s>w   Y  |r]| jjj	t
jkr]| jjddr]| jdd	g|d
< | jdu rf| jjn| j}z| jj|||d}W n ty }	 z|	 dt }
t|
d}	~	ww |ri }| D ]}|| ||< q|S |S )a}  PTL method which we override to integrate distributed checkpoints for model parallel models.
        In order to load distributed checkpoints we need to provide the sharded_state_dict to
        the distributed load function. We get the sharded_state_dict from self.lightning_module
        which makes it convenient to have the loading logic happen at the strategy level.
        r   )restore_modelopt_stater   Nr  Fr|  T)r  rY  )r  strictr  )r  r  rd   rS  empty_cacher  r$  rj   r   r   r#   r   r   r  r   r=  r  r}   strict_loadingr   load_checkpointr+   
LOAD_ERRORr  rQ  )r   r  r   r  restore_optimizerssharded_state_contextr  r  r  eerror_messagefinal_checkpointkeyrg   rg   rh   r    s<   

z MegatronStrategy.load_checkpointc                 C   s   | j sdS td| j   | j| j jdd}| j jr5td| j   | jdu r+dn| j}| j||d | j jrItd| j   | j	|dd td	| j  d
 t
j  | jjd dS )z.Implements selective restoration of checkpointNzDoing selective restore from T)r  r   zRestoring model weights from )r  r  z Restoring optimizer states from )r  r   zFinished restoring from z, cleaning up.zMegatronStrategy.restore_end)r   r=   r   r  pathload_model_stater}   load_model_state_dictr  load_optimizer_state_dictrd   rS  r  rj   strategybarrier)r   r  r  rg   rg   rh   r     s   
z"MegatronStrategy.selective_restorec              
   C   s   | j |dsdS ddlm} ddlm} ddlm}m} ||	 d}|d }t
| j|D ]?\}	}
| jdurbt |
d	< |
d d
  D ]}t|tra| D ]\}}||||ddf||< qOqD|	|
 t|	| j q.dS )zLoads optimizer state-dictr  Nr   r   )
DeviceMesh)DTensorShardrS  rY  fp32_from_fp16_paramsr   )dim)r  r   r   torch.distributedr  torch.distributed._tensorr  r  
from_groupr   zipr=  r   r   valuesr   r   items
from_localload_state_dictr   r>  )r   r  r   r   r  r  r  meshr  rY  	opt_state	opt_paramopt_param_state_keyopt_param_staterg   rg   rh   r    s,   





z*MegatronStrategy.load_optimizer_state_dictc                 C   s:   t |}| jrtj|rt| dS t| dS dS )zDeletes checkpointN)r7   is_global_zeror   r  islinkunlinkshutilrmtree)r   r  ckptrg   rg   rh   remove_checkpoint  s   z"MegatronStrategy.remove_checkpointr  c                 C   sX   | j dusJ | jdu r|n| j}tj| j ||d d|vr(| jD ]}|  q!dS dS )zloads model state dictN)r  rY  )r$  r}   r0   r  r=  reload_model_params)r   r  r  rW  rg   rg   rh   r    s   

z&MegatronStrategy.load_model_state_dictc              
   C   s6   | j st| j| j| j| j| j| j| j| j	d| _ | j S )zCreates & returns checkpoint io)rt   r   r   r   r   r   r   r   )
_checkpoint_ior8   rt   r   r   r   r   r   r   r   r.  rg   rg   rh   r   !  s   zMegatronStrategy.checkpoint_ior1   c                 C   s
   || _ dS )zCheckpointIO setterN)r  )r   r1   rg   rg   rh   r   3  s   
c                 C   s*   t | jjjjjjjjj	| jjjj
jjj	S )z8
        Get the value of step within an epoch.
        )maxrj   r   r   r   optim_progressrY  rE  current	completedmanual_optimizationoptim_step_progressr.  rg   rg   rh   current_epoch_step8  s   z#MegatronStrategy.current_epoch_stepc                    s8   ddl m} | }|jdurt|j|jd}|S t jS )zReturns dist-sampler's kwargsr   r1  N)num_replicasr  )r4  r2  r5  rO  data_parallel_sizedata_parallel_rankr   distributed_sampler_kwargs)r   r2  r9  r  r   rg   rh   r  B  s   
z+MegatronStrategy.distributed_sampler_kwargsc                 C   s   dS )zNeeds to be True for distributed checkpointing because
        we require the model to have configured the optimizer before
        deserializing the checkpoint.
        Trg   r.  rg   rg   rh   restore_checkpoint_after_setupT  s   z/MegatronStrategy.restore_checkpoint_after_setupc                 C   s   t di d| jd| jd| jd| jd| jd| jd| jd| jd	| j	d
| j
d| jd| jd| jd| jd| jd| jd| jd| jd| jd| jd| jd| jd| jd| jS )z4Returns parallelism config from class attrs as a PODrD   rE   rO   rP   rQ   rF   rG   rH   rI   rJ   rU   rK   rM   rN   rR   rS   rL   rT   rV   rX   rY   rZ   r[   r\   Nrg   )rC   rD   rE   rO   rP   rQ   rF   rG   rH   rI   rJ   rU   rK   rM   rN   rR   rS   rL   rT   rV   rX   rY   rZ   r[   r\   r.  rg   rg   rh   r   \  sb   	
zMegatronStrategy.parallelism
empty_initc                 c   s    dV  dS )z'Context manager used for initializationNrg   )r   r  rg   rg   rh   tensor_init_contextz  s   
z$MegatronStrategy.tensor_init_context)2rW   rW   NNNNNrW   FrW   FNr   r   FFNNNNFTTrA   NFNFFrk   TNFTFTTTNTTTrW   Nr   FrW   NTN)r   Nrj   r;  r   Nr   )F)T)Ir]   r^   r_   r`   plTrainerrb   ra   rf   r   rc   r   rd   devicer   
DDPLiteralr'   FSDPLiteralre   r5   r   propertyrL   setterr.   LightningModuler   r   r  r-   r  r   r:   r   r)   Moduler/  r~   r
   r$   rM  r   	Optimizerr   r]  rc  ri  rm  ro  r   rr  r{  r  r   r   r  r  r  r   r   r  r  r  r   r   r  r  r  rC   r   r   r  __classcell__rg   rg   r   rh   ri      s  
 
`	

 !"#$%&'()*+,-./01235 (

(k+/
G
	



"*
/" 		 ri   c                 C   s"   t | dtjdtfdd}|S )Nrj   stagec                 S   s   t | jtr	t S d S r   )r   r  ri   r    )rj   r  rg   rg   rh   wrapped  s   z&_data_fetcher_wrapper.<locals>.wrapped)r)  wrapsr  r  r"   )r   r  rg   rg   rh   r     s   r   c                       s"   e Zd ZdZd fddZ  ZS )	r   z
    Custom loop for automatic optimization, tailored to work with a specific training_step
    implementation that involves custom data preparation, forward pass, and loss reduction steps.
    rj   r;  r   Nc                    s   t  | d| _d S )NT)r   r   _skip_backward)r   rj   r   rg   rh   r     s   
z'_MegatronAutomaticOptimization.__init__r  )r]   r^   r_   r`   r   r  rg   rg   r   rh   r     s    r   )wr  r)  r&  r=   _loggingr   r  collectionsr   
contextlibr   r   r   dataclassesr   datetimer   pathlibr   typingr	   r
   r   r   r   r   r   r   r   r   r   r   lightning.pytorchrB   r  rd   r  lightning.fabric.pluginsr   r   $lightning.fabric.utilities.optimizerr   r   $lightning.fabric.utilities.rank_zeror   lightning.fabric.utilities.seedr   lightning.pytorch.acceleratorsr   lightning.pytorch.loopsr   r   r   r    lightning.pytorch.loops.fetchersr     lightning.pytorch.strategies.ddpr!    lightning.pytorch.trainer.statesr"   r#   !lightning.pytorch.utilities.typesr$   r   r%   +megatron.core.dist_checkpointing.validationr&   megatron.core.distributedr'   megatron.core.optimizerr(   r)   ;torch.distributed.algorithms.ddp_comm_hooks.debugging_hooksr*   "torch.distributed.checkpoint.utilsr+   torch.nn.parallelr,   torch.utils.datar-   typing_extensionsr.   nemo.core.optim.mcore_optimr/   nemo.lightningr0   r1    nemo.lightning.megatron_parallelr2   r3    nemo.lightning.pytorch.callbacksr4   'nemo.lightning.pytorch.strategies.utilsr5   r6   r7   r8   r9   r:   r;   r<   r4  !nemo.utils.callbacks.dist_ckpt_ior>   +nemo.lightning.pytorch.plugins.data_samplerr?   	getLoggerr]   r  r@   r  r  URLr  rC   IOMixinri   r   r   rg   rg   rg   rh   <module>   s   8(

!        	