o
    TiM                     @   s   d dl Z d dlmZmZ d dl mZ d dlmZ d dlmZ d dl	m
Z d dlmZ d dlmZmZmZ d dlmZ d	d
 Zdd Zdd Zdd Zdd Zdd ZdddZG dd de jjZG dd de jjZdS )    N)AnyTuple)Tensor)Module)	rearrange)get_accelerator)get_shard_size_listset_num_kv_headsget_num_kv_heads)groupsc                 C   s^  |dkrW| dk r'|j \}}}}|||| ||g}d}	d}
||| || |g}n|j \}}}}|| dks?J d| d| d||||| |g}d}	d}
||| || |g}nR| dk rz|j \}}}}||| |||g}d	}	d}
||| || |g}n/|j \}}}}|| dksJ d| d| d||||| |g}d}	d	}
|| ||| |g}|	||
|fS )
z
    This function generates the parameters required for `permute` and `reshape` operations,
    which are used to process data before and after `all2all` communication.
    r      )   r   r         )r   r   r   r   r   Number of heads (z3) must be divisible by the sequence parallel size (z)!)r   r   r   r   r   N)shape)scatter_idxbatch_dim_idxseq_world_sizeinputbsglobal_seq_lennum_local_headhead_dimpre_all2all_inp_shapepre_all2all_permute_idxpost_all2all_permute_idxpost_all2all_res_shapelocal_seq_lennum_total_head r    L/home/ubuntu/.local/lib/python3.10/site-packages/deepspeed/sequence/layer.py_generate_layout_params   s4   ""r"   c                    s    fdd}|S )z?
    Post-processing function for `all2all` communication.
    c                    s(    d ur|    } |  }|S N)permute
contiguousreshape)r   outputpermute_idx	res_shaper    r!   	post_func@   s   zpost_all2all.<locals>.post_funcr    )r)   r*   r+   r    r(   r!   post_all2all;   s   r,   c                 C   s(   | | }| dur||  }|S )z>
    Pre-processing function for `all2all` communication.
    N)r&   r%   r$   )r)   	inp_shaper   input_tr    r    r!   pre_all2all_funJ   s   r/   c                 C   s2   t | ddd} | jdd\}}tj| |fddS )zA
    change sign so the last dimension becomes [-odd, +even]
    z... (j d) -> ... j dr   )jdim)r   unbindtorchcat)xx1x2r    r    r!   _rotate_halfT   s   r;   c                 C   sl   |j d }| dd|f | d|df } }| | t| |  } |j d dkr+| }|S tj| |fdd}|S )z
    input tensor t is of shape [seq_length, ..., dim]
    rotary positional embeding tensor freqs is of shape [seq_length, ..., dim]
    check https://kexue.fm/archives/8265 for detailed formulas
    r4   .Nr   r2   )r   r;   r6   r7   )t	freqs_cos	freqs_sinrot_dimt_passresr    r    r!   apply_rotary_pos_emb]   s   
"rB   c           "         s  t |}t| j}|dv sJ d|dk st|| |}| d| } |t  }|g| }	|| gt| jdd   }
t	j
|
| j| jd}t j|| |	||d |j||g|jdd  R  }|dkrg dttd	t|j }|| }|j|jd || | g|jd
d  R   }|S |dkr|dd
 }|j|| | g|jdd  R   }|S | | jd | jd d} |dkr| ddd } n|dkr| dd } | j\}}}tt |}|t  }|| }|| }| || |} t| jd | }|g| }||  t | } fdd|D }	t|| }t|| | }t	j
|| jd | j| jd}t j|| |	||d t | }|d }t | }|| }|| }|| } | } | }|j||gdd\}} ||||||}| |||||} |dkrg d}|| ||||}| | ||||} n#|dkrg d}|| ||||}| | ||||} t	j|| gdd }|| | ||< |d | |g ||d d   }!||!}|S )N)r   r   z#batch_dim_idx must be either 0 or 1r   r   r   )devicedtype)output_split_sizesinput_split_sizesgroup)r   r   r   r   r   r   r4   c                    s   g | ]}|  qS r    r    ).0num_local_headscoeffr    r!   
<listcomp>   s    z(uneven_heads_all2all.<locals>.<listcomp>r2   )r   r   r   r   r   )r   r   r   r   r   )distget_world_sizelistr   r   	transposer%   r   _get_sequence_parallel_rankr6   emptyrC   rD   all_to_all_singleviewrangelenr$   r&   r
   intsplitr7   )"r   r   
gather_idxr   rG   r   r-   input_splitslocal_headsoutput_splitsoutput_buffer_shaper'   orderseq_lenh
batch_sizenum_local_heads_listh_dimr   local_seq_len_with_headsheads_scale_coeffoutput_buff_d1_sizetotal_hchunk_num_heads_smallchunk_num_heads_largenum_chunk_heads_largenum_chunk_heads_smalltotal_num_large_headstotal_num_small_headsheads_large_combine_sizeheads_small_combine_sizeheads_large_chunkheads_small_chunkoutput_shaper    rJ   r!   uneven_heads_all2allo   s   



