o
    پi                     @  s  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	m
Z
 d dlmZmZmZmZmZ d dlZd dlZd dlmZ d dlmZ d dlmZmZmZmZmZmZmZm Z  d dl!m"Z"m#Z# d dl$m%Z%m&Z&m'Z'm(Z( d d	l)m*Z* d d
l+m,Z, d dl-m.Z. d dl/m0Z0m1Z1m2Z2m3Z3 e4e5Z6dZ7d'ddZ8d(ddZ9ej:G dd dZ;ej:G dd dZ<G dd dZ=ej:G dd dZ>G dd  d e&Z?G d!d" d"e(Z@G d#d$ d$e'ZAG d%d& d&e%ZBdS ))    )annotationsN)defaultdict)DictListOptionalSetTuple)TransferStatus)BackendType
EngineDescIOEngineIOEngineConfig
MemoryDescMemoryLocationType
PollCqModeRdmaBackendConfig)KVArgsKVPoll)CommonKVBootstrapServerCommonKVManagerCommonKVReceiverCommonKVSender)group_concurrent_contiguous)DisaggregationMode)
ServerArgs)format_tcp_addressget_int_env_varget_local_ip_autois_valid_ipv6_addresss   MoriMsgGuardmemsList[MemoryDesc]returnbytesc                 C  s"   | sdS dd | D }t j|S )N    c                 S  s   g | ]}|  qS  )pack).0memr$   r$   W/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/disaggregation/mori/conn.py
<listcomp>4   s    z'_pack_mem_desc_list.<locals>.<listcomp>)msgspecmsgpackencode)r   packed_descsr$   r$   r(   _pack_mem_desc_list1   s   r.   blobc                 C  s"   | sg S t j| }dd |D S )Nc                 S  s   g | ]}t |qS r$   )r   unpack)r&   br$   r$   r(   r)   <   s    z)_unpack_mem_desc_list.<locals>.<listcomp>)r*   r+   decode)r/   
desc_blobsr$   r$   r(   _unpack_mem_desc_list8   s   r4   c                   @  s\   e Zd ZU ded< ded< ded< ded< ded< ded	< ded
< ded< edddZdS )TransferInfointroomstrendpointdst_port
engine_keynpt.NDArray[np.int32]dst_kv_indicesdst_aux_indexrequired_dst_info_numboolis_dummypayloadList[bytes]r!   c           
   
   C  s   t |d d}|d d}t |d d}|d d}|d r/tj|d tjd}ntjg tjd}|d rEt |d d}nd	}t|d
