o
    پi                     @  sh  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	 d dl
mZmZmZmZ d dlZd dlm
Z d dlZd dlmZmZ d dlmZmZmZmZ d dlmZ d dlmZ d d	l m!Z! d d
l"m#Z# e$e%Z&d'dZ(ej)G dd dZ*ej)G dd dZ+ej)G dd dZ,G dd deZ-G dd deZ.G dd deZ/G dd deZ0dS )    )annotationsN)defaultdict)DictListOptionalSet)KVArgsKVPoll)CommonKVBootstrapServerCommonKVManagerCommonKVReceiverCommonKVSender)group_concurrent_contiguous)DisaggregationMode)envs)
ServerArgsNixlMsgGuardasciic                   @  sh   e Zd ZU dZded< ded< ded< ded< ded	< ded
< ded< ded< dd ZedddZdS )TransferInfozZContains indices for a transfer, sent by KVReceiver. Received by prefill bootstrap thread.introomstrendpointdst_port
agent_namenpt.NDArray[np.int32]dst_kv_indicesdst_aux_indexrequired_dst_info_num	List[int]dst_state_indicesc                 C  s   | j jdkS )Nr   )r   sizeself r$   W/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/disaggregation/nixl/conn.pyis_dummy.   s   zTransferInfo.is_dummymsgList[bytes]c                 C  s   t |dkr|d dkrttj|d tjd}ng }| t|d d|d dt|d d|d dtj|d	 tjdt|d
 dt|d d|dS )N       dtyper   r                     )r   r   r   r   r   r   r   r    )lenlistnp
frombufferint32r   decode)clsr'   r    r$   r$   r%   from_zmq1   s   zTransferInfo.from_zmqNr'   r(   )__name__
__module____qualname____doc____annotations__r&   classmethodr:   r$   r$   r$   r%   r   !   s   
 r   c                   @  s   e Zd ZU dZded< ded< ded< ded< ded	< d
ed< d
ed< d
ed< ded< ded< ded< ded< edddZdS )KVArgsRegisterInfoz{Contains base pointers and other info which only needs to be sent once by KVReceiver. Received by prefill bootstrap thread.r   r   r   r   r   r   bytesagent_metadata	list[int]dst_kv_ptrsdst_aux_ptrsdst_state_data_ptrsgpu_iddecode_tp_sizedecode_tp_rankdst_kv_item_lenr'   r(   c                 C  s  t |dkr |d dkr ttt |d d  d|d }ng }| t|d d|d dt|d d|d	 d|d
 ttt |d d  d|d ttt |d d  d|d |t|d dt|d dt|d dt|d ddS )Nr)   r*      Qr   r   r-   r.   r/   r0   r1   r2   	   
      )r   r   r   r   rD   rF   rG   rH   rI   rJ   rK   rL   )r3   r4   structunpackr   r8   r   )r9   r'   rH   r$   r$   r%   r:   V   s"   ($$zKVArgsRegisterInfo.from_zmqNr;   )r<   r=   r>   r?   r@   rA   r:   r$   r$   r$   r%   rB   E   s    
 rB   c                   @  s   e Zd ZU dZejdd dZded< ejedZ	ded< d	Z
