o
    ci                    @   s8  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlZd dlmZmZ d dlmZmZ d dlmZmZ d dlmZ d dlmZ d dlmZmZmZmZmZmZm Z m!Z! d dl"Z#d dl$m%Z% d d	l&m'Z' d d
l(m)Z) d dl*m+Z+m,Z,m-Z-m.Z. d dl/Z/d dl/m0Z0 d dl1m2Z2 d dl3m4Z4m5Z5 d dl6m7Z7 d dl8m9Z9 d dl:m;Z;m<Z<m=Z=m>Z>m?Z?m@Z@mAZA d dlBmCZC d dlDmEZEmFZFmGZGmHZHmIZImJZJmKZKmLZLmMZMmNZNmOZOmPZPmQZQmRZRmSZS d dlTmUZUmVZV d dlWmXZXmYZYmZZZm[Z[m\Z\ d dl]m^Z^m_Z_m`Z`maZambZb d dlcmdZdmeZe d dlfmgZg d dlhmiZimjZjmkZk d dllmmZm d dlnmoZo d dlpmqZq d dlrmsZs d dltmuZumvZvmwZw d dlxmyZy ezeRZ{e eCemee| ee} ee~ f Zd e~d!efd"d#ZG d$d% d%Zee~gdf ZG d&d' d'eZG d(d) d)eZG d*d+ d+ZeG d,d- d-ZG d.d/ d/ZdS )0    N)ABCabstractmethod)defaultdictdeque)asynccontextmanagercontextmanager)	dataclass)import_module)AnyAsyncGeneratorCallableDict	GeneratorOptionalTupleUnion)	to_thread)Request)	Starlette)ASGIAppReceiveScopeSend)cloudpickle)get_or_create_event_loop)
ActorClassActorHandle)RemoteFunction)metrics)DeploymentID	ReplicaIDReplicaQueueLengthInfoRequestMetadataServeComponentTypeStreamingHTTPRequestgRPCRequest)DeploymentConfig)GRPC_CONTEXT_ARG_NAMEHEALTH_CHECK_METHOD/RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE$RAY_SERVE_METRICS_EXPORT_INTERVAL_MS4RAY_SERVE_REPLICA_AUTOSCALING_METRIC_RECORD_PERIOD_S&RAY_SERVE_REQUEST_PATH_LOG_BUFFER_SIZE RAY_SERVE_RUN_SYNC_IN_THREADPOOL(RAY_SERVE_RUN_SYNC_IN_THREADPOOL_WARNING*RAY_SERVE_RUN_USER_CODE_IN_SEPARATE_THREADRECONFIGURE_METHODREQUEST_LATENCY_BUCKETS_MSREQUEST_ROUTING_STATS_METHODSERVE_CONTROLLER_NAMESERVE_LOGGER_NAMESERVE_NAMESPACE)create_replica_implcreate_replica_metrics_manager)ASGIAppReplicaWrapperASGIArgsASGIReceiveProxyMessageQueueResponse)access_log_msg configure_component_cpu_profilerconfigure_component_logger#configure_component_memory_profilerget_component_logger_file_path)InMemoryMetricsStoreMetricsPusher)get_asgi_route_name)	Semaphoreget_component_file_nameparse_import_path)DeploymentVersion)AutoscalingConfig)_get_in_flight_requests)
Deployment)BackPressureErrorDeploymentUnavailableErrorRayServeException)LoggingConfigimport_pathreturnc                 C   sj   t | \}}tt||}t|tr|j}|S t|tr"|jj}|S t|t	r3t
d|  d |j}|S )NzThe import path "zm" contains a decorated Serve deployment. The decorator's settings are ignored when deploying via import path.)rG   getattrr	   
isinstancer   	_functionr   __ray_metadata__modified_classrK   loggerwarningfunc_or_class)rP   module_name	attr_namedeployment_def r]   N/home/ubuntu/.local/lib/python3.10/site-packages/ray/serve/_private/replica.py%_load_deployment_def_from_import_paths   s   

	

r_   c                   @   s   e Zd ZdZdZdZdZdedej	de
e defd	d
Zdd Zdd Zdd ZdefddZde
e fddZdedefddZdedefddZdefddZdededefd d!Zdeeef fd"d#Zd'd%d&Zd$S )(ReplicaMetricsManagera"  Manages metrics for the replica.

    A variety of metrics are managed:
        - Fine-grained metrics are set for every request.
        - Autoscaling statistics are periodically pushed to the controller.
        - Queue length metrics are periodically recorded as user-facing gauges.
    push_metrics_to_controllerrecord_metrics set_replica_request_metric_gauge
replica_id
event_loopautoscaling_configingressc                 C   s  || _ t | _t | _|| _|| _tjt	t
d| _d| _tdk| _td | _tjddd| _| j  tjddd	d
| _| jrDtt| _tjddd	d
| _| jrUtt| _tdt  tjddtd	d| _| jrott| _ tj!ddd| _"| #| | jr|$| %  d S d S )N)	namespacer     serve_deployment_replica_startszCThe number of times this replica has been restarted due to failure.)description serve_deployment_request_counterz?The number of queries that have been processed in this replica.)route)rk   tag_keysserve_deployment_error_counterz<The number of exceptions that have occurred in this replica.zREQUEST_LATENCY_BUCKETS_MS: &serve_deployment_processing_latency_msz(The latency for queries to be processed.)rk   
boundariesrn    serve_replica_processing_queriesz.The current number of queries being processed.)&_replica_idrC   _metrics_pusherrB   _metrics_store_autoscaling_config_ingressray	get_actorr3   r5   _controller_handle_num_ongoing_requestsr*   _cached_metrics_enabled_cached_metrics_interval_sr   Counter_restart_counterinc_request_counterr   int_cached_request_counter_error_counter_cached_error_counterrW   debugr1   	Histogram_processing_latency_trackerr   _cached_latenciesGauge_num_ongoing_requests_gaugeset_autoscaling_configcreate_task_report_cached_metrics_forever)selfrd   re   rf   rg   r]   r]   r^   __init__   s^   






