o
    پi't                     @  sT  d Z ddlmZ ddlZddlZddlmZ ddlmZ ddl	m
Z
mZmZmZ ddlZddlmZmZ ddlmZmZmZmZmZmZmZmZmZmZmZmZ dd	lm Z m!Z!m"Z"m#Z# dd
l$m%Z% ddl&m'Z'm(Z( ddl)m*Z* ddl+m,Z,m-Z-m.Z. e
rddl/m0Z0 ddl1m2Z2m3Z3 ddl&m4Z4 e5e6Z7dddZ8G dd dZ9G dd dZ:dS )a;  
Life cycle of a request in the prefill server

1. Bootstrap Queue
    a. Initialize a sender for each request
    b. Use the queue to store requests whose bootstrap (handshake and preallocation) has not finished
    c. Poll senders to check bootstrap state
    d. Once bootstrap is complete, move request to Waiting Queue

2. Waiting Queue
    a. Use PrefillAdder to pop requests
    b. Run forward
    c. Add the request to Inflight Queue

3. Inflight Queue
    a. Poll (non-blocking) the sender of the request
    b. Once the transfer has finished, return the request
    )annotationsN)deque)
HTTPStatus)TYPE_CHECKINGListOptionalType)BaseKVManagerKVPoll)FAKE_BOOTSTRAP_HOSTDisaggregationModeKVClassTypeMetadataBuffersReqToMetadataIdxAllocatorTransferBackendget_kv_classis_mla_backendkv_to_page_indiceskv_to_page_numpoll_and_all_reduceprepare_abort)FINISH_LENGTHReqRequestStageScheduleBatch)release_kv_cache)HybridLinearKVPoolNSATokenToKVPool)	SWAKVPool)trace_event_batchtrace_slicetrace_slice_end)ProcessGroup)GenerationBatchResult	Scheduler)KVCachereqr   	allocatorr   returnNonec                 C  s@   t | dr| jdur| jdkr|| j d| _dS dS dS dS )a\  
    Release the metadata buffer index allocated for a request in prefill disaggregation mode.

    This function safely releases the metadata buffer index if it was allocated.

    Args:
        req: The request object that may have a metadata_buffer_index allocated
        allocator: The ReqToMetadataIdxAllocator instance to free the index
    metadata_buffer_indexNr   )hasattrr*   free)r&   r'    r.   U/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/disaggregation/prefill.pyrelease_req_to_metadata_bufferA   s   


r0   c                   @  s\   e Zd ZdZd6ddZd7ddZd8d$d%Zd9d(d)Zd:d+d,Zd;d-d.Z		/	0d<d=d4d5Z
d0S )>PrefillBootstrapQueuez-
    Store the requests in bootstrapping
    token_to_kv_poolr%   draft_token_to_kv_poolOptional[KVCache]$req_to_metadata_buffer_idx_allocatorr   metadata_buffersr   tp_rankinttp_sizegpu_idbootstrap_port
