o
    ih                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlmZ d dlmZ	 d dl
mZmZ d dlmZmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dl m!  m"Z# zd dl$Z$W n   dZ$Y edddZ%G dd dZ&dS )    N)tqdm)datetime)autocast
GradScaler)nullcontextcontextmanager)Path)DistributedDataParallel)FullyShardedDataParallel)	to_device)recursive_average)average_checkpoints)ShardedGradScalerFc                 c   s    |r#t jjjd| dd d V  W d    d S 1 sw   Y  d S | t jks-| t jkrItd| d d V  W d    d S 1 sBw   Y  d S d V  d S )NTF)enableddtypecache_enabled)r   r   )torchcudaampr   float16bfloat16r   use_deepspeed r   Q/home/ubuntu/.local/lib/python3.10/site-packages/funasr/train_utils/trainer_ds.pymaybe_autocast   s   ""
r   c                   @   s   e Zd ZdZ									d'dededed	ed
edefddZ						d(ddZ				d)ddZ							d*ddZ	i fddZ
i fddZd+ddZ				d)ddZ		d,defdd Zd+d!d"Zd#d$ Zd%d& ZdS )-Trainera   
    A simple trainer class for training a PyTorch model, saving checkpoints at the end of each epoch,
    and optionally resuming from a saved checkpoint.

    Attributes:
        max_epoch (int): Maximum number of epochs for training.
        model (torch.nn.Module): The model to be trained.
        optim (torch.optim.Optimizer): The optimizer to use for training.
        scheduler (torch.optim.lr_scheduler._LRScheduler): The learning rate scheduler.
        dataloader_train (torch.utils.data.DataLoader): DataLoader for the training dataset.
        dataloader_val (torch.utils.data.DataLoader): DataLoader for the validation dataset.
        output_dir (str): Directory where model checkpoints will be saved.
        resume (str, optional): Path to a checkpoint to resume training from.
    r      F./use_ddpuse_fsdpuse_fp16use_bf16r   
output_dirc
              	   K   s  || _ || _|| _|| _|| _|
dd| _|	| _tj	
| js(tj| jdd |
dd| _d| _|
dd| _|
d	d
| _d| _tj| _|| _|| _| jrTtj| _| jr[tj| _|
dd| _|
dd| _|
dd| _|
dd| _|
dd| _|
dd| _|
dd| _|
dd| _d| _ d| _!d| _"d| _#d| _$i | _%d| _&d| _'i | _(i | _)|
dd| _*d| _+d| _,d| _-|
dd| _.| j.rt/j0|
d d! t/j1|
|
d"d#|
d$d%|
d&d'|	d(dd) tj	2|	d*}tj|dd zdd+l3m4} ||| _5W n   d,| _5Y || _6|
d-d| _7|
d.d,}|d,ur/t8|t9r/|:d/}|| _;|
d0d,}|d,urHt8|t9rH|:d/}|| _<d,S )1a  
        Initializes the Trainer class with the model, optimizer, scheduler, dataloaders, and other settings.

        Args:
            model (torch.nn.Module): The model to be trained.
            optim (torch.optim.Optimizer): The optimizer to use for training.
            scheduler (torch.optim.lr_scheduler._LRScheduler): The learning rate scheduler.
            dataloader_train (torch.utils.data.DataLoader): The DataLoader for the training dataset.
            dataloader_val (torch.utils.data.DataLoader): The DataLoader for the validation dataset.
            **kwargs: Additional keyword arguments:
                      max_epoch (int): The maximum number of epochs for training.
                      output_dir (str): The directory where model checkpoints will be saved. Default is './'.
                      resume (str, optional): The file path to a checkpoint to resume training from.
        devicer   Texist_okresumer   	max_epochd   log_interval2   save_checkpoint_intervali  validate_intervalkeep_nbest_modelsi  avg_keep_nbest_models_typeaccavg_nbest_model
   
