o
    $ip                     @   s6  d dl Z d dlZd dlmZ d dlmZmZmZmZm	Z	 d dl
mZ d dlZd dlmZ d dlmZmZ d dlmZmZmZ d dlmZ d d	lmZ d d
lmZmZ d dlmZ eeZ G dd deZ!dddedede j"de#de j$f
ddZ%de&de'de(defddZ)dej*j+j,defddZ-defd d!Z.dS )"    N)deepcopy)CallableListOptionalSequenceTuple)Mock)Server)RayActorErrorRayTaskError)DEFAULT_GRPC_SERVER_OPTIONS&RAY_SERVE_REQUEST_PROCESSING_TIMEOUT_SSERVE_LOGGER_NAME)ResponseStatus)gRPCOptions)BackPressureErrorDeploymentUnavailableError)(add_RayServeAPIServiceServicer_to_serverc                	       sX   e Zd ZdZdddedeeeeef   f fddZ	de
ej f fd	d
Z  ZS )gRPCGenericServerzCustom gRPC server that will override all service method handlers.

    Original implementation see: https://github.com/grpc/grpc/blob/
        60c1701f87cacf359aa1ad785728549eeef1a4b0/src/python/grpcio/grpc/aio/_server.py
    Nextra_optionsservice_handler_factoryr   c                   s0   t  jd ddd d t|pg  d g | _|| _d S )N )thread_poolgeneric_handlersinterceptorsmaximum_concurrent_rpcscompressionoptions)super__init__r   generic_rpc_handlersr   )selfr   r   	__class__r   Y/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/serve/_private/grpc_util.pyr       s   

zgRPCGenericServer.__init__r!   c                    sr   i }|d }|j  D ]\}}|jd| j|dd| j|ddd}|||< q||d _ | j| t | dS )a   Override generic_rpc_handlers before adding to the gRPC server.

        This function will override all user defined handlers to have
            1. None `response_serializer` so the server can pass back the
            raw protobuf bytes to the user.
            2. `unary_unary` is always calling the unary function generated via
            `self.service_handler_factory`
            3. `unary_stream` is always calling the streaming function generated via
            `self.service_handler_factory`
        r   NF)service_methodstreamT)response_serializerunary_unaryunary_stream)_method_handlersitems_replacer   r!   appendr   add_generic_rpc_handlers)r"   r!   serve_rpc_handlersrpc_handlerr&   method_handlerserve_method_handlerr#   r   r%   r/   0   s$   

z*gRPCGenericServer.add_generic_rpc_handlers)__name__
__module____qualname____doc__r   r   r   r   strr    r   grpcGenericRpcHandlerr/   __classcell__r   r   r#   r%   r      s    
r   F)enable_so_reuseportr   grpc_options
event_loopr<   returnc                   sx   ddl m} t| dtt|fgd}||d|j  t }tg|j D ]}||| q&|	 I dH  |
| S )zStart a gRPC server that handles requests with the service handler factory.

    Returns a task that blocks until the server exits (e.g., due to error).
    r   )add_grpc_addresszgrpc.so_reuseportr   z[::]:N)ray.serve._private.default_implr@   r   r8   intportr   r   grpc_servicer_func_callablestartcreate_taskwait_for_termination)r   r=   r>   r<   r@   servermock_servicerservicer_fnr   r   r%   start_grpc_serverQ   s   rK   excrequest_timeout_s
request_idc                 C   s   t | trd| d}ttjjd|dS t | tjr)d| d}ttjjd|dS t | t	r8ttjj
d| jdS t | trXt | trNtjd|  dd	id
 ttjjd| jdS t | ttfrltjd|  dd	id
 ntd ttjjdt| dS )NzRequest timed out after zs.T)codeis_errormessagezClient for request z disconnected.zRequest failed: log_to_stderrF)extraz'Request failed due to unexpected error.)
isinstanceTimeoutErrorr   r9   
StatusCodeDEADLINE_EXCEEDEDasyncioCancelledError	CANCELLEDr   RESOURCE_EXHAUSTEDrQ   r   r   loggerwarningUNAVAILABLEr
   	exceptionINTERNALr8   )rL   rM   rN   rQ   r   r   r%   get_grpc_response_statusq   sH   




ra   contextstatusc                 C   s0   |   s
| |j  |  s| |j d S d S N)rO   set_codedetailsset_detailsrQ   )rb   rc   r   r   r%   set_grpc_code_and_details   s
   	rh   c                 C   s(   t | pt } | jstr| jpt| _| S rd   )r   r   rM   r   )r=   r   r   r%   set_proxy_default_grpc_options   s
   
ri   )/rX   loggingcopyr   typingr   r   r   r   r   unittest.mockr   r9   grpc.aio._serverr	   ray.exceptionsr
   r   ray.serve._private.constantsr   r   r   )ray.serve._private.proxy_request_responser   ray.serve.configr   ray.serve.exceptionsr   r   "ray.serve.generated.serve_pb2_grpcr   	getLoggerr\   r   AbstractEventLoopboolTaskrK   BaseExceptionfloatr8   ra   _cythoncygrpc_ServicerContextrh   ri   r   r   r   r%   <module>   sT    
>
 
+
