o
    }oi                     @   s   d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZm	Z	m
Z
mZmZmZmZ d dlZd dlmZ d dlZd dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dl m!Z! d dl"m#Z# G dd deZdS )    N)	timedelta)Path)AnyDictIterableListLiteralOptionalUnion)proxy)ModelCheckpoint)_is_local_file_protocol)rank_zero_info)ckpt_to_dir)TrainerContext)logging)AppStatec                       s  e Zd ZdZdZ													dYd	ee d
edeeee	d f  de
dedede
dee
 dee dee dee dedef fddZ fddZdd Zdd Z fdd Zd!eeef d"df fd#d$ZdZ fd%d&Z fd'd(Zd)ed"dfd*d+Zd[d.d/Zed0eeef d"efd1d2Zed0eeef d"efd3d4Zed\d0eeef d"dfd5d6Zed\d0eeef d"dfd7d8Zd]d)ed,d-d9ed"efd:d;Zd,d<d"eeej f f fd=d>Z!d\d,d<d)ed?ed"df fd@dAZ"d,d-d)ed"df fdBdCZ#d,d-d)edDe
fdEdFZ$d\d,d-d)ed"df fdGdHZ%d)ed"efdIdJZ&dKe'e d"efdLdMZ(d)eeef d"efdNdOZ)e*d"e'e fdPdQZ+edReeef d"dfdSdTZ,d,d<dUedVed"efdWdXZ-  Z.S )^r   a	  Light wrapper around Lightning's ModelCheckpoint to force a saved checkpoint on train_end.
    Adds support for asyncronous checkpointing and provides some additional logic to clean up invalid checkpoints

    Args:
        monitor: Metric to monitor when saving top-k checkpoints.
        verbose: Verbosity mode.
        save_last: When ``True``, saves a `*-last` copy whenever a checkpoint file gets saved.
        save_top_k: When ``True``, saves the top-k checkpoints according to ``monitor``.
        save_weights_only:  if ``True``, then only the model's weights will be saved. Optimizer states will
            be omitted from all checkpoints.
        mode: One of {min, max}. Whether the objective is to minimize or maximize the monitored quantity.
        every_n_epochs: Number of epochs between checkpoints.
        every_n_train_steps: Number of train steps between checkpoints.
        train_time_interval: After each interval, monitor checkpoints. Not to be used with
            ``every_n_epochs`` or ``every_n_train_steps``.
        save_on_train_epoch_end: Whether to run checkpointing at the end of the training epoch
        save_optim_on_train_end: Whether to include the optimizer states in the final checkpoint
            at the end of training. Only applicable when save_weights_only is ``False``.
        always_save_context: Whether to dump the artifacts needed to reinintialize the current
            model, trainer, and dataloader to allow for reproducibility of experiments.
        save_context_on_train_end: Whether to dump the artifacts on_train_end regardless of whether
            ``always_save_context`` is ``True``.
        async_save: Whether to enable asynchronous checkpointing.

    Attributes:
        UNFINISHED_CHECKPOINT_SUFFIX (str): Suffix for unfinished checkpoint files.
        deferred_ckpts_to_remove (List[List[str]]): List of deferred checkpoints
            to remove once async save is completed.
        ckpts_to_link (Dict[str, str]): Dictionary of checkpoint paths that need to be symlinked.
        future_last_model_path (str): Path to the future 'last' checkpoint, used for symbolic linking.
        best_k_models (dict): Dictionary of best-k checkpoints based on the monitored metric.
        best_model_score (float): Score of the best checkpoint.
        best_model_path (str): Path to the best checkpoint.
        kth_best_model_path (str): Path to the kth best checkpoint.
    z-unfinishedval_lossT   FminNmonitorverbose	save_lastlink
