o
    پiy                     @  s  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlmZ d dlmZmZmZmZmZ d dlZd dlmZ d dlZd dlZd dlmZmZ d dlmZmZmZm Z  d dl!m"Z"m#Z# d dl$m%Z% d d	l&m'Z' d d
l(m)Z) d dl*m+Z+ d dl,m-Z- d dl.m/Z/m0Z0 e1e2Z3G dd de4Z5ej6G dd dZ7ej6G dd dZ8ej6G dd dZ9G dd dZ:G dd deZ;G dd de Z<G dd deZ=G dd deZ>dS )     )annotationsN)defaultdict)DictListOptionalSetTuple)KVArgsKVPoll)CommonKVBootstrapServerCommonKVManagerCommonKVReceiverCommonKVSender)	FastQueuegroup_concurrent_contiguous)&check_mooncake_custom_mem_pool_enabled)DisaggregationMode)get_mooncake_transfer_engine)envs)
ServerArgs)format_tcp_addressis_valid_ipv6_addressc                      s&   e Zd Zd	 fddZdd Z  ZS )
KVTransferErrorbootstrap_roomintfailure_reasonstrc                   s   t  | || _|| _d S N)super__init__r   r   selfr   r   	__class__ [/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/disaggregation/mooncake/conn.pyr   +   s   
zKVTransferError.__init__c                 C  s   d| j  d| j S )NzKVTransferError(bootstrap_room=z): )r   r   r!   r$   r$   r%   __str__0   s   zKVTransferError.__str__r   r   r   r   )__name__
__module____qualname__r   r'   __classcell__r$   r$   r"   r%   r   *   s    r   c                   @  s>   e Zd ZU ded< ded< ded< ded< d	ed
< ded< dS )TransferKVChunkr   roomnpt.NDArray[np.int32]prefill_kv_indicessliceindex_sliceboolis_lastOptional[int]prefill_aux_indexOptional[List[int]]state_indicesN)r)   r*   r+   __annotations__r$   r$   r$   r%   r-   5   s   
 r-   c                   @  sd   e Zd ZU ded< ded< ded< ded< ded< ded	< d
ed< ded< ded< edddZdS )TransferInfor   r.   r   endpointdst_portmooncake_session_idr/   dst_kv_indicesdst_aux_index	List[int]dst_state_indicesrequired_dst_info_numr3   is_dummymsgList[bytes]c                 C  s   |d dkr|d dkrd}t jg t jd}d }g }n*t j|d t jd}t|d d}|d dkr7g }ntt j|d t jd}d}| t|d	 d|d
 dt|d d|d d|||t|d d|d	S )N          Tdtypeascii   Fr               )	r.   r;   r<   r=   r>   r?   rA   rB   rC   )nparrayint32
frombufferr   decodelist)clsrD   rC   r>   r?   rA   r$   r$   r%   from_zmqL   s,   zTransferInfo.from_zmqNrD   rE   r)   r*   r+   r9   classmethodrX   r$   r$   r$   r%   r:   @   s   
 r:   c                   @  s|   e Zd ZU ded< ded< ded< ded< ded< ded	< ded