zReplicaMetricsManager.__init__c                 C   s   | j  D ]\}}| jj|d|id q| j   | j D ]\}}| jj|d|id q| j  | j D ]\}}|D ]}| jj	|d|id q=q7| j  | j
| j d S )Nrm   tags)r   itemsr   r   clearr   r   r   r   observer   setr{   )r   rm   count	latencies
latency_msr]   r]   r^   _report_cached_metrics   s   


z,ReplicaMetricsManager._report_cached_metricsc                    s~   | j dksJ d}	 zt| j I d H  |   d}W n! ty=   td tdd| }|d7 }t|I d H  Y nw q)Nr   Tz#Unexpected error reporting metrics.
         )r}   asynciosleepr   	ExceptionrW   	exceptionmin)r   consecutive_errorsbackoff_time_sr]   r]   r^   r      s   
z4ReplicaMetricsManager._report_cached_metrics_foreverc                    s   | j  I dH  dS )zStop periodic background tasks.N)rt   graceful_shutdownr   r]   r]   r^   shutdown   s   zReplicaMetricsManager.shutdownrQ   c                 C   s   t  o| jS N)r)   rv   r   r]   r]   r^   should_collect_metrics  s   z,ReplicaMetricsManager.should_collect_metricsc                 C   sV   || _ |  r)| j  | j| j| j| j j | j| j| j	t
t| j j dS dS )z&Dynamically update autoscaling config.N)rv   r   rt   startregister_or_update_task$PUSH_METRICS_TO_CONTROLLER_TASK_NAME_push_autoscaling_metricsmetrics_interval_sRECORD_METRICS_TASK_NAME_add_autoscaling_metrics_pointr   r+   )r   rf   r]   r]   r^   r     s"   
z,ReplicaMetricsManager.set_autoscaling_configrequest_metadatac                 C   s*   |  j d7  _ | js| j| j  dS dS )zFIncrement the current total queue length of requests for this replica.r   Nr{   r|   r   r   r   r   r]   r]   r^   inc_num_ongoing_requests#     z.ReplicaMetricsManager.inc_num_ongoing_requestsc                 C   s*   |  j d8  _ | js| j| j  dS dS )zFDecrement the current total queue length of requests for this replica.r   Nr   r   r]   r]   r^   dec_num_ongoing_requests)  r   z.ReplicaMetricsManager.dec_num_ongoing_requestsc                 C      | j S )z<Get current total queue length of requests for this replica.)r{   r   r]   r]   r^   get_num_ongoing_requests/     z.ReplicaMetricsManager.get_num_ongoing_requestsrm   r   	was_errorc                C   s   | j r#| j| | |r| j|  d7  < dS | j|  d7  < dS | jj|d|id |r:| jjd|id dS | j	jd|id dS )zRecords per-request metrics.r   rm   r   N)
r|   r   appendr   r   r   r   r   r   r   )r   rm   r   r   r]   r]   r^   record_request_metrics3  s   z,ReplicaMetricsManager.record_request_metricsc                 C   s:   | j j}| jjj| j| j| jt | t d d S )N)rd   
window_avgsend_timestamp)	rv   look_back_period_srz   record_autoscaling_metricsremoters   ru   window_averagetime)r   look_back_periodr]   r]   r^   r   B  s   
z/ReplicaMetricsManager._push_autoscaling_metricsNc                 C   s   | j | j| jit  d S r   )ru   add_metrics_pointrs   r{   r   r   r]   r]   r^   r   L  s   
z4ReplicaMetricsManager._add_autoscaling_metrics_pointrQ   N)__name__
__module____qualname____doc__r   r   *SET_REPLICA_REQUEST_METRIC_GAUGE_TASK_NAMEr    r   BaseEventLoopr   rI   boolr   r   r   r   r   r   r"   r   r   r   r   strfloatr   r   r
   r   r   r]   r]   r]   r^   r`      s2    
H
r`   c                   @   s(  e Zd Zdedededededede	de
fd	d
ZedefddZdefddZdefddZdddefddZdedeef fddZdede	fddZdedee dee
 fddZededeeddf fd d!Zd"ee d#ee
 d$e defd%d&Z!dedee d'ee
ef fd(d)Z"dedee#ef fd*d+Z$dede%edf fd,d-Z&defd.d/Z'e(d0d1 Z)defd2d3Z*defd4d5Z+e(ded6e,j-fd7d8Z.e(ded6e/fd9d:Z0e(ededeeddf fd;d<Z1e2defd=d>Z3d?d@ Z4dAdB Z5dCdD Z6dEdF Z7dee
ef fdGdHZ8dS )IReplicaBaserd   r\   	init_argsinit_kwargsdeployment_configversionrg   route_prefixc	           	   	      s   | _ | _|j _| _| _| _ jj  _ jj	r( jj	 d j  _ jj
 _  jj t  _t||| jttdd _t fdd _d _t  _d  _d _d _d  _ jd d t| j jj |d _!d  _"d  _#d S )N_F)deployment_idrun_sync_methods_in_threadpool run_user_code_in_separate_threadlocal_testing_modec                      s    j S r   )max_ongoing_requestsr]   r   r]   r^   <lambda>{  s    z&ReplicaBase.__init__.<locals>.<lambda>servable_object)rd   re   rf   rg   )$_versionrs   r   _deployment_id_deployment_configrw   _route_prefixname_component_nameapp_name	unique_id_component_id_configure_logger_and_profilerslogging_configr   _event_loopUserCallableWrapperr-   r/   _user_callable_wrapperrE   
_semaphore_user_callable_initializedr   Lock_user_callable_initialized_lock_initialization_latency_healthy_shutting_down_user_callable_asgi_app_set_internal_replica_contextr7   rf   _metrics_manager_port
_docs_path)	r   rd   r\   r   r   r   r   rg   r   r]   r   r^   r   W  sL   
	