save_top_ksave_weights_onlymodeevery_n_epochsevery_n_train_stepstrain_time_intervalsave_on_train_epoch_endsave_optim_on_train_endalways_save_contextsave_context_on_train_endc                    sP   || _ || _|| _d| _g | _i | _t jd|||||||||	|
d
| d S )N )
r   r   r   r   r   r   r   r   r   r     )r"   r#   r!   future_last_model_pathdeferred_ckpts_to_removeckpts_to_linksuper__init__)selfr   r   r   r   r   r   r   r   r   r    r!   r"   r#   kwargs	__class__r%   e/home/ubuntu/.local/lib/python3.10/site-packages/nemo/lightning/pytorch/callbacks/model_checkpoint.pyr*   K   s(   
zModelCheckpoint.__init__c                    s"  ddl m}m} ddlm} ddlm} t }| jdkr(|j	r(t
d |   | r|j}|j}	t|	dkrot|d}
d}|
D ]
}| rL|d7 }qBtt|d	|  }| so|  |	D ]}tt|t| qb|jr|jD ]}t|}t||j }| st|| qu|jr|d
 }| st|ddd}|d|j W d   n1 sw   Y  | \}}|r|d }| st|ddd}|d| d ||  W d   n1 sw   Y  t
|d  ||d |d  t j!" rt j!#  t$ %|| dS )a'  
        Initializes checkpointing by handling previous runs,
        setting up file logging, and managing files to move or copy.

        This method handles:
        - Moving old files to new folders
        - Copying relevant files to the log directory
        - Creating command argument and git information logs
        - Setting up logging for errors and Lightning logs

        Args:
            trainer (pl.Trainer): The PyTorch Lightning trainer object.
            pl_module (pl.LightningModule): The Lightning model to be trained.
        r   )get_git_diffget_git_hashis_global_rank_zero)add_filehandlers_to_pl_loggerzChecking previous runszrun_*   run_zcmd-args.logwzutf-8)encoding Nzgit-info.logzcommit hash: 