gloo_groupr"   max_total_num_tokensdecode_tp_sizedecode_dp_size	schedulerr$   pp_rankpp_sizetransfer_backendr   c                 C  s   || _ || _t|| _|| _|| _|| _|| _|| _|| _|| _	|| _
|| _|| _g | _|	| _|
| _|| _|| _|  | _| jjjrOt| j| jjjj| _d S d S N)r2   r3   r   r6   r5   r7   r9   r>   r?   rA   rB   r:   r;   queuer<   r=   r@   rC   _init_kv_manager
kv_manager	tp_workeris_hybrid_swaminmodel_runnerswa_max_total_num_tokens)selfr2   r3   r5   r6   r7   r9   r:   r;   r<   r=   r>   r?   r@   rA   rB   rC   r.   r.   r/   __init__[   s2   




zPrefillBootstrapQueue.__init__r(   r	   c                 C  s  t | jtj}| }| j|_| j|_| jj|_	| j
| j |_
| j|_| jj|_| j \}}}| jd urH| j \}}}||7 }||7 }||7 }||_||_||_| jsY| jj|_| jj|_| j \|_|_|_| jjj |_!| jj"|_"t#| jdr| j$ \}	}
}|	|_%|
|_&||_'t(| jt)rd|_*n0t(| jt+rd|_*t#| jdr| j, |_-nt(| jt.rd|_*nd|_*ng |_%g |_&g |_'d|_*t | jtj/}||t0j1| jj| j}|S )Nget_state_buf_infosswamambaget_state_dim_per_tensornsanone)2r   rC   r   KVARGSr7   engine_rankrA   r@   dp_ranksystem_dp_rankr>   r?   rB   prefill_pp_sizer2   start_layerprefill_start_layerget_contiguous_buf_infosr3   kv_data_ptrskv_data_lenskv_item_lensr   head_numkv_head_num	page_sizer6   get_buf_infosaux_data_ptrsaux_data_lensaux_item_lensserver_argsdisaggregation_ib_device	ib_devicer:   r,   rO   state_data_ptrsstate_data_lensstate_item_lens
isinstancer   
state_typer   rR   state_dim_per_tensorr   MANAGERr   PREFILL)rM   kv_args_classkv_argsr]   r^   r_   draft_kv_data_ptrsdraft_kv_data_lensdraft_kv_item_lensrj   rk   rl   kv_manager_classrG   r.   r.   r/   rF      sr   





z&PrefillBootstrapQueue._init_kv_managerr&   r   num_kv_headsr)   c                 C  s   |  |rd S |jtkrttjtj}nt| jtj}| j	g}|| j
|j d| j |j|| jd|_| | |tj | j| ttj|jdd d S )N:)mgrbootstrap_addrbootstrap_roomdest_tp_ranksrA   Tauto_next_anon) _check_if_req_exceed_kv_capacitybootstrap_hostr   r   r   FAKEr   SENDERrC   r7   rG   r;   r|   rA   disagg_kv_sender_process_reqadd_latencyr   PREFILL_PREPARErE   appendr!   rid)rM   r&   rx   kv_sender_classr}   r.   r.   r/   add   s"   


zPrefillBootstrapQueue.addreqs	List[Req]c                 C  s   |D ]}|  || qd S rD   )r   )rM   r   rx   r&   r.   r.   r/   extend   s   zPrefillBootstrapQueue.extendboolc                 C  sd   t |j| jkr0d|j dt |j d| j }t| t||tjd | j	
|g|j dS dS )NzRequest z' exceeds the maximum number of tokens: z > status_codeTF)lenorigin_input_idsr=   r   loggererrorr   r   BAD_REQUESTr@   stream_outputreturn_logprob)rM   r&   messager.   r.   r/   r      s    
z6PrefillBootstrapQueue._check_if_req_exceed_kv_capacityc                 C  s   d|j _dS )zW
        Set max_new_tokens = 1, so PrefillAdder memory estimation is accurate
           N)sampling_paramsmax_new_tokens)rM   r&   r.   r.   r/   r      s   z"PrefillBootstrapQueue._process_reqFNreturn_failed_reqsrids_to_checkOptional[List[str]]c                   s  g }g }t   t| jdkr|du rg S g g fS tdd | jD | j}tt| j|D ]\}\}}|dur<|j|vr<q,|tj	krBq,|tj
krd| j d|jd|j}	z|j  W n tyu }
 z|	d	|
 7 }	W Y d}
~
nd}
~
ww t|	 t||	tjd
 | j|g|j  | || | jjr| jj  | jjr| jj|j q,t|j }| j!" dkr n<| j!# |_$|j$dusJ t%|| j&j'}|j(||j$ ||  | t)* |j+_,|-t.j/ t0t.j/|jdd q, fddt| jD | _|du r|S ||fS )a  
        pop the reqs which has finished bootstrapping

        return_failed_reqs: For PP, on rank 0, also return the failed reqs to notify the next rank
        rids_to_check: For PP, on rank > 0, check the rids from the previous rank has consensus with the current rank.
        r   Fc                 S     g | ]}|j qS r.   r   .0r&   r.   r.   r/   
