o
    پi                     @   s  d dl Z d dlmZmZ d dlZd dlmZ d dlm	Z	 d dl
mZ d dlmZ e eZdZdae rMz
d dlmZ eZW n eyL   ed Y nw G dd dZe a	d#dededefddZ					d$dejdejdejdededee dededeejejf fddZeg ded						d%dejdejdejdededee dededeejejf fdd Zd!d" ZdS )&    N)OptionalTuple)$get_tensor_model_parallel_world_size)is_flashinfer_available)register_custom_opzIflashinfer.comm is not available, falling back to standard implementationc                   @   s@   e Zd Zdd Z		ddedededed	ef
d
dZdd ZdS )FlashInferWorkspaceManagerc                 C   s"   d | _ d | _d | _d | _d| _d S NF)workspace_tensoripc_handles
world_sizerankinitialized)self r   \/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/layers/flashinfer_comm_fusion.py__init__   s
   
z#FlashInferWorkspaceManager.__init__NFr   r   max_token_num
hidden_dimuse_fp32_lamportc                 C   s|   | j r
| j|kr
dS tdu rtd dS |   tj||||||d\| _| _	|| _|| _
d| _ td| d|  dS )zInitialize workspaceNz@FlashInfer comm not available, skipping workspace initialization)groupr   Tz*FlashInfer workspace initialized for rank z, world_size )r   r   _flashinfer_commloggerwarningcleanupcomm1trtllm_create_ipc_workspace_for_all_reduce_fusionr
   r	   r   info)r   r   r   r   r   r   r   r   r   r   
initialize$   s2   

z%FlashInferWorkspaceManager.initializec              
   C   s   | j rR| jdurTz?ztj| jtjjd W n ty/ } zt	d|  W Y d}~nd}~ww W d| _
d| _d| _ dS W d| _
d| _d| _ dS d| _
d| _d| _ w dS dS )zClean up workspaceN)r   z(Failed to cleanup FlashInfer workspace: F)r   r
   r   +trtllm_destroy_ipc_workspace_for_all_reducedistr   WORLD	Exceptionr   r   r	   )r   er   r   r   r   M   s*   



z"FlashInferWorkspaceManager.cleanupr   )__name__
__module____qualname__r   intboolr   r   r   r   r   r   r      s     
)r         Fr   r   r   c                 C   sV   t  rtdu r	dS t }|dkrdS t }tjrtj|kr(tj||| ||d tjS )zEnsure workspace is initializedNF   )r   r   r   r   r   )	r   r   r   r   get_rank_workspace_managerr   r   r   )r   r   r   r   r   r   r   r   ensure_workspace_initialized_   s"   
r-   ư> @  input_tensorresidualweightepsuse_oneshottrigger_completion_at_endfp32_accreturnc           
      C   s   t |}t | }	|	|fS N)torch
empty_like)
r0   r1   r2   r3   r   r4   r5   r6   residual_outnorm_outr   r   r   *fake_flashinfer_allreduce_residual_rmsnorm{   s   


r=   )r0   r1   r2   )mutates_args	fake_implc                 C   s.  t  rtdu rtd dS t }|dkrtd dS | jd |ks%J t|| jd | jtj	kds:td	 dS | j\}	}
t
|}t
| }tjd i d
| d|dt d|	d|
dtjddd|d|d|dtjjddd|d|d|ddddd|d|dddd ||fS )!a  
    Use FlashInfer's fused allreduce + residual + RMS norm operation

    Args:
        input_tensor: Input tensor that needs allreduce
        residual: Residual tensor
        weight: RMS norm weight
        eps: RMS norm epsilon
        max_token_num: Maximum token number
        use_oneshot: Whether to use oneshot mode
        trigger_completion_at_end: Whether to trigger completion at end
        fp32_acc: Whether to use fp32 precision

    Returns:
        Tuple[torch.Tensor, torch.Tensor]: (norm_output, residual_output)
    NzAFlashInfer not available, falling back to standard implementation)NNr*   z(Single GPU, no need for allreduce fusionr   )r   r   r   z"FlashInfer workspace not availableallreduce_inr   
world_rank	token_numr   workspace_ptrslaunch_with_pdlTr4   r5   r6   pattern_codeallreduce_outresidual_inr;   r<   	quant_out	scale_out	rms_gammarms_epsscale_factorlayout_coder   )r   r   r   debugr   shaper-   dtyper9   float32r:   trtllm_allreduce_fusionr   r+   r,   r	   AllReduceFusionPatternkARResidualRMSNorm)r0   r1   r2   r3   r   r4   r5   r6   r   rC   r   r;   r<   r   r   r   %flashinfer_allreduce_residual_rmsnorm   s   






	
rV   c                   C   s   t d ur
t   d S d S r8   )r,   r   r   r   r   r   cleanup_flashinfer_workspace   s   rW   )r(   r)   F)r.   r/   NFF)r.   r(   NFF) loggingtypingr   r   r9   torch.distributeddistributedr   sglang.srt.distributedr   sglang.srt.utilsr   sglang.srt.utils.custom_opr   	getLoggerr#   r   r   r,   flashinfer.commr   ImportErrorr   r   r&   r'   r-   Tensorfloatr=   rV   rW   r   r   r   r   <module>   s    
@
 	
	O