o
    }oiU                  
   @   s  d dl Z d dlmZmZ d dlmZ d dlmZ d dlmZm	Z	m
Z
mZ d dlmZ d dlZd dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ zLd dlmZ d dlmZ d dlm Z  d dl!m"Z"m#Z# d dl$m%Z% d dl&m'Z'm(Z( d dl)m*Z* d dl+m,Z,m-Z- d dl.m/Z/ d dl0m1Z1 d dl2m3Z3 dZ4W n e5e6fy Z7 zdZ4de7 Z8W Y dZ7[7ndZ7[7ww ede9fddZ:G dd deeZ;G dd  d eZ<G d!d" d"eZ=G d#d$ d$e;Z>d%e	e9ef d&e
e? fd'd(Z@dS ))    N)ABCabstractmethod)contextmanager)time)AnyDictOptionalUnion)CheckpointIO)get_filesystem)_PATH)Callback)_WrappingCheckpointIO)logging)dist_checkpointing)extract_matching_values)ShardedBase)!get_default_load_sharded_strategy!get_default_save_sharded_strategy)tensorstore)AsyncCallsQueueAsyncRequest)SaveShardedStrategy) FullyParallelLoadStrategyWrapper FullyParallelSaveStrategyWrapper)TorchDistSaveShardedStrategy)StrictHandling)get_data_parallel_groupTFzmegatron-core was not found. Please see the NeMo README for installation instructions: https://github.com/NVIDIA/NeMo#megatron-gpt. Exact error: namec                 c   sT    t  }zdV  W t|  dt  | dd dS t|  dt  | dd w )z8Simple context manager for timing functions/code blocks.Nz took .3fs)r   r   debug)r   start r#   U/home/ubuntu/.local/lib/python3.10/site-packages/nemo/utils/callbacks/dist_ckpt_io.py_debug_time>   s
   Br%   c                
   @   s>   e Zd ZdZe	d
deeef dede	e ddfdd	Z
dS )AsyncCompatibleCheckpointIOa  CheckpointIO that can be used together with async saving.

    Differs from the regular CheckpointIO only by the `save_checkpoint`
    return type. The `save_checkpoint` method itself is synchronous, but returns
    callbacks that can be performed asynchronously.
    N
checkpointpathstorage_optionsreturnr   c                 C   s   t )zAInterface to implement save_checkpoint and return an AsyncRequest)NotImplementedError)selfr'   r(   r)   r#   r#   r$   save_checkpointP   s   z+AsyncCompatibleCheckpointIO.save_checkpointN)__name__
__module____qualname____doc__r   r   strr   r   r   r-   r#   r#   r#   r$   r&   H   s    
r&   c                	       sx   e Zd ZdZdeddf fddZddeeef de	d	e
e ddfd
dZedddefddZd fddZ  ZS )AsyncFinalizableCheckpointIOa  CheckpointIO wrapper for async checkpoint saving and synchronous finalization.

    Runs main part of the checkpoint save in a separate process (not thread as the PTL
    AsyncCheckpointIO does). Allows to perform a (synchronous) finalization
    function after all ranks finish checkpoint saving.

    NOTE: for correctness, this plugin must be used together with the
    AsyncFinalizerCallback callback which performs the finalization checks.

    Args:
        checkpoint_io (CheckpointIO): wrapped checkpoint_io object. Must be
            of type AsyncCompatibleCheckpointIO.
    Requires the underlying checkpoint_io.save_checkpoint to return save_fn, save_args, finalize_fn.
    checkpoint_ior*   Nc                    s@   t sttt|tstdt| t | t	 | _
d S )Nz)Incompatible wrapped checkpoint_io type: )HAVE_MEGATRON_COREImportErrorIMPORT_ERROR
isinstancer&   
ValueErrortypesuper__init__r   async_calls_queue)r,   r5   	__class__r#   r$   r=   h   s   
z%AsyncFinalizableCheckpointIO.__init__r'   r(   r)   c                 C   sl   |pi  dd}t| jtsJ t| j| j|||}|dur&|| | j|}t	
d|  dS )at  Executes async request returned from the underlying checkpoint_io asynchronously.

        Requires the underlying checkpoint_io.save_checkpoint to return an AsyncRequest.
        It is then applied with `self.async_calls_queue` asynchronously.

        Args:
            checkpoint (Dict[str, Any]): checkpoint to save. Passed to underlying
                checkpoint_io without modifications.
            path (_PATH): path to save the checkpoint. Passed to underlying
                checkpoint_io without modifications.
            storage_options (Any, optional): storage control modifiers. This class
                consumed the `finalize_fn` parameter (if any), which is expected to be
                a callback and is appended to async finalization functions.

        Applies underlying checkpoint_io finalize callback first, then the external one (postfix order).
        finalize_fnNzScheduled an async call #)popr9   r5   r&   r;   r-   add_finalize_fnr>   schedule_async_requestr   r!   )r,   r'   r(   r)   external_finalize_fnasync_requestcall_idxr#   r#   r$   r-   q   s   