<listcomp>      z:PrefillBootstrapQueue.pop_bootstrapped.<locals>.<listcomp>Nz*Prefill bootstrap failed for request rank=	 req.rid= req.bootstrap_room= with exception r   Tr~   c                   s   g | ]
\}}| vr|qS r.   r.   )r   ientryindices_to_remover.   r/   r   I  s    )1setr   rE   r   r<   	enumeratezipr   r
   BootstrappingFailedr7   r|   r   failure_exception	Exceptionr   r   r   r   INTERNAL_SERVER_ERRORr@   r   r   r   r   enable_metricsmetrics_collectorincrement_bootstrap_failed_reqsenable_hicache_storage
tree_cacherelease_aborted_requestr   r5   available_sizeallocr*   r   r2   rb   inittimeperf_counter
time_statswait_queue_entry_timer   r   PREFILL_BOOTSTRAPr!   )rM   r   r   bootstrapped_reqsfailed_reqspollsr   r&   pollerror_messageenum_kv_indices	num_pagesr.   r   r/   pop_bootstrapped   sr   











z&PrefillBootstrapQueue.pop_bootstrapped) r2   r%   r3   r4   r5   r   r6   r   r7   r8   r9   r8   r:   r8   r;   r8   r<   r"   r=   r8   r>   r8   r?   r8   r@   r$   rA   r8   rB   r8   rC   r   )r(   r	   )r&   r   rx   r8   r(   r)   )r   r   rx   r8   r(   r)   )r&   r   r(   r   )r&   r   r(   r)   FN)r   r   r   r   r(   r   )__name__
__module____qualname____doc__rN   rF   r   r   r   r   r   r.   r.   r.   r/   r1   V   s    

.
J


	r1   c                   @  sz   e Zd ZdZd'ddZe d(d	d
Ze d(ddZd)ddZ		d*d+ddZ
d,ddZd(ddZ		d-d.d%d&ZdS )/#SchedulerDisaggregationPrefillMixinz>
    Mixin for Scheduler to handle disaggregation prefill
    rM   r$   r(   Optional[ScheduleBatch]c                 C  s6   d| j _|   |  }| |}|rtd|j |S )NFschedule)running_batchbatch_is_fullprocess_prefill_chunkget_new_batch_prefillmaybe_prepare_mlp_sync_batchr   r   )rM   batchr.   r.   r/   $get_next_disagg_prefill_batch_to_runX  s   
zHSchedulerDisaggregationPrefillMixin.get_next_disagg_prefill_batch_to_runr)   c                 C  sh   	 |   }| | | j| j  |  }|| _|r(| |}| 	|| n| 
  |   || _q)zBA normal scheduler loop for prefill worker in disaggregation mode.)recv_requestsprocess_input_requestswaiting_queuer   disagg_prefill_bootstrap_queuer   r   	cur_batch	run_batch#process_batch_result_disagg_prefillself_check_during_idle%process_disagg_prefill_inflight_queue
last_batch)rM   	recv_reqsr   resultr.   r.   r/    event_loop_normal_disagg_prefilli  s   

zDSchedulerDisaggregationPrefillMixin.event_loop_normal_disagg_prefillc                 C  s   t  | _	 |  }| | | j| j  |  }|| _	|r0| 
|}| j| |f nd }| jrC| j \}}| || n|d u rK|   |   | | || _qrD   )r   result_queuer   r   r   r   r   r   r   r   r   r   copyr   popleftr   r   r   launch_batch_sample_if_needed)rM   r   r   batch_result	tmp_batch
tmp_resultr.   r.   r/   !event_loop_overlap_disagg_prefill  s,   


