o
    Ti                     @   sV  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZ	d dl
m
Z
 d dlmZ d dlmZ d dlmZ d dlmZmZmZmZmZmZmZmZmZmZ eG dd dZd Zed	Zd
d Zdd Z dd Z!dd Z"dd Z#dd Z$dd Z%dd Z&dd Z'dd Z(dd Z)d d! Z*d"d# Z+d$d% Z,d&d' Z-G d(d) d)Z.d*d+ Z/d,d- Z0dOd/d0Z1		.	.dPd1d2Z2	3	.		.dQd4d5Z3dRd6d7Z4e5d8kr)e 6 Z7e7j8d9e9d:d; e7j8d<e9d=d; e7j8d>e9d3d?d@ e7j8dAd.dBdCdD e7j8dEdFe9ddGd@ e7j8dHdBdIdJ e7j8dKdLdBdMdJ e7: Z;e;jZe3e;j<e;j=e;j>e;j?e;j@e;jAdN dS dS )S    N)tqdm)OrderedDict)	dataclass)logger)

DS_VERSIONOPTIMIZER_STATE_DICTSINGLE_PARTITION_OF_FP32_GROUPSFP32_FLAT_GROUPS
ZERO_STAGEPARTITION_COUNTPARAM_SHAPESBUFFER_NAMESFROZEN_PARAM_SHAPESFROZEN_PARAM_FRAGMENTSc                   @   sF   e Zd ZU e ed< e ed< eed< eed< e ed< e ed< dS )zero_model_statebuffersparam_shapesshared_params
ds_versionfrozen_param_shapesfrozen_param_fragmentsN)__name__
__module____qualname__dict__annotations__listint r   r   P/home/ubuntu/.local/lib/python3.10/site-packages/deepspeed/utils/zero_to_fp32.pyr   '   s   
 


r   cpuc                 C   s   |   rt| S | S N)isdigitr   textr   r   r   atoi7   s   r%   c                 C   s   dd t d| D S )z
    alist.sort(key=natural_keys) sorts in human order
    http://nedbatchelder.com/blog/200712/human_sorting.html
    (See Toothy's implementation in the comments)
    c                 S      g | ]}t |qS r   )r%   ).0cr   r   r   
<listcomp>A       z natural_keys.<locals>.<listcomp>z(\d+))resplitr#   r   r   r   natural_keys;   s   r-   c                 C   sj   t j| std|  d|dkrt j| d}n|dkr%t j| d}t j|s3td| d|S )	NDirectory '' doesn't exist   zmp_rank_00_model_states.pt   z)zero_pp_rank_0_mp_rank_00_model_states.ptz!can't find model states file at '')ospathisdirFileNotFoundErrorjoinexists)checkpoint_dir
zero_stagefiler   r   r   get_model_state_fileD   s   r<   c                 C   sB   t ttj| |td}t|dkrtd| d|  d|S )N)keyr   zcan't find z files in directory 'r2   )sortedglobr3   r4   r7   r-   lenr6   )r9   glob_pattern
ckpt_filesr   r   r   get_checkpoint_filesT   s   rC   c                 C   
   t | dS )Nz*_optim_states.ptrC   r9   r   r   r   get_optim_files^      
rG   c                 C   rD   )Nz*_model_states.ptrE   rF   r   r   r   get_model_state_filesb   rH   rI   c              	      s  g }| D ]}t j|tdd}t|vrt| d|t  tr$td   fdd|d  D }|t }g }|D ]}|	 D ]}|
| q?q9|td }	|	d urctr[td|	  |t|		 7 }d	d
 |d  D }
