o
    پioR                     @   s  d dl mZ d dlmZ d dlmZmZmZmZ d dl	Z	d dl
m  mZ d dlZd dlmZ d dlmZ d dlmZmZmZmZmZmZmZ d dlmZ d dlmZm Z  er]d d	l!m"Z" d
e#fddZ$dd Z%dd Z&dd Z'd=ddZ(dee	j)ef fddZ*d=ddZ+d=ddZ,eG dd dZ-d e#d!e#d"e.fd#d$Z/de	j)fd%d&Z0d'e	j)fd(d)Z1ej2d*ej3d!ej3d+ej3fd,d-Z4d.d/ Z5d0eee	j)ee	j)f fd1d2Z6d>d3d4Z7de	j)fd5d6Z8d7d8 Z9d9d: Z:d;d< Z;dS )?    )	dataclass)
accumulate)TYPE_CHECKINGListTupleUnionN)use_symmetric_memory)DpPaddingModeattn_cp_all_gather_into_tensorget_attention_cp_groupget_attention_cp_rankget_attention_cp_sizeget_attention_dp_rankis_allocation_symmetric)get_global_server_args)
ceil_alignceil_div)ForwardBatchnsa_index_topkc                 C   s   | j |dS )N)max)clamp)original_seq_lensr    r   Y/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/layers/attention/nsa/utils.pycompute_nsa_seqlens   s   r   c                   C   s   t  jS N)r   #enable_nsa_prefill_context_parallelr   r   r   r   is_nsa_enable_prefill_cp"   s   r   c                   C      t  ot jdkS )Nzin-seq-splitr   r   nsa_prefill_cp_moder   r   r   r   is_nsa_prefill_cp_in_seq_split&      
r!   c                   C   r   )Nzround-robin-splitr   r   r   r   r   #is_nsa_prefill_cp_round_robin_split-   r"   r#   forward_batchr   c                 C   s4   | j  sdS t }t| j}t o|dko|dkS )NFr      )forward_modeis_context_parallel_extendr   sumextend_seq_lens_cpur#   )r$   cp_sizeseq_lenr   r   r   $can_nsa_prefill_cp_round_robin_split4   s
   

r,   input_c                 C   s   t  }t }t| ttfrt|t| |}| | S t| }|| dkrM|| || |k }|dkr?| jdg| jdd R  S t	j
|||| jd}| | S | jd|g| jdd R  dd|f  S )aO  
    # for round-robin-split, split the tokens evenly according to the rule of token_idx % cp_size.
    |   +-----------before split------------+|
    | token0, token1, token2, token3, token4, token5, token6, token7, ...
    |
    |   +--------------result-------------------+
    | dp_atten_tp0: token0, token4, token8, token12, token16, ... |
    | dp_atten_tp1: token1, token5, token9, token13, token17, ... |
    | dp_atten_tp2: token2, token6, token10, token14, token18, ... |
    | dp_atten_tp3: token3, token7, token11, token15, token19, ... |
    |   +-------------------------+
    r   r%   N)device)r   r   
isinstancetuplelistrangelen	new_emptyshapetorcharanger.   view
contiguous)r-   r*   cp_rankindicestokenscur_lenr   r   r   nsa_cp_round_robin_split_data<   s   .r?   c                 C   s   | j  }t|}t }t|D ]}t|| |||< qt| j|}|	 r,t
|}nt|dkr8|t  }n|d }t| rEt||}|S Nr%   r   )global_num_tokens_cpucopyr4   r   r3   r   r	   get_dp_padding_modeis_extend_in_batch
is_max_lenr   r   r,   r   )r$   global_num_tokenssync_group_sizeattn_cp_sizeidp_padding_moder=   r   r   r   cal_padded_tokens[   s    


rK   c                 C   st   t  }|dko
t| }| jd u}|s|s|S t| }||jd  }|dkr8t||j|g|jdd  R  g}|S r@   )r   r,   rA   rK   r6   r7   cat	new_zeros)r$   nsa_cache_seqlensrH   needs_cp_padneeds_dp_padr=   pad_lenr   r   r   pad_nsa_cache_seqlensq   s    

