o
    Ti                    @   s2  d dl Z d dlZd dlZd dlmZmZ d dlmZ d dlZd dl	Z	d dlm
Z
 d dlmZ d dlZd dlZd dlmZ d dlmZ d dlmZ d d	lmZ d
dlmZ d dlmZ d dlZddlmZmZ d dlmZ d dlm Z m!Z! d dl"m#Z# d dl$m%Z% d dlm&Z&m'Z' d dl(m)Z) d dl*m+Z+m,Z,m-Z-m.Z.m/Z/ d dl0m1Z1 ddl2m3Z3m4Z4 d dl5m6Z6m7Z7m8Z8m9Z9 d gZ:d a;da<G dd dZ=G dd dZ>dXdedefdd Z?dYd"d#Z@d$eAd%dfd&d'ZBd(d) ZCd*d+ ZDd,d- ZEG d.d/ d/eZFG d0d1 d1eZGejHZIejJZKejLZMejNZOejPZQejRZSejTZUejVZWd2ed3ejXd%efd4d5ZYd6ejXd%efd7d8ZZdZd:d;Z[e&d<ed%dfd=d>Z\d!a]dZ^i Z_G d?d@ d@e`ZadAdB ZbdCdD ZcG dEdF dFZdG dGdH dHZeG dIdJ dJZfG dKdL dLZgG dMdN dNZhG dOdP dPZidQee d%eefdRdSZjG dTdU dUeaZkG dVdW dWZldS )[    N)CallableIterable)Enum)List)defaultdictTensor)comm)Module)	Parameter   )zero3_linear_wrap)groups   )see_memory_usageget_only_unique_item)DeepSpeedZeroConfig)assert_ints_same_as_other_ranksis_zero_param)OffloadDeviceEnum)get_config_default)instrument_w_nvtxlogger)init_distributed)debug_param2name_id_shape debug_param2name_id_shape_devicedebug_module2namedebug_param2name_id debug_param2name_id_shape_status)get_accelerator) AsyncPartitionedParameterSwapperPartitionedParamStatus)_quantize_paramWEIGHT_QUANTIZATION_LAYERSwrap_quantized_functionalwrap_load_from_state_dictc                   @   s(   e Zd ZdeddfddZdddZdS )	NoGatherHandleparamreturnNc                 C   s   |j tjkrtd|  dt|jdr/tj	|jj
|jjjt  dd|j|_
n|jj
jt  dd|j|_
|| _d S )Nexpected param  to be availableds_quant_scaleTdevicenon_blocking)	ds_statusZeroParamStatusINFLIGHTRuntimeError
ds_summaryhasattr	ds_tensorInitquantizer_module
dequantizedatar+   tor   current_device_nameviewds_shape_NoGatherHandle__paramselfr'    rA   _/home/ubuntu/.local/lib/python3.10/site-packages/deepspeed/runtime/zero/partition_parameters.py__init__.   s   

zNoGatherHandle.__init__c                 K   s&   t   st     tj| j_d S N)r   resolves_data_dependencycurrent_streamsynchronizer0   	AVAILABLEr>   r/   )r@   kwargsrA   rA   rB   wait:   s   
zNoGatherHandle.waitr(   N__name__
__module____qualname__r   rC   rJ   rA   rA   rA   rB   r&   ,   s    r&   c                   @   0   e Zd Zdee ddfddZedddZdS )	NoGatherCoalescedHandleparamsr(   Nc                 C   s   || _ d| _| j D ]B}|jtjkrtd|  dt|jdr:t	j
|jj|jjjt  dd|j|_q	|jjjt  dd|j|_q	d S )NFr)    to not be availabler+   Tr,   ) _NoGatherCoalescedHandle__params"_NoGatherCoalescedHandle__completer/   r0   r1   r2   r3   r4   r5   r6   r7   r8   r9   r+   r:   r   r;   r<   r=   )r@   rR   r'   rA   rA   rB   rC   B   s"   

z NoGatherCoalescedHandle.__init__c                 K   s`   | j rd S t  st    | jD ]}|jtjks&J d|	  dtj
|_qd| _ d S Nr)    to be inflightT)rU   r   rE   rF   rG   rT   r/   r0   r1   r3   rH   )r@   rI   r'   rA   rA   rB   rJ   P   s   

 

zNoGatherCoalescedHandle.waitrK   rM   rN   rO   r   r   rC   r   rJ   rA   rA   rA   rB   rQ   @   s    rQ   input_tensoroutput_tensorc                 C   s   t tj|| |ddS )NTgroupasync_op)r   distallgather_fn)rY   rZ   r\   rA   rA   rB   _dist_allgather_fn^   s   r`   Fc                 C   s,   t  }|dkr|s|rt|  d S d S d S Nr   )r^   get_rankprint)messagedebugforcerankrA   rA   rB   print_rank_0b   s   rh   msgr(   c                 C   s   t  dkrt|  d S d S ra   )r^   rb   r   re   )ri   rA   rA   rB   debug_rank0m   s   rj   c                    sH   t | ds"i | _dd   fdd}t | | _t|| | _d S d S )N_external_paramsc                 S   s
   | j  S rD   )rk   itemsr@   rA   rA   rB   external_parametersv      
z2_init_external_params.<locals>.external_parametersc                    s   t | j| dd | S )NFrecurse)	itertoolschainnamed_parametersrm   rn   rA   rB   all_parametersy   s   z-_init_external_params.<locals>.all_parameters)r4   rk   types
MethodTypeds_external_parametersrv   )modulerv   rA   ru   rB   _init_external_paramsr   s   
r{   c                 C   s>   t |tjjstdt| dst|  t|}|| j|< dS )a  Instruct DeepSpeed to coordinate ``parameter``'s collection and partitioning in
    the forward and backward passes of ``module``.

    This is used when a parameter is accessed outside of its owning module's
    ``forward()``. DeepSpeed must know to collect it from its partitioned
    state and when to release the memory.

    .. note::
        This is only applicable to training with ZeRO stage 3.

    Args:
        module (``torch.nn.Module``): The module that requires ``parameter`` in its forward pass.
        parameter (``torch.nn.Parameter``): The parameter to register.

    Raises:
        RuntimeError: If ``parameter`` is not of type ``torch.nn.Parameter``.


    Examples
    ========

    #. Register a weight that is used in another module's forward pass (line 6).
       Parameter ``layer1.weight`` is used by ``layer2`` (line 11).

        .. code-block:: python
            :linenos:
            :emphasize-lines: 6,11

            class ModuleZ3(torch.nn.Module):
                def __init__(self, *args):
                    super().__init__(self, *args)
                    self.layer1 = SomeLayer()
                    self.layer2 = OtherLayer()
                    deepspeed.zero.register_external_parameter(self, self.layer1.weight)

                def forward(self, input):
                    x = self.layer1(input)
                    # self.layer1.weight is required by self.layer2.forward
                    y = self.layer2(x, self.layer1.weight)
                    return y
    %Parameter is not a torch.nn.Parameterrk   N)	
isinstancetorchnnr   r2   r4   r{   idrk   rz   	parameterkeyrA   rA   rB   register_external_parameter   s   *
r   c                 C   sJ   t |tjjstdt| drt|| jvrtdt|}| j|= dS )a  Reverses the effects of :meth:`register_external_parameter`.

    Args:
        module (``torch.nn.Module``): The module to affect.
        parameter (``torch.nn.Parameter``): The parameter to unregister.

    Raises:
        RuntimeError: If ``parameter`` is not of type ``torch.nn.Parameter``.
        RuntimeError: If ``parameter`` is not a registered external parameter of ``module``.
    r|   rk   z;Parameter is not a registered external parameter of module.N)r}   r~   r   r   r2   r4   r   rk   r   rA   rA   rB   unregister_external_parameter   s   r   c                   @      e Zd ZdZdZdZdS )ZeroParamTyper   r      N)rM   rN   rO   NORMALPARTITIONEDREMOTErA   rA   rA   rB   r      s    r   c                   @   r   )r0   r   r   r   N)rM   rN   rO   rH   NOT_AVAILABLEr1   rA   rA   rA   rB   r0      s    r0   fntarget_fp_dtypec                    s   dt f fdd}|S )Nr(   c                     sT   | dd d u rtt tjd |d<  | i |}| r(|j	|_|S )Nr-   
