o
    }oi;K                     @   s  d dl Z d dlmZmZ d dlmZ d dlmZmZm	Z	m
Z
mZmZmZ d dlmZ d dlZd dlmZ d dlmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZmZ d dl m!Z! d dl"m#Z# d dlm$Z$ d dl%m&Z&m'Z' d dl(m)Z)m*Z* d dl+m,Z, d dl-m.Z. d dl/m0Z0 zd dl1m2Z2 W n e3y   eZ2Y nw edej4dZ5ede$j6dZ7eG dd de.e
e5 Z8dee9ef defddZ:G dd de2e.Z;de	de	fd d!Z<de=fd"d#Z>d$e	e9ef dee? fd%d&Z@dS )'    N)	dataclassfield)Path)AnyCallableDictGenericOptionalTypeVarUnion)CheckpointIO)get_filesystem)_PATH)!get_default_load_sharded_strategy!get_default_save_sharded_strategy)SaveShardedStrategy) FullyParallelLoadStrategyWrapper FullyParallelSaveStrategyWrapper)TorchDistSaveShardedStrategy)get_data_parallel_group)nn)Selfoverride)WEIGHTS_PATHckpt_to_dir)
IOProtocol)IOMixin)logging)AsyncCompatibleCheckpointIOLightningModuleT)boundModuleTc                   @   sv   e Zd ZU dZeed< ejed< ee	dZ
eeef ed< edejdefddZedejdeeef fd	d
ZdS )TrainerContexta  
    A context wrapper for a PyTorch Lightning Trainer and its associated model.

    This class ensures that both the trainer and its LightningModule extend `IOMixin`
    and provides additional context information.

    Attributes:
        model (LightningModuleT): The Lightning model associated with the trainer.
        trainer (pl.Trainer): The PyTorch Lightning trainer instance.
        extra (Dict[str, Any]): Additional context data, such as the `datamodule`, if available.
    modeltrainer)default_factoryextrareturnc                 C   sD   t |dstdt dt |jdstd| ||j| |dS )a  
        Creates a `TrainerContext` instance from a given `pl.Trainer`.

        Ensures that the trainer and its associated LightningModule support the `IOMixin` interface.

        Args:
            trainer (pl.Trainer): A PyTorch Lightning Trainer instance.

        Returns:
            TrainerContext: A new instance containing the trainer, model, and extra context.

        Raises:
            ValueError: If the trainer or its LightningModule does not extend `IOMixin`.
        __io__zTrainer must be an instance of z#. Please use the Trainer from nemo.z$LightningModule must extend IOMixin.)r$   r#   r&   )hasattr
ValueErrorr   lightning_moduleconstruct_extra)clsr$    r.   H/home/ubuntu/.local/lib/python3.10/site-packages/nemo/lightning/io/pl.pyfrom_trainerH   s
   
zTrainerContext.from_trainerc                 C   s*   i }t |drt |jdr|jj|d< |S )as  
        Constructs an `extra` dictionary containing additional relevant context.

        If the trainer has a `datamodule` that supports `IOMixin`, it will be added to `extra`.

        Args:
            trainer (pl.Trainer): A PyTorch Lightning Trainer instance.

        Returns:
            Dict[str, Any]: A dictionary containing extra context information.
        
datamoduler(   )r)   r1   r(   )r-   r$   r&   r.   r.   r/   r,   _   s   zTrainerContext.construct_extraN)__name__
__module____qualname____doc__r   __annotations__plTrainerr   dictr&   r   strr   classmethodr   r0   r,   r.   r.   r.   r/   r"   6   s   
 
"r"   filepathr'   c                 C   s   t | d} | }t|trJ |jd tkr!|t }| s|r!|}t|dr?|jjd tkr?|jt }| s9|r?|jt |_|rQ|jd tksJJ |j| ksQJ |S )z|Given an input checkpoint filepath, clean it using `ckpt_to_dir`
    and then return the weights subdirectory, if it exists.)r<   base_model_path)	r   