|td }|td }t|||
||	|d}|
| q|S )NF)map_locationweights_onlyz  is not a model state checkpointzFound buffers:c                    s"   i | ]\}}| v r||  qS r   )floatr'   kvbuffer_namesr   r   
<dictcomp>r      " z&parse_model_states.<locals>.<dictcomp>modulezFound frozen_param_shapes: c                 S   s   g | ]\}}||gqS r   r   rM   r   r   r   r)      s    z&parse_model_states.<locals>.<listcomp>r   )r   r   r   r   r   r   )torchloaddevicer   
ValueErrordebugprintitemsr   keysappendgetr   r   r   r   r   )fileszero_model_statesr;   
state_dictr   r   param_namessnamer   r   r   r   z_model_stater   rP   r   parse_model_statesf   s@   
rf   c                    s  t | }g t| ddD ]}tj|tddd}|d dd  | qtd t vr6t	| d  dd t t }d t t
 }t|tu rPt|}||krbt	d	| d
| d| d|dkrit n|dkrpt nt	d|  fddtt D }|||fS )NzLoading checkpoint shardsdescTF)rJ   mmaprK   optimizer_state_dictr   z is not a zero checkpointz	Expected z of '*_optim_states.pt' under 'z' but found zu files. Possibly due to an overwrite of an old checkpoint, or a checkpoint didn't get saved by one or more processes.r0   r1   zunknown zero stage c                    s   g | ]
}| t    qS r   )r   )r'   ifp32_groups_keystate_dictsr   r   r)      s    z&parse_optim_states.<locals>.<listcomp>)r@   r   rU   rV   rW   popr]   r
   r   rX   r   typer   maxr   r	   range)r_   ds_checkpoint_dirtotal_filesfra   r:   
world_sizefp32_flat_groupsr   rl   r   parse_optim_states   s.   
rx   c                 C   s   t d|  d t| }t|| \}}}t d| d|  t| }t|}t d|d j  |dkr;t||||S |dkrFt||||S d	S )
z
    Returns fp32 state_dict reconstructed from ds checkpoint

    Args:
        - ``ds_checkpoint_dir``: path to the deepspeed checkpoint folder (where the optimizer files are)

    zProcessing zero checkpoint 'r2   z'Detected checkpoint of type zero stage z, world_size: z)Parsing checkpoint created by deepspeed==r   r0   r1   N)rZ   rG   rx   rI   rf   r   *_get_fp32_state_dict_from_zero2_checkpoint*_get_fp32_state_dict_from_zero3_checkpoint)rs   exclude_frozen_parametersoptim_filesr:   rv   rw   model_filesr`   r   r   r   )_get_fp32_state_dict_from_zero_checkpoint   s    r~   c                 C   s6  |d j d u st|d j dkrd S |d j }|d j}tr`tdd | D }tdt d|  t|}tdd | D }tdd | D }td	| d
 td| d| d d}d}	| D ]%\}
}|d7 }|	 }|	|7 }	||
 | |
< trt|
 d| d| d qhtd| d|	 d d S )Nr   c                 s       | ]}|  V  qd S r!   numelr'   rc   r   r   r   	<genexpr>       z-_zero2_merge_frozen_params.<locals>.<genexpr>zrank 0: 	.numel = c                 s   r   r!   r   r   r   r   r   r      r   c                 S      g | ]}|  qS r   r   r'   pr   r   r   r)      r*   z._zero2_merge_frozen_params.<locals>.<listcomp>Frozen params: Have  numels to process.Frozen params: Need  numels in  params    full shape:  unpartitioned numel  *Reconstructed Frozen fp32 state dict with  params 	 elements)
