o
    پiG                     @  sd  d dl mZ d dlZd dlZd dlZd dlmZ d dlmZ d dl	m
Z
mZmZmZmZ d dlZd dlZd dlZd dlmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZ d dlmZmZm Z  d dl!m"Z"m#Z# d dl$m%Z%m&Z&m'Z' d dl(m)Z)m*Z* d dl+m,Z, d dl-m.Z.m/Z/m0Z0 e1e2Z3e
rd dl4m5Z5 eG dd dZ6G dd dZ7G dd dZ8dS )    )annotationsN)deque)	dataclass)TYPE_CHECKINGDictListOptionalTuple)tqdm)KVPoll)DisaggregationModepoll_and_all_reduce)P2PWork)envs)get_attention_dp_rankget_attention_dp_sizeis_dp_attention_enabled)ReqScheduleBatch)GenerationBatchResultget_logprob_dict_from_resultget_logprob_from_pp_outputs)ForwardBatchPPProxyTensors)SamplingParams)DynamicGradModebroadcast_pyobjpoint_to_point_pyobj)	Schedulerc                   @  s   e Zd ZU ded< dS )PPBatchMetadataboolcan_run_cuda_graphN)__name__
__module____qualname____annotations__ r&   r&   Z/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/managers/scheduler_pp_mixin.pyr   (   s   
 r   c                   @  sZ  e Zd Ze dtddZe dtddZe dtddZdtd	d
ZdtddZduddZ	dvddZ
dtddZdtddZdwd d!Zdxd%d&Zdyd*d+Zdzd.d/Zd{d|d3d4Zdtd5d6Zd}d<d=Z	>d~dd@dAZddCdDZddEdFZddKdLZddNdOZddUdVZddYdZZdd^d_ZddcddZddedfZdtdgdhZdtdidjZddldmZ ddodpZ!ddqdrZ"dsS )SchedulerPPMixinselfr   c              	   C  s  |    	 d}t| jD ]:}| j| | _| j| | _|| j | j }|d | j }tj	
d |  }| | W d   n1 sDw   Y  | jjsq| | j tj	
d | j|dd| _W d   n1 slw   Y  tj	
d |  | j|< W d   n1 sw   Y  | j| j|< | j| | _| jrd}|  }d}d}d}	| jjd	kr| ||\}}}	| | j | jr| ||| j| j\}
| _| jjd	kr| ||\}}}	| j| dur|	  tj	
d
 | | j| | W d   n	1 sw   Y  | j| | j|< | jjsD| jrDtj ! "| j tj	
d | j#|
j$j%dd| _W d   n	1 s?w   Y  || _&q|rO| '  q)a  
        A scheduler loop for pipeline parallelism.
        Notes:
        1. Each stage runs in the same order and is notified by the previous stage.
        2. We use async send but sync recv to avoid desynchronization while minimizing the communication overhead.
        3. We can use async batch depth to buffer the outputs in the last stage for to allow overlapping the GPU computation and CPU processing and avoid last PP rank staggler.

        Unified Schedule:
        ====================================================================
        Stage P
        recv ith req from previous stage
        recv ith proxy from previous stage
        run ith batch
        recv prev (i+1)% mb_size th outputs
        process batch result of prev (i+1)% mb_size th batch (can be run in parallel with the curr batch GPU computation)
        send ith req to next stage
        send ith proxy to next stage
        send current stage's outputs to next stage(can be stashed and delayed to send later)

        the above order can be optimized and reordered to minimize communication-related CPU stall and overhead bubbles.

        ====================================================================
        T   recv_requestsNsend_reqs_to_next_stage
async_sendget_next_batch_to_runFr   process_batch_resultsend_proxy_dict_to_next_stage)(init_pp_loop_staterangepp_loop_sizerunning_mbsrunning_batchlast_mbs
last_batchpp_sizetorchprofilerrecord_functionr+   process_input_requestspp_groupis_last_rank_pp_commit_comm_worksend_req_work_pp_send_pyobj_to_next_stager/   mbs	cur_batch_pp_recv_proxy_tensorsserver_argspp_async_batch_depth9_pp_commit_send_output_work_and_preprocess_output_tensorssend_proxy_work_pp_launch_batchmb_metadatalast_rank_comm_queuelaunch_eventsynchronize_pp_process_batch_resultcudacurrent_stream
wait_event_pp_send_dict_to_next_stagepp_hidden_states_proxy_tensorstensors
pp_outputsself_check_during_idle)r)   server_is_idlemb_idnext_first_rank_mb_id
next_mb_id	recv_reqspp_proxy_tensorsnext_pp_outputsnext_batch_result	d2h_eventresultr&   r&   r'   event_loop_pp.   s   



