o
    `۷i}                     @   s>  d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZ d dlm	Z	 d dl
mZ d dlmZmZmZmZmZmZmZmZmZ d dlZd dlZd dlmZ d dlmZ d dlmZ d d	lmZ d d
l m!Z! d dl"m#Z#m$Z$m%Z%m&Z&m'Z' d dl(m)Z) d dl*m+Z+ d dl,m-Z- d dl.m/Z/ d dl0m1Z1m2Z2 d dl3m4Z4 d dl5m6Z6m7Z7m8Z8m9Z9m:Z: d dl;m<Z< d dl=m>Z> d dl?m@Z@mAZAmBZB d dlCmDZD d dlEmFZFmGZGmHZH eIe:ZJeddG dd dZKdeLde%fddZM	 dVd!ee d"eNdee$ fd#d$ZOG d%d& d&ZPd'd( ZQG d)d* d*e'ZRG d+d, d,ZSd-eddfd.d/ZTd0ejdeUfd1d2ZVG d3d4 d4ZWd5ede!gfd6d7ZXG d8d9 d9ZYd:e#d5ee de#fd;d<ZZd:e#d=e[fd>d?Z\d:e#d=e[fd@dAZ]dBdCd:e#dDeDdEe j^dFeUde j_f
dGdHZ`dIeadJebdKe[de>fdLdMZcdNe>dOeUdee$ fdPdQZddDeDdeDfdRdSZedDeDdeDfdTdUZfdS )W    N)deque)deepcopy)	dataclass)	AnyAsyncGenerator	AwaitableCallableListOptionalTupleTypeUnion)FastAPI)jsonable_encoder)version)MutableHeaders)
Middleware)ASGIAppMessageReceiveScopeSend)Config)
LifespanOn)is_ipv6)IS_PYDANTIC_2)RayActorErrorRayTaskError)RequestMetadata)#RAY_SERVE_HTTP_KEEP_ALIVE_TIMEOUT_S)RAY_SERVE_HTTP_PROXY_CALLBACK_IMPORT_PATH&RAY_SERVE_REQUEST_PROCESSING_TIMEOUT_SSERVE_HTTP_REQUEST_ID_HEADERSERVE_LOGGER_NAME)warn_if_deprecated_env_var_set)ResponseStatus)call_function_from_import_pathgenerate_request_idserve_encoders)HTTPOptions)BackPressureErrorDeploymentUnavailableErrorRayServeExceptionT)frozenc                   @   sP   e Zd ZU eed< eed< eed< deeeef fddZde	j
jfddZd	S )
ASGIArgsscopereceivesendreturnc                 C   s   | j | j| jfS Nr/   r0   r1   self r7   R/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/serve/_private/http_util.pyto_args_tupleC   s   zASGIArgs.to_args_tuplec                 C   s   t jj|   S r3   )	starletterequestsRequestr9   r5   r7   r7   r8   to_starlette_requestF   s   zASGIArgs.to_starlette_requestN)__name__
__module____qualname__r   __annotations__r   r   r   r9   r:   r;   r<   r=   r7   r7   r7   r8   r.   =   s   
 r.   serialized_bodyr2   c                    s   d  fdd}|S )zAReturns an ASGI receiver that returns the provided buffered body.Fc                     s,    rt  } |  I d H  d dddS )NThttp.requestF)bodytype	more_body)asyncioEventwait)block_foreverreceivedrB   r7   r8   mock_receiveS   s   z0make_buffered_asgi_receive.<locals>.mock_receiver7   )rB   rM   r7   rK   r8   make_buffered_asgi_receiveL   s   rN      objstatus_codec                 C   s   d}d}| du rd}d}n&t | tr| }d}nt | tr$| d}d}ntjt| tddd }d	}d
|d|ggdd|dgS )zSerializes the provided object and converts it to ASGI messages.

    These ASGI messages can be sent via an ASGI `send` interface to comprise an HTTP
    response.
    N    s
   text/plainzutf-8s   text/plain; charset=utf-8)custom_encoder),:)
separatorss   application/jsonhttp.response.starts   content-type)rE   statusheaderszhttp.response.body)rE   rD   )
isinstancebytesstrencodejsondumpsr   r(   )rP   rQ   rD   content_typer7   r7   r8   convert_object_to_asgi_messagesc   s0   