< ded< ded< ded< ded< ded< edddZdS )KVArgsRegisterInfor   r.   r;   r   r<   r=   	list[int]dst_kv_ptrsdst_aux_ptrsdst_state_data_ptrsdst_tp_rankdst_attn_tp_sizedst_kv_item_lendst_state_item_lensdst_state_dim_per_tensorrD   rE   c                 C  sh  | t |d d|d dt|d d|d dttt|d d  d|d ttt|d	 d  d|d	 ttt|d
 d  d|d
 t|d dt|d dt|d dt|dkrt|d dkrttt|d d  d|d ng t|dkrt|d dkrttt|d d  d|d dS g dS )Nr   rK   rM   rN   rO   rF      QrH   rL   rP   	   
   I   )r.   r;   r<   r=   r^   r_   r`   ra   rb   rc   rd   re   )r   rU   r   rV   structunpacklen)rW   rD   r$   r$   r%   rX   y   s&   $$$&$zKVArgsRegisterInfo.from_zmqNrY   rZ   r$   r$   r$   r%   r\   i   s   
 r\   c                   @  s(   e Zd ZdZedd Zedd ZdS )AuxDataCodeczCHandles serialization and deserialization of auxiliary data buffersc                 C  s   t j| | }t|S )z*Serialize data from memory buffer to bytes)ctypesc_bytefrom_addressbytes)src_addrdata_lengthbufferr$   r$   r%   serialize_data_from_buffer   s   z'AuxDataCodec.serialize_data_from_bufferc                 C  sD   | j | }| j| }|||  }tjt| |}||dd< dS )z+Deserialize bytes into target memory bufferN)aux_data_ptrsaux_item_lensrp   rq   rn   rr   )kv_argsbuffer_index	aux_indexdatadst_aux_ptritem_lendst_addrrv   r$   r$   r%   deserialize_data_to_buffer   s   

z'AuxDataCodec.deserialize_data_to_bufferN)r)   r*   r+   __doc__staticmethodrw   r   r$   r$   r$   r%   ro      s    
ro   c                      s  e Zd ZdZ	drds fddZdd Zdd Zdd Zdtd d!Zdud%d&Z	dvd*d+Z
dwd0d1Zdwd2d3Zdxd;d<Zdyd?d@Z	Adzd{dFdGZd|dIdJZd}dMdNZd~dQdRZddUdVZdWdX ZdYdZ Z	A	AdddddeZddfdgZddidjZddldmZdndo Zdpdq Z  ZS )MooncakeKVManagers   AUX_DATAFargsr	   disaggregation_moder   server_argsr   is_mla_backendOptional[bool]c                   s  t  |||| |   |   | jtjkr|   tt	| _
t | _t | _t }tj d u rDttdt	d| d dtj  dd t D | _ ksbJ d d  d	 fd
dt D | _t| j| jD ]\}}tj| j||fdd  qvtj | _ t! \| _"| _#n:| jtj$kri | _%tt&j'| _(t | _)tt| _*tt| _+ttj, d| _-ttj. d| _/| 0  tj1 | _2i | _3t | _4d S )NrF   g      ?rf      c                 S  s   g | ]}t  qS r$   )r   .0_r$   r$   r%   
<listcomp>   s    z.MooncakeKVManager.__init__.<locals>.<listcomp>z@The environment variable SGLANG_DISAGGREGATION_THREAD_POOL_SIZE=zC must be greater than or equal to SGLANG_DISAGGREGATION_QUEUE_SIZE=.c                   s   g | ]
}t j  qS r$   )
concurrentfuturesThreadPoolExecutorr   transfer_queue_sizetransfer_thread_pool_sizer$   r%   r      s    T)targetr   daemong       @rM   )5r   r   init_engineregister_buffer_to_enginer   r   PREFILLstart_prefill_threadr   r   session_failuressetfailed_sessions	threadingLocksession_lockos	cpu_countr   &SGLANG_DISAGGREGATION_THREAD_POOL_SIZEgetminmax SGLANG_DISAGGREGATION_QUEUE_SIZErangetransfer_queues	executorszipThreadtransfer_workerstart'SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUTbootstrap_timeoutr   enable_custom_mem_poolcustom_mem_pool_typeDECODEheartbeat_failuresrequestsSessionsession_poolsession_pool_lockaddr_to_rooms_trackerprefill_response_tracker(SGLANG_DISAGGREGATION_HEARTBEAT_INTERVALheartbeat_interval+SGLANG_DISAGGREGATION_HEARTBEAT_MAX_FAILUREmax_failuresstart_decode_thread%SGLANG_DISAGGREGATION_WAITING_TIMEOUTwaiting_timeoutfailure_recordsfailure_lock)r!   r   r   r   r   r   queueexecutorr"   r   r%   r      sb   








zMooncakeKVManager.__init__c                 C  s   t  | _d S r   )r   enginer&   r$   r$   r%   r      s   zMooncakeKVManager.init_enginec                 C  s~   | j jr| j jr| j| j j| j j | j jr&| j jr&| j| j j| j j | j jr;| j jr=| j| j j| j j d S d S d S r   )	rz   kv_data_ptrskv_data_lensr   batch_registerrx   aux_data_lensstate_data_ptrsstate_data_lensr&   r$   r$   r%   r      s   z+MooncakeKVManager.register_buffer_to_enginec                 C  s4   |sdS t | \}}}| j|t|t|t|S Nr   )r   r   batch_transfer_syncrV   )r!   r=   transfer_blocks	src_addrs	dst_addrslengthsr$   r$   r%   _transfer_data  s   z MooncakeKVManager._transfer_datar=   r   src_data_ptrsr]   dst_data_ptrs	item_lensprefill_data_indicesr/   dst_data_indicesr   %concurrent.futures.ThreadPoolExecutorreturnr   c                   s^  t ||\d}
jr#
||\fddtD }n;
||\ t krCtd dt   dS  fddtD fddtD  }|dusdJ dfddd
fdd	d
fdd}	
jr	fdd|D }
t	j
|
D ]}| }|dkr|
D ]}|  q|  S qdS |	|S )z
        Generic KV cache transfer supporting both MHA and MLA architectures.
        This method is used by both send_kvcache (full pool) and maybe_send_extra.
        Nc                   "   g | ]}|  | | fqS r$   r$   r   layer_id)r^   r   src_kv_ptrsr$   r%   r   *      z;MooncakeKVManager._send_kvcache_generic.<locals>.<listcomp>zaPrefill transfer kvcache error, layers_current_pp_stage is out of range: layers_current_pp_stage=z, len(dst_k_ptrs)=c                   r   r$   r$   r   )
dst_k_ptrsr   
src_k_ptrsr$   r%   r   >  r   c                   s&   g | ]}|  | |  fqS r$   r$   r   )
dst_v_ptrsr   layers_current_pp_stage
src_v_ptrsr$   r%   r   E  s    
src_ptrr   dst_ptrr   r   List[Tuple[int, int, int]]c           	        s`   g }t  D ]&\}}| t|d |  }|t|d |  }|t| }||||f q|S r   )r   r   rn   append)	r   r   r   r   prefill_indexdecode_indexrt   r   length)dst_kv_blocksprefill_kv_blocksr$   r%   set_transfer_blocksO  s   zDMooncakeKVManager._send_kvcache_generic.<locals>.set_transfer_blocksc                   s   | ||}  |S r   )r   )r   r   r   r   r=   r!   r   r$   r%   process_layer[  s   z>MooncakeKVManager._send_kvcache_generic.<locals>.process_layerlayers_paramsc                   s2   g }| D ]\}}}| ||| q |S r   )extendr   )r   r   r   r   r   r   r$   r%   process_layers`  s   z?MooncakeKVManager._send_kvcache_generic.<locals>.process_layersc                   s"   g | ]\}}}  |||qS r$   )submit)r   r   r   r   )r   r   r$   r%   r   g  s    r   )r   r   r   r   r   r   r   r   )r   r   r   r   r   r   r   r   )r   r   r   r   )r   r   get_mla_kv_ptrs_with_ppr   get_mha_kv_ptrs_with_pprn   loggererrorr   r   r   as_completedresultcancel)r!   r=   r   r   r   r   r   r   r   r   r   futurestatusfr$   )r   r   r^   r   r   r   r   r=   r   r   r!   r   r   r   r   r%   _send_kvcache_generic  sZ   


	
z'MooncakeKVManager._send_kvcache_genericr0   r^   r>   c              	   C  s    | j || jj|| jj|||dS )Nr=   r   r   r   r   r   r   )r  rz   r   kv_item_lens)r!   r=   r0   r^   r>   r   r$   r$   r%   send_kvcache|  s   zMooncakeKVManager.send_kvcachera   rb   rc   c	           #   	     s   j jj }	j jd || }
j j}j j}|}|j | }| | }j|kr6d}|}|	| }n
|
| | }|}d}j j|\}}}}}|| }|| }|| | krrt	d d d|  d dS |
ddtj|
ddtj tj|tjd
dd}| }| }|| | || |  f	d	d
}g }t|D ]}||||| ||  qt|D ]}||||| ||  qtj|D ]} |  }!|!dkr|D ]}"|"  q|!  S qdS )a  
        Sends KV cache slices from this Prefill rank to a target Decode rank,
        supporting generic M-to-N TP size configurations.

        NOTE: This implementation calls the transfer engine for each token slot within
        each page to ensure correctness for any page_size and head-slicing configuration.
        This may introduce performance overhead (increased TTFT) for long sequences.
        r   [z] slice size (z") exceeds target token slot size ()r   rM   rI   c           
        sp   |   }|   }| }| }| d }|sdS | d }t|}g| }	j|||	S )Nr   r   )reshapetolistrn   r   r   )
src_layer_ptrdst_layer_ptrsrc_page_base_addrsdst_page_base_addrssrc_slice_addrsdst_slice_addrssrc_addr_listdst_addr_listtotal_sliceslength_list	decode_page_indicesrc   dst_token_slot_offsetsheads_bytes_per_token_to_sendr=   prefill_page_indicesr!   src_kv_item_lensrc_token_slot_offsetsr$   r%   process_layer_tp_aware  s   
zDMooncakeKVManager.send_kvcache_slice.<locals>.process_layer_tp_aware)rz   engine_rankattn_tp_sizer  kv_head_num	page_sizer   r   r   r   r  astyperQ   int64aranger   r   r   r   r   r   r   r   )#r!   r=   r0   r^   r>   ra   rb   rc   r   local_tp_rank_in_groupdst_tp_rank_in_groupnum_kv_headsr  src_heads_per_rankdst_heads_per_rankbytes_per_head_slice_to_sendsrc_head_start_offsetnum_heads_to_senddst_head_start_offsetr   r   r   r   r   src_head_slice_offsetdst_head_slice_offsettokens_per_pagebytes_per_token_on_prefillbytes_per_token_on_decoder  r   ir   r   r   r$   r  r%   send_kvcache_slice  sr   





z$MooncakeKVManager.send_kvcache_slicereqr:   r6   r_   c                 C  s   | j r| jdkstj r| |||S g }| jj}| jj}t	|D ]!\}}|| }	|| |	|  }
|| |	|j
  }||
||	f q"| |j|S )NNVLINK)r   r   r   SGLANG_MOONCAKE_SEND_AUX_TCPr   send_aux_tcprz   rx   ry   	enumerater?   r   r   r=   )r!   r2  r6   r_   r   prefill_aux_ptrsprefill_aux_item_lensr0  r~   r   rt   r   r$   r$   r%   send_aux  s   
zMooncakeKVManager.send_auxc           
   	   C  sh   | j j}| j j}tt|D ]#}|| }|| ||  }t||}	| j|j|j	|j
||j|	d qdS )N)remoter<   r.   r{   r|   r}   r   )rz   rx   ry   r   rn   ro   rw   send_aux_data_to_endpointr;   r<   r.   r?   )
r!   r2  r6   r_   r7  r8  r0  r   rt   r}   r$   r$   r%   r5    s   	zMooncakeKVManager.send_aux_tcpr:  r<   r.   r{   r|   r}   rs   c                 C  s^   | j t||t|d}|tjt|dt|dt|dt	dt
||g d S )Nis_ipv6rK   >I)_connectr   r   send_multipartr   AUX_DATA_HEADERr   encoderl   packrn   )r!   r:  r<   r.   r{   r|   r}   socketr$   r$   r%   r;  0  s   	z+MooncakeKVManager.send_aux_data_to_endpointrD   rE   c                 C  s   t |d d}t |d d}t |d d}td|d d }|d }t||kr9td	|  d
S t| j	||| t
d| dt|  d
S )z7Handle AUX_DATA messages received by the decode thread.rM   rK   rN   rO   r>  rF   r   rH   z,AUX_DATA length mismatch for bootstrap_room Nz%Received AUX_DATA for bootstrap_room z with length:)r   rU   rl   rm   rn   r   r   ro   r   rz   debug)r!   rD   r.   r{   r|   ru   r}   r$   r$   r%   _handle_aux_dataH  s   
z"MooncakeKVManager._handle_aux_dataNprefill_state_indicesr`   target_rank_registration_infoOptional[KVArgsRegisterInfo]c              	   C  s  t | jdd}|dkr+|dur$| j|jkr$| ||||j|j|j|jS | |||S |dv r|durF| j	sF| j|jkrFt
d|  dt|t|jk rhtdt| d	t|j  |dt|j }tj|tjd
}tj|jtjd
}| j|j| jj|| jj|||dS dS )z:Send state or extra pool data with type-specific handling.
state_typenonemambaN)swansazEPD Disaggregation does NOT support PD different TP sizes for non-MLA z hybrid models yet.zlen(prefill_state_indices) = z, len(dst_state_indices) = rI   r  r   )getattrrz   r  rb   _send_mamba_state_slicerd   re   ra   _send_mamba_stater   RuntimeErrorupperrn   rA   r   warningrQ   rR   rS   r  r=   r   state_item_lens)r!   r2  rG  r`   r   rH  rJ  rA   r$   r$   r%   maybe_send_extra\  sZ   	

z"MooncakeKVManager.maybe_send_extraprefill_mamba_indexc                 C  s   t |dks
J dg }| jj}| jj}t|D ]'\}}|| }	|| |	t|d   }
||	t|jd   }||
||	f q| |j	|S )zTransfer Mamba states.rM   $Mamba should have single state indexr   )
rn   rz   r   rU  r6  r   rA   r   r   r=   )r!   r2  rW  r`   r   prefill_state_data_ptrsprefill_state_item_lensr0  dst_state_ptrr   rt   r   r$   r$   r%   rQ    s   z#MooncakeKVManager._send_mamba_staterd   re   c                 C  sZ  t d| j d| d t|dksJ dg }| jj}	| jj}
t| jdg }|r,|s3| |||S | jj	| j }|| }t
|D ]c\}}|
| }|| }|| }|| }|| }|| }| j|krld}|}|| }n
|| | }|}d}|| }|| }|| }|	| |t|d   | }||t|jd   | }||||f qB| |j|S )a}  Transfer Mamba states with TP slice support.

        Mamba state layout:
        - conv_state: [num_layers, size+1, conv_dim/tp, conv_kernel-1]
        - temporal_state: [num_layers, size+1, num_heads/tp, head_dim, state_size]

        The 3rd dimension is sliced by TP. When prefill and decode have different
        attn_tp_size, we need to slice the state accordingly.
        ziUsing Mamba state slice transfer for different TP sizes between prefill and decode. Prefill attn_tp_size=z, Decode attn_tp_size=z. Performance may be affected.rM   rX  state_dim_per_tensorr   )r   warning_oncer  rn   rz   r   rU  rO  rQ  r  r6  r   rA   r   r   r=   )r!   r2  rW  r`   rd   re   ra   rb   r   rY  rZ  src_state_dim_per_tensorr"  r#  r0  r[  src_item_lendst_item_lensrc_dimdst_dimsrc_bytes_per_dimdst_bytes_per_dimsrc_dim_startnum_dims_to_senddst_dim_startsrc_dim_offsetdst_dim_offsetbytes_to_sendrt   r   r$   r$   r%   rP    s^   

z)MooncakeKVManager._send_mamba_state_slicer   prefill_rankc                 C  sF   | j t||t|dt|dt|dt|dg d S )Nr<  rK   )r?  r   r   r@  r   rB  )r!   r:  r<   r.   r   rk  r$   r$   r%   sync_status_to_decode_endpoint  s   z0MooncakeKVManager.sync_status_to_decode_endpointr   r   c                 C  s  	 z|  }|j| jv r| j|j  ng }g }g }| j| j | j }|D ]{}|js| j9 |j	| j
v rb| |jd|j	 d | |jtj | |j|j|jtj| 	 W d     nAW d    n1 slw   Y  |j|j }	t|	t|jk rtdt|	 dt|j  |jd t|	 |_| j|j	 }
| js| j|
jkr| |j	|j|
j|	|}n| |j	|j|
j|	|
j|
j|
j |}|dkr-| j+ | j!|j	  d7  < | j!|j	 dkr| j
"|j	 t#d|j	 d	 W d    n1 sw   Y  | |jd
|j d|j d|j  | |jtj | |j|j|jtj|  nv|j$r|j%d urB| &||j%|
j'||
 | (||j)|
j*}|+|dkrTdnd |+|j|j|jf t||j,krt-|rrtj.ntj}| |j| |D ]\}}}| ||||| q~q&|j$r|j| j/v r| |jtj. q&|j| j/vs| 0|jtj.kr|j| jv r| j1|j W n t2y } zt3d| d| j4 dd }~ww q)NTz7Decode instance could be dead, remote mooncake session z is not alivezlen(chunked_dst_kv_indice) = z%, len(kv_chunk.prefill_kv_indices) = r   rM   zSession z failed.zFailed to send kv chunk of z to :Fz"Transfer thread failed because of z'. Prefill instance with bootstrap_port=z	 is dead.)5r   r.   transfer_infosvaluesattn_tp_rankpp_sizepp_rankrC   r   r=   r   record_failureupdate_statusr
   Failedrl  r;   r<   r>   r2   rn   r0   r   rT  decode_kv_args_tabler   r  rb   r  r^   r1  ra   rc   r   addr   r4   r8   rV  r`   r9  r6   r_   r   rB   allSuccessrequest_statuscheck_statuspop	ExceptionrR  bootstrap_port)r!   r   r   kv_chunkreqs_to_be_processedpollsdst_ranks_infos