zSchedulerPPMixin.event_loop_ppc                 C  s
  |    dg| j }dg| j }d}g }d}g }g }g }g }		 d}
t| jD ]M}| j| | _| j| | _|| j | j }|d | j }d}d}d}d}d}|  }| 	| | j
jsa| | j |  }|||< | | |  }| | |||< |   |  }| |}|| j|< | j| j|< | j| | _| jrd}
|  }| jjdkr| ||\}}}| | j | jr| ||| j| j\}| _| jjdkr| ||\}}}| ||||\}}| ||||\}	}|| dur|   }| !|}| | || dur	|   }| |	 | j| dur+|"  | #| j| | | j| | j|< || dur7| $| | j
jsi| j%|dd| _| j%|dd}| j%|dd}| jrit&j'( )| j | j*|j+j,dd| _|| _-|}|}d| j_.q&|
rt/| j0dkr| 1  q)a  
        This is the prefill server event loop for pipeline parallelism.

        Notes:
        1. Following the same rules as the event_loop_pp.
        2. Adds extra steps for KV transfer process: bootstrap + release.

        Prefill Server Schedule:
        ====================================================================
        Stage P
        recv ith req from previous stage
        recv ith bootstrap req from previous stage
        recv ith transferred req from previous stage
        recv ith proxy from previous stage
        run ith batch
        recv prev (i+1) % mb_size th consensus bootstrapped req from previous stage
        local consensus on bootstrapped req
        recv prev (i+1) % mb_size th release req from previous stage
        local consensus on release req
        recv prev (i+1) % mb_size th outputs
        process batch result of prev (i+1)% mb_size th batch (can be run in parallel with the curr batch GPU computation)
        send ith req to next stage
        send ith bootstrap req to next stage
        send ith transferred req to next stage
        send ith proxy to next stage
        send current stage's outputs to next stage (can be stashed and delayed to send later)

        the above order can be optimized and reordered to minimize communication-related CPU stall and overhead bubbles.
        ====================================================================

        There are two additional elements compared to the regular schedule:

        Bootstrap Requests + Release Requests:
        - Both can have local failure and need to be consensus on. PP needs to guarantee eventual consistency of local failure and flush malfunc requests out as soft error.

        NTr*   Fr   r-   )2r2   r4   r3   r5   r6   r7   r8   r9   r+   r=   r>   r?   r@   rA   _pp_pd_get_bootstrapped_ids"_pp_pd_get_prefill_transferred_idsprocess_prefill_chunkget_new_batch_prefillmaybe_prepare_mlp_sync_batchrC   rD   rE   rF   rG   rH   rI   rJ   rK   rL   rM   &_pp_pd_send_consensus_bootstrapped_ids!_pp_pd_send_consensus_release_ids_pp_recv_pyobj_from_prev_stageprocess_bootstrapped_queuerN   rO   %process_disagg_prefill_inflight_queuerB   r:   rP   rQ   rR   rS   rT   rU   rV   batch_is_fulllendisagg_prefill_inflight_queuerW   )r)   bmbstmbsconsensus_bootstrapped_ridstransferred_ridsrelease_ridssend_bootstrapped_worksend_transfer_work send_consensus_bootstrapped_worksend_release_workrX   rY   rZ   r[   r^   next_release_rids next_consensus_bootstrapped_ridsr`   r_   r\   bootstrapped_ridsbatchr]   ra   r&   r&   r'   event_loop_pp_disagg_prefill   s   &





	