accum_gradr   	grad_clipg      $@grad_clip_typeg       @         reset_gpu_cacheF	use_wandbwandb_tokenkeywandb_project
my_project
wandb_teammy_teamwandb_exp_namemy_exptraining)configprojectentitynamedirjob_typereinittensorboard)SummaryWriterNdeepspeed_configexcludes,effective_save_name_excludes)=rank
local_rank
world_sizer   r    getr$   r#   ospathexistsmakedirsr'   start_epochr(   r*   batch_totalr   float32r   r!   r"   r   r   r,   r-   r.   r/   r1   r3   r4   r5   train_acc_avgtrain_loss_avgval_acc_avgval_loss_avgbest_acc_idxsaved_ckptsstep_or_epochbest_step_or_epochval_acc_step_or_epochval_loss_step_or_epochr9   start_data_split_i
start_stepstep_in_epochr:   wandblogininitjointensorboardXrM   writerr   rN   
isinstancestrsplitrO   rQ   )selfrR   rS   rT   r   r    r!   r"   r   r#   kwargstensorboard_dirrM   rO   rQ   r   r   r   __init__7   s   


	




zTrainer.__init__Nc                 K   sX  | j s| jr
t  |du rdn|}| jrtd| d| j d || j| j	| j
| j| j|||dd|dd| j|d	d|d
dd}	|}t|drW|j |	d< |r_| |	d< tj| jdd |du rqd| }
nd| d| }
tj| j|
}t  |j| j|
|	d W d   n1 sw   Y  td| d ttj| jd}t  |j| jd|	d W d   n1 sw   Y  | jdkr|
| _| jdkrC| j	|
 | j	| j kr$|
| _ttj| jd}t  |j| jd|	d W d   n	1 sw   Y  td| j	| j dd|  ntd| j	|
 dd| j	| j ddtj| j| j  nq| jdkr| j
|
 | j
| j kr|
| _ttj| jd}t  |j| jd|	d W d   n	1 s{w   Y  td| j
| j dd|  n#td | j
|
 dd!| j
| j ddtj| j| j  ntd" | jdkrt| d#| j d$|
 | j|
< | jdkrt| j| jkr| jdkrt | j| jjd%}n	t!| j| jjd%}|| jv r| j|= tj| j|}td&|  tj"|rt#$| n| jrn| jdkrtd| d| j d'| j d || | | j| j	| j
| j| j|||dd|dd| j|d	d|d
dd(}	|}t|drp|j }n| }| j%durtd)| j%  i }|& D ]*}| j%D ]}|'d*d}|(|rtd+| d,| d-  nq|| ||< q||	d< n||	d< |r| |	d< tj| jdd |du rd| }
nd| d| }
tj| j|
}t)|	| td| d ttj| jd}t)|	| | jdkr|
| _| jdkrf| j	|
 | j	| j krG|
| _ttj| jd}t)|	| td| j	| j dd|  nxtd| j	|
 dd| j	| j ddtj| j| j  nY| jdkr| j
|
 | j
| j kr|
| _ttj| jd}t)|	| td| j
| j dd|  n#td | j
|
 dd!| j
| j ddtj| j| j  ntd" t| d#| j d$|
 | j|
< | jdkrt| j| jkr| jdkrt | j| jjd%}n	t!| j| jjd%}|| jv r | j|= tj| j|}td&|  tj"|rt#$| | j s$| jr*t  dS dS ).a`  
        Saves a checkpoint containing the model's state, the optimizer's state,
        and the scheduler's state at the end of the given epoch. This method is
        intended to be called at the end of each epoch to save the training progress.

        Args:
            epoch (int): The epoch number at which the checkpoint is being saved.
        NzSave checkpoint: , rank: 
data_split_ir   data_split_numr   r^   r]   )epochrb   re   rf   rd   r/   stepri   ry   rz   r[   r^   r]   module
state_dictscaler_stateTr%   model.pt.ep.)save_dirtagclient_statez
Checkpoint saved to model.ptr8   r0   zmodel.pt.bestzUpdate best acc: z.4f, zNo improvement in acc: z < losszUpdate best loss: zNo improvement in loss: z > Undoval__step_or_epochr<   zDelete: z, local_rank: )r{   	optimizer	schedulerrb   re   rf   rd   r/   r|   ri   ry   rz   r[   r^   r]   zeffective_save_name_excludes: module.key:  matching: z, not save it)*r   r    distbarrierr   logginginforS   rb   re   rf   rd   r/   rU   r[   hasattrr}   r~   rV   rY   r#   rW   rm   r   no_gradsave_checkpointr   printrR   getattrr.   lenminmaxrX   
misc_utilssmart_removerQ   keysreplace
startswithsave)rs   r{   r|   modeloptimr   scalerri   rt   state	ckpt_namefilenamelatest	best_ckptr=   r~   dst_state_dictkk_exk_tmpr   r   r   r      s~  









4

4









4
4

zTrainer.save_checkpointc              	   C   sp  | j r(| jrtj| jd}tj|r|| jd\}}|d | _|d | _	d|v r0|d ni | _
d|v r;|d ni | _d|v rF|d nd| _d|v rQ|d nd	| _d
|v r\|d
 nd	| _d|v rg|d nd	| _| jdu rqd	n| j| _d|v r}|d nd	| _| jdu rd	n| j| _t|d  d|v r|d nd	| _d|v r|d nd	| _|| j td| d nqtd| d ngtj| jd}tj|r tj|dd}|d | _|d }| }	|	 D ]u}