znemo_error_log.txtzlightning_logs.txt)&nemo.utils.exp_managerr0   r1   nemo.utils.get_rankr3   !nemo.utils.lightning_logger_patchr4   r   r   restorer   debugnemo_topk_check_previous_runlog_dirfiles_to_movelenr   globis_direxistsmkdirshutilmovestrfiles_to_copynamecopycmd_argsopenwritejoinadd_err_file_handlertorchdistributedis_initializedbarrierr)   on_train_start)r+   trainer	pl_moduler0   r1   r3   r4   	app_staterB   rC   other_run_dirs	run_countfoldnew_run_dir_filesrc_pathdst_pathcmd_args_filegit_repogit_hashgit_info_filer-   r%   r/   rX   |   s`   



zModelCheckpoint.on_train_startc                    s  z j   j  j  j W n ty   tdw i  _ d _d _d _t fdd jD }|D ]Q}t|}|dd dksJ|dd d	krKq4| j	t
 j	 d
 }|t
 j	krtd||d }|rx||||  d
  }n||d }t| j |< q4t
 j d
k rdS  jdkrdnd}t j  j j|d}t
| j }td|}td|    j}	t|D ]-}
|d} j |  | |	r j |r  | td|  q|d  _|d  _ j  j  _dS )a  
        Verifies and cleans up the top-k checkpoint state from previous training runs.

        This method ensures that:
        - The top-k models are correctly loaded and ordered.
        - Any outdated or invalid checkpoints are removed.
        - The best model is determined based on the monitored metric.

        Raises:
            AttributeError: If the expected attributes for the top-k model are not found.
        zTLightning's ModelCheckpoint was updated. NeMo's ModelCheckpoint will need an update.r$   Nc                 3   s    | ]
}  |s|V  qd S N_is_ema_filepath).0pathr+   r%   r/   	<genexpr>   s    z?ModelCheckpoint.nemo_topk_check_previous_run.<locals>.<genexpr>i
-last.ckpt-lastr6   z[A-z]r   FTkeyreverser   zNumber of models to delete: r5   zRemoved checkpoint: )best_k_modelskth_best_model_pathbest_model_scorebest_model_pathAttributeErrorlist_saved_checkpoint_pathsrK   findr   rD   researchstartfloatr   sortedgetr   maxr   r@   _has_ema_ckptsrangepop_del_model_without_trainer_fsrG   _ema_format_filepath)r+   checkpoints
checkpointindexmatchvalue_reversert   models_to_deleteema_enabled_modelr%   rl   r/   rA      sZ   
 




z,ModelCheckpoint.nemo_topk_check_previous_runc                    s   dt dtffdd  fddj D _tjdkrHjdk}tjjj|d	}|d
 _jj _	|d _
jj
 _dS d_d_	d_
d_dS )a  
        Removes invalid (incomplete or non-existing) checkpoints from the list of top-k checkpoints.

        This function is necessary when checkpointing might have been abruptly interrupted, leaving behind
        incomplete or corrupted checkpoints. The invalid checkpoints are identified by checking if their
        corresponding directory exists and if the checkpoint is not unfinished.

        After removing invalid entries, the method updates the best-k models based on the existing, valid checkpoints.

        Attributes Updated:
            - `best_k_models`: A dictionary of valid checkpoints from top-k models.
            - `best_model_path`: Path to the best model based on the current sorting order.
            - `best_model_score`: The score associated with the best model.
            - `kth_best_model_path`: Path to the kth best model.
            - `kth_value`: The score associated with the kth best model.
        	ckpt_pathreturnc                    s"   t j| d}|o |  S )N.ckpt)osrk   isdirremovesuffixis_checkpoint_unfinished)r   rG   rl   r%   r/   __is_ckpt_ok"  s   zGModelCheckpoint._remove_invalid_entries_from_topk.<locals>.__is_ckpt_okc                    s   i | ]\}} |r||qS r%   r%   )rj   kv)_ModelCheckpoint__is_ckpt_okr%   r/   
<dictcomp>&  s    zEModelCheckpoint._remove_invalid_entries_from_topk.<locals>.<dictcomp>r   r   rq   r5   r$   N)rK   boolrt   itemsrD   r   r   r   ru   	kth_valuerw   rv   )r+   reverse_arrbest_k_models_arrr%   )r   r+   r/   !_remove_invalid_entries_from_topk  s   



z1ModelCheckpoint._remove_invalid_entries_from_topkc                    s"   t   }| jdkr| j|d< |S )a  
        Returns the state dictionary of the model.

        This function adds additional logic to handle the case when using symlinks. If the model is configured
        to save the last checkpoint as a symlink, the path to the last checkpoint is updated in the returned
        state dictionary to avoid off-by-one errors in the checkpointing system.

        Returns:
            Dict[str, Any]: The state dictionary of the model, including any necessary modifications for symlinks.
        r   last_model_path)r)   
state_dictr   r&   )r+   stater-   r%   r/   r   4  s   


zModelCheckpoint.state_dictr   r   c                    s   t  | |   dS )ay  
        Loads the state dictionary into the model and removes invalid entries from the top-k checkpoints.

        This method ensures that after loading the model state, any invalid (incomplete or missing) checkpoints
        are removed from the top-k models list.

        Args:
            state_dict (Dict[str, Any]): The state dictionary to load into the model.
        N)r)   load_state_dictr   )r+   r   r-   r%   r/   r   E  s   
zModelCheckpoint.load_state_dictc                    sj   ddl m} | rtd t| j tj	 rtj
  t|jdd| _t j|g|R i | dS )aQ  
        Initializes the model and removes any unfinished checkpoints before training.

        This method is responsible for ensuring that unfinished checkpoints are removed prior to starting the training.
        It also synchronizes all ranks in a distributed setting to ensure that unfinished checkpoints are removed
        across all ranks.

        Args:
            trainer: The trainer instance used for training.
            *args: Additional arguments passed to the parent setup method.
            **kwargs: Additional keyword arguments passed to the parent setup method.
        r   r2   z)Removing unfinished checkpoints if any...
async_saveFN)r=   r3   r   r@   r   _remove_unfinished_checkpointsdirpathrT   rU   rV   rW   getattrstrategyr   r)   setup)r+   rY   argsr,   r3   r-   r%   r/   r   R  s   


zModelCheckpoint.setupc                    s   ddl m} |jrdS | jrq|jdkrqd}t|jtr%|j|j dkr%d}t|jtr5|j|j dkr5d}|rX| 	|}| j
| || jkrQtd| j
 d nt || | jrq| jsq| rqt|jt| j
d d	gd
 t || dS )a  
        Handles actions to be performed when training ends, such as saving the last checkpoint.

        This method ensures that the last checkpoint is saved if needed, particularly when validation steps
        aren't always run based on the interval. It also manages saving the training context to disk, if configured.

        Args:
            trainer: The trainer instance used for training.
            pl_module: The model being trained.
        r   r2   NFTzLast checkpoint z already savedcontextr   
yaml_attrs)r=   r3   fast_dev_runr   val_check_interval
isinstancer   global_stepint_monitor_candidatesr   format_checkpoint_nameCHECKPOINT_NAME_LASTr   r@   r)   _save_last_checkpointr#   r"   r   from_trainerio_dumpr   on_train_end)r+   rY   rZ   r3   should_save_last_checkpointmonitor_candidatesr-   r%   r/   r   k  s&   

zModelCheckpoint.on_train_endfilepathc                 C   s|   ddl m} t|}| r0zt|}tj|dd td|  W n   td| d Y tj	
 r<tj	  dS dS )	a  
        Deletes the checkpoint model directory from distributed storage without requiring the trainer.

        This method ensures that distributed checkpoints are properly removed when necessary, especially
        if the model file is no longer needed or is incomplete. The removal only happens on the rank-zero process.

        Args:
            filepath (str): The path to the checkpoint model file to be deleted.
        r   r2   T)ignore_errorsz Removed distributed checkpoint: z(Tried to remove distributed checkpoint: z but failed.N)r=   r3   r   r   rI   rmtreer   inforT   rU   rV   rW   )r+   r   r3   	dist_ckptr%   r%   r/   r     s   
z*ModelCheckpoint._del_model_without_trainerrY   lightning.pytorch.Trainerc                 C   s.   ddl m} d}|jD ]	}t||r|}q|S )a#  
        Retrieves the Exponential Moving Average (EMA) callback from the list of trainer callbacks.

        This method scans through the list of callbacks attached to the trainer and returns the EMA callback
        instance if present. The EMA callback is often used to track the exponential moving average of model parameters
        during training.

        Args:
            trainer ('lightning.pytorch.Trainer'): The trainer instance.

        Returns:
            EMA: The EMA callback instance if found, or None if not present.
        r   )EMAN)!nemo.collections.common.callbacksr   	callbacksr   )r+   rY   r   ema_callbackcallbackr%   r%   r/   _ema_callback  s   

zModelCheckpoint._ema_callbackcheckpoint_pathc                 C   s&   t | d}|d}t|tj S )a  Format the path to the unfinished checkpoint marker file.

        If the marker file exists, corresponding checkpoint is considered unfinished/incomplete.
        NOTE: Marker path for the EMA checkpoint part is the same as for the original checkpoint.

        Args:
            checkpoint_path: Path to the checkpoint file or dir.
              Does not need to exist.

        Returns:
            Path to the unfinished checkpoint marker file.
        r   -EMA)rK   r   r   r   UNFINISHED_CHECKPOINT_SUFFIX)r   marker_filepathr%   r%   r/   (format_checkpoint_unfinished_marker_path  s   
z8ModelCheckpoint.format_checkpoint_unfinished_marker_pathc                 C   s   t |  S )zCheck if the checkpoint is unfinished.

        Args:
            checkpoint_path: Path to the checkpoint file or dir.
              Does not need to exist.

        Returns:
            True if the checkpoint is unfinished, False otherwise.
        )r   r   rG   )r   r%   r%   r/   r     s   z(ModelCheckpoint.is_checkpoint_unfinishedc                 C   sX   ddl m} | rt| }|jjddd |  |r(tj	 r*tj
  dS dS dS )a  Marks given checkpoint as unfinished.

        Args:
            checkpoint_filepath: Path to the checkpoint file or dir.
              Does not need to exist.
            barrier_after: Synchronize ranks after writing the marker file.
              Defaults to False.
        r   r2   T)parentsexist_okN)r=   r3   r   r   parentrH   touchrT   rU   rV   rW   )r   barrier_afterr3   marker_pathr%   r%   r/    set_checkpoint_unfinished_marker  s   

z0ModelCheckpoint.set_checkpoint_unfinished_markerc                 C   sd   ddl m} z%|rtj rtj  | r&t| }| r)|	  W dS W dS W dS    Y dS )a  Clear unfinished marker for given checkpoint.

        Args:
            checkpoint_path: Path to the checkpoint file or dir.
              Does not need to exist.
            barrier_before: Synchronize ranks before removing the marker file.
              Defaults to False.
        r   r2   N)
r=   r3   rT   rU   rV   rW   r   r   rG   unlink)r   barrier_beforer3   r   r%   r%   r/   #remove_checkpoint_unfinished_marker  s   


z3ModelCheckpoint.remove_checkpoint_unfinished_markercheck_dist_ckptc                 C   s0   | j |p|o| j tt|}|j|S )zLChecks if a file or a file without a suffix (distributed checkpoint) exists.)r   rG   rK   r   r   	broadcast)r+   r   rY   r   rG   r%   r%   r/   file_exists  s   $zModelCheckpoint.file_existsz
pl.Trainerc                    s   t  |}ddlm} ddlm} td| j}dD ])}||v s&|| j	krD||vr6t
jdt
j d||< t|j|rD||| dd	 q|S )
z(Broadcast loss from last pipeline stage.r   )_sync_from_last_pipeline_stage)MegatronStrategyz[\{](.*?)[:\}])reduced_train_lossg        )deviceT)r   )r)   r   nemo.lightning._strategy_libr   3nemo.lightning.pytorch.strategies.megatron_strategyr   r|   findallfilenamer   rT   tensorcudacurrent_devicer   r   )r+   rY   r   r   r   keys	loss_namer-   r%   r/   r     s   z#ModelCheckpoint._monitor_candidateslinkpathc                    sx   t t|ddt t|k}|s| || dS | jr*|s*t || jt |< dS t|}t|}t ||| dS )zCheck to see whether this step has already been saved as top_k
        in which case we can create a symlink
        otherwise, we have to save the checkpoint
        rp   r$   N)rK   r   replace_save_checkpointr   r(   r)   _link_checkpoint)r+   rY   r   r   override_asyncsaved_current_stepr-   r%   r/   r     s    
z ModelCheckpoint._link_checkpointc           
         s  ddl m} | j|dd | |}|j| _| jdkr2tt|| _	tt|
ds2|  j	d7  _	|dur| jr=td	|| t || W d   n1 sTw   Y  ||% td
|  | |}| jrutd
|  t || W d   n1 sw   Y  | j|dd dS | jp| j o|j|jk}| |||j}| jr|jj}ddlm} t||stdt|d}	| j g  nd}	|j!|||	d | j"r| rt#$|j%t|d dgd | jr|| _&t'(d|  dS |  dS )a`  Saves the checkpoint to the given filepath

        Args:
            trainer (lightning.pytorch.Trainer): the trainer obj
            filepath (str): path to save checkpoint to.

        Raises:
            ValueError: (mcore) async_save with EMA not supported
            ValueError: (mcore) Async save requires async compatible CheckpointIO
        r   r2   Tr   r   lastrn   Nz!async_save with EMA not supportedz*Saving EMA weights to separate checkpoint r   )AsyncFinalizableCheckpointIOz1Async save requires async compatible CheckpointIO)finalize_fn)storage_optionsr   r   r   z$Scheduled async checkpoint save for ))r=   r3   r   r   r   _last_global_step_savedr   rK   r   r&   endswithr   