d
ed< dZded< ejedZded< dZded< dZded< dd Zdd Zd	S )TransferStatusz4Used by KV Receiver to know when a transfer is done.c                   C  s   t tS N)r   setr$   r$   r$   r%   <lambda>t   s    zTransferStatus.<lambda>)default_factoryzDict[int, Set[int]]received_kvs_per_ppzDict[int, int]expected_kvs_per_ppNOptional[int]num_pp_ranks_expectedFboolreceived_auxzSet[int]received_state_per_ppexpects_state
is_failurec                 C  s|   | j rdS | jd u s| jsdS | jrt| j| jk rdS t| j| jk r&dS | j D ]\}}t| j| |kr; dS q+dS )NTF)	ra   r\   r^   r`   r3   r_   rZ   itemsrY   )r#   pp_rankexpectedr$   r$   r%   is_done   s   zTransferStatus.is_donec                 C  s   | j S rU   )ra   r"   r$   r$   r%   	is_failed   s   zTransferStatus.is_failed)r<   r=   r>   r?   dataclassesfieldrY   r@   dictrZ   r\   r^   rV   r_   r`   ra   re   rf   r$   r$   r$   r%   rT   n   s   
 rT   c                      s   e Zd Z	dZd[ fd
dZdd Zdd Zd\ddZd]ddZd^ddZdd Z	d_d d!Z
d`d,d-Zdad1d2Zdbd7d8Zdcd<d=ZdddBdCZdedDdEZ	F	FdfdgdQdRZdSdT ZdhdVdWZdXdY Z  ZS )iNixlKVManagerFargsr   disaggregation_moder   server_argsr   is_mla_backendOptional[bool]c              
     sr  t  |||| z
ddlm}m} W n ty$ } ztd|d }~ww tj }||g|t	j
kr4dndd}	|tt |	| _| j }
||
vrXtd| d|
 d|
 td	|  |   | jt	j
krp|   d S | jt	jkrtt| _i | _ttj| _t | _ tt!| _"t | _#t$tj% d
| _&t$tj' d| _(tj) | _*| +  d S td| j )Nr   )
nixl_agentnixl_agent_configzPlease install NIXL by following the instructions at https://github.com/ai-dynamo/nixl/blob/main/README.md to run SGLang with NixlTransferEngine.rM   )backendsnum_threadszNIXL backend 'z' not found. Available: z:. Please install the required NIXL plugin or choose from: z)NIXL KVManager initialized with backend: g       @r-   z Unsupported DisaggregationMode: ),super__init__	nixl._apirp   rq   ImportErrorr   "SGLANG_DISAGGREGATION_NIXL_BACKENDgetr   PREFILLr   uuiduuid4agentget_plugin_list
ValueErrorloggerinforegister_buffer_to_enginerl   _start_bootstrap_threadDECODEr   rT   transfer_statusesheartbeat_failuresrequestsSessionsession_pool	threadingLocksession_pool_lockrV   addr_to_rooms_trackerconnection_lockmax(SGLANG_DISAGGREGATION_HEARTBEAT_INTERVALheartbeat_interval+SGLANG_DISAGGREGATION_HEARTBEAT_MAX_FAILUREmax_failures%SGLANG_DISAGGREGATION_WAITING_TIMEOUTwaiting_timeout_start_heartbeat_checker_thread)r#   rk   rl   rm   rn   rp   rq   ebackendagent_configavailable_plugins	__class__r$   r%   ru      s`   







zNixlKVManager.__init__c                   s"    fdd}t j|dd  dS )z
        Start the heartbeat checker thread for Decode worker.
        TODO (smor): unite nixl heartbeat checker with mooncake's.
        c               	     s  	 t  j  j t j } W d    n1 sw   Y  | D ]}d }zd j  j| }W d    n1 s;w   Y  |j	d| ddddid}|j
dkrYd	 j|< n1td
| d  j	|d	d  j|<  j | jv r{ j|= W d    n1 sw   Y  W n ty   td
| d  j	|d	d  j|< Y nw  j	|d	 jkrՈ |  j | jv rƈ j|= W d    n1 sw   Y  q#q)NTzhttp://z/health)r.   r/   
Connectionz
keep-alive)timeoutheaders   r   zAttempting to reconnect to z...r-   )timesleepr   r   r4   prefill_dp_size_tablekeysr   r   ry   status_coder   r   r   	Exceptionr   _handle_node_failure)	addressesbootstrap_addrsessionresponser"   r$   r%   heartbeat_checker   sZ   





zHNixlKVManager._start_heartbeat_checker_thread.<locals>.heartbeat_checkerT)targetdaemonNr   Threadstart)r#   r   r$   r"   r%   r      s   ,z-NixlKVManager._start_heartbeat_checker_threadc                   s(  | j F  fdd| jD }|D ]}| j|= q | jv r | j =  | jv r)| j =  | jv r2| j = | j g } | jv rB| j = W d   n1 sLw   Y  g }|D ]}|| jv rn| j|  snd| j| _	|
| qUtd  dt| d |D ]}td| d	 | |tj qdS )
z!Handle failure of a prefill node.c                   s   g | ]	}|  r|qS r$   )
startswith).0kfailed_bootstrap_addrr$   r%   
<listcomp>  s
    