d}| jdur| jD ]}|
dd}||rtd|
 d| d d} nq|rq|
ds,d|
 | v r,d|
 }n|
drCd|
 | vrC|
ddd}n|
}|| v rS|| |	|
< qtd|
 d|  q| |	 | |d   | |d!  |durd"|v r| |d"  |d | _	d|v r|d ni | _
d|v r|d ni | _d|v r|d nd| _d|v r|d nd	| _d
|v r|d
 nd	| _d|v r|d nd	| _| jdu rd	n| j| _d|v r|d nd	| _| jdu rd	n| j| _t|d  d|v r|d nd	| _d|v r|d nd	| _|| j td| d ntd| d | j!s0| j"r6t#$  dS dS )#z
        Resumes training from a checkpoint at the given file path.
        Loads the model's state, the optimizer's state, and the scheduler's state.

        Args:
            resume_path (str): The file path to the checkpoint to resume from.
        r   r{   rb   re   rf   rd   r8   ry   r   r[   r|   Nri   r]   r^   z%Checkpoint loaded successfully from ''zNo checkpoint found at 'z', does not resume status!cpu)map_locationr~   Fr   r   r   z
, excludedTr   zMiss key in ckpt: model: z, ckpt: r   r   r   )%r'   r   rV   rW   rm   r#   rX   load_checkpointrZ   rb   re   rf   rd   rg   r[   rh   ri   r   r]   r^   tor$   isfiler   loadr~   r   rO   r   r   r   r   load_state_dictr   r    r   r   )rs   r   r   r   r   ckpt_
checkpoint	src_state	dst_stater   excludes_flagr   r   k_ddpr   r   r   resume_checkpoint  s   









zTrainer.resume_checkpointc                 K   s  | j s	| js	| jrt  td| d| j d |  | j	}	|
  i }
td| j}|j| t }|}t|D ]E\}}|  jd7  _|  jd7  _i |||dd|dd||dd | j| jd	}t }|| d
|d d< t|| j}t}| j s| jr||	 dkr|jn|}| 8 t }| j|||d t }|| d
|d d< | j|||d t }|| d
|d d< W d   n1 sw   Y  | j|||||d t | d
}t }|| d
|d d< ||d d< | d |d< t||d< | j| |d   ! "  |d  | _d|d v r=| j#| |d d   ! "  |d  | _#| j$|dd | j| j% dkr\| j&|||| j'|d | jd | j| j( dkr| j)||||||d | j|dd|dd| j| j#d t }q@| j s| js| jrtj| jtj*d| j}tj| j#tj*d| j}tj+|tj,j-d tj+|tj,j-d |  ! " | j. | _|  ! " | j. | _#dS dS )z
        Defines the training process for a single epoch with gradient accumulation.
        Args:
            epoch (int): The current epoch number.
        zTrain epoch: rw   rx   r   r   ry   rz   rh   )speed_statsr{   	batch_idxry   rz   log_stepr[   ri   0.3fr   	data_load	loss_dictforward_timebackward_timeN
optim_time
total_timelrbatch_num_epochr   r0   statstrainr   )r   dataloader_valr{   ro   r|   ri   )
r   r   r   r   r|   ri   ry   rz   r^   r]   r   op)/r   r    r   r   r   r   r   rR   r   r3   	zero_gradr   tensorr   r$   batch_sampler	set_epochtimeperf_counter	enumerater[   ri   rU   r   r   no_syncforward_stepbackward_stepupdate_stepget_last_lrr   r^   detachr   itemr]   logr-   validate_epochro   r,   r   r\   
all_reduceReduceOpSUMrT   )rs   r   r   r   r   dataloader_trainr   r{   rt   r3   r   iterator_stoptime_begtime5r   batchr   time1
my_contexttime2time3time4r   r^   r]   r   r   r   train_epoch"  s   

 	


