o
    }oiS                     @   s  d dl Z d dlZd dlmZmZ d dlmZ d dlmZm	Z	 d dl
Zd dlZd dlmZ d dlmZ d dlmZ d dlmZmZmZ d d	lmZ d d
lmZ d dlmZ ed\ZZdejde ej!e  fddZ"eddG dd dej#Z$eddG dd dej#Z%eddG dd dej#Z&eddG dd dej#Z'eddG dd dej#Z(eddG dd dej#Z)eddG dd  d ej#Z*eddG d!d" d"ej#Z+dS )#    N)	dataclassfield)Path)CallableOptional)Callback)WandbLogger)YamlSerializer)MemoryProfileCallbackNsysCallbackPreemptionCallback)MegatronStrategy)logging)safe_importz$nvidia_resiliency_ext.ptl_resiliencypartial	callbacksc                 C   s\   t | dr,t | jdr#| jjr#|D ]}|| jjvr | jj| qd S t|| j_d S d S )Ntrainerr   )hasattrr   r   appendcopydeepcopy)r   r   callback r   N/home/ubuntu/.local/lib/python3.10/site-packages/nemo/lightning/run/plugins.py_merge_callbacks(   s   
r   T)kw_onlyc                   @   sZ   e Zd ZU dZdZeed< edd dZe	e
je  ed< de
je
jB d	e
jfd
dZdS )PreemptionPlugina`  
    A plugin for setting up Preemption callback and preemption signals.

    Args:
        preempt_time (int): The time, in seconds, before the task's time limit at which the executor
                             will send a SIGTERM preemption signal. This allows tasks to be gracefully
                             stopped before reaching their time limit, reducing waste and
                             promoting fair resource usage. The default value is 60 seconds (1 minute).
                             This is only supported for ``run.SlurmExecutor``.
        callbacks (list[run.Config[Callback]]): A list of callback configurations that the plugin
                                                will merge with the task's existing callbacks.
                                                By default, the list includes NeMo's preemption callback.
    <   preempt_timec                   C   s   t tgS N)runConfigr   r   r   r   r   <lambda>C   s    zPreemptionPlugin.<lambda>)default_factoryr   taskexecutorc                 C   sn   t |tjrtd| jj d dS t |tjr.t| jj d| j	 d d| j	 |_
t|| jd dS )zSet up the preemption plugin.The B will have no effect on the task as it's an instance of run.ScriptNz will send a SIGTERM z= seconds before the job's time limit for your Slurm executor.zTERM@r   )
isinstancer    Scriptr   warning	__class____name__SlurmExecutorinfor   signalr   r   selfr$   r%   r   r   r   setupE   s   zPreemptionPlugin.setupN)r-   
__module____qualname____doc__r   int__annotations__r   r   listr    r!   r   Partialr*   Executorr3   r   r   r   r   r   2   s
   
   r   c                   @   s^   e Zd ZU dZdZeed< dZeed< dZeed< dZ	eed	< d
e
je
jB de
jfddZdS )FaultTolerancePlugina  
    A plugin for setting up the fault tolerance callback from nvidia-resiliency-ext.
    This plugin enables workload hang detection, automatic calculation of timeouts used for hang detection,
    detection of rank(s) terminated due to an error and workload respawning in case of a failure.
    Note: FaultTolerancePlugin does not work with the NsysPlugin.
    Args:
        num_in_job_restarts (int): Max number of restarts on failure, within the same job. Default is 3.
        num_job_retries_on_failure (int): Max number of new job restarts on failure. Default is 2.
        initial_rank_heartbeat_timeout (int): Timeouts are time intervals used by a rank monitor to detect
            that a rank is not alive. This is the max timeout for the initial heartbeat. Default is 1800.
        rank_heartbeat_timeout (int): This is the timeout for subsequent hearbeats after the initial heartbeat.
            Default is 300.
       num_in_job_restarts   num_job_retries_on_failurei  initial_rank_heartbeat_timeouti,  rank_heartbeat_timeoutr$   r%   c                 C   s   t sJ dtj| j| j| jd|_| j|_t	|tj
sJ tjtjdd|jjdg}|jjr3J dt|drMt|jdrMttdd	 |jjsMJ d
t||d dS )z"Set up the fault tolerance plugin.zQnvidia-resiliency-ext.ptl_resiliency is required to use the FaultTolerancePlugin.)max_restartsrA   rB   T)
autoresumecalculate_timeoutsexp_dirz1Nsys not supported with the FaultTolerancePlugin.r   r   c                 S   s   dt | v r| jtk S dS )N__fn_or_cls__T)dirrG   r   )cbr   r   r   r"      s    z,FaultTolerancePlugin.setup.<locals>.<lambda>z-Nsys not supported with FaultTolerancePlugin.r(   N)HAVE_RESr    FaultTolerancer>   rA   rB   launcherr@   retriesr)   r:   r!   
res_moduleFaultToleranceCallbackloglog_dirnsys_profiler   r   allmapr   r   )r2   r$   r%   r   r   r   r   r3   l   s.   zFaultTolerancePlugin.setupN)r-   r4   r5   r6   r>   r7   r8   r@   rA   rB   r    r:   r*   r;   r3   r   r   r   r   r<   W   s   
  r<   c                   @   sr   e Zd ZU dZeed< eed< dZeee  ed< dZ	eee
  ed< dZeed< d	ejejB d
ejfddZdS )
NsysPlugina  
    A plugin for nsys profiling.

    The NsysPlugin allows you to profile your run using nsys.
    You can specify when to start and end the profiling, on which ranks to run the profiling,
    and what to trace during profiling.

    Args:
        start_step (int): The step at which to start the nsys profiling.
        end_step (int): The step at which to end the nsys profiling.
        ranks (Optional[list[int]]): The ranks on which to run the nsys profiling. If not specified,
            profiling will be run on rank 0.
        nsys_trace (Optional[list[str]]): The events to trace during profiling. If not specified,
            'nvtx' and 'cuda' events will be traced.
    
start_stepend_stepNranks
nsys_traceF	gen_shaper$   r%   c                 C   sx   t |tjr tjt| j| j| jpdg| jd}|g}t	||d |
 }d|_| jp-ddg|_t |tjr:d|_dS dS )	z!Set up the nsys profiling plugin.r   )rV   rW   rX   rZ   r(   TnvtxcudazEprofile_%p_%q{SLURM_JOB_ID}_node%q{SLURM_NODEID}_rank%q{SLURM_PROCID}N)r)   r    r:   r!   r   rV   rW   rX   rZ   r   get_launcherrR   rY   r.   nsys_filename)r2   r$   r%   nsys_callbackr   rL   r   r   r   r3      s    

zNsysPlugin.setup)r-   r4   r5   r6   r7   r8   rX   r   r9   rY   strrZ   boolr    r:   r*   r;   r3   r   r   r   r   rU      s   
  rU   c                   @   sJ   e Zd ZU dZeed< dZeee	  ed< de
je
jB de
jfddZdS )	MemoryProfilePlugina  
    A plugin for memory profiling.

    The MemoryProfilePlugin allows you to profile a timeline of memory allocations during you run.
    The memory profiling plugin creates snapshots during the entire training. You can specify
    which ranks to run the profiling.

    Args:
        dir (str): Directory to store the memory profile dump .pickle files
        ranks (Optional[list[int]]): The ranks on which to run the memory profiling. If not specified,
            profiling will be run on rank 0.
    rH   NrX   r$   r%   c                 C   s@   t |tjrtjt| j| jpdgd}|g}t||d dS dS )z#Set up the memory profiling plugin.r   )rH   rX   r(   N)r)   r    r:   r!   r
   rH   rX   r   )r2   r$   r%   memprof_callbackr   r   r   r   r3      s   
zMemoryProfilePlugin.setup)r-   r4   r5   r6   r`   r8   rX   r   r9   r7   r    r:   r*   r;   r3   r   r   r   r   rb      s
   
  rb   c                   @   sX   e Zd ZU dZeed< edeje	 f ed< dZ