LOCAL_RANK)
getr~   r-   r   device_nameosenvironis_floating_pointr9   r:   )argsrI   tensorr   r   rA   rB   
wrapped_fn   s   z:zero_wrapper_for_fp_tensor_constructor.<locals>.wrapped_fnr   )r   r   r   rA   r   rB   &zero_wrapper_for_fp_tensor_constructor   s   	r   dtypec                    s   dt f fdd}|S )Nr(   c                    sN   t t tjd }|sd}td|dj|i |}| r%|	 }|S )Nr   r   r   )r-   )
r~   r-   r   r   r   r   _orig_torch_empty	new_emptyr   r:   )clsr   rI   r-   r   r   rA   rB   
new_tensor   s   
z/get_new_tensor_fn_for_dtype.<locals>.new_tensorr   )r   r   rA   r   rB   get_new_tensor_fn_for_dtype   s   
r   Tc                    s4   g  fdd  |  t }|r||  |S )Nc                    s$   |   D ]}|  | qd S rD   )__subclasses__append)clsubclassrq   subclass_listrA   rB   rq     s   

z#get_all_subclasses.<locals>.recurse)setadd)r   include_rootretrA   r   rB   get_all_subclasses	  s   
r   r'   c                 C   s^   | j r	J |  t | jrt  s| jt   tj	d| j
| jd| _tj| _dS )z'Free underlying storage of a parameter.r   r   r-   N)ds_active_sub_modulesr3   r   on_acceleratorr9   is_synchronized_devicerecord_streamrF   r~   emptyr   r-   r0   r   r/   r'   rA   rA   rB   
free_param  s   
r   c                   @   s^   e Zd ZdZdZdddZdd Zdd	 Zd
d Zdd Z	dd Z
dd Zdd Zdd ZdS )&InsertPostInitMethodToModuleSubClassesr   TNc                 C   s   || _ || _| || | jtjtjtjfv s J d| j dt | _	d| _
d | _|d ur<|jr>|jjr@|jj| _d S d S d S d S )NzInvalid data type z>, allowed values are [torch.half, torch.bfloat16, torch.float]r   )mem_efficient_linearenabled
_set_dtyper   r~   halfbfloat16floatr   wrapped_clsskip_init_depthquantized_initializationweight_quantization_config)r@   r   r   	ds_configr   rA   rA   rB   rC   2  s   z/InsertPostInitMethodToModuleSubClasses.__init__c                 C   s*   | j sd S tdkr|   | atd7 ad S )Nr   r   )r   zero_init_contextpatch_init_and_builtinstop_level_contextrm   rA   rA   rB   	__enter__@  s   z0InsertPostInitMethodToModuleSubClasses.__enter__c                 C   sl   | j sd S td8 atdkr.|   d at dkr.tjd }tj}t	
d| d|dd |d ur4dS d S )	Nr   r   g    eAz+finished initializing model - num_params = z, num_elems = z.2fBF)r   r   unpatch_init_and_builtinsr   r^   rb   r   num_module_elementsnum_module_parametersr   info)r@   exc_type	exc_value	tracebackbillion_elems
num_paramsrA   rA   rB   __exit__L  s   
z/InsertPostInitMethodToModuleSubClasses.__exit__c                 C   s   d S rD   rA   )r@   rz   rA   rA   rB   _post_init_methodd  s   z8InsertPostInitMethodToModuleSubClasses._post_init_methodc                 C   s   |d ur*|d u r*|j r|jrtd|j rtj| _d S |jr$tj| _d S tj| _d S t 	 r4|p3tj
n	t jr;tjntj| _d S )Nz+bfloat16 and fp16 cannot be enabled at once)bfloat16_enabledfp16_enabledr2   r~   r   r   r   r   r   is_fp16_supportedfloat16is_bf16_supportedfloat32)r@   r   r   rA   rA   rB   r   g  s   z1InsertPostInitMethodToModuleSubClasses._set_dtypec                    sp  dt dt fdd}fdd  fddfd	d
}fdd}ttjjjjD ]}|| q*tjjjjjtjjjj_tjjjjj	tjjjj_
tjjtj_t|tjjjj_tjri|tjjjjj
tjjjj_	  jrtddd ttdstjjjt_ttjj_jrtddd ttjjjtjj_ttjjjtjj_tD ]}t|j|_qt !d d_"d S )Norig_module_apply_fnr(   c                    s<   dt dt fdd tdtdt ddf fdd}|S )	a  many models make use of child modules like Linear or Embedding which
            perform their own weight initialization in their __init__ methods,
            but will then have more weight initialization in a parent module's __init__
            method that modifies weights of child modules, which is typically done
            using the Module.apply method.

            since the Init context manager partitions child modules immediately after
            they are initialized, without modifying apply we would entirely skip
            any initialization done by parent modules.

            to get around this issue, we wrap the function passed to Module.apply
            so that the applied function is applied to child modules correctly.
            fn_to_applyr(   c                    s8   t  dr S t dtdd f fdd}d|_|S )Nwrappedmodule_to_apply_fn_tor(   c                    sx   t tdd | jddD dd d}|D ]}|  q |  |D ]}tj|jd|jd	 q"|D ]}|jd
d q1dS )a  gathers parameters before calling apply function. afterwards
                    parameters are broadcasted to ensure consistency across all ranks
                    then re-partitioned.

                    takes the following steps:
                    1. allgathers parameters for the current module being worked on
                    2. calls the original function
                    3. broadcasts root rank's parameters to the other ranks
                    4. re-partitions the parameters
                    c                 S   s   g | ]}t |r|qS rA   r   .0prA   rA   rB   
<listcomp>      zInsertPostInitMethodToModuleSubClasses.patch_init_and_builtins.<locals>.apply_with_gather.<locals>.get_wrapped_fn_to_apply.<locals>.wrapped_fn_to_apply.<locals>.<listcomp>Frp   c                 S      | j S rD   ds_idr   rA   rA   rB   <lambda>      zInsertPostInitMethodToModuleSubClasses.patch_init_and_builtins.<locals>.apply_with_gather.<locals>.get_wrapped_fn_to_apply.<locals>.wrapped_fn_to_apply.<locals>.<lambda>r   r   r\   T)has_been_updatedN)	listsorted
parameters
all_gatherr^   	broadcastr9   ds_process_group	partition)r   params_to_apply_fn_tor'   r   rA   rB   wrapped_fn_to_apply  s   
zInsertPostInitMethodToModuleSubClasses.patch_init_and_builtins.<locals>.apply_with_gather.<locals>.get_wrapped_fn_to_apply.<locals>.wrapped_fn_to_applyT)r4   	functoolswrapsr
   r   )r   r   rA   r   rB   get_wrapped_fn_to_apply  s   
!zzInsertPostInitMethodToModuleSubClasses.patch_init_and_builtins.<locals>.apply_with_gather.<locals>.get_wrapped_fn_to_applyrz   Nc                    s   |  | d S rD   rA   )rz   r   r   r   rA   rB   wrapped_apply  s   zpInsertPostInitMethodToModuleSubClasses.patch_init_and_builtins.<locals>.apply_with_gather.<locals>.wrapped_apply)r   r   r   r
   )r   r   rA   r   rB   apply_with_gatherx  s   *zYInsertPostInitMethodToModuleSubClasses.patch_init_and_builtins.<locals>.apply_with_gatherc                    s\   fdd fdd}fdd}dd  t tjjjjD ]}|| q|j_d S )	Nc                    s   t   fdd}|S )Nc                    s$    | g|R i |} | |S rD   )r   )rz   r   rI   _module)fr@   rA   rB   wrapper  s   
zInsertPostInitMethodToModuleSubClasses.patch_init_and_builtins.<locals>.hook_for_skip_init.<locals>.partition_after_empty_init.<locals>.wrapperr   r   r   r   rm   r   rB   partition_after_empty_init  s   z~InsertPostInitMethodToModuleSubClasses.patch_init_and_builtins.<locals>.hook_for_skip_init.<locals>.partition_after_empty_initc                    s   t   fdd}|S )Nc                     s6   | i |}t tjjjjD ]} | q_|S rD   r   r~   r   modulesrz   r
   to_empty)r   rI   resr   )_disable_class_applyr   rz   rA   rB   r     s
   