z,AsyncFinalizableCheckpointIO.save_checkpointz;AsyncFinalizableCheckpointIO.maybe_finalize_save_checkpointFblockingc                 C   sl   | j  dkr	dS t }| j |}|r!tddd |D   t }td|| dd t|dkS )	a&  Performs checkpoint finalization (if possible).

        Args:
            blocking (bool, optional): if True, waits until all async saves are
                completed. Otherwise, finalizes only those async calls which are
                already done on all ranks. Defaults to False.
        r   FzFinalized async calls: c                 S   s   g | ]}d | qS )#r#   ).0idxr#   r#   r$   
<listcomp>   s    zOAsyncFinalizableCheckpointIO.maybe_finalize_save_checkpoint.<locals>.<listcomp>zAsync finalization time took r   z s)r>   get_num_unfinalized_callsr   maybe_finalize_async_callsr   r!   infolen)r,   rH   
start_timecall_idx_finalizedend_timer#   r#   r$   maybe_finalize_save_checkpoint   s   	c                    s*   t    | j dkrtd dS dS )z0Warns if there are any pending checkpoint saves.r   z<Some async checkpoint saves might be not finalized properly.N)r<   teardownr>   rM   r   warningr,   r?   r#   r$   rU      s   
z%AsyncFinalizableCheckpointIO.teardownr.   F)r*   N)r/   r0   r1   r2   r&   r=   r   r3   r   r   r   r-   r%   boolrT   rU   __classcell__r#   r#   r?   r$   r4   X   s    (	r4   c                   @   s<   e Zd ZdZdddZddd	Zdd
dZdefddZdS )AsyncFinalizerCallbackzCallback which finalizes async saves initiated by the AsyncFinalizableCheckpointIO.

    Tries to perform non-blocking finalization on train_batch_end and train_epoch_end.
    On train_end performs a blocking finalization of all pending checkpoints.
    trainer
pl.Trainerr*   Nc                 O      |  |jdd dS >Override hook to finalize pending checkpoint(s) if they exist.FrH   N_get_checkpoint_iorT   r,   r\   argskwargsr#   r#   r$   on_train_batch_end      z)AsyncFinalizerCallback.on_train_batch_endc                 O   r^   r_   rb   rd   r#   r#   r$   on_train_epoch_end   rh   z)AsyncFinalizerCallback.on_train_epoch_endc                 O   s8   |  |}|j dkrtd |  |jdd dS )r`   r   zAPending async checkpoint saves. Finalizing them synchronously nowTra   N)rc   r>   rM   r   rO   rT   )r,   r\   re   rf   r5   r#   r#   r$   on_train_end   s   

z#AsyncFinalizerCallback.on_train_endc                 C   s&   |j j}t|tstd|j |S )Nz@Async finalizer requires an async compatible CheckpointIO, got: )strategyr5   r9   r4   r:   r@   )r,   r\   r5   r#   r#   r$   rc      s   

z)AsyncFinalizerCallback._get_checkpoint_io)r\   r]   r*   N)	r/   r0   r1   r2   rg   ri   rj   r4   rc   r#   r#   r#   r$   r[      s    


r[   c                       sN  e Zd ZdZ								d*dededed d	ed
ee dedededef fddZe	d+de
d	efddZed	d,deeef dedee ded fddZed				d-dedee deeef dededf dee deeef fd dZdedeeef fd!d"Zed#deddfd$d#Zed.d&d'Zd(d) Z  ZS )/DistributedCheckpointIOa  CheckpointIO for a distributed checkpoint format.

    Args:
        save_ckpt_format (str): Distributed checkpoint format to use for checkpoint saving.
        load_directly_on_device (bool, optional): if True, loads the weights directly
            on GPU. Has effect only for `zarr` based checkpoints (PyT Distributed
            always loads on device). Defaults to True.
        load_strictness (StrictHandling, optional): defines loading strictness.
            If not None, overwrites the `strict` flag passed to `load_checkpoint`.
            Defaults to None.
        async_save (bool): whether to save asynchronously. Should be set to True if
            this class will be wrapped with AsyncFinalizableCheckpointIO.
        torch_dist_multiproc (int, optional): number of extra processes per rank
            used during ckpt save with PyTorch distributed format. Defaults, to None
            which means using an MCore default (2).
        parallel_save (bool): parallelizes the save across ranks. Defaults to True
        parallel_load (bool): parallelizes the load across ranks (followed by params all gather).
            Defaults to False due to some extra memory usage requirement.
    TNFsave_ckpt_formatload_directly_on_deviceload_strictnessr   
async_savetorch_dist_multiprocassume_constant_structureparallel_saveparallel_save_within_dpparallel_loadc
           
         s\   t    tstt|| _|| _|| _|| _|| _	|| _
|| _|| _|	| _d | _d| _d S )NF)r<   r=   r6   r7   r8   rm   rn   ro   rp   rq   rr   rs   rt   ru   _save_sharded_strategyvalidated_consistency)
r,   rm   rn   ro   rp   rq   rr   rs   rt   ru   r?   r#   r$   r=      s   

