o
    پi                     @  s  d Z ddlmZ ddlZddlZddlmZ ddlmZ ddl	m
Z
 ddlmZmZmZmZmZ ddlZddlmZ dd	lmZ dd
lmZ ddlmZmZmZ ddlmZmZmZm Z m!Z!m"Z"m#Z#m$Z$m%Z%m&Z&m'Z' ddl(m)Z) ddl*m+Z+m,Z,m-Z- ddl.m/Z/ ddl0m1Z1 ddl2m3Z3 ddl4m5Z5 ddl6m7Z7m8Z8m9Z9m:Z:m;Z; ddl<m=Z= ddl>m?Z?m@Z@ ddlAmBZB ddlCmDZD eEeFZGerddl*mHZH ddlImJZJ eBddZKG dd dZLG dd de8ZMeG d d! d!ZNG d"d# d#ZOG d$d% d%ZPG d&d' d'ZQdS )(au  
Life cycle of a request in the decode server

1. PreallocQueue:
    a. Initialize a receiver for each request
    b. The request handshakes first, and pre-allocate kv once there is available kv.
    c. Move the request to TransferQueue.

2. TransferQueue:
    a. Poll the receiver to check the transfer state
    b. If the transfer has finished, move the request to waiting queue

3. WaitingQueue:
    a. Use the requests in the queue to construct a PrebuiltExtendBatch
    b. Skip the prefill forward but only populate metadata

4. RunningBatch:
    a. Merge the resolved PrebuiltExtendBatch into running batch to run decoding
    )annotationsN)deque)	dataclass)
HTTPStatus)TYPE_CHECKINGListOptionalTupleType)ProcessGroup)Mamba2CacheParams)GPU_MEMORY_TYPE_KV_CACHE)BaseKVManagerBaseKVReceiverKVPoll)FAKE_BOOTSTRAP_HOSTDisaggregationModeKVClassTypeMetadataBuffersReqToMetadataIdxAllocatorTransferBackendget_kv_classis_mla_backendkv_to_page_indicespoll_and_all_reduceprepare_abort)get_attention_tp_size)FINISH_ABORTRequestStageScheduleBatch)GenerationBatchResult)BaseTokenToKVPoolAllocator)BasePrefixCache)release_kv_cache)HybridLinearKVPoolHybridReqToTokenPoolKVCacheNSATokenToKVPoolReqToTokenPool)	SWAKVPool)trace_event_batchtrace_slice_end)get_int_env_var)TorchMemorySaverAdapter)Req)	Scheduler%SGLANG_CLIP_MAX_NEW_TOKENS_ESTIMATIONi   c                   @  sF   e Zd ZdZdd
dZdd Zdd ZdddZdddZdd Z	dS ) DecodeReqToTokenPoola  
    The difference of DecodeReqToTokenPool and ReqToTokenPool is that
    DecodeReqToTokenPool subscribes memory for pre-allocated requests.

    In ReqToTokenPool, if `--max-running-requests` is 8,
    #pre-allocated + #transfer + #running <= 8, but there are in fact more memory can carry pre-allocated requests.

    In DecodeReqToTokenPool, if `--max-running-requests` is 8,
    #running <= 8, #pre-allocated + #transfer <= pre_alloc_size, so we can use the free memory to pre-allocate requests to unblock prefill.
    sizeintmax_context_lendevicestrenable_memory_saverboolpre_alloc_sizec                 C  s   t j|d}|| _|| _|| _|| _|jtd tj	|| |ftj
|d| _W d    n1 s1w   Y  tt|| | _d S )N)enable)tagdtyper5   )r-   creater2   r4   r5   r9   regionr   torchzerosint32req_to_tokenlistrange
free_slots)selfr2   r4   r5   r7   r9   memory_saver_adapter rI   T/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/disaggregation/decode.py__init__X   s   

zDecodeReqToTokenPool.__init__c                 C  s   || j |< d S N)rC   )rG   indicesvaluesrI   rI   rJ   writeq   s   zDecodeReqToTokenPool.writec                 C  s
   t | jS rL   )lenrF   rG   rI   rI   rJ   available_sizet   s   