isinstancer:   partsr   is_dirr)   r>   parent)r<   	is_savingbase_dirmaybe_base_dirmaybe_base_model_pathr.   r.   r/   ckpt_to_weights_subdirs   s   

rG   c                   @   s   e Zd ZdZ								d%dededed	ee d
edededefddZe	d&de
eef dedee ddfddZe				d'dedee ded eB de
eef fddZe	deddfddZdd Zed(d d!Zded"e
eef fd#d$ZdS ))MegatronCheckpointIOzCheckpointIO that utilizes :func:`torch.save` and :func:`torch.load` to save and load checkpoints respectively,
    common for most use cases.

    .. warning::  This is an :ref:`experimental <versioning:Experimental API>` feature.

    
torch_distTFNsave_ckpt_formatload_directly_on_device
async_savetorch_dist_multiprocassume_constant_structureparallel_saveparallel_save_within_dpparallel_loadc	           	      C   s@   || _ || _|| _|| _|| _|| _|| _|| _d | _d| _	d S )NF)
rJ   rK   rL   rM   rN   rO   rP   rQ   _save_sharded_strategyvalidated_consistency)	selfrJ   rK   rL   rM   rN   rO   rP   rQ   r.   r.   r/   __init__   s   
zMegatronCheckpointIO.__init__
checkpointpathstorage_optionsr'   c                    s4  ddl m} |durt|dkrt| jj d|d tdd}t|}|j	|dd | j
o3| j }d| _
tj }t| t }	|j||| j|| jd	}
t }d
d|  durcd  ndd|	ddd||	 ddf}ddd |D }t|  fdd}| jr|
dusJ |
| |
S )a  Save model/training states as a checkpoint file through state-dump and file-write.

        Args:
            checkpoint: dict containing model and trainer state
            path: write-target path
            storage_options: not used in ``TorchCheckpointIO.save_checkpoint``

        Raises
        ------
            TypeError:
                If ``storage_options`` arg is passed in

        r   dist_checkpointingNz7 does not support storage_options, but storage_options=z- was provided. Ignoring given storage_optionsTrC   )exist_ok)sharded_state_dictcheckpoint_dirsharded_strategyvalidate_access_integrityasync_sharded_savezGlobal Checkpoint SavezRank: zIteration: zStart time: .3fszSave duration: z : c                 s   s    | ]	}|d ur|V  qd S Nr.   ).0partr.   r.   r/   	<genexpr>   s    z7MegatronCheckpointIO.save_checkpoint.<locals>.<genexpr>c                      s    t dt dd  d S )Nz-Successfully saved checkpoint from iteration 7dz to )r   infointr.   	iterationrW   r.   r/   iter_finalize_fn   s    z>MegatronCheckpointIO.save_checkpoint.<locals>.iter_finalize_fn)megatron.corerZ   lenr   warning	__class__r2   rG   r   makedirsrS   rN   torchdistributedget_rank_get_iteration_from_checkpointtimesavesave_sharded_strategyrL   joinri   add_finalize_fn)rT   rV   rW   rX   rZ   r^   fsvalidate_sharding_integrityrank
start_timeasync_save_requestend_time	log_partslog_messagerm   r.   rk   r/   save_checkpoint   sH   



z$MegatronCheckpointIO.save_checkpointmap_locationstrictStrictHandlingc              	   C   s  ddl m} ddlm} |durtdt|}||s$td| ||s1td| dt	|d	d
}t
|drF|j sF|jj|_| jdkrZ| jrZddlm} |dd}	nd}	| jro|	du rgt|}	t|	tdd}	|	dur|td|	 d t|tr|s| ||}|r|jn|j}|du r|j}t }
|j|t||	|d}t|}t }||
 }tdt j!"  d|