z DistributedCheckpointIO.__init__	model_cfgc                 C   sP   | | dd| dd| dd|| dd| dd	| d
d	| dd	dS )ak  Instantiates a DistributedCheckpointIO from a config dict.

        Args:
            model_cfg (dict): model config dict. Most of the configuration
                is extracted from this config.
            async_save (bool, optional): async_save flag is not part of the model config,
                it should be provided separately. Defaults to False.
        dist_ckpt_format
torch_distdist_ckpt_load_on_deviceTdist_ckpt_load_strictnessNdist_ckpt_torch_dist_multiprocdist_ckpt_parallel_saveF!dist_ckpt_parallel_save_within_dpdist_ckpt_parallel_load)rm   rn   ro   rp   rq   rs   rt   ru   get)clsrx   rp   r#   r#   r$   from_config   s   







z#DistributedCheckpointIO.from_configz'DistributedCheckpointIO.save_checkpointr'   r(   r)   r*   r   c                    s   t }|jdd | jo| j }d| _tj }t| t }t	j
|| j|| jd}t }	dd|  dur>d  ndd|d	d
d|	| d	d
f}
ddd |
D }t|  fdd}| jrs|dusnJ || |S )a>  Saves a distributed checkpoint. Creates the checkpoint root directory if doesn't exist.

        Args:
            checkpoint (Dict[str, Any]): sharded state dict to save
            path (_PATH): checkpoint directory
            storage_options (Any, optional): Optional parameters when saving the checkpoint
        T)exist_ok)sharded_state_dictcheckpoint_dirsharded_strategyvalidate_access_integrityasync_sharded_savezGlobal Checkpoint SavezRank: NzIteration: zStart time: r   r    zSave duration: z : c                 s   s    | ]	}|d ur|V  qd S r.   r#   )rJ   partr#   r#   r$   	<genexpr>/  s    z:DistributedCheckpointIO.save_checkpoint.<locals>.<genexpr>c                      s    t dt dd  d S )Nz-Successfully saved checkpoint from iteration 7dz to )r   rO   intr#   	iterationr(   r#   r$   iter_finalize_fn2  s    zADistributedCheckpointIO.save_checkpoint.<locals>.iter_finalize_fn)r   makedirsrw   rr   torchdistributedget_rank_get_iteration_from_checkpointr   r   savesave_sharded_strategyrp   joinr   rO   rC   )r,   r'   r(   r)   fsvalidate_sharding_integrityrankrQ   async_save_requestrS   	log_partslog_messager   r#   r   r$   r-     s8   


z'DistributedCheckpointIO.load_checkpointmap_locationr   strictr   c              	   C   s.  |du rt d|durt d| jdkr| jrtjdd}nd}| jr4|du r,t|}t|tdd}|durAt	
d| d	 t|trV|sN| ||}|rStjntj}| jdur^| j}|du retj}t	d
|  t }tj|||||d}t }	|	| }
t	
dtj  d|dd|
dd |S )a\  Loads a distributed checkpoint.

        Args:
            path (_PATH): checkpoint directory
            map_location (Any, optional): required to be None in this implementation
            sharded_state_dict (Dict[str, Any], optional): state dict which
                defines the loading procedure for the distributed checkpoint.
                Defaults to None to comply with the CheckpointIO interface,
                but it's a required argument.
            strict (bool, StrictHandling, optional): adjust load strictness. bool value
                is translated to StrictHandling instance. Gets overwritten by
                `self.load_strictness`. Defaults to None. If `self.load_strictness`
                is also None, strict becomes StrictHandling.ASSUME_OK_UNEXPECTED.

        Returns:
            Dist[str, Any]: loaded checkpoint.
        NzWDistributedCheckpointIO requires passing sharded_state_dict argument to load_checkpointz;DistributedCheckpointIO doesnt handle map_location argumentzarrT)rn   with_context_parallelUsing z dist-ckpt load strategy.zDist ckpt load strictness: )r   r   r   r   r   z Global Checkpoint Load : Rank : z : Start time : r   z#s : Time spent in load_checkpoint: r    )r:   rm   rn   r   TensorStoreLoadShardedStrategyru   r   r   r   r   rO   r9   rY   adjust_non_strict_loadr   ASSUME_OK_UNEXPECTEDLOG_ALLro   r!   r   r   loadr   r   r   )r,   r(   r   r   r   r   r   rQ   retrS   durationr#   r#   r$   load_checkpoint;  sX   