zTrainer.train_epochc                 C   sv   t | j| jd |di |}W d    n1 sw   Y  |\}}}dd | D }||d< ||d< ||d< d S )Nr   c                 S   s   i | ]\}}|d ur||qS Nr   .0r   vr   r   r   
<dictcomp>  s    z(Trainer.forward_step.<locals>.<dictcomp>r   r   weightr   )r   r   r   items)rs   r   r   r   retvalr   r   r   r   r   r   r     s   
zTrainer.forward_stepc                 C   sH   |d }| j r||}d S || j }|r||  d S |  d S )Nr   )r   backwardr3   scale)rs   r   r   r   r   scaled_lossr   r   r   r     s   
zTrainer.backward_stepc                 C   s   |d }| j r|  d S |d | j dkrc| jdkr=tjjj| | j| j	d}t
|s=td| d |  d S | jsC| jrGt  |rS|| |  n|  |  |jdd d S d S )	Nr   r   r   )max_norm	norm_typezThe grad norm is z. Skipping updating the model.T)set_to_none)r   r|   r3   r4   r   nnutilsclip_grad_norm_
parametersr5   isfiniter   warningr   r   r    r   r   update)rs   r   r   r   r   r   r   	grad_normr   r   r   r     s2   




zTrainer.update_stepc                 K   s  d| _ d| _| js| js| jrt  td| d| j	 d |
  t  i }t }|}|j| t|D ]\}	}
i ||	|dd|dd|	|d	d | j|	d dd
	}t }|| d|d d< t|
| j}
t }| j||
|d t }|| d|d d< t | d}t }||d d< t||d< | j|dd t }| j |	 |d     |	d  | _ d|d v r| j|	 |d d     |	d  | _q<| js| js| jr-tj| j tjd| j}tj| jtjd| j}tj|tj j!d tj|tj j!d |   | j" | _ |   | j" | _W d   n	1 s8w   Y  |dddu rLd| }nd| d|d }| j| j#|< | j | j$|< | jso| jso| jrst  |%  dS )z
        Defines the validation process for a single epoch.
        Should be implemented with the actual model validation steps.

        Args:
            epoch (int): The current epoch number.
        r6   zValidate epoch: rw   rx   ry   r   rz   r   rh   )	r   r{   r   ry   rz   r   r[   ri   r   r   r   r   r   r   r   r   valr   r   r0   r   r   r   Nri   r   r   )&r`   r_   r   r    r   r   r   r   r   rR   evalr   r   r   r   r   r   r   rU   r[   r   r$   r   r   r   r   r   r   r   r\   r   r   r   r   rT   re   rf   r   )rs   r   r   r{   ro   rt   r   r   r   r   r   r   r   r   r   r   r`   r_   r   r   r   r   r     s|   

;zTrainer.validate_epochr   r   c              	   K   sJ  |d     }|d }|d }|d }|d }|d }	|d }
|d }|d	 }|d
 }|d }|dd }|d | j dkr|d urH|n|}dtj d d d tj	 d d d tj
 d d d tj d d d }t| | d}t| | d}dg | d| j d| d| j d| d| d|d  d|	 d| d| d|dd|ddt|dd |dd!|