ValueErrorsave_original_optimizer_stater)   r   save_ema_modelr   r   r   r   r   r!   	max_steps&_get_finalize_save_checkpoint_callbackr   checkpoint_io!nemo.utils.callbacks.dist_ckpt_ior   r   dictr'   appendsave_checkpointr"   r   r   r   _last_checkpoint_savedr   r   )
r+   rY   r   r3   r   r   r   r   r   r   r-   r%   r/   r   /  sR   





z ModelCheckpoint._save_checkpointr   c                    s    fdd}|S )zLCreates a callback that can be used to finalize async (and sync) ckpt saves.c                     s   t d d    _jrjD ]	} | t qj dd js*d S t 	d d  d t
 jv rJj j dd jsOJ jd	}t d
|  |D ]
}j|dd q_d S )Nz"Finalize callback called for step z, filepath Tr   zAsync checkpoint save for step z (z) finalized successfully.)r   r   zCheckpoints to remove: )r   r@   r  is_global_zerologgersafter_save_checkpointr   r   r   r   rK   r(   r   r   r'   _remove_checkpoint)loggerckpts_to_removeckpt_to_remover   r   r+   rY   r%   r/   _cb  s"   

zCModelCheckpoint._get_finalize_save_checkpoint_callback.<locals>._cbr%   )r+   rY   r   r   r  r%   r  r/   r   |  s   z6ModelCheckpoint._get_finalize_save_checkpoint_callbackc              
      s  | j r|st| jdkr| jg  | jd | dS | j|dd z	t || W n tyI } zt	d| d|  W Y d}~nd}~ww | 