z#DecodeReqToTokenPool.available_sizereqsList['Req']returnOptional[List[int]]c                   s   dd t  D }t|dksJ dt fdd|D s"J dt t| }|t| jkr3d S | jd | }| j|d  | _d} D ]}|jd u rV|| |_|d7 }qFd	d  D S )
Nc                 S  s   g | ]\}}|j d ur|qS rL   req_pool_idx).0irrI   rI   rJ   
<listcomp>x   s    z.DecodeReqToTokenPool.alloc.<locals>.<listcomp>   z:only one chunked request may reuse req_pool_idx in a batchc                 3  s,    | ]} | j d kp | jd kV  qdS )r   N)
is_chunkedkv_committed_len)rY   rZ   rS   rI   rJ   	<genexpr>|   s    
z-DecodeReqToTokenPool.alloc.<locals>.<genexpr>z+request has req_pool_idx but is not chunkedr   c                 S     g | ]}|j qS rI   rW   rY   r[   rI   rI   rJ   r\          )	enumeraterP   allrF   rX   )rG   rS   chunked	need_sizeselect_indexoffsetr[   rI   r`   rJ   allocw   s*   


zDecodeReqToTokenPool.allocreq'Req'c                 C  s*   |j d us	J d| j|j  d |_ d S )Nzrequest must have req_pool_idx)rX   rF   append)rG   rl   rI   rI   rJ   free   s   
zDecodeReqToTokenPool.freec                 C  s   t t| j| j | _d S rL   )rD   rE   r2   r9   rF   rQ   rI   rI   rJ   clear   s   zDecodeReqToTokenPool.clearN)
r2   r3   r4   r3   r5   r6   r7   r8   r9   r3   )rS   rT   rU   rV   )rl   rm   )
__name__
__module____qualname____doc__rK   rO   rR   rk   ro   rp   rI   rI   rI   rJ   r1   L   s    


r1   c                   @  s"   e Zd Z	ddddZdd ZdS )HybridMambaDecodeReqToTokenPoolNr2   r3   r4   r5   r6   r7   r8   cache_params'Mamba2CacheParams'speculative_num_draft_tokensenable_mamba_extra_bufferr9   
mamba_sizec
                 C  sh   t j| |||||d |d u rdnd| _|| _|| _|	d ur |	n|| }
| j|
|| ||| j|d d S )N)r2   r4   r5   r7   r9      r]   )r2   mamba_spec_state_sizerv   r5   ry   rx   )r1   rK   !mamba_ping_pong_track_buffer_sizery   r7   _init_mamba_pool)rG   r2   r4   r5   r7   rv   rx   ry   r9   rz   effective_mamba_sizerI   rI   rJ   rK      s.   	
z(HybridMambaDecodeReqToTokenPool.__init__c                 C  s$   t t| j| j | _| j  d S rL   )rD   rE   r2   r9   rF   
mamba_poolrp   rQ   rI   rI   rJ   rp      s   z%HybridMambaDecodeReqToTokenPool.clearrL   )r2   r3   r4   r3   r5   r6   r7   r8   rv   rw   rx   r3   ry   r8   r9   r3   rz   r3   )rq   rr   rs   rK   rp   rI   rI   rI   rJ   ru      s    %ru   c                   @  sD   e Zd ZU ded< ded< dZded< dZd	ed
< edddZdS )DecodeRequestr.   rl   r   kv_receiverFr8   waiting_for_inputr3   metadata_buffer_indexrU   c                 C  s   | j jS rL   )rl   seqlenrQ   rI   rI   rJ   r      s   zDecodeRequest.seqlenN)rU   r3   )rq   rr   rs   __annotations__r   r   propertyr   rI   rI   rI   rJ   r      s   
 r   c                   @  s   e Zd ZdZdId d!ZdJd$d%ZdKdLd,d-ZdMd.d/ZdKdNd2d3Z	4dOdPd7d8Z		4dOdQd9d:Z
	4dOdRd<d=Zed>d? Z	@dSdTdDdEZdUdGdHZd4S )VDecodePreallocQueuez4
    Store the requests that are preallocating.
    req_to_token_poolr(   token_to_kv_pool_allocatorr!   draft_token_to_kv_poolOptional[KVCache]$req_to_metadata_buffer_idx_allocatorr   metadata_buffersr   	schedulerr/   transfer_queueDecodeTransferQueue
tree_cacher"   
gloo_groupr   tp_rankr3   tp_sizedp_sizegpu_idbootstrap_portmax_total_num_tokensprefill_pp_sizepp_ranknum_reserved_decode_tokenstransfer_backendr   c                 C  s   || _ || _| | _|| _t| j| _|| _|| _|| _|| _	|| _
|	| _|
| _|| _|| _|| _|| _|| _|| _|| _|| _|| _g | _g | _|| _|  | _| jjjrdt| j| jjjj| _d S d S rL   )r   r   get_kvcachetoken_to_kv_poolr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   queueretracted_queue_init_kv_manager
kv_manager	tp_workeris_hybrid_swaminmodel_runnerswa_max_total_num_tokens)rG   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rI   rI   rJ   rK      s>   




zDecodePreallocQueue.__init__rU   r   c                 C  s  t | jtj}| }t }| j| |_||_| j|_| j	j
|_| j|_| j \}}}| jd urD| j \}}}	||7 }||7 }||	7 }||_||_||_| jj|_| j \|_|_|_t| jdr| j \}
}}|
|_||_||_t| jtr~d|_ n0t| jt!rd|_ t| jdr| j" |_#nt| jt$rd|_ nd|_ ng |_g |_g |_d|_ | j	j%j&|_'| j	j(|_(t | jtj)}||t*j+| j	j%| j,}|S )Nget_state_buf_infosswamambaget_state_dim_per_tensornsanone)-r   r   r   KVARGSr   r   engine_rankdecode_tp_sizer   r   dp_ranksystem_dp_rankr   r   get_contiguous_buf_infosr   kv_data_ptrskv_data_lenskv_item_lens	page_sizer   get_buf_infosaux_data_ptrsaux_data_lensaux_item_lenshasattrr   state_data_ptrsstate_data_lensstate_item_lens
isinstancer)   
state_typer$   r   state_dim_per_tensorr'   server_argsdisaggregation_ib_device	ib_devicer   MANAGERr   DECODEr   )rG   kv_args_classkv_argsattn_tp_sizer   r   r   draft_kv_data_ptrsdraft_kv_data_lensdraft_kv_item_lensr   r   r   kv_manager_classr   rI   rI   rJ   r   
  sn   



z$DecodePreallocQueue._init_kv_managerFrl   r.   is_retractedr8   Nonec                 C  s   |  |rdS |rd|_| j| dS |jtks%|jdu r-| jjjdkr-t	t
jtj}nt	| jtj}|| j|j d|j |j|jd}|tj ttj|jdd | jt||dd dS )	#Add a request to the pending queue.Nfake:)mgrbootstrap_addrbootstrap_roomprefill_dp_rankTauto_next_anonF)rl   r   r   ) _check_if_req_exceed_kv_capacityretraction_mb_idr   rn   bootstrap_hostr   r   r   disaggregation_transfer_backendr   r   FAKEr   RECEIVERr   r   r   r   data_parallel_rankadd_latencyr   DECODE_PREPAREr+   ridr   r   )rG   rl   r   kv_receiver_classr   rI   rI   rJ   addS  s2   


zDecodePreallocQueue.addc                 C  sd   t |j| jkr0d|j dt |j d| j }t| t||tjd | j	
|g|j dS dS )NzRequest z' exceeds the maximum number of tokens: z > status_codeTF)rP   origin_input_idsr   r   loggererrorr   r   BAD_REQUESTr   stream_outputreturn_logprob)rG   rl   messagerI   rI   rJ   r   v  s    
z4DecodePreallocQueue._check_if_req_exceed_kv_capacityrS   	List[Req]c                 C  s   |D ]	}| j ||d qdS )r   )r   N)r   )rG   rS   r   rl   rI   rI   rJ   extend  s   zDecodePreallocQueue.extendNrids_to_checkOptional[List[str]]c                   s   g }t   | jdd}t| jD ]H\}}|d ur|j|vrq| j dkr' n2t|jt|j	 | j
 }||kr: n||  | d|_| | ||8 }|| j| j q fddt| jD | _|S )NF)count_retractedr   c                      g | ]
\}}| vr|qS rI   rI   rY   rZ   entryindices_to_removerI   rJ   r\     s
    z=DecodePreallocQueue.resume_retracted_reqs.<locals>.<listcomp>)set_allocatable_tokensre   r   r   r   rR   rP   r   
output_idsr   rn   r   r   
_pre_allocload_kv_cacher   )rG   r   resumed_reqsallocatable_tokensrZ   rl   required_tokens_for_requestrI   r   rJ   resume_retracted_reqs  s4   



z)DecodePreallocQueue.resume_retracted_reqsc                 C  s:  | j sd S tdd | j D rd S tdd | j D | j}tt| j |D ]u\}\}}|d ur6|jj|vr6q%|tj	kr<q%|tj
krEd|_q%|tjkrd| j d|jjd|jj}z|j  W n tyz } z|d	| 7 }W Y d }~nd }~ww t| t|j|tjd
 | jjr| jj  q%td| d S )Nc                 s  s    | ]}|j V  qd S rL   )r   rY   
decode_reqrI   rI   rJ   ra     s    z@DecodePreallocQueue._update_handshake_waiters.<locals>.<genexpr>c                 S  rb   rI   r   r  rI   rI   rJ   r\     rd   zADecodePreallocQueue._update_handshake_waiters.<locals>.<listcomp>Tz)Decode handshake failed for request rank= decode_req.req.rid= decode_req.req.bootstrap_room= with exception r   Unexpected poll case: )r   rf   r   r   re   ziprl   r   r   BootstrappingWaitingForInputr   Failedr   r   r   failure_exception	Exceptionr   r   r   r   INTERNAL_SERVER_ERRORr   enable_metricsmetrics_collectorincrement_bootstrap_failed_reqs
ValueError)rG   r   pollsrZ   r  pollerror_messageerI   rI   rJ   _update_handshake_waiters  sB   


 
z-DecodePreallocQueue._update_handshake_waiters/Tuple[List[DecodeRequest], List[DecodeRequest]]c                   s  |  | g }g }t  tdd | jjjD }| j|dd}t| jD ]+\}}|dur3|j	j
|vr3q$t|j	jtrO| j|j	g|j	j ||  | q$t| jD ]&\}}|dure|j	j
|vreqU| v rjqU|jsnqU| j dkrx n| j dkr nt|j	j}|| j }	t|	|t|j	jjt | |kr n|	|kr n||	8 }| |j	 | jj|j	j  dt|j	j ! " }
| j#j$}t| j%t&r| jj'|j	j  ! " g}nct| j%t(rt|j	j}| jj)}td|| }|| | }| jj|j	j ||f }| j#*|}|! " }t+||}n't| j%t,r<t|j	j}| jj|j	j d|f }|! " }t+||}nd}| j- |_.|j.dusLJ t+|
|}|j/0||j.| ||  | t12 |j	j3_4|j	5t6j7 t8t6j7|j	j
dd qU fdd	t| jD | _||fS )
z<Pop the preallocated requests from the pending queue (FIFO).c                 s  s$    | ]}t |jt |j V  qd S rL   )rP   r   r   rc   rI   rI   rJ   ra     s
    
z7DecodePreallocQueue.pop_preallocated.<locals>.<genexpr>T)retractable_tokensr   Nr   r   c                   r   rI   rI   r   r   rI   rJ   r\   _      z8DecodePreallocQueue.pop_preallocated.<locals>.<listcomp>)9r  r   sumr   running_batchrS   r   re   r   rl   r   r   finished_reasonr   r   r   rn   r   r   r   rR   r   rP   r   r   maxr   sampling_paramsmax_new_tokensCLIP_MAX_NEW_TOKENr   rC   rX   cpunumpyr   r   r   r$    req_index_to_mamba_index_mappingr)   sliding_window_sizetranslate_loc_from_full_to_swar   r'   rk   r   r   inittimeperf_counter
time_stats decode_transfer_queue_entry_timer   r   DECODE_BOOTSTRAPr+   )rG   r   failed_reqspreallocated_reqsr  r  rZ   r  origin_input_lenr  
kv_indicesr   state_indicesseq_lenwindow_sizewindow_startwindow_kv_indices_fullwindow_kv_indices_swakv_indices_fullpage_indicesrI   r   rJ   pop_preallocated  s   


	



z$DecodePreallocQueue.pop_preallocatedc                 C  s   t dd | jjD S )Nc                 s  s    | ]	}t |jjV  qd S rL   )rP   rl   fill_idsr  rI   rI   rJ   ra   g  s    
z?DecodePreallocQueue.num_tokens_pre_allocated.<locals>.<genexpr>)r  r   r   rQ   rI   rI   rJ   num_tokens_pre_allocatede  s   z,DecodePreallocQueue.num_tokens_pre_allocatedTr  Optional[int]r   c                   s    d urt jjjdkrt fddjjjD nd}j }|tjt jjjt jj	 t jj
  | }jjrTjjj rT|jt jjj 8 }|rd|tfddjD 8 }|S )Nr   c                   s(   g | ]}t |jjtt|j   qS rI   )r   r"  r#  r$  rP   r   )rY   x)r  rI   rJ   r\   p  s    z;DecodePreallocQueue._allocatable_tokens.<locals>.<listcomp>c                   s&   g | ]}t |jt |j  j qS rI   )rP   r   r   r   )rY   rl   rQ   rI   rJ   r\     s    )rP   r   r  rS   r!  r   rR   r   r   r   waiting_queue
last_batchforward_modeis_prebuiltr  r   )rG   r  r   need_space_for_single_reqrR   r  rI   )r  rG   rJ   r   k  sD   





z'DecodePreallocQueue._allocatable_tokenstorch.Tensorc              
   C  s  | j |g}|dusJ dt|jtt|jd d }||_||_| jj	dkr1| j|}n6| jj
}| jjtjdgtj|dtjdgtjdtj|gtj|dtj|gtjdtjdgtj|d|d}|dusoJ d	| j |jtdt|f| |j|j |_|t|j |S )
z:Pre-allocate the memory for req_to_token and token_kv_poolNz>req_pool_indices is full! There is a bug in memory estimation.r]   r   r<   )r=   r   )prefix_lensprefix_lens_cpuseq_lensseq_lens_cpulast_locextend_num_tokensz6KV cache is full! There is a bug in memory estimation.)r   rk   rP   r   r!  r   kv_allocated_lenr_   r   r   r5   alloc_extendr@   tensorint64rO   rX   slicer=  set_extend_input_len)rG   rl   req_pool_indicesfill_lenkv_locr5   rI   rI   rJ   r     s2   


zDecodePreallocQueue._pre_alloc)&r   r(   r   r!   r   r   r   r   r   r   r   r/   r   r   r   r"   r   r   r   r3   r   r3   r   r3   r   r3   r   r3   r   r3   r   r3   r   r3   r   r3   r   r   )rU   r   )F)rl   r.   r   r8   rU   r   )rl   r.   rU   r8   )rS   r   r   r8   rU   r   rL   r   r   rU   r   )r   r   rU   r   )r   r   rU   r  )NT)r  r?  r   r8   rU   r3   )rl   r.   rU   rF  )rq   rr   rs   rt   rK   r   r   r   r   r  r  r<  r   r>  r   r   rI   rI   rI   rJ   r      s&    

8I
#	+' 
2r   c                   @  sD   e Zd ZdZd#ddZd$ddZd%ddZd&ddZd'd(d!d"ZdS ))r   z/
    Store the requests that is polling kv
    r   r   r   r   r   r3   r   r   r   r/   r   r"   c                 C  s6   g | _ || _|| _|| _|| _|| _|| _|j| _d S rL   )r   r   r   r   r   r   r   spec_algorithm)rG   r   r   r   r   r   r   rI   rI   rJ   rK     s   	zDecodeTransferQueue.__init__r  r   rU   r   c                 C     | j | d S rL   )r   rn   )rG   r  rI   rI   rJ   r        zDecodeTransferQueue.adddecode_reqsList[DecodeRequest]c                 C  rX  rL   )r   r   )rG   rZ  rI   rI   rJ   r     rY  zDecodeTransferQueue.extendr8   c              
   C  s  |j }| j|\
}}}}}}}	}
}}|d  }|jjdur#|jjnd}|jjtks8|jjdu r9| jj	j
dkr9n3|dkr?dS ||krld|jj d| d| d| d		}t| t|jd
tjd |j  d|_dS |jj|d   |d  |j_| j s|	|j_|
|j_||j_|jjr|jj|d   |jj|d   |jj|d|jj    |jj!|d|jj    |j  d|_t"t#j$|jjdd t%& |jj'_(dS )z
        Returns:
            True if the request should be removed from the queue (success or corruption)
            False if metadata not ready yet (keep in queue for next poll)
        r   Nr   Fz%Context corruption detected: Request z (bootstrap_room=z() received metadata from bootstrap_room=z. Metadata buffer index: z1. This indicates metadata buffer index collision.z6Metadata corruption detected - bootstrap_room mismatchr   Tr   ))r   r   get_bufitemrl   r   r   r   r   r   r   r   r   r   r   r   r  r   rp   r   rn   cached_tokensrW  is_noneoutput_topk_poutput_topk_indexhidden_states_tensorr   output_token_logprobs_valoutput_token_logprobs_idxoutput_top_logprobs_valtop_logprobs_numtolistoutput_top_logprobs_idxr+   r   DECODE_TRANSFERREDr+  r,  r-  wait_queue_entry_time)rG   r  idx	output_idr^  rc  rd  re  rh  r`  ra  output_hidden_statesoutput_bootstrap_roomactual_roomexpected_room	error_msgrI   rI   rJ   _commit_transfer_to_req  s   






z+DecodeTransferQueue._commit_transfer_to_reqNr   r   r   c                   s.  | j sg S tdd | j D | j}g }t  tt| j |D ]\}\}}|d ur/|jj|vr/q|tj	krd| j
 d|jjd|jj}z|j  W n tyd } z|d| 7 }W Y d }~nd }~ww t| t|j|tjd | j|jg|jj t|j| jdd	  | | jjr| jj  q|tjkr| |}	|	rԈ | t|jj t!r| j|jg|jj t|j| jdd	 | jjr| jj  q|"|j q|tj#tj$tj%fv rqt&d