ra   c                   @   s"   e Zd ZdZdddZdd ZdS )	ResponseaQ  ASGI compliant response class.

    It is expected to be called in async context and pass along
    `scope, receive, send` as in ASGI spec.

    >>> from ray.serve.http_util import Response  # doctest: +SKIP
    >>> scope, receive = ... # doctest: +SKIP
    >>> await Response({"k": "v"}).send(scope, receive, send) # doctest: +SKIP
    NrO   c                 C   s   t ||d| _dS )zConstruct a HTTP Response based on input type.

        Args:
            content: Any JSON serializable object.
            status_code (int, optional): Default status code is 200.
        )rP   rQ   N)ra   	_messages)r6   contentrQ   r7   r7   r8   __init__   s   zResponse.__init__c                    s    | j D ]	}||I d H  qd S r3   )rc   )r6   r/   r0   r1   messager7   r7   r8   r1      s   
zResponse.sendNrO   )r>   r?   r@   __doc__re   r1   r7   r7   r7   r8   rb      s    

rb   c                    sN   g }d}|r"| I d H }|d dksJ |d }| |d  |sd|S )NTrE   rC   rF   rD   rR   )appendjoin)r/   r0   r1   body_bufferrF   rf   r7   r7   r8   receive_http_body   s   
rl   c                   @   s   e Zd ZdZdd Zdd ZdefddZd	efd
dZ	d	efddZ
dd Zdee fddZdefddZdejdeee df fddZdS )MessageQueueaq  Queue enables polling for received or sent messages.

    Implements the ASGI `Send` interface.

    This class:
        - Is *NOT* thread safe and should only be accessed from a single asyncio
          event loop.
        - Assumes a single consumer of the queue (concurrent calls to
          `get_messages_nowait` and `wait_for_message` is undefined behavior).
    c                 C   s"   t  | _t | _d| _d | _d S )NF)r   _message_queuerG   rH   _new_message_event_closed_errorr5   r7   r7   r8   re      s   

zMessageQueue.__init__c                 C   s   d| _ | j  dS )zClose the queue, rejecting new messages.

        Once the queue is closed, existing messages will be returned from
        `get_messages_nowait` and subsequent calls to `wait_for_message` will
        always return immediately.
        TN)rp   ro   setr5   r7   r7   r8   close   s   zMessageQueue.closeec                 C   
   || _ d S r3   )rq   )r6   rt   r7   r7   r8   	set_error      
zMessageQueue.set_errorrf   c                 C   s   | j | | j  d S r3   )rn   ri   ro   rr   r6   rf   r7   r7   r8   
put_nowait   s   zMessageQueue.put_nowaitc                    s   | j rtd| | dS )zSend a message, putting it on the queue.

        `RuntimeError` is raised if the queue has been closed using `.close()`.
        z6New messages cannot be sent after the queue is closed.N)rp   RuntimeErrorry   rx   r7   r7   r8   __call__   s   zMessageQueue.__call__c                    s    | j s| j I dH  dS dS )a   Wait until at least one new message is available.

        If a message is available, this method will return immediately on each call
        until `get_messages_nowait` is called.

        After the queue is closed using `.close()`, this will always return
        immediately.
        N)rp   ro   rI   r5   r7   r7   r8   wait_for_message   s   	zMessageQueue.wait_for_messager2   c                 C   s>   g }t | jdkr|| j  t | jdks	| j  |S )a  Returns all messages that are currently available (non-blocking).

        At least one message will be present if `wait_for_message` had previously
        returned and a subsequent call to `wait_for_message` blocks until at
        least one new message is available.
        r   )lenrn   ri   popleftro   clear)r6   messagesr7   r7   r8   get_messages_nowait   s   