|}|dur| |}z	t || W n ty~ } zt	d| d|  W Y d}~nd}~ww | j|dd dS )	zPerforms checkpoint removal.

        With async save, `self._remove_checkpoint` is called before the checkpoint
        is actually finished so we can't remove it. Instead we add it to
        `self.deferred_ckpts_to_remove` for future removal.
        r   r5   NTr   zJError removing checkpoint, common if doing manual cleanup and restarting: z: r   )r   rD   r'   r  r   r)   r  	Exceptionr   warningr   r   r   )r+   rY   r   r   er   r-   r%   r/   r    s4   


z"ModelCheckpoint._remove_checkpointc                 C   s   | | jd| j S )zFormats given path for EMA checkpoint

        Args:
            filepath (str): filepath

        Returns:
            str: EMA-formatted filepath
        r   )r   FILE_EXTENSIONr+   r   r%   r%   r/   r        	z$ModelCheckpoint._ema_format_filepathr   c                    s   t  fdd|D S )zCheckes whether filepaths are EMA-formatted

        Args:
            checkpoints (Iterable[Path]): paths to check

        Returns:
            bool: True indicates path is EMA-formatted.
        c                 3   s    | ]}  |V  qd S rg   rh   )rj   r   rl   r%   r/   rm     s    z1ModelCheckpoint._has_ema_ckpts.<locals>.<genexpr>)any)r+   r   r%   rl   r/   r     r  zModelCheckpoint._has_ema_ckptsc                 C   s   t |d| j S )zCheckes whether filepaths are EMA-formatted

        Args:
            filepath (Union[Path, str]): path to check

        Returns:
            bool: True indicates path is EMA-formatted.
        r   )rK   r   r  r  r%   r%   r/   ri     r  z ModelCheckpoint._is_ema_filepathc                    s\   dd t  jdD }|rt fdd|S dd t  jdD }t fdd|S )	a  
        Retrieves a list of saved checkpoint paths while filtering out unfinished checkpoints.

        - If distributed checkpoints (directories) exist, return only those.
        - Otherwise, return individual checkpoint files with a .ckpt extension.
        - Filters out any checkpoints that are marked as unfinished.

        Returns:
            Iterable[Path]: An iterable containing valid checkpoint paths.
        c                 S   s   g | ]}|  r|qS r%   )rF   rj   dr%   r%   r/   