z6NixlKVManager._handle_node_failure.<locals>.<listcomp>NTz7Lost connection with prefill instance (bootstrap_addr: z), z transfers affectedz	Let room z be failed due to prefill down)r   connection_poolprefill_attn_tp_size_tabler   prefill_pp_size_tabler   ry   r   re   ra   appendr   errorr3   update_statusr	   Failed)r#   r   keys_to_remover   possible_affected_roomsaffected_roomsr   r$   r   r%   r     sH   







z"NixlKVManager._handle_node_failurebootstrap_roomr   c                 C  s
   | j | S rU   )request_status)r#   r   r$   r$   r%   check_status5  s   
zNixlKVManager.check_statusstatusr	   c                 C  sL   || j vr|| j |< d S |tjkrtj| j |< d S t| j | || j |< d S rU   )r   r	   r   r   )r#   r   r   r$   r$   r%   r   8  s   


zNixlKVManager.update_statusfailure_reasonr   c                 C  s   d S rU   r$   )r#   r   r   r$   r$   r%   record_failureD  s   zNixlKVManager.record_failurec           
      C  sP  g }t | jj| jjD ]\}}|||| jjdf q| j|d| _t	
dt|  | js4tdg }t | jj| jjD ]\}}|||ddf q?| j|d| _t	
dt|  | jsftd| jjr| jjrg }t | jj| jjD ]\}}	|||	| jjdf qy| j|d| _t	
d	t|  | jstd
d S d S d S )N VRAMz#Register kv tensors, len(kv_addr)= z.NIXL memory registration failed for kv tensorsr   DRAMz&Register aux tensors, len(aux_addrs)= z/NIXL memory registration failed for aux tensorsz*Register state tensors, len(state_addrs)= z1NIXL memory registration failed for state tensors)zipkv_argskv_data_ptrskv_data_lensr   rI   r}   register_memorykv_descsr   debugr3   r   aux_data_ptrsaux_data_lens	aux_descsstate_data_ptrsstate_data_lensstate_descs)
r#   kv_addrskv_data_ptrkv_data_len	aux_addrsaux_data_ptraux_data_lenstate_addrsstate_data_ptrstate_data_lenr$   r$   r%   r   G  sD   z'NixlKVManager.register_buffer_to_enginedecode_kv_argsrB   c                 C  sB   |j }|| jv rtd| d d S || j|< | j|j d S )NzPeer z" was already registered, ignoring.)r   decode_kv_args_tabler   r   r}   add_remote_agentrD   )r#   r   r   r$   r$   r%   _add_remote_peerk  s   

zNixlKVManager._add_remote_peer	peer_namesrc_data_ptrsrE   dst_data_ptrs	item_lensprefill_data_indicesr   dst_data_indices
dst_gpu_idnotifc	              	     s  t ||\}	}
td| d|  | jr,| ||\}fddt|D }n%| ||\ } fddt|D fddt|D  }g }g }g }g }tjdd |	D tj	d	}tjd
d |
D tj	d	}tjdd |	D tj	d	}|D ]%\}}}|| }|
|||   |
| |
|||   |
| qdd }|||| jj}||||}tdt| dt|  | j|d}| j|d}| jd||||d}|std| j|}|dkrtd|S )z|Generic KV cache transfer supporting both MHA and MLA architectures.
        Used by both send_kvcache and maybe_send_extra.zsending kvcache to z with notif c                   "   g | ]}|  | | fqS r$   r$   r   layer_id)rF   r   src_kv_ptrsr$   r%   r         z7NixlKVManager._send_kvcache_generic.<locals>.<listcomp>c                   r   r$   r$   r   )
dst_k_ptrsr   
src_k_ptrsr$   r%   r     r   c                   r   r$   r$   r   )
dst_v_ptrsr   
src_v_ptrsr$   r%   r     r   c                 s      | ]}|d  V  qdS r   Nr$   r   blockr$   r$   r%   	<genexpr>      z6NixlKVManager._send_kvcache_generic.<locals>.<genexpr>r+   c                 s  r   r   r$   r   r$   r$   r%   r     r   c                 s      | ]}t |V  qd S rU   r3   r   r$   r$   r%   r     r   c                 S  s@   | s
t jdt jdS t | }t |}t ||t ||fS N)r   r/   r+   r5   emptyint64concatenatecolumn_stack	full_like)addr_chunks
len_chunksgpu
flat_addrs	flat_lensr$   r$   r%   make_req_array  s   