zInsertPostInitMethodToModuleSubClasses.patch_init_and_builtins.<locals>.hook_for_skip_init.<locals>.post_wrapper_to_empty.<locals>.wrapperr   r   )r  rz   r   rB   post_wrapper_to_empty  s   	zyInsertPostInitMethodToModuleSubClasses.patch_init_and_builtins.<locals>.hook_for_skip_init.<locals>.post_wrapper_to_emptyc                    &   d| j v r| j| _ | j| _d S d S )N_apply)__dict__r  _old_apply_of_skip_init_hookr   )r   rA   rB   _enable_class_apply     
zwInsertPostInitMethodToModuleSubClasses.patch_init_and_builtins.<locals>.hook_for_skip_init.<locals>._enable_class_applyc                 S      t | dr| j| _d S d S )Nr
  )r4   r
  r  r  rA   rA   rB   r       
zxInsertPostInitMethodToModuleSubClasses.patch_init_and_builtins.<locals>.hook_for_skip_init.<locals>._disable_class_applyr  )rz   r  r  r   rm   )r  rz   r   rB   hook_for_skip_init  s   
zZInsertPostInitMethodToModuleSubClasses.patch_init_and_builtins.<locals>.hook_for_skip_initc                    s   t   fdd}|S )Nc                    s   t d| jj dd d}t| dsd}t| dd d|v o#|d dk}|r- jd7  _ | g|R i | |rCjdkrC|  |r_t| d t d	| jj dd jd
kr_|  t d| jj dd |ru jd8  _d S d S )NzBefore initializing Frf   _ds_child_enteredTr-   metar   zRunning post_init for r   z-After initializing followed by post init for )rh   	__class__rM   r4   setattrr   delattrr   )rz   r   rI   is_child_moduleinit_on_meta)r   r  r@   rA   rB   r     s(   



zhInsertPostInitMethodToModuleSubClasses.patch_init_and_builtins.<locals>.partition_after.<locals>.wrapperr   r   )r  r@   r   rB   partition_after  s   &zWInsertPostInitMethodToModuleSubClasses.patch_init_and_builtins.<locals>.partition_afterc                    r  NrC   r	  rC   	_old_initr  r  rA   rB   _enable_class  r  zUInsertPostInitMethodToModuleSubClasses.patch_init_and_builtins.<locals>._enable_classc                    r  r  r  )r   rI   r  rA   rB   _init_subclass  r  zVInsertPostInitMethodToModuleSubClasses.patch_init_and_builtins.<locals>._init_subclasszwnn.functional.linear has been overridden with a more memory efficient version. This will persist unless manually reset.Fr  	linear_bkzGnn.functional.linear has been overridden with quantized linear version.z+Enable Zero3 engine with INT4 quantization.T)#r   r   r~   r   r  rz   r
   __init_subclass___old_init_subclassapply
_old_applyr   __new____old_new__classmethodr6   override_module_apply_add_tensor_creation_wrappersr   rh   r4   r   
functionallinearr   r   r   r$   	embeddingr#   r%   _load_from_state_dictr   r   patched)r@   r   r  r  r   r   rA   )r  r  r@   rB   r   v  s<   ?/+




z>InsertPostInitMethodToModuleSubClasses.patch_init_and_builtinsc                 C   sx   | j r:dd }ttjjjjD ]}|| qtjjjjjtjjjj_t	j
r1tjjjjjtjjjj_|   d| _ d S d S )Nc                 S   r  )Nr  )r4   r  rC   r  rA   rA   rB   _disable_classA  r  zXInsertPostInitMethodToModuleSubClasses.unpatch_init_and_builtins.<locals>._disable_classF)r.  r   r~   r   r  rz   r
   r"  r!  r6   r(  r$  r#   _remove_tensor_creation_wrappers)r@   r/  r   rA   rA   rB   r   >  s   

z@InsertPostInitMethodToModuleSubClasses.unpatch_init_and_builtinsc                 C   s   t | jtj_tt| jt_tt| jt_	tt
| jt_tt| jt_tt| jt_tt| jt_tt| jt_tt| jt_d S rD   )r   r   r~   r   r%  r   _orig_torch_tensorr   r   r   _orig_torch_zeroszeros_orig_torch_onesones_orig_torch_fullfull_orig_torch_arangearange_orig_torch_eyeeye_orig_torch_randnrandnrm   rA   rA   rB   r)  Q  s   zDInsertPostInitMethodToModuleSubClasses._add_tensor_creation_wrappersc                 C   s@   t jjt j_tt _tt _tt _	t
t _tt _tt _tt _tt _d S rD   )r~   r   r&  r%  r1  r   r   r   r2  r3  r4  r5  r6  r7  r8  r9  r:  r;  r<  r=  rm   rA   rA   rB   r0  \  s   
zGInsertPostInitMethodToModuleSubClasses._remove_tensor_creation_wrappers)TTNN)rM   rN   rO   r   r   rC   r   r   r   r   r   r   r)  r0  rA   rA   rA   rB   r   .  s    
 Ir   c                   C      t rt   dS dS )z
    This function is used to initialize deepspeed engine inside the context of Init.
    We need to remove the wrappers but keep the context.
    N)r   r   rA   rA   rA   rB   shutdown_init_contexth  s   r?  c                   C   r>  )z^
    This function is used to restore the wrappers after deepspeed engine is initialized.
    N)r   r   rA   rA   rA   rB   restore_init_contextq  s   r@  c                   @   s,   e Zd Zd	deddfddZd
dddZdS )AllGatherHandleNr'   r(   c                 C   s6   |j tjkrtd|  d|| _|| _|| _d S )Nr)   r*   )r/   r0   r1   r2   r3   _AllGatherHandle__handle_AllGatherHandle__param_AllGatherHandle__quantization)r@   handler'   quantizationrA   rA   rB   rC   {  s
   
zAllGatherHandle.__init__Tc                 C   sX   t | jj  | jr%t | jjj  | jj| jj| jj	| j
j| j
_tj| j
_d S rD   )r   rB  rJ   rD  quant_handlebackendr8   quantized_paramscale_bufferr:   rC  r-   r9   r0   rH   r/   )r@   handle_dependencyrA   rA   rB   rJ     s   
zAllGatherHandle.waitrD   TrK   rL   rA   rA   rA   rB   rA  y  s    rA  c                	   @   sT   e Zd Zg Z		ddee dee deddfddZe	ddd
dZ
edd ZdS )AllGatherCoalescedHandleFNrR   
partitions
world_sizer(   c                 C   sZ   || _ || _|| _|| _|| _d| _|| _| jD ]}|jtj	kr*t
d|  dqd S NFr)   rS   )allgather_handlerR   rN  rO  use_secondary_tensorcompleterF  r/   r0   r1   r2   r3   )r@   rQ  rR   rN  rO  rR  rF  r'   rA   rA   rB   rC     s   	
z!AllGatherCoalescedHandle.__init__Tc              	   C   s  | j rd S t| jj  | jrFt| jjj  | jj| jj| jj	
| jd j}g | _t| jD ]}| j|d| jj| | jj q2d}| jD ]n}|jtjks]J d|  dg }|jj}| jrk||j9 }t| jD ] }|| }	|	|jk r| j| d|t|j|	 |}
||
 qpttj||j|_ tj!|_t" # s|r|D ]
}
|
$t" %  q||7 }qKd| _ t" # s|st&j'| d S d S d S )Nr   r)   rW   T)(rS  r   rQ  rJ   rF  rG  rH  r8   rI  rJ  r:   rR   r-   rN  rangerO  r   narrowpartition_szr/   r0   r1   r3   r5   ds_numelrR  !ds_secondary_tensor_num_of_groupsminr~   catr<   r=   r9   rH   r   r   r   rF   rM  data_buffer)r@   rK  flat_tensoriparam_offsetr'   rN  ds_tensor_numelrg   param_startpart_to_copyrA   rA   rB   rJ     sP   
 



zAllGatherCoalescedHandle.waitc                   C   s
   g t _d S rD   )rM  r[  rA   rA   rA   rB   free_buffer  s   
z$AllGatherCoalescedHandle.free_bufferFNrL  rK   )rM   rN   rO   r[  r   r   r   intrC   r   rJ   staticmethodrb  rA   rA   rA   rB   rM    s"    
+rM  c                   @   s*   e Zd Zdee fddZd	d
ddZdS )MultipleAllGatherHandleshandlesc                 C   s
   || _ d S rD   )rg  )r@   rg  rA   rA   rB   rC     ro   z!MultipleAllGatherHandles.__init__Tr(   Nc                 C   s   | j D ]}|| qd S rD   )rg  rJ   )r@   rK  rE  rA   rA   rB   rJ     r  zMultipleAllGatherHandles.waitrL  rK   )rM   rN   rO   r   rM  rC   rJ   rA   rA   rA   rB   rf    s    rf  c                   @   rP   )	AllReduceCoalescedHandlerR   r(   Nc                 C   sB   || _ || _d| _| jD ]}|jtjkrtd|  dqd S rP  )rE  rR   rS  r/   r0   r1   r2   r3   )r@   rE  rR   r'   rA   rA   rB   rC     s   
