# nemo/lightning/_strategy_lib.py

import inspect
import itertools
import os
from collections import defaultdict
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Callable, Dict, Generator, Mapping, Optional, Protocol, TypeVar

import torch
from torch import nn

from nemo.lightning.megatron_init import initialize_model_parallel_for_nemo
from nemo.utils import logging

NEMO_MEGATRON_MODEL_PARALLEL_APPSTATE_OVERRIDE = "NEMO_MEGATRON_MODEL_PARALLEL_APPSTATE_OVERRIDE"

if TYPE_CHECKING:
    from lightning.fabric.utilities.types import Optimizable
    from megatron.core.model_parallel_config import ModelParallelConfig


class SharedStateDictProtocol(Protocol):
    """Protocol for objects that provide a ``sharded_state_dict`` method."""

    def sharded_state_dict(self, prefix=""):
        ...


def init_parallel_ranks(
    world_size: int,
    global_rank: int,
    local_rank: int,
    parallel_config: "ModelParallelConfig",
    seed=1234,
    fp8=False,
) -> None:
    """
    Initializes the parallel ranks for distributed training.

    This function sets up the parallel ranks based on the provided world size, global rank, local rank,
    and parallel configuration. It also sets the seed for random number generation and determines whether
    to use fp8 precision.

    Args:
        world_size (int): The total number of processes participating in the distributed training.
        global_rank (int): The rank of the current process in the distributed training setup.
        local_rank (int): The rank of the current process within its machine.
        parallel_config (ModelParallelConfig): The configuration object containing settings for model parallelism.
        seed (int, optional): The seed for random number generation. Defaults to 1234.
        fp8 (bool, optional): Whether to use fp8 precision for model parameters. Defaults to False.
    """
    from nemo.utils import AppState

    app_state = AppState()

    if os.environ.get(NEMO_MEGATRON_MODEL_PARALLEL_APPSTATE_OVERRIDE, "false").lower() == "true":
        init_world_size = app_state.tensor_model_parallel_size * app_state.pipeline_model_parallel_size
        init_global_rank = app_state.global_rank
        init_local_rank = app_state.local_rank
    else:
        pp = parallel_config.pipeline_model_parallel_size or 1
        if world_size < pp:
            raise ValueError(f"Expected world_size ({world_size}) to be greater than/equal to pipeline size ({pp})")
        init_world_size = world_size
        init_global_rank = global_rank
        init_local_rank = local_rank

    initialize_model_parallel_for_nemo(
        world_size=init_world_size,
        global_rank=init_global_rank,
        local_rank=init_local_rank,
        tensor_model_parallel_size=parallel_config.tensor_model_parallel_size,
        expert_model_parallel_size=parallel_config.expert_model_parallel_size,
        expert_tensor_parallel_size=parallel_config.expert_tensor_parallel_size,
        pipeline_model_parallel_size=parallel_config.pipeline_model_parallel_size,
        pipeline_model_parallel_comm_backend=parallel_config.pipeline_model_parallel_comm_backend,
        virtual_pipeline_model_parallel_size=parallel_config.virtual_pipeline_model_parallel_size,
        context_parallel_size=parallel_config.context_parallel_size,
        encoder_tensor_model_parallel_size=getattr(parallel_config, "encoder_tensor_model_parallel_size", 0),
        encoder_pipeline_model_parallel_size=getattr(parallel_config, "encoder_pipeline_model_parallel_size", 0),
        seed=seed,
        pipeline_model_parallel_split_rank=getattr(parallel_config, "pipeline_model_parallel_split_rank", None),
        use_fp8=fp8,
        init_mpi_proc_group=getattr(parallel_config, "tp_comm_overlap", False)
        and getattr(parallel_config, "tp_comm_bootstrap_backend", None) == "mpi",
        use_te_rng_tracker=getattr(parallel_config, "use_te_rng_tracker", False),
        use_sharp=getattr(parallel_config, "use_sharp", False),
        use_tp_pp_dp_mapping=getattr(parallel_config, "use_tp_pp_dp_mapping", False),
        num_distributed_optimizer_instances=getattr(parallel_config, "num_distributed_optimizer_instances", 1),
        nccl_communicator_config_path=getattr(parallel_config, "nccl_communicator_config_path", None),
        use_gloo_process_groups=getattr(parallel_config, "use_gloo_process_groups", True),
    )


def init_model_parallel(model: Optional[nn.Module] = None) -> None:
    """Initializes Megatron-LM model parallel if using model parallelism."""
    import torch.distributed
    from megatron.core import parallel_state

    from nemo.utils import AppState

    app_state = AppState()

    if app_state.model_parallel_size is not None:
        # Destroy any groups left over from a previous initialization
        # (this happens e.g. with repeated trainer.test() calls).
        parallel_state.destroy_model_parallel()
        if torch.distributed.is_initialized():
            parallel_state.initialize_model_parallel(
                tensor_model_parallel_size=app_state.tensor_model_parallel_size,
                pipeline_model_parallel_size=app_state.pipeline_model_parallel_size,
                virtual_pipeline_model_parallel_size=app_state.virtual_pipeline_model_parallel_size,
                pipeline_model_parallel_comm_backend=app_state.pipeline_model_parallel_comm_backend,
                context_parallel_size=app_state.context_parallel_size,
                encoder_tensor_model_parallel_size=app_state.encoder_tensor_model_parallel_size,
                encoder_pipeline_model_parallel_size=app_state.encoder_pipeline_model_parallel_size,
                expert_model_parallel_size=app_state.expert_model_parallel_size,
                expert_tensor_parallel_size=app_state.expert_tensor_parallel_size,
                use_sharp=app_state.use_sharp,
                order="tp-cp-ep-pp-dp" if app_state.use_tp_pp_dp_mapping else "tp-cp-ep-dp-pp",
                num_distributed_optimizer_instances=app_state.num_distributed_optimizer_instances,
                nccl_communicator_config_path=app_state.nccl_communicator_config_path,
                create_gloo_process_groups=app_state.use_gloo_process_groups,
            )

            # Sanity-check that the precomputed ranks match what megatron-core derived.
            assert app_state.tensor_model_parallel_rank == parallel_state.get_tensor_model_parallel_rank()
            assert app_state.pipeline_model_parallel_rank == parallel_state.get_pipeline_model_parallel_rank()
            assert app_state.expert_tensor_parallel_rank == parallel_state.get_expert_tensor_parallel_rank()

            app_state.tensor_model_parallel_group = parallel_state.get_tensor_model_parallel_group()
            app_state.data_parallel_group = parallel_state.get_data_parallel_group()
            app_state.data_parallel_rank = parallel_state.get_data_parallel_rank()
            app_state.data_parallel_size = parallel_state.get_data_parallel_world_size()
            app_state.pipeline_model_parallel_group = parallel_state.get_pipeline_model_parallel_group()

            # Create an MPI process group for UCX-based communication APIs if requested.
            if app_state.init_mpi_proc_group:
                torch.distributed.new_group(backend="mpi")


def set_model_parallel_attributes(model, parallelism):
    """Copies the parallelism settings onto the model's megatron-core TransformerConfig, if it has one."""
    from megatron.core.transformer.transformer_config import TransformerConfig

    has_mcore_config = isinstance(getattr(model, "config", None), TransformerConfig)
    if has_mcore_config and hasattr(model, "configure_model"):
        config = model.config
        for attr_name in filter(lambda attr: not attr.startswith("__"), dir(parallelism)):
            if not hasattr(config, attr_name):
                continue
            setattr(config, attr_name, getattr(parallelism, attr_name))
            if hasattr(config, "__io__"):
                setattr(config.__io__, attr_name, getattr(parallelism, attr_name))

        if hasattr(config, "__post_init__"):
            config.__post_init__()

        return config

    return None


@contextmanager
def megatron_lazy_init_context(config) -> Generator[None, None, None]:
    """Skips weight allocation and initialization so modules can be built cheaply (on the meta device)."""
    try:
        from megatron.core.extensions import transformer_engine as _te

        original = _te._get_extra_te_kwargs

        def _get_extra_te_kwargs_meta(c):
            """Forces device to meta"""
            kwargs = original(c)
            kwargs["device"] = "meta"
            return kwargs

        _te._get_extra_te_kwargs = _get_extra_te_kwargs_meta
    except ImportError:
        pass

    _orig_perform_initialization = config.perform_initialization
    _orig_use_cpu_initialization = config.use_cpu_initialization

    config.perform_initialization = False
    config.use_cpu_initialization = True

    yield

    try:
        from megatron.core.extensions import transformer_engine as _te

        _te._get_extra_te_kwargs = original
    except ImportError:
        pass

    config.perform_initialization = _orig_perform_initialization
    config.use_cpu_initialization = _orig_use_cpu_initialization


@contextmanager
def megatron_cpu_init_context(config) -> Generator[None, None, None]:
    """Temporarily forces CPU initialization of megatron-core modules."""
    _orig_use_cpu_initialization = config.use_cpu_initialization

    config.use_cpu_initialization = True

    yield

    config.use_cpu_initialization = _orig_use_cpu_initialization


ModelT = TypeVar("ModelT", bound=nn.Module)
class GradScaler(torch.cuda.amp.GradScaler):
    """
    Gradient scaler for model-parallel inf checks. Infs in gradients are checked across tensor-parallel
    ranks in (1) executing the optimizer step and (2) the gradient scaler update.
    """

    def __init__(
        self,
        init_scale=2.0**16,
        growth_factor=2.0,
        backoff_factor=0.5,
        growth_interval=2000,
        enabled=True,
        hysteresis=1,
    ):
        super().__init__(
            init_scale=init_scale,
            growth_factor=growth_factor,
            backoff_factor=backoff_factor,
            growth_interval=growth_interval,
            enabled=enabled,
        )
        self.optimizer_update_skipped: Optional[bool] = None
        self.hysteresis = hysteresis
        self._hysteresis_tracker = self.hysteresis

    def _unscale_grads_(self, optimizer, *args):
        if getattr(optimizer, "_custom_amp_unscale_grads", False):
            return optimizer.unscale_grads(*args)
        return super()._unscale_grads_(optimizer, *args)

    def _maybe_opt_step(self, optimizer, optimizer_state, *args, **kwargs):
        from megatron.core import parallel_state

        retval = None
        found_inf = torch.cuda.FloatTensor([sum(v.item() for v in optimizer_state["found_inf_per_device"].values())])

        # Check for inf/nan across all model-parallel ranks before stepping.
        torch.distributed.all_reduce(
            found_inf, op=torch.distributed.ReduceOp.MAX, group=parallel_state.get_model_parallel_group()
        )

        if found_inf.item() == 0:
            retval = optimizer.step(*args, **kwargs)
            self.optimizer_update_skipped = False
        else:
            self.optimizer_update_skipped = True
        return retval

    def update(self, new_scale=None):
        """
        Updates to the native grad scaler update function:
        1. Check inf across model-parallel ranks.
        2. Update hysteresis tracker.
        3. Apply hysteresis to grad scale update.
        """
        from megatron.core import parallel_state

        if not self._enabled:
            return

        _scale, _growth_tracker = self._check_scale_growth_tracker("update")

        if new_scale is not None:
            # Accept a new user-defined scale.
            if isinstance(new_scale, float):
                self._scale.fill_(new_scale)
            else:
                reason = "new_scale should be a float or a 1-element torch.cuda.FloatTensor with requires_grad=False."
                assert isinstance(new_scale, torch.cuda.FloatTensor), reason
                assert new_scale.numel() == 1, reason
                assert new_scale.requires_grad is False, reason
                self._scale.copy_(new_scale)
        else:
            # Consume the inf/nan data collected from the optimizers this iteration.
            found_infs = [
                found_inf.to(device=_scale.device, non_blocking=True)
                for state in self._per_optimizer_states.values()
                for found_inf in state["found_inf_per_device"].values()
            ]

            assert len(found_infs) > 0, "No inf checks were recorded prior to update."

            found_inf_combined = found_infs[0]

            # Combine the inf flags across all model-parallel ranks.
            torch.distributed.all_reduce(
                found_inf_combined,
                op=torch.distributed.ReduceOp.MAX,
                group=parallel_state.get_model_parallel_group(),
            )

            if len(found_infs) > 1:
                for i in range(1, len(found_infs)):
                    found_inf = found_infs[i]
                    torch.distributed.all_reduce(
                        found_inf,
                        op=torch.distributed.ReduceOp.MAX,
                        group=parallel_state.get_model_parallel_group(),
                    )
                    found_inf_combined += found_inf

            if found_inf_combined > 0:
                self._hysteresis_tracker -= 1
                if self._hysteresis_tracker <= 0:
                    # Hysteresis exhausted: follow the native scale-update rule (back off the scale).
                    torch._amp_update_scale_(
                        _scale,
                        _growth_tracker,
                        found_inf_combined,
                        self._growth_factor,
                        self._backoff_factor,
                        self._growth_interval,
                    )
                else:
                    # Hysteresis still positive: only reset the growth tracker.
                    _growth_tracker.fill_(0.0)
            else:
                # No inf found: grow the scale per the native rule and reset the hysteresis tracker.
                torch._amp_update_scale_(
                    _scale,
                    _growth_tracker,
                    found_inf_combined,
                    self._growth_factor,
                    self._backoff_factor,
                    self._growth_interval,
                )
                self._hysteresis_tracker = self.hysteresis

        # To prepare for the next iteration, clear the data collected from the optimizers.
        self._per_optimizer_states = defaultdict(torch.cuda.amp.grad_scaler._refresh_per_optimizer_state)

    def state_dict(self):
        """
        Add hysteresis_tracker to the native functions' state_dict.
        """
        if not self._enabled:
            return {}
        return {
            "scale": self.get_scale(),
            "growth_factor": self._growth_factor,
            "backoff_factor": self._backoff_factor,
            "growth_interval": self._growth_interval,
            "_growth_tracker": self._get_growth_tracker(),
            "_hysteresis_tracker": self._hysteresis_tracker,
        }

    def load_state_dict(self, state_dict):
        """
        Load hysteresis_tracker in addition to the state dict of the native function.
        """
        if not self._enabled:
            return

        if len(state_dict) == 0:
            raise RuntimeError(
                "The source state dict is empty, possibly because it was saved "
                "from a disabled instance of GradScaler."
            )

        self._init_scale = state_dict["scale"]
        if self._scale is not None:
            self._scale.fill_(state_dict["scale"])
        self._growth_factor = state_dict["growth_factor"]
        self._backoff_factor = state_dict["backoff_factor"]
        self._growth_interval = state_dict["growth_interval"]
        self._init_growth_tracker = state_dict["_growth_tracker"]
        if self._growth_tracker is not None:
            self._growth_tracker.fill_(state_dict["_growth_tracker"])
        # Older checkpoints stored the tracker under the misspelled key
        # "_hysterisis_tracker"; accept either spelling when restoring.
        if "_hysteresis_tracker" in state_dict:
            self._hysteresis_tracker = state_dict["_hysteresis_tracker"]
        elif "_hysterisis_tracker" in state_dict:
            self._hysteresis_tracker = state_dict["_hysterisis_tracker"]
        else:
            self._hysteresis_tracker = 1
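
# Illustrative usage sketch: this scaler drops into the standard torch.cuda.amp
# mixed-precision loop. The behavioural differences are that an inf/nan found on any
# model-parallel rank skips optimizer.step() on every rank, and `hysteresis` requires
# that many consecutive overflow iterations before the loss scale is backed off.
#
#   scaler = GradScaler(init_scale=2.0**16, hysteresis=2)
#   with torch.autocast("cuda", dtype=torch.float16):
#       loss = model(batch)
#   scaler.scale(loss).backward()
#   scaler.step(optimizer)
#   scaler.update()
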
def enable_nvidia_optimizations() -> None:
    """These optimizations are present in NVIDIA NGC PyTorch Containers."""
    # NVIDIA container version check
    nvidia_torch_version = os.getenv("NVIDIA_PYTORCH_VERSION", None)
    if nvidia_torch_version is not None:
        try:
            NVIDIA_TORCH_MAJOR = int(nvidia_torch_version.split(".")[0])
        except Exception:
            NVIDIA_TORCH_MAJOR = 0
        try:
            NVIDIA_TORCH_MINOR = int(nvidia_torch_version.split(".")[1])
        except Exception:
            NVIDIA_TORCH_MINOR = 0

        # NVFUSER is available starting with the 21.11 container.
        if (NVIDIA_TORCH_MAJOR >= 21) or (NVIDIA_TORCH_MAJOR == 21 and NVIDIA_TORCH_MINOR >= 11):
            torch._C._jit_set_profiling_executor(True)
            torch._C._jit_set_profiling_mode(True)
            torch._C._jit_override_can_fuse_on_cpu(False)
            torch._C._jit_override_can_fuse_on_gpu(False)
            torch._C._jit_set_texpr_fuser_enabled(False)
            torch._C._debug_set_autodiff_subgraph_inlining(False)
    else:
        # Not an NVIDIA container; the NVFUSER dependency check is left to the user.
        pass


def optimizer_sharded_state_dict(
    model: SharedStateDictProtocol,
    optimizer: "Optimizable",
    is_loading=False,
    sharding_type="fully_sharded_model_space",
) -> Dict[str, torch.Tensor]:
    """
    Sharded state dictionary for a MainParamsOptimizerWrapper.
    Used to save and load the optimizer state when training with distributed_checkpoint.

    Returns:
        dict: The sharded state dictionary for the optimizer
    Raises:
        ValueError: If a parameter ID does not match any model sharded parameter.
    """
    from megatron.core.dist_checkpointing.optimizer import (
        get_param_id_to_sharded_param_map,
        make_sharded_optimizer_tensor,
        optim_state_to_sharding_state,
    )

    from nemo.core.optim import MainParamsOptimizerWrapper
    from nemo.core.optim.optimizers import init_optimizer_states

    model_sharded_state_dict = model.sharded_state_dict()

    # Remove _extra_state entries (e.g. Transformer Engine metadata).
    model_sharded_state_dict = {
        key: value for key, value in model_sharded_state_dict.items() if not key.endswith("_extra_state")
    }

    if hasattr(optimizer, "sharded_state_dict"):
        return optimizer.sharded_state_dict(
            model_sharded_state_dict, is_loading=is_loading, sharding_type=sharding_type
        )

    if not isinstance(optimizer, MainParamsOptimizerWrapper):
        # Regular optimizer, e.g. Adam or FusedAdam.
        init_optimizer_states(optimizer)
        optimizer_state_dict = optimizer.state_dict()
        id_to_sharded_param_map = get_param_id_to_sharded_param_map(
            model_sharded_state_dict=model_sharded_state_dict,
            optim_params_iter=itertools.chain.from_iterable(g["params"] for g in optimizer.param_groups),
        )
        optim_state_to_sharding_state(optimizer_state_dict, id_to_sharded_param_map)
        return optimizer_state_dict

    optimizer_state_dict = optimizer.state_dict()
    id_to_sharded_param_map = get_param_id_to_sharded_param_map(
        model_sharded_state_dict=model_sharded_state_dict,
        optim_params_iter=itertools.chain.from_iterable(g for g in optimizer.float16_groups),
    )

    assert len(optimizer_state_dict["fp32_from_fp16_params"]) == len(
        optimizer_state_dict["optimizer"]["param_groups"]
    )

    def get_safe(param_id):
        try:
            return id_to_sharded_param_map[param_id]
        except KeyError as e:
            raise ValueError(f"Param id {param_id} does not match any model sharded param") from e

    # Convert fp32_from_fp16_params to sharded optimizer tensors.
    optimizer_state_dict["fp32_from_fp16_params"] = [
        [
            make_sharded_optimizer_tensor(get_safe(param_id), fp32_param, prefix="optimizer.state.fp32_param")
            for param_id, fp32_param in zip(state_group["params"], fp32_group)
        ]
        for fp32_group, state_group in zip(
            optimizer_state_dict["fp32_from_fp16_params"],
            optimizer_state_dict["optimizer"]["param_groups"],
        )
    ]

    # Convert the remaining optimizer state.
    optim_state_to_sharding_state(optimizer_state_dict["optimizer"], id_to_sharded_param_map)

    return optimizer_state_dict


def load_model_state_dict(megatron_parallel, checkpoint: Mapping[str, Any], strict: bool = True) -> None:
    """Loads a checkpoint state dict into each Megatron model chunk."""
    from megatron.core import parallel_state
    from megatron.core.dist_checkpointing.validation import StrictHandling, parse_strict_flag

    # Convert a megatron-core StrictHandling flag into the bool PTL expects.
    if strict is not None and not isinstance(strict, bool):
        strict = parse_strict_flag(strict)
        strict_options = [
            StrictHandling.ASSUME_OK_UNEXPECTED,
            StrictHandling.RAISE_UNEXPECTED,
            StrictHandling.RAISE_ALL,
        ]
        strict = strict in strict_options

    try:
        from megatron.core.distributed.custom_fsdp import FullyShardedDataParallel

        have_custom_fsdp = True
    except (ImportError, ModuleNotFoundError):
        have_custom_fsdp = False

    for index, module in enumerate(megatron_parallel):
        if parallel_state.get_virtual_pipeline_model_parallel_world_size() is not None:
            if "state_dict" in checkpoint:
                checkpoint_state_dict = checkpoint["state_dict"][f"model_{index}"]
            else:
                checkpoint_state_dict = checkpoint[f"model_{index}"]
        else:
            if "state_dict" in checkpoint:
                checkpoint_state_dict = checkpoint["state_dict"]
            else:
                checkpoint_state_dict = checkpoint

        # Count how many "module." wrappers the live model carries.
        n_nesting = 0
        model_module = module
        while hasattr(model_module, "module"):
            model_module = model_module.module
            n_nesting += 1

        # The checkpoint and the live model may differ in wrapper nesting depth,
        # so rewrite the checkpoint keys to match.
        mcore_model_state_dict = {}
        for key, value in checkpoint_state_dict.items():
            count, key_ = 0, key
            while key_.startswith("module."):
                key_ = key_[len("module.") :]
                count += 1
            if count < n_nesting:
                to_add = "module." * (n_nesting - count)
                mcore_model_state_dict[f"{to_add}{key}"] = value
            elif count > n_nesting:
                to_remove = "module." * (count - n_nesting)
                mcore_model_state_dict[key[len(to_remove) :]] = value
            else:
                mcore_model_state_dict[key] = value

        if have_custom_fsdp and hasattr(module, "module") and isinstance(module.module, FullyShardedDataParallel):
            module.module.load_state_dict(mcore_model_state_dict, strict=strict)
            continue

        try:
            module.load_state_dict(mcore_model_state_dict, strict=strict)
        except RuntimeError as e:
            missing_keys, unexpected_keys = module.load_state_dict(mcore_model_state_dict, strict=False)
            if all(s.endswith("_extra_state") for s in missing_keys):
                logging.warning(
                    f"Loading checkpoint created with Transformer Engine version lower than 1.13. "
                    f"Missing layers {missing_keys} will be ignored."
                )
            else:
                raise e
def _sync_from_last_pipeline_stage(value: torch.Tensor, broadcast: bool = False):
    """
    When pipeline parallelism is enabled,
    casts a tensor defined on the last pipeline stage to other ranks.

    Args:
        value (torch.Tensor): A tensor to be cast from the final pipeline stage of
            a pipeline parallelism group (e.g. loss).
            Note that this tensor should already be defined on the target rank(s) to fill with received data.
        broadcast (bool): When True, broadcasts value from the final pipeline stage rank to all ranks in its group.
            When False, only rank zero receives value from the final pipeline stage rank in its group.
            This mode exists to avoid slow one-to-many communication when not necessary. Defaults to False.
    """
    from megatron.core import parallel_state

    if parallel_state.get_pipeline_model_parallel_world_size() > 1:
        src_rank = parallel_state.get_pipeline_model_parallel_last_rank()

        if not broadcast:
            pp_ranks = torch.distributed.get_process_group_ranks(parallel_state.get_pipeline_model_parallel_group())
            # Point-to-point path: send to global rank 0 only when it belongs to
            # this pipeline group; otherwise rank 0 receives from the source rank.
            if torch.distributed.get_rank() == src_rank and 0 in pp_ranks:
                torch.distributed.send(value, 0)
            elif torch.distributed.get_rank() == 0:
                torch.distributed.recv(value, src_rank)
        else:
            torch.distributed.broadcast(value, src_rank, group=parallel_state.get_pipeline_model_parallel_group())


def setup_megatron_optimizer(
    model,
    config,
    no_weight_decay_cond: Optional[Callable] = None,
    scale_lr_cond: Optional[Callable] = None,
    lr_mult: float = 1.0,
):
    """Builds a megatron-core optimizer for the model chunks and wraps it for Lightning."""
    from megatron.core.optimizer import OptimizerConfig, get_megatron_optimizer

    from nemo.core.optim.mcore_optim import McoreDistributedOptimizer
    from nemo.utils import AppState

    app_state = AppState()

    assert isinstance(config, OptimizerConfig), f"Expected OptimizerConfig, got {type(config)}"

    class McoreOpt(McoreDistributedOptimizer):
        def sharded_state_dict(
            self,
            model_sharded_state_dict,
            optimizer_state_dict=None,
            is_loading=False,
            sharding_type="fully_sharded_model_space",
        ):
            # Only forward `sharding_type` if the wrapped mcore optimizer accepts it.
            mcore_optimizer_sig = inspect.signature(self.mcore_optimizer.sharded_state_dict).parameters.keys()
            distrib_optim_kwargs = {}
            if "sharding_type" in mcore_optimizer_sig:
                distrib_optim_kwargs["sharding_type"] = sharding_type
            state_dict = self.mcore_optimizer.sharded_state_dict(
                model_sharded_state_dict, is_loading=is_loading, **distrib_optim_kwargs
            )
            return state_dict

    ddp_modules = [m.module for m in model]
    mcore_opt = get_megatron_optimizer(
        config,
        ddp_modules,
        no_weight_decay_cond=no_weight_decay_cond,
        scale_lr_cond=scale_lr_cond,
        lr_mult=lr_mult,
        use_gloo_process_groups=app_state.use_gloo_process_groups,
    )

    # Apply per-group lr/wd multipliers exactly once, remembering the pre-multiplication
    # values so repeated setup calls stay idempotent. (The exact key bookkeeping here is
    # inferred from the key names visible in the compiled module.)
    for pg in mcore_opt.param_groups:
        if "pre_lr_mult" in pg or "pre_mult_wd" in pg:
            continue
        pg["pre_mult_lr"] = pg["lr"]
        pg["pre_mult_wd"] = pg["weight_decay"]
        new_lr = pg["pre_mult_lr"] * pg.get("lr_mult", 1.0)
        new_wd = pg["pre_mult_wd"] * pg.get("wd_mult", 1.0)
        pg["lr"] = new_lr
        pg["weight_decay"] = new_wd
        if "lr_mult" in pg:
            pg["pre_lr_mult"] = pg["lr_mult"]
            del pg["lr_mult"]
        if "wd_mult" in pg:
            pg["pre_wd_mult"] = pg["wd_mult"]
            del pg["wd_mult"]

    if getattr(model.ddp_config, "overlap_param_gather", False) and getattr(
        model.ddp_config, "align_param_gather", False
    ):
        param_sync_func = [model_chunk.start_param_sync for model_chunk in model]
        param_sync_func = param_sync_func[0] if len(model) == 1 else param_sync_func
        for module in model:
            module.config.param_sync_func = param_sync_func

    return McoreOpt(mcore_opt)