z MessageQueue.get_messages_nowaitc                    s   | j r| j | j I dH  t| jdkr,| j }t| jdkr*| js*| j  |S t| jdkr9| j r9| j t| jdkrE| jrGtdS dS )a  This blocks until a message is ready.

        This method should not be used together with get_messages_nowait.
        Please use either `get_one_message` or `get_messages_nowait`.

        Raises:
            StopAsyncIteration: if the queue is closed and there are no
                more messages.
            Exception (self._error): if there are no more messages in
                the queue and an error has been set.
        Nr   )	rq   ro   rI   r}   rn   r~   rp   r   StopAsyncIteration)r6   msgr7   r7   r8   get_one_message   s   

zMessageQueue.get_one_messagecall_futNc                 C  s   d}zL	 t |  }t j||gt jdI dH \}}|  }|r$|V  ||v r)nq| }|dur5|dW | s>|  |durL| sN|  dS dS dS | sX|  |dure| sf|  w w w )a  Repeatedly consume messages from the queue and yield them.

        This is used to fetch queue messages in the system event loop in
        a thread-safe manner.

        Args:
            call_fut: The async Future pointing to the task from the user
                code event loop that is pushing messages onto the queue.

        Yields:
            List[Any]: Messages from the queue.
        NT)return_when)	rG   create_taskr|   rI   FIRST_COMPLETEDr   	exceptiondonecancel)r6   r   wait_for_msg_taskr   _r   rt   r7   r7   r8   fetch_messages_from_queue  s8   

z&MessageQueue.fetch_messages_from_queue)r>   r?   r@   rh   re   rs   BaseExceptionrv   r   ry   r{   r|   r	   r   r   rG   Futurer   r   r   r7   r7   r7   r8   rm      s    

rm   c                	   @   sl   e Zd ZdZdededeegee f fddZ	de
fdd	Zedejfd
dZdd Zde
fddZdS )ASGIReceiveProxyzProxies ASGI receive from an actor.

    The `receive_asgi_messages` callback will be called repeatedly to fetch messages
    until a disconnect message is received.
    r/   request_metadatareceive_asgi_messagesc                 C   s&   |d | _ d | _|| _|| _d | _d S )NrE   )_type_queue_request_metadata_receive_asgi_messages_disconnect_message)r6   r/   r   r   r7   r7   r8   re   K  s
   

zASGIReceiveProxy.__init__r2   c                 C   s   | j dkr
dddS ddiS )a7  Return the appropriate disconnect message based on the connection type.

        HTTP ASGI spec:
            https://asgi.readthedocs.io/en/latest/specs/www.html#disconnect-receive-event

        WS ASGI spec:
            https://asgi.readthedocs.io/en/latest/specs/www.html#disconnect-receive-event-ws
        	websocketwebsocket.disconnecti  )rE   coderE   http.disconnect)r   r5   r7   r7   r8   _get_default_disconnect_messageX  s
   
	z0ASGIReceiveProxy._get_default_disconnect_messagec                 C   s   | j d u r
t | _ | j S r3   )r   rG   Queuer5   r7   r7   r8   queuej  s   

zASGIReceiveProxy.queuec              
      s   	 z&|  | jI dH }t|D ]}| j| |d dv r&|| _ W dS qW n/ ty?   |  }| j| || _Y dS  t	yW } z| j| W Y d}~dS d}~ww q)a  Fetch messages repeatedly until a disconnect message is received.

        If a disconnect message is received, this function exits and returns it.

        If an exception occurs, it will be raised on the next __call__ and no more
        messages will be received.
        TNrE   >   r   r   )
r   r   pickleloadsr   ry   r   KeyErrorr   	Exception)r6   pickled_messagesrf   rt   r7   r7   r8   fetch_until_disconnectq  s0   
z'ASGIReceiveProxy.fetch_until_disconnectc                    s>   | j  r| jdur| jS | j  I dH }t|tr||S )zReturn the next message once available.

        This will repeatedly return a disconnect message once it's been received.
        N)r   emptyr   getrZ   r   rx   r7   r7   r8   r{     s   
zASGIReceiveProxy.__call__N)r>   r?   r@   rh   r   r   r   r   r[   re   r   r   propertyrG   r   r   r   r{   r7   r7   r7   r8   r   D  s    
 r   clsc                    s  ddl m}m} ddlm m dd } fdd| jD }| }|D ]O}| j| |j}t	