z!AllReduceCoalescedHandle.__init__c                 C   sV   | j rd S t| jj  | jD ]}|jtjks!J d|  dtj	|_qd| _ d S rV   )
rS  r   rE  rJ   rR   r/   r0   r1   r3   rH   r?   rA   rA   rB   rJ     s   
 

zAllReduceCoalescedHandle.waitrK   rX   rA   rA   rA   rB   rh    s    	rh  c                   @   s   e Zd ZdddZdS )QuantizationInfor(   Nc                 C   s   d | _ d | _d | _d | _d S rD   )rI  rH  rG  rJ  rm   rA   rA   rB   rC     s   
zQuantizationInfo.__init__rK   )rM   rN   rO   rC   rA   rA   rA   rB   ri    s    ri  c                   @   s:   e Zd ZdZdZe ZdZdddZdddZ	d	d
 Z
dS )CUDAQuantizerTi@  Nr(   c                 C   s$   t jd u rtjj  t _d S d S rD   )rj  quantizer_cuda_module	deepspeedops
op_builderQuantizerBuilderloadrm   rA   rA   rB   rC     s   
zCUDAQuantizer.__init__c                 C   sT  |d u rz	| j |  }W n ty   t| | j }|| k r9| d|  dkr/n
|d7 }|| k s$	 | d| d  dkrT| | | jkrT|d9 }nnq:| d|  dkslJ d|  d| | | dk sJ |  d	| d
| |ksJ d|  || j | < Y nw | j|t	 
 |d| jjS )N   r   r   Tr   zGQantized weight requires the number of weights be a multiple of 8. Yet z cannot be divided by 8*i>  z / z is larger than 16kzNAdaptive grouping algorithm cannot find a group size for input tensor of size )group_size_cachenumelKeyErrormathceiltarget_group_sizerk  quantizer:   r   r   	Symmetric)r@   r'   r   rA   rA   rB   rx    sB   
(zCUDAQuantizer.quantizec                 C   s   | j ||| d| j jS )Nrq  )rk  r8   rs  ry  )r@   rI  scalerA   rA   rB   r8   &  s   zCUDAQuantizer.dequantizerK   rD   )rM   rN   rO   
async_flagrw  dictrr  rk  rC   rx  r8   rA   rA   rA   rB   rj    s    

rj  rR   c                 C   sb   | D ]}|j tjkrtd|  tj|_ qt| dd d} t| dkr-| \}t|S t	| S )Nz<expect param.ds_status == ZeroParamStatus.NOT_AVAILABLE, gotc                 S   r   rD   r   r   rA   rA   rB   r   1  r   z&_no_gather_coalesced.<locals>.<lambda>r   r   )
r/   r0   r   r2   r3   r1   r   lenr&   rQ   )rR   r'   rA   rA   rB   _no_gather_coalesced+  s   
r~  c                       sd  e Zd ZdZeedZeedZdZdZ	dZ
eedZ															d> fdd		Zd
d Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zed?ddZd@d d!ZedAd"d#ZedBd$d%Zd&d' ZdCd(d)ZdDd*d+Ze  dEd,d-Z!d.d/ Z"d0d1 Z#dBd2d3Z$dBd4d5Z%d6d7 Z&d8d9 Z'e(d:d; Z)d<d= Z*  Z+S )Fr6   r   param_persistence_thresholdmodel_persistence_thresholdFr(  NTc                    sT  |dur|}t d |durtjj||
nd}|dur!|jj}t j	||||	d t
 s:t  t
 s:J d|du rDt
 | _n|| _|dur[t d |durXtd|| _t
j| jd| _t
j| jd| _|| _|dur|jjdkr| jdu rt|jj t | _| j| _| j| _d| _| jdurt | _t| j| j | _t | _td	| j d
d t  d!| j| jt"  t#$t% &t'j(d | _)t% *| j) || _+|dur|jj,r| j+s|jj,| _+|| _-|dur|jj.r| j-s|jj.| _-|| _/| j+s	| j-rt0 | _1td| j1j2j3 d
d |dur4|jj4t5_4|jj6dur4|jj6j$}|jj6j7}| 8|| |dt9j:fv rE| j)n|| _;| j;t9j<t9j=fv rT|nd| _7| j;t9j=kri|pft>|| j?| _@nd| _@|durtA|t#jBjCs{J | D|jEd
d t
F | _G| jGst Hdt#jI  tJtKd| _L|dur|jjL| _LdS dS )a  A context to enable massive model construction for training with
        ZeRO-3. Models are automatically partitioned (or, sharded) across the
        system and converted to half precision.

        Args:
            module (``torch.nn.Module``, optional): If provided, partition the model as
                if it was constructed in the context.
            data_parallel_group (``deepspeed.comm`` process group, optional):
                The group of processes to partition among. Defaults to all processes.
                Synonymous with sequence data parallel group for param partitioning
                across both sequence and data parallel groups.
            mem_efficient_linear (bool, optional): Replace
                torch.nn.functional.linear with an implementation that allows
                DeepSpeed to partition parameters. Defaults to ``True``.
            remote_device (string, optional): The initial device to store model
                weights e.g., ``cpu``, ``nvme``. Passing ``"cpu"`` will create the model in CPU
                memory. The model may still be moved to GPU based on the
                offload settings for training. Defaults to param offload device if a config is
                defined, otherwise GPU.
            pin_memory (bool, optional): Potentially increase performance by
                using pinned memory for model weights. ``remote_device`` must be
                ``"cpu"``. Defaults to pin_memory value in config, otherwise ``False``.
            config_dict_or_path (dict or ``json file``, optional): If provided, provides configuration
                for swapping fp16 params to NVMe.
            config (dict or ``json file``, optional): Deprecated, use config_dict_or_path instead.
            enabled (bool, optional): If ``False``, this context has no
                effect. Defaults to ``True``.
            dtype (``dtype``, optional): Can be used to change the data type of the parameters.
                Supported options are ``torch.half`` and ``torch.float``. Defaults to ``None``
            mpu (``object``, optional): A model parallelism unit object that implements get_{model,data}_parallel_{rank,group,world_size}.
            zero_param_parallel_group(``object``, optional): Parallel (comm) group for dual partitioning of ZeRO params.
            zero_quantized_weights (bool, optional): If ``True``, turn on quantized weights in all gather weights. Default is ``False``
            zero_quantized_nontrainable_weights (bool, optional): If ``True``, nontrainable weights will be stored in quantized format. Default is ``False``
            param_swapper (``deepspeed.runtime.swap_tensor.partitioned_param_swapper.AsyncPartitionedParameterSwapper``, optional): [Experimental] Use existing parameter swapper. Defaults to ``None``.
                This argument will be removed in the near future.

        This context accelerates model initialization and enables models that
        are too large to allocate in their entirety in CPU memory. It has the
        following effects:

        #. allocates tensors to either GPU or CPU memory or NVMe
        #. converts floating point tensors to half precision
        #. immediately partitions tensors among the group of data-parallel devices
        #. (*optional*) replaces ``torch.nn.functional.linear`` with a more
           memory-efficient implementation

        These modifications allow for models that exceed the size of local CPU/GPU
        memory/NVMe, but fit within the total NVMe capacity (*i.e.*, aggregate CPU
        or GPU memory or NVMe) across all nodes. Consider initializing a model with one
        trillion parameters, whose weights occupy two terabytes (TB) in half
        precision. The initial CPU allocation in full precision requires 4TB of
        memory *per process*, and so a system with 8 GPUs per node would need 32TB of
        CPU memory due to data-parallel redundancies. Instead, by immediately
        partitioning tensors we remove the redundancies. The result is that
        regardless of the number of GPUs, we still only require the original 4TB. This
        allows for a linear increase in model size with the aggregate system memory.
        For example, if a node has 1TB of memory and 8 GPUs, we could fit a trillion
        parameter model with 4 nodes and 32 GPUs.

        Important: If the fp16 weights of the model can't fit onto a single GPU memory
        this feature must be used.

        .. note::
            Initializes ``deepspeed.comm`` if it has not already been done so.
            See :meth:`deepspeed.init_distributed` for more information.

        .. note::
            Only applicable to training with ZeRO-3.

        Examples
        --------

        #. Allocate a model and partition it among all processes:

            .. code-block:: python

                with deepspeed.zero.Init():
                    model = MyLargeModel()


        #. Allocate a model in pinned CPU memory and partition it among a subgroup of processes:

            .. code-block:: python

                with deepspeed.zero.Init(data_parallel_group=mpu.get_data_parallel_group(),
                                         remote_device="cpu",
                                         pin_memory=True):
                    model = MyLargeModel()


        #. Partition an already-allocated model in CPU memory:

            .. code-block:: python

                model = deepspeed.zero.Init(module=model)
        NzYzero.Init: the `config` argument is deprecated. Please use `config_dict_or_path` instead.)r   r   r   r   zBParameters cannot be scattered without initializing deepspeed.commzcsequence_data_parallel_group' is deprecated and will be removed. Use 'data_parallel_group' instead.zyBoth 'data_parallel_group' and 'sequence_data_parallel_group' were specified. Please provide only one of these arguments.r   r   zhpZeRO group size: Tr  zihpZeRO partition parameter my rank in world {} my rank in group {} ranks in my param partition group: {} r   zUsing quantizer for weights: Frp   z5all_gather_into_tensor API is not available in torch use_all_reduce_for_fetch_params)Mr   warningrl  runtimeconfigDeepSpeedConfigzero_configmemory_efficient_linearsuperrC   r^   is_initializedr   get_world_groupr   