dd|dd |S )a  Loads checkpoint using :func:`torch.load`, with additional handling for ``fsspec`` remote loading of files.

        Args:
            path: Path to checkpoint
            map_location: a function, :class:`torch.device`, string or a dict specifying how to remap storage
                locations.

        Returns: The loaded checkpoint.

        Raises
        ------
            FileNotFoundError: If ``path`` is not found by the ``fsspec`` filesystem

        r   rY   )r   NzT`map_location` argument is not supported for `MegatronCheckpointIO.load_checkpoint`.zCheckpoint file not found: z6Distributed checkpoints should be a directory. Found: .Fr[   r>   zarr)TensorStoreLoadShardedStrategyT)rK   with_context_parallelUsing z dist-ckpt load strategy.)r]   r^   r_   r   z Global Checkpoint Load : Rank : z : Start time : rb   z#s : Time spent in load_checkpoint: rc   )#rn   rZ   +megatron.core.dist_checkpointing.validationr   r*   r   existsFileNotFoundErrorisdirrG   r)   r>   rB   rJ   rK   7megatron.core.dist_checkpointing.strategies.tensorstorer   rQ   r   r   r   r   ri   r?   booladjust_non_strict_loadASSUME_OK_UNEXPECTEDLOG_ALLrw   loadr:   _fix_tensors_devicers   rt   ru   )rT   rW   r]   r   r   rZ   r   r|   r   r_   r   rV   r   durationr.   r.   r/   load_checkpoint   sd   




z$MegatronCheckpointIO.load_checkpointc                 C   s8   t |}||r|j|dd td|  dS dS )ziRemove checkpoint file from the filesystem.

        Args:
            path: Path to checkpoint

        T)	recursivezRemoved checkpoint: N)r   r   rmr   debug)rT   rW   r|   r.   r.   r/   remove_checkpoint9  s
   
z&MegatronCheckpointIO.remove_checkpointc                 C   s   | j dkr
td | jr| j dkrtd| jdu ri nt| jd}| j dkr5|r5t| j dfi |}nt| j d}t	|drD| j
|_| jrX| jrOtd	d
nd}t||| j
}td| d |S )a  Determine the saving strategy based on constructor args.

        Relies on the default MCore strategy unless extra PyT Distributed format arguments
        are passed in config or in case of a fully parallel save in which case
        a parallelization wrapper is applied.
        r   z`zarr` distributed checkpoint backend is deprecated. Distributed optimizer checkpoint saving might be extremely slow. Please switch to PyTorch Distributed format (model.dist_ckpt_format=torch_dist).rI   z9Async dist-ckpt save supported only for torch_dist formatN)thread_count   use_cached_ckpt_structureTr   r   z dist-ckpt save strategy.)rJ   r   rp   rL   r*   rM   r9   r   r   r)   rN   r   rO   rP   r   r   ri   )rT   torch_dist_kwargssave_strategyparallelization_groupr.   r.   r/   "_determine_dist_ckpt_save_strategyF  s(   

z7MegatronCheckpointIO._determine_dist_ckpt_save_strategyr   c                 C   s   | j du r
|  | _ | j S )zF
        initializes (if needed) the sharding strategy and returns itsN)rR   r   )rT   r.   r.   r/   ry   l  s   