zReplicaBase.__init__rQ   c                 C   s   | j jS r   )r   r   r   r]   r]   r^   r        z ReplicaBase.max_ongoing_requestsc                 C   
   | j  S r   )r   r   r   r]   r]   r^   r        
z$ReplicaBase.get_num_ongoing_requestsc                 C   s   | j j| j | j| j| jfS r   )r   r   r   r   r   r   r]   r]   r^   get_metadata  s   zReplicaBase.get_metadataNr   r   c                C   s   t jjj| j|| jd d S )N)rd   r   r   )rx   servecontextr   rs   r   )r   r   r]   r]   r^   r     s
   
z)ReplicaBase._set_internal_replica_contextr   c                 C   sp   |d u ri }t |trtdi |}ttj| j| j|td t	tj| j| jd t
tj| j| jd\| _| _d S )N)component_typecomponent_namecomponent_idr   buffer_size)r   r   r  r]   )rS   dictrO   r?   r#   REPLICAr   r   r,   r@   r>   cpu_profilercpu_profiler_log)r   r   r]   r]   r^   r     s*   
z+ReplicaBase._configure_logger_and_profilersr   c                 C   s   | j   S r   )r   lockedr   r]   r]   r^   _can_accept_request  s   zReplicaBase._can_accept_requestrequest_argsc                 C   s\   |j }| jdur,|d }z	t| j|j}W n ty%   d}td Y nw |dur,|}|S )zGet the matched route string for ASGI apps to be used in logs & metrics.

        If this replica does not wrap an ASGI app or there is no matching for the
        request, returns the existing route from the request metadata.
        Nr   zFailed unexpectedly trying to get route name for request. Routes in metric tags and log messages may be inaccurate. Please file a GitHub issue containing this traceback.)rm   r   rD   
asgi_scoper   rW   r   )r   r   r	  rm   reqmatched_router]   r]   r^   _maybe_get_http_route  s    

z!ReplicaBase._maybe_get_http_routec              
   #   s    t   }d }d  dtf fdd}z|V  W n9 tjy2 } z|}| || W Y d }~n#d }~w tyP } z|}td | || W Y d }~nd }~ww t   | d }| 	| || |d urh|d d S )Nsc                    s   |  d S r   r]   )r  status_coder]   r^   _status_code_callback  r   zEReplicaBase._handle_errors_and_metrics.<locals>._status_code_callbackzRequest failed.ri   )
r   r   r   CancelledError_on_request_cancelledr   rW   r   _on_request_failed_record_errors_and_metrics)r   r   
start_timeuser_exceptionr  er   r]   r  r^   _handle_errors_and_metrics  s0   

z&ReplicaBase._handle_errors_and_metricsr  r  r   c           	      C   sz   |j }|j}|j}|d u rd}nt|tjrd}nd}tjt|p!d|p$||p'||dddid | j	j
|||d ud	 d S )
NOK	CANCELLEDERRORCALL)methodrm   statusr   serve_access_logTextra)rm   r   r   )_http_methodrm   call_methodrS   r   r  rW   infor=   r   r   )	r   r  r  r   r   http_method
http_router$  
status_strr]   r]   r^   r    s,   

z&ReplicaBase._record_errors_and_metricsrequest_kwargsc                 C   s   |j r6t|dkrt|d tsJ |d }|j}t|||j}|dd|_| 	|||_
||f}||fS |jrat|dkrFt|d tsHJ |d }| j|j}|jf}|jr_t|jini }||fS )Nr   r   r  WS)is_http_requestlenrS   r$   r
  r:   receive_asgi_messagesgetr#  r  rm   is_grpc_requestr%   r   get_user_method_infor$  user_request_prototakes_grpc_context_kwargr'   grpc_context)r   r   r	  r)  requestscopereceivemethod_infor]   r]   r^   _unpack_proxy_args3  s6   zReplicaBase._unpack_proxy_argsc              
      s   |  |||\}}| |: | |4 I d H  | j|||I d H W  d   I d H  W  d    S 1 I d H s<w   Y  W d    d S 1 sLw   Y  d S r   )r8  _wrap_request_start_requestr   call_user_method)r   r   r	  r)  r]   r]   r^   handle_requestY  s   "zReplicaBase.handle_requestc           	   
   O  s   |  |||\}}| |b}| |4 I dH = |jr9|\}}| j||||2 z3 dH W }t|V  q*6 n| j|||2 z	3 dH W }|V  qA6 W d  I dH  n1 I dH s\w   Y  W d   dS W d   dS 1 stw   Y  dS zDGenerator that is the entrypoint for all `stream=True` handle calls.N)	r8  r9  r:  r+  r   call_http_entrypointpickledumpscall_user_generator)	r   r   r	  r)  status_code_callbackr5  r6  msgsresultr]   r]   r^   handle_request_streaminge  s6   *"z$ReplicaBase.handle_request_streamingc           
   
   O  sn  |  |s$| j}tjd| d|j dddid td|  V  d S | |||\}}| |{}| 	|4 I d H V td|  dV  |j
re|\}}| j||||2 z3 d H W }t|V  qV6 n#|jr|| j|||2 z	3 d H W }	|	V  qp6 n| j|||I d H V  W d   I d H  n1 I d H sw   Y  W d    d S W d    d S 1 sw   Y  d S )	Nz,Replica at capacity of max_ongoing_requests=z, rejecting request .log_to_stderrFr!  T)acceptednum_ongoing_requests)r  r   rW   rX   
request_idr!   r   r8  r9  r:  r+  r   r>  r?  r@  is_streamingrA  r;  )
r   r   r	  r)  limitrB  r5  r6  rC  rD  r]   r]   r^   handle_request_with_rejection  s\   
*"z)ReplicaBase.handle_request_with_rejectionc                    s   t r   )NotImplementedErrorr   r]   r]   r^   _on_initialized  s   zReplicaBase._on_initializedc              	      s   za| j 4 I d H E t | _| js.| j I d H | _| jr$| jjj| _	| 
 I d H  d| _|rD| j|jI d H  | j|jI d H  W d   I d H  n1 I d H sTw   Y  |  I d H  W d S  typ   tt d w NT)r   r   _initialization_start_timer   r   initialize_callabler   	_callable	docs_pathr   rO   set_sync_method_threadpool_limitr   call_reconfigureuser_configcheck_healthr   RuntimeError	traceback
