o
    }oiJ                     @   s  d Z ddlZddlZddlZddlZddlZddlZddlZddlm	Z	 ddl
mZ ddlmZ ddlZddlZddlmZ ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlm Z  ddl!m"Z"m#Z# ddl$m%Z% ddl&m'Z' ddl(m)Z) ddl*m+Z+ ddl,m-Z- e.e/Z0e-d\Z1Z2e	G dd de"Z3G dd de4Z5G dd deZ6G dd deZ7dej8fddZ9d5de:dej;fd d!Z<d5d"e:de+fd#d$Z=d%e>d&e?defd'd(Z@d)ed*e?dee? fd+d,ZAdejBfd-d.ZCd6d/d0ZDd1eEddfd2d3ZFe/d4kr
eD  dS dS )7a  
Test NeMo 2.0 local checkpointing functionality with NVRx resiliency extensions.

This test verifies the ability to save local checkpoints during training, simulate
a crash, and then successfully resume training from the latest available local
checkpoint, ensuring the trainer state (specifically global_step) is correctly restored.

The test proceeds in two main runs:

Run 1:
  - Initializes a LlamaModel and trains it using a MockDataModule.
  - Uses `LocalCheckpointCallback` from NVRx to save checkpoints locally at a
    specified interval (`--local-checkpoint-interval`).
  - Uses `CrashCallback` to simulate a crash by raising an exception at a
    predefined step (`--crash-step`).
  - After the simulated crash, the test verifies that the latest local checkpoint
    found on the node corresponds to the expected step based on the crash step
    and the local checkpoint interval.

Run 2:
  - Re-initializes the model, optimizer, and trainer.
  - Uses `nl.AutoResume` to automatically detect and load the latest checkpoint
    (which should be the local checkpoint saved just before the crash in Run 1).
  - Includes `CheckResumeStepCallback` which asserts, at the `on_train_start`
    hook, that `trainer.global_step` matches the step number of the local
    checkpoint from which it resumed. This confirms state restoration.
  - Continues training until `--max-steps`.
  - Finally, verifies that the latest local checkpoint after Run 2 completes
    corresponds to the expected final step based on `max_steps` and the local
    checkpoint interval.

Requirements:
  - nvidia_resiliency_ext (`res_module`) must be installed.
  - A shared filesystem for `--log-dir` if running on multiple nodes.
    N)	dataclass)Path)Optional)Callback)DistributedDataParallelConfig)OptimizerConfig)	lightning)llm)MockDataModule)Llama3Config
LlamaModel)+get_global_step_from_global_checkpoint_path)ModelCheckpoint)"update_trainer_local_checkpoint_io)MegatronOptimizerModule)safe_importz$nvidia_resiliency_ext.ptl_resiliencyc                   @   sV   e Zd ZU dZeed< dZeed< dZeed< dZeed< d	Z	eed
< dZ
eed< dS )Llama3Config145Mi  rotary_basei    
seq_length   
num_layersi   hidden_sizei
  ffn_hidden_size   num_attention_headsN)__name__
__module____qualname__r   int__annotations__r   r   r   r   r    r    r    Y/home/ubuntu/.local/lib/python3.10/site-packages/tests/collections/llm/test_local_ckpt.pyr   T   s   
 r   c                   @   s   e Zd ZdZdS )CrashExceptionz;Custom exception for triggering simulated crashes in tests.N)r   r   r   __doc__r    r    r    r!   r"   ^   s    r"   c                   @   (   e Zd ZdZdefddZd	ddZdS )
CrashCallbackzW
    Callback to simulate a crash at a specific step based on trainer.global_step.
    
crash_stepc                 C   s@   |dkrt d|| _| jrtd| j  d S td d S )Nr   z-crash_step must be a positive integer or NonezFCrashCallback initialized. Will simulate crash if global_step reaches zMCrashCallback initialized. Crash simulation is disabled (crash_step is None).)
ValueErrorr&   loggerdebug)selfr&   r    r    r!   __init__i   s   zCrashCallback.__init__returnNc                 G   s2   |j }|| jkrd| d}t| t|d S )Nz2Simulating crash via CrashCallback at global_step !)global_stepr&   r(   errorr"   )r*   trainerargscurrent_global_stepmsgr    r    r!   on_train_batch_endr   s   

z CrashCallback.on_train_batch_endr,   N)r   r   r   r#   r   r+   r4   r    r    r    r!   r%   d   s    	r%   c                   @   r$   )
CheckResumeStepCallbackz
    Callback to verify the trainer's global_step at the start of training.
    Used to ensure resumption from a checkpoint happened correctly.
    expected_resume_stepc                 C   s
   || _ d S )Nr7   )r*   r7   r    r    r!   r+      s   
