o
    cis                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZ d dlm	Z	 d dl
mZ d dlmZmZmZmZmZmZmZmZmZ d dlZd dlZd dlmZ d dlmZ d dlmZ d d	lmZ d d
l m!Z! d dl"m#Z#m$Z$m%Z%m&Z&m'Z' d dl(m)Z) d dl*m+Z+ d dl,m-Z- d dl.m/Z/m0Z0 d dl1m2Z2 d dl3m4Z4m5Z5m6Z6m7Z7m8Z8 d dl9m:Z: d dl;m<Z<m=Z=m>Z> d dl?m@Z@ d dlAmBZBmCZCmDZD eEe8ZFeddG dd dZGdeHde%fddZI	dOdee d eJdee$ fd!d"ZKG d#d$ d$ZLd%d& ZMG d'd( d(e'ZNG d)d* d*ZOd+eddfd,d-ZPd.ejdeQfd/d0ZRG d1d2 d2ZSd3ede!gfd4d5ZTG d6d7 d7ZUd8e#d3ee de#fd9d:ZVd;d<d8e#d=e@d>e jWd?eQde jXf
d@dAZYdBeZdCe[dDe\de:fdEdFZ]dGe:dHeQdee$ fdIdJZ^d=e@de@fdKdLZ_d=e@de@fdMdNZ`dS )P    N)deque)deepcopy)	dataclass)	AnyAsyncGenerator	AwaitableCallableListOptionalTupleTypeUnion)FastAPI)jsonable_encoder)version)MutableHeaders)
Middleware)ASGIAppMessageReceiveScopeSend)Config)
LifespanOn)IS_PYDANTIC_2)RayActorErrorRayTaskError)RequestMetadata)#RAY_SERVE_HTTP_KEEP_ALIVE_TIMEOUT_S)RAY_SERVE_HTTP_PROXY_CALLBACK_IMPORT_PATH&RAY_SERVE_REQUEST_PROCESSING_TIMEOUT_SSERVE_HTTP_REQUEST_ID_HEADERSERVE_LOGGER_NAME)ResponseStatus)call_function_from_import_pathgenerate_request_idserve_encoders)HTTPOptions)BackPressureErrorDeploymentUnavailableErrorRayServeExceptionT)frozenc                   @   sP   e Zd ZU eed< eed< eed< deeeef fddZde	j
jfddZd	S )
ASGIArgsscopereceivesendreturnc                 C   s   | j | j| jfS N)r-   r.   r/   self r4   P/home/ubuntu/.local/lib/python3.10/site-packages/ray/serve/_private/http_util.pyto_args_tupleA   s   zASGIArgs.to_args_tuplec                 C   s   t jj|   S r1   )	starletterequestsRequestr6   r2   r4   r4   r5   to_starlette_requestD   s   zASGIArgs.to_starlette_requestN)__name__
__module____qualname__r   __annotations__r   r   r   r6   r7   r8   r9   r:   r4   r4   r4   r5   r,   ;   s   
 r,   serialized_bodyr0   c                    s   d  fdd}|S )zAReturns an ASGI receiver that returns the provided buffered body.Fc                     s,    rt  } |  I d H  d dddS )NThttp.requestF)bodytype	more_body)asyncioEventwait)block_foreverreceivedr?   r4   r5   mock_receiveQ   s   z0make_buffered_asgi_receive.<locals>.mock_receiver4   )r?   rJ   r4   rH   r5   make_buffered_asgi_receiveJ   s   rK      objstatus_codec                 C   s   d}d}| du rd}d}n&t | tr| }d}nt | tr$| d}d}ntjt| tddd }d	}d
|d|ggdd|dgS )zSerializes the provided object and converts it to ASGI messages.

    These ASGI messages can be sent via an ASGI `send` interface to comprise an HTTP
    response.
    N    s
   text/plainzutf-8s   text/plain; charset=utf-8)custom_encoder),:)
separatorss   application/jsonhttp.response.starts   content-type)rB   statusheaderszhttp.response.body)rB   rA   )
isinstancebytesstrencodejsondumpsr   r&   )rM   rN   rA   content_typer4   r4   r5   convert_object_to_asgi_messagesa   s0   



r^   c                   @   s"   e Zd ZdZdddZdd ZdS )	ResponseaQ  ASGI compliant response class.

    It is expected to be called in async context and pass along
    `scope, receive, send` as in ASGI spec.

    >>> from ray.serve.http_util import Response  # doctest: +SKIP
    >>> scope, receive = ... # doctest: +SKIP
    >>> await Response({"k": "v"}).send(scope, receive, send) # doctest: +SKIP
    NrL   c                 C   s   t ||d| _dS )zConstruct a HTTP Response based on input type.

        Args:
            content: Any JSON serializable object.
            status_code (int, optional): Default status code is 200.
        )rM   rN   N)r^   	_messages)r3   contentrN   r4   r4   r5   __init__   s   zResponse.__init__c                    s    | j D ]	}||I d H  qd S r1   )r`   )r3   r-   r.   r/   messager4   r4   r5   r/      s   