r   r@   r   rY   sumvaluesrZ   r   r[   r   )ra   r`   r   r   num_elemwanted_paramswanted_numelavail_numeltotal_paramstotal_numelrd   shapeunpartitioned_numelr   r   r   _zero2_merge_frozen_params   s.    

r   c                 C   s   t | |d }t|S r!   )getattrcallable)objfnattrr   r   r   _has_callable   s   r   c                    s2  |d j }tr-t|D ]!tt|d D ]}tt d d| d| | j  qqt|d }g }t|D ]fdd|D }t|d}	|	|	 q9t
dd |D }
trt
dd |D }t
d	d |D }td
|
 d td| d| d d}d}t||D ]\}}	d}|	 }
| D ]9\}}t|dr| nt|}||7 }|d7 }trt| d| d| d |	d|||| |< ||7 }qd|   fdd}trtd| d|
  ||}||
}
trtd| d|
  ||
krtd| d|
 dqtd| d| d d S ) Nr   [z][].shape=c                    s   g | ]}|  qS r   r   )r'   sd)rk   r   r   r)     r*   z1_zero2_merge_trainable_params.<locals>.<listcomp>c                 S   r   r   r   )r'   full_single_fp32_vectorr   r   r   r)     r*   c                 S   r&   r   )r@   r'   shapesr   r   r   r)     r*   c                 S   s"   g | ]}t d d | D qS )c                 s   r   r!   r   r'   r   r   r   r   r     r   z;_zero2_merge_trainable_params.<locals>.<listcomp>.<genexpr>)r   r   r   r   r   r   r)     rS   zHave r   zNeed r    params.r   r   r   r   r   r0   c                    s    t |    S r!   mathceil)x)align_tor   r   zero2_align2  s   z2_zero2_merge_trainable_params.<locals>.zero2_alignzoriginal offset=z, avail_numel=zaligned  offset=	consumed  numels out of  - something is wrongz#Reconstructed fp32 state dict with r   r   )r   rY   rr   r@   rZ   r	   r   rU   catr]   r   zipr   r[   r   r   prodnarrowviewrX   )ra   rv   rw   r`   r   jnum_param_groups&merged_single_partition_of_fp32_groupsmerged_partitionsr   r   r   r   r   r   r   offsetrd   r   r   r   r   )r   rk   r   _zero2_merge_trainable_params   sZ   
*

r   c                 C   s   t  }|d j}|| trtdt| d |s t|| t|| || |d jD ]}|d |v r>||d  ||d < q,|S Nr   zadded z buffersr   )	r   r   updaterY   rZ   r@   r   r   r   rv   rw   r`   r{   ra   r   pairr   r   r   ry   E  s   


ry   c                 C   s.   | | }|r
|| nd}t | | }||fS Nr   r   )r   rv   	remainderpadding_numelpartitioned_numelr   r   r   zero3_partitioned_param_info\  s   r   c                    s  |d j d u st|d j dkrd S trmt|D ]}tdd || j D }td| dt d|  q|d j }t|}tdd | D }tdd	 |d j D | }td
| d td| d| d d}	d}
|d j 	 D ]G\ }|	d7 }	|
 }|
|7 }
t fdd|D }t|ddd|||  < t||\}}trtd|	 d  d| d| d| 
 qxtd|	 d|
 d d S )Nr   c                 s   r   r!   r   r   r   r   r   r   i  r   z-_zero3_merge_frozen_params.<locals>.<genexpr>zrank z: r   c                 s   r   r!   r   r   r   r   r   r   n  r   c                 S   r   r   r   r   r   r   r   r)   o  r*   z._zero3_merge_frozen_params.<locals>.<listcomp>r   r   r   r   r   r   c                 3   s    | ]}|j   V  qd S r!   )r   )r'   model_staterd   r   r   r   z  s    zFrozen params: r   r    partition0 numel= partitioned_padding_numel=r   r   r   )r   r@   rY   rr   r   r   r   rZ   r   r[   r   tuplerU   r   r   r   r   )ra   rv   r`   rk   r   r   r   r   r   r   r   r   r   param_fragsr   partitioned_padding_numelr   r   r   _zero3_merge_frozen_paramsc  s6    
   r   c                   @   s    e Zd ZdZdd Zdd ZdS )GatheredTensorz|
    A pseudo tensor that collects partitioned weights.
    It is more memory efficient when there are multiple groups.
    c                 C   s4   || _ || _|| _|| _|| _| j d d j| _d S r   )flat_groupsflat_groups_offsetr   r   r   dtype)selfr   r   r   r   r   r   r   r   __init__  s   zGatheredTensor.__init__c                 C   s8  | j | j }t| j}g }t|D ]r}| j| }d}d}tt| jD ]1}| j| | j   kr8| j|d  k r<n n|}| j| |  k rN| j|d  krTn q#|} nq#t||d D ]&}|| }	| j | j|  }