z CheckResumeStepCallback.__init__r,   Nc                 G   s.   | j }|j}||ksJ d| d| dd S )NzHResumption check failed!
            Expected trainer.global_step to be z( after resuming, 
            but found z.
            )r7   r.   )r*   r0   r1   expected_step_at_startr2   r    r    r!   on_train_start   s   
z&CheckResumeStepCallback.on_train_startr5   )r   r   r   r#   r   r+   r:   r    r    r    r!   r6   z   s    r6   r,   c                 C   s*   t jd| j| j| j|d| j| j||ddS )z-Creates a PyTorch Lightning Trainer instance.gpu
   F)acceleratordevices	num_nodes	max_steps	callbackslog_every_n_stepsval_check_intervallimit_val_batchespluginsstrategyenable_progress_bar)nlTrainerr>   r?   r@   rC   rD   )r1   rA   rE   rF   r    r    r!   get_trainer   s   rJ   T
async_savec                 C   s   t jdddddd|dt d	S )z)Creates a NeMo MegatronStrategy instance.   NF)	tensor_model_parallel_sizepipeline_model_parallel_sizepipeline_dtype$virtual_pipeline_model_parallel_sizecontext_parallel_sizesequence_parallelckpt_async_saveckpt_parallel_loadddp)rH   MegatronStrategyr   )r1   rK   r    r    r!   get_megatron_strategy   s   rW   bf16_enabledc                 C   s(   t ddddddddd	| dd
}t|dS )z+Creates a MegatronOptimizerModule instance.adamg-C6?g?g?gffffff?g:0yE>g      ?FN)	optimizerlrweight_decay
adam_beta1
adam_beta2adam_eps	clip_gradlog_num_zeros_in_gradtimersbf16use_distributed_optimizerconfig)r   r   )rX   
opt_configr    r    r!   get_optimizer   s   
rh   log_dirrankc                 C   s   t | d t  t| S )z2Gets the expected local checkpoint directory path.
local_ckpt)r   socketgethostnamestr)ri   rj   r    r    r!   get_my_local_ckpt_node_dir   s   ro   local_ckpt_node_dirglobal_rankc                 C   s   d}|   std|   dS td| d}|  D ]}||j}|r4t|	d}||kr4|}q|dkr=|}|S d}|S )zMFinds the step number of the latest local checkpoint based on the input path.z+Local checkpoint node directory not found: Nziter_(\d+)__localrL   )
is_dirr(   warningrecompileiterdirmatchnamer   group)rp   rq   latest_steppatternitemry   stepresultr    r    r!   find_latest_local_ckpt_step   s    r   c                  C   s   t jdd} | jdtddd | jdtdd	d
 | jdtddd
 | jdtddd
 | jdtddd
 | jdtddd
 | jdtddd
 | jdtddd
 | jdddd | jd td!d"d
 | jd#dd$d | jd%td&g d'd(d) | S )*zCreates the argument parser.zLlama3 Local Ckpt Crash Test)descriptionz	--log-dirTzFilesystem output dir.)typerequiredhelpz--num-nodesrL   z+Total nodes in the job (usually from SLURM))r   defaultr   z	--devicesr   z"GPUs per node (usually from SLURM)z--max-steps   zTotal steps for trainingz--checkpoint-intervalP   zGlobal checkpoint intervalz--local-checkpoint-interval-   zLocal checkpoint intervalz--val-check-intervalzValidation intervalz--limit_val_batchesr<   zValidation batches limitz--async-save
