o
    ٷi^`                  
   @   s  d dl Z d dlmZmZmZmZ d dlZd dlmZ	 d dl
m  mZ d dlm  mZ d dlmZ zd dlmZmZmZmZ W n eyU   dd ZeZeZeZeZY nw d dlmZ eeZg dZd	ejfd
dZ de	j!d	ee"e"f fddZ#e j$ddde"de	j!d	ee" fddZ%de"de	j!d	ee" fddZ&dejde"de	j!d	eeje"f fddZ'dejde"de	j!d	ejfddZ(dejde"de	j!d	eeje"f fddZ)dejde"de	j!d	ejfdd Z*d!ejd	e+fd"d#Z,dejde	j!d	ejfd$d%Z-dejde	j!d	ejfd&d'Z.dejde	j!d	ejfd(d)Z/dejde	j!d	ejfd*d+Z0dejde	j!d	ed,ejf fd-d.Z1dejde	j!d	ed,ejf fd/d0Z2ej3j4dejde	j!d	ed,ejf fd1d2Z5ej3j4dejde	j!d	ed,ejf fd3d4Z6ej3j4dejde	j!d	ed,ejf fd5d6Z7ej3j4dejde	j!d	ed,ejf fd7d8Z8	d>d9ee9 d	ed,ejf fd:d;Z:	d>d9ee9 d	ed,ejf fd<d=Z;dS )?    N)TupleListCallableOptional)current_platform)per_token_quant_fp8per_token_dequant_fp8qkv_permute_quant_fp8qkv_dequant_permute_fp8c                  O   s   t d)NzFP8 kernels could not be imported (e.g., Triton may not be available on this platform). FP8 async operations are not supported. Please install the required dependencies or disable FP8 mode.)RuntimeError)argskwargs r   k/home/ubuntu/.local/lib/python3.10/site-packages/cache_dit/parallelism/attention/_distributed_primitives.py_fp8_kernel_unavailable   s   r   )init_logger)_all_to_all_single_qkv_async_all_to_all_single_o_async)_all_to_all_single_qkv_uneven_heads_async'_all_to_all_single_o_uneven_heads_async _all_to_all_single_qkv_fp8_async_all_to_all_single_o_fp8_async _all_to_all_single_any_qkv_async_all_to_all_single_any_o_async$_all_to_all_single_any_qkv_fp8_async"_all_to_all_single_any_o_fp8_async_prepare_ulysses_comm_metadata _unified_all_to_all_qkv_async_fn_unified_all_to_all_o_async_fnreturnc                 C   s   t | tjr
|  } | S N)
isinstancefcAsyncCollectiveTensorwait)tensorr   r   r   _wait_tensorB   s   r&   groupc                 C   s    t j| d}t j| d}||fS )Nr'   )distget_world_sizeget_rank)r'   
world_sizerankr   r   r   _get_rank_world_sizeI   s   r.      )maxsizesizec                    sx   t j|d}tt j|d}d|v rdnt   fddt|D }t j|tj	| g tj
d|d dd |D }|S )zwGather the local size from all ranks.
    size: int, local size
    return: List[int], list of size from all ranks
    r(   cpuc                    s   g | ]}t jd  t jdqS )   devicedtype)torchemptyint64).0_gather_devicer   r   
<listcomp>\   s    z(_gather_size_by_comm.<locals>.<listcomp>r5   c                 S   s   g | ]}|d    qS )r   )item)r;   sr   r   r   r?   e   s    )r)   r*   strget_backendr   default_devicerange
all_gatherr8   r%   r:   )r1   r'   r,   comm_backendsgathered_sizesr   r=   r   _gather_size_by_commQ   s   
rI   Hc                 C   sd   | dusJ dt |\}}g }| | }| | }t|D ]}||k r*||d  q|| q|S )zSplit the head dimension size by world_size.
    H: int, global head num
    return: List[int], list of local head num for each rank
    Nz#Global head num H must be provided.r4   )r.   rE   append)rJ   r'   r-   r,   output_split_sizesbase_head_num	remainderir   r   r   _split_head_sizesk   s   rP   xc                 C   sp   t |\}}d}|| dkr4|||  }|| | }||k s(J d| d| t| ddd|f } | |fS )zMaybe pad the head dimension to be divisible by world_size.
    x: torch.Tensor, shape (B, S_LOCAL, H, D)
    H: int, original global head num
    return: Tuple[torch.Tensor, int], padded tensor (B, S_LOCAL, H + H_PAD, D) and H_PAD
    r   Padding head num ( should be less than new local head num r.   Fpad
contiguous)rQ   rJ   r'   r<   r,   H_PADNEW_H_LOCALr   r   r   _maybe_pad_qkv_head   s   

rZ   rX   c                 C   sJ   t |\}}|dkr!||d kr!| ddddd| ddf } |  S )zMaybe unpad the head dimension.
    x: torch.Tensor, shape (B, S_GLOBAL, H_LOCAL + H_PAD, D)
    H_PAD: int, head padding num
    return: torch.Tensor, unpadded tensor (B, S_GLOBAL, H_LOCAL, D)
    r   r4   N)r.   rW   )rQ   rX   r'   r-   r,   r   r   r   _maybe_unpad_qkv_head   s   
"r[   c                 C   s   |du r| dfS t |\}}d}|| dkrB|||  }|| | }||k s0J d| d| ||d krBt| ddd|f } | |fS )zMaybe pad the head dimension to be divisible by world_size.
    x: torch.Tensor, shape (B, S_GLOBAL, H_LOCAL, D)
    H: int, original global head num
    return: Tuple[torch.Tensor, int], padded tensor (B, S_GLOBAL, H_LOCAL + H_PAD, D) and H_PAD
    Nr   rR   rS   r4   rT   )rQ   rJ   r'   r-   r,   rX   rY   r   r   r   _maybe_pad_o_head   s   

r\   c                 C   s2   |dkr| ddddd| ddf } |   S )zMaybe unpad the head dimension.
    x: torch.Tensor, shape (B, S_LOCAL, H_GLOBAL + H_PAD, D)
    H_PAD: int, head padding num
    return: torch.Tensor, unpadded tensor (B, S_LOCAL, H_GLOBAL, D)
    r   N)rW   )rQ   rX   r'   r   r   r   _maybe_unpad_o_head   s   
"r]   queryc                 K   s   | j d }i }||d< |S )N   num_qo_head)shape)r^   r   r`   extra_kwargsr   r   r   r      s   
r   c                    s   t \}}j\}}}}t|\ |  | }	||||	|ddddd j tdddt	j
f fdd	}
|
S )
o
    x: torch.Tensor, shape (B, S_LOCAL, H, D)
    return: Callable that returns (B, S_GLOBAL, H_LOCAL, D)
    r_   r4   r         Nr   c                      s:   t dddddd t S Nr   r4   r_   rd   )r&   reshapeflattenpermuterW   r[   r   rX   _shaper'   rQ   r   r   r$         "z*_all_to_all_single_qkv_async.<locals>.wait)r.   ra   rZ   rg   ri   rW   rh   r"   all_to_all_singler8   TensorrQ   r'   r   r<   r,   BS_LOCALrJ   DH_LOCALr$   r   rj   r   r      s   	$
r   c                    s   | dd}t\}}t|\ j\}}}}	|| }
|||
||	ddddd j t	dddt
jf fd	d
}|S )v
    x: torch.Tensor, shape (B, S_GLOBAL, H_LOCAL, D)
    return: Callable that returns (B, S_LOCAL, H_GLOBAL, D)
    r`   Nr4   rd   r   r_   re   r   c                      s:   t dddddd t S rf   )r&   rg   rh   ri   rW   r]   r   rj   r   r   r$     rl   z(_all_to_all_single_o_async.<locals>.wait)getr.   r\   ra   rg   ri   rW   rh   r"   rm   r8   rn   rQ   r'   r   rJ   r<   r,   rp   S_GLOBALrs   rr   rq   r$   r   rj   r   r     s   $
r   c                    s   t |\}j\ }dd tjddD }|| dddd g }t|||dtjf fd	d
}|S )zAnother variant for uneven head splits without padding.
    x: torch.Tensor, shape (B, S_LOCAL, H_GLOBAL, D)
    return: Callable that returns (B, S_GLOBAL, H_LOCAL, D)
    c                 S      g | ]}| d qS )r_   r1   )r;   rO   r   r   r   r?   6      z=_all_to_all_single_qkv_uneven_heads_async.<locals>.<listcomp>r_   dimr   r4   rd   r   c                      sH   t  ddddd   S )Nr_   r   rd   r4   re   r&   rg   ri   rW   r   rp   rr   rs   rq   r,   rQ   r   r   r$   >  s
   z7_all_to_all_single_qkv_uneven_heads_async.<locals>.wait)	r.   ra   r8   tensor_splitri   rW   r"   rm   rn   )rQ   r'   r   r-   H_GLOBALinput_split_sizesrL   r$   r   r~   r   r   '  s   	
r   c                    s   | dd}j\ }}t|\}}t||}t|||  ||ddddd dd|g| }	t	
||	|dtjf fd	d
}
|
S )zAnother variant for uneven head splits without padding.
    x: torch.Tensor, shape (B, S_GLOBAL, H_LOCAL, D)
    return: Callable that returns (B, S_LOCAL, H_GLOBAL, D)
    r`   Nr4   rd   r   r_   re   r   c                      s0   t  dddd S )Nr4   r_   r   rd   r}   r   rp   rr   r   rq   rQ   r   r   r$   i  s   z5_all_to_all_single_o_uneven_heads_async.<locals>.wait)ru   ra   r.   rP   sumrg   ri   rW   rh   r"   rm   r8   rn   )rQ   r'   r   rJ   rw   rs   r-   r,   rL   r   r$   r   r   r   r   L  s   

	r   .c                    s   t \}}j\}}}}t|\ |  | }	||||	|tj tdddtj	f fdd}