z-SchedulerPPMixin.event_loop_pp_disagg_prefillc                 C  s  |    d g| j }d g| j }d g| j }d }d }d }g }g }g }	g }
g }g }	 d}t| jD ]}| j| | _| j| | _|| j | j }|d | j }d }d }d }d }d }d }|  }| 	| | j
jsm| | j | |}|||< | | |  }|||< | | |  }|||< | |	 |  }|| j|< | j| j|< | j| | _| jrd}d }| jj s|  }| jjdkr| ||\}}}| | j | jr| ||| j| j\}| _| jjdkr| ||\}}}| ||||\}
}| ||||\}}|  ||||\}}| jj!r| j"#  || d ur'| $ }| %|}| |
 || d ur<| $ }| &|}| | || d urQ| $ }| '|}| | | j| d ur|| j| j st|(  | )| j| | | j| | j|< | j
js| j*|dd| _| j*|dd}| j*|dd}| j*|dd}	| jr| jj st+j,- .| j | j/|j0j1dd| _|| _2|}|}|}d| j_3q0t4| j5t4| j6j7 t4| j8j7 }| jj!r|t4| j"j97 }|r|dkr| :  q))NTr*   Fr   r-   );r2   r4   r3   r5   r6   r7   r8   r9   r+   r=   r>   r?   r@   rA   _pp_pd_get_retract_ids_pp_pd_get_prealloc_ids!_pp_pd_get_decode_transferred_ids#get_next_disagg_decode_batch_to_runrC   rD   forward_modeis_prebuiltrE   rF   rG   rH   rI   rJ   rK   rL   rM   rh   ri   ,disaggregation_decode_enable_offload_kvcachedecode_offload_managercheck_offload_progressrj   process_retract_queueprocess_prealloc_queueprocess_decode_transfer_queuerN   rO   rB   r:   rP   rQ   rR   rS   rT   rU   rV   rm   rn   waiting_queuedisagg_decode_transfer_queuequeuedisagg_decode_prealloc_queueongoing_offloadrW   )r)   rmbspmbsrq   consensus_retract_ridsconsensus_prealloc_ridsrt   send_retract_worksend_prealloc_workrv   send_consensus_retract_worksend_consensus_prealloc_workrx   rX   rY   rZ   r[   r^   next_consensus_retract_ridsnext_consensus_prealloc_ridsry   r`   r_   r\   retract_ridsprealloc_ridsrs   r|   r]   ra   
queue_sizer&   r&   r'   event_loop_pp_disagg_decodeA  s,  



















 z,SchedulerPPMixin.event_loop_pp_disagg_decodec                 C  s   | j | jj | _| jj | _d g| j | _d g| j | _dd t| jD | _	d g| j | _
d | _t | _g | _g | _g | _d | _d S )Nc                 S  s   g | ]}t g d dqS )F)reqsrm   )r   ).0_r&   r&   r'   
<listcomp>  s    
z7SchedulerPPMixin.init_pp_loop_state.<locals>.<listcomp>)r9   rF   rG   r4   #enable_nsa_prefill_context_parallelrequire_attn_tp_allgatherrC   r7   r3   r5   rK   rV   r   rL   rA   rI   send_output_workrM   r)   r&   r&   r'   r2     s    
z#SchedulerPPMixin.init_pp_loop_statec              
   C  s"  g }g }| j jrM| jj}|j}g }tdD ]*}t| jd || jd d   }|dkr. ntj	j
