o
    پi/                     @  s   d dl mZ d dlZd dlZd dlmZmZmZmZm	Z	 d dl
mZ d dlmZ d dlmZmZmZ d dlmZmZ d dlmZ eeZerMd d	lmZ G d
d dZG dd dZdS )    )annotationsN)TYPE_CHECKINGListOptionalSetUnion)
DllmConfig)DllmReqPhase)ReqRequestStageScheduleBatch)AddReqResultPrefillAdder)ForwardMode)	Schedulerc                   @  sz   e Zd Zd-ddZd.ddZd-d	d
Zd/ddZd0ddZd1ddZd2ddZ	d3d d!Z
d4d$d%Zd5d(d)Zd5d*d+Zd,S )6SchedulerDllmMixinselfr   c                 C  s0   | j jd urt| j nd | _t| jd| _d S )Ndllm_config)server_argsdllm_algorithmr   from_server_argsr   DllmManagerdllm_managerr    r   S/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/dllm/mixin/scheduler.pyinit_diffusion_llm   s
   z%SchedulerDllmMixin.init_diffusion_llmreturnOptional[ScheduleBatch]c                 C  s   | j rd| j_|  rdS t| jj}| j| j | 	|}| j
  |   | |}|j}|s4dS | ||| | ||}|S )z9Generate a new batch for DLLM (Diffusion LLM) scheduling.FN)try_preemptionrunning_batchbatch_is_full_should_skip_prefilllenreqspolicycalc_prioritywaiting_queue_create_dllm_prefill_adderr   init_next_round_fetch_waiting_reqs_process_dllm_batchescan_run_list#_update_metrics_and_state_for_batch_create_dllm_batch)r   
running_bsadderforward_moder-   	new_batchr   r   r   get_new_batch_dllm   s    


z%SchedulerDllmMixin.get_new_batch_dllmc                 C  s^   | j jt| jj }t|t| j}|dkr-| jd | }| j| | j|d  | _d S d S )Nr   )r   max_running_requestsr$   r   r(   minadd_waiting_reqs)r   max_dllm_capacitynum_requests_to_addrequests_to_addr   r   r   r+   =   s   z&SchedulerDllmMixin._fetch_waiting_reqsboolc                 C  sV   | j js| js| j rdS t| j j}| |dkr)| j r)| js)d| j _dS dS )z(Check if DLLM prefill should be skipped.Tr   F)	r!   r"   r(   r   is_emptyr$   r%   get_num_allocatable_reqsr    r   r0   r   r   r   r#   I   s    z'SchedulerDllmMixin._should_skip_prefillr0   intr   c                 C  s>   t | j| j| j| j| j| j| j| jr|nd| j	| j
j| jdS )z6Create a prefill adder configured for DLLM scheduling.r   )prefill_max_requestsr   )r   	page_size
tree_cachetoken_to_kv_pool_allocatorr!   new_token_ratiomax_prefill_tokenschunked_prefill_sizeis_mixed_chunk(priority_scheduling_preemption_thresholdr   r@   r   r>   r   r   r   r)   [   s   z-SchedulerDllmMixin._create_dllm_prefill_adderr1   r   c                 C  sN   t j}| j }|r| ||tjtj |S | j }| ||tj	tj
 |S )z+Process prefill or decode batches for DLLM.)r   DLLM_EXTENDr   get_prefill_requests_process_batch_by_phaser	   STAGING_PREFILLINCOMING_PREFILLget_decode_requestsSTAGING_DECODEINCOMING_DECODE)r   r1   r2   prefill_reqsdecode_reqsr   r   r   r,   k   s$   

