o
    پi7                     @   s  d Z ddlmZ ddlmZmZ ddlZddlmZ ddl	m
Z
 ddlmZ er<ddlmZ dd	lmZmZ dd
lmZ z
ddlmZ dZW n eyQ   dZY nw eG dd dZdddedee fddZejde ddejdedejfddZG dd de
ZdS )a  
KT Expert Parallelism Wrapper for MoE layers.

This module provides a generic wrapper that enables CPU-GPU expert parallelism
for any MoE quantization method. It coordinates parallel execution of GPU experts
(using any quantization method) and CPU experts (using AMX/AVX instructions).
    )	dataclass)TYPE_CHECKINGOptionalN)get_tensor_model_parallel_rank)FusedMoEMethodBase)get_compiler_backend)MoeRunnerConfig)CombineInputStandardDispatchOutput)
ServerArgs)KTMoEWrapperTFc                   @   sb   e Zd ZU dZeed< eed< eed< eed< eed< eed< eed< eed	< d
Zee ed< d
S )KTConfiga"  Configuration for KTransformers heterogeneous computing CPU part.

    Args:
        layer_idx: Layer index in the model
        num_gpu_experts: Number of experts to run on GPU
        cpuinfer_threads: Number of CPU inference threads
        threadpool_count: Number of thread pools for CPU computation
        weight_path: Path to CPU quantized weights
        chunked_prefill_size: Chunk size for prefill computation
        method: CPU computation method (e.g., "int4")
        num_layers: Total number of layers in the model (optional)
    	layer_idxnum_gpu_expertscpuinfer_threadsthreadpool_countweight_pathchunked_prefill_sizemax_deferred_experts_per_tokenmethodN
num_layers)	__name__
__module____qualname____doc__int__annotations__strr   r    r   r   W/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/layers/moe/kt_ep_wrapper.pyr   #   s   
 r   server_argsr   r   returnc                 C   sf   | j du rdS d}z|  }t|dd}W n	 ty   Y nw t|| j| j| j| j | j| j	| j
|d	S )zCreate KTConfig from ServerArgs if KT is configured.

    Args:
        server_args: Global server arguments
        layer_idx: Layer index in the model

    Returns:
        KTConfig if KT is configured, None otherwise
    Nnum_hidden_layers)	r   r   r   r   r   r   r   r   r   )kt_weight_pathget_hf_configgetattr	Exceptionr   kt_num_gpu_expertskt_cpuinferkt_threadpool_countr   	kt_method!kt_max_deferred_experts_per_token)r    r   r   	hf_configr   r   r   !create_kt_config_from_server_args=   s(   
r-   )dynamicbackendtopk_idsr   c                 C   s   d| | |k< | S )a  Mask CPU expert IDs by setting them to -1.

    This function masks expert IDs that should be computed on CPU (IDs >= num_gpu_experts)
    so they won't be computed on GPU. The masked IDs are set to -1, which causes the
    GPU MoE kernel to skip those experts.

    Args:
        topk_ids: Tensor of shape [num_tokens, top_k] containing expert IDs
        num_gpu_experts: Number of experts that should run on GPU (experts 0 to num_gpu_experts-1)

    Returns:
        Modified topk_ids tensor with CPU expert IDs masked as -1
    r   )r0   r   r   r   r   mask_cpu_expert_idsb   s   r2   c                
   @   s   e Zd ZdZdedefddZdejj	de
de
d	e
d
ejf
ddZdejj	ddfddZdejj	ddfddZdejj	ddddfddZdejdejfddZdejj	ddddfddZdefd d!ZdS )"KTEPWrapperMethoda  Wrapper for any MoE quantization method to enable CPU-GPU expert parallelism.

    This wrapper coordinates parallel execution of:
    - GPU experts (0 to num_gpu_experts-1) using any quantization method
    - CPU experts (num_gpu_experts to total_experts-1) using AMX/AVX instructions

    The wrapper implements the submit-compute-sync pattern:
    1. Submit CPU expert computation (non-blocking)
    2. Execute GPU expert computation in parallel
    3. Synchronize and merge CPU+GPU results

    Example:
        # Wrap any GPU method with AMX/AVX CPU expert support
        gpu_method = CompressedTensorsWNA16MoE(quant_config, prefix)
        kt_config = KTConfig(layer_idx=0, num_gpu_experts=4, ...)
        method = KTEPWrapperMethod(gpu_method, kt_config)
    
gpu_method	kt_configc                 C   sH   t std|| _|| _|j| _d| _| j| j_t | _d| _d| _	dS )zInitialize the KT EP wrapper.

        Args:
            gpu_method: The quantization method to use for GPU experts
            kt_config: Configuration for KT CPU expert computation
        zVkt_kernel is not installed. To use KTransformers EP wrapper, please install kt_kernel.TN)
KTRANSFORMERS_AVAILABLEImportErrorr4   r5   r   override_num_local_expertsr   tp_rankwrapper_layer_params)selfr4   r5   r   r   r   __init__   s   

zKTEPWrapperMethod.__init__layernum_expertshidden_sizeintermediate_size_per_partitionparams_dtypec           
      K   s   || _ || _|| _|j}|j|j }| jjpd}	| jjdur0| jjdur0| jj| jjd kr0d}	| j	j