format_excr   r   r]   r]   r^   
initialize  s2   


(zReplicaBase.initializec                    s   zI|j | jj k}|j| jjk}|| _t| j|| _| j|j |r*| 	|j | j
|jI d H  |r@| j
|j I d H  | j| j
jd W d S  tyX   tt d w Nr   )rW  r   r   rH   from_deployment_versionr   r   r   rf   r   r   rU  r   rV  r   user_callabler   rY  rZ  r[  )r   r   user_config_changedlogging_config_changedr]   r]   r^   reconfigure  s<   

zReplicaBase.reconfigurer  c                 C      d S r   r]   r   r   r  r]   r]   r^   r    s   z!ReplicaBase._on_request_cancelledc                 C   rd  r   r]   re  r]   r]   r^   r    s   zReplicaBase._on_request_failedc                 C   rd  r   r]   r   r]   r]   r^   r9     s   zReplicaBase._wrap_requestc              
   C  st   | j 4 I d H % z| j| d V  W | j| n| j| w W d   I d H  d S 1 I d H s3w   Y  d S r   )r   r   r   r   r   r]   r]   r^   r:    s   .zReplicaBase._start_requestc                    s`   | j j}	 t|I dH  | j }|dkr$td| d| d ntjddd	id
 dS q)zWait for any ongoing requests to finish.

        Sleep for a grace period before the first time we check the number of ongoing
        requests to allow the notification to remove this replica to propagate to
        callers first.
        TNr   zWaiting for an additional z!s to shut down because there are z ongoing requests.z,Graceful shutdown complete; replica exiting.rG  Fr!  )r   graceful_shutdown_wait_loop_sr   r   r   r   rW   r%  )r   wait_loop_period_srI  r]   r]   r^   _drain_ongoing_requests  s"   
z#ReplicaBase._drain_ongoing_requestsc                    sP   z
| j  I d H  W n   | jrtd ntd Y | j I d H  d S )NzJ__del__ ran before replica finished initializing, and raised an exception.z__del__ raised an exception.)r   call_destructorr   rW   r   r   r   r   r]   r]   r^   r   (  s   
zReplicaBase.shutdownc                    s.   d| _ | jr|  I d H  |  I d H  d S rP  )r   r   rh  r   r   r]   r]   r^   perform_graceful_shutdown8  s
   z%ReplicaBase.perform_graceful_shutdownc              
      sZ   z| j  }|d ur|I d H  d| _W d S  ty, } ztd d| _|d d }~ww )NTzReplica health check failed.F)r   call_user_health_checkr   r   rW   rX   r   fr  r]   r]   r^   rX  B  s   


zReplicaBase.check_healthc              
      sP   z| j  }|d ur|I d H W S i W S  ty' } ztd |d d }~ww )Nz$Replica record routing stats failed.)r   call_user_record_routing_statsr   rW   rX   rl  r]   r]   r^   record_routing_statsO  s   

z ReplicaBase.record_routing_stats)9r   r   r   r    r   r   r   r&   rH   r   r   r   propertyr   r   r   ReplicaMetadatar   r   r   rO   r   r"   r  r
   r   r  r   r   StatusCodeCallbackr  BaseExceptionr   r  r8  bytesr<  r   rE  rM  r   rO  r]  rc  r   r  r  r   r  r9  r   r:  rh  r   rj  rX  ro  r]   r]   r]   r^   r   V  s    	
D	


"

&




/
 $
r   c                   @   sZ   e Zd Zdd ZdedejfddZdedefdd	Z	e
ded
eeddf fddZdS )Replicac                    s4   | j | jjd | jd u rt | j | _d S d S r^  )r   r   r`  r   r   rQ  r   r]   r]   r^   rO  [  s   
zReplica._on_initializedmetadatar  c                 C   sJ   t jj|j}| D ]}|  qt|j}| D ]}|  qdS )z#Recursively cancels child requests.N)rx   r   r    _get_requests_pending_assignmentinternal_request_idvaluescancelrJ   )r   rv  r  requests_pending_assignmenttaskin_flight_requestsreplica_resultr]   r]   r^   r  e  s   


zReplica._on_request_cancelledr   c                 C   s    t jj rt jj  d S d S r   )rx   utilpdb$_is_ray_debugger_post_mortem_enabled_post_mortemre  r]   r]   r^   r  v  s   zReplica._on_request_failedrQ   Nc              
   c   sl    t jjjt jjj|j|j|j| j	j
|j|jd | |}|V  W d   dS 1 s/w   Y  dS )zContext manager that wraps user method calls.

        1) Sets the request context var with appropriate metadata.
        2) Records the access log message (if not disabled).
        3) Records per-request metrics via the metrics manager.
        )rm   rJ  _internal_request_idr   multiplexed_model_idr3  N)rx   r   r   _serve_request_contextr   _RequestContextrm   rJ  rx  r   r   r  r3  r  )r   r   rB  r]   r]   r^   r9  z  s   

"zReplica._wrap_request)r   r   r   rO  r"   r   r  r  r   r  r   r   rr  r9  r]   r]   r]   r^   ru  Z  s    

ru  c                   @   sJ  e Zd ZdZdededededededed	efd
dZ	de
fddZdefddZdefddZ	d0dedee defddZdd Zdeeef fddZdefddZded ee deeee f fd!d"Zdedeeef fd#d$Zdedeedf fd%d&Zdedeedf fd'd(Zd)edefd*d+Zd,d- Z defd.d/Z!dS )1ReplicaActora?  Actor definition for replicas of Ray Serve deployments.

    This class defines the interface that the controller and deployment handles
    (i.e., from proxies and other replicas) use to interact with a replica.

    All interaction with the user-provided callable is done via the
    `UserCallableWrapper` class.
    rd   serialized_deployment_defserialized_init_argsserialized_init_kwargsdeployment_config_proto_bytesr   rg   r   c	              
      sR   t |}	t|}