rR   c                   @   s   e Zd ZU dZee ed< dZee ed< dZee ed< dZ	ee ed< dZ
ee ed< dZee ed< dZeed	< dZeed
< dZeed< dZeed< dZejed< dZejed< dZejed< dZejed< dZejed< dS )NSAContextParallelMetadataN
split_listmax_rank_lenzigzag_indexper_rank_actual_tokenreverse_split_lencp_reverse_indexr/   kv_len_prevkv_len_nextactual_seq_q_prevactual_seq_q_nextkv_len_prev_tensorkv_len_next_tensoractual_seq_q_prev_tensoractual_seq_q_next_tensortotal_seq_lens)__name__
__module____qualname__rT   r   int__annotations__rU   rV   rW   rX   rY   rZ   r[   r\   r]   r^   r7   Tensorr_   r`   ra   rb   r   r   r   r   rS      s    
 rS   r+   r*   use_nsac                 C   sj   t  r| | }| | dksJ d|  d| dn| |d  }|dkr3|dkr3|r3|j r3t r3dS dS )	Nr   zseq_len z is not divisible by cp_size z. when nsa_prefill_cp_mode is round-robin-split   r%   TF)r#   r&   r'   r   )r+   r*   ri   r$   cur_cp_seq_lenr   r   r   can_cp_split   s    rl   c                    s   t  rt }|jd | dksJ d|j d| t|S ttj|| jjdd tj	 fdd| jj
D ddd|jd }|S )Nr   zAExpect input shape 0 can divided by cp size, but got input shape 
, cp size dimc                       g | ]} | qS r   r   .0rI   
input_listr   r   
<listcomp>       z-cp_split_and_rebuild_data.<locals>.<listcomp>r/   )r#   r   r6   r?   r2   r7   splitnsa_cp_metadatarT   rL   rV   r9   )r$   r-   r*   resultr   rs   r   cp_split_and_rebuild_data   s   rz   	positionsc                    sx   t  rt }|jd | dksJ d|j d| t|S ttj|| jjdd tj	 fdd| jj
D dd}|S )Nr   zIExpect positions shape 0 can divided by cp size, but got positions shape rm   r/   rn   c                    rp   r   r   rq   position_id_listr   r   ru      rv   z1cp_split_and_rebuild_position.<locals>.<listcomp>)r#   r   r6   r?   r2   r7   rw   rx   rT   rL   rV   )r$   r{   r*   r   r|   r   cp_split_and_rebuild_position   s    
r~   r=   r;   c                 C   s   d}d}t |D ]5}t| | }	|	|7 }	|	| |	| |k }
|
dkr7t|| | t|| |
 |d7 }|	|
|  }qd S )Nr   r%   )r3   tlloadstore)in_seqs_ptrout_seqs_ptr
bs_idx_ptrr=   r*   r;   	extra_seqbs_idxbsr>   cur_seqr   r   r   &nsa_cp_round_robin_split_q_seqs_kernel   s   	r   c           	      C   s   t  }t }d}g }t| D ]\}}||7 }|| t|| |k }|| |||  }qtdd t|D }dd |D }||fS )Nr   c                 S   s   g | ]
\}}|d kr|qS r   r   )rr   rI   xr   r   r   ru      s    z7nsa_cp_round_robin_split_q_seqs_cpu.<locals>.<listcomp>c                 S   s   g | ]}|d kr|qS r   r   )rr   q_lenr   r   r   ru      s    )r   r   	enumeraterf   appendr2   )	extend_seqsr*   r;   r   q_seqsr   r>   r   r   r   r   r   #nsa_cp_round_robin_split_q_seqs_cpu   s   
r   returnc           	      C   sv   t  }t }t| \}}tjt|f|j|jd}tjt|f|jtjd}d}t	| |||t||| ||||fS )a  
    round-robin-split distributes tokens across ranks based on token_idx % cp_size.

    Return:
    ret_q_lens_cpu(List) and ret_q_lens(torch.Tensor): the partitioned length (excluding zeros) on the current cp rank
        for each sequence after distribution across cp ranks.
    bs_idx_cpu(List) and bs_idx(torch.Tensor): marks which sequences are ultimately selected,
        i.e., those with a partitioned length greater than zero.
    r.   dtyper%   )
r   r   r   r7   emptyr4   r.   r   int32r   )	extend_seqs_cpur   r*   r;   ret_q_lens_cpu
bs_idx_cpu
ret_q_lensr   gridr   r   r   nsa_cp_round_robin_split_q_seqs   s   r   c                 C   s.   |d u rt  }| jd ur|r| j rdS dS )NTF)r   rx   r&   r'   )r$   nsa_enable_prefill_cpr   r   r   nsa_use_prefill_cp  s   
r   c           	         s   || d | }|| j d  }|dkr tj| ddd|fddd} tt t  d tj|| | j d | j| j	d}W d   n1 sDw   Y  t 
|| | ttj||jjdd tj fd	d
t|jjD dd}|S )a  
    Allgather communication for context_parallel(kv_cache, index_k, hidden_states).
    This implementation mainly consists of three parts:
    Step 1, padding the input shape to unify the shape for allgather communication (the shape must be the same).
    Step 2, allgather communication(async).
    Step 3, removing the padding and reassembling the data according to the actual tokens.
    r%   r   constant)modevaluedisabledr   Nrn   c                    s    g | ]\}} | d | qS r   r   )rr   indexper_rank_lenoutputs_list_maxr   r   ru   D  s    zBcp_attn_tp_all_gather_reorganazied_into_tensor.<locals>.<listcomp>)r6   Fpadr   r   r   r7   r   r.   r   cp_all_gather_into_tensor_asyncr2   rw   rx   rU   rL   r   rW   )	r-   	total_lenattn_tp_sizer$   	stream_opmax_lenpad_sizeinput_tensor_alloutputsr   r   r   .cp_attn_tp_all_gather_reorganazied_into_tensor#  s:   


	r   c                    s  t  rNtt t  d | | jd | g| jdd R }W d   n1 s*w   Y  t||  |j}|j|dg|dd R  dd	|}|S | j\}}t
| |jj|||}ttj||jjdd tj fdd|jjD dd}|d|}|S )	a,  
    # for in-seq-split
    |   +-----------before allgather------------+|
    |   | dp_atten_tp0: block0, block7 |
    |   | dp_atten_tp1: block1, block6 |
    |   | dp_atten_tp2: block2, block5 |
    |   | dp_atten_tp3: block3, block4 |
    |
    |   +----------before rerange---------------+|
    | block0 | block7 | block1 | block6 | block2 | block5 | block3 | block4 |
    |
    |   +--------------result-------------------+
    | block0 | block1 | block2 | block3 | block4 | block5 | block6 | block7 |
    |   +-------------------------+

    # for round-robin-split
    |   +-----------before allgather------------+|
    | dp_atten_tp0: token0, token4, token8, token12, token16, ... |
    | dp_atten_tp1: token1, token5, token9, token13, token17, ... |
    | dp_atten_tp2: token2, token6, token10, token14, token18, ... |
    | dp_atten_tp3: token3, token7, token11, token15, token19, ... |
    |
    |   +--------------result-------------------+
    | token0, token1, token2, token3, token4, token5, token6, token7, ...
    |   +-------------------------+
    r   r   r%   Nr/   rn   c                    rp   r   r   rq   outputs_listr   r   ru     rv   z0cp_all_gather_rerange_output.<locals>.<listcomp>)r#   r   r   r   r5   r6   r
   r9   	transposereshaper   rx   rb   r2   r7   rw   rX   rL   rY   )input_tensorr*   r$   streamoutput_tensor	out_shape
bs_seq_lenhidden_sizer   r   r   cp_all_gather_rerange_outputO  sH   


r   c                 C   s.  d}g }i }t t| D ]}i }g }| | }|t|k rr|| }	|	|kr9|}
|	||
< ||
d|	 ||
< |d7 }n9|	|krT|}
|||
< ||
d| ||
< |	| ||< n|}
|	||
< ||
d|	 ||
< ||	8 }|d7 }|t|k s| D ]\}
}||
d| }|| }||
||f qv|| q|S )zUsed to obtain the index of the seq corresponding
    to each cp block in the forwardbatch, and the starting
    and ending positions of the corresponding seq in the cp blockr   r%   )r3   r4   getitemsr   )cp_chunks_lenseqs_lenj	tuple_len
cumulativerI   current_dictcurrent_tuplesc_vals_validxvalprev_cumcurrent_cumr   r   r   calculate_cp_seq_idx  sB   r   c                    s  t  rdS 	 t| } d}| } d }| | }||  | | }|dkr7dd d | D d |< |   d   }	|	   }
tt||||  |tt|| d || | }t fddt D } fd	dt D }g }t|D ]!}|tt||| d| tt|d | | dd
|   qtt	}|| }| d | d  }| } d | d  }t|j
dtjd}t|j
dtjd}t|j
dtjd}t|j
dtjd}t|
|||||||||||||d}|S )NTr%   rj   r   c                 S   s   g | ]}|d  qS r   r   )rr   r   r   r   r   ru     rv   z0prepare_input_dp_with_cp_dsa.<locals>.<listcomp>c                 3   s,    | ]}|  d  | d   V  qdS )rj   r%   Nr   rq   r*   rT   r   r   	<genexpr>  s    
z/prepare_input_dp_with_cp_dsa.<locals>.<genexpr>c                    s2   g | ]}|  d  | d  fD ]}|qqS )rj   r%   r   )rr   rI   elementr   r   r   ru   	  s    cudar   )rT   rU   rV   rW   rX   rY   rZ   r[   r\   r]   r^   r_   r`   ra   rb   )r#   r7   tensorrepeat_interleaverf   tolistr2   r3   extendr   tor   rS   )kv_lenr;   r*   r   bs_per_cp_groupkv_len_origincp_segment_numseq_per_batch	remainderseq_max_rank_lenrU   rV   rW   rX   rY   batch_idprefix_sum_listrZ   r[   r\   r]   r^   r_   r`   ra   rx   r   r   r   prepare_input_dp_with_cp_dsa  s   
&




r   )r$   r   r   )<dataclassesr   	itertoolsr   typingr   r   r   r   r7   torch.nn.functionalnn
functionalr   tritontriton.languagelanguager   <sglang.srt.distributed.device_communicators.pynccl_allocatorr   sglang.srt.layers.dp_attentionr	   r
   r   r   r   r   r   sglang.srt.server_argsr   sglang.srt.utils.commonr   r   ,sglang.srt.model_executor.forward_batch_infor   rf   r   r   r!   r#   r,   rh   r?   rK   rR   rS   boolrl   rz   r~   jit	constexprr   r   r   r   r   r   r   r   r   r   r   r   <module>   sZ   $	





,B0