o
    $iw                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZ d dlm	Z	 d dl
mZ d dlmZmZmZmZmZmZmZmZmZ d dlZd dlZd dlmZ d dlmZ d dlmZ d d	lmZ d d
l m!Z! d dl"m#Z#m$Z$m%Z%m&Z&m'Z' d dl(m)Z) d dl*m+Z+ d dl,m-Z- d dl.m/Z/ d dl0m1Z1m2Z2 d dl3m4Z4 d dl5m6Z6m7Z7m8Z8m9Z9m:Z: d dl;m<Z< d dl=m>Z>m?Z?m@Z@ d dlAmBZB d dlCmDZDmEZEmFZF eGe:ZHeddG dd dZIdeJde%fddZK	dPd ee d!eLdee$ fd"d#ZMG d$d% d%ZNd&d' ZOG d(d) d)e'ZPG d*d+ d+ZQd,eddfd-d.ZRd/ejdeSfd0d1ZTG d2d3 d3ZUd4ede!gfd5d6ZVG d7d8 d8ZWd9e#d4ee de#fd:d;ZXd<d=d9e#d>eBd?e jYd@eSde jZf
dAdBZ[dCe\dDe]dEe^de<fdFdGZ_dHe<dIeSdee$ fdJdKZ`d>eBdeBfdLdMZad>eBdeBfdNdOZbdS )Q    N)deque)deepcopy)	dataclass)	AnyAsyncGenerator	AwaitableCallableListOptionalTupleTypeUnion)FastAPI)jsonable_encoder)version)MutableHeaders)
Middleware)ASGIAppMessageReceiveScopeSend)Config)
LifespanOn)is_ipv6)IS_PYDANTIC_2)RayActorErrorRayTaskError)RequestMetadata)#RAY_SERVE_HTTP_KEEP_ALIVE_TIMEOUT_S)RAY_SERVE_HTTP_PROXY_CALLBACK_IMPORT_PATH&RAY_SERVE_REQUEST_PROCESSING_TIMEOUT_SSERVE_HTTP_REQUEST_ID_HEADERSERVE_LOGGER_NAME)ResponseStatus)call_function_from_import_pathgenerate_request_idserve_encoders)HTTPOptions)BackPressureErrorDeploymentUnavailableErrorRayServeExceptionT)frozenc                   @   sP   e Zd ZU eed< eed< eed< deeeef fddZde	j
jfddZd	S )
ASGIArgsscopereceivesendreturnc                 C   s   | j | j| jfS N)r.   r/   r0   self r5   Y/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/serve/_private/http_util.pyto_args_tupleB   s   zASGIArgs.to_args_tuplec                 C   s   t jj|   S r2   )	starletterequestsRequestr7   r3   r5   r5   r6   to_starlette_requestE   s   zASGIArgs.to_starlette_requestN)__name__
__module____qualname__r   __annotations__r   r   r   r7   r8   r9   r:   r;   r5   r5   r5   r6   r-   <   s   
 r-   serialized_bodyr1   c                    s   d  fdd}|S )zAReturns an ASGI receiver that returns the provided buffered body.Fc                     s,    rt  } |  I d H  d dddS )NThttp.requestF)bodytype	more_body)asyncioEventwait)block_foreverreceivedr@   r5   r6   mock_receiveR   s   z0make_buffered_asgi_receive.<locals>.mock_receiver5   )r@   rK   r5   rI   r6   make_buffered_asgi_receiveK   s   rL      objstatus_codec                 C   s   d}d}| du rd}d}n&t | tr| }d}nt | tr$| d}d}ntjt| tddd }d	}d
|d|ggdd|dgS )zSerializes the provided object and converts it to ASGI messages.

    These ASGI messages can be sent via an ASGI `send` interface to comprise an HTTP
    response.
    N    s
   text/plainzutf-8s   text/plain; charset=utf-8)custom_encoder),:)
separatorss   application/jsonhttp.response.starts   content-type)rC   statusheaderszhttp.response.body)rC   rB   )
isinstancebytesstrencodejsondumpsr   r'   )rN   rO   rB   content_typer5   r5   r6   convert_object_to_asgi_messagesb   s0   



r_   c                   @   s"   e Zd ZdZdddZdd ZdS )	ResponseaQ  ASGI compliant response class.

    It is expected to be called in async context and pass along
    `scope, receive, send` as in ASGI spec.

    >>> from ray.serve.http_util import Response  # doctest: +SKIP
    >>> scope, receive = ... # doctest: +SKIP
    >>> await Response({"k": "v"}).send(scope, receive, send) # doctest: +SKIP
    NrM   c                 C   s   t ||d| _dS )zConstruct a HTTP Response based on input type.

        Args:
            content: Any JSON serializable object.
            status_code (int, optional): Default status code is 200.
        )rN   rO   N)r_   	_messages)r4   contentrO   r5   r5   r6   __init__   s   zResponse.__init__c                    s    | j D ]	}||I d H  qd S r2   )ra   )r4   r.   r/   r0   messager5   r5   r6   r0      s   
zResponse.sendNrM   )r<   r=   r>   __doc__rc   r0   r5   r5   r5   r6   r`      s    