dd|tjd }|| qtddd}	tt|dd	D ]\}}tt|d
||	d}
|
j|
_d|
_|
t|
jt|
j  t|
g| j| j| j| jd| j}t|
j}t rt  }dg| }t! }|||< ||_"||_#t$j%||j&f|j'ddt$j%||j&f|j'ddd}t(|}t$j)* rt$j)+  t,- }|.  |/ }t0||}|j1||d}t$j)* rt$j)+  t,- | }|d }|t| || |
j2dur| jj3|
j2dt|
jf }| j4| | j4|
 qOt56dt| d| d|  | j7dkrM||g}t8|| j9j:| j;| j9j<d d}|\}}t$j=* rit$j=> ri||g}| j j?|dd |\}}t@ | _A| jAB|| | jAC| j d| jA_Dt56d| jE d| jAjFdd dS )z
        Profile prefill latency for dynamic chunk sizing.

        Only runs on PP0 (first rank), then broadcasts data to all ranks.
        All ranks fit coefficients using the same data.
           g      ?r   i'  )sizedtyper*   )temperaturemax_new_tokensz.Profiling prefill latency for dynamic chunking)desc )ridorigin_input_textorigin_input_idssampling_paramsFrP   )r   device)hidden_statesresidual)forward_batchr]   g     @@Nz"[PP Dynamic Chunk] [PP0] Profiled z samples: seq_lens=z, latencies_ms=srcT[PP Dynamic Chunk] [PPz/] Predictor ready (quadratic). Target latency: .2fms)Gr>   is_first_rank	tp_workermodel_runnermodel_configr3   intchunked_prefill_sizenprandomrandintint64tolistappendr   	enumerater
   r   strr   fill_idslogprob_start_lenset_extend_input_lenrn   prefix_indicesr   init_newreq_to_token_pooltoken_to_kv_pool_allocator
tree_cachespec_algorithmr   r   r   global_num_tokensglobal_num_tokens_for_logprobr:   zeroshidden_sizer   r   rP   is_availablerN   timeperf_counterprepare_for_extendget_model_worker_batchr   forwardreq_pool_idxreq_to_tokenfreeloggerinfoattn_tp_sizer   attn_tp_grouprankattn_tp_cpu_groupranksdistributedis_initializedbroadcast_object_listChunkSizePredictorlength_predictorfitset_target_latencyis_readypp_ranktarget_latency)r)   seq_lens	latenciesr   r   input_ids_listi
chunk_size	input_idsr   reqr|   current_seq_lendp_sizer   dp_rankproxy_tensorspp_proxystartmodel_worker_batchr   r   latency_seconds
latency_ms
kv_indicesdata_to_sync_tpdata_to_syncr&   r&   r'   profile_and_init_predictor  s   











z+SchedulerPPMixin.profile_and_init_predictorhistory_lenr   returnOptional[int]c              	   C  sr   | j r| jdu s| jjsdS t| dd}| jj|| j| j| jj|d}|dur7t	
d| j d| d| d |S )z
        Predict next chunk size dynamically based on current history length.

        Args:
            history_len: Current sequence length

        Returns:
            Predicted chunk size, or None to use default chunked_prefill_size
        Nmax_prefill_tokens)r  base_chunk_size	page_sizecontext_lenmax_chunk_sizer   z] Predicted chunk size: z (history_len=))enable_dynamic_chunkingr   r   getattrpredict_next_chunk_sizer   r  r   r	  r   debugr   )r)   r  r
  predicted_sizer&   r&   r'   r    s.   

z(SchedulerPPMixin.predict_next_chunk_sizer{   Optional[List[str]]c                 C  sT   |d ur(|\}}| j jd|| d\}}| j| dd |D dd |D gS d S )NT)return_failed_reqsrids_to_checkc                 S     g | ]}|j qS r&   r   r   r   r&   r&   r'   r         z?SchedulerPPMixin.process_bootstrapped_queue.<locals>.<listcomp>)disagg_prefill_bootstrap_queuepop_bootstrappedr   extend)r)   r{    good_consensus_bootstrapped_ridsbad_consensus_bootstrapped_rids	good_reqsfailed_reqsr&   r&   r'   rk     s   z+SchedulerPPMixin.process_bootstrapped_queuec                 C     | j jr| | jjdtjgtjg\}}||gS |  }|\}}| | jjdtjgtjg\}}t	t
|t
|@ }t	t
|t
|B }||gS NT)r>   r   get_ridsr  r   r   WaitingForInputFailedrj   listset)r)   good_bootstrapped_ridsbad_bootstrapped_ridsprev_bootstrapped_ridsprev_good_bootstrapped_ridsprev_bad_bootstrapped_ridscurr_good_bootstrapped_ridscurr_bad_bootstrapped_ridsr&   r&   r'   rc     s0   z,SchedulerPPMixin._pp_pd_get_bootstrapped_idsc                 C  s\   | j jr| | jdtjtjg}|S |  }| | jdtjtjg}tt	|t	|@ }|S r   )
r>   r   r!  ro   r   Successr#  rj   r$  r%  r)   rs   prev_transferred_ridscurr_transferred_ridsr&   r&   r'   rd     s"   

