o
    ci                     @   s$  d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZm	Z	 d dl m
Z
 d dlmZ d dlmZ d dlmZ d dlmZmZ d dlmZmZmZmZmZmZmZmZmZ d dlZd d	lm Z  d d
l!m"Z"m#Z#m$Z$ d dl%m&Z&m'Z'm(Z(m)Z)m*Z*m+Z+ d dl,m-Z- d dl.m/Z/m0Z0m1Z1m2Z2m3Z3 d dl4m5Z5m6Z6 d dl7m8Z8m9Z9 d dl:m;Z; d dl<m=Z=m>Z> d dl?m@Z@ d dlAmBZB d dlCmDZD d dlEmFZFmGZGmHZH d dlImJZJ d dlKmLZLmMZM d dlNmOZO ePe3ZQdZRG dd dZSG dd deZTde jUfdd ZVG d!d" d"ZWG d#d$ d$eTZXG d%d& d&ZYdS )'    N)ABCabstractmethod)AbstractEventLoop)defaultdict)MutableMapping)contextmanager)	lru_cachepartial)	AnyCallable	CoroutineDefaultDictDictListOptionalTupleUnion)ActorHandle)ActorDiedErrorActorUnavailableErrorRayError)DeploymentHandleSourceDeploymentIDDeploymentTargetInfo	ReplicaIDRequestMetadataRunningReplicaInfo)DeploymentConfig)HANDLE_METRIC_PUSH_INTERVAL_S/RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE3RAY_SERVE_HANDLE_AUTOSCALING_METRIC_RECORD_PERIOD_S'RAY_SERVE_PROXY_PREFER_LOCAL_AZ_ROUTINGSERVE_LOGGER_NAME)LongPollClientLongPollNamespace)InMemoryMetricsStoreMetricsPusher)ReplicaResult)PendingRequestRequestRouter)PowerOfTwoChoicesRequestRouterRunningReplica)ServeUsageTag)generate_request_idresolve_deployment_response"run_coroutine_or_future_threadsafe)AutoscalingConfig)BackPressureErrorDeploymentUnavailableError)metricsqueuedc                   @   s   e Zd ZdZdZdZdedededede	d	e
jd
e
jde
jfddZedefddZdee fddZedee fddZdedefddZdefddZdd Zd d! Zd"efd#d$Zd"efd%d&Z dede!fd'd(Z"d)d* Z#d+d, Z$d-d. Z%d/d0 Z&d1S )2RouterMetricsManagerzManages metrics for the router.push_metrics_to_controllerrecord_metricsdeployment_id	handle_idself_actor_idhandle_sourcecontroller_handlerouter_requests_counterqueued_requests_gaugerunning_requests_gaugec	           	      C   s   || _ || _|| _|| _|| _|| _| j|j|j| j | jd d| _	|| _
| j
|j|j| j | jd | j
d tt| _|| _| j|j|j| j | jd t | _t | _t | _d | _d| _d S )N
deploymentapplicationhandleactor_idr   F)
_handle_id_deployment_id_self_actor_id_handle_source_controller_handlenum_router_requestsset_default_tagsnameapp_namenum_queued_requestsnum_queued_requests_gaugesetr   intnum_requests_sent_to_replicasnum_running_requests_gauge	threadingLock_queries_lockr&   metrics_pusherr%   metrics_store_deployment_config	_shutdown)	selfr9   r:   r;   r<   r=   r>   r?   r@    r]   M/home/ubuntu/.local/lib/python3.10/site-packages/ray/serve/_private/router.py__init__J   sN   	