t|
trt|
}
t||
t|t||	|||d| _d S )N)rd   r\   r   r   r   r   rg   r   )	r&   from_proto_bytesr   loadsrS   r   r_   r6   _replica_impl)r   rd   r  r  r  r  r   rg   r   r   r\   r]   r]   r^   r     s"   

zReplicaActor.__init__handlec                 C   s   |j   d S r   )pongr   )r   r  r]   r]   r^   push_proxy_handle  s   zReplicaActor.push_proxy_handlerQ   c                 C   r   )zFetch the number of ongoing requests at this replica (queue length).

        This runs on a separate thread (using a Ray concurrency group) so it will
        not be blocked by user code.
        )r  r   r   r]   r]   r^   r     s   
z%ReplicaActor.get_num_ongoing_requestsc                    s>   t  t  t  t  tj tj	 t
 fS )a  poke the replica to check whether it's alive.

        When calling this method on an ActorHandle, it will complete as
        soon as the actor has started running. We use this mechanism to
        detect when a replica has been allocated a worker slot.
        At this time, the replica can transition from PENDING_ALLOCATION
        to PENDING_INITIALIZATION startup state.

        Returns:
            The PID, actor ID, node ID, node IP, and log filepath id of the replica.
        )osgetpidrx   get_runtime_contextget_actor_idget_worker_idget_node_idr  get_node_ip_addressget_node_instance_idrA   r   r]   r]   r^   is_allocated  s   


zReplicaActor.is_allocatedNr   _afterc                    s   | j |I dH  | j  S )zHandles initializing the replica.

        Returns: 3-tuple containing
            1. DeploymentConfig of the replica
            2. DeploymentVersion of the replica
            3. Initialization duration in seconds
        N)r  r]  r   )r   r   r  r]   r]   r^   initialize_and_get_metadata  s   
z(ReplicaActor.initialize_and_get_metadatac                       | j  I d H  d S r   )r  rX  r   r]   r]   r^   rX       zReplicaActor.check_healthc                    s   | j  I d H S r   )r  ro  r   r]   r]   r^   ro    s   z!ReplicaActor.record_routing_statsc                    s   | j |I d H  | j  S r   )r  rc  r   r\  r]   r]   r^   rc    s   
zReplicaActor.reconfigurepickled_request_metadatar	  c                 C   s.   t |}|js|jrt |d f}||fS )Nr   )r?  r  r+  r/  )r   r  r	  r   r]   r]   r^   _preprocess_request_args  s   
z%ReplicaActor._preprocess_request_argsc                    sJ   |  ||\}}| jj|g|R i |I dH }|jr#|j| f}|S )z$Entrypoint for `stream=False` calls.N)r  r  r<  r/  r3  SerializeToStringr   r  r	  r)  r   rD  r]   r]   r^   r<    s   zReplicaActor.handle_requestc                 O  sZ   |  ||\}}| jj|g|R i |2 z3 dH W }|jr&|j| f}|V  q6 dS r=  )r  r  rE  r/  r3  r  r  r]   r]   r^   rE    s   z%ReplicaActor.handle_request_streamingc                 O  sr   |  ||\}}| jj|g|R i |2 z3 dH W }t|tr(t|V  q|jr2|j|	 f}|V  q6 dS )aK  Entrypoint for all requests with strict max_ongoing_requests enforcement.

        The first response from this generator is always a system message indicating
        if the request was accepted (the replica has capacity for the request) or
        rejected (the replica is already at max_ongoing_requests).

        For non-streaming requests, there will only be one more message, the unary
        result of the user request handler.

        For streaming requests, the subsequent messages will be the results of the
        user request handler (which must be a generator).
        N)
r  r  rM  rS   r!   r?  r@  r/  r3  r  r  r]   r]   r^   rM  %  s"   
z*ReplicaActor.handle_request_with_rejectionproto_request_metadatac                    sT   ddl m} ||}t|j|j|j|j|jd}| jj	|g|R i |I d H S )Nr   )r"   )rJ  rx  r$  r  rm   )
ray.serve.generated.serve_pb2r"   
FromStringrJ  rx  r$  r  rm   r  r<  )r   r  r	  r)  RequestMetadataProtoprotor   r]   r]   r^   handle_request_from_javaE  s"   
z%ReplicaActor.handle_request_from_javac                    r  r   )r  rj  r   r]   r]   r^   rj  [  r  z&ReplicaActor.perform_graceful_shutdownc                 C   s   | j dur9ddl}| j   t| jd}|| j j| W d   n1 s'w   Y  td| j d | jS t	d dS )zuSaves CPU profiling data, if CPU profiling is enabled.

        Logs a warning if CPU profiling is disabled.
        Nr   wbz Saved CPU profile data to file ""zAttempted to save CPU profile data, but failed because no CPU profiler was running! Enable CPU profiling by enabling the RAY_SERVE_ENABLE_CPU_PROFILING env var.)
r  marshalsnapshot_statsopenr  dumpstatsrW   r%  error)r   r  rm  r]   r]   r^   _save_cpu_profile_data^  s   

z#ReplicaActor._save_cpu_profile_data)NN)"r   r   r   r   r    rt  rH   r   r   r   r   r  r   r   r  r&   r   r
   rq  r  rX  r   ro  rc  r   r"   r  r<  r   rE  rM  r  rj  r  r]   r]   r]   r^   r    s|    		








 