z3SchedulerPPMixin._pp_pd_get_prefill_transferred_idsrp   List[List[str]]rZ   rr   	List[str]c                 C  P   g }| j jr|| d ur|}| j|dd}||fS |d ur$| j|dd}||fS NTr-   r>   r?   rB   )r)   rp   rZ   rr   r{   rw   r&   r&   r'   rh     s   	z7SchedulerPPMixin._pp_pd_send_consensus_bootstrapped_idsrq   rt   rs   c                 C  r3  r4  r5  )r)   rq   rZ   rt   rs   rx   r&   r&   r'   ri   ,  s   	z2SchedulerPPMixin._pp_pd_send_consensus_release_idsworkList[P2PWork]Nonec                 C  s    |D ]}|j   q|  d S N)r6  waitclear)r)   r6  p2p_workr&   r&   r'   r@   B  s   z%SchedulerPPMixin._pp_commit_comm_workr[   >Tuple[PPProxyTensors, GenerationBatchResult, torch.cuda.Event]c                 C  s>   | j | jd | ||| j| j| j| j\}}}| _|||fS )N)r6  )r@   r   +_pp_send_recv_and_preprocess_output_tensorsrC   rK   rL   rV   )r)   rZ   r[   r^   r_   r`   r&   r&   r'   rH   G  s   
	zJSchedulerPPMixin._pp_commit_send_output_work_and_preprocess_output_tensorsFr.   r    c                 C  sd   g }| j dkr0| j| j }t|| j| j | | jj| j| j | | jd | j | j | |d}|S )Nr   r*   r-   )	attn_tp_rankattn_dp_rankr   r   r   tp_sizeworld_group	cpu_groupr9   )r)   datar.   r<  	dp_offsetr&   r&   r'   rB   \  s   
z-SchedulerPPMixin._pp_send_pyobj_to_next_stagec                 C  s   | j dkr-| j| j }tg | j| j | | jj| jd | j | j | | j| j | }nd }| jdkrCt	|| j
j| j| j
jd d}|S )Nr   r*   r   )r?  r@  r   r   r   rA  rB  rC  r9   r   r   r   r   r   )r)   rE  rD  r&   r&   r'   rj   j  s$   