zRouterMetricsManager.__init__request_metac                 c   s|    | j d ur
| j jnd}|dkr$| j|kr$t| j|d}t|j |z| |j | 	  d V  W | 
  d S | 
  w )N)rO   max_queued_requests)rZ   rb   rO   r2   loggerwarningmessageinc_num_total_requestsrouteinc_num_queued_requestsdec_num_queued_requests)r\   r`   rb   er]   r]   r^   wrap_request_assignment   s$   

z,RouterMetricsManager.wrap_request_assignmentrunning_replicasc                    s\   dd |D  j  tt fddj D _W d   dS 1 s'w   Y  dS )zPrune list of replica ids in self.num_queries_sent_to_replicas.

        We want to avoid self.num_queries_sent_to_replicas from growing
        in memory as the deployment upscales and downscales over time.
        c                 S   s   h | ]}|j qS r]   )
replica_id).0replicar]   r]   r^   	<setcomp>   s    z@RouterMetricsManager._update_running_replicas.<locals>.<setcomp>c                    s(   i | ]\}}|s| v r|j | qS r]   )rS   )rn   idnum_queriesrunning_replica_setr\   r]   r^   
<dictcomp>   s    
zARouterMetricsManager._update_running_replicas.<locals>.<dictcomp>N)rW   r   rR   rS   items)r\   rl   r]   rs   r^   _update_running_replicas   s   "z-RouterMetricsManager._update_running_replicasreturnc                 C   s   | j d u rd S | j jS N)rZ   autoscaling_configr\   r]   r]   r^   rz      s   
z'RouterMetricsManager.autoscaling_configdeployment_configcurr_num_replicasc                 C   s   | j rdS || _| j}|rD| j  | |r|   tr8| j| j	| j
tt|j | j| j| j|j dS | j| j| jt dS | jrN| j  dS dS )zCUpdate the config for the deployment this router sends requests to.N)r[   rZ   rz   rX   start)should_send_scaled_to_zero_optimized_push&push_autoscaling_metrics_to_controllerr   register_or_update_taskRECORD_METRICS_TASK_NAME_add_autoscaling_metrics_pointminr    metrics_interval_s$PUSH_METRICS_TO_CONTROLLER_TASK_NAMEr   
stop_tasks)r\   r|   r}   rz   r]   r]   r^   update_deployment_config   s<   

	z-RouterMetricsManager.update_deployment_configrg   c                 C   s   | j jd|id d S )Nrg   )tags)rK   inc)r\   rg   r]   r]   r^   rf      s   z+RouterMetricsManager.inc_num_total_requestsc                 C   s    |  j d7  _ | j| j  d S N   rO   rP   rQ   r{   r]   r]   r^   rh         z,RouterMetricsManager.inc_num_queued_requestsc                 C   s    |  j d8  _ | j| j  d S r   r   r{   r]   r]   r^   ri      r   z,RouterMetricsManager.dec_num_queued_requestsrm   c                 C   sT   | j  | j|  d7  < | jt| j  W d    d S 1 s#w   Y  d S r   rW   rS   rT   rQ   sumvaluesr\   rm   r]   r]   r^   $inc_num_running_requests_for_replica     "z9RouterMetricsManager.inc_num_running_requests_for_replicac                 C   sT   | j  | j|  d8  < | jt| j  W d    d S 1 s#w   Y  d S r   r   r   r]   r]   r^   $dec_num_running_requests_for_replica	  r   z9RouterMetricsManager.dec_num_running_requests_for_replicac                 C   s   | j d uo|dko| jdkS )Nr   )rz   rO   )r\   r}   r]   r]   r^   r     s
   
z>RouterMetricsManager.should_send_scaled_to_zero_optimized_pushc                 C   s4   | j jjdt | j| j| j| jd|   dS )zPushes queued and running request metrics to the controller.

        These metrics are used by the controller for autoscaling.
        )send_timestampr9   r:   rE   r<   Nr]   )	rJ   record_handle_metricsremotetimerG   rF   rH   rI   _get_aggregated_requestsr{   r]   r]   r^   r     s   

z;RouterMetricsManager.push_autoscaling_metrics_to_controllerc                 C   sP   t   }| jt| ji| tr| j| j| t   | jj }| j	| dS )zAdds metrics point for queued and running requests at replicas.

        Also prunes keys in the in memory metrics store with outdated datapoints.
        N)
r   rY   add_metrics_pointQUEUED_REQUESTS_KEYrO   r   rS   rz   look_back_period_sprune_keys_and_compact_data)r\   	timestampstart_timestampr]   r]   r^   r   &  s   
z3RouterMetricsManager._add_autoscaling_metrics_pointc                    s>   t  }trjrjj  fddj D }j|dS )Nc                    s,   i | ]\}}|j |t   p|qS r]   )rY   window_averager   )rn   rm   num_requestslook_back_periodr\   r]   r^   ru   =  s    zARouterMetricsManager._get_aggregated_requests.<locals>.<dictcomp>)queued_requestsrunning_requests)dictr   rz   r   rS   rv   rO   )r\   r   r]   r   r^   r   9  s   
z-RouterMetricsManager._get_aggregated_requestsc                    s"   | j r| j  I dH  d| _dS )z$Shutdown metrics manager gracefully.NT)rX   graceful_shutdownr[   r{   r]   r]   r^   shutdownL  s   
zRouterMetricsManager.shutdownN)'__name__
__module____qualname____doc__r   r   r   strr   r   r4   CounterGauger_   r   r   rk   r   r   rw   propertyr   r1   rz   r   rR   r   rf   rh   ri   r   r   r   boolr   r   r   r   r   r]   r]   r]   r^   r6   D   sR    	
E
1r6   c                   @   sR   e Zd ZedefddZededejj	e
 fddZedejj	fddZd	S )
Routerrx   c                 C      d S ry   r]   r{   r]   r]   r^   running_replicas_populatedV     z!Router.running_replicas_populatedr`   c                 O   r   ry   r]   )r\   r`   request_argsrequest_kwargsr]   r]   r^   assign_requestZ  s   zRouter.assign_requestc                 C   r   ry   r]   r{   r]   r]   r^   r   c  r   zRouter.shutdownN)r   r   r   r   r   r   r   
concurrentfuturesFuturer'   r   r   r]   r]   r]   r^   r   U  s    
r   rx   c                      s
   t  S )z<Helper to create an asyncio event in the current event loop.)asyncioEventr]   r]   r]   r^   create_eventh  s   r   c                    @   sN  e Zd Zeddddfdedededededej	de
d	ed
ee de
dedee deeeef  dee deej fddZedee fddZde
fddZdefddZdefddZdedee d eeef deee eeef f fd!d"Zd#ed$ed%ed&eeef fd'd(Z d)e!dee"ef fd*d+Z#d,ede"fd-d.Z$d/d0 Z%dS )1AsyncioRouterNr=   r9   r:   r;   r<   
event_loop"enable_strict_max_ongoing_requestsnode_idavailability_zoneprefer_local_node_routingresolve_request_arg_funcrequest_router_classrequest_router_kwargsrequest_router!_request_router_initialized_eventc                 C   s   || _ || _|| _|| _|| _|| _|r|ni | _|| _|| _|	| _	|
| _
d| _|| _|r1|| _ntt | j}| | _| jrF| j  || _d| _d| _t|||||tjddddtjdd	d
dtjddd
d| _t|tj|f| jtj|f| j i| jd| _!t"#|| j}|$|  dS )zUsed to assign requests to downstream replicas for a deployment.

        The routing behavior is delegated to a RequestRouter; this is a thin
        wrapper that adds metrics and logging.
        TNFserve_num_router_requestsz/The number of requests processed by the router.)rB   rg   rC   rD   rE   )descriptiontag_keysserve_deployment_queued_querieszUThe current number of queries to this deployment waiting to be assigned to a replica.rA   &serve_num_ongoing_requests_at_replicaszXThe current number of requests to this deployment that have been submitted to a replica.)call_in_event_loop)%rJ   r9   rH   rI   _event_loop_request_router_class_request_router_kwargs#_enable_strict_max_ongoing_requests_node_id_availability_zone_prefer_local_node_routing_deployment_available_request_router_request_router_initializedr   run_coroutine_threadsafer   resultrQ   _resolve_request_arg_func_running_replicas_running_replicas_populatedr6   r4   r   r   _metrics_managerr#   r$   DEPLOYMENT_TARGETSupdate_deployment_targetsDEPLOYMENT_CONFIGr   long_poll_clientSharedRouterLongPollClientget_or_createregister)r\   r=   r9   r:   r;   r<   r   r   r   r   r   r   r   r   r   r   futuresharedr]   r]   r^   r_   n  s|   


%
zAsyncioRouter.__init__rx   c                 C   s   | j sP| jrP| j| j| j| j| jt  rt j	nd| j
dd | jt| jd
}|jdi | j | jdur=|| j || _ | j  | jturPtjd | j S )a  Get and lazy loading request router.

        If the request_router_class not provided, and the request router is not
        yet initialized, then it will return None. Otherwise, if request router
        is not yet initialized, it will be initialized and returned. Also,
        setting `self._request_router_initialized` to signal that the request
        router is initialized.
        Nc                 S   s   t | S ry   r+   )rr]   r]   r^   <lambda>  s    z.AsyncioRouter.request_router.<locals>.<lambda>)
r9   r<   self_node_idr;   self_actor_handleuse_replica_queue_len_cachecreate_replica_wrapper_funcr   prefer_local_az_routingself_availability_zone1r]   )r   r   r9   rI   r   rH   rayget_runtime_contextget_actor_idcurrent_actorr   r   r!   r   initialize_stater   r   rw   r   rQ   r*   r-   CUSTOM_REQUEST_ROUTER_USEDrecord)r\   r   r]   r]   r^   r     s.   




zAsyncioRouter.request_routerc                 C   s   | j S ry   )r   r{   r]   r]   r^   r     s   z(AsyncioRouter.running_replicas_populateddeployment_target_infoc                 C   sF   |j | _|j}| jr| j| n|| _| j| |r!d| _d S d S )NT)is_availabler   rl   r   rw   r   r   r   )r\   r   rl   r]   r]   r^   r     s   
z'AsyncioRouter.update_deployment_targetsr|   c                 C   s2   |j  | _|j j| _| jj|t| jj	d d S )Nr}   )
request_router_configget_request_router_classr   r   r   r   r   lenr   curr_replicas)r\   r|   r]   r]   r^   r      s   

z&AsyncioRouter.update_deployment_configrequest_metadatar   r   c                    s   t |}| }i }t|D ]\}}| ||I dH }	|	dur$|	||< qi }
| D ]\}}| ||I dH }	|	dur@|	|
|< q+|sE|
rYt | t |
  }t|I dH  | D ]
\}}	|	 ||< q]|
 D ]
\}}	|	 ||< ql||fS )zEAsynchronously resolve and replace top-level request args and kwargs.N)	listcopy	enumerater   rv   r   r   waitr   )r\   r  r   r   new_args
new_kwargsresolve_arg_tasksiobjtaskresolve_kwarg_tasksk	all_tasksindexkeyr]   r]   r^   _resolve_request_arguments,  s2   z(AsyncioRouter._resolve_request_argumentsrm   parent_request_idresponse_idr   c                 C   sr   | j | t|tr| jr| j| t| d d S t|tr7| jr,| j	| td| d d S d S )N@ will not be considered for future requests because it has died.zRequest failed because  is temporarily unavailable.)
r   r   
isinstancer   r   on_replica_actor_diedrc   rd   r   on_replica_actor_unavailable)r\   rm   r  r  r   r]   r]   r^   _process_finished_requestT  s   


z'AsyncioRouter._process_finished_requestprc                    sD  | j  I dH  | j|I dH }| jr|jr)|j|ddI dH \}}||jfS 	 d}z(|j|ddI dH \}}| j|j| | j	||j| |j
rS||jfW S W nA tjye   |durd|    ty}   | j|j t|j d Y n ty   | j|j t|j d Y nw | jj|ddI dH }q*)zChoose a replica for the request and send it.

        This will block indefinitely if no replicas are available to handle the
        request, so it's up to the caller to time out or cancel the request.
        NF)with_rejectionTr  r  )is_retry)r   r  r   _choose_replica_for_requestr   is_cross_languagesend_requestrm   on_new_queue_len_infoon_request_routedacceptedr   CancelledErrorcancelr   r  rc   rd   r   r  )r\   r  r   r   _
queue_infor]   r]   r^   route_and_send_requestr  s<   	

z$AsyncioRouter.route_and_send_requestr`   c           
   	      sL  | j s	t| jt t }tjj	 j