r  c                   @   sT   e Zd ZU dZeed< eed< eed< eed< eed< edededd fd	d
Z	dS )UserMethodInfoz4Wrapper for a user method and its relevant metadata.callabler   is_asgi_apptakes_any_argsr2  crQ   c                C   s,   t |j}| ||j|t|dkt|v dS )Nr   )r  r   r  r  r2  )inspect	signature
parametersr   r,  r'   )clsr  r  paramsr]   r]   r^   from_callable~  s   
zUserMethodInfo.from_callableN)
r   r   r   r   r   __annotations__r   r   classmethodr  r]   r]   r]   r^   r  t  s   
 r  c                   @   s  e Zd ZdZeefZdedede	de
dededefd	d
ZedejfddZdedefddZedefddZdedefddZdedefddZdddddddedeee  d ee	eef  d!ed"ee d#ee deeef fd$d%Zedee fd&d'ZdVd(d)Zedee fd*d+Zdefd,d-Z dee!j"j# fd.d/Z$dee!j"j# fd0d1Z%ed2d3 Z&ede	eef fd4d5Z'ed6efd7d8Z(ded9ed!ed:ed;ed"ee dee defd<d=Z)d>e*d?e+d@e,dAe-def
dBdCZ.ed9ed@e,dAe-dDe/def
dEdFZ0d>e*dGee dHe	eef de1edf fdIdJZ2eddKd>e*dGee dHe	eef dLee dee1edf  f
dMdNZ3ed>e*dGee dHe	eef defdOdPZ4dQe5fdRdSZ6edTdU Z7dS )Wr   zLWraps a user-provided callable that is used to handle requests to a replica.r\   r   r   r   r   r   r   c          	         s   t |st |stdt| d| _| _| _t | _| _	| _
d _| _| _d _i  _d  _ jrXt  _ fdd}tjd|d _ j  d S t  _d S )NzBdeployment_def must be a function or class. Instead, its type was rF  Fc                      s   t  j  j  d S r   )r   set_event_loop_user_code_event_looprun_foreverr]   r   r]   r^   _run_user_code_event_loop  s   z?UserCallableWrapper.__init__.<locals>._run_user_code_event_loopT)daemontarget)r  
isfunctionisclass	TypeErrortype_deployment_def
_init_args_init_kwargs_is_functionr   _local_testing_mode_destructor_called_run_sync_methods_in_threadpool!_run_user_code_in_separate_thread _warned_about_sync_method_change_cached_user_method_inforS  r   new_event_loopr  	threadingThread_user_code_event_loop_threadr   get_running_loop)	r   r\   r   r   r   r   r   r   r  r]   r   r^   r     s8   zUserCallableWrapper.__init__rQ   c                 C   r   r   )r  r   r]   r]   r^   re     r   zUserCallableWrapper.event_looprm  c                    s2   t  s	J dt dtf fdd}|S )a  Decorator to run a coroutine method on the user code event loop.

        The method will be modified to be a sync function that returns a
        `asyncio.Future` if user code is running in a separate event loop.
        Otherwise, it will return the coroutine directly.
        z7_run_user_code can only be used on coroutine functions.rQ   c                    sB    | g|R i |}| j rt|| j}| jr|S t|S |S r   )r  r   run_coroutine_threadsafer  r  wrap_future)r   argskwargscorofutrm  r]   r^   wrapper  s   
z3UserCallableWrapper._run_user_code.<locals>.wrapper)r  iscoroutinefunction	functoolswrapsr
   )rm  r  r]   r  r^   _run_user_code  s   z"UserCallableWrapper._run_user_coderL  c                    s   |t  _d S r   )r   current_default_thread_limitertotal_tokens)r   rL  r]   r]   r^   rU    s   z4UserCallableWrapper.set_sync_method_threadpool_limitmethod_namec                    s   | j v r
 j | S  jr j}n(t j|rt j|}n fdd}tt|t j}td| d| dt	j
|t jtd}| j |< |S )zGet UserMethodInfo for the provided call method name.

        This method is cached to avoid repeated expensive calls to `inspect.signature`.
        c                    s&   |  drdS tt j| sdS dS )N__FT)
startswithr  rR   rS  )attrr   r]   r^   callable_method_filter  s
   
zHUserCallableWrapper.get_user_method_info.<locals>.callable_method_filterzTried to call a method 'z*' that does not exist. Available methods: rF  )r  )r  r  rS  hasattrrR   listfilterdirrN   r  r  rS   r8   )r   r  user_methodr  methodsr%  r]   r   r^   r0    s&   



z(UserCallableWrapper.get_user_method_inforD  	asgi_argsc                    sP   |  \}}}t|tjjr||||I dH  dS t||||I dH  dS )a  Handle the result from user code and send it over the ASGI interface.

        If the result is already a Response type, it is sent directly. Otherwise, it
        is converted to a custom Response type that handles serialization for
        common Python objects.
        N)to_args_tuplerS   	starlette	responsesr<   send)r   rD  r  r5  r6  r  r]   r]   r^   _send_user_result_over_asgi  s
   z/UserCallableWrapper._send_user_result_over_asgiNF)r  r  rK  generator_result_callback'run_sync_methods_in_threadpool_overrider  r  r  rK  r  r  c                   s  d} dur	 nt   durnt |du r| jn|}tp'to2tp1t }	|	ra|ratrKd}|sKt	dj
 d fdd}
t|
I dH }||fS |	rw| jsw|du rwd| _ttjj
d  i }t|r|I dH }||fS )	a7  Call the callable with the provided arguments.

        This is a convenience wrapper that will work for `def`, `async def`,
        generator, and async generator functions.

        Returns the result and a boolean indicating if the result was a sync generator
        that has already been consumed.
        FNTMethod 'f' returned a generator. You must use `handle.options(stream=True)` to call generators on a deployment.c                     s,    i } r| D ]}| qd } | S r   r]   )rD  rr  r  r  is_generatorr  r]   r^   run_callableJ  s   
