o
    پi                     @   s8  d Z ddlZddlZddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlmZmZmZmZmZmZ ddlZddlZddlZddlmZmZmZmZmZmZmZmZ ddlmZm Z  ddl!m"Z"m#Z#m$Z$ ddl%m&Z& e'e(Z)G dd	 d	Z*G d
d dZ+ej,G dd dZ-G dd dZ.dd Z/dS )z
gRPC Request Manager - Orchestrates request lifecycle without tokenization.
Mimics TokenizerManager's state management and ZMQ communication patterns.
    N)AnyAsyncGeneratorDictListOptionalUnion)AbortReqBatchEmbeddingOutputBatchTokenIDOutputGetLoadsReqInputGetLoadsReqOutputHealthCheckOutputTokenizedEmbeddingReqInputTokenizedGenerateReqInput)PortArgs
ServerArgs)get_or_create_event_loopget_zmq_socketkill_process_tree)get_exception_tracebackc                   @   sR   e Zd ZdZdZddejdefddZefde	d	e
e fd
dZdefddZdS )_GrpcCommunicatorz
    Communicator for request/response patterns with scheduler.

    Thread-safe and handles the async request/response cycle with proper
    timeout handling to prevent hangs if the scheduler becomes unresponsive.
    g      >@   senderfan_outc                 C   s&   || _ || _d | _d | _t | _d S N)_sender_fan_out_result_event_result_valuesasyncioLock_lock)selfr   r    r#   X/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/grpc/grpc_request_manager.py__init__1   s
   z_GrpcCommunicator.__init__timeoutreturnc              	      s   | j 4 I dH : t | _g | _|r| j| z!tj| j |dI dH  | jW d| _d| _W  d  I dH  S d| _d| _w 1 I dH sHw   Y  dS )aW  
        Send request and wait for response(s).

        Args:
            obj: Request object to send to scheduler
            timeout: Maximum time to wait for response (seconds)

        Returns:
            List of response objects from scheduler(s)

        Raises:
            asyncio.TimeoutError: If no response within timeout
        N)r&   )	r!   r   Eventr   r   r   
send_pyobjwait_forwait)r"   objr&   r#   r#   r$   __call__8   s   
z_GrpcCommunicator.__call__recv_objc                 C   sJ   | j dur| jdur!| j | t| j | jkr#| j  dS dS dS dS )z
        Handle received response from scheduler.

        Called by handle_loop when a matching response type is received.
        Safe to call even if no request is pending (will be ignored).
        N)r   r   appendlenr   set)r"   r.   r#   r#   r$   handle_recvX   s   z_GrpcCommunicator.handle_recvN)r   )__name__
__module____qualname____doc__DEFAULT_TIMEOUTzmqSocketintr%   floatr   r   r-   r2   r#   r#   r#   r$   r   '   s     r   c                   @   s,   e Zd ZdZdd Zd	ddZd	ddZdS )
GrpcSignalHandlerzTMinimal signal handler for gRPC server - delegates real crash handling to scheduler.c                 C   s
   || _ d S r   )grpc_manager)r"   r=   r#   r#   r$   r%   h   s   
zGrpcSignalHandler.__init__Nc                 C   s$   t d|d|d d| j_dS )z7Handle SIGTERM by gracefully shutting down gRPC server.zSIGTERM received. signum=z frame=z. Shutting down gRPC server...TN)loggerwarningr=   gracefully_exitr"   signumframer#   r#   r$   sigterm_handlerk   s   z!GrpcSignalHandler.sigterm_handlerc                 C   s(   t d t d tt dd dS )z-Handle SIGQUIT from failed scheduler process.zUReceived SIGQUIT from scheduler process. Scheduler failed, shutting down gRPC server.zLNote: Crash dumps are handled by the scheduler process, not the gRPC server.Tinclude_parentN)r>   errorinfor   osgetpidrA   r#   r#   r$   running_phase_sigquit_handlerr   s   z/GrpcSignalHandler.running_phase_sigquit_handlerNN)r3   r4   r5   r6   r%   rD   rK   r#   r#   r#   r$   r<   e   s
    