| | fdd | j I dH  | j j | jjt| jjdrC| j  d}zE|  ||I dH \}}| tt|| dI dH \}}trtjj }|j}| j| t| j||}	||	 |W W  d   S  tjy   |dur|    w 1 sw   Y  dS )zBAssign a request to a replica and return the resulting object_ref.c                    s   t jj jS ry   )r   servecontext"_remove_request_pending_assignmentinternal_request_idr%  r`   r  r]   r^   r     s    z.AsyncioRouter.assign_request.<locals>.<lambda>Nr   )argskwargsmetadata)!r   r3   r9   r.   r   current_taskr   r(  r)  _add_request_pending_assignmentr+  add_done_callbackr   r  r   rk   r   r   r   r   r   r  r'  r(   r  r   _get_serve_request_context
request_idr   r	   r  r#  r$  )
r\   r`   r   r   assign_request_taskreplica_resultrm   _request_contextr5  callbackr]   r-  r^   r     sd   


	
%zAsyncioRouter.assign_requestc                    s   | j  I d H  d S ry   )r   r   r{   r]   r]   r^   r     s   zAsyncioRouter.shutdown)&r   r   r   r/   r   r   r   r   r   BaseEventLoopr   r   r   r   r   r
   r)   r   r_   r   r   r   r   r   r   r   r   r   r  r   r   r   r  r(   r'   r'  r   r   r]   r]   r]   r^   r   m  s    	

v(

(



<
Gr   c                   @   s   e Zd ZU dZdZeej ed< e	
 Zdd ZedejfddZdefd	d
Zdedejje fddZdejjfddZdS )SingletonThreadRoutera-  Wrapper class that runs an AsyncioRouter on a separate thread.

    The motivation for this is to avoid user code blocking the event loop and
    preventing the router from making progress.

    Maintains a singleton event loop running in a daemon thread that is shared by
    all AsyncioRouters.
    N_asyncio_loopc                 K   s,   d|vsJ dt dd|  i|| _d S )Nr   z4SingletonThreadRouter manages the router event loop.r]   )r   _get_singleton_asyncio_loop_asyncio_router)r\   passthrough_kwargsr]   r]   r^   r_     s   
zSingletonThreadRouter.__init__rx   c                 C   sp   | j * | jdu r$t | _tjd| jjd}|  W d   | jS W d   | jS 1 s0w   Y  | jS )zdGet singleton asyncio loop running in a daemon thread.

        This method is thread safe.
        NT)daemontarget)_asyncio_loop_creation_lockr<  r   new_event_looprU   Threadrun_foreverr~   )clsthreadr]   r]   r^   r=    s   



	
		z1SingletonThreadRouter._get_singleton_asyncio_loopc                 C   s
   | j  S ry   )r>  r   r{   r]   r]   r^   r      s   
z0SingletonThreadRouter.running_replicas_populatedr`   c                    s`   dt jdtjjfdd | j| jj|g|R i |}t|| jd|	 fdd S )a  Routes assign_request call on the internal asyncio loop.

        This method uses `run_coroutine_threadsafe` to execute the actual request
        assignment logic (`_asyncio_router.assign_request`) on the dedicated
        asyncio event loop thread. It returns a `concurrent.futures.Future` that
        can be awaited or queried from the calling thread.

        Returns:
            A concurrent.futures.Future resolving to the ReplicaResult representing
            the assigned request.
        asyncio_futureconcurrent_futurec                 S   sF   |  r|   s|  du r!|  }td |  dS dS dS dS )a  Callback attached to the asyncio Task running assign_request.

            This runs when the asyncio Task finishes (completes, fails, or is cancelled).
            Its primary goal is to propagate cancellation initiated via the
            `concurrent_future` back to the `ReplicaResult` in situations where
            asyncio_future didn't see the cancellation event in time. Think of it
            like a second line of defense for cancellation of replica results.
            NzuAsyncio task completed despite cancellation attempt. Attempting to cancel the request that was assigned to a replica.)	cancelled	exceptionr   rc   infor$  )rH  rI  r   r]   r]   r^   asyncio_future_callback5  s   zESingletonThreadRouter.assign_request.<locals>.asyncio_future_callbackloopc                    s
    | S ry   r]   r,  rM  rI  r]   r^   r   W  s   
 z6SingletonThreadRouter.assign_request.<locals>.<lambda>)
r   r   r   r   r<  create_taskr>  r   r0   r3  )r\   r`   r   r   r  r]   rP  r^   r   #  s(   
z$SingletonThreadRouter.assign_requestc                 C   s   t j| j | jdS )NrN  )r   r   r>  r   r<  r{   r]   r]   r^   r   Z  s   zSingletonThreadRouter.shutdown)r   r   r   r   r<  r   r   r   __annotations__rU   rV   rB  r_   classmethodr=  r   r   r   r   r   r   r'   r   r   r]   r]   r]   r^   r;    s   
 		