r`   c                    sN   g }d}|r"| I d H }|d dksJ |d }| |d  |sd|S )NTrC   rA   rD   rB   rP   )appendjoin)r.   r/   r0   body_bufferrD   rd   r5   r5   r6   receive_http_body   s   
rj   c                   @   s   e Zd ZdZdd Zdd ZdefddZd	efd
dZ	d	efddZ
dd Zdee fddZdefddZdejdeee df fddZdS )MessageQueueaq  Queue enables polling for received or sent messages.

    Implements the ASGI `Send` interface.

    This class:
        - Is *NOT* thread safe and should only be accessed from a single asyncio
          event loop.
        - Assumes a single consumer of the queue (concurrent calls to
          `get_messages_nowait` and `wait_for_message` is undefined behavior).
    c                 C   s"   t  | _t | _d| _d | _d S )NF)r   _message_queuerE   rF   _new_message_event_closed_errorr3   r5   r5   r6   rc      s   

zMessageQueue.__init__c                 C   s   d| _ | j  dS )zClose the queue, rejecting new messages.

        Once the queue is closed, existing messages will be returned from
        `get_messages_nowait` and subsequent calls to `wait_for_message` will
        always return immediately.
        TN)rn   rm   setr3   r5   r5   r6   close   s   zMessageQueue.closeec                 C   
   || _ d S r2   )ro   )r4   rr   r5   r5   r6   	set_error      
zMessageQueue.set_errorrd   c                 C   s   | j | | j  d S r2   )rl   rg   rm   rp   r4   rd   r5   r5   r6   
put_nowait   s   zMessageQueue.put_nowaitc                    s   | j rtd| | dS )zSend a message, putting it on the queue.

        `RuntimeError` is raised if the queue has been closed using `.close()`.
        z6New messages cannot be sent after the queue is closed.N)rn   RuntimeErrorrw   rv   r5   r5   r6   __call__   s   zMessageQueue.__call__c                    s    | j s| j I dH  dS dS )a   Wait until at least one new message is available.

        If a message is available, this method will return immediately on each call
        until `get_messages_nowait` is called.

        After the queue is closed using `.close()`, this will always return
        immediately.
        N)rn   rm   rG   r3   r5   r5   r6   wait_for_message   s   	zMessageQueue.wait_for_messager1   c                 C   s>   g }t | jdkr|| j  t | jdks	| j  |S )a  Returns all messages that are currently available (non-blocking).

        At least one message will be present if `wait_for_message` had previously
        returned and a subsequent call to `wait_for_message` blocks until at
        least one new message is available.
        r   )lenrl   rg   popleftrm   clear)r4   messagesr5   r5   r6   get_messages_nowait   s   
z MessageQueue.get_messages_nowaitc                    s   | j r| j | j I dH  t| jdkr,| j }t| jdkr*| js*| j  |S t| jdkr9| j r9| j t| jdkrE| jrGtdS dS )a  This blocks until a message is ready.

        This method should not be used together with get_messages_nowait.
        Please use either `get_one_message` or `get_messages_nowait`.

        Raises:
            StopAsyncIteration: if the queue is closed and there are no
                more messages.
            Exception (self._error): if there are no more messages in
                the queue and an error has been set.
        Nr   )	ro   rm   rG   r{   rl   r|   rn   r}   StopAsyncIteration)r4   msgr5   r5   r6   get_one_message   s   

zMessageQueue.get_one_messagecall_futNc                 C  s   d}zL	 t |  }t j||gt jdI dH \}}|  }|r$|V  ||v r)nq| }|dur5|dW | s>|  |durL| sN|  dS dS dS | sX|  |dure| sf|  w w w )a  Repeatedly consume messages from the queue and yield them.

        This is used to fetch queue messages in the system event loop in
        a thread-safe manner.

        Args:
            call_fut: The async Future pointing to the task from the user
                code event loop that is pushing messages onto the queue.

        Yields:
            List[Any]: Messages from the queue.
        NT)return_when)	rE   create_taskrz   rG   FIRST_COMPLETEDr   	exceptiondonecancel)r4   r   wait_for_msg_taskr   _r~   rr   r5   r5   r6   fetch_messages_from_queue  s8   

z&MessageQueue.fetch_messages_from_queue)r<   r=   r>   rf   rc   rq   BaseExceptionrt   r   rw   ry   rz   r	   r   r   rE   Futurer   r   r   r5   r5   r5   r6   rk      s    

rk   c                	   @   sl   e Zd ZdZdededeegee f fddZ	de
fdd	Zedejfd
dZdd Zde
fddZdS )ASGIReceiveProxyzProxies ASGI receive from an actor.

    The `receive_asgi_messages` callback will be called repeatedly to fetch messages
    until a disconnect message is received.
    r.   request_metadatareceive_asgi_messagesc                 C   s&   |d | _ d | _|| _|| _d | _d S )NrC   )_type_queue_request_metadata_receive_asgi_messages_disconnect_message)r4   r.   r   r   r5   r5   r6   rc   J  s
   

zASGIReceiveProxy.__init__r1   c                 C   s   | j dkr
dddS ddiS )a7  Return the appropriate disconnect message based on the connection type.

        HTTP ASGI spec:
            https://asgi.readthedocs.io/en/latest/specs/www.html#disconnect-receive-event

        WS ASGI spec:
            https://asgi.readthedocs.io/en/latest/specs/www.html#disconnect-receive-event-ws
        	websocketwebsocket.disconnecti  )rC   coderC   http.disconnect)r   r3   r5   r5   r6   _get_default_disconnect_messageW  s
   
	z0ASGIReceiveProxy._get_default_disconnect_messagec                 C   s   | j d u r
t | _ | j S r2   )r   rE   Queuer3   r5   r5   r6   queuei  s   

zASGIReceiveProxy.queuec              
      s   	 z&|  | jI dH }t|D ]}| j| |d dv r&|| _ W dS qW n/ ty?   |  }| j| || _Y dS  t	yW } z| j| W Y d}~dS d}~ww q)a  Fetch messages repeatedly until a disconnect message is received.

        If a disconnect message is received, this function exits and returns it.

        If an exception occurs, it will be raised on the next __call__ and no more
        messages will be received.
        TNrC   >   r   r   )
r   r   pickleloadsr   rw   r   KeyErrorr   	Exception)r4   pickled_messagesrd   rr   r5   r5   r6   fetch_until_disconnectp  s0   
z'ASGIReceiveProxy.fetch_until_disconnectc                    s>   | j  r| jdur| jS | j  I dH }t|tr||S )zReturn the next message once available.

        This will repeatedly return a disconnect message once it's been received.
        N)r   emptyr   getrX   r   rv   r5   r5   r6   ry     s   
zASGIReceiveProxy.__call__N)r<   r=   r>   rf   r   r   r   r   rY   rc   r   r   propertyrE   r   r   r   ry   r5   r5   r5   r6   r   C  s    
 r   clsc                    s  ddl m}m} ddlm m dd } fdd| jD }| }|D ]O}| j| |j}t	
|}	t|	j }
t|
dkrFtd|
d }|j||d	}|gd
d |
dd D  }|	j|d}||j_|j_|j| q%| | t | jD ].}t| fsqtst| r|jr|jj|j_t|jdd}|dur|kr| qfdd| jD | jdd< dS )a_  Transform the `cls`'s methods and class annotations to FastAPI routes.

    Modified from
    https://github.com/dmontagu/fastapi-utils/blob/master/fastapi_utils/cbv.py

    Usage:
    >>> from fastapi import FastAPI
    >>> app = FastAPI() # doctest: +SKIP
    >>> class A: # doctest: +SKIP
    ...     @app.route("/{i}") # doctest: +SKIP
    ...     def func(self, i: int) -> str: # doctest: +SKIP
    ...         return self.dep + i # doctest: +SKIP
    >>> # just running the app won't work, here.
    >>> make_fastapi_class_based_view(app, A) # doctest: +SKIP
    >>> # now app can be run properly
    r   )	APIRouterDepends)APIRouteAPIWebSocketRoutec                     s   ddl m}  |  jS )Nr   serve)rayr   get_replica_contextservable_objectr   r5   r5   r6   get_current_servable_instance  s   
zDmake_fastapi_class_based_view.<locals>.get_current_servable_instancec                    s6   g | ] t  frt fd djD r qS )c                 3   s,    | ]}|t ur jj|jd  V  qdS ).N)objectendpointr>   
startswith).0baserouter5   r6   	<genexpr>  s    z;make_fastapi_class_based_view.<locals>.<listcomp>.<genexpr>)rX   any__mro__)r   )r   r   r   r   r6   
<listcomp>  s    z1make_fastapi_class_based_view.<locals>.<listcomp>zOMethods in FastAPI class-based view must have ``self`` as their first argument.)defaultc                 S   s   g | ]
}|j tjjd qS ))kind)replaceinspect	ParameterKEYWORD_ONLY)r   	parameterr5   r5   r6   r     s       N)
parameters
_serve_clsc                    s   g | ]}| vr|qS r5   r5   )r   r)routes_to_remover5   r6   r     s    )fastapir   r   fastapi.routingr   r   routesremover   r   	signaturelistr   valuesr{   r+   r   __signature__r   rg   include_routerrX   r   response_modelresponse_fieldouter_type_secure_cloned_response_fieldgetattr)fastapi_appr   r   r   r   class_method_routes
new_routerr   old_endpointold_signatureold_parametersold_self_parameternew_self_parameternew_parametersnew_signature	serve_clsr5   )r   r   r   r   r6   make_fastapi_class_based_view  sZ   





"r   sockc              
   C   s   z%|  tjtjd ttdr|  tjtjd W dS |  tjdd W dS  tyA } ztd| d W Y d}~dS d}~ww )	zMutate a socket object to allow multiple process listening on the same port.

    Returns:
        success: whether the setting was successful.
    r   SO_REUSEPORT   Tz'Setting SO_REUSEPORT failed because of z. SO_REUSEPORT is disabled.NF)	
setsockoptsocket
SOL_SOCKETSO_REUSEADDRhasattrr   r   loggerdebug)r   rr   r5   r5   r6   set_socket_reuse_port  s   
	
r   c                	   @   s|   e Zd ZdZdeeef fddZedefddZ	ede
e fdd	Zd
d Zdededede
e fddZdd ZdS )ASGIAppReplicaWrapperz;Provides a common wrapper for replicas running an ASGI app.app_or_funcc                 C   s@   t |r
| | _n|| _tt| jdd d dd| _t| j_d S )NonF)lifespan	log_level
log_config
access_log)r   
isfunction	_asgi_appr   r   _serve_asgi_lifespanr   )r4   r   r5   r5   r6   rc   .  s   

zASGIAppReplicaWrapper.__init__r1   c                 C   s   | j S r2   r   r3   r5   r5   r6   appE  s   zASGIAppReplicaWrapper.appc                 C   s   t | jtr
| jjS d S r2   )rX   r   r   docs_urlr3   r5   r5   r6   	docs_pathI  s   zASGIAppReplicaWrapper.docs_pathc                    sf   ddl m} || jjtjd | j I d H  | jjr!tdW d    d S 1 s,w   Y  d S )Nr   LoggingContextlevelz=ASGI lifespan startup failed. Check replica logs for details.)	 ray.serve._private.logging_utilsr   r   r   loggingWARNINGstartupshould_exitrx   r4   r   r5   r5   r6   _run_asgi_lifespan_startupN  s   "z0ASGIAppReplicaWrapper._run_asgi_lifespan_startupr.   r/   r0   c                    s   |  |||I dH  dS )z Calls into the wrapped ASGI app.Nr   )r4   r.   r/   r0   r5   r5   r6   ry   Z  s   zASGIAppReplicaWrapper.__call__c                    sV   ddl m} || jjtjd | j I d H  W d    d S 1 s$w   Y  d S )Nr   r   r   )r  r   r   r   r  r  shutdownr  r5   r5   r6   __del__i  s
   "zASGIAppReplicaWrapper.__del__N)r<   r=   r>   rf   r   r   r   rc   r   r   r
   rZ   r   r  r   r   r   ry   r	  r5   r5   r5   r6   r   +  s$    
r   middlewaresc                 C   sN   | du rg } t | tstd| D ]}tt|ts$tdt| dq| S )a4  Validate the return value of HTTP proxy callback.

    Middlewares should be a list of Starlette middlewares. If it is None, we
    will treat it as an empty list. If it is not a list, we will raise an
    error. If it is a list, we will check if all the items in the list are
    Starlette middlewares.
    Nz@HTTP proxy callback must return a list of Starlette middlewares.zMHTTP proxy callback must return a list of Starlette middlewares, instead got z type item in the list.)rX   r   
ValueError
issubclassrC   r   )r
  
middlewarer5   r5   r6   #validate_http_proxy_callback_returnr  s   
r  c                   @   s0   e Zd ZdefddZdededefddZd	S )
RequestIdMiddlewarer   c                 C   rs   r2   )_app)r4   r   r5   r5   r6   rc     ru   zRequestIdMiddleware.__init__r.   r/   r0   c                    s\   t |d}|t  d u rt  |t  dtf fdd}| |||I d H  d S )Nr.   rd   c                    sJ   | d dkrt | d}|d  | d dkr | d< | I d H  d S )NrC   rU   r  zX-Request-IDzwebsocket.accept)r   rg   )rd   rW   
request_idr0   r5   r6   send_with_request_id  s   
z:RequestIdMiddleware.__call__.<locals>.send_with_request_id)r   r   r"   r&   rg   r   r  )r4   r.   r/   r0   rW   r  r5   r  r6   ry     s   

zRequestIdMiddleware.__call__N)	r<   r=   r>   r   rc   r   r   r   ry   r5   r5   r5   r6   r    s    r  r   c                 C   s`   t tg| D ]&}ttjtdk r|j| fi |j} q|j| g|jR i |j	} q| S )zyWrap the ASGI app with the provided middlewares.

    The built-in RequestIdMiddleware will always be applied first.
    z0.35.0)