z;NixlKVManager._send_kvcache_generic.<locals>.make_req_arrayzlen(src_addrs): before group: z, after group: r   WRITEr   "KVSender failed to create transferERR KVSender failed to post transfer)r   r   r   rn   get_mla_kv_ptrs_with_pprangeget_mha_kv_ptrs_with_ppr5   fromiterr  r   r   rI   r3   r}   get_xfer_descsinitialize_xferencoder   transfer)r#   r   r   r   r   r   r   r   r   prefill_kv_blocksdst_kv_blockslayers_current_pp_stagelayers_params	src_addrssrc_lens	dst_addrsdst_lensprefill_starts
dst_starts
block_lenssrc_ptrdst_ptritem_lenlengthsr
  src_reqsdst_reqs	src_descs	dst_descsxfer_handlestater$   )r   rF   r   r   r   r   r   r%   _send_kvcache_generics  sr   



z#NixlKVManager._send_kvcache_genericprefill_kv_indicesrF   r   c              
   C  s"   | j || jj|| jj||||dS )Nr   r   r   r   r   r   r   r   )r,  r   r   kv_item_lens)r#   r   r-  rF   r   r   r   r$   r$   r%   send_kvcache  s   	zNixlKVManager.send_kvcacheprefill_tp_sizerJ   rK   rL   c           /   	     s@  | j j| }|	| }| j j}|}|| | }| j jd }| j j}|
| | }||kr3d}|}|| }n
|| | }|}d}| | j j|\ }|| }|| }|| } fddt|D fddt|D  }tj	|tj
d}tj	|tj
d}|| }|
| }tj|tj
d}g } g }!|D ]F\}"}#|"||  }$|#||
  }%|$d d d f |d d d f |  |  }&|%d d d f |d d d f |  |  }'| |& |!|' qdd }(|(| || j j})|(|!||}*| j|)d}+| j|*d},| jd	|+|,||d
}-|-std| j|-}.|.dkrtd|-S )Nr   c                      g | ]
}|  | fqS r$   r$   r   )r   r   r$   r%   r   (      z4NixlKVManager.send_kvcache_slice.<locals>.<listcomp>c                   r2  r$   r$   r   )r   r   r$   r%   r   .  r3  r+   c                 S  s>   | s
t jdt jdS t | }t |t ||t ||fS r   r   )r  r!   r  r  r$   r$   r%   r
  Q  s   


z8NixlKVManager.send_kvcache_slice.<locals>.make_req_arrayr   r  r   z#Failed to create sliced KV transferr  z!Failed to post sliced KV transfer)r   engine_rankkv_head_numr/  	page_sizer  r   r  r5   asarrayr  arangeravelr   rI   r}   r  r  r  r   r  )/r#   r   r-  rF   r   r   r   r1  rJ   rK   rL   local_tp_rank_in_groupdst_tp_rank_in_groupnum_kv_headssrc_heads_per_rankdst_heads_per_ranksrc_kv_item_lenr6  bytes_per_head_slice_to_sendsrc_head_start_offsetnum_heads_to_senddst_head_start_offsetr  src_head_slice_offsetdst_head_slice_offsetheads_bytes_per_token_to_sendsrc_dst_ptr_pairsprefill_indicesdst_indicesbytes_per_token_prefillbytes_per_token_decodetoken_offsetsr  r  r"  r#  src_page_basesdst_page_basessrc_alldst_allr
  r&  r'  r(  r)  r*  r+  r$   )r   r   r   r   r%   send_kvcache_slice  s   




z NixlKVManager.send_kvcache_sliceprefill_aux_indexrG   r   c              	   C  s   g }g }| j j}| j j}	t|D ](\}
}|	|
 }||
 ||  }||
 ||  }|||df |||df q| j|d}| j|d}| jd||||d}|sZt	d| j
