o
    پiA                     @   s  d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZm	Z	 d dl
Z
d dlmZ d dlmZmZ d dlmZmZ d dlmZmZmZmZmZ d d	lmZ d d
lmZ d dlmZ d dlm Z m!Z!m"Z" d dl#m$Z$ d dl%m&Z& d dl'm(Z(m)Z)m*Z* e*e+Z,dZ-G dd dZ.dS )    N)deque)deepcopy)AnyList)ModelTaskType)_parse_sizesave_image_to_path)GetWeightsChecksumReqInputUpdateWeightFromDiskReqInput)ListLorasReqMergeLoraWeightsReq
SetLoraReqShutdownReqUnmergeLoraWeightsReq)	GPUWorker)Req)OutputBatch)PortArgs
ServerArgsset_global_server_args)get_zmq_socket)broadcast_pyobj)GREENRESETinit_loggera  data:image/jpg;base64,iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAYAAABzenr0AAAACXBIWXMAAA7EAAAOxAGVKw4bAAAAbUlEQVRYhe3VsQ2AMAxE0Y/lIgNQULD/OqyCMgCihCKSG4yRuKuiNH6JLsoEbMACOGBcua9HOR7Y6w6swBwMy0qLTpkeI77qdEBpBFAHBBDAGH8WrwJKI4AAegUCfAKgEgpQDvh3CR3oQCuav58qlAw73kKCSgAAAABJRU5ErkJggg==c                   @   s  e Zd ZdZ		d3dededededef
dd	Zd
e	e
 defddZd
e	e
 fddZd
e	e
 defddZde	e
 defddZde	e
 defddZd
e	e
 defddZd
e	e
 defddZd
e	e fddZ		d4dededB d efd!d"Zdeeeef  dB fd#d$Zd%d& Zd'e	eee
f  de	eee
f  fd(d)Zde	eee
f  fd*d+Zd5d,d-Zd.eee
f ddfd/d0Z de	eee
f  fd1d2Z!dS )6	Schedulerz
    Runs the main event loop for the rank 0 worker.
    It listens for external requests via ZMQ and coordinates with other workers.
    This class does NOT manage worker processes.
    Nserver_argsgpu_id	port_argstask_pipes_to_slavesresult_pipes_from_slavesc           	      C   s  || _ || _t|d tjdd| _|j}|dkr.t| jtj|d\| _	}t
d|  nd | _	t||j||d}|| _|| _|| _|| _d| _t| jt| jt| jt| jtt | jt| jt| jt | j!t"| j#i	| _$t% | _&d| _'d| _(d| _)| *  d	| _+d| _,d S )
N)r      )
io_threadsr   TzScheduler bind at endpoint: )
local_rankmaster_portrankr   F   )-r   r   r   zmqContextcontextscheduler_endpointr   ROUTERreceiverloggerinfor   r$   workerr   r    r   _runningr   _handle_set_lorar   _handle_merge_lorar   _handle_unmerge_lorar   _handle_generationr   r   _handle_list_lorasr   _handle_shutdownr
    _handle_update_weights_from_diskr	   _handle_get_weights_checksumrequest_handlersr   waiting_queue	warmed_up_warmup_total_warmup_processedprepare_server_warmup_reqs_max_consecutive_errors_consecutive_error_count)	selfr   r   r   r   r    endpointactual_endpointr/    rD   d/home/ubuntu/.local/lib/python3.10/site-packages/sglang/multimodal_gen/runtime/managers/scheduler.py__init__5   sN   



zScheduler.__init__reqsreturnc                 C   s"   |d }| j |j|j|j|jS Nr   )r/   set_loralora_nickname	lora_pathtargetstrengthrA   rG   reqrD   rD   rE   r1   u   s   zScheduler._handle_set_lorac                 C   s   |d }| j |j|jS rI   )r/   merge_lora_weightsrM   rN   rO   rD   rD   rE   r2   }   s   zScheduler._handle_merge_lorac                 C   s   |d }| j |jS rI   )r/   unmerge_lora_weightsrM   rO   rD   rD   rE   r3      s   zScheduler._handle_unmerge_lora_reqsc                 C   s
   | j  S )N)r/   