|  D ]}| j | j'}
|
dksJ | j | j(t)j* | j+,|
 q fddt| j D | _ |S )Nc                 S  rb   rI   r  r  rI   rI   rJ   r\   >  rd   z7DecodeTransferQueue.pop_transferred.<locals>.<listcomp>z(Decode transfer failed for request rank=r  r  r	  r   F)	is_insertr
  r   c                   r   rI   rI   r   r   rI   rJ   r\   z  r  )-r   r   r   r   re   r  rl   r   r   r  r   r   r   r  r  r   r   r   r   r  r   r   r   r#   r   r   r  r  increment_transfer_failed_reqsSuccessrr  r   r   r   rn   r  r  Transferringr  r   r   r   ri  r   ro   )rG   r   r  transferred_reqsrZ   r  r  r  r  should_removerk  rI   r   rJ   pop_transferred:  s   
 






z#DecodeTransferQueue.pop_transferred)r   r   r   r   r   r3   r   r   r   r/   r   r"   )r  r   rU   r   )rZ  r[  rU   r   )r  r   rU   r8   rL   rV  )	rq   rr   rs   rt   rK   r   r   rr  ry  rI   rI   rI   rJ   r     s    



Zr   c                   @  sX   e Zd Ze dddZe dddZdddZdddZdddZ	dddZ
dS )"SchedulerDisaggregationDecodeMixinrG   r/   c                 C  sV   	 |   }| | |   |  }|| _|r#| |}| || n|   || _q)zAA normal scheduler loop for decode worker in disaggregation mode.)	recv_requestsprocess_input_requestsprocess_decode_queue#get_next_disagg_decode_batch_to_run	cur_batch	run_batchprocess_batch_resultself_check_during_idlerB  )rG   	recv_reqsbatchresultrI   rI   rJ   event_loop_normal_disagg_decode  s   