r<   c                   @   s  e Zd ZU dZeed< eejj	 ed< e
jed< eed< e
jed< eeef ed< eed< d	Zeed
< d	Zeed< d	Zeed< dZeed< d	Zeed< d	Zeed< dZeed< dZeed< ejedZee ed< ejedZ ee ed< ejedZ!ee ed< ejedZ"ee ed< ejedZ#ee ed< ejedZ$eee  ed< ejedZ%eee  ed< ejedZ&eee  ed< ejedZ'eee  ed< dZ(ee ed< dZ)eed < dS )!GrpcReqStatez"State tracking for a gRPC request.
request_idgrpc_context	out_queuefinishedeventr,   created_time        finished_timefirst_token_time	last_timer   last_completion_tokensfinished_time_perffirst_token_time_perfFstream_finishedinput_logprobs_sent)default_factory
output_idsinput_token_logprobs_valinput_token_logprobs_idxoutput_token_logprobs_valoutput_token_logprobs_idxinput_top_logprobs_valinput_top_logprobs_idxoutput_top_logprobs_valoutput_top_logprobs_idxN
session_idis_session_request)*r3   r4   r5   r6   str__annotations__r   grpcaioServicerContextr   Queueboolr(   r   r   r   r;   rU   rV   rW   rX   r:   rY   rZ   r[   r\   dataclassesfieldlistr^   r   r_   r`   ra   rb   rc   rd   re   rf   rg   rh   r#   r#   r#   r$   rM   ~   s8   
 

rM   c                   @   s  e Zd ZdZ	d6dedefddZ		d7dedee	 d	ee
jj d
eeeee f df fddZ		d7dedee	 d	ee
jj fddZde	fddZ	d6dedee	 d
ejfddZde	d
efddZdd ZdededefddZdefddZde fddZ!d e"fd!d"Z#d#e$fd$d%Z%d&d' Z&d(d) Z'd*d+ Z(d
ee	e)f fd,d-Z*	d6d.ee	 d/ee d
ee+ fd0d1Z,d2d3 Z-d4d5 Z.dS )8GrpcRequestManagerzx
    Manages gRPC request lifecycle, mimicking TokenizerManager's orchestration
    behaviors without tokenization.
    Nserver_args	port_argsc                 C   s   || _ || _tjd| _t| jtj|jdd| _	t| jtj
|jdd| _i | _t | _d| _d| _d| _d| _t | _t | _g | _d| _|| _t| j|jd| _td|j d|j  | jrotd	|j   dS dS )
z$Initialize the gRPC request manager.   T)bindFN)r   z2GrpcRequestManager initialized with ZMQ IPC: recv=z, send=z6Bootstrap server initialized for disaggregation mode: )!rt   ru   r8   r   Contextcontextr   PULLdetokenizer_ipc_namerecv_from_schedulerPUSHscheduler_input_ipc_namesend_to_schedulerrid_to_stater1   asyncio_tasksr@   no_create_loop
event_loopis_pause	Conditionis_pause_condtimelast_receive_tstampcrash_dump_request_listcrash_dump_performedbootstrap_serverr   dp_sizeget_loads_communicatorr>   rH   disaggregation_mode)r"   rt   ru   r   r#   r#   r$   r%      sJ   

zGrpcRequestManager.__init__r,   rN   rO   r'   c              	   C  s  t |jdd}|dkr | |||2 z	3 dH W }|V  q6 dS td| d |du r6dt j }n|}td|  t|}t|j|_d|j_	d|j_
| || d	|2 z3 dH W }q^6 td
|  td| d g }	g }
t|D ](}t|}t|j|_d|j_
| d| }|
| |	| ||| qt |dd}|std| d g }|	D ]}|2 z3 dH W }|| q6 q|V  dS td| d dd t|
D }i }|	D ]}t| }|||< q|rPtj| tjdI dH \}}|D ]?}||}z-|I dH }t|tr0|dd}||v r0|| |d< |V  t| }|||< W q tyK   Y qw |sdS dS )a  
        Submit a generation request to the scheduler with n>1 parallel sampling support.

        This method implements the same two-phase approach as tokenizer_manager.py:
        1. Phase 1: Send prefix caching request (max_new_tokens=0)
        2. Phase 2: Send n generation requests that reuse the cached prefix

        Yields individual responses for streaming, or aggregated responses for non-streaming.
        nr   NzMultiple sampling request (n=z), using two-phase approachgrpc-z$Phase 1: Caching prefix for request r   z-prefixz%Phase 1 completed: Prefix cached for zPhase 2: Generating z parallel requests-streamFzNon-streaming mode: collecting z
 responseszStreaming mode: multiplexing z streamsc                 S   s   i | ]\}}||qS r#   r#   ).0iridr#   r#   r$   
<dictcomp>@  s    z7GrpcRequestManager.generate_request.<locals>.<dictcomp>)return_whenrN    index)getattrsampling_params_handle_single_requestr>   debuguuiduuid4hexcopymax_new_tokensr   ranger/   	enumerater   create_task	__anext__r+   keysFIRST_COMPLETEDpop
isinstancedictgetStopAsyncIteration)r"   r,   rN   rO   r   responsebase_request_id
prefix_obj_
generatorsrequest_idsr   gen_objgen_request_id	is_stream	responses	generatorrid_to_indextask_maptaskdoneresponse_rid	next_taskr#   r#   r$   generate_request   s   








z#GrpcRequestManager.generate_requestc              	   C  s>  |du rdt  j }||_t||t dt |t d}t	|dr1|j
r1|j
j|_d|_|| j|< | | z]| |I dH  t|dd}	 z*|j I dH }|rY|V  t|trt|ddrt|sr| }|j|d	< |V  W nW n tjy   td
| d | |I dH   w qKW | | dS | | w )z@Handle a single request - core implementation without n>1 logic.Nr   FrN   rO   rP   rQ   rR   r,   rS   session_paramsTr   rQ   	token_idszRequest z cancelled by client)r   r   r   r   rM   r   rn   r(   r   hasattrr   rg   rh   r   record_request_for_crash_dump_send_to_schedulerr   rP   r   r   r   r   r^   CancelledErrorr>   rH   abort_request_cleanup_request_state)r"   r,   rN   rO   stater   r   final_responser#   r#   r$   r   c  sR   




z)GrpcRequestManager._handle_single_requestc                 C   s   || j v r| j |= dS dS )z9Clean up local request state (does not notify scheduler).N)r   )r"   rN   r#   r#   r$   r     s   
z)GrpcRequestManager._cleanup_request_statec              
      s   du rdt  j |_tdt dt |t dj	< t
  z
|I dH  W n tyQ } zj	=  |  W  Y d}~S d}~ww  fdd}t|   S )z
        Submit an embedding request to the scheduler.
        Returns a future that will contain the embedding result.
        Nzgrpc-embed-Fr   c               
      s   z=zj  I d H  j I d H }  |  W n ty0 } z | W Y d }~nd }~ww W jv r=j= d S d S jv rHj= w r   )rR   r+   rP   r   
set_result	Exceptionset_exceptionr   )resultefuturerN   r"   r   r#   r$   wait_for_result  s   

z=GrpcRequestManager.embedding_request.<locals>.wait_for_result)r   r   r   r   rM   r   rn   r(   r   r   Futurer   r   r   r   )r"   r,   rN   r   r   r#   r   r$   embedding_request  s4   


z$GrpcRequestManager.embedding_requestc              
      s   | drdS | j|}|rd|_d|_td| d t|d}z| |I dH  td|  W dS  t	yR } zt
d	|  W Y d}~dS d}~ww )
zAbort a running request.

        Sends abort request to scheduler and marks local state as finished
        to stop processing any further outputs from the scheduler.
        HEALTH_CHECKFTzMarked request z as aborted locally)r   Nz$Sent abort to scheduler for request z+Failed to send abort request to scheduler: )
startswithr   r   rQ   r[   r>   r   r   r   r   rG   )r"   rN   r   	abort_reqr   r#   r#   r$   r     s$   

z GrpcRequestManager.abort_requestc              
      s  | j sz| j I dH }t | _| jr@| j4 I dH  | jr+| j I dH  | js W d  I dH  n1 I dH s;w   Y  t|t	rN| 
|I dH  n@t|tr\| |I dH  n2t|trj| |I dH  n$t|trx| |I dH  nt|tr| j| n
tdt|  W nj tjjy   | j rY dS Y q tjjy } z%| j rtd|  W Y d}~dS td| dt   W Y d}~dS d}~w ty } ztd| dt   | j rW Y d}~dS W Y d}~nd}~ww | j rdS dS )zt
        Main event loop - processes outputs from scheduler.
        Mimics TokenizerManager's handle_loop.
        NzUnknown output type: z&ZMQ recv interrupted during shutdown: zZMQ error in handle loop: 
zHandle loop error: )r@   r|   
recv_pyobjr   r   r   r   r+   r   r
   _handle_batch_outputr	   _handle_embedding_outputr   _handle_health_check_outputr   _handle_abort_reqr   r   r2   r>   r?   typer8   rG   AgainZMQErrorr   r   r   )r"   r.   r   r#   r#   r$   handle_loop  sX   
(




zGrpcRequestManager.handle_loopr   	batch_outbatch_indexc                 C   s   |j du rdS t|j dkr |j |j |  |j|j|  |j|j|  |j|j|  |jjdkret|jdkrQ|j|j|  |j	|j	|  |j
|j
|  |j|j|  dS dS )z
        Convert and accumulate logprobs from batch output to state.
        Follows the same logic as tokenizer_manager.convert_logprob_style.
        Nr   )r_   r0   extendr`   ra   rb   r,   top_logprobs_numrc   rd   re   rf   )r"   r   r   r   r#   r#   r$   _convert_logprob_style0  s<   
z)GrpcRequestManager._convert_logprob_stylec              	      s  g }g }t   }t  }t jD ]"\}|jvrqj| }|jr.td|  q|jdkr9||_||_	||_
| jrE j ng  j du jrT j nd jr] j nd jrf j nd j rq j nddd}|jjr|  |jjr|jjdkr|jr|jjr|js|j|j|j|jd|d< d	|_n|jjs|d
 r|j|j|j|jd|d< |jjr jrt jk r|jjr fdd}	 j |	d|	d|	dd|d< n|d
 r|j|j|j|jd|d< |d r|j|d  | |j!"| |d
 r5d	|_||_#||_$d	|_%|j&'  fdd}