list_lorasrA   rS   rD   rD   rE   r5      s   
zScheduler._handle_list_lorasc                 C   s   d| _ t S NF)r0   r   rU   rD   rD   rE   r6      s   zScheduler._handle_shutdownc                 C   sB   |d }| j j|j|j|jd\}}t||d|rddS |dS )z9Handle update_weights_from_disk request for RL workflows.r   )
model_pathflush_cachetarget_modules)successmessageN)outputerror)r/   update_weights_from_diskrW   rX   rY   r   )rA   rG   rP   rZ   r[   rD   rD   rE   r7      s   
z*Scheduler._handle_update_weights_from_diskc                 C   s"   |d }| j j|jd}t|dS )z$Handle get_weights_checksum request.r   )module_names)r\   )r/   get_weights_checksumr_   r   )rA   rG   rP   	checksumsrD   rD   rE   r8      s   
z&Scheduler._handle_get_weights_checksumc                 C   sb   dd |D }|r+|  j t|7  _ | jdkr&td| j  d| j d ntd | j|S )Nc                 S   s   g | ]}|j r|qS rD   	is_warmup.0rP   rD   rD   rE   
<listcomp>   s    z0Scheduler._handle_generation.<locals>.<listcomp>r   zProcessing warmup req... (/)zProcessing warmup req...)r=   lenr<   r-   r.   r/   execute_forward)rA   rG   warmup_reqsrD   rD   rE   r4      s   

zScheduler._handle_generationFoutput_batchidentityrc   c                 C   s>   |s| j dur|dur| j |dt|g dS dS dS dS )z3
        replies to client, only on rank 0
        N    )r,   send_multipartpickledumps)rA   rl   rm   rc   rD   rD   rE   return_result   s   	zScheduler.return_resultc                 C   s   | j sdS | j  }|gS )zpull a req from waiting_queueN)r:   popleft)rA   itemrD   rD   rE   get_next_batch_to_run   s   
zScheduler.get_next_batch_to_runc              	   C   s   | j jry| js{| j jd ur}t| j j| _d| _| j jD ]X}t|\}}| j jj	}|t
jt
jt
jt
jfv r]tjdd}tj|dd ttttj|d}t| ||dd|gd}n
t| ||dd	}|  | jd |f qd| _d S d S d S d S )
Nr   outputsuploadsT)exist_okzwarmup_image.jpg )	data_typewidthheightpromptnegative_prompt
image_path)rz   r{   r|   r}   )r   warmupr;   warmup_resolutionsri   r<   r=   r   pipeline_config	task_typer   I2ITI2II2VTI2Vospathjoinmakedirsasynciorunr   !MINIMUM_PICTURE_BASE64_FOR_WARMUPr   rz   set_as_warmupr:   append)rA   
resolutionr{   r|   r   uploads_dir
input_pathrP   rD   rD   rE   r>      sV   
	
z$Scheduler.prepare_server_warmup_reqs	recv_reqsc                 C   sn   | j s| jjr|r| jjd ur|S |d \}}t|tr5t|}|  |d||f d| _	d| _
d| _ |S )Nr      T)r;   r   r   r   
isinstancer   r   r   insertr<   r=   )rA   r   rm   rP   
warmup_reqrD   rD   rE   +process_received_reqs_with_req_based_warmup   s"   
z5Scheduler.process_received_reqs_with_req_based_warmupc                    sL  | j durVz3z| j tj}|d |d  }t|dkr"t|ng }W n tjtjt	t
fy6   g }Y nw W n	 tjyA    w |rUt|tsL|g} fdd|D }nd}| jjdkrpt|| jjj| jj| jjjd d}| jjrt|| jjj| jj| jjjd d}| jjdkrt|| jjj| jj| jjjd d}|dusJ |S )	z_
        For non-main schedulers, reqs are broadcasted from main using broadcast_pyobj
        Nr   r!   c                    s   g | ]} |fqS rD   rD   rd   rm   rD   rE   rf   !      z'Scheduler.recv_reqs.<locals>.<listcomp>r   )src)r,   recv_multipartr'   NOBLOCKri   rp   loadsAgainUnpicklingError