zResponse.sendNrL   )r;   r<   r=   __doc__rb   r/   r4   r4   r4   r5   r_      s    

r_   c                    sN   g }d}|r"| I d H }|d dksJ |d }| |d  |sd|S )NTrB   r@   rC   rA   rO   )appendjoin)r-   r.   r/   body_bufferrC   rc   r4   r4   r5   receive_http_body   s   
ri   c                   @   s   e Zd ZdZdd Zdd ZdefddZd	efd
dZ	d	efddZ
dd Zdee fddZdefddZdejdeee df fddZdS )MessageQueueaq  Queue enables polling for received or sent messages.

    Implements the ASGI `Send` interface.

    This class:
        - Is *NOT* thread safe and should only be accessed from a single asyncio
          event loop.
        - Assumes a single consumer of the queue (concurrent calls to
          `get_messages_nowait` and `wait_for_message` is undefined behavior).
    c                 C   s"   t  | _t | _d| _d | _d S )NF)r   _message_queuerD   rE   _new_message_event_closed_errorr2   r4   r4   r5   rb      s   

zMessageQueue.__init__c                 C   s   d| _ | j  dS )zClose the queue, rejecting new messages.

        Once the queue is closed, existing messages will be returned from
        `get_messages_nowait` and subsequent calls to `wait_for_message` will
        always return immediately.
        TN)rm   rl   setr2   r4   r4   r5   close   s   zMessageQueue.closeec                 C   
   || _ d S r1   )rn   )r3   rq   r4   r4   r5   	set_error      
zMessageQueue.set_errorrc   c                 C   s   | j | | j  d S r1   )rk   rf   rl   ro   r3   rc   r4   r4   r5   
put_nowait   s   zMessageQueue.put_nowaitc                    s   | j rtd| | dS )zSend a message, putting it on the queue.

        `RuntimeError` is raised if the queue has been closed using `.close()`.
        z6New messages cannot be sent after the queue is closed.N)rm   RuntimeErrorrv   ru   r4   r4   r5   __call__   s   zMessageQueue.__call__c                    s    | j s| j I dH  dS dS )a   Wait until at least one new message is available.

        If a message is available, this method will return immediately on each call
        until `get_messages_nowait` is called.

        After the queue is closed using `.close()`, this will always return
        immediately.
        N)rm   rl   rF   r2   r4   r4   r5   wait_for_message   s   	zMessageQueue.wait_for_messager0   c                 C   s>   g }t | jdkr|| j  t | jdks	| j  |S )a  Returns all messages that are currently available (non-blocking).

        At least one message will be present if `wait_for_message` had previously
        returned and a subsequent call to `wait_for_message` blocks until at
        least one new message is available.
        r   )lenrk   rf   popleftrl   clear)r3   messagesr4   r4   r5   get_messages_nowait   s   
z MessageQueue.get_messages_nowaitc                    s   | j r| j | j I dH  t| jdkr,| j }t| jdkr*| js*| j  |S t| jdkr9| j r9| j t| jdkrE| jrGtdS dS )a  This blocks until a message is ready.

        This method should not be used together with get_messages_nowait.
        Please use either `get_one_message` or `get_messages_nowait`.

        Raises:
            StopAsyncIteration: if the queue is closed and there are no
                more messages.
            Exception (self._error): if there are no more messages in
                the queue and an error has been set.
        Nr   )	rn   rl   rF   rz   rk   r{   rm   r|   StopAsyncIteration)r3   msgr4   r4   r5   get_one_message   s   

zMessageQueue.get_one_messagecall_futNc                 C  s   d}zL	 t |  }t j||gt jdI dH \}}|  }|r$|V  ||v r)nq| }|dur5|dW | s>|  |durL| sN|  dS dS dS | sX|  |dure| sf|  w w w )a  Repeatedly consume messages from the queue and yield them.

        This is used to fetch queue messages in the system event loop in
        a thread-safe manner.

        Args:
            call_fut: The async Future pointing to the task from the user
                code event loop that is pushing messages onto the queue.

        Yields:
            List[Any]: Messages from the queue.
        NT)return_when)	rD   create_taskry   rF   FIRST_COMPLETEDr~   	exceptiondonecancel)r3   r   wait_for_msg_taskr   _r}   rq   r4   r4   r5   fetch_messages_from_queue  s8   