d|| j|||d| | jdkret| jj||||| j| jj| jj| jj| jj| jj|	d| _dS dS )a  Create weights for both GPU and CPU experts.

        Args:
            layer: The MoE layer module
            num_experts: Total number of experts (GPU + CPU)
            hidden_size: Hidden dimension size
            intermediate_size_per_partition: Intermediate size per TP partition
            params_dtype: Data type for parameters
            **extra_weight_attrs: Additional weight attributes
        r   N   )r>   r?   r@   rA   rB   )r   r?   num_experts_per_tokr@   moe_intermediate_sizer   r   r   r   r   r   r   r   )global_num_expertsr@   rA   top_kmoe_tp_sizer5   r   r   r   r4   create_weightsr   r9   r   r   r   r   r   r   r:   )
r<   r>   r?   r@   rA   rB   extra_weight_attrsrD   intermediate_size_fulllayer_max_deferredr   r   r   rI      sH   

z KTEPWrapperMethod.create_weightsr!   Nc                 C   sn   t | jdr| j| | jdkr3| jdur5tj  ddlm	} | j
| jj  }| j| dS dS dS )znProcess weights after loading from checkpoint.

        Args:
            layer: The MoE layer module
        process_weights_after_loadingr   N)#get_global_expert_location_metadata)hasattrr4   rM   r9   r:   torchcudasynchronize(sglang.srt.eplb.expert_location_dispatchrN   physical_to_logical_map_cpur5   r   
contiguousload_weights)r<   r>   rN   rT   r   r   r   rM      s   
z/KTEPWrapperMethod.process_weights_after_loadingmoe_runner_configr   c                 C   s&   || _ | jr
| j|_| j|| dS )zCreate MoE runner for computation.

        Args:
            layer: The MoE layer module
            moe_runner_config: Configuration for MoE runner
        N)rW   r8   r   num_local_expertsr4   create_moe_runner)r<   r>   rW   r   r   r   rY     s   	z#KTEPWrapperMethod.create_moe_runnerdispatch_outputr
   c                 C   sd   | j jdks
J d| jdks| jdu rdS |j}|j}|\}}}| j|||tj	|j
j dS )a_  Submit CPU expert computation asynchronously (non-blocking).

        This method submits the CPU expert computation to AMX/AVX without waiting
        for completion, allowing GPU computation to proceed in parallel.

        Args:
            layer: The MoE layer module
            dispatch_output: Dispatched tokens and routing information
        siluz"Only SiLU activation is supported.r   N)rW   
activationr9   r:   hidden_statestopk_outputsubmit_forwardrP   rQ   current_streamdevicecuda_stream)r<   r>   rZ   xr^   topk_weightsr0   _r   r   r   submit  s   
zKTEPWrapperMethod.submitrc   c                 C   s8   | j dks
| jdu rt|S | j|tj|jjS )a&  Synchronize and retrieve CPU expert computation results.

        This method waits for the CPU computation to complete and returns the results.

        Args:
            x: Reference tensor for shape and device information

        Returns:
            CPU expert computation results
        r   N)	r9   r:   rP   
zeros_likesync_forwardrQ   r`   ra   rb   )r<   rc   r   r   r   sync1  s
   
zKTEPWrapperMethod.syncr	   c                 C   s   ddl m} |j}|j}| jdkr| || |j}t|| j}|j	|d}|j	|d}	| j
||	}
|
j}| jdkrD| |}|| }||dS )a  Execute hybrid CPU+GPU MoE forward pass with parallelism.

        This is the main computation method that coordinates:
        1. Submit CPU expert computation (non-blocking)
        2. Execute GPU expert computation in parallel
        3. Synchronize CPU results and merge with GPU results

        Args:
            layer: The MoE layer module
            dispatch_output: Dispatched tokens and routing information

        Returns:
            Combined computation results from CPU and GPU experts
        r   )StandardCombineInput)r0   )r^   )r]   )&sglang.srt.layers.moe.token_dispatcherrj   r]   r^   r9   rf   r0   r2   r   _replacer4   applyri   )r<   r>   rZ   rj   rc   r^   r0   masked_topk_idsmasked_topk_outputmasked_dispatch_outputgpu_combine_inputoutput
cpu_outputr   r   r   rm   D  s"   



zKTEPWrapperMethod.applynamec                 C   s0   |dv rt dt| j d| dt| j|S )a-  Delegate attribute access to the wrapped GPU method.

        This allows the wrapper to transparently expose attributes and methods
        from the wrapped GPU quantization method.

        Args:
            name: Attribute name

        Returns:
            Attribute value from gpu_method
        )r4   r:   r5   'z' object has no attribute ')AttributeErrortyper   r%   r4   )r<   rt   r   r   r   __getattr__w  s
   zKTEPWrapperMethod.__getattr__)r   r   r   r   r   r   r=   rP   nnModuler   dtyperI   rM   rY   rf   Tensorri   rm   r   rx   r   r   r   r   r3   u   sP    

E


3r3   )r   dataclassesr   typingr   r   rP   sglang.srt.distributedr   *sglang.srt.layers.quantization.base_configr   sglang.srt.utilsr   sglang.srt.layers.moer   rk   r	   r
   sglang.srt.server_argsr   	kt_kernelr   r6   r7   r   r   r-   compiler|   r2   r3   r   r   r   r   <module>   s:   
%