<listcomp>  s    z;ModelCheckpoint._saved_checkpoint_paths.<locals>.<listcomp>*c                         |  S rg   r   prl   r%   r/   <lambda>      z9ModelCheckpoint._saved_checkpoint_paths.<locals>.<lambda>c                 S   s   g | ]}|qS r%   r%   rj   fr%   r%   r/   r    s    *.ckptc                    r  rg   r  r  rl   r%   r/   r    r  )r   r   rE   filterrglob)r+   dist_checkpointscheckpoint_filesr%   rl   r/   rz     s
   z'ModelCheckpoint._saved_checkpoint_pathscheckpoint_dirc           	      C   s   ddl m} | stdt| } dd | dtj D }dd | dD }|D ]}t|}||v rCt	
d	|  t| q+d
d | dD }|D ]}t|}||v rht	
d|  t| qP|D ]}t| qkdS )aB  
        Removes all unfinished checkpoints and their associated marker files from the filesystem.

        - Ensures this function runs only on rank 0.
        - Deletes individual unfinished checkpoint files.
        - Removes directories corresponding to unfinished distributed checkpoints.
        - Deletes the marker files indicating unfinished checkpoints.

        Args:
            checkpoint_dir (Union[Path, str]): Path to the directory containing checkpoints.

        Raises:
            AssertionError: If the function is called from a non-rank 0 process.
        r   r2   z8_remove_unfinished_checkpoints should run only on rank 0c                 S      h | ]
}|  r| qS r%   )is_fileresolver  r%   r%   r/   	<setcomp>  s
    zAModelCheckpoint._remove_unfinished_checkpoints.<locals>.<setcomp>r  c                 S   s   h | ]}|  qS r%   )r)  r  r%   r%   r/   r*    s    r!  z Removing unfinished checkpoint: c                 S   r'  r%   )rF   r)  r  r%   r%   r/   r*  !  s    z%Removing unfinished dist checkpoint: N)r=   r3   AssertionErrorr   rE   r   r   r#  r   r   r  r   removerI   r   )	r&  r3   existing_marker_filepathscheckpoint_filepathsr   possible_marker_pathall_dirpathsckpt_dirpathr   r%   r%   r/   r     s0   



