o
    3wi                  	   @   s  d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlmZ d dl	m
Z
 d dlmZ d dlZddlmZ dd	lmZmZmZmZ dd
lmZ ddlmZmZ ddlmZmZmZ ddlmZ ee Z!dd Z"dd Z#dBddZ$dBddZ%dd Z&dCddZ'dCddZ(dDddZ)dCdd Z*dEd"e+d#e+d$e,fd%d&Z-	dFd"e+d'e+d$e,d(e,fd)d*Z.d+ej/j0d,ej1fd-d.Z2d+ej/j0d/e3fd0d1Z4d2ej5j6d3e3fd4d5Z7d+ej/j0fd6d7Z8d+ej/j0d8ej/j0fd9d:Z9d+ej/j0d8eej/j0ge,f fd;d<Z:d=d> Z;d?e3d8e3fd@dAZ<dS )G    N)defaultdict)nullcontext)Path)Callable   )
get_logger   )FSDP_MODEL_NAMEOPTIMIZER_NAMESAFE_WEIGHTS_NAMEWEIGHTS_NAME)get_module_class_from_name)get_non_persistent_buffersis_peft_model)get_module_children_bottom_upis_compiled_modulesave)is_torch_versionc                   C   s"   dt jvr
dt jd< dt jd< dS )z[
    Enables RAM efficient loading of Hugging Face models for FSDP in the environment.
    ACCELERATE_USE_FSDPTrueFSDP_CPU_RAM_EFFICIENT_LOADINGNosenviron r   r   X/home/ubuntu/sommelier/.venv/lib/python3.10/site-packages/accelerate/utils/fsdp_utils.py!enable_fsdp_ram_efficient_loading%   s   

r   c                   C   s   dt jd< dS )z\
    Disables RAM efficient loading of Hugging Face models for FSDP in the environment.
    Falser   Nr   r   r   r   r   "disable_fsdp_ram_efficient_loading/   s   r   Fc                 C   sN   |rt | rddlm} || | jdS |d ur#ddlm} || |dS |  S )Nr   )get_peft_model_state_dictadapter_name)get_model_state_dictoptions)r   peftr   active_adapter'torch.distributed.checkpoint.state_dictr"   
state_dict)modeladapter_only
sd_optionsr   r"   r   r   r   _get_model_state_dict6   s   r,   c                 C   sT   |rt | rddlm} || || jdS |d ur%ddlm} || ||dS | |S )Nr   )set_peft_model_state_dictr    )set_model_state_dictr#   )r   r%   r-   r&   r'   r.   load_state_dict)r)   r(   r*   r+   r-   r.   r   r   r   _set_model_state_dictE   s   
r0   c                 C   sT   d }| j dkr(ddlm} ddlm} || j|jkt| jddt| jddd}|S )	Nr   r   )StateDictOptionsStateDictTypeoffload_to_cpuF
rank0_only)full_state_dictcpu_offloadbroadcast_from_rank0)	fsdp_versionr'   r1   2torch.distributed.fsdp.fully_sharded_data_parallelr3   state_dict_typeFULL_STATE_DICTgetattrstate_dict_config)fsdp_pluginr+   r1   r3   r   r   r   _prepare_sd_optionsT   s   

r@   c                 C   sd  dd l m  m} ddlm} ddlm} ddlm}	 tj	|dd | j
|	jkr5|jdk}
|
| j_|
| j_| jdkrE|
|| j
| j| jnt }t| }| t|||d}| j
|	jkr|dkret d	nt d
| d	}tj||}|jdkrtd|  t|| td|  n| j
|	jkr|dkrt d|j d	nt d
| d|j d	}tj||}td|  t|| td|  nR| j
|	jkrtj|t d
| }tj	|dd td|  d|i}|j|||| d td|  W d    d S W d    d S W d    d S W d    d S 1 s+w   Y  d S )Nr   DefaultSavePlannerFullyShardedDataParallelr2   Texist_okr   r*   r+   .bin_zSaving model to zModel saved to _rankr)   r(   storage_writerplanner) torch.distributed.checkpointdistributed
checkpoint,torch.distributed.checkpoint.default_plannerrB   r:   rD   r3   r   makedirsr;   r<   num_processesr>   r4   r5   r9   optim_state_dict_configr   r@   r,   r	   pathjoinprocess_indexloggerinfotorchr   LOCAL_STATE_DICTSHARDED_STATE_DICTFileSystemWriter)r?   acceleratorr)   
output_dirmodel_indexr*   dist_cprB   FSDPr3   is_multi_processctxr+   r(   weights_nameoutput_model_fileckpt_dirr   r   r   save_fsdp_modele   sj   

"
 $rh   c                 C   s  dd l m  m} ddlm} ddlm} ddlm}	 |  | j	|	j