krVt |d
 dnd}|jdko`|dk }	| ||||||||	dS )Nr   ascii            dtype      )r7   r9   r:   r;   r=   r>   r?   rA   )r6   r2   np
frombufferint32arraylensize)
clsrB   r7   r9   r:   r;   r=   r>   r?   rA   r$   r$   r(   from_zmqJ   s.    zTransferInfo.from_zmqN)rB   rC   r!   r5   )__name__
__module____qualname____annotations__classmethodrU   r$   r$   r$   r(   r5   ?   s   
 r5   c                   @  sz   e Zd ZU ded< ded< ded< ded< ded	< ded
< ded< ded< ded< ded< edddZedddZdS )KVArgsRegisterInfor8   r9   r6   r:   r   engine_descr    dst_kv_mem_descsdst_aux_mem_descsdst_state_mem_descsgpu_iddecode_tp_sizedecode_tp_rankdst_kv_item_lenr!   c                 C  s   | j jS N)r\   keyselfr$   r$   r(   r;   x   s   zKVArgsRegisterInfo.engine_keyrB   rC   c                 C  s   |d  d}t|d  d}t|d }t|d }t|d }t|d }t|d  d}t|d	  d}	t|d
  d}
t|d  d}| ||||||||	|
|d
S )NrE   rD   rF   rG   rH   rK      rM      	   
   )
r9   r:   r\   r]   r^   r_   r`   ra   rb   rc   )r2   r6   r   r0   r4   )rT   rB   r9   r:   r\   r]   r^   r_   r`   ra   rb   rc   r$   r$   r(   rU   |   s,   zKVArgsRegisterInfo.from_zmqNr!   r8   )rB   rC   r!   r[   )rV   rW   rX   rY   propertyr;   rZ   rU   r$   r$   r$   r(   r[   k   s   
 r[   c                   @  s$   e Zd Zedd Zedd ZdS )AuxDataCodecc                 C  s   t j| | }t|S rd   )ctypesc_bytefrom_addressr"   )src_addrdata_lengthbufferr$   r$   r(   serialize_data_from_buffer   s   z'AuxDataCodec.serialize_data_from_bufferc                 C  sD   | j | }| j| }|||  }tjt| |}||d d < d S rd   )aux_data_ptrsaux_item_lensro   rp   rR   rq   )kv_argsbuffer_index	aux_indexdatadst_aux_ptritem_lendst_addrrt   r$   r$   r(   deserialize_data_to_buffer   s   

z'AuxDataCodec.deserialize_data_to_bufferN)rV   rW   rX   staticmethodru   r   r$   r$   r$   r(   rn      s
    
rn   c                   @  sN   e Zd ZU ded< ded< ded< ded< ded< ded< ded< ded	< d
S )TPSliceConfigr6   	page_sizesrc_item_lendst_item_lenbytes_per_token_srcbytes_per_token_dstsrc_head_slice_offsetdst_head_slice_offsetheads_bytes_per_token_to_sendN)rV   rW   rX   rY   r$   r$   r$   r(   r      s   
 r   c                      s   e Zd ZdZ	dudv fddZdwddZdxddZdyddZdzddZd{ddZ	d|d"d#Z
d|d$d%Zd}d(d)Zdxd*d+Zd~d,d-Zdxd.d/Z	0ddd4d5Zdd8d9Zdd=d>Zdd@dAZddJdKZddNdOZddTdUZddXdYZdd]d^Zdd_d`ZddgdhZddidjZ	0	0dddsdtZ  ZS )MoriKVManagers   AUX_DATAFargsr   disaggregation_moder   server_argsr   is_mla_backendOptional[bool]c                   s   t  |||| |  | _| j | _g | _g | _g | _i | _	t
 | _t
 | _|   | jtjkr@tdd| _|   d S | jtjkr_tdd| _tt| _tt| _i | _|   d S d S )N'SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUTi,  %SGLANG_DISAGGREGATION_WAITING_TIMEOUT)super__init___init_engineengineget_engine_descr\   kv_mem_descsaux_mem_descsstate_mem_descsfailure_records	threadingLockfailure_locktransfer_lock_register_local_buffersr   r   PREFILLr   bootstrap_timeout_start_bootstrap_threadDECODEwaiting_timeoutr   setprefill_response_trackeraddr_to_rooms_trackerroom_to_bootstrap_addr_start_decode_thread)rg   r   r   r   r   	__class__r$   r(   r      s0   




zMoriKVManager.__init__r!   r   c           
   
   C  s   | j jr| j jtjd< t | _t| jdd}d| jj d| j	 d| j
 dt  d| j 
}t||}tj}td	d
}tdd}tdd
}t||||d}|tj| | j}	|	dksdJ d| td|| j|	|||j |S )NMORI_RDMA_DEVICESr   )hostportzio-z-dpz-tpz-pid-SGLANG_MORI_QP_PER_TRANSFERrE   SGLANG_MORI_POST_BATCH_SIZErL   SGLANG_MORI_NUM_WORKERSFzFailed to bind port for engine zTInitialized Mori IOEngine %s at %s:%s (qp_per_transfer=%s, workers=%s, poll_mode=%s))rx   	ib_deviceosenvironr   local_ipr   r   valuesystem_dp_rankattn_tp_rankgetpidr   r   POLLINGr   r   create_backendr
   RDMAr   r   loggerdebugname)
rg   configr;   r   	poll_modeqp_per_transferpost_batch_sizenum_worker_threadsrdma_cfgactual_portr$   r$   r(   r      sN   




	zMoriKVManager._init_engineNonec                 C  s   t | jj| jjD ]\}}| j||| jjtj}| j	