store_truezUse async global ckpt save)actionr   z--crash-stepd   z(Global step for Rank 0 to simulate crashz--cleanup-log-dirz(Rank 0 cleans up log dir before startingz--log-levelINFO)DEBUGr   WARNINGERRORCRITICALzSet the logging level)r   r   choicesr   )argparseArgumentParseradd_argumentrn   r   )parserr    r    r!   
get_parser   s(   r   c                  C   s   t   } tt| j tj}tj|dtj	d t
sJ d| j| jks(J d| j| jk s2J d| jdks;J dtj| jdd	}t| }tt|}t|| d S )
Nz)%(asctime)s - %(levelname)s - %(message)s)levelformatstreamz0nvidia_resiliency_ext is required for this test.z<Crash step must be after the first local checkpoint intervalz#Crash step must be before max_stepsr   z*Local checkpoint interval must be positivetorchrun)ntasks_per_nodelauncher)r   
parse_argsgetattrlogging	log_levelupperr   basicConfigsysstdoutHAVE_RESr&   local_checkpoint_intervalr@   runLocalExecutorr>   varsPartialrun_test)r1   log_level_numericexecutor	args_dictscriptr    r    r!   main   s"   
r   r   c           %   
   C   s`  t jd<i | }ttjdd}ttjdd}|dkr\td|  td|j  td|j	
   |jdkrItd|j d	 t|j s\|js\J d
|j dt|j}|jr|dkr| rqt| td|  |jddd tj rtj rt  t|j|}td|  d}|| }t }t|j||d}	tjdd}
t||jd}t |d}t!dd}t"ddd|j#dddd}t$j%j&|j'd}t(|jdkr|jnd d}||g}tj)|jd |d}t*|||
g|d}t+|dsJ dtd t,||jt- tj.ddd }d!}|dkr$t/d" zt0j1||	||||d#d$ W n t2yO } ztd%|  d}W Y d }~nd }~ww tj rZt  d&}td' |shJ d(td)|j d* t3||}|d usJ d+| d,|j|j' |j' }td-|  ||ksJ d.| d/| td0|  |dkrt/d1d2| d3 d4  tj rt  t|j||d}	tjdd}t||jd}t |d}t!dd}t|j||d}	t"ddd|j#dddd}t$j%j&|j'd}t4|d5}||g} tj)|jd |d}!t*|| |g|d}"t+|"ds(J d|"j5|ks2J d6|"j6|ks<J d7t,|"|jt- tj.dd!d }#td8| d9 t0j1||	|"|!||#d#d$ td:| d	 t3||}$|$d uszJ d+| d;|j7|j' |j' }td-|  |$|ksJ d.|$ d/| td0|$  tj rt  d S d S )=NRANKr   
WORLD_SIZErL   zTest started. World size: zUsing shared log directory: zLogging level set to: z!Crash simulation enabled at step .zLog directory z. does not exist and cleanup was not requested.zCreating log directory: T)parentsexist_okz.Expecting local checkpoints for this node in: )r   global_batch_sizemicro_batch_sizez
bf16-mixed)	precision)rK   re   )rX   val_lossz6{model_name}--{val_loss:.2f}-{step}-{consumed_samples})	save_lastmonitor
save_top_kevery_n_train_stepssave_on_train_epoch_endsave_optim_on_train_endfilename)r   )r&   )ri   wandbckpt)rA   rE   rF   rq   z$Trainer needs global_rank populated.z<Initializing Run 1 Trainer complete. Patching for local I/O.)resume_if_existsresume_ignore_no_checkpointFzL
==================== Starting Run 1: Train until crash ====================data)modelr   r0   logoptimresume	tokenizerz%
Successfully caught expected crash: rr   z%
Verifying checkpoints after Run 1...z#Training did not crash as expected!z Run 1 finished (crashed at step z).zNo local checkpoints found in z after crash!z'Expected latest local checkpoint step: zLatest local step z is not equal to the expected z'Latest local checkpoint found at step: z
====================z. Starting Run 2: All ranks resuming from step  z====================r8   z'Trainer rank differs from env var rank!z3Trainer world size differs from env var world size!z[Rank z*] Starting Run 2 training loop (resuming).zRun 2 resumed from step z after resuming!r    )8r   	Namespacer   osenvirongetr(   r)   ri   r   r   r&   r   existscleanup_log_dirshutilrmtreemkdirtorchdistributedis_availableis_initializeddistbarrierro   r   r
   r   rH   MegatronMixedPrecisionrW   rK   r   rh   r   checkpoint_interval
res_modulelocal_checkpoint_callbackLocalCheckpointCallbackr   r%   
NeMoLoggerrJ   hasattrr   r   
AutoResumeinfor	   trainr"   r   r6   rq   
world_sizer@   )%r   r1   rq   r   log_dir_pathmy_local_ckpt_node_dirmbsgbsmodel_configr   precision_pluginrF   
model_run1
optim_run1checkpoint_callbackr   crash_callbackcallbacks_run1nemo_logger_plugintrainer_run1resume_logiccrashedelatest_local_step_run1expected_latest_stepprecision_plugin_run2strategy_run2
model_run2
optim_run2checkpoint_callback_run2local_checkpoint_callback_run2check_resume_step_callbackcallbacks_run2nemo_logger_plugin_run2trainer_run2resume_logic_run2latest_local_step_run2r    r    r!   r     sZ  




	



	




	
	

r   __main__)Tr5   )Gr#   r   r   r   rv   r   rl   r   dataclassesr   pathlibr   typingr   nemo_runr   r   torch.distributedr   r   lightning.pytorch.callbacksr   megatron.core.distributedr   megatron.core.optimizerr   nemor   rH   nemo.collectionsr	   "nemo.collections.llm.gpt.data.mockr
   $nemo.collections.llm.gpt.model.llamar   r   (nemo.collections.llm.recipes.log.defaultr    nemo.lightning.pytorch.callbacksr   !nemo.lightning.pytorch.local_ckptr   %nemo.lightning.pytorch.optim.megatronr   nemo.utils.import_utilsr   	getLoggerr   r(   r   r   r   	Exceptionr"   r%   r6   rI   rJ   boolrV   rW   rh   rn   r   ro   r   r   r   r   dictr   r    r    r    r!   <module>   sZ   $
	
 
X
