# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team

import sys
from typing import List

import deepspeed
import torch
from deepspeed import comm as dist
from deepspeed.accelerator import get_accelerator
from deepspeed.runtime.zero.mics_utils import (MiCS_CommGroups, create_mics_comm_groups, scale_tensors)
from deepspeed.runtime.zero.parameter_offload import DeepSpeedZeRoOffload
from deepspeed.runtime.zero.partition_parameters import AllGatherCoalescedHandle, Init, ZeroParamStatus
from deepspeed.runtime.zero.stage3 import DeepSpeedZeroOptimizer_Stage3
from deepspeed.runtime.zero.utils import is_zero_param
from deepspeed.utils import instrument_w_nvtx, log_dist, logger
from torch import Tensor
from torch.nn import Parameter


def has_hierarchical_all_gather_groups(comm_groups: MiCS_CommGroups):
    result = False
    if comm_groups.param_intra_node_group is not None and comm_groups.param_inter_node_shard_group is not None:
        result = True
    return result


class MiCS_AllGatherCoalescedHandle(AllGatherCoalescedHandle):
    """ This handle assumes that there is no need to
    copy data out from a contiguous tensor
    """

    def __init__(self, allgather_handle, params: List[Parameter], partitions: List[Tensor], world_size: int) -> None:
        super().__init__(allgather_handle, params, partitions, world_size)

    def wait(self, **kwargs) -> None:
        """ Wait on the collective, then mark the gathered params as available. """
        try:
            instrument_w_nvtx(self.allgather_handle.wait)()
        except (ValueError, RuntimeError) as e:
            log_dist(
                "WARNING: Runtime Error while waiting the collective all-gather, possibly due to the _IllegalWork",
                ranks=[0])
            log_dist(f"Error message: {e}", ranks=[0])

        if self.complete:
            return

        for _, param in enumerate(self.params):
            assert param.ds_status == ZeroParamStatus.INFLIGHT, f"expected param {param.ds_summary()} to be inflight"
            param.ds_status = ZeroParamStatus.AVAILABLE

        self.complete = True
dZdddZdd Zdd Z	e
dd Z  ZS )	MiCS_InitNTFc                    s   |dusJ dt jj||}t s t  t s J d|du r)t }n|}|dur?t	d |dur<t
d|| _t|j||j|d| _t ||||||||	|
|
 dS )a  A context manager to partition the model parameters during the model
        construction with MiCS partition strategy. Model states are partitioned
        to the number of devices specified via ``mics_shard_size`` field in the
        deepspeed config json file. The context manager also introduces
        hierarchical communication method to reduce the cost of inter-node
        communications, which can be enabled with
        ``mics_hierarchical_params_gather`` field in deepspeed config.

        Args:
            module (``torch.nn.Module``, optional): If provided, partition the model as
                if it was constructed in the context.
            data_parallel_group (``deepspeed.comm`` process group, optional):
                The group of processes to partition among. Defaults to all processes.
                Synonymous with sequence data parallel group for param partitioning
                across both sequence and data parallel groups.
            mem_efficient_linear (bool, optional): Replace
                torch.nn.functional.linear with an implementation that allows
                DeepSpeed to partition parameters. Defaults to ``True``.
            remote_device (string, optional): The initial device to store model
                weights e.g., ``cpu``, ``nvme``. Passing ``"cpu"`` will create the model in CPU
                memory. The model may still be moved to GPU based on the
                offload settings for training. Defaults to param offload device if a config is
                defined, otherwise GPU.
            pin_memory (bool, optional): Potentially increase performance by
                using pinned memory for model weights. ``remote_device`` must be
                ``"cpu"``. Defaults to pin_memory value in config, otherwise ``False``.
            config_dict_or_path (dict or ``json file``, optional): If provided, provides configuration
                for swapping fp16 params to NVMe.
            config (dict or ``json file``, optional): Deprecated, use config_dict_or_path instead.
            enabled (bool, optional): If ``False``, this context has no
                effect. Defaults to ``True``.
            dtype (``dtype``, optional): Can be used to change the data type of the parameters.
                Supported options are ``torch.half`` and ``torch.float``. Defaults to ``None``
            mpu (``object``, optional): A model parallelism unit object that implements get_{model,data}_parallel_{rank,group,world_size}.

        This context follows the same logic as ``deepspeed.zero.Init()``, but
        with the modification for partition size of each parameter.

        Examples
        --------

        #. Allocate a model and partition it among all processes:

            .. code-block:: python

                # the config_dict_or_path is required to let the context manager know
                # how to partition the parameters.
                # The configuration has to include the field ``mics_shard_size``
                with deepspeed.zero.MiCS_Init(config_dict_or_path=ds_config):
                    model = MyLargeModel()


        #. Allocate a model in pinned CPU memory and partition it among a subgroup of processes:

            .. code-block:: python

                with deepspeed.zero.MiCS_Init(data_parallel_group=mpu.get_data_parallel_group(),
                                              remote_device="cpu",
                                              pin_memory=True
                                              config_dict_or_path=ds_config):
                    model = MyLargeModel()


        #. Partition an already-allocated model in CPU memory:

            .. code-block:: python

                model = deepspeed.zero.MiCS_Init(module=model,
                                                 config_dict_or_path=ds_config)
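
        #. A minimal DeepSpeed config carrying the MiCS fields (a sketch: only the
           MiCS entries matter here, the surrounding values are illustrative, and
           ``mics_shard_size`` / ``mics_hierarchical_params_gather`` live in the
           ``zero_optimization`` section of the config):

            .. code-block:: python

                ds_config = {
                    "train_micro_batch_size_per_gpu": 1,
                    "zero_optimization": {
                        "stage": 3,
                        "mics_shard_size": 4,
                        "mics_hierarchical_params_gather": True,
                    },
                }
                with deepspeed.zero.MiCS_Init(config_dict_or_path=ds_config):
                    model = MyLargeModel()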
        """
        assert config_dict_or_path is not None, "Must provide configuration for MiCS Initialization"
        _ds_config = deepspeed.runtime.config.DeepSpeedConfig(config_dict_or_path, mpu)
        if not dist.is_initialized():
            dist.init_distributed()
            assert dist.is_initialized(), "Parameters cannot be scattered without initializing deepspeed.comm"

        if data_parallel_group is None:
            ds_process_group = dist.get_world_group()
        else:
            ds_process_group = data_parallel_group

        if sequence_data_parallel_group is not None:
            logger.warning(
                "'sequence_data_parallel_group' is deprecated and will be removed. Use 'data_parallel_group' instead.")
            if data_parallel_group is not None:
                raise ValueError(
                    "Both 'data_parallel_group' and 'sequence_data_parallel_group' were specified. Please provide only one of these arguments."
                )
            self.ds_process_group = sequence_data_parallel_group

        self.mics_comm_groups = create_mics_comm_groups(_ds_config.mics_shard_size,
                                                        ds_process_group,
                                                        hierarchical_allgather=_ds_config.mics_hierarchial_params_gather,
                                                        mpu=mpu)

        super().__init__(module, data_parallel_group, mem_efficient_linear, remote_device, pin_memory,
                         config_dict_or_path, config, enabled, dtype, mpu)

    def _convert_to_deepspeed_param(self, param):
        super()._convert_to_deepspeed_param(param)
        # attach the MiCS communication groups to every param
        param.comm = self.mics_comm_groups

        # record the existing all_gather_coalesced implementation
        # so that we can fall back to it when coalescing is unavailable
        old_all_gather_coalesced = param.all_gather_coalesced

        def _param_all_gather_coalesced(params, param_buffers=None, **kwargs):
            """ Dispatch to the hierarchical or flat MiCS all-gather. """
            mics_comm_groups: MiCS_CommGroups = params[0].comm
            hierarchical_all_gather = has_hierarchical_all_gather_groups(mics_comm_groups)
            if dist.has_coalescing_manager() and hierarchical_all_gather:
                return self._hierarchical_all_gather_params(params, param_buffers)
            elif dist.has_coalescing_manager():
                return self._flat_all_gather_with_coalescing_manager(params, param_buffers)
            else:
                return old_all_gather_coalesced(params, **kwargs)

        # change the all_gather_coalesced method
        param.all_gather_coalesced = _param_all_gather_coalesced

    def _pre_all_gather(self, params, params_buffers=None):
        self._ensure_availability_of_partitioned_params(params)

        for param in params:
            if param.ds_status != ZeroParamStatus.NOT_AVAILABLE:
                raise RuntimeError(param.ds_summary())
            param.ds_status = ZeroParamStatus.INFLIGHT

        # ensure that each rank gathers params in the same order: the collective
        # works on flattened tensors, so a mismatched order would silently
        # produce incorrect parameter values.
        params = sorted(params, key=lambda p: p.ds_id)
        return params, params_buffers

    def _flat_all_gather_with_coalescing_manager(self, params, params_buffers=None):
        """ Flat all-gather of the param shards within a single shard group. """
        # must change the status of the params and ensure they are on the device
        params, params_buffers = self._pre_all_gather(params, params_buffers)

        mics_comm_groups: MiCS_CommGroups = params[0].comm
        param_shard_size = mics_comm_groups.param_shard_size

        output_tensors = []
        input_tensors = []
        for i, p in enumerate(params):
            t_size = p.ds_tensor.ds_numel * param_shard_size
            if params_buffers is not None and params_buffers[i] is not None:
                assert params_buffers[i].numel(
                ) == t_size, f'params_to_gather_buffers[{i}] size {params_buffers[i].numel()} does not match with t_size {t_size}'
                flat_out = params_buffers[i]
            else:
                flat_out = torch.empty(t_size, dtype=p.dtype, device=self.local_device, requires_grad=False).view(-1)
            output_tensors.append(flat_out)
            _flat_input = p.ds_tensor.data.view(-1)
            input_tensors.append(_flat_input)

        all_gather_handle = dist.all_gather_coalesced(output_tensors,
                                                      input_tensors,
                                                      group=mics_comm_groups.param_shard_group,
                                                      async_op=True)

        for idx, param in enumerate(params):
            param.data = output_tensors[idx].narrow(0, 0, param.ds_numel).view(param.ds_shape).data

        return MiCS_AllGatherCoalescedHandle(allgather_handle=all_gather_handle,
                                             params=params,
                                             partitions=[],
                                             world_size=param_shard_size)
t|D ]E\}}|j	j
| }|dura|| dura||  |ks\J d| d||   d| || }ntj||j| jddd	}|
| q/g }g }t|D ](\}}|j	j
| }|
| d|| |}|| ||j	jd	| j q}tj|||dd
 g }g }t|D ][\}}|
| ||	|j	j
fd|d}|||   |  t|
| |}t|D ])\}}|	|j	j
 }||j	j
 }|
| d|| | |j	j
}|| || qqtj|||dd
}t|D ]\}}|
| dd|j
|jj|_q t||g |dS )rW   r   rq   Nzparam_buffers[rk   z  does not match with param_size Frl   ro   rp      Trs   )rj   r   rD   get_rankr   r   rt   get_world_sizer.   ru   rv   rw   rx   ry   rU   rz   r{   r|   r   r}   tora   copy_detachclonesizechunkr   r   )r#   r   ri   rL   
local_rankinter_node_comm_groupintra_node_comm_grouprt   inter_node_sizeintra_node_sizeparam_tensorsr   rc   
param_sizeparam_tensorinter_outputsinter_inputs
inter_size_outintra_outputsintra_inputsparam_chunkoutput_chunksjintra_chunk_sizelocal_offset_inr   r6   r   r   r   rY      s|   




 
$z)MiCS_Init._hierarchical_all_gather_paramsc                 C   s   |j jS r    )r   r~   )r#   r6   r   r   r   get_partition_dp_groupC     z MiCS_Init.get_partition_dp_groupc                 C      | j jS r    )rL   param_shard_rankr#   r   r   r   get_partition_rankF  r   zMiCS_Init.get_partition_rankc                 C   r   r    )rL   rt   r   r   r   r   num_partitionsI  s   zMiCS_Init.num_partitions)NNNTNFNNTNNr    )r7   r8   r9   r"   r`   rj   rZ   rY   r   r   propertyr   r<   r   r   r%   r   r=   @   s*    o




class MiCS_Offload(DeepSpeedZeRoOffload):
    """ Wrapper to change the behavior for parameter sharding
    """

    def _convert_to_zero_parameters(self, ds_config, module, mpu):
        """ Overload the parent class function to convert the parameters
        """
        log_dist('Convert to zero parameters from MiCS Offload manager', ranks=[0])
        non_zero_params = [p for p in module.parameters() if not is_zero_param(p)]
        if non_zero_params:
            zero_params = [p for p in module.parameters() if is_zero_param(p)]
            if zero_params:
                zero_params[0].convert_to_zero_parameters(param_list=non_zero_params)
            else:
                group = None
                if mpu:
                    group = mpu.get_data_parallel_group()

                MiCS_Init(module=module,
                          data_parallel_group=group,
                          dtype=self.dtype,
                          config_dict_or_path=ds_config,
                          remote_device=self.offload_device,
                          pin_memory=self.offload_param_pin_memory,
                          mpu=mpu)
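
# Note: MiCS_Offload is wired in through MiCS_Optimizer.initialize_ds_offload
# below, so the offload manager converts parameters via MiCS_Init (attaching
# MiCS_CommGroups to each param) instead of the plain deepspeed.zero.Init path.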
dddejejdddddf fdd	Zdd Z	de
e de
e ddf fddZede
e fddZ				d fdd	Z  ZS )MiCS_Optimizerz
    MiCS Optimizer
    r   FNTi eii ʚ;i l    J)g        r   c            !         s   t ddgd t jg |||||||||	|
|||||||||||||||||||||R   t| } t| ds`J dg d| jj| _	| jj
| _d S )NzInit MiCS optimizerr   r(   r    )z;Sharded parameters don't have the MiCS_CommGroups attached.zQMight due to the use of deepspeed.zero.Init context for initializing the weights.z]To use MiCS sharding, please use deepspeed.zero.MiCS_Init instead for initializing parameter.)r   r!   r"   nextr   hasattrjoinr   r~   dp_process_grouprt   partition_count)!r#   rM   init_optimizertimersr   static_loss_scaledynamic_loss_scaledynamic_loss_argsverbosecontiguous_gradientsreduce_bucket_sizeprefetch_bucket_sizemax_reuse_distancemax_live_parametersparam_persistence_thresholdmodel_persistence_thresholdr   reduce_scatteroverlap_commoffload_optimizer_configoffload_param_configsub_group_sizeoffload_ratior?   	clip_gradgradient_accumulation_dtypecommunication_data_typepostscale_gradientsgradient_predivide_factorgradient_accumulation_stepselastic_checkpoint
aio_configfirst_paramr%   r   r   r"   o  sp   ! 
zMiCS_Optimizer.__init__c                 O   s   t |i |S r    )r   )r#   argsr3   r   r   r   initialize_ds_offload  s   z$MiCS_Optimizer.initialize_ds_offloadparams_to_releasegrad_partitionsr   c                    s   t  ||}| || d S r    )r!   partition_gradsallreduce_mics_shard_grads)r#   r   r   grad_buffersr%   r   r   r     s   zMiCS_Optimizer.partition_gradspartitioned_grads_buffersc           	   	   C   s   | j r	t|dkrdS |d j}|j}|j}|du s|dkr dS t |d s,tdt	 r>t
|| tj||d dS t|}|| tj||d d}|D ]}|d|d||  || 7 }qSdS )r'   r   Nr   z0Local sharding has no support for CPU offloading)tensorsrq   r   ro   )!is_gradient_accumulation_boundarylenr   param_repli_groupparam_repli_sizer   on_acceleratorr,   rD   has_all_reduce_coalescedr   all_reduce_coalescedrx   catdiv_
all_reducer{   r   r   rw   )	r#   r   r   rL   r   r   aggregated_bufferoffset	grad_buffr   r   r   r     s*   



z)MiCS_Optimizer.allreduce_mics_shard_gradsc                    s   t  |||| dS )z Loading the ZeRO-3/MiCS partitioned checkpoints
        Because the self.dp_process_group is replaced with the communicator for
        partition group we can call the load_state_dict logic from ZeRO-3.
        N)r!   load_state_dict)r#   state_dict_listload_optimizer_statesload_from_fp32_weightscheckpoint_folderload_serialr%   r   r   r     s   
zMiCS_Optimizer.load_state_dict)TFNN)r7   r8   r9   r:   sysmaxsizerx   float16r"   r   r   r   r   r   r   r   r   r<   r   r   r%   r   r   j  sN    	3" r   )#r   typingr   r@   rx   r   rD   deepspeed.runtime.zero.utilsr   !deepspeed.runtime.zero.mics_utilsr   r   r   (deepspeed.runtime.zero.parameter_offloadr	   +deepspeed.runtime.zero.partition_parametersr