|}|dkrht	d|S )Nr   r   r  r   r  r  r  )r   r   aux_item_lens	enumerater   r}   r  r  r  r   r  )r#   r   rR  rG   r   r   r  r  prefill_aux_ptrsprefill_aux_item_lensi_lengthsrc_addrdst_addrr(  r)  r*  r+  r$   r$   r%   send_auxr  s2   zNixlKVManager.send_auxprefill_state_indicesr   rH   r    c              	   C  s  t |dks
J dt |t |ksJ dg }g }| jj}	| jj}
t|D ]0\}}|
| }|	| |t|d   }||t|d   }|||| jjf ||||f q&| j	|d}| j	|d}| j
d||||d}|sxtd| j|}|d	krtd
|S )zTransfer Mamba states via RDMA.r-   z$Mamba should have single state indexz7State indices count mismatch between Prefill and Decoder   r   r  r   z%Failed to create Mamba state transferr  z#Failed to post Mamba state transfer)r3   r   r   state_item_lensrT  r   r   rI   r}   r  r  r  r   r  )r#   r   r]  rH   r    r   r   r  r  prefill_state_data_ptrsprefill_state_item_lensrW  dst_state_ptrrY  rZ  r[  r(  r)  r*  r+  r$   r$   r%   _send_mamba_state  sB   

zNixlKVManager._send_mamba_statec           	   
   C  s   t | jdd}|dkr| j|krtd| ||||||S |dv rg| js4| j|kr4td|  dt|t|krJtdt| d	t| | j|| jj	|| jj
tj|tjd
tj|tjd
||dS |dkrstd| ddS )z:Send state or extra pool data with type-specific handling.
state_typenonemambazUPD Disaggregation does NOT support PD different TP sizes for hybrid mamba models yet.)swansazEPD Disaggregation does NOT support PD different TP sizes for non-MLA z hybrid models yet.z%State index length mismatch: prefill=z, dst=r+   r.  z,PD Disaggregation via NIXL does NOT support N)getattrr   attn_tp_sizeRuntimeErrorrb  rn   upperr3   r,  r   r^  r5   arrayr7   )	r#   r   r]  rH   r    r   r   rJ   rc  r$   r$   r%   maybe_send_extra  sP   

zNixlKVManager.maybe_send_extraN
kv_indicesindex_slicesliceis_lastr]   chunk_id	aux_indexr[   state_indicesOptional[List[int]]c                 C  s  | j tjksJ |r|r|d usJ | j|  }g }	|D ]}
||
jks&J |
 r+q|
j| }t|t|ks:J |
j	| j
v sBJ |
j d| dt| d| jj }| j
|
j	 j}| jsc|| jkry| |
j	|| j
|
j	 j|| j
|
j	 j|}n%| j|
j	|| j
|
j	 j|| j
|
j	 j|| j|| j
|
j	 j| j
|
j	 jd
}|	| |r|d ur| j
|
j	 }| |
j	||j|
j|j|
j d| jj |}|d ur|	| |d usJ | |
j	|| j
|
j	 j|
j|
j d}|	| q|r| j|= |	S )N_kv_rX  )r1  rJ   rK   rL   _state__aux)rl   r   rz   transfer_infosvaluesr   r&   r   r3   r   r   r   r   rc   rJ   rn   ri  r0  rF   rI   rQ  rK   rL   r   rm  rH   r    r\  rG   r   )r#   r   rn  ro  rq  rr  rs  rt  reqs_to_be_processedhandlesreqchunked_dst_kv_indicer   rJ   kv_xfer_handledst_infostate_xfer_handleaux_xfer_handler$   r$   r%   add_transfer_request  s   

$	
	


z"NixlKVManager.add_transfer_requestc           
      C  s6  | j  }| D ]\}}|D ]}|ddd}t|d }|d dkrmt|d }tt|d }t|dkr@t|d nd}	| j| j	|	 
| |rl|d | j| j|	< | j| jd u rl| j|d| j| _q|d d	krzd
| j| _q|d dkrt|dkrt|d nd}	| j| j
|	 qq	d S )Nr   rX  r0   r   r-   kvr.   r/   auxTr+  )r}   get_new_notifsrb   r8   splitr   r]   r3   r   rY   addrZ   r\   #required_prefill_response_num_tablery   r^   r_   )
r#   	notif_mapr   messagesr'   
componentsr   rr  rq  rc   r$   r$   r%   update_transfer_statusP  s6   