IndexErrorEOFErrorZMQErrorr   listr   	sp_degreer   r/   sp_groupr%   sp_cpu_groupranksenable_cfg_parallel	cfg_groupcfg_cpu_grouptp_sizetp_grouptp_cpu_group)rA   partspayloadr   rD   r   rE   r     sT   

zScheduler.recv_reqsc           
      C   s  t d| jj  | jrYz|  }| |}| j| d| _	W nG t
yi } z;|  j	d7  _	t jd| j	 d| j d| dd | j	| jkr_t d	| j d
 td| j d| |W Y d}~q
d}~ww |  }|sqq
dd |D }dd |D }z|d }| jt|}|r||}n
tdt| d}W n3 t
y } z't jd| dd |rt|d trtt|dntt|d}W Y d}~nd}~ww zgt|tr|jnd}	|	r.|jdu r| jdkrt d| j d| j dt dt d	|jj n*t dt dt d|jj n| jdkr)t d| j d| j d nt d | j||d |	d W n tj yU } zt d|  W Y d}~q
d}~ww | js| j!durd| j!"  | j#j$dd dS )z]
        The main event loop that listens for ZMQ requests.
        Handles abortion
        z&Rank 0 scheduler listening on tcp://*:r   r   z:Error receiving requests in scheduler event loop (attempt rg   z): T)exc_infozMaximum consecutive errors (z,) reached. Terminating scheduler event loop.zScheduler terminated after z! consecutive errors. Last error: Nc                 S      g | ]}|d  qS )r   rD   re   rt   rD   rD   rE   rf   k  r   z(Scheduler.event_loop.<locals>.<listcomp>c                 S   r   )r   rD   r   rD   rD   rE   rf   l  r   zUnknown request type: )r]   z1Error executing request in scheduler event loop: FzWarmup req (z) processed in z%.2fz secondszWarmup req processed in z) processing failedzWarmup req processing failedrb   zZMQ error sending reply: )linger)%r-   debugr   scheduler_portr0   r   r   r:   extendr@   	Exceptionr]   r?   RuntimeErrorru   r9   gettyper   r   r   strrc   r<   r.   r=   r   r   metricstotal_duration_srr   r'   r   r,   closer)   destroy)
rA   new_reqseitems
identitiesrG   processed_reqhandlerrl   rc   rD   rD   rE   
event_loopB  s   



 
X
zScheduler.event_loopr   c                 C   s>   |d }dd |  D }||d}| jD ]}|| qdS )z/Broadcast a task to all slave worker processes.methodc                 S   s   i | ]\}}|d kr||qS )r   rD   )re   kvrD   rD   rE   
<dictcomp>  s    z-Scheduler._broadcast_task.<locals>.<dictcomp>)r   kwargsN)r   r   send)rA   r   r   r   taskpiperD   rD   rE   _broadcast_task  s   

zScheduler._broadcast_taskc                 C   s"   g }| j D ]	}||  q|S )z0Collect results from all slave worker processes.)r    r   recv)rA   resultsr   rD   rD   rE   _collect_slave_results  s   
z Scheduler._collect_slave_results)NNrV   )rH   N)"__name__
__module____qualname____doc__r   intr   r   rF   r   r   r   r1   r2   r3   r5   r6   r7   r8   r   r4   bytesboolrr   tupleru   r>   r   r   r   dictr   r   r   rD   rD   rD   rE   r   .   sV    
@

0

:fr   )/r   r   rp   collectionsr   copyr   typingr   r   r'   3sglang.multimodal_gen.configs.pipeline_configs.baser   6sglang.multimodal_gen.runtime.entrypoints.openai.utilsr   r   Asglang.multimodal_gen.runtime.entrypoints.post_training.io_structr	   r
   /sglang.multimodal_gen.runtime.entrypoints.utilsr   r   r   r   r   1sglang.multimodal_gen.runtime.managers.gpu_workerr   ,sglang.multimodal_gen.runtime.pipelines_corer   ;sglang.multimodal_gen.runtime.pipelines_core.schedule_batchr   )sglang.multimodal_gen.runtime.server_argsr   r   r   *sglang.multimodal_gen.runtime.utils.commonr   /sglang.multimodal_gen.runtime.utils.distributedr   1sglang.multimodal_gen.runtime.utils.logging_utilsr   r   r   r   r-   r   r   rD   rD   rD   rE   <module>   s*   