7r;  c                   @   s   e Zd ZdedefddZeedddededd fdd	Zd
e	de
ddfddZdede
ddfddZdeddfddZdeddfddZdS )r   r=   r   c                 C   s.   || _ || _ttj| _t|i | jd| _d S )N)key_listenersr   )controller_handlerr   r   weakrefWeakSetroutersr#   r   )r\   r=   r   r]   r]   r^   r_   a  s   z#SharedRouterLongPollClient.__init__N)maxsizerx   c                 C   s"   | ||d}t d| d |S )N)r=   r   zStarted .)rc   rL  )rF  r=   r   r   r]   r]   r^   r   r  s   z(SharedRouterLongPollClient.get_or_creater   r9   c                 C   (   | j | D ]}|| |j  qd S ry   )rX  r   r   stop)r\   r   r9   routerr]   r]   r^   r   {  s   
z4SharedRouterLongPollClient.update_deployment_targetsr|   c                 C   r[  ry   )rX  r   r   r\  )r\   r|   r9   r]  r]   r]   r^   r     s   
z3SharedRouterLongPollClient.update_deployment_configr]  c                 C   s   | j | j| d S ry   )r   call_soon_threadsafe	_register)r\   r]  r]   r]   r^   r     s   z#SharedRouterLongPollClient.registerc                    sz    j |j | t j  D ]\}}|s j | q fdd j  D  fdd j  D B } j| d S )Nc                    "   i | ]}t j|ft j|d qS )r9   )r$   r   r	   r   rn   r9   r{   r]   r^   ru         
z8SharedRouterLongPollClient._register.<locals>.<dictcomp>c                    r`  ra  )r$   r   r	   r   rb  r{   r]   r^   ru     rc  )	rX  r9   addr  rv   popkeysr   add_key_listeners)r\   r]  r9   rX  rT  r]   r{   r^   r_    s   

z$SharedRouterLongPollClient._register)r   r   r   r   r   r_   rS  r   r   r   r   r   r   r   r   r   r_  r]   r]   r]   r^   r   `  s6    
	
	r   )Zr   concurrent.futuresr   loggingrU   r   rV  abcr   r   r   collectionsr   collections.abcr   
contextlibr   	functoolsr   r	   typingr
   r   r   r   r   r   r   r   r   r   	ray.actorr   ray.exceptionsr   r   r   ray.serve._private.commonr   r   r   r   r   r   ray.serve._private.configr   ray.serve._private.constantsr   r   r    r!   r"   ray.serve._private.long_pollr#   r$    ray.serve._private.metrics_utilsr%   r&   !ray.serve._private.replica_resultr'   !ray.serve._private.request_routerr(   r)   .ray.serve._private.request_router.pow_2_routerr*   1ray.serve._private.request_router.replica_wrapperr,   ray.serve._private.usager-   ray.serve._private.utilsr.   r/   r0   ray.serve.configr1   ray.serve.exceptionsr2   r3   ray.utilr4   	getLoggerrc   r   r6   r   r   r   r   r;  r   r]   r]   r]   r^   <module>   sV    , 
     g