ValueErrorrb   rg   get_world_sizedp_world_sizezero_param_process_groupzero_hpz_partition_sizer   !_create_zero_param_parallel_group$_get_zero_param_intra_parallel_groupnum_ranks_in_param_grouprank_in_groupnum_param_groups/_get_zero_param_intra_parallel_group_world_sizerd  ._get_zero_param_intra_parallel_rank_in_mygrouprh   re   format*_get_zero_param_intra_parallel_group_ranksr~   r-   r   r   r   r   local_device
set_devicequantized_weightszero_quantized_weightsquantized_nontrainable_weights#zero_quantized_nontrainable_weightsrz   rj  r7   r  rM   r(  r6   offload_param
pin_memory_validate_remote_devicer   noneremote_devicecpunvmer    r   param_swapperr}   r   r
   _convert_to_zero_parametersr   has_all_gather_into_tensoruse_all_gather_into_tensorr   __version__r   r   r  )r@   rz   data_parallel_groupr   r  r  config_dict_or_pathr  r   r   mpuzero_param_parallel_groupr  r  sequence_data_parallel_groupr  
_ds_configr  rA   rB   rC   B  s   p














zInit.__init__c                 C   s$   dt _|jjt _|jj| j t _d S )NT)r6   apply_param_persistencer  r  r  num_partitions)r@   r   rA   rA   rB   _update_persist_config  s   
zInit._update_persist_configc                 C   s\   |  | t |  krt|jd|   nt|jt|  d|   |  d S ra   )_convert_to_deepspeed_paramr^   r  get_dp_process_groupr   r9   get_global_rankr   r?   rA   rA   rB   _zero_init_param  s   
zInit._zero_init_paramc                 C   s2   |D ]}t |r	q|j| j|_| | qd S rD   )r   r9   r:   r  r  )r@   
param_listr'   rA   rA   rB   r  %  s   z Init._convert_to_zero_parametersc                 C   s   |d urI|d t jfv r&|jjd ur&|jjj}|t jks&J d| d| d|t jkrK|jjd us:J dt j d|jjjd usMJ dt j d S d S d S )Nz''device' in DeepSpeed Config cannot be z if remote device is .zH"offload_param" must be defined in DeepSpeed Config if remote device is zC"nvme_path" in DeepSpeed Config cannot be None if remote device is )r   r  r  r  r-   r  	nvme_path)r@   r  r   offload_param_devicerA   rA   rB   r  -  s"   


zInit._validate_remote_devicec                 C   s  t d|jj dd td|jj dd |jddD ]Z\}}t d| d|jj dd t jd7  _t j| 7  _t	|svt
 |sP|j| j|_|d	krc| jrct|tv rct|| j | | t d
t| dt|  qtdtj d|jj dd d S )NzConverting Params in Fr  z-Before converting and partitioning params in rp   zAnalyzing param z in r   weightPartitioning param z module=zParam count z.. After converting and partitioning params in )rh   r  rM   r   rt   r   r   r   rs  r   r   r   r9   r:   r  r   typer#   r"   r  r   r   )r@   rz   namer'   rA   rA   rB   r   <  s(   

zInit._post_init_methodc              
      s  t j_tj_j_ _	d _