r   r  r   parser8   __version__r   optionsargskwargs)r   r
  r  r5   r5   r6   _apply_middlewares  s   r  F)enable_so_reuseporthttp_options
event_loopr  c                   sR  t  |j tt|jrtjntjtj}|rt| z|	|j|j
f W n tyB } ztd|j d|j
 d|d}~ww tjtd_i }|jrz|jrz|j|jd}|jra|j|d< |jri|j|d< td	|j d|j
 d
|j  tjtj fddfd|j|j
|j|j|ddddd
|d}dd |_||j|gdS )z{Start an HTTP server to run the ASGI app.

    Returns a task that blocks until the server exits (e.g., due to error).
    zFailed to bind to address 'rS   '.Nzuvicorn.error)ssl_keyfilessl_certfilessl_keyfile_passwordssl_ca_certszStarting HTTPS server on z with SSL certificate: c                      s    S r2   r5   r5   r   r5   r6   <lambda>      z(start_asgi_http_server.<locals>.<lambda>ToffF)
factoryhostport	root_pathtimeout_keep_aliveloopr   r   r   r   )configc                   S   s   d S r2   r5   r5   r5   r5   r6   r$    r%  )sockets)r  r
  r   r   r(  AF_INET6AF_INETSOCK_STREAMr   bindr)  OSErrorrx   r  CRITICAL	getLoggerr   r  r   r!  r"  r   infouvicornServerr   r*  keep_alive_timeout_sinstall_signal_handlersr   r   )r   r  r  r  r   rr   