| q	t | jj| jjD ]\}}| j||dtj}| j
| q)t | jjt| jdg D ]\}}| j||| jjtj}| j
| qJd S )NrL   state_data_lens)ziprx   kv_data_ptrskv_data_lensr   register_memoryr`   r   GPUr   appendrv   aux_data_lensCPUr   state_data_ptrsgetattrr   )rg   ptrlengthmem_descdescr$   r$   r(   r     s6   z%MoriKVManager._register_local_buffersbootstrap_roomr6   c                 C  s
   | j | S rd   )request_status)rg   r   r$   r$   r(   check_status(  s   
zMoriKVManager.check_statusstatusr   c                 C  sL   || j vr|| j |< d S |tjkrtj| j |< d S t| j | || j |< d S rd   )r   r   Failedmax)rg   r   r   r$   r$   r(   update_status+  s   


zMoriKVManager.update_statusfailure_reasonr8   c                 C  s6   | j  || j|< W d    d S 1 sw   Y  d S rd   )r   r   )rg   r   r   r$   r$   r(   record_failure6  s   "zMoriKVManager.record_failurerB   rC   c                 C  s:   zt |}| | W d S  ty   td Y d S w )NzFailed to register remote peer)r[   rU   _add_remote_peer	Exceptionr   	exception)rg   rB   register_infor$   r$   r(   _handle_register_message:  s   
z&MoriKVManager._handle_register_messagec                 C  s   z1t |}| j|ji }|||j< t||jkr/t	d|jt| | 
|jtj W d S W d S  ty@   td Y d S w )Nz/Bootstrap room %s got enough transfer info (%s)z%Failed to parse transfer info message)r5   rU   transfer_infos
setdefaultr7   r;   rR   r?   r   r   r   r   WaitingForInputr   r   )rg   rB   transfer_infoinfosr$   r$   r(   _handle_transfer_messageA  s   

z&MoriKVManager._handle_transfer_messagemsgOptional[List[bytes]]c                 C  s6   |r|d t krtd d S |dd  }|sd S |S )Nr   z$Received malformed bootstrap messagerE   )
MORI_GUARDr   warning)rg   r   rB   r$   r$   r(   _validate_messageQ  s   
zMoriKVManager._validate_messagec                   "    fdd}t j|dd  d S )Nc                    sr   	 z( j  }  | }|d u rW q |d d}|dkr# | n | W n ty7   td Y nw q)NTr   rD   r   zBootstrap worker failed)	server_socketrecv_multipartr   r2   r   r   r   r   r   )r   rB   r7   rf   r$   r(   bootstrap_worker[  s   


z?MoriKVManager._start_bootstrap_thread.<locals>.bootstrap_workerTtargetdaemonr   Threadstart)rg   r   r$   rf   r(   r   Z  s   z%MoriKVManager._start_bootstrap_threadc                 C  sV   | j |d }|d ur%| j|}|d ur'|| |s)| j|d  d S d S d S d S rd   )r   popr   getdiscard)rg   r   bootstrap_addrroomsr$   r$   r(   _cleanup_room_trackingm  s   
z$MoriKVManager._cleanup_room_trackingc                   r   )Nc                    s  	 z j  } | r| d tjkr |  W q | r| d tkr&td W q | dd  }t|dk r9td W q t	|d 
d}t	|d 
d}t	|d 
d}t|dkre|d re|d 
d	nd }|tjkr j| }||  j|d}t||kr j|d   |tj  | n(|tjkr|r ||  j|d   |tj  | ntd
|| W n ty   td Y nw q)NTr   z2Received malformed status message on decode workerrE   rG   z"Incomplete status payload receivedrD   rF   utf-8z+Unknown status code %s received for room %szDecode status worker failed)r   r   r   AUX_DATA_HEADER_handle_aux_datar   r   r   rR   r6   r2   r   Successr   add#required_prefill_response_num_tabler   r   r   r   r   r   r   r   )r   rB   r   status_codeprefill_rankr   trackerexpectedrf   r$   r(   decode_workerw  sb   







z9MoriKVManager._start_decode_thread.<locals>.decode_workerTr   r   )rg   r  r$   rf   r(   r   v  s   3z"MoriKVManager._start_decode_threadNr   List[TransferInfo]Optional[str]c           	      C  s   |sd S t t|dtt|dt| j| j | j d|r'|dndg}|D ].}zt|j|j	}| j
|t|jd}|| W q, tyZ   td||j|j	| Y q,w d S )NrD   r  r#   is_ipv6z=Failed to sync status %s to decode endpoint %s:%s for room %s)r   r8   r,   r6   r   pp_sizepp_rankr   r9   r:   _connectr   send_multipartr   r   r   )	rg   r   r   r   r   rB   infor9   socketr$   r$   r(   notify_decode_status  s2   
z"MoriKVManager.notify_decode_statusr   r[   c                 C  sP   |j }|| jv rtd| d S | j|j || j|< td||j|j d S )Nz,Remote peer %s already registered. Skipping.z!Registered decode peer %s (%s:%s))	r;   decode_kv_args_tabler   r   r   register_remote_enginer\   r9   r:   )rg   r   r;   r$   r$   r(   r     s   

zMoriKVManager._add_remote_peerdst_mem_descsr    Rtuple[List[MemoryDesc], List[MemoryDesc], List[MemoryDesc], List[MemoryDesc], int]c                 C  s   | j }|s	tdt|d }|d | }||d  }| jj}|| }t|d }t|dk s3||kr7td||| }	||| ||  }
|||	|
|fS )Nz/KV memory descriptors are empty on prefill siderF   z@Destination KV descriptors do not match prefill pp configuration)r   RuntimeErrorrR   rx   prefill_start_layer
ValueError)rg   r  	src_descsnum_local_layerssrc_k_descssrc_v_descsstart_layer	end_layerdst_total_layersdst_k_descsdst_v_descsr$   r$   r(   _get_mha_mem_desc_slices  s$   z&MoriKVManager._get_mha_mem_desc_slices.tuple[List[MemoryDesc], List[MemoryDesc], int]c                 C  sH   | j }t|}| jj}|| }|t|krtd||| }|||fS )NzDDestination MLA KV descriptors do not match prefill pp configuration)r   rR   rx   r  r  )rg   r  r  r  r"  r#  	dst_slicer$   r$   r(   _get_mla_mem_desc_slices  s   
z&MoriKVManager._get_mla_mem_desc_slicessrc_descr   dst_desckv_item_len
src_groupsList[List[int]]
dst_groupsList[TransferStatus]c                   sn   |sg S  fdd|D } fdd|D } fdd|D }| j  }	| j |g|g|g|g|g|	g}
|
S )Nc                      g | ]
}t |d    qS r   r6   r&   	src_groupr-  r$   r(   r)         z8MoriKVManager._issue_layer_transfers.<locals>.<listcomp>c                   r2  r3  r4  )r&   	dst_groupr7  r$   r(   r)     r8  c                   s   g | ]}t |  qS r$   )rR   r5  r7  r$   r(   r)     s    )r   allocate_transfer_uidbatch_write)rg   r+  r,  r-  r.  r0  local_offsetsremote_offsetssizestransfer_uidstatusesr$   r7  r(   _issue_layer_transfers  s   
z$MoriKVManager._issue_layer_transfers	peer_infor   c              
   C  s   | j j}| j jd }|j}|| }|| }| j}|j}| j j}	|	}
|	| | }|dkr/td|| }|dkr;td| j j| }|j	| }||krSd}|
}||
 }n
|| |
 }|}d}|| }|| }|| }||krqtdt
||||||||dS )Nr   z,Destination heads per rank evaluates to zeroz!Head slice size evaluates to zerozCSlice size exceeds destination token capacity for TP slice transfer)r   r   r   r   r   r   r   r   )rx   r   kv_item_lensrc   attn_tp_sizera   kv_head_numr  engine_rankrb   r   )rg   rB  r   r   r   r   r   prefill_tp_sizera   num_kv_headssrc_heads_per_rankdst_heads_per_rankbytes_per_head_slicelocal_tp_rankdst_tp_ranksrc_head_startnum_heads_to_senddst_head_startr   r   heads_bytes_per_tokenr$   r$   r(   _build_tp_slice_config  sR   

z$MoriKVManager._build_tp_slice_config
kv_indicesr<   dst_indicestp_cfgc                 C  s&  |j dks
|j dkrg S t|j |j }|sg S |d | tj}|d | tj}tj|jtjd}	||j }
||j }|	|j	 }|	|j
 }|
d d tjf | |j   }|d d tjf | |j   }||j }|jg| }|s{g S | j }| j|g|g|g|g|g|g}|S )Nr   rI   )rS   minastyperN   int64aranger   r   r   r   r   newaxisr   flattentolistr   r   r   r:  r;  )rg   r+  r,  rS  rT  rU  limit	src_pages	dst_pagestoken_slotssrc_page_basesdst_page_basessrc_token_offsetsdst_token_offsetsr<  r=  num_transfersr>  r?  r@  r$   r$   r(   _issue_tp_slice_transfersR  sT   






z'MoriKVManager._issue_tp_slice_transfersprefill_kv_indicesr=   c              
   C  sJ  t ||\}}g }| jjd }| jr4| |j\}}	}
t|
D ]}|| || |	| ||| q|S |j	| j
k}| |j\}}}}}
|ru| |}t|
D ]"}|| || || ||| || || || ||| qP|S t ||\}}t|
D ]"}|| || || ||| || || || ||| q|S )Nr   )r   rx   rC  r   r*  r]   rangeextendrA  ra   rD  r'  rR  rf  )rg   rB  rg  r=   r.  r0  r@  r-  r  	dst_descslayers_current_pp_stagelayer_idtp_mismatchr   r!  r%  r&  rU  r$   r$   r(   send_kvcache  s   
@

	!	
zMoriKVManager.send_kvcacheprefill_aux_indexr>   r7   c                 C  s   |  ||||S rd   )send_aux_tcp)rg   rB  ro  r>   r7   r$   r$   r(   send_aux  s   zMoriKVManager.send_auxc              	   C  sd   | j j}| j j}tt|D ]!}|| }|| ||  }	t|	|}
| j|j|j	||||
d qg S )N)remoter:   r7   ry   rz   r{   )
rx   rv   rw   rh  rR   rn   ru   send_aux_data_to_endpointr9   r:   )rg   rB  ro  r>   r7   prefill_aux_ptrsprefill_aux_item_lensir   rr   r{   r$   r$   r(   rp    s   	zMoriKVManager.send_aux_tcprr  r:   ry   rz   r{   r"   c                 C  s^   | j t||t|d}|tjt|dt|dt|dt	dt
||g d S )Nr  rD   >I)r  r   r   r  r   r  r8   r,   structr%   rR   )rg   rr  r:   r7   ry   rz   r{   r  r$   r$   r(   rs    s   	z'MoriKVManager.send_aux_data_to_endpointc                 C  s   t |d d}t |d d}t |d d}td|d d }|d }t||kr9td	|  d
S t| j	||| t
d| dt|  d
S )z7Handle AUX_DATA messages received by the decode thread.rE   rD   rF   rG   rw  rH   r   rK   z,AUX_DATA length mismatch for bootstrap_room Nz%Received AUX_DATA for bootstrap_room z with length:)r6   r2   rx  r0   rR   r   errorrn   r   rx   r   )rg   r   r7   ry   rz   rs   r{   r$   r$   r(   r    s   
zMoriKVManager._handle_aux_dataindex_slicesliceis_lastr@   Optional[int]state_indicesOptional[npt.NDArray[np.int32]]9Tuple[List[TransferStatus], Optional[List[TransferInfo]]]c              
   C  sZ  | j tjksJ | j|}|std| g }d }	| j | |tj	 |
 D ]L}
| j|
j}|sG| |d|
j  td|
j |
js[|
j| }| |||}|| |rv|d urv|
jdkrv| jjrv|| |||
j| q*|r| |tj t|
 }	| j|d  W d    ||	fS W d    ||	fS 1 sw   Y  ||	fS )Nz*No transfer info found for bootstrap_room=zPeer info missing for engine zMissing decode peer info for r   )r   r   r   r   r   r  r   r   r   Transferringvaluesr  r;   r   rA   r=   rn  ri  r>   pp_groupis_last_rankrq  r  listr   )rg   r   rS  rz  r|  rz   r~  r   result_statusestarget_infos_snapshotr  rB  dst_indices_chunkr@  r$   r$   r(   add_transfer_request2  sb   	






!
!!z"MoriKVManager.add_transfer_request)F)r   r   r   r   r   r   r   r   )r!   r   r!   r   )r   r6   )r   r6   r   r   )r   r6   r   r8   r!   r   )rB   rC   r!   r   )r   rC   r!   r   )r   r6   r!   r   rd   )
r   r  r   r6   r   r   r   r  r!   r   )r   r[   r!   r   )r  r    r!   r  )r  r    r!   r(  )r+  r   r,  r   r-  r6   r.  r/  r0  r/  r!   r1  )rB  r[   r!   r   )r+  r   r,  r   rS  r<   rT  r<   rU  r   r!   r1  )rB  r[   rg  r<   r=   r<   r!   r1  )
rB  r[   ro  r6   r>   r6   r7   r6   r!   r1  )rr  r8   r:   r6   r7   r6   ry   r6   rz   r6   r{   r"   )r   rC   NN)r   r6   rS  r<   rz  r{  r|  r@   rz   r}  r~  r  r!   r  )rV   rW   rX   r  r   r   r   r   r   r   r   r   r   r   r   r   r  r   r'  r*  rA  rR  rf  rn  rq  rp  rs  r  r  __classcell__r$   r$   r   r(   r      s>    

8






	

	;
 




6
=
T
	

r   c                      s   e Zd Zd, fd
dZ	d-d.ddZd/ddZd0ddZd0ddZd1ddZ	d-d2d"d#Z	d-d3d$d%Z
d4d&d'Zd(d) Zd*d+ Z  ZS )5MoriKVSendermgrr   r   r8   r   r6   dest_tp_ranks	List[int]r  c                   s@   t  ||||| g | _d | _d| _d | _d| _t | _d S )NF)	r   r   transfer_statusespending_infossent_last_chunkconclude_statestatus_notifiedtime	init_time)rg   r  r   r   r  r  r   r$   r(   r   h  s   zMoriKVSender.__init__NrS  r<   r~  Optional[List[int]]c                 C  s   t | j| jt| }|  jt|7  _| j| jk}| jj| j||||r'| jnd d\}}| j	| |d ur?|| _
d| _d S d S )N)rz   T)r{  curr_idxrR   num_kv_indiceskv_mgrr  r   rz   r  ri  r  r  )rg   rS  r~  rz  r|  r@  r   r$   r$   r(   sendx  s   

zMoriKVSender.sendr!   r   c                 C  s"  | j d ur| j S | j| j}|tjkrGt | j }|| jjkrEd| j d|dd}| j	| j| | j
| jtj | | tjS |S |tjkrS|   tjS |  }|r|  rz|  }| j	| j| | j
| jtj | | tjS | tj tj| _ tjS |tjkrtjS |S )NRequest  timed out after .1fzs waiting for decode handshake)r  r  r   r   r   Bootstrappingr  r  r   r   r   r   _finalize_failure_all_transfers_finished_has_transfer_error_collect_failure_reason_notify_decoder  r  )rg   r   elapsedreasontransfers_doner$   r$   r(   poll  s8   




zMoriKVSender.pollr@   c                 C  s(   | j sdS | js
dS tdd | jD S )NFTc                 s  s    | ]}|   V  qd S rd   )
InProgressr&   r   r$   r$   r(   	<genexpr>  s    z7MoriKVSender._all_transfers_finished.<locals>.<genexpr>)r  r  allrf   r$   r$   r(   r    s
   z$MoriKVSender._all_transfers_finishedc                 C  s   t dd | jD S )Nc                 s  s    | ]}|  V  qd S rd   )r   r  r$   r$   r(   r    s    z3MoriKVSender._has_transfer_error.<locals>.<genexpr>)anyr  rf   r$   r$   r(   r    s   z MoriKVSender._has_transfer_errorc                 C  s*   | j D ]}| rd|    S qdS )NzKV transfer failed: z(KV transfer failed due to unknown reason)r  r   Message)rg   r   r$   r$   r(   r    s
   
z$MoriKVSender._collect_failure_reasonr   r   r  r   c                 C  s0   | j rd S | jr| j| j| j|| d| _ d S )NT)r  r  r  r  r   )rg   r   r   r$   r$   r(   r    s   
zMoriKVSender._notify_decodec                 C  sD   | j tjkrd S |d u r| jj| jd}| tj| tj| _ d S NzKV transfer failed)r  r   r   r  r   r   r   r  rg   r   r$   r$   r(   r    s   zMoriKVSender._finalize_failurec                 C  s   | j j| jd  d S rd   )r  r   r   r   rf   r$   r$   r(   clear  s   zMoriKVSender.clearc                 C  sb   | j d u r	|   |   | jj | jj| jd}W d    t|1 s(w   Y  t|r  )	r  r  r  r  r   r   r   r   r  r  r$   r$   r(   failure_exception  s   


zMoriKVSender.failure_exceptionc                 C  s.   d}| j | j| | tj| tj| _d S NzAborted by AbortReq.)r  r   r   r  r   r   r  rg   r  r$   r$   r(   abort  s   zMoriKVSender.abort)
r  r   r   r8   r   r6   r  r  r  r6   rd   )rS  r<   r~  r  r!   r   )r!   r@   rl   )r   r   r   r  r!   r   )r   r  r!   r   r  )rV   rW   rX   r   r  r  r  r  r  r  r  r  r  r  r  r$   r$   r   r(   r  g  s    

#




r  c                      s`   e Zd Z		dd  fd	d
Zdd Z		dd!ddZd"ddZd#ddZdd Zdd Z	  Z
S )$MoriKVReceiverNr  r   r   r8   r   r}  prefill_dp_rankc                   sz   t  |||| d | _d | _| jd u s| jd u rd S | jj| j 	| j | j| jj
| j< | j| jtj |   d S rd   )r   r   r  r  r   bootstrap_infosr  r   r   r  r   r   r   r   _register_kv_args)rg   r  r   r   r  r   r$   r(   r     s   zMoriKVReceiver.__init__c                 C  s  | j d u rd S | jj }t| jj}t| jj}t| jj}t| jj	j
d}t| jj	jd}t| jj	jd}t| jj	jd d}| j D ];}	| |	\}
}|( |
tdd| jjdt| jjd||||||||g W d    n1 sw   Y  qLd S )NrD   r   r   )r  r  r\   r%   r.   r   r   r   r8   rx   r`   r,   ra   rF  rC  _connect_to_bootstrap_serverr  r   r   	rank_port)rg   engine_desc_blobpacked_kv_descspacked_aux_descspacked_state_descsr`   ra   rb   r-  bootstrap_infosocklockr$   r$   r(   r    s>   

z MoriKVReceiver._register_kv_argsrS  r<   rz   r~  r  c                 C  s  | j d u s
| jd u rd S |jrtj|tjd nd}|d ur&t|dnd}d}| j D ]U}| 	|\}}	|
dd}
|	< |tt| jd| jjdt| jjd| jjjd|
sb|nd|
sg|nd|t| jdg	 W d    n1 s}w   Y  q-t | _d S )NrI   r#   rD   rA   F)r  r   rS   rN   asarrayrP   tobytesr8   r,   r  r   r  r   r  r   r  r\   re   r?   r  r  )rg   rS  rz   r~  kv_indices_bytes	aux_bytesstate_bytesr  r  r  rA   r$   r$   r(   init  s2   