local_rankr2  chunked_dst_kv_indicerH  retr   r;   r<   r.   er$   r$   r%   r     s   




	
 z!MooncakeKVManager.transfer_workerc                   s     fdd}t j|d  d S )Nc                    s  	  j  } | d d}| d d}|dkrQt|  j|<  j | jv r/ j| | j	v r8 j	|= W d   n1 sBw   Y  t
d| d q t| d	 d}t|}| jvrhi  j|< t|  j| |< t j| |kr |tj q)
z?This thread recvs pre-alloc notification from the decode engineTr   rK   rO   NoneNzRegister KVArgs from z successfullyrP   )server_socketrecv_multipartrU   r\   rX   rv  r   r   remover   r   rE  r   rn  r:   rn   rt  r
   WaitingForInput)waiting_req_bytesr.   r=   rB   r&   r$   r%   bootstrap_thread  s6   





z@MooncakeKVManager.start_prefill_thread.<locals>.bootstrap_threadr   r   r   r   )r!   r  r$   r&   r%   r     s   !z&MooncakeKVManager.start_prefill_threadc                   s<    fdd} fdd}t j|d  t j|d  d S )Nc                    s   	  j  } | d tjkr |  q | \}}}t|d}t|d}t|d}|tjkrW| j	v rV j
| |  j| }t j
| }||krV |tj n|tjkrh |d  || q)NTr   rK   z=Failed to get kvcache from prefill instance, it might be dead)r  r  r   rA  rF  r   rU   r
   ry  rz  r   rw  #required_prefill_response_num_tablern   rt  ru  rs  )rD   r   r   rk  expected_response_numarrived_response_numr&   r$   r%   decode_thread  s6   