t _tjr=j	tjkr=tjj	 tjkr=d_t jd7  _t jj	7  _nd_d_j_d _j_j_j_j_tj_ t jd7  _d&fdd	}dd  t!		d'd	t"t# d
t$dt$dt%f fdd}d(fdd	}d)fdd	}d*fdd	}fdd}fdd}fdd}	fdd}
d+dt&j'dt$dt(fdd }fd!d"}d#t)dt)ffd$d%}|_*|_+|_,|_-|_.|_/|_0|	_1t23|_4|j5_5|_6d S ),NTr   Fr   c                    s"    }| d u r	|g} j | ||dS )Nr]   	hierarchy)_all_gather)r  r]   r  r   r'   r@   rA   rB   r     s   z4Init._convert_to_deepspeed_param.<locals>.all_gatherc                 S   s   t dd |D }|d jd u}|rt dd |D }tj|| | t  dd}g }t|D ]}	||d||	 | q/|rQt	tj
dd |D || d	 nt	tj
d
d |D || d	 t|| ||}
t|
||||dS )Nc                 s       | ]}|j jV  qd S rD   r5   rW  r   rA   rA   rB   	<genexpr>      zNInit._convert_to_deepspeed_param.<locals>._all_gather_dtype.<locals>.<genexpr>r   c                 s       | ]
}|j j|j V  qd S rD   r5   rW  rX  r   rA   rA   rB   r    s    Fr   r-   requires_gradc                 S      g | ]}|j t  qS rA   ds_secondary_tensorr:   r   r;   r   rA   rA   rB   r         zOInit._convert_to_deepspeed_param.<locals>._all_gather_dtype.<locals>.<listcomp>)outc                 S   r  rA   r5   r:   r   r;   r   rA   rA   rB   r     r  )rQ  rR   rN  rO  rR  )sumr  r~   r   r   r;   rT  r   rU  r   rZ  r`   rM  )r   rR   rO  r  r   rV  rR  r\  rN  r]  rE  rA   rA   rB   _all_gather_dtype  s<   
z;Init._convert_to_deepspeed_param.<locals>._all_gather_dtyperR   	safe_moderx  r(   c              	      s"   |  jdkrt| S | D ]}|jtjkrt| tj|_qj	}j
}j}| d jd u}jrA|rAj}j}j}t| dd d} ttjr[tddd | D   |rotd	d | D  td
d | D  t| dkr.| d }t|j| | }|r|jjd | }|r|jn|j}	tj||s|	jntj t! " dd}
|st#|	$t! " |
|}|
%dd|j&|j'$|j(|_)t*||S t+|	dr|	j,}|	j)}nj-.|	\}}t#|$t! " |
|}tj|/ | |jt! " dd}t#|$t! " ||}t0 }|
%dd|j&|j'$|j(|_1j-|_2||_3||_4t*|||dS j5r|s|st6dd | D }tj7|t8dd | D t! " dd}d}| D ],}|%d||j&|j'|_)||jj9   }|%d||jj:|j ||j7 }qXt;j<||dd}t=|| dS |st>t?}| D ]}||jj @| qg }|A D ]\}} |@ || ||| qtB|S t6dd | D }|rt6dd | D }tj|| tj t! " dd}|r t+| d jdrtCtjDdd | D }tCtjDdd | D }nGj-.tCtjDdd | D \}}n4t+| d jdrBtCtjDdd | D }tCtjDdd | D }nj-.tCtjDdd | D \}}tj|/ | tjEt! " dd}t#|||}t#|||}t0 }||_1j-|_2||_3||_4||_F||_GtH|| d |||dS )Nr   r   c                 S   r   rD   r   r   rA   rA   rB   r     r   zPInit._convert_to_deepspeed_param.<locals>.all_gather_coalesced.<locals>.<lambda>r   z-allgather_coalesced: c                 S      g | ]}|j qS rA   r   r   rA   rA   rB   r         zRInit._convert_to_deepspeed_param.<locals>.all_gather_coalesced.<locals>.<listcomp>c                 S   r  rA   r   r   rA   rA   rB   r     r  c                 S      g | ]}|j jqS rA   r  r   rA   rA   rB   r         Fr  r+   )rF  c                 s   s    | ]}|j V  qd S rD   )ds_numel_alignedr   rA   rA   rB   r    s    zQInit._convert_to_deepspeed_param.<locals>.all_gather_coalesced.<locals>.<genexpr>c                 s   r  rD   )r5   r   r   rA   rA   rB   r    r  Tr[   )rE  rR   c                 s   r  rD   r  r   rA   rA   rB   r  -  r  c                 s   r  rD   r  r   rA   rA   rB   r  0  s    c                 S       g | ]}|j jt  qS rA   )r  r9   r:   r   r;   r   rA   rA   rB   r   :      c                 S   r  rA   )r  r+   r:   r   r;   r   rA   rA   rB   r   >  r  c                 S   r  rA   r  r   rA   rA   rB   r   D  s    c                 S   r  rA   )r5   r9   r:   r   r;   r   rA   rA   rB   r   K  s     c                 S   r  rA   )r5   r+   r:   r   r;   r   rA   rA   rB   r   L  r  c                 S   r  rA   r  r   rA   rA   rB   r   S  r  )rQ  rR   rN  rO  rR  rF  )I*_ensure_availability_of_partitioned_paramsr  r~  r/   r0   r   r2   r3   r1   r   rg   r  r  r  r  r  r   r   isEnabledForloggingDEBUGrj   r   r}  ru  rv  rW  shaper5   r~   r   r   int8r   r;   r`   r:   rU  r<   r=   r-   r9   rA  r4   r+   r7   rx  rs  ri  rI  rH  rG  rJ  r  r  r3  r   get_partition_rankcopy_r^   
all_reducerh  r   r   r   rl   rf  r   rZ  r   rV  rO  rM  )rR   r  rx  r'   r   r  rO  rR  buffer_sizeparam_ds_tensorparam_bufferrg  scalesrI  rE  quant_scale_bufferrG  
quant_infoflat_buffer_sizer\  start_paramstartdtype_paramsr   r   rV  )r  r@   rA   rB   all_gather_coalesced  s(  



	"







z>Init._convert_to_deepspeed_param.<locals>.all_gather_coalescedc                    sD    }t d|  dt| dd | d u r|g} j| |dd d S )N--z----Partitioning param Fr  Tr   	free_data)rh   r   
_partition)r  r  r   r  r   r  rA   rB   r   l  s   z3Init._convert_to_deepspeed_param.<locals>.partitionc                    sB    }| d u r	|g} t d|  ddd | D  d |  d S )Nr  z*----Reducing Gradients for param with ids c                 S   r  rA   r   r   r'   rA   rA   rB   r   y  r  zWInit._convert_to_deepspeed_param.<locals>.reduce_gradients_at_owner.<locals>.<listcomp>z	 to owner)rh   _reduce_scatter_gradients)r  r  r   r  rA   rB   reduce_gradients_at_ownert  s   zCInit._convert_to_deepspeed_param.<locals>.reduce_gradients_at_ownerc                    sR    }t d|  dt|  | d u r|g} t|tjr|g}j| ||d d S )Nr  z(----Partitioning param gradient with id )partition_buffers
accumulate)rh   r   r}   r~   r   _partition_gradients)r  r  r  r  r   r  rA   rB   partition_gradients}  s   z=Init._convert_to_deepspeed_param.<locals>.partition_gradientsc                      
     S rD   )_aligned_sizerA   r  rA   rB   aligned_size  ro   z6Init._convert_to_deepspeed_param.<locals>.aligned_sizec                      r  rD   )_padding_sizerA   r  rA   rB   padding_size  ro   z6Init._convert_to_deepspeed_param.<locals>.padding_sizec                      r  rD   )_partition_numelrA   r  rA   rB   partition_numel  ro   z9Init._convert_to_deepspeed_param.<locals>.partition_numelc                      s         S rD   )r   