z$NixlKVManager.update_transfer_statusr   c                 C  s   || j vrdS | j |  S )NF)r   re   )r#   r   r$   r$   r%   check_transfer_doner  s   
z!NixlKVManager.check_transfer_donec                   s     fdd}t j|d  d S )Nc                    s*  	  j  } tdtdd | D   | d tks#J dt d| dd	 } | d d
}| d d
}|dkrM t	|  td| d q t
|}| jvr[i  j|< t	|  j| |<  j| | j}td|d|d| t j| |krtd|d  |tj q)z6This thread recvs transfer info from the decode engineTz(Received multipart with total byte size c                 s  r   rU   r   r   xr$   r$   r%   r   }  r   zRNixlKVManager._start_bootstrap_thread.<locals>.bootstrap_thread.<locals>.<genexpr>r   zFirst message should be z. Foreign traffic?r-   Nr   r/   NonezRegister KVArgs from z successfullyzgot info room=z agent_name=z required_dst_info_num=zroom=z is bootstrapped)server_socketrecv_multipartr   r   sumGUARDr8   r   rB   r:   r   ry  r   r   r3   r   r	   WaitingForInput)waiting_req_bytesr   r   r   r"   r$   r%   bootstrap_threadx  sB   



z?NixlKVManager._start_bootstrap_thread.<locals>.bootstrap_thread)r   r   )r#   r  r$   r"   r%   r   w  s   "z%NixlKVManager._start_bootstrap_thread)F)rk   r   rl   r   rm   r   rn   ro   )r   r   )r   r   r   r	   )r   r   r   r   )r   rB   )r   r   r   rE   r   rE   r   rE   r   r   r   r   r   r   r   r   )r   r   r-  r   rF   rE   r   r   r   r   r   r   )r   r   r-  r   rF   rE   r   r   r   r   r   r   r1  r   rJ   r   rK   r   rL   r   )
r   r   rR  r   rG   rE   r   r   r   r   )r   r   r]  r   rH   rE   r    r   r   r   r   r   )r   r   r]  r   rH   rE   r    r   r   r   r   r   rJ   r   NN)r   r   rn  r   ro  rp  rq  r]   rr  r   rs  r[   rt  ru  )r   r   )r<   r=   r>   ru   r   r   r   r   r   r   r   r,  r0  rQ  r\  rb  rm  r  r  r  r   __classcell__r$   r$   r   r%   rj      s,    =4
(


$

n

}
&
/<T
"rj   c                      s>   e Zd Zd fd
dZ	ddddZdddZdd Z  ZS )NixlKVSendermgrrj   r   r   r   r   dest_tp_ranksr   rc   c                   s*   t  ||||| g | _d| _d| _d S )NFr   )rt   ru   xfer_handleshas_sentrr  )r#   r  r   r   r  rc   r   r$   r%   ru     s   
zNixlKVSender.__init__Nrn  r   rt  ru  c              	   C  s   t | j| jt| }|  jt|7  _| j| jk}| j| j|||| j| j|}| j	
| |  jd7  _|rCd| _| jj| j= d S d S )Nr-   T)rp  curr_idxr3   num_kv_indiceskv_mgrr  r   rr  rs  r  extendr  r   )r#   rn  rt  ro  rq  new_xfer_handlesr$   r$   r%   send  s$   	zNixlKVSender.sendreturnr	   c                   s`    j s
 j jS  fdd jD }tdd |D r tjS tdd |D r-t	dtj
S )Nc                   s   g | ]	} j j|qS r$   )r  r}   check_xfer_stater  r"   r$   r%   r     s    z%NixlKVSender.poll.<locals>.<listcomp>c                 S     g | ]}|d kqS )DONEr$   r  r$   r$   r%   r         c                 S  r  )r  r$   r  r$   r$   r%   r     r  z'KVSender transfer encountered an error.)r  r  r   r   r  allr	   Successanyr   r  )r#   statesr$   r"   r%   poll  s   zNixlKVSender.pollc                 C     t d)NzNIXL KVSender Exceptionrj  r"   r$   r$   r%   failure_exception     zNixlKVSender.failure_exception)
r  rj   r   r   r   r   r  r   rc   r   rU   )rn  r   rt  ru  r  r	   )r<   r=   r>   ru   r  r  r  r  r$   r$   r   r%   r    s    