|
S )rc   Nr   c                      s2   t ddtt S )Nr   r4   )r&   rg   rh   r
   r[   r   rX   r'   shape_with_scalerQ   r   r   r$     s
   z._all_to_all_single_qkv_fp8_async.<locals>.wait)
r.   ra   rZ   rg   r	   rh   r"   rm   r8   rn   ro   r   r   r   r   u  s   	r   c                    s   | dd}t\}}t|\ j\}}}}	|| }
|||
||	ddddd jtj t	
dddtjf fd	d
}|S )rt   r`   Nr4   rd   r   r_   re   r   c                      sL   t tdddddd t S rf   )r&   rg   r   rh   ri   rW   r]   r   rX   rk   r'   r   rQ   r   r   r$     s   
"z,_all_to_all_single_o_fp8_async.<locals>.wait)ru   r.   r\   ra   rg   ri   rW   r   rh   r"   rm   r8   rn   rv   r   r   r   r     s   $r   c                    s   t \}}j\}}}}t|\ |  | }	||||	|ddddd |g| }
t|}ddt	||
dt
jf fdd}|S )	rc   r_   r4   r   rd   re   r   c                      s,   t dddd t S )Nr4   r   r_   rd   )r&   ri   rW   r[   r   rX   r'   rQ   r   r   r$     s   z._all_to_all_single_any_qkv_async.<locals>.wait)r.   ra   rZ   rg   ri   rW   rI   rh   r"   rm   r8   rn   rQ   r'   r   r<   r,   rp   rq   rJ   rr   rs   r   rL   r$   r   r   r   r     s   
$

	r   c           
   	      s   | dd}t\}t|\j}|\ }dd tjddD }|| dddd	 g }t	||d
tj
f fdd}	|	S )rt   r`   Nc                 S   rx   r3   ry   r;   or   r   r   r?     rz   z2_all_to_all_single_any_o_async.<locals>.<listcomp>r4   r{   r   r_   rd   r   c                      sT   t  ddddd   tS Nr_   r4   r   rd   re   )r&   rg   ri   rW   r]   r   rp   rr   rs   rX   rq   r'   r,   rQ   r   r   r$      s   z,_all_to_all_single_any_o_async.<locals>.wait)ru   r.   r\   ra   r8   r   ri   rW   r"   rm   rn   
rQ   r'   r   rJ   r-   ra   rw   r   rL   r$   r   r   r   r     s   
"	r   c                    s   t \}}j\}}}}t|\ |  | }	||||	||g| }
t|}tddt||
dt	j
f fdd}|S )rc   r   r4   r   c                      s    t tt S r    )r&   r
   r[   r   r   r   r   r$   )  s   z2_all_to_all_single_any_qkv_fp8_async.<locals>.wait)r.   ra   rZ   rg   rI   r	   rh   r"   rm   r8   rn   r   r   r   r   r     s   