zBSchedulerDisaggregationDecodeMixin.event_loop_normal_disagg_decodec                 C  s   t  | _d | _	 |  }| | |   |  }|| _|r.| |}| j	|
 |f nd }| jrA| j \}}| || n|d u rI|   | | || _qrL   )r   result_queuerB  r{  r|  r}  r~  r  r  rn   copypopleftr  r  launch_batch_sample_if_needed)rG   r  r  batch_result	tmp_batch
tmp_resultrI   rI   rJ    event_loop_overlap_disagg_decode  s(   


zCSchedulerDisaggregationDecodeMixin.event_loop_overlap_disagg_decoder  r   rU   r    c                 C  s&   |j d ur|j }d |_ | |S t S rL   )inner_idle_batchr  r    )rG   r  
idle_batchrI   rI   rJ   _run_batch_prebuilt  s
   

z6SchedulerDisaggregationDecodeMixin._run_batch_prebuiltOptional[ScheduleBatch]c                 C  s   | j }|r(|j r(| jdu sJ |  | s(| j r"|| _n| j| |  }d}|r3|}n| j r;d}n| 	| j| _| j sJ| jnd}| 
|}|rYtd|j |S )zFCreate fake completed prefill if possible and merge with running batchNschedule)rB  rC  rD  chunked_reqfilter_batchis_emptyr  merge_batchget_new_prebuilt_batchupdate_running_batchmaybe_prepare_mlp_sync_batchr*   rS   )rG   rB  new_prebuilt_batchretrI   rI   rJ   r~    s(   


zFSchedulerDisaggregationDecodeMixin.get_next_disagg_decode_batch_to_runc           
   	   C  s"  | j  r| j  }|D ]}| | qt| jdkrdS | j }t| j	j
| j}|| }g }g }tt| jD ]"}| j| }||k rV|| |tj || j q9|| q9|| _t|dkrgdS |D ]}t |j_qit|| j	| j| j| j| j| j}	|	  |	| j| j  |	S )z1Create a schedulebatch for fake completed prefillr   N)!grammar_managerhas_waiting_grammarsget_ready_grammar_requests_add_request_to_queuerP   rA  r  
batch_sizer   r   r2   max_running_requestsrE   rn   r   r   DECODE_WAITINGinit_next_round_inputr   r+  r,  r-  forward_entry_timer   init_newr   model_configenable_overlaprW  prepare_for_prebuiltprocess_prebuiltr   
future_map)
rG   ready_grammar_requestsrl   curr_batch_sizer  num_not_used_batchcan_run_listrA  rZ   	new_batchrI   rI   rJ   r    sF   