z.ModelCheckpoint._remove_unfinished_checkpointspreviouscurrentc                 C   s   ||krdS t |sdS t| }|jdurt|j nd}|dur8||kr8t|dr6|jdr6ndS | jdu rEt| j	 dt| j }||j
v S )a  Checks if the previous checkpoint should be deleted.
        A checkpoint won't be deleted if any of the cases apply:
        - The previous checkpoint is the same as the current checkpoint (means the old was already overwritten by new)
        - The previous checkpoint is not in the current checkpoint directory and the filesystem is local
        - The previous checkpoint is the checkpoint the Trainer resumed from and the filesystem is local
            and the resumed from checkpoint is not the last checkpoint
        FTNrn   z.dirpath is None.)r   r   absoluter   rK   r   rM   r   r   r.   r   )r+   rY   r2  r3  resume_pathr   r%   r%   r/   _should_remove_checkpoint,  s   

z)ModelCheckpoint._should_remove_checkpoint)r   TTr   Fr   NNNFFTT)r   N)rY   r   )F)T)/__name__
__module____qualname____doc__r   r	   rK   r   r
   r   r   r   r*   rX   rA   r   r   r   r   r   r   r   r   r   staticmethodr   r   r   r   r   r   rT   Tensorr   r   r   r   r  r   r   r   ri   propertyrz   r   r6  __classcell__r%   r%   r-   r/   r   $   s    $	
1ME&$
  M
$%"0r   )$r   r|   rI   datetimer   pathlibr   typingr   r   r   r   r   r	   r
   	lightninglightning.pytorchpytorchplrT   _weakrefr   ,lightning.pytorch.callbacks.model_checkpointr   PTLModelCheckpointr   lightning.pytorch.utilitiesr   nemo.lightning.ckpt_utilsr   nemo.lightning.io.plr   
nemo.utilsr   nemo.utils.app_stater   r%   r%   r%   r/   <module>   s$   $