zESchedulerDisaggregationPrefillMixin.event_loop_overlap_disagg_prefillr   r   r   r#   c                 C  s  |j |j|j|j|jf\}}}}}|dur|  d}|j }|jr;|jdur.|j |_|j	dur;t
|j	 |_	tt|j|ddD ]\}	\}
}|
jdkr|
jjdkr^t |
j_|
j| | j|
 |
tj ttj|
jdd | j|
 | j r|jdur|jj|	 |
_ |jj!|	 |
_"|jj#|	 $ % |
_&nd|
_&|
jr|dusJ |dusJ ||	 }||	 }|| }| '|	|
|||| ||7 }| j(|
dd t) |
j_*|
j+durz|
j+,| W n- t-y } z d|
j d	| d
| }t.|
| j t/|
|t0j1d W Y d}~nd}~ww |
2 |
j+_2qE|
 jd8  _|
jrJ||	 }||	 }||k rJ|| }| j3|	|
|||dd ||7 }| j4rW| j(|
d|
j5d ttj6|
jdd qE| 7  dS )z
        Transfer kv for prefill completed requests and add it into disagg_prefill_inflight_queue
        Adapted from process_batch_result_prefill
        Nr   T)strictg        r~   )
last_chunkz$Grammar accept_token failed for req z with token z: r   r   F)last_prefill_chunk)r   end_idx)8logits_outputnext_token_idsextend_input_len_per_req extend_logprob_start_len_per_req	copy_donesynchronizetolistr   next_token_logprobsinput_token_logprobstupler   r   r   
is_chunkedr   prefill_finished_tsr   
output_idsr   r   cache_unfinished_reqr   r   PREFILL_FORWARDr    r   disagg_prefill_inflight_queuespec_algorithmis_eagle	spec_infotopk_poutput_topk_p
topk_indexoutput_topk_indexhidden_statescpuclonehidden_states_tensoradd_logprob_return_valuessend_kv_chunkr   !prefill_transfer_queue_entry_timegrammaraccept_token
ValueErrorr   r   r   r   finishedadd_input_logprob_return_valuesenable_overlaptmp_end_idxPREFILL_CHUNKED_FORWARDmaybe_send_health_check_signal)rM   r   r   r   r   r   r   r   
logprob_ptr   r&   next_token_idextend_logprob_start_lenextend_input_lennum_input_logprobsr   r   r.   r.   r/   r     s   	





zGSchedulerDisaggregationPrefillMixin.process_batch_result_disagg_prefillNr   r   r   c           	      C  s  t | jdkr	g S g }tdd | jD | j}g }t| j|D ]\}}|dur>|j|vr2|| q|tjks>|tj	ks>J |tj
tjfv rL|| q|tjkrnt|| j tdd|_t|jdrh|j  || q|tj	krd| j d|jd	|j}z|j  W n ty } z|d
| 7 }W Y d}~nd}~ww t| t|| j t||tjd || | jr| j  qJ d||D ]}t  |j!_"q| #|t$dd |D d |D ]}|%t&j' t(|| j) t*t&j'|jdd q|| _|S )z
        Poll the requests in the middle of transfer. If done, return the request.
        rids_to_check: For PP, on rank > 0, check the rids from the previous rank has consensus with the current rank.
        r   c                 S  r   r.   r   r   r.   r.   r/   r   3  r   z]SchedulerDisaggregationPrefillMixin.process_disagg_prefill_inflight_queue.<locals>.<listcomp>N)lengthclearz)Prefill transfer failed for request rank=r   r   r   r   FzUnexpected polling state poll=c                 s  s    | ]}|j V  qd S rD   )r   r   r.   r.   r/   	<genexpr>b  s    z\SchedulerDisaggregationPrefillMixin.process_disagg_prefill_inflight_queue.<locals>.<genexpr>T)thread_finish_flag)+r   r  r   attn_tp_cpu_groupr   r   r   r
   Successr   WaitingForInputTransferringr   r   r   finished_reasonr,   r   r!  r7   r|   r   r   r   warningr   r   r   r   r   increment_transfer_failed_reqsr   r   r   completion_timer   anyr   r   PREFILL_TRANSFER_KV_CACHEr0   r5   r    )	rM   r   	done_reqsr   undone_reqsr&   r   r   r   r.   r.   r/   r   &  sr   