kr2|jdk}
|
| j_|
| j_| jdkrB|	|| j	| j| jnt }t| }| | j	|	j
krt||uru|jdkru|jsu| jsl| jdkrltd	 W d    d S |dkr~t dnt d| d}tj||}td	|  |j p|j}|rtj |d
d}ni }td|  n| j	|	j!kr|dkrt d|j dnt d| d|j d}tj||}td	|  tj |d
d}td|  nD| j	|	j"kr3t |vrtj|t d| n|}td	|  dt#|||di}|j ||$|| d |d }td|  t%||||d}W d    |S 1 sGw   Y  |S )Nr   )DefaultLoadPlannerrC   r2   r   zzSet the `sync_module_states` flag to `True` so that model states are synced across processes when initializing FSDP objectrH   rI   zLoading model from Tweights_onlyzModel loaded from rJ   r)   rG   )r(   storage_readerrM   )&rN   rO   rP   rQ   ri   r:   rD   r3   wait_for_everyoner;   r<   rS   r>   r4   r5   r9   rT   r   r@   typerW   is_fsdp2sync_module_states
ValueErrorr	   r   rU   rV   rX   rY   is_main_processrZ   loadr[   r\   r,   FileSystemReaderr0   )r?   r^   r)   	input_dirr`   r*   ra   ri   rb   r3   rc   rd   r+   re   input_model_file
load_modelr(   rg   load_resultr   r   r   load_fsdp_model   sz   

"	
..ry   c                 C   s  dd l m  m} ddlm} ddlm} ddlm}	 tj	|dd | j
dkr2||| j| j| jnt }
t| }|
 | j
dkrOdd	lm} ||||d
}n|||}| j|	jkr|jdkr|dkrit dnt d| d}tj||}td|  t|| td|  n9tj|t d| }tj	|dd td|  |jd|i||| d td|  W d    d S W d    d S 1 sw   Y  d S )Nr   rA   rC   r2   TrE   r   r   )get_optimizer_state_dictr#   rH   rI   zSaving Optimizer state to zOptimizer state saved in 	optimizerrK   )rN   rO   rP   rQ   rB   r:   rD   r3   r   rR   r9   r;   r>   rT   r   r@   r'   rz   optim_state_dictr<   rW   r
   rU   rV   rX   rY   rZ   r   r]   )r?   r^   r{   r)   r_   optimizer_indexra   rB   rb   r3   rd   r+   rz   optim_stateoptim_state_nameoutput_optimizer_filerg   r   r   r   save_fsdp_optimizer   sL   


 " r   c                 C   s  dd l m  m} ddlm} ddlm}	 |  | jdkr)||| j| j	| j
nt }
t| }|
 | j|	jkrtd }|jdksD| j
jss|dkrMt dnt d| d}tj||}td|  tj|dd	}td
|  n8t |vrtj|t d| n|}td|  d| i}|j||||d |d }td|  | jdkr|j|||d}|| nddlm} |||||d W d    d S W d    d S 1 sw   Y  d S )Nr   rC   r2   r   rH   rI   zLoading Optimizer state from Trj   zOptimizer state loaded from zLoading Optimizer from r{   )checkpoint_idrl   zOptimizer loaded from )r)   optimr|   )set_optimizer_state_dictr#   )rN   rO   rP   r:   rD   r3   rm   r9   r;   r>   rT   r   r@   r<   rW   r5   r
   r   rU   rV   rX   rY   rZ   rs   r(   rt   optim_state_dict_to_loadr/   r'   r   )r?   r^   r{   r)   ru   r}   r*   ra   rb   r3   rd   r+   r~   optimizer_nameinput_optimizer_filerg   flattened_osdr   r   r   r   load_fsdp_optimizer  sV   
 

"r   Tcheckpoint_dir	save_pathsafe_serializationc                 C   s   ddl m  m} ddlm  m  m} i }t|}|jdd |j||| |	 dd |r5|t
 n|t }t| dkrI|t|d  }t|||d |S )z
    Passthrough to `torch.distributed.checkpoint.format_utils.dcp_to_torch_save`

    Will save under `save_path` as either `model.safetensors` or `pytorch_model.bin`.
    r   NTrE   )rl   rM   no_distr   )r   )rN   rO   rP   )torch.distributed.checkpoint.format_utilsformat_utilsr   mkdir_load_state_dictrt   _EmptyStateDictLoadPlannerr   r   lenkeyslistr   )r   r   r   ra   dist_cp_format_utilsr(   r   r   r   )_distributed_checkpoint_to_merged_weightsJ  s    r   output_pathremove_checkpoint_dirc           
      C   s2  t | } ddlm} tddstd|  sh| d  }| d  }d|  d	}|rD|rD|d
7 }|d|  d|  d7 }|d7 }t||rV|d7 }|d|  d7 }t||rd|d7 }|d|  d7 }t|| }|jrtd|   t	| ||}	td|	  |rtd|   t
|  |  dS )a?  
    Merge the weights from sharded FSDP model checkpoints into a single combined checkpoint. Should be used if
    `SHARDED_STATE_DICT` was used for the model. Weights will be saved to `{output_path}/model.safetensors` if
    `safe_serialization` else `pytorch_model.bin`.

    Note: this is a CPU-bound process.

    Args:
        checkpoint_dir (`str`):
            The directory containing the FSDP checkpoints (can be either the model or optimizer).
        output_path (`str`):
            The path to save the merged checkpoint.
        safe_serialization (`bool`, *optional*, defaults to `True`):
            Whether to save the merged weights with safetensors (recommended).
        remove_checkpoint_dir (`bool`, *optional*, defaults to `False`):
            Whether to remove the checkpoint directory after merging.
    r   )PartialStatez>=z2.3.0z/`merge_fsdp_weights` requires PyTorch >= 2.3.0`pytorch_model_fsdp_0optimizer_0zTried to load from z) but couldn't find a valid metadata file.zE However, potential model and optimizer checkpoint directories exist.zPlease pass in either z/pytorch_model_fsdp_0 or z/optimizer_0zinstead.z8 However, a potential model checkpoint directory exists.zPlease try passing in z/pytorch_model_fsdp_0 instead.z< However, a potential optimizer checkpoint directory exists.z/optimizer_0 instead.zMerging FSDP weights from z.Successfully merged FSDP weights and saved to z"Removing old checkpoint directory N)r   accelerate.stater   r   rq   existsrr   rX   rY   r   shutilrmtreerm   )
