o
     i                     @   sF  d dl mZ d dlZd dlmZ d dlmZ deejvr#ejjej_	deejvr0ejj
ej_d$ded	ed
efddZd$ded	ed
efddZd$ded	ed
efddZG dd dejjZejZG dd dejjZejZG dd dejjZejZdejjd	efddZdejjd	efddZd%dededed ed!ef
d"d#ZdS )&    )OptionalN)Tensor)ProcessGroupall_gather_into_tensorreduce_scatter_tensorFinput_process_groupasync_opc                 C   s^   t j|}t j|| jd  g| jdd  R | j| jd}t jj||  ||d}||fS Nr      )dtypedevicegroupr	   )	torchdistributedget_world_sizeemptyshaper   r   r   
contiguousr   r   r	   
world_sizeoutputhandle r   Z/home/ubuntu/.local/lib/python3.10/site-packages/xformers/_flash_attn/utils/distributed.pyall_gather_raw   s   r   c                 C   st   t j|}| jd | dksJ t j| jd | g| jdd  R | j| jd}t jj||  ||d}||fS r
   )	r   r   r   r   r   r   r   r   r   r   r   r   r   reduce_scatter_raw   s   r   c                 C   s"   |   } tjj| ||d}| |fS )Nr   )r   r   r   
all_reduce)r   r   r	   r   r   r   r   all_reduce_raw+   s   r   c                   @   <   e Zd ZdZedededefddZedefdd	Zd
S )AllGatherFunc?Gather the input from sequence parallel region and concatenate.r   r   returnc                 C      || _ t||\}}|S N)r   r   ctxr   r   r   _r   r   r   forward4      zAllGatherFunc.forwardgrad_outputc                 C      t || j\}}|d fS r%   )r   r   r'   r+   
grad_inputr(   r   r   r   backward:      zAllGatherFunc.backwardN	__name__
__module____qualname____doc__staticmethodr   r   r)   r/   r   r   r   r   r!   1       r!   c                   @   r    )ReduceScatterFunczKReduce scatter the input from the sequence parallel region and concatenate.r   r   r#   c                 C   r$   r%   )r   r   r&   r   r   r   r)   G   r*   zReduceScatterFunc.forwardr+   c                 C   r,   r%   )r   r   r-   r   r   r   r/   M   r0   zReduceScatterFunc.backwardNr1   r   r   r   r   r8   D   r7   r8   c                   @   r    )AllReduceFuncr"   r   r   r#   c                 C   r$   r%   )r   r   r&   r   r   r   r)   Z   r*   zAllReduceFunc.forwardr+   c                 C   s   |d fS r%   r   )r'   r+   r   r   r   r/   `   s   zAllReduceFunc.backwardNr1   r   r   r   r   r9   W   r7   r9   modelc              	   C   sp   dd |   D }t| D ]&\}}t  tjj|tj|d|d W d    n1 s0w   Y  qd S )Nc                 S   "   i | ]\}}t |d dr||qS )_shared_paramsFgetattr.0namepr   r   r   
<dictcomp>l   
    z&sync_shared_params.<locals>.<dictcomp>r   )srcr   )named_parameterssorteditemsr   no_gradr   	broadcastget_global_rank)r:   r   pamams_sharedr(   rB   r   r   r   sync_shared_paramsi   s   
rM   c                 C   s   dd |   D }dd t| D }|rOt + tj|}tjj||d t	|tj
||D ]	\}}|| q3W d    d S 1 sHw   Y  d S d S )Nc                 S   r;   )_sequence_parallelFr=   r?   r   r   r   rC   {   rD   z4allreduce_sequence_parallel_grad.<locals>.<dictcomp>c                 S   s   g | ]\}}|j qS r   )grad)r@   r(   rB   r   r   r   
<listcomp>~   s    z4allreduce_sequence_parallel_grad.<locals>.<listcomp>)r   )rF   rG   rH   r   rI   _utils_flatten_dense_tensorsr   r   zip_unflatten_dense_tensorscopy_)r:   r   params_seqparallelgrads	coalescedbufsyncedr   r   r    allreduce_sequence_parallel_gradx   s   
"r[   r   dimr   
local_rankmultiple_ofr#   c                 C   s0   | | }|| }|| }|t ||k  }|| S )zGet the dim for the local rank derived from splitting dim on world_size processes.

    The split may not be even across the world_size processes.
    )int)r\   r   r]   r^   multipledivmodlocal_multipler   r   r   get_dim_for_local_rank   s
   rd   )F)r   )typingr   r   r   torch.distributedr   dirr   _all_gather_baser   _reduce_scatter_baser   boolr   r   r   autogradFunctionr!   apply
all_gatherr8   reduce_scatterr9   r   nnModulerM   r[   r_   rd   r   r   r   r   <module>   s(    $