c                    sJ   t | g g dtf fdd}t||\}}td  |S )z=Remove unexpected keys from being loaded into the state dict.xc                    s8   t | tr| j v r| j dS | j dS dS )NFT)r9   r   keyappend)r   ckpt_sharded_metadataloaded_keysunexpected_keysr#   r$   "should_remove_missing_sharded_base  s   

zZDistributedCheckpointIO.adjust_non_strict_load.<locals>.should_remove_missing_sharded_basezEThe following keys are not in the checkpoint and will not be loaded: )r   load_tensors_metadatar   r   r   rO   )r,   r(   r   r   _r#   r   r$   r     s   

z.DistributedCheckpointIO.adjust_non_strict_loadz)DistributedCheckpointIO.remove_checkpointc                 C   s   t j|dd dS )zRemove a distributed checkpoint.

        Due to potentially large number of files, the implementation remove the whole directory at once.
        T)ignore_errorsN)shutilrmtree)r,   r(   r#   r#   r$   remove_checkpoint  s   r   c                 C   s   | j du r
|  | _ | j S )zHConditionally initialize and get the sharded strategy to use for saving.N)rv   "_determine_dist_ckpt_save_strategyrW   r#   r#   r$   r     s   

z-DistributedCheckpointIO.save_sharded_strategyc                 C   s   | j dkr
td | jr| j dkrtd| jdu ri nt| jd}| j dkr5|r5t| j dfi |}nt| j d}t	|drD| j
|_| jrX| jrOtd	d
nd}t||| j
}td| d |S )a  Determine the saving strategy based on constructor args.

        Relies on the default MCore strategy unless extra PyT Distributed format arguments
        are passed in config or in case of a fully parallel save in which case
        a parallelization wrapper is applied.
        r   z`zarr` distributed checkpoint backend is deprecated. Distributed optimizer checkpoint saving might be extremely slow. Please switch to PyTorch Distributed format (model.dist_ckpt_format=torch_dist).rz   z9Async dist-ckpt save supported only for torch_dist formatN)thread_count   use_cached_ckpt_structureTr   r   z dist-ckpt save strategy.)rm   r   rV   rp   r:   rq   dictr   r   hasattrrr   r   rs   rt   r   r   rO   )r,   torch_dist_kwargssave_strategyparallelization_groupr#   r#   r$   r     s(   

z:DistributedCheckpointIO._determine_dist_ckpt_save_strategy)TNFNFFFFrX   r.   )NNNT)r*   r   )r/   r0   r1   r2   r3   rY   r   r   r=   classmethodr   r   r%   r   r   r   r-   r	   r   r   r   propertyr   r   rZ   r#   r#   r?   r$   rl      s    	

.

Nrl   r'   r*   c                 C   s,   |  di  di  di  di  dd S )Nloopsfit_loopzepoch_loop.batch_progresstotal	completedr   )r'   r#   r#   r$   r     s   
r   )Ar   abcr   r   
contextlibr   r   typingr   r   r   r	   lightning.pytorchpytorchplr   lightning.fabric.pluginsr
   #lightning.fabric.utilities.cloud_ior    lightning.fabric.utilities.typesr   r   $lightning.pytorch.plugins.io.wrapperr   
nemo.utilsr   megatron.corer   +megatron.core.dist_checkpointing.dict_utilsr   (megatron.core.dist_checkpointing.mappingr   .megatron.core.dist_checkpointing.serializationr   r   +megatron.core.dist_checkpointing.strategiesr   7megatron.core.dist_checkpointing.strategies.async_utilsr   r   0megatron.core.dist_checkpointing.strategies.baser   :megatron.core.dist_checkpointing.strategies.fully_parallelr   r   1megatron.core.dist_checkpointing.strategies.torchr   +megatron.core.dist_checkpointing.validationr   megatron.core.parallel_stater   r6   r7   ModuleNotFoundErrorer8   r3   r%   r&   r4   r[   rl   r   r   r#   r#   r#   r$   <module>   sV   
	N  "