z*MegatronCheckpointIO.save_sharded_strategyr]   c                    sp   ddl m} ddlm} ddlm  ||g g dtf fdd}|||\}}t	d  |S )	a.  
        Adjusts the loading of a non-strict sharded checkpoint by filtering out missing keys.

        This function loads the checkpoint's metadata and removes any `ShardedBase` keys from
        `sharded_state_dict` that do not exist in the checkpoint. It also logs unexpected keys
        that were not found in the checkpoint.

        Args:
            path (_PATH): The path to the checkpoint.
            sharded_state_dict (Dict[str, Any]): The state dictionary containing sharded parameters.

        Returns:
            Dict[str, Any]: The adjusted state dictionary with missing keys removed.

        Notes:
            - Keys that exist in `sharded_state_dict` but are not found in the checkpoint metadata
            are considered "unexpected" and are logged.
            - Missing keys are not computed yet. To fully determine missing keys:
            1. Perform an `all_gather_object` operation on `loaded_keys`.
            2. Compute `missing_keys` as the difference between `ckpt_sharded_metadata.keys()`
                and `loaded_keys`.
        r   rY   )extract_matching_values)ShardedBasexc                    s8   t |  r| jv r| j dS | j dS dS )a  
            Helper function to determine if a `ShardedBase` key should be removed.

            Args:
                x (Any): The object to check.

            Returns:
                bool: True if the key should be removed, False otherwise.
            FT)r?   keyappend)r   r   ckpt_sharded_metadataloaded_keysunexpected_keysr.   r/   "should_remove_missing_sharded_base  s   


zWMegatronCheckpointIO.adjust_non_strict_load.<locals>.should_remove_missing_sharded_basezEThe following keys are not in the checkpoint and will not be loaded: )
rn   rZ   +megatron.core.dist_checkpointing.dict_utilsr   (megatron.core.dist_checkpointing.mappingr   load_tensors_metadatar   r   ri   )rT   rW   r]   rZ   r   r   _r.   r   r/   r   t  s   
z+MegatronCheckpointIO.adjust_non_strict_load)rI   TFNFTFFrd   )NNN)r'   r   )r2   r3   r4   r5   r:   r   r	   rj   rU   r   r   r   r   r   r   r   r   r   propertyry   r   r.   r.   r.   r/   rH      s`    		
*<

T&rH   ckptc                    sX   t j sJ t j t j ft jdt j d ddlm}  fdd}||| S )z4Ensure checkpoint tensors are on the correct device.cuda)indexr   )dict_list_map_outplacec                    s*   t | tjr| jr| j kr|  } | S rd   )r?   rs   Tensoris_cudadeviceto)tcur_devr.   r/   _fix_device  s   
z(_fix_tensors_device.<locals>._fix_device)rs   r   is_initializedis_availabler   current_devicer   r   )r   r   r   r.   r   r/   r     s
   "
r   c                 C   s0   ddl m} t| }t|}||o||S )a  Check if the given path corresponds to a distributed checkpoint directory.

    This function determines if the specified path is a directory that contains a distributed
    checkpoint by checking the directory's metadata.

    Args:
        path (Union[str, Path]): The path to check for being a distributed checkpoint.

    Returns
    -------
        bool: True if the path is a distributed checkpoint directory, False otherwise.

    r   rY   )rn   rZ   r   r   r   check_is_distributed_checkpoint)rW   rZ   r^   r|   r.   r.   r/   is_distributed_ckpt  s   r   rV   c                 C   s,   |  di  di  di  di  dd S )Nloopsfit_loopzepoch_loop.batch_progresstotal	completed)get)rV   r.   r.   r/   rv     s   
rv   )Arw   dataclassesr   r   pathlibr   typingr   r   r   r   r	   r
   r   lightning.pytorchpytorchr7   rs   lightning.fabric.pluginsr   #lightning.fabric.utilities.cloud_ior    lightning.fabric.utilities.typesr   .megatron.core.dist_checkpointing.serializationr   r   0megatron.core.dist_checkpointing.strategies.baser   :megatron.core.dist_checkpointing.strategies.fully_parallelr   r   1megatron.core.dist_checkpointing.strategies.torchr   megatron.core.parallel_stater   r   typing_extensionsr   r   nemo.lightning.ckpt_utilsr   r   nemo.lightning.io.capturer   nemo.lightning.io.mixinr   
nemo.utilsr   !nemo.utils.callbacks.dist_ckpt_ior   ImportErrorLightningModuler   Moduler!   r"   r:   rG   rH   r   r   r   rj   rv   r.   r.   r.   r/   <module>   sH   $<  )"