zMoriKVReceiver.initr!   r   c                 C  s   | j d ur| j S | j| j}|tjtjfv r|| _ |S |tjkrV| jd urVt		 | j }|| jj
krVd| j d|dd}| j| j| | j| jtj tj| _ tjS |S )Nr  r  r  zs waiting for KV transfer)r  r  r   r   r   r  r   r   r  r  r   r   r   )rg   r   r  r  r$   r$   r(   r  >  s   
zMoriKVReceiver.pollr   c                 C  sV   | j d u rd S | jj| j d  | jj| j d  | jj| j d  | j| j  d S rd   )r   r  r   r   r  r   r   rf   r$   r$   r(   r  R  s   
zMoriKVReceiver.clearc                 C  sb   | j d u r	tj| _ |   | jj | jj| jd}W d    t	|1 s(w   Y  t	|r  )
r  r   r   r  r  r   r   r   r   r  r  r$   r$   r(   r  Z  s   


z MoriKVReceiver.failure_exceptionc                 C  sH   | j d u rd S d}| j| j | | j| j tj tj| _|   d S r  )r   r  r   r   r   r   r  r  r  r$   r$   r(   r  e  s   
zMoriKVReceiver.abortr  )r  r   r   r8   r   r}  r  r}  )rS  r<   rz   r}  r~  r  r  r  )rV   rW   rX   r   r  r  r  r  r  r  r  r$   r$   r   r(   r    s    #
"
r  c                   @  s   e Zd ZdS )MoriKVBootstrapServerN)rV   rW   rX   r$   r$   r$   r(   r  o  s    r  )r   r    r!   r"   )r/   r"   r!   r    )C
__future__r   ro   dataclassesloggingr   rx  r   r  collectionsr   typingr   r   r   r   r   r*   numpyrN   numpy.typingnptmori.cppr	   mori.ior
   r   r   r   r   r   r   r   #sglang.srt.disaggregation.base.connr   r   %sglang.srt.disaggregation.common.connr   r   r   r   &sglang.srt.disaggregation.common.utilsr   sglang.srt.disaggregation.utilsr   sglang.srt.server_argsr   sglang.srt.utils.commonr   r   r   r   	getLoggerrV   r   r   r.   r4   	dataclassr5   r[   rn   r   r   r  r  r  r$   r$   r$   r(   <module>   sV    (


+*     :  