z<MooncakeKVManager.start_decode_thread.<locals>.decode_threadc               	     s  	 t  j  j t j } W d    n1 sw   Y  | D ]}d }z} j  j| }W d    n1 s;w   Y  |j	d| ddddid}|j
dkrrd	 j|<  j|  }|D ]}| jvrp j| | qan1td
| d  j	|d	d  j|<  j | jv r j|= W d    n1 sw   Y  W n ty   td
| d  j	|d	d  j|< Y nw  j	|d	 jkr |  j | jv r߈ j|= W d    n1 sw   Y  q#q)NTzhttp://z/health)rN   rO   
Connectionz
keep-alive)timeoutheaders   r   zAttempting to reconnect to z...rM   )timesleepr   connection_lockrV   prefill_dp_size_tablekeysr   r   r   status_coder   r   copyrz  discardr   infor}  r   _handle_node_failure)	addressesbootstrap_addrsessionresponsecurrent_roomsr   r&   r$   r%   heartbeat_checker  sr   








z@MooncakeKVManager.start_decode_thread.<locals>.heartbeat_checkerr  r  )r!   r  r  r$   r&   r%   r     s   6z%MooncakeKVManager.start_decode_threadr   
kv_indicesr2   r1   r4   r3   r5   r8   r7   c           
   
   C  s   | j tjksJ |r|r|d usJ || jvs| |tjkr'td| d S || j	vr.d S | j	| 
 }tdd |D }|t| j }	| j|	 t||||||d d S )Nz-Request with bootstrap_room=%s already failedc                 s  s$    | ]}t |d dd V  qdS )rm  rM   N)r   rsplit)r   r  r$   r$   r%   	<genexpr><  s   " z9MooncakeKVManager.add_transfer_request.<locals>.<genexpr>)r.   r0   r2   r4   r6   r8   )r   r   r   rz  r{  r
   ru  r   rE  rn  r  sumrn   r   putr-   )