|}	t|	j }
t|
dkrFtd|
d }|j||d	}|gd
d |
dd D  }|	j|d}||j_|j_|j| q%| | t | jD ].}t| fsqtst| r|jr|jj|j_t|jdd}|dur|kr| qfdd| jD | jdd< dS )a_  Transform the `cls`'s methods and class annotations to FastAPI routes.

    Modified from
    https://github.com/dmontagu/fastapi-utils/blob/master/fastapi_utils/cbv.py

    Usage:
    >>> from fastapi import FastAPI
    >>> app = FastAPI() # doctest: +SKIP
    >>> class A: # doctest: +SKIP
    ...     @app.route("/{i}") # doctest: +SKIP
    ...     def func(self, i: int) -> str: # doctest: +SKIP
    ...         return self.dep + i # doctest: +SKIP
    >>> # just running the app won't work, here.
    >>> make_fastapi_class_based_view(app, A) # doctest: +SKIP
    >>> # now app can be run properly
    r   )	APIRouterDepends)APIRouteAPIWebSocketRoutec                     s   ddl m}  |  jS )Nr   serve)rayr   get_replica_contextservable_objectr   r7   r7   r8   get_current_servable_instance  s   
zDmake_fastapi_class_based_view.<locals>.get_current_servable_instancec                    s6   g | ] t  frt fd djD r qS )c                 3   s,    | ]}|t ur jj|jd  V  qdS ).N)objectendpointr@   
startswith).0baserouter7   r8   	<genexpr>  s    z;make_fastapi_class_based_view.<locals>.<listcomp>.<genexpr>)rZ   any__mro__)r   )r   r   r   r   r8   
<listcomp>  s    z1make_fastapi_class_based_view.<locals>.<listcomp>zOMethods in FastAPI class-based view must have ``self`` as their first argument.)defaultc                 S   s   g | ]
}|j tjjd qS ))kind)replaceinspect	ParameterKEYWORD_ONLY)r   	parameterr7   r7   r8   r     s       N)
parameters
_serve_clsc                    s   g | ]}| vr|qS r7   r7   )r   r)routes_to_remover7   r8   r     s    )fastapir   r   fastapi.routingr   r   routesremover   r   	signaturelistr   valuesr}   r,   r   __signature__r   ri   include_routerrZ   r   response_modelresponse_fieldouter_type_secure_cloned_response_fieldgetattr)fastapi_appr   r   r   r   class_method_routes
new_routerr   old_endpointold_signatureold_parametersold_self_parameternew_self_parameternew_parametersnew_signature	serve_clsr7   )r   r   r   r   r8   make_fastapi_class_based_view  sZ   





"r   sockc              
   C   s   z%|  tjtjd ttdr|  tjtjd W dS |  tjdd W dS  tyA } ztd| d W Y d}~dS d}~ww )	zMutate a socket object to allow multiple process listening on the same port.

    Returns:
        success: whether the setting was successful.
    r   SO_REUSEPORT   Tz'Setting SO_REUSEPORT failed because of z. SO_REUSEPORT is disabled.NF)	
setsockoptsocket
SOL_SOCKETSO_REUSEADDRhasattrr   r   loggerdebug)r   rt   r7   r7   r8   set_socket_reuse_port  s   
	
r   c                	   @   s|   e Zd ZdZdeeef fddZedefddZ	ede
e fdd	Zd
d Zdededede
e fddZdd ZdS )ASGIAppReplicaWrapperz;Provides a common wrapper for replicas running an ASGI app.app_or_funcc                 C   s@   t |r
| | _n|| _tt| jdd d dd| _t| j_d S )NonF)lifespan	log_level
log_config
access_log)r   
isfunction	_asgi_appr   r   _serve_asgi_lifespanr   )r6   r   r7   r7   r8   re   /  s   

