o
    ॵiv                     @   s   d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZ d dl	m
Z
 d dlmZmZmZ d dlmZ d dlmZ d d	lmZmZ d d
lmZ d dlmZ d dlmZ d dlmZ d dlmZ G dd deZ e
j!ej"dG dd deZ"dS )    N)mpu)Hooks)EpochBasedTrainer)HOOKS)BestCkptSaverHookCheckpointHookCheckpointProcessor)LoadCheckpointHook)Hook)load_checkpointsave_checkpoint)DistributedParallelType)create_device)
get_logger)is_megatron_initialized)get_local_rankc                   @   sN   e Zd ZdZdd Zdd Zdd Zdd	 Z	
	dddZdd Z	dd Z
d
S )MpuProcessormodelc              	   C   sD   zt  }|dkrW dS t  }d|W S  ttfy!   Y dS w )N    z_mp_rank_{:02d})r   $get_tensor_model_parallel_world_sizeget_tensor_model_parallel_rankformatImportErrorAssertionError)selftp_world_sizemp_rank r   g/home/ubuntu/.local/lib/python3.10/site-packages/modelscope/trainers/hooks/distributed/megatron_hook.py	rank_name   s   zMpuProcessor.rank_namec                 C   s   t  }d|}d| dS )Nz{:02d}mp_rank_z_model_states.pt)r   r   r   )r   r   rankr   r   r   get_bin_filename&   s   
zMpuProcessor.get_bin_filenamec                 C   s   t j  pt dkS Nr   )torchdistributedis_initializedr   get_data_parallel_rankr   trainerr   r   r   should_save_on_rank+   s   
z MpuProcessor.should_save_on_rankc                 C   s6   |j }t|||| j tjtj|| jdd d S )NT)exist_ok)cfgr   copy_files_and_dump_config_BIN_FILE_DIRosmakedirspathjoin)r   r*   
output_dirconfigr   r   r   prepare_output0   s   
zMpuProcessor.prepare_outputNTc                 C   s  | |j}||   tj }t|||r|jnd |r|jnd |dd tj	
|}tj	|}	|  }
tj	||	d |
 }t||dd |}tj	|| j|
}tj	|r[t| z	t|| W d S  ty } zt d| d| d| d t|| W Y d }~d S d }~ww )	NF)meta
with_model_)	with_metazLink z to z error: z@, changing to copy the bin file, this may case more space usage.)unwrap_moduler   r    r   TRAINER_STATE_SUFFIXr   	optimizerlr_schedulerr0   r2   dirnamebasenamer#   r3   r/   isfileunlinklinkOSErrorr   errorshutilcopyfile)r   r*   checkpoint_path_prefixr4   r7   save_optimizersr   _train_state_filesave_dirprefixbin_fileprefix_bin_filesrc_file	dest_fileer   r   r   save_checkpoints8   s<   

zMpuProcessor.save_checkpointsc                 C   s|   ||    tj }tj|rt| tj|}tj|}| 	 }tj
||d | }tj|r<t| d S d S Nr9   )r    r   r<   r0   r2   rA   remover?   r@   r#   r3   )r   r*   rH   rJ   rK   rL   rM   absolute_filer   r   r   remove_checkpoints^   s   

zMpuProcessor.remove_checkpointsc                 C   s   | |j}tj|r"|}|  }tj||}t||d d  d S ||   t	j
 }	t||	|}
tj|}tj|}|  }tj||d | }t||d d  |
S rS   )r;   r   r0   r2   isdirr#   r3   r   r    r   r<   r	   load_trainer_stater?   r@   )r   rH   r*   load_all_statestrictr   rK   rM   
model_filerJ   r7   rL   r   r   r   load_checkpointsk   s$   
zMpuProcessor.load_checkpointsNT)__name__
__module____qualname__r/   r    r#   r+   r6   rR   rV   r\   r   r   r   r   r      s    
&r   )module_namec                   @   sF   e Zd ZdZdd ZdefddZdd Zd	d
 Zdd Z	dd Z
dS )MegatronHookr   c                 C   s
   d| _ d S )NF)wrapped)r   r   r   r   __init__   s   
zMegatronHook.__init__r*   c                 C   s   t  }|t}t|dkrt|d jt s|d | |t}t|dkr7t|d jt s7|d | |t}t|dkrSt|d jt sU|d | d S d S d S r$   )	r   get_hookr   len
isinstance	processorset_processorr   r	   )r   r*   rh   	ckpt_hookbest_ckpt_hookload_ckpt_hookr   r   r   register_processor   s"   




zMegatronHook.register_processorc                 C   sb   t  sJ t }td| |_|j|j t |jt	j
< t |jt	j< t |jt	j< d S )Nzcuda:)r   r   r   devicer   tor   get_data_parallel_groupparallel_groupsr   DPget_tensor_model_parallel_groupTP!get_pipeline_model_parallel_groupPP)r   r*   
local_rankr   r   r   
after_init   s    
zMegatronHook.after_initc                 C      |  | d S Nwrap_moduler)   r   r   r   
before_run      zMegatronHook.before_runc                 C   ry   rz   r{   r)   r   r   r   
before_val   r~   zMegatronHook.before_valc                 C   s,   |j r| js||j|_d| _d S d S d S r]   )_distrc   to_parallelr   r)   r   r   r   r|      s   
zMegatronHook.wrap_moduleN)r^   r_   r`   r/   rd   r   rm   rx   r}   r   r|   r   r   r   r   rb      s    rb   )#r0   rF   r%   megatron_utilr   modelscope.metainfor   modelscope.trainersr   !modelscope.trainers.hooks.builderr   4modelscope.trainers.hooks.checkpoint.checkpoint_hookr   r   r   9modelscope.trainers.hooks.checkpoint.load_checkpoint_hookr	   modelscope.trainers.hooks.hookr
   modelscope.utils.checkpointr   r   modelscope.utils.constantr   modelscope.utils.devicer   modelscope.utils.loggerr   modelscope.utils.megatron_utilsr   modelscope.utils.torch_utilsr   r   register_modulerb   r   r   r   r   <module>   s&    k