_orig_itemrA   r   rA   rB   item_override  s   z7Init._convert_to_deepspeed_param.<locals>.item_overrideslfuse_debug_namec                 S   sr   |rt | n| j| jj|  | jt| jt| j| j	| j
d ur%t| j
jnd | j| j| jd ur5| jjdS d dS )N)r   statusrs  rW  r  r=   r  
grad_shapepersistactive_sub_moduleszds_tensor.shape)r   r   r/   r  rs  rW  tupler  r=   r  grad
ds_persistr   r5   )r  r  rA   rA   rB   r3     s   z4Init._convert_to_deepspeed_param.<locals>.ds_summaryc                    s     |  d S rD   )r  r  rm   rA   rB   convert_to_zero_parameters  s   zDInit._convert_to_deepspeed_param.<locals>.convert_to_zero_parametersfuncc                    s    fdd}|S )Nc                     s       | i |S rD   )r   )r   rI   )r  r'   rA   rB   r     s   zKInit._convert_to_deepspeed_param.<locals>.allgather_before.<locals>.wrappedrA   )r  r   r   )r  rB   allgather_before  s   z:Init._convert_to_deepspeed_param.<locals>.allgather_before)NFr   FF)Nr   FTra   )NNr   F)F)7r   r   ds_param_typer0   rH   r/   r  r=   rs  rW  r5   r   r   r6   r  r  num_persisted_elementsr  r  num_persisted_parametersis_external_paramr   r  r  ds_zero_param_process_groupr  ds_secondary_tensor_group_sizer  rX  r  nvme_swapperparam_idr   r   r   r   boolrM  r~   r   r|  r   r   r  r   r  r  r  r   r  rw   rx   r3   itemr  )r@   r'   r   r  r   r  r  r  r   r  r  r3   r  r  rA   )r  r'   r@   rB   r  T  sl   
$# =		
z Init._convert_to_deepspeed_paramc                 C   s   |j | | S rD   )rW  r  r?   rA   rA   rB   r    s   zInit._aligned_sizec                 C   s   |j | j }|r| j| S dS ra   )rW  r  )r@   r'   	remainderrA   rA   rB   r    s   zInit._padding_sizec                 C   s   |j jS rD   r  r?   rA   rA   rB   r    s   zInit._partition_numelc                 C   s   g }g }|D ]8}|j jtjkr#|j jtjkr|jtjksJ |	| |j jtj
kr>|j jtjkr7|jtjks9J |	| qt|dkrQ|d jj|dd d S t|dkr`|d j  d S d S )Nr   F)r]   )r5   r  r!   r   final_locationr   r  r/   r0   r   r1   r}  r  swap_insynchronize_reads)r@   rR   swap_in_listswap_in_flightr'   rA   rA   rB   r    s   

z/Init._ensure_availability_of_partitioned_paramsc                 C   s   |  | g }g }|D ]!}|jtjkr,|r'| j|||d}tj|_|| q|| q|s}t|dkr=| j||d}n5g }	g }
|D ]}t	|j
dsVt	|dr\t	|jdr\|	| qC|
| qC| j|
|dd | j|	|dd |D ]}tj|_qtd S |S )	Nr  r   )r  r+   r  F)rx  T)r  r/   r0   r   _allgather_paramr1   r   r}  _allgather_paramsr4   r5   r  _allgather_params_coalescedrH   )r@   r  r]   r  rg  all_gather_listr'   rE  	ret_valueall_gather_quantize_listall_gather_nonquantize_listrA   rA   rB   r    s@   



zInit._all_gatherc                 C   sN   |D ]"}t d|j dd | jd ur| | | j||dd tj|_qd S )NzBefore Partitioning Param Fr  Tr  )rh   r   r  _partition_param_sec_partition_paramr0   r   r/   )r@   r  rf   r   r  r'   rA   rA   rB   r    s   


zInit._partitionc              	   C   s  |j tjusJ d| dtd|j d|j  dd |j tju rtd|j dt dd |jd ur|std	|j d|j	 dd |rKt
| td
|j d|j	 dd |jjtjkrtd|j ddd |j|g td|j d|jj	 ddd d S | |}|| j }|jd u rd }| jtjkr| jj|drtj}| j||}tjd|j|jd}|j|_td|j d n:|jr| j}	n| jtjkrtj}	n| j}	tj||j|	d}|js| jr| j !|\}|_"|	tjkr| j#rt$ #|}d|_||_||j_%t&j|j_'||j_||_(|| )  }
|
| }|* +d}|
|j%k rU||j%krU|,d|
|}t-  |j.| W d    n	1 sOw   Y  n0|
|j%k r|j%|
 }t-  |j,dd|.|,d|
| W d    n	1 sw   Y  td	|j d|j	 dd t
| td
|j d|j	 dd |jjtjkr| j/|g td|j d td|j ddd td|j d|j d|j d|j	  d S d S )N # Cannot partition a param in flight	Param id z status is Fr  zPartitioning param id z reuse buffers zBefore partitioning param zAfter partitioning param zParam z+ partition released since it exists in nvmezafter swap Param )rs  r   r   zID z< Initializing partition for the first time for nvme offload.z0 Offloaded to nvme offload and buffers released. partitioned type  dev  shape )0r/   r0   r1   rh   r   rH   reuse_buffersr5   r   r  r   r  r   r  r  $remove_partition_and_release_buffersr  r  r  r  swappable_tensor
get_bufferr~   r   r   r-   r9   r  r  r  r  r  r7   rx  r+   r  r   rW  r!   r  r  r  
contiguousr<   rU  no_gradr  swap_out_and_release)r@   r'   bufferr   r  tensor_sizepartition_sizer  partitioned_tensorr-   r  endone_dim_param
src_tensorelems_to_copyrA   rA   rB   r+    s   







,zInit._partition_paramc                 C   sp  |j tjusJ d| d|j tju r|jd ur|sd S | |}|| j }t|| j }|jd u rhd }t	j
||j| jd}| jrF| }|jsU| jrU| j|\}|_d|_||_||j_tj|j_||j_|| j }	|	| }
| d}tdt|j|	 |}|jdd||d|	| t  st   !  t"|j# d|j d|j$ d	|j% dd
 d S d S )Nr,  r-  r   Fr/  r   r0  r1  r2  r  )&r/   r0   r1   rH   r  r  r  rd  r  r~   r   r   r  r  r  r  r7   rx  r+   rW  r!   r  r  r  r7  r<   maxrY  rU  r  r   rE   rF   rG   rh   r   r-   r  )r@   r'   r:  r   r;  r<  secondary_partition_sizer  secondary_partitioned_tensorsecondary_startsecondary_endr?  	sec_numelrA   rA   rB   r*    sL   







"
zInit._partition_param_secc                 C   s~   |j d ur#td|j d|j d|j d|j   d|j  
 d S td|j d|j d|j d|j  d|j  
 d S )Nr.  z, param status: z, param numel z, partitioned numel z, data numel z, partitioned ds_tensor )r5   rh   r   r/   rW  rs  r9   r?   rA   rA   rB   _param_status  s   
2.zInit._param_statusc                 C   s  |j j}|| j }| |}||ks J d|j d| d| td|  dt| d|  tdt| d| d	d
d tj	||j
|jdd}tdt| d	| d	| d	d
d t  sht   td|  dt| d|  | jrtj||j t  | ||d}n=g }	t| jD ]%}
|	|d||
 | |
tj| |dkr|	|
 jj|j jdd qtj|	|	|   | ||d}|dd|j|j}|j|_|S )Nz	param id z aligned size z does not match tensor size r  z'---- Before allocating allgather param z partition size=z Before allocate allgather param z partition_size=r,  Fr  r   r/  zAfter allocate allgather param z----allgather param with r[   r   r   T)r.   )r5   rW  r  r  r   rh   r   r   r~   r3  r   r-   r<   r   rE   rG   r  r^   all_gather_into_tensorr:   r   get_partition_dp_grouprT  r   rU  rb   r9   r  r   r  r=   )r@   r'   r]   r  r<  r;  aligned_param_sizer\  rE  rN  r]  replicated_tensorrA   rA   rB   r#    sR   

$

	
zInit._allgather_paramc                 C   s<  t |dkrdS | jdkrt|}|  dS g }g }|r!g }g }|D ]-}	||	jj ||	jt 	  |rP||	jj
  ||	jj
t 	  q#g }
|rWg }|D ]}|| j }tj||d jj| jdd}d|_|
| qY|r|D ] }|| j }tj||d jj
j| jdd}d|_|| q}g }g }t|D ]\}}	|| d}| jrtj|
| || |	dd}|rtj|| || | |	dd}|| nvg }t| jD ]-}|| }|
| d|| |}|| t |std	| d
| d|   qtj||| |	dd}|rPg }t| jD ]}|| }|| d|| |}|| q%tj||| | |	dd}|| || q|d   |ri|D ]}|  qat|D ]#\}}	|
| }|r| j||| }|dd|	j|	j j!|	_!qmt " st #  dS )zO blocking call
        avoid explicit memory copy in _allgather_params
        r   Nr   r   r/  FTr[   zparam z, partition z! is not on CUDA, partition shape )$r}  r  r~  rJ   r   r5   rW  r:   r   r   r+   rs  r~   r   r   r  r<   r  	enumerater  r^   rI  rJ  rT  rU  r   r   r  sizer   r7   r8   r=   r9   rE   rG   )r@   r  r  rx  rE  partition_sizeslocal_tensorsquantize_scale_sizesquantize_scale_tensorsr'   allgather_paramsallgather_quantize_scalepsizer;  r\  launch_handleslaunch_quantize_handles	param_idxrY   hquantize_handleoutput_listr]  r   output_scale_listrG  gathered_tensorrA   rA   rB   r%    s   







 
z Init._allgather_params_coalescedc                 C   s  t |dkrd S tdd |D }|| j }tj||d jj| jd}g }t| jD ]3}|| }|	|
d|| ||  kr]d}	|D ]}
|
jj}|| 
d|	||
jj |	|7 }	qDq*t|d drtdd |D }|| j }tj||d jjj| jd}g }t| jD ]4}|| }|	|
d|| || jkrd}	|D ]}
|
jjj}|| 
d|	||
jjj |	|7 }	qqtj|||   | |
dd t|d drtj||d j| |
dd d}|D ]a}
|
jj}|
j}tj|
j|
jj| jd}t| jD ]-}|| }|| }||k r-t|| |}|| 
d||}|d	
d||| q||
jj7 }t|d drD| j||}|j|
_qd S )
Nr   c                 S   r  rA   r  r  rA   rA   rB   r   k  r  z*Init._allgather_params.<locals>.<listcomp>r   r+   c                 S   s   g | ]}|j j qS rA   )r5   r+   rs  r  rA   rA   rB   r     r   Fr[   r/  )r}  r  r  r~   r   r5   r   r  rT  r   rU  r  rW  r  r9   r4   rO  r+   rg   r^   rI  rJ  r   r=   rY  r<   r7   r8   )r@   r  r  r<  r;  r\  rN  r]  r  offsetr'   param_numel
scale_sizescale_tensor_sizeflat_scale_tensorscale_partitionsparam_scale_numelr^  param_partition_size
param_sizerL  r`  numel_to_copyra  rA   rA   rB   r$  f  s   








zInit._allgather_paramsc           
      C   s   g }|D ] }|j  |jksJ |j   d|j d|| | qt||D ]A\}\}}|d ur8|  |jj}|  | }|| }||j  k rQ|k rkn q*|j| }	|j 	d
d||	|
dd|	 q*d S )Nz != zE Cannot reduce scatter gradients whose size is not same as the paramsr/  r   )r  rs  rW  r   _reduce_scatter_gradientziprJ   r5   r  r<   rU  r  )
r@   r  handles_and_reduced_partitionsr'   rE  reduced_partitionr<  r  r>  elementsrA   rA   rB   r    s&   
&zInit._reduce_scatter_gradientsc                 C   s   |j j}|| j }g }t| jD ]L}|| }|| }||jk r0||jkr0|jdd||}n'tj||j	|j
d}||jk rW|j| }	|dd|	|jdd||	 || qtj| |d}
tj||
 || |dd}|||
 fS )Nr/  r   r   r   Tr[   )r5   rW  r  rT  r  r<   rU  r~   r3  r   r-   r  r   r^   rb   rJ  reduce_scatter)r@   r'   r<  