r   r   r   r   r   model_path_existsoptimizer_path_existserrstater   r   r   r   merge_fsdp_weightsf  s<   

r   r)   devicec           	         s   t |dd }|s
S i  |D ]%}|d}d|d d |d }}||}t ||}d  t|< qdtjjf fdd}|S )N_tied_weights_keys.modulec                    s   t t}| jddD ]\}}t| v r|t| | q
| } | D ]\}}|D ]} | }|d u r>t| | |< q,t| || q,q&| S )NF)recurse)r   r   named_parametersidappenditemsr=   setattr)r   params_to_tienparamid_key_param_names
param_name_tied_paramsparam_init_fnr   r   param_init_fn_tied_param  s   	z7ensure_weights_retied.<locals>.param_init_fn_tied_param)r=   splitrV   get_submoduler   rZ   nnModule)	r   r)   r   _tied_namesnamer   modr   r   r   r   r   ensure_weights_retied  s   


r   full_sdc                 C   s@  ddl m} ddlm} | }i }dd }dd }| jr\t| | D ]4\\}	}
}|j	}|

 |j}
|j|
d| d ||
||j}|||	|
\}}||||}|||	< q&n;| D ]6\}	}|j	}tj| |j|jd	}|j|d| d ||||j}|||	|\}}||||}|||	< q`|j|d
d |S )a  
    Loads the full state dict (could be only on rank 0) into the sharded model. This is done by broadcasting the
    parameters from rank 0 to all other ranks. This function modifies the model in-place.

    Args:
        accelerator (`Accelerator`): The accelerator instance
        model (`torch.nn.Module`):
            The model to load the state dict into, expected to be on meta device or a VRAM spike can occur
        full_sd (`dict`): The full state dict to load, can only be on rank 0
    r   N)distribute_tensorc           
      S   s   z|  |}W n ty"   |dd\}}| |}t||}Y nw ttd}d }|o1|jtjk}	|jj	r;|	s;|j}|d uoB|
 |fS )Nr   r   float8_e4m3fn)get_parameter_or_bufferAttributeErrorrsplitr   r=   hasattrrZ   dtyper   is_floating_pointis_contiguous)
r)   r   empty_param	old_parambase_param_namelocal_param_name	submoduleis_torch_e4m3fn_availablecasting_dtypeis_param_float8_e4m3fnr   r   r   _infer_parameter_dtype  s   

z:fsdp2_load_full_state_dict.<locals>._infer_parameter_dtypec                 S   s$   |d ur
| j |d} |r|  } | S )N)r   )to
contiguous)tensorto_contiguousr   r   r   r   _cast_and_contiguous  s
   z8fsdp2_load_full_state_dict.<locals>._cast_and_contiguous)srcgroup)r   r   T)assign)torch.distributedrO   torch.distributed.tensorr   r(   rr   zipr   valuesdevice_meshdetachr   device_type	broadcast	get_group
placementsrZ   emptysizer   r/   )r^   r)   r   distr   meta_sharded_sd
sharded_sdr   r   r   
full_paramsharded_paramr   sharded_tensorr   r   full_tensorr   r   r   fsdp2_load_full_state_dict  sD   

r   r{   mappingc                    sD   z| j D ]} fdd|d D |d< qW dS  ty!   tdw )a  
    Switches the parameters of the optimizer to new ones (sharded parameters in usual case). This function modifies the
    optimizer in-place.

    Args:
        optimizer (`torch.optim.Optimizer`): Optimizer instance which contains the original model parameters
        mapping (`dict`): Mapping from the original parameter (specified by `data_ptr`) to the sharded parameter

    Raises:
        KeyError:
            If a parameter in the optimizer couldn't be switched to its sharded version. This should never happen and
            indicates a bug. If we kept the original params instead of raising, the training wouldn't be numerically
            correct and weights wouldn't get updated.
    c                    s   g | ]} |j  qS r   )data_ptr.0pr   r   r   
<listcomp>)  s    z5fsdp2_switch_optimizer_parameters.<locals>.<listcomp>paramszA parameter in the optimizer couldn't be switched to its sharded version. This breaks the training. Please raise an issue on GitHub.N)param_groupsKeyError)r{   r   param_groupr   r   r   !fsdp2_switch_optimizer_parameters  s   
r   c           	      C   s   ddl m} t| jj|}t|dddd D ]3\}}t|ddkr-|dd\}}nd}|}|r8|	|n|}||rJ||d	d
}|
|| q|S )a8  
    Applies the activation checkpointing to the model.

    Args:
        accelerator (`Accelerator`): The accelerator instance
        model (`torch.nn.Module`): The model to apply the activation checkpointing to

    Returns:
        `torch.nn.Module`: The model with the activation checkpointing applied
    r   )checkpoint_wrapperT)return_fqnsNr   r   r   F)preserve_rng_state);torch.distributed.algorithms._checkpoint.checkpoint_wrapperr   fsdp2_prepare_auto_wrap_policyr   r?   r   r   r   r   r   register_module)	r^   r)   r   auto_wrap_policy_func
layer_namelayerparent_name
child_nameparent_moduler   r   r   fsdp2_apply_ac2  s   r
  returnc                    s(  ddl m}m}m} t||pt|ot|j|}|r|S | jj}|	| |
 }|j|j|jp3| d}d}	| D ]\}
}|jjdkrJd}	 nq<|jrv|	svt|ddd t fdd	| D }|td
}t|drv|  t||}|durt|dd D ]}||rt||s||fi | qt||s||fi | |jrt| || |jr|	s| D ](\}}|| j}d|v r|dd\}}| |}n|}|}|j!||dd qt|dr|  t"|dd}| j#dkr|du s|tj$kr|tj$}| j%rt&'d |S )a"  Prepares the model for FSDP2 in-place. Also returns the model to avoid misuse of the original model.

    Args:
        accelerator (`Accelerator`): The accelerator instance
        model (`torch.nn.Module`): The model to prepare

    Returns:
        `torch.nn.Module`: Prepared model
    r   )
FSDPModuleMixedPrecisionPolicyfully_shard)reshard_after_forwardoffload_policy	mp_policyF
Params4bitT)r   fqnsc                    s   i | ]\}}| v r||qS r   r   r   kvnon_persistent_buffer_fqnsr   r   
<dictcomp>      z'fsdp2_prepare_model.<locals>.<dictcomp>metatie_weightsNr   r   r   )
persistentr   noz~FSDP upcast of low precision parameters to fp32 (since mixed_precision != 'no') may affect the precision of model checkpoints.)(torch.distributed.fsdpr  r  r  
isinstancer   	_orig_modr   r?   set_auto_wrap_policyr(   r  r7   mixed_precision_policyr   	__class____name__cpu_ram_efficient_loadingr   copydeepcopynamed_buffersr   rZ   r   r   r  r  r   r   r   r   r   register_bufferr=   mixed_precisionfloat32rr   warningswarn)r^   r)   r  r  r  is_type_fsdpfsdp2_pluginoriginal_sdfsdp2_kwargsmodel_has_params4bitr   r   original_non_persistent_buffersr  r   fqnbuffer_tensor
parent_fqnlocal_buffer_namer	  model_dtyper   r  r   fsdp2_prepare_modelS  sp   




	




"r:  c           
         s   ddl m}m}  j}t|tjr|j}||u r^t|dd}|du r$g }t	|} j
dur0 j
}t |D ]}t||}|du rHtd| d| q5dtjjdtf fd	d
}	|	S ||u rqdtjjdtf fdd
}	|	S dS )a!  Prepares the auto wrap policy based on its type, done to mimic the behaviour of FSDP1 auto wrap policy.

    Args:
        fsdp2_plugin (`FullyShardedDataParallelPlugin`):
            Instance of `FullyShardedDataParallelPlugin` containing the configuration options
        auto_wrap_policy_type (`str`):
            Either `transformer` or `size`
        model (`torch.nn.Module`):
            The model to wrap

    Returns:
        `Callable[[torch.nn.Module], bool]`:
            The auto wrap policy function to be applied to the model
    r   )size_based_auto_wrap_policytransformer_auto_wrap_policy_no_split_modulesNz+Could not find the transformer layer class z in the model.r   r  c                    s    j d u rdS t| tS )NF)transformer_cls_names_to_wrapr   tuple)r   r0  transformer_cls_to_wrapr   r   policy  s   
z.fsdp2_prepare_auto_wrap_policy.<locals>.policyc                    s    t dd |  D }| jkS )Nc                 s   s    | ]}|  V  qd S )N)numelr   r   r   r   	<genexpr>  s    zAfsdp2_prepare_auto_wrap_policy.<locals>.policy.<locals>.<genexpr>)sum
parametersmin_num_params)r   module_num_params)r0  r   r   rB    s   
)torch.distributed.fsdp.wrapr;  r<  auto_wrap_policyr   	functoolspartialfuncr=   r   r>  setr   rq   addrZ   r   r   bool)
r0  r)   r;  r<  fnno_split_modulesr>  layer_classtransformer_clsrB  r   r@  r   r    s.   

r  c                  K   s   ddl m} |di | S )a  
    Returns a `GradScaler` for FSDP2, as the current implementation of `get_grad_scaler` doesn't accept other args. We
    need this as current `get_grad_scaler` accepts only `distributed_type` as arg, which doesn't differentiate between
    FSDP1 and FSDP2
    r   )
GradScalerNr   )torch.amp.grad_scalerrU  )kwargsrU  r   r   r   get_fsdp2_grad_scaler  s   rX  named_paramsc                 C   s:   dd |   D } dd |   D } dd |   D } | S )a6  Removes parameter name modifiers in order to map them back to their original names.

    See huggingface/accelerate#3554 for more context.

    Args:
        named_params (`dict`): The named parameters dictionary to canonicalize.

    Returns:
        `dict`: The canonicalized named parameters dictionary
    c                 S      i | ]\}}| d d|qS )z._checkpoint_wrapped_module replacer  r   r   r   r    r  z,fsdp2_canonicalize_names.<locals>.<dictcomp>c                 S   s,   i | ]\}}| d r|d dn||qS )z
_orig_mod.r[  )
startswithr]  r  r   r   r   r    s     c                 S   rZ  )z
._orig_modr[  r\  r  r   r   r   r    r  )r   )rY  r   r   r   fsdp2_canonicalize_names  s   r_  )FN)r   F)r   )T)TF)=r'  rK  r   r   r-  collectionsr   
contextlibr   pathlibr   typingr   rZ   loggingr   	constantsr	   r
   r   r   dataclassesr   modelingr   r   otherr   r   r   versionsr   r%  rX   r   r   r,   r0   r@   rh   ry   r   r   strrP  r   r   r   r   r   r   dictr   r   	Optimizerr   r
  r:  r  rX  r_  r   r   r   r   <module>   sZ   




:
H
03
7.M!$o5