z/SchedulerPPMixin._pp_recv_pyobj_from_prev_stagera   r   r|   r   Dict[str, torch.Tensor]c                 C  s(   d|j i}|jrt|}i ||}|S )Nnext_token_ids)rG  return_logprobr   )r)   ra   r|   tensor_dictlogprob_dictr&   r&   r'   _pp_prepare_tensor_dict  s   z(SchedulerPPMixin._pp_prepare_tensor_dictTrI  c                 C  s,   g }| | jj|| jr| jnd |d |S )N)rI  all_gather_groupr.   )r  r>   send_tensor_dictr   r   )r)   rI  r.   r<  r&   r&   r'   rS     s   	z,SchedulerPPMixin._pp_send_dict_to_next_stageOptional[PPProxyTensors]c                 C  s.   d }| j jst| j j| jr| jnd d}|S N)rL  )r>   r   r   recv_tensor_dictr   r   )r)   r]   r&   r&   r'   rE     s   z'SchedulerPPMixin._pp_recv_proxy_tensorsc                 C  s   | j j| jr	| jnd d}|S rO  )r>   rP  r   r   )r)   resr&   r&   r'   _pp_recv_dict_from_prev_stage  s   z.SchedulerPPMixin._pp_recv_dict_from_prev_stagerK   r   rV   r   c           	      C  sT   ddl m} d }d }d }|jrt|\}}}|d |_||d |d |||jd}|S )Nr   )r   rG  )logits_outputrT   rG  extend_input_len_per_req extend_logprob_start_len_per_reqr!   )sglang.srt.managers.schedulerr   rH  r   
output_idsr!   )	r)   r|   rK   rV   r   rS  rT  rU  output_resultr&   r&   r'   _pp_prep_batch_result  s(   
z&SchedulerPPMixin._pp_prep_batch_resultrX  c                 C  s,   | j tjkr| || d S | || d S r9  )disaggregation_moder   PREFILL#process_batch_result_disagg_prefillr0   )r)   r|   rX  r&   r&   r'   rO     s   z)SchedulerPPMixin._pp_process_batch_resultrC   List[ScheduleBatch]rL   .deque[Tuple[torch.cuda.Event, PPProxyTensors]]PPProxyTensors | Nonec                 C  s   g }| j jr?|| d ur?| \}}|| j s?tj | tj	
d | j|jdd}W d    n1 s:w   Y  | j jsd|rdtj	
d | j|jdd}W d    |S 1 s_w   Y  |S )Nsend_res_dict_to_next_stageTr-   )r>   r?   popleftr   r   r:   rP   rQ   rR   r;   r<   rS   rU   )r)   rZ   rC   rL   rV   r   q_eventpp_outputs_to_sendr&   r&   r'   _pp_send_output_to_next_stage  s.   
z.SchedulerPPMixin._pp_send_output_to_next_stageList[PPBatchMetadata]6Tuple[PPProxyTensors, List[P2PWork], torch.cuda.Event]c                 C  s   d }d }d }	|  ||||}
|| d urrtjd d }|| j s*t|  }W d    n1 s4w   Y  || j sr| j' | j	
| j | || || |}	tj }|tj  W d    n1 smw   Y  ||	||
fS )Nrecv_res_dict_from_prev_stage)rd  r:   r;   r<   r   r   r   rR  copy_stream_ctxcopy_streamwait_streamdefault_streamrY  rP   EventrecordrQ   )r)   rZ   r[   rC   rK   rL   rV   r^   r`   batch_resultr   r&   r&   r'   r>    s8   	
z<SchedulerPPMixin._pp_send_recv_and_preprocess_output_tensorsrY   r]   List[Optional[PPBatchMetadata]]c              
   C  s   t jd] | j= | j| j | | j|}t	|j
d||< t j }|t j  | jjr@||t| || jf W d    n1 sJw   Y  W d    ||fS W d    ||fS 1 sfw   Y  ||fS )N	run_batch)r!   )r:   r;   r<   forward_stream_ctxforward_streamrj  rk  rp  rD   r   r!   rP   rl  rm  rQ   r>   r?   r   r   rK  )r)   rY   r]   rK   rL   ra   eventr&   r&   r'   rJ     s4   



z!SchedulerPPMixin._pp_launch_batch	req_queue	List[Req]is_sendc                   sd   t  fdd|D | j}g }|D ]| fddt||D  qt|dkr.t|S |d S )zQ
        Used by PP, get the required rids with the given poll statuses.
        c                   s   g | ]
} r	|j n|jqS r&   )disagg_kv_senderkv_receiverr  )rv  r&   r'   r   A  s    z-SchedulerPPMixin.get_rids.<locals>.<listcomp>c                   s*   g | ]\}}|v r r|j n|jj qS r&   )r   r   )r   r   pollrv  poll_statusesr&   r'   r   G  s
    r*   r   )r   r   r   ziprn   tuple)r)   rt  rv  poll_statuses_grouppollsridsr&   rz  r'   r!  :  s   zSchedulerPPMixin.get_ridsc                   s\   | j jD ]
}|jd u r |_q fdd| j jD }| jjr |S |  }tt|t|@ S )Nc                   s   g | ]
}|j  kr|jqS r&   )retraction_mb_idr   r  rY   r&   r'   r   U  s
    
z;SchedulerPPMixin._pp_pd_get_retract_ids.<locals>.<listcomp>)r   retracted_queuer  r>   r   rj   r$  r%  )r)   rY   r   curr_retract_ridsprev_retract_ridsr&   r  r'   r~   O  s   

z'SchedulerPPMixin._pp_pd_get_retract_idsc                 C  r  NF)r>   r   r!  r   r   r   r"  r#  rj   r$  r%  )r)   good_prealloc_ridsbad_prealloc_ridsprev_prealloc_ridsprev_good_prealloc_ridsprev_bad_prealloc_ridscurr_good_prealloc_ridscurr_bad_prealloc_ridsr&   r&   r'   r   b  s.   z(SchedulerPPMixin._pp_pd_get_prealloc_idsc                 C  s`   | j jr| | jjdtjtjg}|S |  }| | jjdtjtjg}t	t
|t
|@ }|S r  )r>   r   r!  r   r   r   r-  r#  rj   r$  r%  r.  r&   r&   r'   r   ~  s"   