zASGIAppReplicaWrapper.__init__r2   c                 C   s   | j S r3   r   r5   r7   r7   r8   appF  s   zASGIAppReplicaWrapper.appc                 C   s   t | jtr
| jjS d S r3   )rZ   r   r   docs_urlr5   r7   r7   r8   	docs_pathJ  s   zASGIAppReplicaWrapper.docs_pathc                    sf   ddl m} || jjtjd | j I d H  | jjr!tdW d    d S 1 s,w   Y  d S )Nr   LoggingContextlevelz=ASGI lifespan startup failed. Check replica logs for details.)	 ray.serve._private.logging_utilsr   r   r   loggingWARNINGstartupshould_exitrz   r6   r   r7   r7   r8   _run_asgi_lifespan_startupO  s   "z0ASGIAppReplicaWrapper._run_asgi_lifespan_startupr/   r0   r1   c                    s   |  |||I dH  dS )z Calls into the wrapped ASGI app.Nr   )r6   r/   r0   r1   r7   r7   r8   r{   [  s   zASGIAppReplicaWrapper.__call__c                    sV   ddl m} || jjtjd | j I d H  W d    d S 1 s$w   Y  d S )Nr   r   r  )r  r   r   r   r  r  shutdownr  r7   r7   r8   __del__j  s
   "zASGIAppReplicaWrapper.__del__N)r>   r?   r@   rh   r   r   r   re   r   r   r
   r\   r   r	  r   r   r   r{   r  r7   r7   r7   r8   r   ,  s$    
r   middlewaresc                 C   sN   | du rg } t | tstd| D ]}tt|ts$tdt| dq| S )a4  Validate the return value of HTTP proxy callback.

    Middlewares should be a list of Starlette middlewares. If it is None, we
    will treat it as an empty list. If it is not a list, we will raise an
    error. If it is a list, we will check if all the items in the list are
    Starlette middlewares.
    Nz@HTTP proxy callback must return a list of Starlette middlewares.zMHTTP proxy callback must return a list of Starlette middlewares, instead got z type item in the list.)rZ   r   
ValueError
issubclassrE   r   )r  
middlewarer7   r7   r8   #validate_http_proxy_callback_returns  s   
r  c                   @   s0   e Zd ZdefddZdededefddZd	S )
RequestIdMiddlewarer   c                 C   ru   r3   )_app)r6   r   r7   r7   r8   re     rw   zRequestIdMiddleware.__init__r/   r0   r1   c                    s\   t |d}|t  d u rt  |t  dtf fdd}| |||I d H  d S )Nr/   rf   c                    sJ   | d dkrt | d}|d  | d dkr | d< | I d H  d S )NrE   rW   r  zX-Request-IDzwebsocket.accept)r   ri   )rf   rY   
request_idr1   r7   r8   send_with_request_id  s   
z:RequestIdMiddleware.__call__.<locals>.send_with_request_id)r   r   r"   r'   ri   r   r  )r6   r/   r0   r1   rY   r  r7   r  r8   r{     s   

zRequestIdMiddleware.__call__N)	r>   r?   r@   r   re   r   r   r   r{   r7   r7   r7   r8   r    s    r  r   c                 C   s`   t tg| D ]&}ttjtdk r|j| fi |j} q|j| g|jR i |j	} q| S )zyWrap the ASGI app with the provided middlewares.

    The built-in RequestIdMiddleware will always be applied first.
    z0.35.0)
r   r  r   parser:   __version__r   optionsargskwargs)r   r  r  r7   r7   r8   _apply_middlewares  s   r  	root_pathc                    s   s S  fdd}|S )z/Middleware to inject root_path to the ASGI app.c                    s,   | d dv r| d<  | ||I d H  d S )NrE   )httpr   r  r7   r4   r   r  r7   r8   scope_root_path_middleware  s   z5_inject_root_path.<locals>.scope_root_path_middlewarer7   )r   r  r   r7   r  r8   _inject_root_path  s   r!  c                 C   s@   |s| |fS t tj}|t dk r| |fS t| |} | dfS )a[  Handle root_path parameter across different uvicorn versions.

    For uvicorn >= 0.26.0, root_path must be injected into the ASGI scope
    rather than passed to uvicorn.Config, as uvicorn changed its behavior
    in version 0.26.0.

    Reference: https://uvicorn.dev/release-notes/#0260-january-16-2024

    Args:
        app: The ASGI application
        root_path: The root path prefix for all routes

    Returns:
        Tuple of (app, root_path) where:
        - app may be wrapped with middleware (for uvicorn >= 0.26.0)
        - root_path is "" for uvicorn >= 0.26.0, unchanged otherwise
    z0.26.0 )r   r  uvicornr  r!  )r   r  uvicorn_versionr7   r7   r8   _apply_root_path  s   
r%  F)enable_so_reuseporthttp_options
event_loopr&  c          	         s`  t  |j t |j\ }tt|jrtjntjtj	}|r%t
| z||j|jf W n tyJ } ztd|j d|j d|d}~ww tjtd_i }|jr|jr|j|jd}|jri|j|d< |jrq|j|d< td	|j d|j d
|j  tjtj fddfd|j|j||j|ddddd
|d}dd |_||j|gdS )z{Start an HTTP server to run the ASGI app.

    Returns a task that blocks until the server exits (e.g., due to error).
    zFailed to bind to address 'rU   '.Nzuvicorn.error)ssl_keyfilessl_certfilessl_keyfile_passwordssl_ca_certszStarting HTTPS server on z with SSL certificate: c                      s    S r3   r7   r7   r   r7   r8   <lambda>      z(start_asgi_http_server.<locals>.<lambda>ToffF)