z;UserCallableWrapper._call_func_or_gen.<locals>.run_callable)r  )tupler  r  r  r  ismethodr  isasyncgenfunctionisgeneratorfunctionr  r   r   run_syncr  warningswarnr.   formatiscoroutine)r   r  r  r  rK  r  r  sync_gen_consumedrun_sync_in_threadpoolis_sync_methodr  rD  r]   r  r^   _call_func_or_gen  sP   



z%UserCallableWrapper._call_func_or_genc                 C   r   r   )rS  r   r]   r]   r^   r`  l  r   z!UserCallableWrapper.user_callablec                    sP      j j}dtdtf fdd} jD ]}||| q j  I d H  d S )Nr   excc                    s
     |S r   )handle_exception)r   r  r   r]   r^   r  y  r   zGUserCallableWrapper._initialize_asgi_callable.<locals>.handle_exception)rS  appr   r   service_unavailable_exceptionsadd_exception_handler_run_asgi_lifespan_startup)r   r  r  r  r]   r   r^   _initialize_asgi_callablep  s   
z-UserCallableWrapper._initialize_asgi_callablec                    s   | j dur
tdtjdddid | jr| j| _ n%| j| j| _ | j| j j| j	| j
ddI dH  t| j tr@|  I dH  t| j td| _t| j td| _tjdddid t| j trc| j jS dS )	zInitialize the user callable.

        If the callable is an ASGI app wrapper (e.g., using @serve.ingress), returns
        the ASGI app object, which may be used *read only* by the caller.
        Nz/initialize_callable should only be called once.zStarted initializing replica.rG  Fr!  )r  r  r  zFinished initializing replica.)rS  rY  rW   r%  r  r  __new__r  r   r  r  rS   r8   r  rR   r(   _user_health_checkr2   _user_record_routing_statsr  r   r]   r]   r^   rR    s>   



z'UserCallableWrapper.initialize_callablec                 C   s   | j d u rtd| dd S )Nz-`initialize_callable` must be called before `z`.)rS  rY  )r   r  r]   r]   r^   _raise_if_not_initialized  s
   

z-UserCallableWrapper._raise_if_not_initializedc                 C       |  d | jd ur|  S d S )Nrk  )r  r  _call_user_health_checkr   r]   r]   r^   rk    s   

z*UserCallableWrapper.call_user_health_checkc                 C   r  )Nrn  )r  r  _call_user_record_routing_statsr   r]   r]   r^   rn    s   

z2UserCallableWrapper.call_user_record_routing_statsc                    s   |  | jI d H  d S r   )r  r  r   r]   r]   r^   r    s   z+UserCallableWrapper._call_user_health_checkc                    s   |  | jI d H \}}|S r   )r  r  )r   rD  r   r]   r]   r^   r    s   z3UserCallableWrapper._call_user_record_routing_statsrW  c                    sn   |  d |d ur5| jrtdt| jts$td| j d t d | jt	| jt|fdI d H  d S d S )NrV  z1deployment_def must be a class to use user_configz%user_config specified but deployment z	 missing z method)r  )
r  r  
ValueErrorr  rS  r0   rN   r   r  rR   )r   rW  r]   r]   r^   rV    s,   

z$UserCallableWrapper.call_reconfigureuser_method_infor+  r  c                   s   t |}t |}	|rK|r|D ]}
||
 q|S |	r,|2 z
3 dH W }
||
 q6 |S |r<|js<| ||I dH  |S |sI|sItd|j d|S |rQJ d|sU|	r^td|j d|S )ai  Postprocess the result of a user method.

        User methods can be regular unary functions or return a sync or async generator.
        This method will raise an exception if the result is not of the expected type
        (e.g., non-generator for streaming requests or generator for unary requests).

        Generator outputs will be written to the `generator_result_callback`.

        Note that HTTP requests are an exception: they are *always* streaming requests,
        but for ASGI apps (like FastAPI), the actual method will be a regular function
        implementing the ASGI `__call__` protocol.
        NCalled method 'G' with `handle.options(stream=True)` but it did not return a generator.z4All HTTP requests go through the streaming codepath.r  r  )r  isgenerator
isasyncgenr  r  r  r   )r   rD  r  rK  r+  r  r  r  result_is_genresult_is_async_genr  r]   r]   r^   _handle_user_method_result  s<   




z.UserCallableWrapper._handle_user_method_resultr   rB  r5  r6  c                   s   t   | |j}| jr$t dtf fdd}| ||||}ndtf fdd}t| ||||}d} 	|2 z3 d H W }	|s[|	d }
d}|
d d	kr[|t
|
d
  |	V  q?6 d S )Nitemc                    s     j|  d S r   call_soon_threadsafe
put_nowaitr#  result_queuesystem_event_loopr]   r^   enqueue@  s   z9UserCallableWrapper.call_http_entrypoint.<locals>.enqueuec                    s     |  d S r   )r&  r'  )r)  r]   r^   r+  H  s   Fr   Tr  zhttp.response.startr  )r;   r0  r$  r  r   r  r
   _call_http_entrypointr   fetch_messages_from_queuer   )r   r   rB  r5  r6  r  r+  call_futurefirst_message_peekedmessagesmsgr]   r(  r^   r>  1  s,   z(UserCallableWrapper.call_http_entrypointr  c                    sd  |  d tjd|j ddddd |jr|||f}n|js%t }n	tj	|||f}d}z9t
| }| j|j|i d|d	I dH \}}| j||dd||t|||d
I dH }	|durg| sg|  |	W S  ty }
 z"|js| |
}| |t|||I dH  |dur| s|   d}
~
w t
jy   |dur| st|jds|   w )zCall an HTTP entrypoint.

        `send` is used to communicate the results of streaming responses.

        Raises any exception raised by the user code so it can be propagated as a
        `RayTaskError`.
        r,  %Started executing request to method ''.FTrG  r   r!  N)r  r  rK  r  )rK  r+  r  r  r  set_max_batch_size)r  rW   r%  r   r  r  r  r  requestsr   r   r   fetch_until_disconnectr  r  r"  r9   donerz  r   r  r  r  r  )r   r  r5  r6  r  r	  receive_taskrD  r  final_resultr  responser]   r]   r^   r,  `  sb   




