o
    ÔÙ¾i{  ã                   @   s~   d dl Z d dlmZ d dlZd dlZd dlmZ d dlmZ ee	ƒZ
defdd„ZG dd	„ d	ƒZG d
d„ dƒZeƒ Zeƒ ZdS )é    N)ÚAny)Ú
ServerArgs)Úinit_loggerÚserver_argsc                 Ã   s   t j ¡ }| t j¡}d| j› }| |¡ t d|› ¡ 	 z&| 	¡ I dH }t
 |¡}t d¡ t |¡I dH }| t
 |¡¡I dH  W n8 ty~ } z,tjd|› dd z| t
 dt|ƒd	œ¡¡I dH  W n	 tys   Y nw W Y d}~nd}~ww q )
z•
    This function runs as a background task in the FastAPI process.
    It listens for TCP requests from offline clients (e.g., DiffGenerator).
    ztcp://*:z,ZMQ Broker is listening for offline jobs on TNz-Broker received an offline job from a client.zError in ZMQ Broker: )Úexc_infoÚerror)ÚstatusÚmessage)ÚzmqÚasyncioÚContextÚsocketÚREPÚbroker_portÚbindÚloggerÚinfoÚrecvÚpickleÚloadsÚasync_scheduler_clientÚforwardÚsendÚdumpsÚ	Exceptionr   Ústr)r   Úctxr   Úbroker_endpointÚpayloadÚrequest_batchÚresponse_batchÚe© r"   úb/home/ubuntu/.local/lib/python3.10/site-packages/sglang/multimodal_gen/runtime/scheduler_client.pyÚrun_zeromq_broker   s.   €



$ÿ€ûór$   c                   @   óN   e Zd ZdZdd„ Zdefdd„Zdedefd	d
„Zde	fdd„Z
dd„ ZdS )ÚSchedulerClientz£
    A synchronous, singleton client for communicating with the Scheduler service.
    Designed for use in DiffGenerator, where synchronous usage is preferred
    c                 C   s   d | _ d | _d | _d S ©N)ÚcontextÚscheduler_socketr   ©Úselfr"   r"   r#   Ú__init__5   s   
zSchedulerClient.__init__r   c                 C   sŒ   | j d ur| j jst d¡ |  ¡  || _t ¡ | _ | j  tj	¡| _
| j
 tjd¡ | j
 tjd¡ | jj}| j
 |¡ t d|› ¡ d S )Nz8SchedulerClient is already initialized. Re-initializing.r   é€[ z2SchedulerClient connected to backend scheduler at )r(   Úclosedr   ÚwarningÚcloser   r
   r   r   ÚREQr)   Ú
setsockoptÚLINGERÚRCVTIMEOÚscheduler_endpointÚconnectÚdebug)r+   r   r5   r"   r"   r#   Ú
initialize:   s   

ÿzSchedulerClient.initializeÚbatchÚreturnc                 C   sB   z| j  |¡ | j  ¡ }|W S  tjjy    t d¡ tdƒ‚w )úESends a batch or request to the scheduler and waits for the response.ú,Timeout waiting for response from scheduler.ú"Scheduler did not respond in time.)r)   Ú
send_pyobjÚ
recv_pyobjr
   r   ÚAgainr   ÚTimeoutError)r+   r9   Úoutput_batchr"   r"   r#   r   O   s   

þzSchedulerClient.forwardc                 C   s²   | j du s	| j jrt d¡ dS | j  tj¡}| tjd¡ | tj	d¡ | j
j}z*z| |¡ | ddi¡ | ¡  W W | ¡  dS  tjjyS   Y W | ¡  dS w | ¡  w ©	zS
        Checks if the scheduler server is alive using a temporary socket.
        Nz'Cannot ping: client is not initialized.Fr   iÐ  ÚmethodÚpingT)r(   r.   r   r   r   r
   r1   r2   r3   r4   r   r5   r6   r>   r?   r0   r@   ©r+   Úping_socketÚendpointr"   r"   r#   rE   Y   s$   

ýý
zSchedulerClient.pingc                 C   s4   | j r| j  ¡  d| _ | jr| j ¡  d| _dS dS ©z-Closes the socket and terminates the context.N)r)   r0   r(   Útermr*   r"   r"   r#   r0   q   s   


þzSchedulerClient.closeN©Ú__name__Ú
__module__Ú__qualname__Ú__doc__r,   r   r8   r   r   ÚboolrE   r0   r"   r"   r"   r#   r&   /   s    
r&   c                   @   r%   )ÚAsyncSchedulerClienta?  
    An asynchronous, singleton client for communicating with the Scheduler service.
    Designed for use in asynchronous environments like FastAPI entrypoints.

    To support high concurrency, it creates a new REQ socket for each request
    rather than sharing a single one (which would cause ZMQ state errors).
    c                 C   s   d | _ d | _d S r'   )r(   r   r*   r"   r"   r#   r,   „   s   
zAsyncSchedulerClient.__init__r   c                 C   sD   | j d ur| j jst d¡ |  ¡  || _tj ¡ | _ t 	d¡ d S )Nz=AsyncSchedulerClient is already initialized. Re-initializing.z9AsyncSchedulerClient initialized with zmq.asyncio.Context)
r(   r.   r   r/   r0   r   r
   r   r   r7   )r+   r   r"   r"   r#   r8   ˆ   s   ÿzAsyncSchedulerClient.initializer9   r:   c                 Ã   s¼   | j du r
tdƒ‚| j  tj¡}| tjd¡ | tjd¡ | jj	}| 
|¡ z0z| t |¡¡I dH  | ¡ I dH }t |¡W W | ¡  S  tjjyX   t d¡ tdƒ‚w | ¡  w )r;   NzAAsyncSchedulerClient is not initialized. Call initialize() first.r   r-   r<   r=   )r(   ÚRuntimeErrorr   r
   r1   r2   r3   r4   r   r5   r6   r   r   r   r   r   r0   r   r@   r   rA   )r+   r9   r   rH   r   r"   r"   r#   r   “   s(   €
ÿ

ü
þ
zAsyncSchedulerClient.forwardc                 Ã   sÆ   | j du s
| j jrt d¡ dS | j  tj¡}| tjd¡ | tj	d¡ | j
j}z3z!| |¡ | t ddi¡¡I dH  | ¡ I dH  W W | ¡  dS  tjjy]   Y W | ¡  dS w | ¡  w rC   )r(   r.   r   r   r   r
   r1   r2   r3   r4   r   r5   r6   r   r   r   r   r0   r@   rF   r"   r"   r#   rE   ­   s&   €

ýý
zAsyncSchedulerClient.pingc                 C   s   | j r| j  ¡  d| _ dS dS rI   )r(   rJ   r*   r"   r"   r#   r0   Å   s   

þzAsyncSchedulerClient.closeNrK   r"   r"   r"   r#   rQ   {   s    rQ   )r   Útypingr   r
   Úzmq.asyncioÚ)sglang.multimodal_gen.runtime.server_argsr   Ú1sglang.multimodal_gen.runtime.utils.logging_utilsr   rL   r   r$   r&   rQ   r   Úsync_scheduler_clientr"   r"   r"   r#   Ú<module>   s    "LR