t|| j|d  | j|  }||	|
|  q\qtj	|dd}|d| j
  | j
 }|S )zR
        Merge partitioned weights from flat_groups into a single tensor.
        Nr   r   )dim)r   r   r@   r   rr   r   minr]   rU   r   r   r   r   
contiguous)r   end_idxrv   pad_flat_param_chunksrank_iflat_groups_at_rank_istart_group_idend_group_idgroup_idflat_tensorstart_offset
end_offsetpad_flat_paramparamr   r   r   r     s.   

*(zGatheredTensor.contiguousN)r   r   r   __doc__r   r   r   r   r   r   r     s    r   c                 C   s  |d j }tdd |d D | }dd |D }tr[t|D ]}tt d| d|| j  qt|}tdd	 | D }|d 	 | }td
| d td| d| d d}	d}
d}dgt
tdd |d D  }t| ddD ]<\}}|	 }|
|7 }
|d7 }t||\}}trtd| d| d| d| d| 
 t|||	||}|| |< |	|7 }	qz|	|9 }	|	|krtd|	 d| dtd| d|
 d d S )Nr   c                 S   r   r   r   )r'   
flat_groupr   r   r   r)     r*   z1_zero3_merge_trainable_params.<locals>.<listcomp>c                 S   s$   i | ]}|  D ]\}}||qqS r   )r[   )r'   drN   rO   r   r   r   rR     s   $ z1_zero3_merge_trainable_params.<locals>.<dictcomp>r   r   c                 s   r   r!   r   r   r   r   r   r     r   z0_zero3_merge_trainable_params.<locals>.<genexpr>zTrainable params: Have r   zTrainable params: Need r   r   c                 S   r   r   r   )r'   r   r   r   r   r)     r*   zGathering sharded weightsrg   r   zTrainable params: r   r   r   r   r   r   r   z-Reconstructed Trainable fp32 state dict with r   r   )r   r   rY   rr   rZ   r	   r   r@   r   r   r   npcumsumr   r[   r   r   rX   )ra   rv   rw   r`   r   r   rk   r   r   r   r   r   r   rd   r   r   r   r   tensorr   r   r   _zero3_merge_trainable_params  s>   
 " 
r   c                 C   s   t  }|d j}|| trtdt| d |s!t|| | t|| || |d jD ]}|d |v r?||d  ||d < q-|S r   )	r   r   r   rY   rZ   r@   r   r   r   r   r   r   r   rz     s   

rz   Fc                 C   st   i }i }|   D ]/\}}t|}||v r|||  }|||< q|||< |r1tj|j|jd||< q| ||< q|S )z>
    Convert state_dict of GatheredTensor to torch tensor
    )r   )r[   idrU   emptyr   r   r   )ra   return_empty_tensortorch_state_dictconverted_tensorsrd   r   	tensor_idshared_tensorr   r   r   to_torch_tensor  s   
r   c                 C   s   |du r4t j| d}t j|r-t|d}|  }W d   n1 s'w   Y  ntd| t j| |}t j|sIt	d| dt
||}|rR|S t|S )a  
    Convert ZeRO 2 or 3 checkpoint into a single fp32 consolidated state_dict that can be loaded with
    ``load_state_dict()`` and used for training without DeepSpeed or shared with others, for example
    via a model hub.

    Args:
        - ``checkpoint_dir``: path to the desired checkpoint folder
        - ``tag``: checkpoint tag used as a unique identifier for checkpoint. If not provided will attempt to load tag in 'latest' file. e.g., ``global_step14``
        - ``exclude_frozen_parameters``: exclude frozen parameters
        - ``lazy_mode``: get state_dict in lazy mode. It returns a dict of pesduo tensor instead of torch tensor, which is more memory efficient.
          Convert the pesduo tensor to torch tensor by ``.contiguous()``

    Returns:
        - pytorch ``state_dict``

    A typical usage might be ::

        from deepspeed.utils.zero_to_fp32 import get_fp32_state_dict_from_zero_checkpoint
        # do the training and checkpoint saving
        state_dict = get_fp32_state_dict_from_zero_checkpoint(checkpoint_dir) # already on cpu
        model = model.cpu() # move to cpu
        model.load_state_dict(state_dict)
        # submit to model hub or save the model to share with others

    In this example the ``model`` will no longer be usable in the deepspeed context of the same
    application. i.e. you will need to re-initialize the deepspeed engine, since
    ``model.load_state_dict(state_dict)`` will remove all the deepspeed magic from it.

    If you want it all done for you, use ``load_state_dict_from_zero_checkpoint`` instead.

    Note: the above usage may not work if your application doesn't have sufficient free CPU memory.
    You may need to use the offline approach using the ``zero_to_fp32.py`` script that is saved with
    the checkpoint. Or you can load state_dict in lazy mode ::

        from deepspeed.utils.zero_to_fp32 import get_fp32_state_dict_from_zero_checkpoint
        state_dict = get_fp32_state_dict_from_zero_checkpoint(checkpoint_dir, lazy_mode=True) # not on cpu
        for name, lazy_tensor in state_dict.item():
            tensor = lazy_tensor.contiguous()  # to cpu
            print(name, tensor)
            # del tensor to release memory if it no longer in use
    Nlatestrz Unable to find 'latest' file at r.   r/   )r3   r4   r7   isfileopenreadstriprX   r5   r6   r~   r   )r9   tagr{   	lazy_modelatest_pathfdrs   ra   r   r   r   (get_fp32_state_dict_from_zero_checkpoint  s   -
r  5GBc                    s  |rzddl m} W n ty   td  w |dur0zddlm} W n ty/   td  w t| ||dd |r<d	nd
}|durZ|dddd}	t dd}
||
|	|d}nddl	m
} |dddg}|d|t  id}tj|dd |j }t|ddD ]>\}} fdd|D }t|}tj||}|r|||ddid nt|| t| D ]} |= ||= q~t  q|jr|j|jd}|rd nd!}tj||}t|d"d#d$}tj|d%dd&d' }|| W d   dS 1 sw   Y  dS dS )(a  
    Convert ZeRO 2 or 3 checkpoint into a single fp32 consolidated ``state_dict`` file that can be
    loaded with ``torch.load(file)`` + ``load_state_dict()`` and used for training without DeepSpeed.

    Args:
        - ``checkpoint_dir``: path to the desired checkpoint folder. (one that contains the tag-folder, like ``global_step14``)
        - ``output_dir``: directory to the pytorch fp32 state_dict output files
        - ``max_shard_size``: the maximum size for a checkpoint before being sharded, default value is 5GB
        - ``safe_serialization``:  whether to save the model using `safetensors` or the traditional PyTorch way (that uses `pickle`).
        - ``tag``: checkpoint tag used as a unique identifier for checkpoint. If not provided will attempt to load tag in the file named ``latest`` in the checkpoint folder, e.g., ``global_step14``
        - ``exclude_frozen_parameters``: exclude frozen parameters
    r   )	save_filezIIf you want to use `safe_serialization`, please `pip install safetensors`N)"split_torch_state_dict_into_shardszIIf you want to use `max_shard_size`, please `pip install huggingface_hub`T)r   zmodel.safetensorszpytorch_model.binz.binz{suffix}.binz.safetensorsz{suffix}.safetensors)r   )filename_patternmax_shard_size)
namedtupleStateDictSplit
is_shardedfilename_to_tensorsF)r	  r
  )exist_okzSaving checkpoint shardsrg   c                    s   i | ]}| | qS r   r   )r'   tensor_namera   r   r   rR     s    z>convert_zero_checkpoint_to_fp32_state_dict.<locals>.<dictcomp>formatpt)metadata)r  
weight_mapzmodel.safetensors.index.jsonzpytorch_model.bin.index.jsonwzutf-8)encodingr0   )indent	sort_keys
)safetensors.torchr  ImportErrorrZ   huggingface_hubr  r  replacer   collectionsr  r   r\   r3   makedirsr
  r[   r   r4   r7   rU   savegccollectr	  r  tensor_to_filenamer   jsondumpswrite)r9   
output_dirr  safe_serializationr   r{   r  r  weights_namer  empty_state_dictstate_dict_splitr  r  r
  
shard_filetensorsshard_state_dictoutput_pathr  indexsave_index_fileru   contentr   r  r   *convert_zero_checkpoint_to_fp32_state_dictV  sr   

"r0  c                 C   s8   t d t||}t d |  } | j|dd | S )ay  
    1. Put the provided model to cpu
    2. Convert ZeRO 2 or 3 checkpoint into a single fp32 consolidated ``state_dict``
    3. Load it into the provided model

    Args:
        - ``model``: the model object to update
        - ``checkpoint_dir``: path to the desired checkpoint folder. (one that contains the tag-folder, like ``global_step14``)
        - ``tag``: checkpoint tag used as a unique identifier for checkpoint. If not provided will attempt to load tag in the file named ``latest`` in the checkpoint folder, e.g., ``global_step14``

    Returns:
        - ``model`: modified model

    Make sure you have plenty of CPU memory available before you call this function. If you don't
    have enough use the ``zero_to_fp32.py`` utility to do the conversion. You will find it
    conveniently placed for you in the checkpoint folder.

    A typical usage might be ::

        from deepspeed.utils.zero_to_fp32 import load_state_dict_from_zero_checkpoint
        model = load_state_dict_from_zero_checkpoint(trainer.model, checkpoint_dir)
        # submit to model hub or save the model to share with others

    Note, that once this was run, the ``model`` will no longer be usable in the deepspeed context
    of the same application. i.e. you will need to re-initialize the deepspeed engine, since
    ``model.load_state_dict(state_dict)`` will remove all the deepspeed magic from it.

    zExtracting fp32 weightsz#Overwriting model with fp32 weightsF)strict)r   infor  r    load_state_dict)modelr9   r   ra   r   r   r   $load_state_dict_from_zero_checkpoint  s   


