o
    `۷i                      @   sX  d dl Z d dlZd dlmZ d dlmZmZmZmZm	Z	 d dl
mZ d dlZd dlmZ d dlmZmZ d dlmZmZmZ d dlmZ d d	lmZ d d
lmZmZmZ d dlmZ dZ e!eZ"G dd deZ#dddedede j$de%de j&f
ddZ'e fde(de)de(fddZ*de+de,de(defdd Z-d!ej.j/j0d"efd#d$Z1defd%d&Z2dS )'    N)deepcopy)CallableListOptionalSequenceTuple)Mock)Server)RayActorErrorRayTaskError)DEFAULT_GRPC_SERVER_OPTIONS&RAY_SERVE_REQUEST_PROCESSING_TIMEOUT_SSERVE_LOGGER_NAME)ResponseStatus)gRPCOptions)BackPressureErrorDeploymentUnavailableErrorgRPCStatusError)(add_RayServeAPIServiceServicer_to_serveri   c                	       sX   e Zd ZdZdddedeeeeef   f fddZ	de
ej f fd	d
Z  ZS )gRPCGenericServerzCustom gRPC server that will override all service method handlers.

    Original implementation see: https://github.com/grpc/grpc/blob/
        60c1701f87cacf359aa1ad785728549eeef1a4b0/src/python/grpcio/grpc/aio/_server.py
    Nextra_optionsservice_handler_factoryr   c                   s0   t  jd ddd d t|pg  d g | _|| _d S )N )thread_poolgeneric_handlersinterceptorsmaximum_concurrent_rpcscompressionoptions)super__init__r   generic_rpc_handlersr   )selfr   r   	__class__r   R/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/serve/_private/grpc_util.pyr!   '   s   

zgRPCGenericServer.__init__r"   c                    sr   i }|d }|j  D ]\}}|jd| j|dd| j|ddd}|||< q||d _ | j| t | dS )a   Override generic_rpc_handlers before adding to the gRPC server.

        This function will override all user defined handlers to have
            1. None `response_serializer` so the server can pass back the
            raw protobuf bytes to the user.
            2. `unary_unary` is always calling the unary function generated via
            `self.service_handler_factory`
            3. `unary_stream` is always calling the streaming function generated via
            `self.service_handler_factory`
        r   NF)service_methodstreamT)response_serializerunary_unaryunary_stream)_method_handlersitems_replacer   r"   appendr    add_generic_rpc_handlers)r#   r"   serve_rpc_handlersrpc_handlerr'   method_handlerserve_method_handlerr$   r   r&   r0   8   s$   

z*gRPCGenericServer.add_generic_rpc_handlers)__name__
__module____qualname____doc__r   r   r   r   strr!   r   grpcGenericRpcHandlerr0   __classcell__r   r   r$   r&   r       s    
r   F)enable_so_reuseportr   grpc_options
event_loopr=   returnc                   sx   ddl m} t| dtt|fgd}||d|j  t }tg|j D ]}||| q&|	 I dH  |
| S )zStart a gRPC server that handles requests with the service handler factory.

    Returns a task that blocks until the server exits (e.g., due to error).
    r   )add_grpc_addresszgrpc.so_reuseportr   z[::]:N)ray.serve._private.default_implrA   r   r9   intportr   r   grpc_servicer_func_callablestartcreate_taskwait_for_termination)r   r>   r?   r=   rA   servermock_servicerservicer_fnr   r   r&   start_grpc_serverY   s   rL   message
max_lengthc                 C   s,   t | |kr| S d}| d|t |  | S )zTruncate a message to avoid exceeding HTTP/2 trailer limits.

    gRPC status details are sent as part of HTTP/2 trailers, which have a fixed size limit.
    If the message (e.g., a stack trace) is too long, it can cause issues on the client side.
    z... [truncated]N)len)rM   rN   truncation_noticer   r   r&   _truncate_messagey   s   rQ   excrequest_timeout_s
request_idc                 C   sp  t | trd| d}ttjjd|dS t | tjr)d| d}ttjjd|dS t | t	r8ttjj
d| jdS t | trXt | trNtjd|  dd	id
 ttjjd| jdS t | tr| j}t |ttfrttjd| dd	id
 n
td| j d | jr| jnt|}t| jdt|dS t | ttfrtjd|  dd	id
 ntd ttjjdtt| dS )NzRequest timed out after zs.T)codeis_errorrM   zClient for request z disconnected.zRequest failed: log_to_stderrF)extraz.Request failed with user-set gRPC status code .z'Request failed due to unexpected error.)
isinstanceTimeoutErrorr   r:   
StatusCodeDEADLINE_EXCEEDEDasyncioCancelledError	CANCELLEDr   RESOURCE_EXHAUSTEDrM   r   r   loggerwarningUNAVAILABLEr   original_exceptionr
   	exception	grpc_codegrpc_detailsr9   rQ   INTERNAL)rR   rS   rT   rM   original_excr   r   r&   get_grpc_response_status   sf   






rk   contextstatusc                 C   s0   |   s
| |j  |  s| |j d S d S N)rU   set_codedetailsset_detailsrM   )rl   rm   r   r   r&   set_grpc_code_and_details   s
   	rr   c                 C   s(   t | pt } | jstr| jpt| _| S rn   )r   r   rS   r   )r>   r   r   r&   set_proxy_default_grpc_options   s
   
rs   )3r^   loggingcopyr   typingr   r   r   r   r   unittest.mockr   r:   grpc.aio._serverr	   ray.exceptionsr
   r   ray.serve._private.constantsr   r   r   )ray.serve._private.proxy_request_responser   ray.serve.configr   ray.serve.exceptionsr   r   r   "ray.serve.generated.serve_pb2_grpcr   GRPC_MAX_STATUS_DETAILS_LENGTH	getLoggerrb   r   AbstractEventLoopboolTaskrL   r9   rC   rQ   BaseExceptionfloatrk   _cythoncygrpc_ServicerContextrr   rs   r   r   r   r&   <module>   sf    
>
!

>
