o
    Ti                     @   sX   d dl Z d dlZd dlZd dlmZ d dlmZmZ d dlT dd Z	G dd deZ
dS )	    N)CheckpointEngine)loggerlog_dist)*c                 C   s   t jt j| S N)ospathbasenamedirname)r    r   p/home/ubuntu/.local/lib/python3.10/site-packages/deepspeed/runtime/checkpoint_engine/nebula_checkpoint_engine.py_get_tag_from_path   s   r   c                       sL   e Zd Zd fdd	Zdd ZdefddZddefd	d
Zdd Z  Z	S )NebulaCheckpointEngineNc                    sd   t  | d | _d | _|j| _|j| _| jd u r|j| _t|jt	|j
t|ji}tjdi | d S )Nr   )super__init__
checkpointtag_flagenable_nebula_load	load_pathnebula_load_pathpersistent_storage_pathNEBULA_PERSISTENT_STORAGE_PATHNEBULA_PERSISTENT_TIME_INTERVALpersistent_time_interval"NEBULA_NUM_OF_VERSION_IN_RETENTIONnum_of_version_in_retentiontorch_nebulainit)selfconfig_paramsnebula_config_params	__class__r   r   r      s   
zNebulaCheckpointEngine.__init__c                 C   s&   t d| dgd t|d| _d S )Nz"[Nebula] Start Checkpoint for tag:r   )ranks)r   r   
Checkpointr   )r   tagr   r   r   create&   s   zNebulaCheckpointEngine.creater   c                 C   sj   t d td| t|}tj|}td| d| d | j	|| td| d| d d S )Nz([Nebula] Create dummy files for loading. z[Nebula] Saving  under tag ...z[Nebula] Saved .)
r   torchsaver   r   r   r	   r   infor   )r   
state_dictr   r&   partition_namer   r   r   r-   ,   s   zNebulaCheckpointEngine.savec              	   C   s~  t |}| jd u p| j|k}| js2|r2|| _td| d tj||dd}td| d |S tj	|}td| d| d	| j
 d
 d }|dv rVtj| j
d}ntj|| j
d}|d u sk|d ur|jdkrtd| d| j
 d tj| j
d}|d u s|d ur|jdkrtd t }td| d d S |j}d| _|j||d}td| d| d| j
 d |S )Nz6[Nebula] Disable nebula load. Loading checkpoint from z ...F)map_locationweights_onlyz5[Nebula] Disable nebula load. Loaded checkpoint from z .z[Nebula] Loading r)   z from nebula path r*   )Nlatestlatest_universal)persist_path)r&   r5   r(   z$Unable to find valid checkpoint tag:z= from Nebula, try to get latest checkpoint again from nebula z path!znUnable to find latest checkpoint from Nebula tier3, try to get latest checkpoint again from nebula tier1 path!z6Unable to find valid checkpoint from Nebula under tag:r+   )r1   z[Nebula] Loaded z from )r   r   r   r   r.   r,   loadr   r   r	   r   r   get_latest_checkpointget_checkpointr&   warning)r   r   r1   r&   first_load_flag	partitionr0   r   r   r   r   r7   7   s>   
  zNebulaCheckpointEngine.loadc                 C   s2   t d| d | j }|st d dS |S )Nz[Nebula] all files for z4 are saved in tier1. It is ready to start persistingz?[Nebula] failed to commit the checkpoint, please check the log.F)r   r.   r   commiterror)r   r&   
commit_rlsr   r   r   r=   d   s   

zNebulaCheckpointEngine.commitr   )
__name__
__module____qualname__r   r'   strr-   r7   r=   __classcell__r   r   r!   r   r      s    -r   )r   r,   r   5deepspeed.runtime.checkpoint_engine.checkpoint_enginer   deepspeed.utilsr   r   deepspeed.nebula.constantsr   r   r   r   r   r   <module>   s   