# Megatron-style distributed utilities for ModelScope NLP models
# (modelscope/utils/nlp/distributed.py): weight-initialization factories and
# a simple data-parallel wrapper that all-reduces gradients by hand.
import math

import torch
import torch.distributed as dist
from megatron_util import mpu
from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors
from torch.autograd import Variable
from torch.nn.modules import Module


def normal_init_method(mean, std):
    """Return an init function that fills a tensor in place from N(mean, std)."""

    def init_(tensor):
        return torch.nn.init.normal_(tensor, mean=mean, std=std)

    return init_


def scaled_init_method(mean, std, num_layers):
    """Init method based on N(0, sigma/sqrt(2*num_layers))."""
    std = std / math.sqrt(2.0 * num_layers)

    def init_(tensor):
        return torch.nn.init.normal_(tensor, mean=mean, std=std)

    return init_
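# Illustrative usage (not part of the original module): Megatron-style layers
# accept an `init_method` callable, so the factories above are typically used
# like this, with `linear` standing in for any module whose weight needs
# initializing:
#
#     init_method = normal_init_method(mean=0.0, std=0.02)
#     init_method(linear.weight)  # fills the weight in place with N(0.0, 0.02)
#
# The scaled variant shrinks std by sqrt(2 * num_layers), the usual Megatron
# choice for output projections in deep residual stacks.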
 _t	 }j D ]}t|r4tj||jd q$			dfdd	 g _g _tj D ]} fdd}qL _d S )	NTFgroupc                    s,   j rd _ i } j D ] \}}|jr-|jd ur-|j }||vr&g ||< || | q jr?t	j
j|v r?tdd d _|D ]R}|| }dd |D }t|}	|rX|	 }	|se| se|	tj jd }	tj|	 jd t	j
  |s| r|	tj jd }	t|t|	|D ]	\}
}|
| qqAd S d S )NFzEWARNING: gloo dist backend for half parameters may be extremely slow.z7It is recommended to use the NCCL backend in this case.c                 S   s   g | ]}|j jqS r   )graddata).0paramr   r   r   
<listcomp>N   s    zNDistributedDataParallel.__init__.<locals>.allreduce_params.<locals>.<listcomp>r   )needs_reductionmodulenamed_parametersrequires_gradr   r    typeappendwarn_on_halfr   cuda
HalfTensorprintr   floatdistget_world_sizedata_parallel_group
all_reducesynchronizezipr   copy_)reduce_afterno_scalefp32_allreducebucketsnamer"   tpbucketgrads	coalescedbufsynced)selfr   r   allreduce_params9   sP   


z:DistributedDataParallel.__init__.<locals>.allreduce_paramsc                     s   t j  d S N)r   _execution_enginequeue_callback)unused)rB   r   r   allreduce_hookb   s   z8DistributedDataParallel.__init__.<locals>.allreduce_hook)TFF)superr   __init__r/   _backenddist_backendGLOOr*   r%   r   get_data_parallel_groupr1   get_tensor_model_parallel_rank
parametersr   	is_tensor	broadcasthook_handleshookslistrB   )rA   r%   src_rankpr"   rG   	__class__)rB   rA   r   rI   .   s$   

%
z DistributedDataParallel.__init__c                 O   s   d| _ | j|i |S )NT)r$   r%   )rA   inputskwargsr   r   r   forwardg   s   zDistributedDataParallel.forwardN Fc                 C   s   | j |||}|S rC   )r%   
state_dict)rA   destinationprefix	keep_varssdr   r   r   r]   k   s   z"DistributedDataParallel.state_dictTc                 C   s   | j j||d d S )N)strict)r%   load_state_dict)rA   r]   rb   r   r   r   rc   p   r   z'DistributedDataParallel.load_state_dict)Nr\   F)T)__name__
__module____qualname__rI   r[   r]   rc   __classcell__r   r   rW   r   r   ,   s
    9
r   )r   r   torch.distributeddistributedr/   megatron_utilr   torch._utilsr   r   torch.autogradr   torch.nn.modulesr   r   r   r   r   r   r   r   <module>   s   
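# Minimal usage sketch (illustrative; `MyModel`, `batch`, and `optimizer` are
# placeholders, and torch.distributed plus the megatron_util mpu groups are
# assumed to be initialized already):
#
#     model = DistributedDataParallel(MyModel().cuda())
#     loss = model(batch).sum()  # forward() arms needs_reduction
#     loss.backward()
#     model.allreduce_params()   # average gradients across the data-parallel group
#     optimizer.step()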