z&MessageQueue.fetch_messages_from_queue)r;   r<   r=   re   rb   rp   BaseExceptionrs   r   rv   rx   ry   r	   r~   r   rD   Futurer   r   r   r4   r4   r4   r5   rj      s    

rj   c                	   @   sl   e Zd ZdZdededeegee f fddZ	de
fdd	Zedejfd
dZdd Zde
fddZdS )ASGIReceiveProxyzProxies ASGI receive from an actor.

    The `receive_asgi_messages` callback will be called repeatedly to fetch messages
    until a disconnect message is received.
    r-   request_metadatareceive_asgi_messagesc                 C   s&   |d | _ d | _|| _|| _d | _d S )NrB   )_type_queue_request_metadata_receive_asgi_messages_disconnect_message)r3   r-   r   r   r4   r4   r5   rb   I  s
   

zASGIReceiveProxy.__init__r0   c                 C   s   | j dkr
dddS ddiS )a7  Return the appropriate disconnect message based on the connection type.

        HTTP ASGI spec:
            https://asgi.readthedocs.io/en/latest/specs/www.html#disconnect-receive-event

        WS ASGI spec:
            https://asgi.readthedocs.io/en/latest/specs/www.html#disconnect-receive-event-ws
        	websocketwebsocket.disconnecti  )rB   coderB   http.disconnect)r   r2   r4   r4   r5   _get_default_disconnect_messageV  s
   
	z0ASGIReceiveProxy._get_default_disconnect_messagec                 C   s   | j d u r
t | _ | j S r1   )r   rD   Queuer2   r4   r4   r5   queueh  s   

zASGIReceiveProxy.queuec              
      s   	 z8|  | jI dH }t|trt|}n
t|tr|n|g}|D ]}| j| |d dv r8|| _	 W dS q#W n/ t
yQ   |  }| j| || _	Y dS  tyi } z| j| W Y d}~dS d}~ww q)a  Fetch messages repeatedly until a disconnect message is received.

        If a disconnect message is received, this function exits and returns it.

        If an exception occurs, it will be raised on the next __call__ and no more
        messages will be received.
        TNrB   >   r   r   )r   r   rW   rX   pickleloadslistr   rv   r   KeyErrorr   	Exception)r3   pickled_messagesr}   rc   rq   r4   r4   r5   fetch_until_disconnecto  s<   

z'ASGIReceiveProxy.fetch_until_disconnectc                    s>   | j  r| jdur| jS | j  I dH }t|tr||S )zReturn the next message once available.

        This will repeatedly return a disconnect message once it's been received.
        N)r   emptyr   getrW   r   ru   r4   r4   r5   rx     s   
zASGIReceiveProxy.__call__N)r;   r<   r=   re   r   r   r   r   rX   rb   r   r   propertyrD   r   r   r   rx   r4   r4   r4   r5   r   B  s    
)r   clsc                    s  ddl m}m} ddlm m dd } fdd| jD }| }|D ]O}| j| |j}t	
|}	t|	j }
t|
dkrFtd|
d }|j||d	}|gd
d |
dd D  }|	j|d}||j_|j_|j| q%| | t | jD ].}t| fsqtst| r|jr|jj|j_t|jdd}|dur|kr| qfdd| jD | jdd< dS )a_  Transform the `cls`'s methods and class annotations to FastAPI routes.

    Modified from
    https://github.com/dmontagu/fastapi-utils/blob/master/fastapi_utils/cbv.py

    Usage:
    >>> from fastapi import FastAPI
    >>> app = FastAPI() # doctest: +SKIP
    >>> class A: # doctest: +SKIP
    ...     @app.route("/{i}") # doctest: +SKIP
    ...     def func(self, i: int) -> str: # doctest: +SKIP
    ...         return self.dep + i # doctest: +SKIP
    >>> # just running the app won't work, here.
    >>> make_fastapi_class_based_view(app, A) # doctest: +SKIP
    >>> # now app can be run properly
    r   )	APIRouterDepends)APIRouteAPIWebSocketRoutec                  S   s   ddl m}  |  jS )Nr   serve)rayr   get_replica_contextservable_objectr   r4   r4   r5   get_current_servable_instance  s   
