o
    }oil                     @   s  d dl Z d dlZd dlZd dlmZmZmZmZ d dlZd dl	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZmZ d dlmZ d dlmZm Z  e !e"Z#G dd deeZ$dede%deege&f ddfddZ'dS )    N)AnyCallableDictOptional)CheckpointIO)_PATH)MCoreTensorAwareStateDict)TensorAwareStateDict)BaseCheckpointManager)LocalCheckpointManager)LazyCliqueReplicationStrategy)dict_list_map_inplace)HierarchicalCheckpointIOLocalCheckpointCallback)Trainer)AsyncCompatibleCheckpointIOAsyncFinalizableCheckpointIOc                       s   e Zd ZdZ				ddededeegef de	d	e
d
eejj de	f fddZdee
ef defddZ	ddefddZ  ZS )MCoreHierarchicalCheckpointIOa  HierarchicalCheckpointIO implementation compatible with MCore distributed checkpointing.

    Args:
        wrapped_checkpoint_io (CheckpointIO): previously used checkpoint_io (for global checkpoints).
        local_ckpt_manager (BaseCheckpointManager): local checkpoint manager used to store the local checkpoints
        get_global_ckpt_iteration_fn (Callable[[_PATH], int]): a function that retrieves the iteration
            of a global checkpoint that will be compared with local checkpoint iteration
            in order to decide which to resume from.
        async_save (bool, optional): enables asynchronous save. Passed down to the local checkpoint
            manager unless overriden with `local_ckpt_options` in `_save_local_checkpoint`.
            If True, MCoreHierarchicalCheckpointIO must be wrapped with `AsyncFinalizableCheckpointIO` wrapper
        local_ckpt_algo (str, optional): local checkpoint save algorithm. See MCoreTensorAwareStateDict for details.
            By default, uses a fully parallel save and load algorithm ('fully_parallel`).
        parallelization_group (ProcessGroup, optional): save/load parallelization group
        allow_cache (bool, optional): if True, subsequent checkpoint saves will reuse
            the cached parallelization metadata.
    Ffully_parallelNwrapped_checkpoint_iolocal_ckpt_managerget_global_ckpt_iteration_fn
async_savelocal_ckpt_algoparallelization_groupallow_cachec                    s.   t  |||| || _|| _d | _|| _d S N)super__init__r   r   cached_metadatar   )selfr   r   r   r   r   r   r   	__class__ U/home/ubuntu/.local/lib/python3.10/site-packages/nemo/lightning/pytorch/local_ckpt.pyr   ;   s
   

z&MCoreHierarchicalCheckpointIO.__init__
checkpointreturnc                 C   s@   t j|| j| j| jd\}}dd }t||j | jrd| _|S )zSpecialized implementation using MCoreTensorAwareStateDict.

        Wraps the state dict in MCoreTensorAwareStateDict and makes sure
        that "common" state dict doesn't have any CUDA tensors.
        )algor   r   c                 S   s4   t | tjr| jjdkrtd | jddd} | S )NcpuzMoving CUDA tensor to CPUT)non_blocking)
isinstancetorchTensordevicetypeloggerdebugto)xr#   r#   r$   to_cpuX   s   
zHMCoreHierarchicalCheckpointIO.to_tensor_aware_state_dict.<locals>.to_cpuN)r   from_state_dictr   r   r   r   commonr   )r    r%   state_dict_for_save_r3   r#   r#   r$   to_tensor_aware_state_dictK   s   
z8MCoreHierarchicalCheckpointIO.to_tensor_aware_state_dicttensor_aware_checkpointc                 C   sB   t |tsJ dt| |durtd |j|| j| jdS )z8Unwraps MCoreTensorAwareStateDict to a plain state dict.z)Unexpected tensor aware state dict type: NzEMCoreTensorAwareStateDict does not yet support the 'strict' argument.)r'   r   )r*   r   r.   r/   warningto_state_dictr   r   )r    r9   sharded_state_dictstrictr#   r#   r$   from_tensor_aware_state_dictc   s   
z:MCoreHierarchicalCheckpointIO.from_tensor_aware_state_dict)Fr   NF)NN)__name__
__module____qualname____doc__r   r
   r   r   intboolstrr   r+   distributedProcessGroupr   r   r   r	   r8   r>   __classcell__r#   r#   r!   r$   r   (   s4    
r   trainerlocal_checkpoint_base_dirr   r&   c                 K   s   | j }tdd |D }|sdS | jj}t| jdd}|r+t|ts(J t||j}| jdkr4t	 }nd}t
tj|dt |d}	t||	|fd|i|}
|rVt|
}
|
| j_dS )	a  Update the Trainer with the corresponding MCoreHierarchicalCheckpointIO if local checkpointing is used.

    Args:
        trainer (nl.Trainer): Trainer object to drive training loop.
        local_checkpoint_base_dir (str): Root directory under which to save local checkpoints.
        get_global_ckpt_iteration_fn (Callable): a function that retrieves the iteration of a global checkpoint
            that will be compared with local checkpoint iteration in order to decide which to resume from.
        **kwargs (dict): Additional kwargs passed to initialize MCoreHierarchicalCheckpointIO.

    Note:
        Async saving of local checkpoints is inferred based on what was configured on the strategy, if available.

    c                 s   s    | ]}t |tV  qd S r   )r*   r   ).0cbr#   r#   r$   	<genexpr>   s    z5update_trainer_local_checkpoint_io.<locals>.<genexpr>Nr   F   
local_ckpt)repl_strategy)	callbacksanystrategycheckpoint_iogetattrr*   r   r.   	num_nodesr   r   ospathjoinsocketgethostnamer   )rI   rJ   r   kwargsrQ   use_local_ckptrT   r   rP   r   hierarchical_checkpointing_ior#   r#   r$   "update_trainer_local_checkpoint_iot   s8   
r_   )(loggingrW   rZ   typingr   r   r   r   r+   lightning_fabric.pluginsr    lightning_fabric.utilities.typesr   8megatron.core.dist_checkpointing.tensor_aware_state_dictr   9nvidia_resiliency_ext.checkpointing.local.base_state_dictr	   Dnvidia_resiliency_ext.checkpointing.local.ckpt_managers.base_managerr
   Envidia_resiliency_ext.checkpointing.local.ckpt_managers.local_managerr   @nvidia_resiliency_ext.checkpointing.local.replication.strategiesr   0nvidia_resiliency_ext.fault_tolerance.dict_utilsr   >nvidia_resiliency_ext.ptl_resiliency.local_checkpoint_callbackr   r   nemo.lightning.pytorch.trainerr   !nemo.utils.callbacks.dist_ckpt_ior   r   	getLoggerr?   r/   r   rE   rC   r_   r#   r#   r#   r$   <module>   s6   
L