z(SchedulerDllmMixin._process_dllm_batchesbatch	List[Req]staging_phaser	   incoming_phaseNonec                   sZ   fdd|D }|r|  ||}|tjkrdS  fdd|D }|r+| || dS dS )z:Process a batch, separating staging and incoming requests.c                      g | ]	}|j  kr|qS r   
dllm_phase.0req)rU   r   r   
<listcomp>       z>SchedulerDllmMixin._process_batch_by_phase.<locals>.<listcomp>Nc                   rX   r   rY   r[   )rV   r   r   r^      r_   )process_dllm_staging_reqsr   CONTINUEprocess_dllm_incoming_reqs)r   r1   rS   rU   rV   staging_reqsstaging_resultincoming_reqsr   )rV   rU   r   rK      s   
z*SchedulerDllmMixin._process_batch_by_phaser-   c                 C  s   | j r|D ]}|tj q|jr|jD ]}| | q|r)| j| | j  || _	|| _
t| jj| _|D ]}|jjdkrRt |j_| j rR| j|j  q8dS )z'Update metrics and state for the batch.r   N)enable_metricsadd_latencyr   PREFILL_WAITINGpreempt_list_add_request_to_queuer   add_staging_reqsincrement_chunked_countr1   r-   r$   r!   r%   r0   
time_statsforward_entry_timetimeperf_countermetrics_collectorobserve_queue_timeget_queueing_time)r   r-   r1   r0   r]   r   r   r   r.      s*   

z6SchedulerDllmMixin._update_metrics_and_state_for_batchr2   r   c              
   C  sx   t j|| j| j| j| j| j| j| jd}|	  ||_
d|_ddlm} || jj| jj| jjt| jjt|d|_|S )z$Create and prepare a new DLLM batch.r   Nr   )PrefillStats)log_input_tokenslog_hit_tokensrD   r0   num_new_seqs)r   init_newreq_to_token_poolrC   rB   model_configenable_overlapspec_algorithmr   prepare_for_extendr2   decoding_reqs+sglang.srt.managers.scheduler_metrics_mixinrt   r1   ru   rv   rD   r$   r!   r%   prefill_stats)r   r-   r2   r3   rt   r   r   r   r/      s,   

z%SchedulerDllmMixin._create_dllm_batchr%   r   c                 C  s   t j}|D ]G}t| jj}t|j| |krd| j_| jjr,| jr)|	|| j
s, |S || j |j|d| jd}|t jkrL|t jkrId| j_ |S q|S )zGProcess incoming DLLM requests with resource allocation and preemption.T)has_chunked_reqtruncation_align_size)r   ra   r$   r!   r%   r-   r=   r"   r    preempt_to_scheduler   init_next_round_inputrB   add_one_reqr   NO_TOKEN)r   r1   r%   resr]   r0   r   r   r   rb      s0   


z-SchedulerDllmMixin.process_dllm_incoming_reqsc                 C  s,   |D ]}| |}|tjkr|  S qtjS )z7Process staging DLLM requests with resource allocation.)add_dllm_staging_reqr   r   ra   )r   r1   r%   r]   r   r   r   r   r`      s   

z,SchedulerDllmMixin.process_dllm_staging_reqsN)r   r   )r   r   r   r   )r   r   r   r;   )r   r   r0   r?   r   r   )r   r   r1   r   r   r   )
r1   r   rS   rT   rU   r	   rV   r	   r   rW   )
r   r   r-   rT   r1   r   r0   r?   r   rW   )r   r   r-   rT   r2   r   r   r   )r   r   r1   r   r%   rT   r   r   )__name__
__module____qualname__r   r4   r+   r#   r)   r,   rK   r.   r/   rb   r`   r   r   r   r   r      s    


!






!r   c                   @  s   e Zd ZdZd!d"ddZd#d	d
Zd#ddZd$ddZd$ddZd%ddZ	d&ddZ
d&ddZd'ddZd'ddZd'dd ZdS )(r   z
    Manager for Diffusion LLM request scheduling.

    Maintains two queues:
    - waiting_queue: The requests waiting to be scheduled with max running requests limit
    - staging_queue: Requests allocated resources by PrefillAdder
    Nr   Optional[DllmConfig]c                 C  s*   || _ |d ur