zISchedulerDisaggregationPrefillMixin.process_disagg_prefill_inflight_queue	List[str]c                 C  sV   t dd | jD | j}g }t| j|D ]\}}|tjks"|tjkr(||j q|S )zI
        Used by PP, get the transferred rids but **do not pop**
        c                 S  r   r.   r   r   r.   r.   r/   r   x  r   zLSchedulerDisaggregationPrefillMixin.get_transferred_rids.<locals>.<listcomp>)	r   r  r$  r   r
   r%  r   r   r   )rM   r   transferred_ridsr&   r   r.   r.   r/   get_transferred_ridss  s   z8SchedulerDisaggregationPrefillMixin.get_transferred_ridsc                 C  s   t  }| jr2|| j | jj| jdd | jr(tt| jjt| jj	| j_
n| | j d| j_| jra| jj rc| jjrF|| jj | j }| jjt|d | j |k red| j_d S d S d S d S )NT)chunkedF)chunked_req_to_exclude)r   chunked_reqr   r   r  r  rJ   r   fill_idsr   r  r  r   r   r   forward_mode	is_extend
batch_sizefilter_batchlist)rM   r4  last_bsr.   r.   r/   r     s,   




z9SchedulerDisaggregationPrefillMixin.process_prefill_chunkFr&   r   r   r   r   Optional[int]c                 C  s  | j j}|j}|dur|n
tt|jt|j}|s |||  }| jj|j	||f 
  }||_d}|r| j| t| j  trQ| jj|j	 
  g}n]t| j  trt|j}| j}	td||	 }
|
| | }
| jj|j	|
|f }| j |}|
  }t||}n#t| j  trt|j}| jj|j	d|f }|
  }t||}t||}t|dkrtd|jd|jd dS |j|| dS )z=
        Send a prefilled chunk to the decode server
        Nr   z*Skip sending kv chunk for request req.rid=r   z because page_indices is empty)token_to_kv_pool_allocatorrb   start_send_idxrJ   r   r6  r   req_to_token_poolreq_to_tokenreq_pool_idxr  numpydisagg_metadata_buffersset_bufrm   get_kvcacher    req_index_to_mamba_index_mappingr   sliding_window_sizemaxtranslate_loc_from_full_to_swar   r   r   infor   r|   r   send)rM   r&   r   r   rb   	start_idx
kv_indicesstate_indicesseq_lenwindow_sizewindow_startwindow_kv_indices_fullwindow_kv_indices_swakv_indices_fullpage_indicesr.   r.   r/   r    sn   	





z1SchedulerDisaggregationPrefillMixin.send_kv_chunk)rM   r$   r(   r   )rM   r$   r(   r)   )rM   r$   r   r   r   r#   r(   r)   rD   )rM   r$   r   r   r(   r   )rM   r$   r(   r0  r   )
rM   r$   r&   r   r   r   r   r=  r(   r)   )r   r   r   r   r   torchno_gradr   r   r   r   r2  r   r  r.   r.   r.   r/   r   S  s    

'z
M
r   )r&   r   r'   r   r(   r)   );r   
__future__r   loggingr   collectionsr   httpr   typingr   r   r   r   rW  sglang.srt.disaggregation.baser	   r
   sglang.srt.disaggregation.utilsr   r   r   r   r   r   r   r   r   r   r   r   "sglang.srt.managers.schedule_batchr   r   r   r   sglang.srt.mem_cache.commonr    sglang.srt.mem_cache.memory_poolr   r   $sglang.srt.mem_cache.swa_memory_poolr   sglang.srt.tracing.tracer   r    r!   torch.distributedr"   sglang.srt.managers.schedulerr#   r$   r%   	getLoggerr   r   r0   r1   r   r.   r.   r.   r/   <module>   s0    8

 ~