r!   r   r  r2   r4   r|   r8   	dst_infossession_port_sum	shard_idxr$   r$   r%   add_transfer_request  s.   	


z&MooncakeKVManager.add_transfer_requestc                 C  s
   | j | S r   )rz  )r!   r   r$   r$   r%   r{  J     
zMooncakeKVManager.check_statusr
   c                 C  sL   || j vr|| j |< d S |tjkrtj| j |< d S t| j | || j |< d S r   )rz  r
   ru  r   )r!   r   r   r$   r$   r%   rt  M  s   


zMooncakeKVManager.update_statusr   c                 C  s6   | j  || j|< W d    d S 1 sw   Y  d S r   )r   r   r    r$   r$   r%   rs  Y  s   "z MooncakeKVManager.record_failurec                 C  s
   | j  S r   )r   get_session_idr&   r$   r$   r%   r  ]  r  z MooncakeKVManager.get_session_idc                   s  | j F  fdd| jD }|D ]}| j|= q | jv r | j =  | jv r)| j =  | jv r2| j = | j g } | jv rB| j = W d    n1 sLw   Y  g }|D ]%}|| jv rz| |t	j
krz| |d  d | |t	j || qUtd  dt| d d S )Nc                   s   g | ]	}|  r|qS r$   )
startswith)r   kfailed_bootstrap_addrr$   r%   r   b  s
    