factoryhostportr  timeout_keep_aliveloopr   r   r   r   )configc                   S   s   d S r3   r7   r7   r7   r7   r8   r/  .  r0  )sockets) r  r  r%  r  r   r   r3  AF_INET6AF_INETSOCK_STREAMr   bindr4  OSErrorrz   r  CRITICAL	getLoggerr  r*  r+  r,  r-  r   infor#  Serverr   keep_alive_timeout_sinstall_signal_handlersr   r   )	r   r'  r(  r&  r  r   rt   
ssl_kwargsserverr7   r.  r8   start_asgi_http_server  sl   



rF  excrequest_timeout_sr  c                 C   s   t | trtddd| d| ddS t | tjr+d| d}t| td	d|dS t | ttfrKt | t	rCtj
d
|  ddid tdd| jdS t | tt	fr_tj
d
|  ddid ntd tddt| dS )N  TzRequest z timed out after zs.)r   is_errorrf   zClient for request z" disconnected, cancelling request.i  zRequest failed: log_to_stderrF)extra  z'Request failed due to unexpected error.i  )rZ   TimeoutErrorr%   rG   CancelledErrorr   r@  r*   r+   r   warningrf   r   r   r\   )rG  rH  r  rf   r7   r7   r8   get_http_response_status3  s<   



rQ  rX   response_startedc                 C   s"   |s| j dvr	g S t| j| j dS )N)rI  rM  )rQ   )r   ra   rf   )rX   rR  r7   r7   r8   send_http_response_on_exceptionY  s   rS  c                 C   sH   t | } td tpddkrt| _| jstr| jpt| _| jp g | _| S )z7Enhanced configuration with component-specific options.r   r   )r   r$   r   rB  rH  r!   r  r'  r7   r7   r8   $configure_http_options_with_defaultsd  s   
rU  c                 C   s6   t | } trtdt d | jttt | S )Nz1Calling user-provided callback from import path 'r)  )r   r    r   r@  r  extendr  r&   rT  r7   r7   r8   configure_http_middlewares{  s   rW  rg   )grG   r   r^   r  r   r   collectionsr   copyr   dataclassesr   typingr   r   r   r   r	   r
   r   r   r   r:   r#  r   r   fastapi.encodersr   	packagingr   starlette.datastructuresr   starlette.middlewarer   starlette.typesr   r   r   r   r   uvicorn.configr   uvicorn.lifespan.onr   ray._common.network_utilsr   ray._common.pydantic_compatr   ray.exceptionsr   r   ray.serve._private.commonr   ray.serve._private.constantsr   r    r!   r"   r#   "ray.serve._private.constants_utilsr$   )ray.serve._private.proxy_request_responser%   ray.serve._private.utilsr&   r'   r(   ray.serve.configr)   ray.serve.exceptionsr*   r+   r,   r?  r   r.   r[   rN   intra   rb   rl   rm   r   r   boolr   r   r  r  r  r\   r!  r%  AbstractEventLoopTaskrF  r   floatrQ  rS  rU  rW  r7   r7   r7   r8   <module>   s    ,

' \oG
"
N
&