dd"d#d$ | D  d%| d%| }t| d&| j d'| |d&| j d(| |
i}| j}|d ur|d&| j d'| || |d&| j d(| |
| | D ](\}}|d)| j d*| d| | | | |d)| j d*| d| < q7| D ](\}}|d)| j d*| d| t|| t||d)| j d*| d| < qd| jrtd urtj||d+ d S d S d S d S ),Nr   r{   r   ri   r[   r   r   r   r   ry   rz   r   r   r   zWGPU, memory: usage: {:.3f} GB, peak: {:.3f} GB, cache: {:.3f} GB, cache_peak: {:.3f} GBi   	_loss_avg_acc_avgr8   rw   z	, epoch: /z, data_slice: z, step_in_slice: z, step_in_epoch: z, total step: z, (loss_avg_rank: z.3fz), (loss_avg_slice: z), (ppl_avg_slice: z.3ez), (acc_avg_slice: z), (lr: z), c                 S   s*   g | ]\}}|t |   d fqS )   )roundr   r   r   r   r   r   r   
<listcomp>`  s   * zTrainer.log.<locals>.<listcomp>r   rR   z_loss/z_lr/
stats_rankr   )r|   )r   r   r   rU   r*   formatr   r   memory_allocatedmax_memory_allocatedmemory_reservedmax_memory_reservedr   rm   rR   r(   mathexpr   r   r   ro   
add_scalarr  r:   rj   r   )rs   r   r   rt   r   r{   r   ri   r[   r   r   r   r   ry   rz   r   gpu_infoloss_avg_epochacc_avg_epochdescriptiondescription_dictro   r=   varr   r   r   r   1  s   
	


&$&$
0zTrainer.logc                 C   sB   | j s| jr
t  |d ur|  | j s| jrtj  d S d S r   )r   r    r   r   closer   distributeddestroy_process_group)rs   ro   r   r   r   r  {  s   zTrainer.closec           
      K   s  | j rSddlm} ddlm} ddlm} ttj	
dd}ttj	
dd}ttj	
dddkrNtd	 ||||| d
 td ||||| d
 d }	 |S | jrvttj	
dd}	||	}t||	g|
di 
ddd}|S |j|
ddd}|S )Nr   ).estimate_zero2_model_states_mem_needs_all_live).estimate_zero3_model_states_mem_needs_all_live)*convert_zero_checkpoint_to_fp32_state_dictLOCAL_WORLD_SIZEr   
WORLD_SIZERANKz/Estimating model states memory needs (zero2)...)num_gpus_per_node	num_nodesz/Estimating model states memory needs (zero3)...
LOCAL_RANK
train_conffind_unused_parametersF)
device_idsr*  r$   r   )r$   )r   $deepspeed.runtime.zero.stage_1_and_2r   deepspeed.runtime.zero.stage3r!  deepspeed.utils.zero_to_fp32r"  intrV   environrU   r   r   r   r   DDPr   )
rs   r   rt   r   r!  r"  local_world_sizerT   r$   rS   r   r   r   
warp_model  sF   


zTrainer.warp_modelc                    s  ddl m} ddlm} ddlm}m} dd l}t	d  
dd}||v s)J |
|}	|	| fi  
d}t	d	  
d
d}
|
|v sLJ |
|
|fi  
d}
| jrdd l}|d| ji}t| jd}||}W d    n1 sw   Y  d|v r|d d rtj| _d|v r|d d rtj| _d|v rd }d
|v rd }
n fdd}
|j||||
| d\}}}}
|||
fS )Nr   )optim_classes)scheduler_classes)	OmegaConf
DictConfigzBuild optimr   adam
optim_confzBuild schedulerr   warmuplrscheduler_confrN   rbf16r   fp16r   c                    s   | fi   dS )Nr;  )rU   )optrt   scheduler_classr   r   r     s   z/Trainer.warp_optim_scheduler.<locals>.scheduler)argsr   r   lr_schedulermodel_parameters)funasr.optimizersr4  funasr.schedulersr5  	omegaconfr6  r7  jsonr   r   rU   r  r   	deepspeedcreaterN   openr   r   r   r   r   
initialize)rs   r   rt   r4  r5  r6  r7  rH  r   optim_classr   rI  rB  fin
ds_configsr   r   r   r@  r   warp_optim_scheduler  sJ   




zTrainer.warp_optim_scheduler)	r   r   r   FFFFFr   )NNNNNN)NNNN)NNNNNNNr   )Nr   )__name__
__module____qualname____doc__boolrq   rv   r   r   r   r   r   r   r   dictr   r  r3  rP  r   r   r   r   r   '   s|    	

q
 t
 
z
"
`

J.r   )NF)'r  rV   r   r   r   r   r   torch.distributedr  r   torch.cuda.ampr   r   
contextlibr   r   pathlibr   torch.nn.parallelr	   r1  torch.distributed.fsdpr
   FSDPfunasr.train_utils.device_funcsr   funasr.train_utils.recursive_opr   'funasr.train_utils.average_nbest_modelsr   *torch.distributed.fsdp.sharded_grad_scalerr   funasr.utils.miscr   miscr   rj   r   r   r   r   r   r   <module>   s2    