R(M










rs   Fc                 C   s  t |}| jd }	t d us|	| dkrA|dk sAt d u r1|	|ks-J d|	 d| dt|	 |dks9J dt| ||||S t|||| \}
}}}t|
|| }t||}t	
|}t j||||d}|r|d	v r|||d
 < |||d < |||d < ||S ||}|S )Nr   r   r   z.) must be larger than sequence parallel size ()Fz(uneven head sp does not support async op)rG   async_op)dqdk_work_grad_post_all2all_func)rM   rN   r   r
   r	   rs   r"   r/   r,   r6   
empty_likerS   rT   )r   r   rY   r   rG   ru   handletyper   	num_headsr   r   r   r   r.   post_all2all_funr'   workrA   r    r    r!   single_all_to_all   s.   





r   c                   @   sh   e Zd Ze				ddedejdedededed	efd
dZ	ededed	e
deddf fddZdS )_SeqAllToAllNTctxrG   r   r   rY   r   returnc
              	   C   s   || _ || _|| _|| _|| _|| _|| _| jd u r%t|||||d}
|
S |	sF|dkrF| jd ks2J t|||||d}
t 	 
| j |
S |	s]|dv r]d| }t|||||d||}
|
S |	rt|dv rtd| }t|||||d||}
|
S t|||||d}
|
S )NFo)qkdTfwd_)rG   r   rY   streamr|   r}   r   r   r   current_streamwait_stream)r   rG   r   r   rY   r   r   r|   r}   is_fwdrA   r    r    r!   forward  s2   

z_SeqAllToAll.forwardgrad_outputc              	   G   sN   d t j| jg|| j| j| j| j| j| jdR  d d d d d d d f	S )NF)	r   applyrG   rY   r   r   r   r|   r}   )r   r   r    r    r!   backward/  s    z_SeqAllToAll.backward)NNNT)__name__
__module____qualname__staticmethodr   rM   ProcessGroupr   rW   r   r   r   r    r    r    r!   r     s*    	+(r   c                       sn   e Zd ZdZ			ddedejdeded	df
 fd
dZdd Z		dde
de
de
deded	e
fddZ  ZS )DistributedAttentiona  Initialization.

    Arguments:
        local_attention (Module): local attention with q,k,v
        sequence_process_group (ProcessGroup): sequence parallel process group
        scatter_idx (int): scatter_idx for all2all comm
        gather_idx (int): gather_idx for all2all comm
    r   r   Nlocal_attentionsequence_process_groupr   rY   r   c                    s`   t t|   || _|| _|| _|| _d| _d | _|| _	|d ur.i | _d| _t
  | _d S d S )NFT)superr   __init__
local_attnspgr   rY   sp_overlap_commoverlap_handles	sp_streamr   default_stream)selfr   r   r   rY   r   	__class__r    r!   r   A  s   	zDistributedAttention.__init__c                 C   s*   | j rt|dr| j|j d S d S d S )N
done_event)r   hasattrr   
wait_eventr   )r   layerr    r    r!   
layer_syncW  s   zDistributedAttention.layer_syncquerykeyvaluer   argsc              
      sf   fdd}  | t j| j j|d jd}	  | t j| j j|d jd}
 jr: j	 j
 t j| j j|d jd} jrm|jjd d }||dd |jjd d }||dd |dur|d d	dd
d|d	 d	dd
d}}t|	||}	t|
||}
 j|	|
|g|R i |}t j| j j| j
 jd}|S )a\   forward

        Arguments:
            query (Tensor): query input to the layer
            key (Tensor): key input to the layer
            value (Tensor): value input to the layer
            batch_dim_idx (int): indicating which dim is batch
            args: other args

        Returns:
            * output (Tensor): context output
        c                    s    fdd}|S )Nc                    s`   d  }j |d    jj j |d  }t| } j |d  || d< t| } d S )Nr   rx   ry   rz   r   )r   waitr   r   r   rO   tuple)gradr}   all2all_output)
layer_typer   r    r!   pre_hook_funv  s   zDDistributedAttention.forward.<locals>.bwd_hook.<locals>.pre_hook_funr    )r   r   r   r   r!   bwd_hookt  s   	z.DistributedAttention.forward.<locals>.bwd_hookNr   r   vr   r   r   r   r   r   )r   r   r   r   r   rY   r   r   r   r   r   grad_fnnext_functionsregister_prehookr$   rB   r   )r   r   r   r   r   rotary_pos_embr   kwargsr   query_layer	key_layervalue_layer	grad_fn_q	grad_fn_kpos_emb_cospos_emb_sincontext_layerr'   r    r   r!   r   [  s8   

*
zDistributedAttention.forward)r   r   Nr#   )r   r   r   __doc__r   rM   r   rW   r   r   r   r   r   __classcell__r    r    r   r!   r   7  s>    	r   )FNN) r6   typingr   r   r   torch.nnr   einopsr   deepspeed.commcommrM   deepspeed.acceleratorr    deepspeed.module_inject.tp_shardr   r	   r
   deepspeed.utilsr   r"   r,   r/   r;   rB   rs   r   autogradFunctionr   nnr   r    r    r    r!   <module>   s$   (
	
n$6