ssl_kwargsserverr5   r#  r6   start_asgi_http_server  sj   



r=  excrequest_timeout_sr  c                 C   s   t | trtddd| d| ddS t | tjr+d| d}t| td	d|dS t | ttfrKt | t	rCtj
d
|  ddid tdd| jdS t | tt	fr_tj
d
|  ddid ntd tddt| dS )N  TzRequest z timed out after zs.)r   is_errorrd   zClient for request z" disconnected, cancelling request.i  zRequest failed: log_to_stderrF)extra  z'Request failed due to unexpected error.i  )rX   TimeoutErrorr$   rE   CancelledErrorr   r6  r)   r*   r   warningrd   r   r   rZ   )r>  r?  r  rd   r5   r5   r6   get_http_response_status  s<   



rH  rV   response_startedc                 C   s"   |s| j dvr	g S t| j| j dS )N)r@  rD  )rO   )r   r_   rd   )rV   rI  r5   r5   r6   send_http_response_on_exception-  s   rJ  c                 C   s@   t | } tpddkrt| _| jstr| jpt| _| jpg | _| S )z7Enhanced configuration with component-specific options.r   )r   r   r9  r?  r!   r
  r  r5   r5   r6   $configure_http_options_with_defaults8  s   
rL  c                 C   s6   t | } trtdt d | jttt | S )Nz1Calling user-provided callback from import path 'r  )r   r    r   r6  r
  extendr  r%   rK  r5   r5   r6   configure_http_middlewaresL  s   rN  re   )crE   r   r\   r  r   r   collectionsr   copyr   dataclassesr   typingr   r   r   r   r	   r
   r   r   r   r8   r7  r   r   fastapi.encodersr   	packagingr   starlette.datastructuresr   starlette.middlewarer   starlette.typesr   r   r   r   r   uvicorn.configr   uvicorn.lifespan.onr   ray._common.network_utilsr   ray._common.pydantic_compatr   ray.exceptionsr   r   ray.serve._private.commonr   ray.serve._private.constantsr   r    r!   r"   r#   )ray.serve._private.proxy_request_responser$   ray.serve._private.utilsr%   r&   r'   ray.serve.configr(   ray.serve.exceptionsr)   r*   r+   r5  r   r-   rY   rL   intr_   r`   rj   rk   r   r   boolr   r   r  r  r  AbstractEventLoopTaskr=  r   floatrZ   rH  rJ  rL  rN  r5   r5   r5   r6   <module>   s    ,

' \oG

M
&