r  c                      sN   e Zd Z		dd fd	d
Z		ddddZdddZdd Zdd Z  ZS )NixlKVReceiverNr  rj   r   r   r   r[   prefill_dp_rankc                   sJ   d| _ d | _t |||| t| jdr | jj| j | j	 d | _
d S )NFr   )started_transferconclude_statert   ru   hasattrr  r   r   r  r   	init_time)r#   r  r   r   r  r   r$   r%   ru     s   
zNixlKVReceiver.__init__rn  r   rs  rt  ru  c                 C  sf  | j d u rtd| j  | j| jtj d S | j D ]}t	d| d| jj
j  | |\}}|d }t	d| j d| |O |tt| jd| jjdt| jjd| jjjd|sk| ndt|dt| jd|s|d urtj|tjd	 ndg	 W d    n1 sw   Y  q|d urd
| jj| j _d
| _t | _d S )Nz;Could not fetch prefill parallel info from bootstrap_addr: zFetched bootstrap info: z for engine rank: r&   z.Sending to prefill server with bootstrap room z
 is_dummy=r   r*   r+   T)bootstrap_infosr   r   r   r  r   r   r	   r   r   r   r4  _connect_to_bootstrap_serversend_multipartr  r   r  local_ip	rank_portr}   nametobytesr   r5   rl  r7   r   r`   r  r   r  )r#   rn  rs  rt  bootstrap_infosocklockr&   r$   r$   r%   init  sL   


zNixlKVReceiver.initr  r	   c                 C  s$  | j d ur| j S | j| j}|tjtjfv r|| _ |S | js"tjS t		 }|| j
 }|| jjkrStd| j d | j| jd| j d|dd tj| _ tjS | j  | j| jr| jj| j | j | jj| j  rtj| _ td| j d ntj| _ | jj| j= | j S tjS )NzRequest z waiting_timeoutz timed out after z.1fzs in KVPoll.WaitingForInputzTransfer for room z failed due to node failure)r  r  r   r   r	   r  r   r  r  r   r  r   r   r   r   r  r  r   r   discardr   rf   )r#   r   nowelapsedr$   r$   r%   r    s@   


zNixlKVReceiver.pollc                 C  s*  | j D ]}| |\}}ddd | jjjD }ddd | jjjD }ddd | jjjD }|U |t	d
d| jj
dt| jj
d| jjj
d| jj |||t| jjj
dt| jjj
dt| jjj
dt| jjjd 
dg W d    n1 sw   Y  qd S )	Nr*   c                 s      | ]	}t d |V  qdS rN   NrR   packr   ptrr$   r$   r%   r   >      
z3NixlKVReceiver._register_kv_args.<locals>.<genexpr>c                 s  r  r  r  r  r$   r$   r%   r   A  r  c                 s  r  r  r  r  r$   r$   r%   r   D  r  r  r   r   )r  r  joinr  r   r   r   r   r  r  r  r  r   r  r}   r  get_agent_metadatarI   rJ   r4  r/  )r#   r  r  r  packed_kv_data_ptrspacked_aux_data_ptrspacked_state_data_ptrsr$   r$   r%   _register_kv_args;  s>   




z NixlKVReceiver._register_kv_argsc                 C  r  )NzNIXL KVReceiver Exceptionr  r"   r$   r$   r%   r  [  r  z NixlKVReceiver.failure_exceptionr  )r  rj   r   r   r   r[   r  r[   )rn  r   rs  r[   rt  ru  r  )	r<   r=   r>   ru   r  r  r  r  r  r$   r$   r   r%   r    s    
0' r  c                   @  s   e Zd ZdS )NixlKVBootstrapServerN)r<   r=   r>   r$   r$   r$   r%   r  _  s    r  )1
__future__r   rg   loggingrR   r   r   r{   collectionsr   typingr   r   r   r   numpyr5   numpy.typingnptr   #sglang.srt.disaggregation.base.connr   r	   %sglang.srt.disaggregation.common.connr
   r   r   r   &sglang.srt.disaggregation.common.utilsr   sglang.srt.disaggregation.utilsr   sglang.srt.environr   sglang.srt.server_argsr   	getLoggerr<   r   r  r  	dataclassr   rB   rT   rj   r  r  r  r$   r$   r$   r%   <module>   sJ    

#(,      4 