z)UserCallableWrapper._call_http_entrypointr	  r)  c           
        s   | j s| |||I dH }|2 z	3 dH W }|V  q6 dS t  t dtf fdd}| j||||d} |2 z3 dH W }|D ]}	|	V  qDq<6 dS )zCalls a user method for a streaming call and yields its results.

        The user method is called in an asyncio `Task` and places its results on a
        `result_queue`. This method pulls and yields from the `result_queue`.
        Nr#  c                    s     j|  d S r   r$  r'  r(  r]   r^   _enqueue_thread_safe  s   zEUserCallableWrapper.call_user_generator.<locals>._enqueue_thread_safer+  )r  _call_user_generatorr;   r   r  r
   r-  )
r   r   r	  r)  genrD  r<  r.  r0  r1  r]   r(  r^   rA    s,   
z'UserCallableWrapper.call_user_generatorr=  r+  c                   s  |  d durnt durnt | |jjtp*to5t	p4t
 }tjdj ddddd d	ttdf ffd
d fdd}rp|rp| jrpt|I dH  dS r fdd}| I dH  dS   S )zCall a user generator.

        The `generator_result_callback` is used to communicate the results of generator
        methods.

        Raises any exception raised by the user code so it can be propagated as a
        `RayTaskError`.
        r>  Nr2  r3  FTr4  r!  rQ   c                    s|    i } t | r| I d H } t | r!| D ]}|V  qd S t | r5| 2 z	3 d H W }|V  q(6 d S tdj dNr  r  )r  r  r  r  r  r   r?  rD  )r  r	  r)  r  r]   r^   _call_generator_async  s   



zGUserCallableWrapper._call_user_generator.<locals>._call_generator_asyncc                     s@    i } t | r| D ]}| qd S tdj dr@  )r  r  r  r   rA  )r  r+  r	  r)  r  r]   r^   _call_generator_sync
  s   

zFUserCallableWrapper._call_user_generator.<locals>._call_generator_syncc                     s$     2 z
3 d H W } |  q6 d S r   r]   )rD  )rB  r+  r]   r^   gen_coro_wrapper  s   
zBUserCallableWrapper._call_user_generator.<locals>.gen_coro_wrapper)r  r  r  r0  r$  r  r  r  r   r  r  rW   r%  r   r   r
   r  r   r  )r   r   r	  r)  r+  r
  rC  rD  r]   )rB  r  r+  r	  r)  r  r^   r>    s0   

 z(UserCallableWrapper._call_user_generatorc                    s~   |  d tjd|j ddddd | |j}| j|j||ddI d	H \}}t|s4t	|r=t
d
|j d|S )zCall a (unary) user method.

        Raises any exception raised by the user code so it can be propagated as a
        `RayTaskError`.
        r;  r2  r3  FTr4  r!  )r  r  rK  Nr  r  )r  rW   r%  r$  r0  r  r  r  r  r  r  r   )r   r   r	  r)  r  rD  r   r]   r]   r^   r;  !  s$   
z$UserCallableWrapper.call_user_methodr  c                 C   s.   t || jrtjj|jddS tjjdddS )Ni  r  zInternal Server Errori  )rS   r  r  r  r<   message)r   r  r]   r]   r^   r  C  s
   z$UserCallableWrapper.handle_exceptionc              
      s   | j du rtd dS | jrdS d| _z)t| j dr(| j| j jddI dH  t| j dr<t| j d I dH  W dS W dS  t	yY } zt
d|  W Y d}~dS d}~ww )	zExplicitly call the `__del__` method of the user callable.

        Calling this multiple times has no effect; only the first call will
        actually call the destructor.
        NzEThis replica has not yet started running user code. Skipping __del__.T__del__F)r  __serve_multiplex_wrapperz/Exception during graceful shutdown of replica: )rS  rW   r%  r  r  r  rF  rR   r   r   r   )r   r  r]   r]   r^   ri  K  s,   
z#UserCallableWrapper.call_destructorr   )8r   r   r   r   rL   rM   r  r   r   r   r   r   r   rp  r   AbstractEventLoopre   r  r   rU  r   r  r0  r
   r9   r  r   r  r`  r  r   rR  r  
concurrentfuturesFuturerk  rn  r  r  rV  r"  r"   rr  r   r   r>  r   r,  r   rA  r>  r;  r   r  ri  r]   r]   r]   r^   r     s   	
5$


	
P
7
	

=
/N


&
K
!r   )r   concurrent.futuresrI  r  r  loggingr  r?  r  r   rZ  r  abcr   r   collectionsr   r   
contextlibr   r   dataclassesr   	importlibr	   typingr
   r   r   r   r   r   r   r   starlette.responsesr  anyior   fastapir   starlette.applicationsr   starlette.typesr   r   r   r   rx   r   ray._common.utilsr   	ray.actorr   r   ray.remote_functionr   	ray.server   ray.serve._private.commonr   r    r!   r"   r#   r$   r%   ray.serve._private.configr&   ray.serve._private.constantsr'   r(   r)   r*   r+   r,   r-   r.   r/   r0   r1   r2   r3   r4   r5   ray.serve._private.default_implr6   r7   ray.serve._private.http_utilr8   r9   r:   r;   r<    ray.serve._private.logging_utilsr=   r>   r?   r@   rA    ray.serve._private.metrics_utilsrB   rC   1ray.serve._private.thirdparty.get_asgi_route_namerD   ray.serve._private.utilsrE   rF   rG   ray.serve._private.versionrH   ray.serve.configrI   ray.serve.contextrJ   ray.serve.deploymentrK   ray.serve.exceptionsrL   rM   rN   ray.serve.schemarO   	getLoggerrW   r   r   r   rq  r_   r`   rr  r   ru  r  r  r   r]   r]   r]   r^   <module>   s    ($	D
	 L    9 b