r   c           
   	      s   | dd}t\}t|\j}t|\ }dd tjddD }|| dddd	 g }t	
||d
tjf fdd}	|	S )rt   r`   Nc                 S   rx   r3   ry   r   r   r   r   r?   I  rz   z6_all_to_all_single_any_o_fp8_async.<locals>.<listcomp>r4   r{   r   r_   rd   r   c                      s\   t t ddddd   tS r   )r&   r   rg   ri   rW   r]   r   r   r   r   r$   Q  s   z0_all_to_all_single_any_o_fp8_async.<locals>.wait)ru   r.   r\   ra   r   r8   r   ri   rW   r"   rm   rn   r   r   r   r   r   3  s   
"
r   fp8c                 C   t   ddl m} ddl m} ddl m} | d uo|  }| r%| r#|s#tS tS | r3|s3| r1J dtS | r8tS tS Nr4   )is_ulysses_float8_enabled)is_ulysses_anything_enabled)is_ulysses_heads_no_paddingz?FP8 and ulysses heads no padding both enabled is not supported.)	_templated_ulyssesr   r   r   r   r   r   r   r   r   r   r   r   _force_disable_float8r   r   r   r   d  "   

r   c                 C   r   r   )	r   r   r   r   r   r   r   r   r   r   r   r   r   r   {  r   r   r    )<	functoolstypingr   r   r   r   r8   torch.distributeddistributedr)   )torch.distributed._functional_collectives_functional_collectivesr"   torch.nn.functionalnn
functionalrU   cache_dit.platformsr   cache_dit.kernelsr   r   r	   r
   ImportErrorr   cache_dit.loggerr   __name__logger__all__rn   r&   ProcessGroupintr.   	lru_cacherI   rP   rZ   r[   r\   r]   dictr   r   r   r   r   r   r   compilerallow_in_graphr   r   r   r   boolr   r   r   r   r   r   <module>   sB   









 
$
%
)

('(&1