zDmake_fastapi_class_based_view.<locals>.get_current_servable_instancec                    s,   g | ]}t | frj|jjv r|qS r4   )rW   r=   endpoint).0route)r   r   r   r4   r5   
<listcomp>  s    
	z1make_fastapi_class_based_view.<locals>.<listcomp>zOMethods in FastAPI class-based view must have ``self`` as their first argument.)defaultc                 S   s   g | ]
}|j tjjd qS ))kind)replaceinspect	ParameterKEYWORD_ONLY)r   	parameterr4   r4   r5   r     s       N)
parameters
_serve_clsc                    s   g | ]}| vr|qS r4   r4   )r   r)routes_to_remover4   r5   r   
  s    )fastapir   r   fastapi.routingr   r   routesremover   r   	signaturer   r   valuesrz   r*   r   __signature__r   rf   include_routerrW   r   response_modelresponse_fieldouter_type_secure_cloned_response_fieldgetattr)fastapi_appr   r   r   r   class_method_routes
new_routerr   old_endpointold_signatureold_parametersold_self_parameternew_self_parameternew_parametersnew_signature	serve_clsr4   )r   r   r   r   r5   make_fastapi_class_based_view  sZ   





"r   sockc              
   C   s   z%|  tjtjd ttdr|  tjtjd W dS |  tjdd W dS  tyA } ztd| d W Y d}~dS d}~ww )	zMutate a socket object to allow multiple process listening on the same port.

    Returns:
        success: whether the setting was successful.
    r   SO_REUSEPORT   Tz'Setting SO_REUSEPORT failed because of z. SO_REUSEPORT is disabled.NF)	
setsockoptsocket
SOL_SOCKETSO_REUSEADDRhasattrr   r   loggerdebug)r   rq   r4   r4   r5   set_socket_reuse_port  s   
	
r   c                	   @   s|   e Zd ZdZdeeef fddZedefddZ	ede
e fdd	Zd
d Zdededede
e fddZdd ZdS )ASGIAppReplicaWrapperz;Provides a common wrapper for replicas running an ASGI app.app_or_funcc                 C   s@   t |r
| | _n|| _tt| jdd d dd| _t| j_d S )NonF)lifespan	log_level
log_config
access_log)r   
isfunction	_asgi_appr   r   _serve_asgi_lifespanr   )r3   r   r4   r4   r5   rb   -  s   

zASGIAppReplicaWrapper.__init__r0   c                 C   s   | j S r1   r   r2   r4   r4   r5   appD  s   zASGIAppReplicaWrapper.appc                 C   s   t | jtr
| jjS d S r1   )rW   r   r   docs_urlr2   r4   r4   r5   	docs_pathH  s   zASGIAppReplicaWrapper.docs_pathc                    sf   ddl m} || jjtjd | j I d H  | jjr!tdW d    d S 1 s,w   Y  d S )Nr   LoggingContextlevelz=ASGI lifespan startup failed. Check replica logs for details.)	 ray.serve._private.logging_utilsr   r   r   loggingWARNINGstartupshould_exitrw   r3   r   r4   r4   r5   _run_asgi_lifespan_startupM  s   "z0ASGIAppReplicaWrapper._run_asgi_lifespan_startupr-   r.   r/   c                    s   |  |||I dH  dS )z Calls into the wrapped ASGI app.Nr   )r3   r-   r.   r/   r4   r4   r5   rx   Y  s   zASGIAppReplicaWrapper.__call__c                    sV   ddl m} || jjtjd | j I d H  W d    d S 1 s$w   Y  d S )Nr   r   r   )r   r   r   r   r   r   shutdownr   r4   r4   r5   __del__h  s
   "zASGIAppReplicaWrapper.__del__N)r;   r<   r=   re   r   r   r   rb   r   r   r
   rY   r   r   r   r   r   rx   r   r4   r4   r4   r5   r   *  s$    
r   middlewaresc                 C   sN   | du rg } t | tstd| D ]}tt|ts$tdt| dq| S )a4  Validate the return value of HTTP proxy callback.

    Middlewares should be a list of Starlette middlewares. If it is None, we
    will treat it as an empty list. If it is not a list, we will raise an
    error. If it is a list, we will check if all the items in the list are
    Starlette middlewares.
    Nz@HTTP proxy callback must return a list of Starlette middlewares.zMHTTP proxy callback must return a list of Starlette middlewares, instead got z type item in the list.)rW   r   