eed< dejejB dejfd	d
ZdS )WandbPlugina  
    A plugin for setting up Weights & Biases.

    This plugin sets a ``WandbLogger`` to ``NeMoLogger``'s ``wandb`` arg,
    which in turn initializes the Pytorch Lightning `WandbLogger
    <https://lightning.ai/docs/pytorch/stable/extensions/generated/lightning.pytorch.loggers.WandbLogger.html>`_.

    This plugin is only activated if the ``WANDB_API_KEY`` environment variable is set.
    The ``WANDB_API_KEY`` environment variables will also be set in the executor's environment variables.
    Follow https://docs.wandb.ai/quickstart to retrieve your ``WANDB_API_KEY``.

    If `log_task_config` is True, the plugin will log the task configuration as a config dictionary
    to the Weights and Biases logger.

    Args:
        name (str): The name for the Weights & Biases run.
        logger_fn (Callable[..., run.Config[WandbLogger]]): A callable that returns a Config of ``WandbLogger``
        log_task_config (bool, optional): Whether to log the task configuration to the logger.
            Defaults to True.

    Raises:
        logging.warning: If the task is an instance of `run.Script`, as the plugin has no effect on such tasks.
    name.	logger_fnTlog_task_configr$   r%   c                 C   s   t |tjrtd| jj d dS dtjv rotjd |j	d< t