|jnd| _g | _g | _d S )N   )r   r5   max_running_reqsr(   staging_queue)r   r   r   r   r   __init__  s
   
zDllmManager.__init__r   rT   c                 C     dd | j D S )z,Get all prefill requests from waiting queue.c                 S  s   g | ]}|  r|qS r   is_dllm_prefillr[   r   r   r   r^         z4DllmManager.get_prefill_requests.<locals>.<listcomp>r(   r   r   r   r   rJ        z DllmManager.get_prefill_requestsc                 C  r   )z+Get all decode requests from waiting queue.c                 S     g | ]}|  s|qS r   r   r[   r   r   r   r^     r   z3DllmManager.get_decode_requests.<locals>.<listcomp>r   r   r   r   r   rN     r   zDllmManager.get_decode_requestsr%   Union[Req, List[Req]]rW   c                 C  sH   | j dus	J dt|tr|n|g}| |rtd| j| dS )z4Add requests to waiting queue with redundancy check.Nz Diffusion LLM config is not set.z-Redundant requests detected in dLLM requests.)r   
isinstancelist_has_duplicate_reqsRuntimeErrorr(   extendr   r%   reqs_to_addr   r   r   r7     s
   
zDllmManager.add_waiting_reqsc                 C  s$   t |tr|n|g}| j| dS )z:Add requests to staging queue (allocated by PrefillAdder).N)r   r   r   r   r   r   r   r   rk   #  s   zDllmManager.add_staging_reqsr;   c                   s&   dd | j D  t fdd|D S )z8Check if any request ID already exists in waiting queue.c                 S  s   h | ]}|j qS r   rid)r\   rr   r   r   	<setcomp>*  s    z2DllmManager._has_duplicate_reqs.<locals>.<setcomp>c                 3  s    | ]}|j  v V  qd S Nr   r[   existing_ridsr   r   	<genexpr>+  s    z2DllmManager._has_duplicate_reqs.<locals>.<genexpr>)r(   any)r   r%   r   r   r   r   (  s   zDllmManager._has_duplicate_reqsc                 C  s   | j duot| jdkS )z-Check if there are requests in staging queue.Nr   )r   r$   r   r   r   r   r   any_staging_reqs-  s   zDllmManager.any_staging_reqsc                 C  s   | j du rdS t| jdkS )z9Check if both queues are empty or DLLM is not configured.NTr   )r   r$   r(   r   r   r   r   r<   1  s   
zDllmManager.is_emptyc                 C  s   | j D ]	}| jd7  _qdS )z1Increment chunked count for all staging requests.r   N)r   
is_chunkedr   r]   r   r   r   rl   7  s   
z#DllmManager.increment_chunked_countc                 C  s(   dd | j D | _ dd | jD | _dS )z*Remove finished requests from both queues.c                 S  r   r   finishedr[   r   r   r   r^   >  r   z4DllmManager.filter_finished_reqs.<locals>.<listcomp>c                 S  r   r   r   r[   r   r   r   r^   ?  r   N)r(   r   r   r   r   r   filter_finished_reqs<  s   z DllmManager.filter_finished_reqsc                 C  s   | j D ]}|  qg | _ dS )zCInitialize staging requests for next round and clear staging queue.N)r   r   r   r   r   r   r*   A  s   


zDllmManager.init_next_roundr   )r   r   )r   rT   )r%   r   r   rW   )r%   rT   r   r;   )r   r;   )r   rW   )r   r   r   __doc__r   rJ   rN   r7   rk   r   r   r<   rl   r   r*   r   r   r   r   r      s    








r   )
__future__r   loggingro   typingr   r   r   r   r   sglang.srt.dllm.configr   sglang.srt.dllm.mixin.reqr	   "sglang.srt.managers.schedule_batchr
   r   r   #sglang.srt.managers.schedule_policyr   r   ,sglang.srt.model_executor.forward_batch_infor   	getLoggerr   loggersglang.srt.managers.schedulerr   r   r   r   r   r   r   <module>   s    
 l