z:MooncakeKVManager._handle_node_failure.<locals>.<listcomp>z9Losing connection with prefill instance (bootstrap_addr: r  z), z requests affected)r  connection_poolprefill_attn_tp_size_tabler  prefill_pp_size_tabler   r   rz  r{  r
   ry  rs  rt  ru  r   r   r   rn   )r!   r  keys_to_remover  possible_affected_roomsaffected_roomsr.   r$   r  r%   r  `  sB   








z&MooncakeKVManager._handle_node_failure)F)r   r	   r   r   r   r   r   r   )r=   r   r   r]   r   r]   r   r]   r   r/   r   r/   r   r   r   r   )
r=   r   r0   r/   r^   r]   r>   r/   r   r   )r=   r   r0   r/   r^   r]   r>   r/   ra   r   rb   r   rc   r   r   r   )r2  r:   r6   r   r_   r]   )r:  r   r<   r   r.   r   r{   r   r|   r   r}   rs   rY   r   )
r2  r:   rG  r]   r`   r]   r   r   rH  rI  )r2  r:   rW  r]   r`   r]   )r2  r:   rW  r]   r`   r]   rd   r]   re   r]   ra   r   rb   r   )
r:  r   r<   r   r.   r   r   r   rk  r   )r   r   r   r   NN)r   r   r  r/   r2   r1   r4   r3   r|   r5   r8   r7   )r   r   )r   r   r   r
   r(   )r)   r*   r+   rA  r   r   r   r   r  r  r1  r9  r5  r;  rF  rV  rQ  rP  rl  r   r   r   r  r{  rt  rs  r  r  r,   r$   r$   r"   r%   r      s>    G
	
l

q



@

T
 $^
-

r   c                      sP   e Zd Zd fd
dZ	dd ddZd!ddZd"ddZdd Zdd Z  Z	S )#MooncakeKVSendermgrr   r  r   r   r   dest_tp_ranksr@   rr  c                   s(   t  ||||| d | _t | _d S r   )r   r   conclude_stater  	init_time)r!   r  r  r   r  rr  r"   r$   r%   r     s   zMooncakeKVSender.__init__Nr  r/   r8   r7   c                 C  sp   t | j| jt| }|  jt|7  _| j| jk}|s(| j| j||d d S | jj| j||d| j|d d S )NFT)r|   r8   )r1   curr_idxrn   num_kv_indiceskv_mgrr  r   r|   )r!   r  r8   r2   r4   r$   r$   r%   send  s$   
zMooncakeKVSender.sendr   r
   c                 C     | j d u rQ| j| j}|tjtjfv r|| _ |S |tjkrO| jd urOt		 }|| j }|| jj