total_size
input_listr]  r  r>  inputrl  rg   rE  rA   rA   rB   rh    s(   


&
zInit._reduce_scatter_gradientc                 C   s>   |d u rd gt | }t||D ]\}}| j|||d qd S )N)partition_bufferr  )r}  ri  _partition_gradient)r@   r  r  r  r'   rq  rA   rA   rB   r    s
   zInit._partition_gradientsc              	   C   s~  t d|j d|j  d|jj d|jj  tddd |jj}|d u r7|r,J dtj	||j|j
d	}n| |ksIJ d
|  d| tj| |d}|| }|| }|ddd|}||jk rt|j| |}	|dd|	}
|jdd||	}|s|
| n'|j
|
j
kr|
| ntj| |j|j
d	}||
 || |
| |j|j_tddd d S )Nr  z gradient of size z type z part_size zBefore partitioning gradientsFr  zNo buffer to accumulate tor   zThe partition buffer size z* should match the size of param.ds_tensor r   r/  r   zAfter partitioning gradients)rh   r   r  rs  r   r5   rW  r   r~   r3  r-   r^   rb   rJ  r<   rU  rY  r  add_r   r9   )r@   r'   rq  r  r<  rg   r  r>  dest_tensor_full_bufferrl  dest_tensorr@  
acc_tensorrA   rA   rB   rr    s>   *





zInit._partition_gradientc                 C   s   |j S rD   r   r?   rA   rA   rB   rJ  6  s   zInit.get_partition_dp_groupc                 C   r   )z]subclass can overload to specify different relative rank in
        parameter partition group)rg   rm   rA   rA   rB   r  9  s   zInit.get_partition_rankc                 C   r   rD   )r  rm   rA   rA   rB   r  >     zInit.num_partitionsc                 C   r   )z= Return the communication group with all data-parallel ranks rw  rm   rA   rA   rB   r  B  rx  zInit.get_dp_process_group)NNTNFNNTNNNFFNNrc  )FFT)NFT)NF)Fr   )r   Fr   ),rM   rN   rO   r  r   r   r  r  r  r  r  r(  rC   r  r  r  r  r   r  r  r  r  r  r   r  r  r+  r*  rH  r#  r%  r~   r8  r$  r  rh  r  rr  rJ  r  propertyr  r  __classcell__rA   rA   r  rB   r6   9  st    


 V	  p
&y4


6iO
!
>
r6   c                   @   s&   e Zd Zd	ddZdd Zdd ZdS )
GatheredParametersNTc                 C   s   || _ |sdS t|trt|tjst|}n|g}tdd |D s(d| _ dS dd |D | _tt	| jdd d	| _d| _
|dur\| jd
 jt krQ|| _
nt| jd
 j|| _
|| _| jdurp| jD ]
}t| j| qgdS dS )a  A context that collects parameters that were partitioned via a
        :class:`deepspeed.zero.Init` context. The parameters are partitioned
        again upon exit.

        Args:
            params (``torch.nn.Parameter``): A single parameter, or an iterable of parameters (list, tuple, generator) of parameters to collect.
                It's assumed that all parameters are zero params.
            modifier_rank (int, optional): If specified, this rank's parameter will be
                broadcasted on exit from the context. This argument is required if ``params`` are
                modified, so that all processes have a consistent view of the data. Defaults
                to ``None``.
            fwd_module (``torch.nn.Module``, optional): If specified, ``params`` will be
                registered as external parameters of ``fwd_module``. See :meth:`deepspeed.zero.register_external_parameter`.
            enabled (bool, optional): If ``False``, this context is a no-op. Defaults to ``True``.

        Important: Make sure to use ``modifier_rank`` that is not ``None`` (e.g., ``modifier_rank=0``)
        if you need the GPU memory allocated by gather to be released upon exit from the context manager.

        Important: if ``params`` isn't an iterable of parameters or a single parameter it'll be silently ignored!

        Examples
        ========

        #. Allocate a partitioned module, initialize its weight on rank 0, and update all
           processes.

            .. code-block:: python

                with deepspeed.zero.Init():
                    linear = torch.nn.Linear(1000,1000)

                with deepspeed.zero.GatheredParameters(linear.weight,
                                                       modifier_rank=0):
                    if deepspeed.comm.get_rank() == 0:
                        linear.weight.zero_()

                with deepspeed.zero.GatheredParameters(linear.weight,
                                                       modifier_rank=0):
                    if deepspeed.comm.get_rank() == 0:
                        linear.weight.zero_()

        #. Collect a partitioned weight to pass to another module during
           training. The parameter will be registered as an external parameter
           and made available during the backward pass.

            .. code-block:: python
                :emphasize-lines: 6

                def forward(self, input):
                    x = self.layer1(input)

                    # self.layer1.weight is required by self.layer2.forward
                    with deepspeed.zero.GatheredParameters(self.layer1.weight,
                                                           fwd_module=self):
                        y = self.layer2(x, self.layer1.weight)
                    return y


        #. Pretrained model loading

            .. code-block:: python

                with deepspeed.zero.Init():
                    model = MyModel()

                state_dict = torch.load(model_path, map_location="cpu")

                def load(module: nn.Module, prefix=""):
                    # because zero3 puts placeholders in model params, this context
                    # manager gathers (unpartitions) the params of the current layer, then loads from
                    # the state dict and then re-partitions them again
                    with deepspeed.zero.GatheredParameters(list(module.parameters(recurse=False)), modifier_rank=0):
                        if deepspeed.comm.get_rank() == 0:
                            module._load_from_state_dict(state_dict, prefix)

                    for name, child in module._modules.items():
                        if child is not None:
                            load(child, prefix + name + ".")

                load(model, prefix="")

        If this approach is not used, then the full model will first be copied to each GPU. For models
        bigger than the memory of a single GPU, this method is required.
        Nc                 s   s    | ]}t |V  qd S rD   r   r   rA   rA   rB   r    r  z.GatheredParameters.__init__.<locals>.<genexpr>Fc                 S   s   g | ]	}t |d r|qS r   )r4   r   rA   rA   rB   r     s    z/GatheredParameters.__init__.<locals>.<listcomp>c                 S   r   rD   r   )xrA   rA   rB   r     r   z-GatheredParameters.__init__.<locals>.<lambda>r   r   )r   r}   r   r~   r   r   anyrR   r   r   src_rankr   r^   r  r  
fwd_moduler   )r@   rR   modifier_rankr  r   r   rA   rA   rB   rC   I  s0   V


zGatheredParameters.__init__c                 C   s"   | j sd S | jd j| jd d S )Nr   r  )r   rR   r   rm   rA   rA   rB   r     s   zGatheredParameters.__enter__c                    sn    j sd S  jd u r jd j jdd d S  fdd jD }|D ]}|  q# jd j jdd d S )Nr   F)r  r   c                    s$   g | ]}t j|j j|jd dqS )Tr[   )r^   r   r9   r~  r   r   rm   rA   rB   r     s   $ z/GatheredParameters.__exit__.<locals>.<listcomp>T)r   r~  rR   r   rJ   )r@   excrg  rY  rA   rm   rB   r     s   

zGatheredParameters.__exit__)NNT)rM   rN   rO   rC   r   r   rA   rA   rA   rB   r{  G  s    
wr{  rD   r  rL  )mru  r   rw   typingr   r   enumr   r   rr   r   collectionsr   r  r~   r   rl  r	   r^   torch.nnr
   r   r+  r   deepspeed.utilsr   utilsr   r   deepspeed.runtime.zero.configr   deepspeed.runtime.zero.utilsr   r   %deepspeed.runtime.zero.offload_configr   deepspeed.runtime.config_utilsr   r   r   deepspeed.comm.commr   deepspeed.utils.debugr   r   r   r   r   deepspeed.acceleratorr   %swap_tensor.partitioned_param_swapperr    r!   &deepspeed.inference.quantization.utilsr"   r#   r$   r%   partitioned_param_data_shaper   r   r&   rQ   r`   rh   strrj   r{   r   r   r   r0   r   r1  r   r   r3  r2  r5  r4  r7  r6  r9  r8  r;  r:  r=  r<  r   r   r   r   r   r3  temp_contiguous_tensorempty_buffersobjectr   r?  r@  rA  rM  rf  rh  ri  rj  r~  r6   r{  rA   rA   rA   rB   <module>   s   
4
  <	J
	)          