ValueError
issubclassrB   r   )r  
middlewarer4   r4   r5   #validate_http_proxy_callback_returnq  s   
r  c                   @   s0   e Zd ZdefddZdededefddZd	S )
RequestIdMiddlewarer   c                 C   rr   r1   )_app)r3   r   r4   r4   r5   rb     rt   zRequestIdMiddleware.__init__r-   r.   r/   c                    s\   t |d}|t  d u rt  |t  dtf fdd}| |||I d H  d S )Nr-   rc   c                    sJ   | d dkrt | d}|d  | d dkr | d< | I d H  d S )NrB   rT   r  zX-Request-IDzwebsocket.accept)r   rf   )rc   rV   
request_idr/   r4   r5   send_with_request_id  s   
z:RequestIdMiddleware.__call__.<locals>.send_with_request_id)r   r   r!   r%   rf   r   r  )r3   r-   r.   r/   rV   r  r4   r	  r5   rx     s   

zRequestIdMiddleware.__call__N)	r;   r<   r=   r   rb   r   r   r   rx   r4   r4   r4   r5   r    s    r  r   c                 C   s`   t tg| D ]&}ttjtdk r|j| fi |j} q|j| g|jR i |j	} q| S )zyWrap the ASGI app with the provided middlewares.

    The built-in RequestIdMiddleware will always be applied first.
    z0.35.0)
r   r  r   parser7   __version__r   optionsargskwargs)r   r  r  r4   r4   r5   _apply_middlewares  s   r  F)enable_so_reuseporthttp_options
event_loopr  c                   s   t  |j t }|rt| z||j|jf W n ty6 } ztd|j d|j d|d}~ww t	j
t	d_tjtj fddd|j|j|j|j|d	d
dddd}dd |_||j|gdS )z{Start an HTTP server to run the ASGI app.

    Returns a task that blocks until the server exits (e.g., due to error).
    zFailed to bind to address 'rR   '.Nzuvicorn.errorc                      s    S r1   r4   r4   r   r4   r5   <lambda>      z(start_asgi_http_server.<locals>.<lambda>ToffF)
factoryhostport	root_pathtimeout_keep_aliveloopr   r   r   r   )configc                   S   s   d S r1   r4   r4   r4   r4   r5   r    r  )sockets)r  r  r   r   bindr  r  OSErrorrw   r   CRITICAL	getLoggerr   uvicornServerr   r  keep_alive_timeout_sinstall_signal_handlersr   r   )r   r  r  r  r   rq   serverr4   r  r5   start_asgi_http_server  sB   

r+  excrequest_timeout_sr
  c                 C   s   t | trtddd| d| ddS t | tjr+d| d}t| td	d|dS t | ttfrKt | t	rCtj
d
|  ddid tdd| jdS t | tt	fr_tj
d
|  ddid ntd tddt| dS )N  TzRequest z timed out after zs.)r   is_errorrc   zClient for request z" disconnected, cancelling request.i  zRequest failed: log_to_stderrF)extra  z'Request failed due to unexpected error.i  )rW   TimeoutErrorr#   rD   CancelledErrorr   infor(   r)   r   warningrc   r   r   rY   )r,  r-  r
  rc   r4   r4   r5   get_http_response_status  s<   



r7  rU   response_startedc                 C   s"   |s| j dvr	g S t| j| j dS )N)r.  r2  )rN   )r   r^   rc   )rU   r8  r4   r4   r5   send_http_response_on_exception  s   r9  c                 C   s@   t | } tpddkrt| _| jstr| jpt| _| jpg | _| S )z7Enhanced configuration with component-specific options.r   )r   r   r(  r-  r    r  r  r4   r4   r5   $configure_http_options_with_defaults"  s   
r;  c                 C   s6   t | } trtdt d | jttt | S )Nz1Calling user-provided callback from import path 'r  )r   r   r   r5  r  extendr  r$   r:  r4   r4   r5   configure_http_middlewares6  s   r=  rd   )arD   r   r[   r   r   r   collectionsr   copyr   dataclassesr   typingr   r   r   r   r	   r
   r   r   r   r7   r&  r   r   fastapi.encodersr   	packagingr   starlette.datastructuresr   starlette.middlewarer   starlette.typesr   r   r   r   r   uvicorn.configr   uvicorn.lifespan.onr   ray._common.pydantic_compatr   ray.exceptionsr   r   ray.serve._private.commonr   ray.serve._private.constantsr   r   r    r!   r"   )ray.serve._private.proxy_request_responser#   ray.serve._private.utilsr$   r%   r&   ray.serve.configr'   ray.serve.exceptionsr(   r)   r*   r%  r   r,   rX   rK   intr^   r_   ri   rj   r   r   boolr   r   r  r  r  AbstractEventLoopTaskr+  r   floatrY   r7  r9  r;  r=  r4   r4   r4   r5   <module>   s    ,

' efG

8
&