| t()|
| q|rFt(j*|dd	iI dH  dS dS )z.Handle batch generation output from scheduler.z$Skipping output for aborted request rT   Nr   )prompt_tokenscompletion_tokenscached_tokensfinish_reason)rN   r   rQ   	meta_info)token_logprobs_valtoken_logprobs_idxtop_logprobs_valtop_logprobs_idxinput_logprobsTrQ   c                    s(   t  | d }|rt|k r| S g S r   )r   r0   )	attr_namesource_list)r   r   r#   r$   get_part  s   z9GrpcRequestManager._handle_batch_output.<locals>.get_partrb   re   rf   output_logprobsr   c                    s,   t dI d H  |  jv r j| = d S d S )Ng      @)r   sleepr   )rN   r"   r#   r$   cleanup  s
   
z8GrpcRequestManager._handle_batch_output.<locals>.cleanupreturn_exceptions)+r   perf_counterr   ridsr   rQ   r>   r   rV   rZ   rW   r^   finished_reasonsr   r   r   r,   return_logprobr   logprob_start_lenr_   r   r\   r`   rc   rd   ra   r0   rb   re   rf   r   r/   rP   putrU   rY   r[   rR   r1   r   r   gather)r"   r   	put_taskscleanup_tasksnownow_perf_counterr   r   output_datar   r   r#   )r   r   r"   r$   r   b  s   




		



z'GrpcRequestManager._handle_batch_outputc                    s   t |jD ]D\}}|| jvrq| j| }||j| |jr"|j| nd|jr+|j| ndd}|j|I dH  d|_t		 |_
t	 |_|j  qdS )z-Handle batch embedding output from scheduler.r   N)rN   	embeddingr   r   T)r   r   r   
embeddingsr   r   rP   r   rQ   r   rU   r   rY   rR   r1   )r"   r   r   r   r   r   r#   r#   r$   r     s$   



z+GrpcRequestManager._handle_embedding_output
health_outc                    s   |j }|| jvrtd|  dS | j| }|dt|dr"|jndt|dr+|jndd}|j|I dH  d|_	t

 |_t
 |_|j  dS )	z*Handle health check output from scheduler.z)Health check output for unknown request: NT
output_strr   r   stop)rN   healthyoutput_textr   )r   r   r>   r?   r   r  r   rP   r   rQ   r   rU   r   rY   rR   r1   )r"   r  r   r   r   r#   r#   r$   r     s$   



z.GrpcRequestManager._handle_health_check_outputr.   c                    s   |j dr	dS |j | jvrtd|j  d dS | j|j  }d|_d|_|jr=|j |jddd|j |jdd	}n|j dd|j d
dddddd	}|j	
|I dH  |j  td|j   dS )a  Handle abort request from scheduler.

        The scheduler sends AbortReq back to notify us that a request was aborted,
        either due to explicit abort_request() call or scheduler-initiated abort
        (priority preemption, queue full, KV cache pressure, etc).
        r   NzAbort request for zB not in local state (may have already finished or not started yet)TmessagezRequest aborted)idr   )rN   rG   rQ   r   abortzAbort before prefill)r   r  r   )r  r   r   r   zHandled abort request for )r   r   r   r>   r   rQ   r[   finished_reasonr   rP   r   rR   r1   )r"   r.   r   abort_responser#   r#   r$   r   ,  sB   
z$GrpcRequestManager._handle_abort_reqc              
      s@   z	| j | W dS  ty } z	td|   d}~ww )z(Send an object to the scheduler via ZMQ.zFailed to send to scheduler: N)r   r)   r   r>   rG   )r"   r,   r   r#   r#   r$   r   i  s   z%GrpcRequestManager._send_to_schedulerc                 C   s<   t | jdk r| jt t|ddt|jd dS dS )z(Record request for potential crash dump.d   r   unknown)r   rN   r   N)r0   r   r/   r   r   r   r3   )r"   r,   r#   r#   r$   r   q  s   
z0GrpcRequestManager.record_request_for_crash_dumpc              
      sh  t d d| _t| jD ]
}| s|  q| jr*tjt| jddiI dH  t| j	
 D ]\}}|jsL|jdddI dH  d|_|j  q1| jr^tjt| jddiI dH  | jrt d zt| jdrt| jjr}| j I dH  n| j  W n ty } zt d	|  W Y d}~nd}~ww | j  | j  | j  t d
 dS )z(Gracefully shutdown the request manager.z Shutting down GrpcRequestManagerTr   NzServer shutting down)rG   shutdownzShutting down bootstrap serverr  z&Error shutting down bootstrap server: z$GrpcRequestManager shutdown complete)r>   rH   r@   rr   r   r   cancelr   r   r   itemsrQ   rP   r   rR   r1   r   r   iscoroutinefunctionr  r   r?   r|   closer   ry   term)r"   r   r   r   r   r#   r#   r$   r  |  sF   







zGrpcRequestManager.shutdownc                 C   s   t | j| j| jdS )z)Get server information for health checks.)active_requestspausedlast_receive_time)r0   r   r   r   r   r#   r#   r$   get_server_info  s   z"GrpcRequestManager.get_server_infoincludedp_rankc                    s<   t | d}| |I dH } dur fdd|D }|S )a  
        Get comprehensive load metrics from the scheduler.

        This method uses the communicator pattern to send GetLoadsReqInput to the
        scheduler and wait for GetLoadsReqOutput responses.

        Args:
            include: List of metric sections to include (core, memory, spec, lora, disagg, queues, all)
            dp_rank: Optional DP rank filter (None for all ranks)

        Returns:
            List of GetLoadsReqOutput objects, one per scheduler/DP rank
        )r  r  Nc                    s   g | ]	}|j  kr|qS r#   r  )r   rr  r#   r$   
<listcomp>  s    z0GrpcRequestManager.get_loads.<locals>.<listcomp>)r   r   )r"   r  r  reqresultsr#   r  r$   	get_loads  s   zGrpcRequestManager.get_loadsc                 C   s   | j rdS d| _ t }| j|t| j || _t	 t
 u r6t| }|tj|j |tj|j | j|t| j dS )zWAutomatically create and start the handle_loop task, matching TokenizerManager pattern.NT)r   r   r   addr   print_exception_wrapperr   r   	threadingcurrent_threadmain_threadr<   add_signal_handlersignalSIGTERMrD   SIGQUITrK   sigterm_watchdog)r"   loopsignal_handlerr#   r#   r$   auto_create_handle_loop  s"   z*GrpcRequestManager.auto_create_handle_loopc                    s&   | j stdI dH  | j rdS dS )zIWatchdog to handle SIGTERM gracefully, matching TokenizerManager pattern.g      ?N)r@   r   r   r   r#   r#   r$   r.    s   z#GrpcRequestManager.sigterm_watchdogr   rL   )/r3   r4   r5   r6   r   r   r%   r   r   ri   rk   rl   rm   r   r   r   r   r   r   r   r   r   r   r   ro   r   r   rM   r
   r:   r   r   r	   r   r   r   r   r   r   r   r  r   r  r   r$  r1  r.  r#   r#   r#   r$   rs      s    	
@

x

@
83
2 !=0	
rs   c                    sz   z	|  I dH  W dS  t y<   t }td|  t| dr,t| jtr,| j  t	t
 dd td Y dS w )zt
    Sometimes an asyncio function does not print exception.
    We do another wrapper to handle the exception.
    Nz%GrpcRequestManager hit an exception: __self__TrE   r   )r   r   r>   rG   r   r   r2  rs   dump_requests_before_crashr   rI   rJ   sysexit)func	tracebackr#   r#   r$   r&    s   
r&  )0r6   r   r   rp   loggingrI   r+  r4  r'  r   r   typingr   r   r   r   r   r   rk   r8   zmq.asynciosglang.srt.managers.io_structr   r	   r
   r   r   r   r   r   sglang.srt.server_argsr   r   sglang.srt.utilsr   r   r   sglang.utilsr   	getLoggerr3   r>   r   r<   	dataclassrM   rs   r&  r#   r#   r#   r$   <module>   s@     (

>,      I