z9SchedulerDisaggregationDecodeMixin.get_new_prebuilt_batchc                 C  s   | j jr	| j  | j }| j| t| jj	dkrd S t
| ds+d| _| j j| _| jd | j | _| j| j dkrV| j \}}| j| | j }| j| d S d S )Nr   polling_countr]   )r   ,disaggregation_decode_enable_offload_kvcachedecode_offload_managercheck_offload_progressdisagg_decode_prealloc_queuer  rA  r   rP   r   r   r  &disaggregation_decode_polling_intervalpolling_intervalr<  disagg_decode_transfer_queuery  )rG   r   	req_conns_
alloc_reqsrI   rI   rJ   r}  +  s$   


z7SchedulerDisaggregationDecodeMixin.process_decode_queueN)rG   r/   )rG   r/   r  r   rU   r    )rG   r/   rU   r  )rq   rr   rs   r@   no_gradr  r  r  r~  r  r}  rI   rI   rI   rJ   rz    s    
$

(6rz  )Rrt   
__future__r   loggingr+  collectionsr   dataclassesr   httpr   typingr   r   r   r	   r
   r@   torch.distributedr   sglang.srt.configs.mamba_utilsr   sglang.srt.constantsr   sglang.srt.disaggregation.baser   r   r   sglang.srt.disaggregation.utilsr   r   r   r   r   r   r   r   r   r   r   sglang.srt.layers.dp_attentionr   "sglang.srt.managers.schedule_batchr   r   r   sglang.srt.managers.utilsr    sglang.srt.mem_cache.allocatorr!   &sglang.srt.mem_cache.base_prefix_cacher"   sglang.srt.mem_cache.commonr#    sglang.srt.mem_cache.memory_poolr$   r%   r&   r'   r(   $sglang.srt.mem_cache.swa_memory_poolr)   sglang.srt.tracing.tracer*   r+   sglang.srt.utilsr,   +sglang.srt.utils.torch_memory_saver_adapterr-   	getLoggerrq   r   r.   sglang.srt.managers.schedulerr/   r$  r1   ru   r   r   r   rz  rI   rI   rI   rJ   <module>   sR    4

I,   y ?