r5  __main__r9   z?path to the desired checkpoint folder, e.g., path/checkpoint-12)rp   helpr$  zVdirectory to the pytorch fp32 state_dict output files(e.g. path/checkpoint-12-output/)z--max_shard_sizea@  The maximum size for a checkpoint before being sharded. Checkpoints shard will then be each of sizelower than this size. If expressed as a string, needs to be digits followed by a unit (like `5MB`We default it to 5GB in order for models to be able to run easily on free-tier google colab instanceswithout CPU OOM issues.)rp   defaultr7  z--safe_serialization
store_truezbWhether to save the model using `safetensors` or the traditional PyTorch way (that uses `pickle`).)r8  actionr7  z-tz--tagzMcheckpoint tag used as a unique identifier for checkpoint. e.g., global_step1z--exclude_frozen_parameterszexclude frozen parameters)r:  r7  z-dz--debugzenable debug)r  r%  r   r{   )F)NFF)r  FNFr!   )BargparserU   r?   r   r3   r+   r  r!  numpyr   r   r  r   dataclassesr   deepspeed.utilsr   deepspeed.checkpoint.constantsr   r   r   r	   r
   r   r   r   r   r   r   rY   rW   r%   r-   r<   rC   rG   rI   rf   rx   r~   r   r   r   ry   r   r   r   r   rz   r   r  r0  r5  r   ArgumentParserparseradd_argumentstr
parse_argsargsr9   r$  r  r%  r   r{   r   r   r   r   <module>   s   0	
	
.( I$.5

C

U
'