z2SchedulerPPMixin._pp_pd_get_decode_transferred_idsr   c                 C  2   |d ur| j |}| j| dd |D S d S )Nc                 S  r  r&   r  r  r&   r&   r'   r     r  z:SchedulerPPMixin.process_retract_queue.<locals>.<listcomp>)r   resume_retracted_reqsr   r  )r)   r   resumed_reqsr&   r&   r'   r     s   z&SchedulerPPMixin.process_retract_queuer   c                 C  sj   t | jjdkrg g gS |d ur3|\}}| jj|| d\}}| j| dd |D dd |D gS d S )Nr   )r  c                 S     g | ]}|j jqS r&   r   r   r  r&   r&   r'   r         z;SchedulerPPMixin.process_prealloc_queue.<locals>.<listcomp>c                 S  r  r&   r  r  r&   r&   r'   r     r  )rn   r   r  pop_preallocatedr   r  )r)   r   good_consensus_prealloc_ridsbad_consensus_prealloc_ridsr  r  r&   r&   r'   r     s"   
z'SchedulerPPMixin.process_prealloc_queuec                 C  r  )Nc                 S  r  r&   r  r  r&   r&   r'   r     r  zBSchedulerPPMixin.process_decode_transfer_queue.<locals>.<listcomp>)r   pop_transferredr   r  )r)   rt   released_reqsr&   r&   r'   r     s   z.SchedulerPPMixin.process_decode_transfer_queueN)r)   r   )r)   r   r  r   r  r  )r)   r   r{   r  )
r)   r   rp   r1  rZ   r   rr   r2  r{   r2  )
r)   r   rq   r1  rZ   r   rt   r2  rs   r2  )r)   r   r6  r7  r  r8  )r)   r   rZ   r   r[   r   r  r=  )F)r)   r   r.   r    )r)   r   ra   r   r|   r   r  rF  )T)r)   r   rI  rF  r.   r    )r)   r   r  rN  )r)   r   r  rF  )r)   r   r|   r   rK   r   rV   r   )r)   r   r|   r   rX  r   )r)   r   rZ   r   rC   r]  rL   r^  rV   r_  r  r7  )r)   r   rZ   r   r[   r   rC   r]  rK   re  rL   r^  rV   r_  r  rf  )
r)   r   rY   r   r]   r   rK   ro  rL   r^  )r)   r   rt  ru  rv  r    )r)   r   rY   r   )r)   r   r   r  )r)   r   r   r  )r)   r   rt   r  )#r"   r#   r$   r   rb   r}   r   r2   r  r  rk   rc   rd   rh   ri   r@   rH   rB   rj   rK  rS   rE   rR  rY  rO   rd  r>  rJ   r!  r~   r   r   r   r   r   r&   r&   r&   r'   r(   -   sN    c / 
=
 

"















%






r(   c                   @  s:   e Zd ZdZdd Zddd	ZdddZ	ddddZdS )r   z
    Predictor for dynamic chunk size based on quadratic latency model.

    Models latency as: f(l) = a*l^2 + b*l + c
    Predicts next chunk size x such that: f(L+x) - f(L) = target_latency
    c                 C  s"   d| _ d| _d| _d | _d| _d S )N        F)quadratic_coeff_alinear_coeff_bconstant_coeff_cr   r   r   r&   r&   r'   __init__  s
   