|drit
|jdrk| j| jd|j_| jrmtt |}| j| j| t |tjrZtj|jjt|jjnd|jd|d	< ||jj_dS dS dS dS td| jj d
 dS )zSet up the wandb plugin.r&   r'   NWANDB_API_KEYrP   wandb)re   )id	task_namer%   remote_directorylocal_directory
experimentzF will have no effect as WANDB_API_KEY environment variable is not set.)r)   r    r*   r   r+   r,   r-   osenvironenv_varsr   rP   rf   re   ri   rg   yaml	safe_loadr	   	serializeexperiment_idr/   r.   pathjointunneljob_dirr   config)r2   r$   r%   partial_configr   r   r   r3      s2   


zWandbPlugin.setupN)r-   r4   r5   r6   r`   r8   r   r    r!   r   rg   ra   r:   r*   r;   r3   r   r   r   r   rd      s   
  rd   c                   @   sj   e Zd ZU dZdZeed< dZeed< dZeed< dZ	eed< dZ
eed< d	ejejB d
ejfddZdS )ConfigValidationPluginaG  
    A plugin for validating a NeMo task with its executor.

    This plugin is used to ensure that the NeMo environment, task, and executor meet certain criteria.
    The validation checks include preemption, checkpoint directory,
    serialization, and Weights and Biases (wandb) integration.

    Attributes:
        validate_preemption (bool): Whether to validate the preemption callback. If set to True, the plugin will
            assert that the task has a `PreemptionCallback`. Defaults to True.
        validate_checkpoint_dir (bool): Whether to validate the checkpoint directory. If set to True and the executor
            is a `SlurmExecutor`, the plugin will assert that the task's log directory exists in the mounts
            specified in the `SlurmExecutor`. Defaults to True.
        validate_serialization (bool): Whether to validate task serialization. If set to True, the plugin will
            assert that the task can be successfully serialized and deserialized using NeMo-Run's
            `ZlibJSONSerializer`. Defaults to True.
        validate_wandb (bool): Whether to validate Weights and Biases integration. If set to True, the plugin will
            assert that the executor's environment variables contain a `WANDB_API_KEY`
            and that NeMo Logger's `wandb` is set. Defaults to False.
        validate_nodes_and_devices (bool): Whether to validate the number of devices and nodes. If set to True,
            the plugin will assert that the task's trainer is configured to use the same number of nodes and devices
            as the executor. Defaults to True.
    Tvalidate_preemptionvalidate_checkpoint_dirvalidate_serializationFvalidate_wandbvalidate_nodes_and_devicesr$   r%   c                    s  t  tjsJ td jj d|jj d | jr-td t	t
dd  jjs-J | jret |tjre|jdg }tt
dd |}td	 jj d
|   jjsXJ t	t
 fdd|seJ | jrddlm} td | }||  ksJ | jrtd d|j v sJ  jjsJ | jrtd t |tjr jj|jksJ  jj|  ksJ dS dS dS )z*Set up the plugin to configure validation.zValidating z and .zValidating preemption callbackc                 S   s
   | j tkS r   )rG   r   )r   r   r   r   r"   ;  s   
 z.ConfigValidationPlugin.setup.<locals>.<lambda>z	/nemo_runc                 S   s   |  dd S )N:)split)mr   r   r   r"   @  s    zValidating checkpoint dir z exists in c                    s   t | t  jjjv S r   )r   rP   rQ   parents)mountr$   r   r   r"   C  s    r   )ZlibJSONSerializerz1Validating serialization/de-serialization of taskz6Validating that Weights and Biases is enabled for taskrh   z=Validating that nodes and devices match for task and executorN)!r)   r    r:   r   r/   rG   r5   r,   r}   anyrT   r   r   r~   r.   container_mountsr9   rP   rQ   r   %nemo_run.core.serialization.zlib_jsonr   deserializert   r   rq   keysri   r   	num_nodesnodesdevicesnproc_per_node)r2   r$   r%   mountsr   
serializerr   r   r   r3   5  s:    



zConfigValidationPlugin.setupN)r-   r4   r5   r6   r}   ra   r8   r~   r   r   r   r    r:   r*   r;   r3   r   r   r   r   r|     s   
  r|   c                   @   s   e Zd ZU dZdZeed< dZeed< dZ	eed< dZ
ee ed	< dZeed
< dZeed< dd ZdejejB dejfddZdS )PerfEnvPlugina  
    A plugin for setting up performance optimized environments.

    Attributes:
        enable_layernorm_sm_margin (bool): Set SM margin for TransformerEngine's Layernorm, so
            in order to not block DP level communication overlap.
        layernorm_sm_margin (int): The SM margin for TransformerEngine Layernorm.
        enable_vboost (bool): Whether to steer more power towards tensor cores via
            `sudo nvidia-smi boost-slider --vboost 1`. May not work on all systems.
    Tenable_layernorm_sm_margin   layernorm_sm_marginFenable_vboostNnccl_pp_comm_chunksizegpu_sm100_or_neweruser_buffer_registrationc                 C   sH   ddl }dddd| dtj|dd	tj|d
d|dg	}|S )zCCreate the vboost `sudo nvidia-smi boost-slider --vboost 1` commandr   N z
# Command 0: enable vboost

srunz	--ntasks=z--outputz
vboost.outz--errorz
vboost.errzbash -c z'sudo nvidia-smi boost-slider --vboost 1)shlexrw   ro   rv   quote)r2   r   ry   r   
vboost_cmdr   r   r   get_vboost_srun_cmdl  s   z!PerfEnvPlugin.get_vboost_srun_cmdr$   r%   c                 C   s  |j jjtkr|j jj}|j jj}| jr"|dks|dkr"d|jd< n| js2|dks-|dkr2d|jd< | jrEt	| j
|jd< t	| j
|jd< |j jj}|dkrh| jdurht| jtr^| jdks`J t	| j|jd< d|jd	< | jrd|jd
< d|jd< d|jv r|jd d}d|v rtd |d d||jd< | jrt|tjr| |j|jj}|jrt|jdkr|j| n||_dS dS dS )z+Enable the performance environment settings   32CUDA_DEVICE_MAX_CONNECTIONS1NVTE_FWD_LAYERNORM_SM_MARGINNVTE_BWD_LAYERNORM_SM_MARGINNNCCL_P2P_NET_CHUNKSIZETORCH_NCCL_HIGH_PRIORITYNCCL_NVLS_ENABLENCCL_CTA_POLICYPYTORCH_CUDA_ALLOC_CONF,zexpandable_segments:TruezPYTORCH_CUDA_ALLOC_CONF=expandable_segments:True is not currently compatible withuser buffer registration. Removing expandable_segments:True from the list.r   )r   strategyrG   r   tensor_model_parallel_sizecontext_parallel_sizer   rq   r   r`   r   pipeline_model_parallel_sizer   r)   r7   r   r   r   r+   removerw   r   r    r.   r   r   rx   ry   setup_lineslen)r2   r$   r%   tp_sizecp_sizepp_sizepytorch_cuda_alloc_confr   r   r   r   r3     sF   









zPerfEnvPlugin.setup)r-   r4   r5   r6   r   ra   r8   r   r7   r   r   r   r   r   r   r    r:   r*   r;   r3   r   r   r   r   r   X  s   
  r   c                   @   sX   e Zd ZdZddlmZ er(ddlmZ edkr*de	j
e	jB de	jfdd	Zd
S d
S d
S )TritonCacheSetupz
    A plugin for setting up Triton cache environment variables.
    This should not be neccessay for Triton 3.2.0 and above.
    r   )TRITON_AVAILABLE)__version__z3.1.0r$   r%   c                 C   s   |j d |jd< d|jd< dS )z.Set up the Triton cache environment variables.triton_cahceTRITON_CACHE_DIRz?megatron.core.ssm.triton_cache_manager:ParallelFileCacheManagerTRITON_CACHE_MANAGERN)ry   rq   r1   r   r   r   r3     s   zTritonCacheSetup.setupN)r-   r4   r5   r6   nemo.core.utils.optional_libsr   tritonr   triton_versionr    r:   r*   r;   r3   r   r   r   r   r     s     r   ),r   ro   dataclassesr   r   pathlibr   typingr   r   nemo_runr    rr   lightning.pytorchr   lightning.pytorch.loggersr    nemo_run.core.serialization.yamlr	    nemo.lightning.pytorch.callbacksr
   r   r   3nemo.lightning.pytorch.strategies.megatron_strategyr   
nemo.utilsr   nemo.utils.import_utilsr   rN   rJ   r:   r9   r!   r   Pluginr   r<   rU   rb   rd   r|   r   r   r   r   r   r   <module>   s@   
$3,>B]