krOtd | j| jd| j d|dd tj| _ tjS |S | j S )Na#  Some requests timed out when bootstrapping, which means prefill instances fail to receive the KV indices from the decode instance of this request. If a greater mean TTFT is acceptable, you can 'export SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT=600' (10 minutes) to relax the timeout condition. Request  timed out after .1fzs in KVPoll.Bootstrapping)r  r  r{  r   r
   ry  ru  Bootstrappingr  r  r   r   r]  rs  r!   r   nowelapsedr$   r$   r%   poll  s*   



zMooncakeKVSender.pollr  c                 C  s&   | j | jjv r| jj| j  d S d S r   )r   r  rz  r|  r&   r$   r$   r%   clear  s   zMooncakeKVSender.clearc                 C  `   | j d u r	tj| _ |   | jj | jj| jd}W d    n1 s%w   Y  t	| j|Nz1Failed due to an unknown reason from another rank
r  r
   ru  r  r  r   r   r|  r   r   r!   r   r$   r$   r%   failure_exception     

z"MooncakeKVSender.failure_exceptionc                 C     | j | jd tj| _d S NzAborted by AbortReq.r  rs  r   r
   ru  r  r&   r$   r$   r%   abort  
   zMooncakeKVSender.abort)
r  r   r  r   r   r   r  r@   rr  r   r   )r  r/   r8   r7   r   r
   r   r  )
r)   r*   r+   r   r  r  r  r  r  r,   r$   r$   r"   r%   r    s    

r  c                      sx   e Zd Ze Zi Zi Ze	 Z
		dd  fd	d
Zdd Z		dd!ddZd"ddZd#ddZdd Zdd Z  ZS )$MooncakeKVReceiverNr  r   r  r   r   r5   prefill_dp_rankc                   sT   |  | _d | _d | _t |||| | jj| j 	| j
 | j| j
tj d S r   )r  
session_idr  r  r   r   r  r   r  rw  r   rt  r
   r  )r!   r  r  r   r  r"   r$   r%   r     s   
zMooncakeKVReceiver.__init__c                 C  s\  | j D ]}ddd | jjjD }ddd | jjjD }ddd | jjjD }ddd | jjjD }t| jjdg }ddd |D }| jjj	}| jjj
d	 }	t|d
}
t| jjd
}t|	d
}| |\}}|, |dd
| jjd
t| jjd
| jd
||||
||||g W d    n1 sw   Y  qd S )NrG   c                 s      | ]	}t d |V  qdS rg   Nrl   rC  r   ptrr$   r$   r%   r        
z7MooncakeKVReceiver._register_kv_args.<locals>.<genexpr>c                 s  r  r  r  r  r$   r$   r%   r    r  c                 s  r  r  r  r  r$   r$   r%   r    r  c                 s  r  rj   Nr  )r   r   r$   r$   r%   r    s
    

r\  c                 s  r  r  r  )r   dimr$   r$   r%   r  
  r  r   rK   r  )bootstrap_infosjoinr  rz   r   rx   r   rU  rO  r  r  r   rB  r  _connect_to_bootstrap_serverr@  local_ip	rank_portr  )r!   bootstrap_infopacked_kv_data_ptrspacked_aux_data_ptrspacked_state_data_ptrspacked_state_item_lensr\  packed_state_dim_per_tensortp_rankkv_item_lenra   rb   rc   socklockr$   r$   r%   _register_kv_args  sX   








z$MooncakeKVReceiver._register_kv_argsr  r/   r|   r8   r7   c                 C  s  | j d u r| j| jd| j  | j| jtj d S | j D ]g}| |\}}|d }|P |	t
| jd| jjdt
| jjd| jd|sQ| nd|s[t
|dnd|sl|d urltj|tjd ndt
| jdg W d    n1 sw   Y  qt | _d S )Nz;Could not fetch prefill parallel info from bootstrap_addr: rC   rK   rG   rI   )r  r  rs  r   r  rt  r
   ru  r  r@  r   rB  r  r  r  tobytesrQ   rR   rS   rB   r  r  )r!   r  r|   r8   r  r  r  rC   r$   r$   r%   init'  sB   



zMooncakeKVReceiver.initr   r
   c                 C  r  )NzSome requests fail to receive KV Cache transfer done signal after bootstrapping. If a greater mean TTFT is acceptable, you can 'export SGLANG_DISAGGREGATION_WAITING_TIMEOUT=600' (10 minutes) to relax the timeout condition. r  r  r  zs in KVPoll.WaitingForInput)r  r  r{  r   r
   ry  ru  r  r  r  r   r   r]  rs  r  r$   r$   r%   r  O  s*   



zMooncakeKVReceiver.pollr  c                 C  sb   | j | jjv r| jj| j  | j | jjv r| jj| j  | j | jjv r/| jj| j  d S d S r   )r   r  rz  r|  r  r   r&   r$   r$   r%   r  i  s   zMooncakeKVReceiver.clearc                 C  r  r  r  r  r$   r$   r%   r  s  r  z$MooncakeKVReceiver.failure_exceptionc                 C  r  r  r  r&   r$   r$   r%   r    r  zMooncakeKVReceiver.abortr  )r  r   r  r   r   r5   r  r5   )r  r/   r|   r5   r8   r7   r  r  )r)   r*   r+   zmqContext_ctx_socket_cache_socket_locksr   r   _global_lockr   r  r  r  r  r  r  r,   r$   r$   r"   r%   r    s     3
(

r  c                   @  s   e Zd ZdS )MooncakeKVBootstrapServerN)r)   r*   r+   r$   r$   r$   r%   r    s    r  )?
__future__r   concurrent.futuresr   rp   dataclassesloggingr   rl   r   r  collectionsr   typingr   r   r   r   r   numpyrQ   numpy.typingnptr   r  #sglang.srt.disaggregation.base.connr	   r
   %sglang.srt.disaggregation.common.connr   r   r   r   &sglang.srt.disaggregation.common.utilsr   r   (sglang.srt.disaggregation.mooncake.utilsr   sglang.srt.disaggregation.utilsr   %sglang.srt.distributed.parallel_stater   sglang.srt.environr   sglang.srt.server_argsr   sglang.srt.utilsr   r   	getLoggerr)   r   r}  r   	dataclassr-   r:   r\   ro   r   r  r  r  r$   r$   r$   r%   <module>   sZ    

()       f\ (