zChunkSizePredictor.__init__r   	List[int]r   List[float]c              
   C  s`  t j|dd t jd}t j|dd t jd}t|dk r(tdt| dt || |t |g}z,t jj||dd\}}}}	t|dkr\t	|d	 }
t	|d }t	|d
 }ntdW n t jj
yv } ztd| d}~ww |
d	krtd|
dd|d	k rtd|dd d}|
| _|| _|| _td|
dd|dd|d dS )zAFit quadratic coefficients f(l) = al^2 + bl + c from data points.r*   N)r      z.Not enough data points for quadratic fitting (z? < 8). Need at least 8 samples with different sequence lengths.)rcond   r      z-Failed to fit coefficients: insufficient rankz$Failed to fit f(l) = al^2 + bl + c: zFitted quadratic coefficient a=.2ezd is not positive. Attention has O(n^2) complexity, so a must be positive. Check warmup data quality.zFitted linear coefficient b=z is negative. Setting b=0.r  z,[ChunkSizePredictor] Fitted coefficients: a=z, b=z, c=)r   arrayfloat64rn   
ValueErrorcolumn_stack	ones_likelinalglstsqfloatLinAlgErrorr   warningr  r  r  r   )r)   r   r   LTXcoeffs	residualsr   sfitted_afitted_bfitted_cer&   r&   r'   r     sL   
zChunkSizePredictor.fitr  r   c                   sb   d fdd}|t ||d  _ jdkr!td jd	d
td jd	d| d dS )zPSet target latency based on base chunk size: target = f(base_chunk_size) - f(0).lr  r  c                   s    j |  |   j|    j S )zCTotal latency function: f(l) = al^2 + bl + c (or bl + c for linear))r  r  r  )r  r   r&   r'   f  s   z0ChunkSizePredictor.set_target_latency.<locals>.fr  r   zCalculated target_latency=r   z.ms is not positive. Check warmup data quality.z%[ChunkSizePredictor] Target latency: zms (base_chunk_size=r  N)r  r  r  r  )r  r   r  r   r   )r)   r  r  r&   r   r'   r     s   
z%ChunkSizePredictor.set_target_latencyNr  r  r	  r
  r  r  c              	   C  sf  | j r| jdu r
dS | jdkrdS | j}d| j | | j }| j }|| d| |  }	|	dk rDtd|	dd| d| jd	d
 dS t|	}
| |
 d|  }|dkrjtd|d	d| d| jd	d
 dS tj	
 }||||   }tt||d }t|d}|| | }||k r|}|| d }|durt||}t||}|| | }||k rdS |S )a  
        Predict next chunk size x such that f(history_len + x) - f(history_len) = target_latency.

        Args:
            history_len: Current sequence length (L)
            base_chunk_size: Base chunk size
            page_size: Page size for alignment
            context_len: Maximum context length
            max_chunk_size: Maximum allowed chunk size (optional)

        Returns:
            Predicted chunk size, or None if prediction fails
        Nr   r     zDiscriminant is negative (r  z&). No real solution for chunk size. L=z, T=r   zms.z'Calculated chunk size is non-positive (z). L=@   d   )r   r   r  r  r   r  mathsqrtr   %SGLANG_DYNAMIC_CHUNKING_SMOOTH_FACTORgetmaxr   min)r)   r  r  r  r	  r
  ABCdiscriminantsqrt_discriminantcalculated_chunk_size_floatsmooth_coeffsmoothed_chunk_sizecalculated_chunk_sizealignment_sizedynamic_chunk_sizemax_allowedr&   r&   r'   r    sZ   







z*ChunkSizePredictor.predict_next_chunk_size)r   r  r   r  )r  r   r9  )r  r   r  r   r  r   r	  r   r
  r  r  r  )r"   r#   r$   __doc__r  r   r   r  r&   r&   r&   r'   r     s    

1r   )9
__future__r   loggingr  r   collectionsr   dataclassesr   typingr   r   r   r   r	   numpyr   r:   torch.distributedr
   #sglang.srt.disaggregation.base.connr   sglang.srt.disaggregation.utilsr   r   %sglang.srt.distributed.parallel_stater   sglang.srt.environr   sglang.srt.layers.dp_attentionr   r   r   "sglang.srt.managers.schedule_batchr   r   sglang.srt.managers.utilsr   r   r   ,sglang.srt.model_executor.forward_batch_infor   r   #sglang.srt.sampling.sampling_paramsr   sglang.srt.utilsr   r   r   	getLoggerr"   r   rV  r   r   r(   r   r&   r&   r&   r'   <module>   sJ    
         