o
    `۷i                 	   @   s  d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlmZm	Z	m
Z
mZmZmZmZmZ d dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZm Z m!Z!m"Z"m#Z#m$Z$m%Z%m&Z&m'Z' d dl(m)Z) d dl*m+Z+m,Z,m-Z-m.Z.m/Z/m0Z0m1Z1m2Z2m3Z3 d dl4m5Z5 d dl6m7Z7 d dl8m9Z9 d dl:m;Z;m<Z< d dl=m>Z> d dl?m@Z@ d dlAmBZB d dlCmDZD d dlEmFZFmGZGmHZHmIZI d dlJmKZKmLZL d dlMmNZN d dlOmPZP d dlQmRZR d dlSmTZT d dlUmVZVmWZWmXZXmYZY d dlZm[Z[m\Z\m]Z]m^Z^ d dl_m`Z`maZambZbmcZcmdZemfZf d dlgmhZhmiZimjZjmkZkmlZlmmZmmnZnmoZompZpmqZqmrZrmsZsmtZtmuZumvZv d dlwmxZx eye2Zzd Z{d Z|d!Z}G d"d# d#Z~d$eer d%erd&ee d'ee' fd(d)Zd*erd+erd'efd,d-Zd.ee d/ee d0ee' fd1d2ZdS )3    N)defaultdict)AnyDictIterableListOptionalSetTupleUnion)build_address)run_background_task)	GcsClient)ActorHandle)ApplicationStateManagerStatusOverview)AutoscalingStateManager)	DeploymentIDDeploymentSnapshotHandleMetricReportNodeIdReplicaMetricReportRequestProtocolRequestRoutingInfoRunningReplicaInfoTargetCapacityDirection)DeploymentConfig)	CONTROL_LOOP_INTERVAL_S)RAY_SERVE_CONTROLLER_CALLBACK_IMPORT_PATHRAY_SERVE_ENABLE_DIRECT_INGRESS*RAY_SERVE_RPC_LATENCY_WARNING_THRESHOLD_MS(RECOVERING_LONG_POLL_BROADCAST_TIMEOUT_SSERVE_CONTROLLER_NAMESERVE_DEFAULT_APP_NAMESERVE_LOGGER_NAMESERVE_NAMESPACE)ControllerHealthMetricsTracker)create_cluster_node_info_cache)DeploymentInfo)DeploymentReplicaDeploymentStateManager)EndpointState)ExternalScalerDisabledError)set_proxy_default_grpc_options)$configure_http_options_with_defaults)%configure_autoscaling_snapshot_loggerconfigure_component_logger#configure_component_memory_profilerget_component_logger_file_path)LongPollHostLongPollNamespace)NodePortManager)ProxyStateManager)RayInternalKVStore)ServeUsageTag)call_function_from_import_path"get_all_live_placement_group_namesget_head_node_idis_grpc_enabled)DeploymentModeHTTPOptionsProxyLocationgRPCOptions)ActorNameListApplicationArgsDeploymentArgsDeploymentRouteEndpointInfoEndpointSet)APITypeApplicationDetailsDeploymentDetailsHTTPOptionsSchemaLoggingConfigProxyDetailsReplicaDetailsReplicaRankServeActorDetailsServeApplicationSchemaServeDeploySchemaServeInstanceDetailsTargetTargetGroupgRPCOptionsSchema)metricszserve-app-config-checkpointzserve-logging-config-checkpointc                   @   s  e Zd ZdZdddededee fddZdefd	d
Z	dddZ
defddZdefddZdefddZdefddZdefddZdefddZdd Zdeeef fd d!Zd"efd#d$Zdeeeeef f fd%d&Zdefd'd(Zdeeef fd)d*Z defd+d,Z!d-d. Z"dd/d0Z#dd1d2Z$dd3d4Z%d5e&d6e&d7efd8d9Z'd:d; Z(d<d= Z)de*e&ee+ ee, f fd>d?Z-deee.e/ f fd@dAZ0de1fdBdCZ2deeef fdDdEZ3dFedee4 fdGdHZ5dIede&fdJdKZ6dIedLede7fdMdNZ8defdOdPZ9defdQdRZ:dSdT Z;de<fdUdVZ=dWeddfdXdYZ>ddZd[Z?d\d] Z@d^eee.e f d_eeef ddfd`daZAdbedce.e ddeddfdedfZB	gddhe+die&ddfdjdkZCddbedIedefdmdnZDdeee*eEef f fdodpZFdedeeG fdqdrZHde.e fdsdtZIdedueddfdvdwZJddxeeK defdydzZLde.eM fd{d|ZN		}ddIee d~e<de.eM fddZOdIede.eP fddZQdIedede.eM fddZRdedIede.eM fddZSde.eP deTde.eU fddZVdeeeWe f fddZXdFededeTdefddZY	}ddFedededeTde<f
ddZZdePdeTdefddZ[dePdeTde<fddZ\e]fdbedefddZ^de.e de.e fddZ_de.e fddZ`deeeaf fddZbdIede<fddZcde.e fddZd	lddbedIedeedef fddZfdbefddZgdIedee fddZhdeie fddZjdekfddZldedeeemf fddZndde<fddZode*fddZpdee, fddZqdS )ServeControllera  Responsible for managing the state of the serving system.

    The controller implements fault tolerance by persisting its state in
    a new checkpoint each time a state change is made. If the actor crashes,
    the latest checkpoint is loaded and the state is recovered. Checkpoints
    are written/read using a provided KV-store interface.

    All hard state in the system is maintained by this actor and persisted via
    these checkpoints. Soft state required by other components is fetched by
    those actors from this actor on startup and updates are pushed out from
    this actor.

    All other actors started by the controller are named, detached actors
    so they will not fate share with the controller if it crashes.

    The following guarantees are provided for state-changing calls to the
    controller:
        - If the call succeeds, the change was made and will be reflected in
          the system even if the controller or other actors die unexpectedly.
        - If the call fails, the change may have been made but isn't guaranteed
          to have been. The client should retry in this case. Note that this
          requires all implementations here to be idempotent.
    N)grpc_optionshttp_optionsglobal_logging_configrW   c             	      s\  t   | _| jt ksJ dt  j| _tt  jd| _	d| j }t
|| j	| _t | _t | _d | _d | _| jt}|d urLt|}| | tdtt d trjtdt d tt t | j	| _!| j!"  t#| _$| j$rtd t%j&|_'t(t)|| j| j!| jt*|d	| _+~~t,| j| j| _-t j.j/d
d}dd |D }t0 | _1t2| j| j|t3 | j!| j1| _4t5| j4| j1| j-| j| j| _6t7t   t j.8 t j.9 t  : t;t  < t= d| _>d| _?t | _@d | _Ai | _BtCtDD d| _E| F  tG| H  d | _Id | _J| K  tL | _M| N  i | _Og | _P| Q  g | _Rd S )Nz$Controller must be on the head node.)addressz
ray-serve-
controller)component_namecomponent_idz0Calling user-provided callback from import path .zODirect ingress is enabled in ServeController, enabling proxy on head node only.)rX   head_node_idcluster_node_info_cachelogging_configrW   T)all_namespacesc                 S   s    g | ]}|d  t kr|d qS )	namespacename)r$   ).0actor rg   S/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/serve/_private/controller.py
<listcomp>   s
    z,ServeController.__init__.<locals>.<listcomp>)node_idnode_ipnode_instance_idactor_id
actor_name	worker_idlog_file_pathF)controller_start_time)Srayget_runtime_contextget_node_id_controller_node_idr:   rc   ray_worker_namespacer   gcs_address
gcs_clientr6   kv_storer2   long_poll_hostasyncioEventdone_recovering_event_autoscaling_loggerrY   getLOGGING_CONFIG_CHECKPOINT_KEYpickleloads!reconfigure_global_logging_configr0   strosgetpidr   loggerinfor8   r&   r`   updater   _direct_ingress_enabledr<   HeadOnlylocationr5   r-   r,   proxy_state_managerr*   endpoint_stateutillist_named_actorsr   autoscaling_state_managerr)   r9   deployment_state_managerr   application_state_managerrN   get_node_ip_addressget_node_instance_idget_actor_idr!   get_worker_idr1   _actor_details_shutting_down_shutdown_event_shutdown_start_time_registered_cleanup_actorsr%   time_health_metrics_tracker_create_control_loop_metricsr   run_control_loop_target_capacity_target_capacity_direction_recover_state_from_checkpointset_proxy_nodes_update_proxy_nodes_last_autoscaling_snapshots&_autoscaling_enabled_deployments_cache&_refresh_autoscaling_deployments_cache_last_broadcasted_target_groups)selfrX   rY   rW   kv_store_namespacelog_config_checkpointall_current_actorsall_serve_actor_namesrg   rg   rh   __init__   s   




	


	

zServeController.__init__c                 C   s   | j r
| j |kr
d S | jtt| || _ | jtj	|i t
dtt |d ttt |d| _tjdtj dddid td	| j   d S )
Nr[   )r\   r]   ra   )r]   ra   zController starting (version='z').log_to_stderrFextraz;Configure the serve controller logger with logging config: )rY   ry   putr   r   dumpsrz   notify_changedr3   GLOBAL_LOGGING_CONFIGr/   r   r   r   r.   r~   r   r   rr   __version__debug)r   rY   rg   rg   rh   r     s:   



z1ServeController.reconfigure_global_logging_configreturnc                 C   s   dS )z+No-op to check if this controller is alive.Nrg   r   rg   rg   rh   check_alive=  s   zServeController.check_alivec                 C   s   t  S N)r   r   r   rg   rg   rh   get_pidA  s   zServeController.get_pidreplica_metric_reportc                 C   s   t   |j }|d }| jj||jjj|jjj|jjdd | j	
| |tkr=td|j d|j d| dt d	 | j| d S )	N  
deploymentapplicationreplicatagsz*Received autoscaling metrics from replica  with timestamp 
 which is Bms ago. This is greater than the warning threshold RPC latency of ms. This may indicate a performance issue with the controller try increasing the RAY_SERVE_RPC_LATENCY_WARNING_THRESHOLD_MS environment variable.)r   	timestampreplica_metrics_delay_gauger   
replica_iddeployment_idrd   app_name	unique_idr   record_replica_metrics_delayr   r   warningr   "record_request_metrics_for_replica)r   r   latency
latency_msrg   rg   rh   'record_autoscaling_metrics_from_replicaD  s*   	z7ServeController.record_autoscaling_metrics_from_replicahandle_metric_reportc                 C   s   t   |j }|d }| jj||jj|jj|jdd | j	| |t
kr>td|j d|j d|j d| dt
 d	 | j| d S )
Nr   r   r   handler   z)Received autoscaling metrics from handle z for deployment r   r   r   r   )r   r   handle_metrics_delay_gauger   r   rd   r   	handle_idr   record_handle_metrics_delayr   r   r   r   !record_request_metrics_for_handle)r   r   r   r   rg   rg   rh   &record_autoscaling_metrics_from_handle_  s*   	z6ServeController.record_autoscaling_metrics_from_handler   c                 C      | j |S r   )r   %get_total_num_requests_for_deploymentr   r   rg   rg   rh   2_get_total_num_requests_for_deployment_for_testingz  s   zBServeController._get_total_num_requests_for_deployment_for_testingc                 C   r   r   )r   get_metrics_for_deploymentr   rg   rg   rh   '_get_metrics_for_deployment_for_testing  s   z7ServeController._get_metrics_for_deployment_for_testingc                 C   s   | j j| jS r   )r   _deployment_states	_replicasr   rg   rg   rh    _dump_replica_states_for_testing  s   z0ServeController._dump_replica_states_for_testingc                 C   s   | j j|   d S r   )r   r   %_stop_one_running_replica_for_testingr   rg   rg   rh   r     s   
z5ServeController._stop_one_running_replica_for_testingkeys_to_snapshot_idsc                    .   | j  s| j  I dH  | j|I dH S )a  Proxy long pull client's listen request.

        Args:
            keys_to_snapshot_ids (Dict[str, int]): Snapshot IDs are used to
              determine whether or not the host should immediately return the
              data or wait for the value to be changed.
        N)r}   is_setwaitrz   listen_for_change)r   r   rg   rg   rh   r     s   
z!ServeController.listen_for_changekeys_to_snapshot_ids_bytesc                    r   )zProxy long pull client's listen request.

        Args:
            keys_to_snapshot_ids_bytes (Dict[str, int]): the protobuf bytes of
              keys_to_snapshot_ids (Dict[str, int]).
        N)r}   r   r   rz   listen_for_change_java)r   r   rg   rg   rh   r     s   

z&ServeController.listen_for_change_javac                 C   
   | j  S )2Returns a dictionary of deployment name to config.)r   get_endpointsr   rg   rg   rh   get_all_endpoints  s   
z!ServeController.get_all_endpointsc                 C   s(   |   }dd | D }t|d S )r   c                 S   s"   i | ]\}}|j t|d  dqS )route)r   )rd   EndpointInfoProto)re   endpoint_tagendpoint_dictrg   rg   rh   
<dictcomp>      z:ServeController.get_all_endpoints_java.<locals>.<dictcomp>)	endpoints)r   itemsrE   SerializeToString)r   r   datarg   rg   rh   get_all_endpoints_java  s
   z&ServeController.get_all_endpoints_javac                 C   s   | j du ri S | j  S )z7Returns a dictionary of node ID to proxy actor handles.N)r   get_proxy_handlesr   rg   rg   rh   get_proxies  s   

zServeController.get_proxiesc                 C   s*   | j du rdS t| j   d}| S )z9Returns the proxy actor name list serialized by protobuf.N)names)r   r@   get_proxy_namesvaluesr   )r   actor_name_listrg   rg   rh   r    s   
zServeController.get_proxy_namesc                 C   s2   | j  }|t| j  }|| j || _dS )zUpdate the nodes set where proxy actors should run.

        Controller decides where proxy actors should run
        (head node and nodes with deployment replicas).
        N)r   get_active_node_idsr   r`   get_draining_nodesaddru   r   )r   new_proxy_nodesrg   rg   rh   r     s   

z#ServeController._update_proxy_nodesc                    s   g }t   | j D ])}| j|}| D ]\}} t||d |jj}|r2|	||||f qq
|| _
 fdd| j D | _d S )Nrd   r   c                    s   i | ]\}}| v r||qS rg   rg   )re   kvactive_dep_idsrg   rh   r     s
    zJServeController._refresh_autoscaling_deployments_cache.<locals>.<dictcomp>)r   r   list_app_nameslist_deployment_detailsr   r  r   deployment_configautoscaling_configappendr   r   )r   resultr   deployment_detailsdep_namedetailsr  rg   r  rh   r     s"   
z6ServeController._refresh_autoscaling_deployments_cachec           	      C   s   | j du rdS g }| jD ]5\}}}}t||d}| j|}|du r#q| j|}|dur3||r3q||j	dd || j|< q|rN| j 
d|i dS dS )zEEmit structured autoscaling snapshot logs in a single batch per loop.Nr	  T)exclude_none	snapshots)r~   r   r   r   get_deployment_snapshotr   r   is_scaling_equivalentr  dictr   )	r   snapshots_to_logr   r  r  r  dep_iddeployment_snapshotlastrg   rg   rh   &_emit_deployment_autoscaling_snapshots  s,   

z6ServeController._emit_deployment_autoscaling_snapshotsc           	   
      s  t }d}t }	 t }z| |||I d H  W n" ty< } ztd|  tdI d H  W Y d }~nd }~ww t | }|dkrTtjd| ddd	id
 | j	
| | j| |d7 }| j
| || j_t }ttI d H  t | }| j
| || j_q
)Nr   Tz,There was an exception in the control loop:    
   z%The last control loop was slow (took z~s). This is likely caused by running a large number of replicas in a single Ray cluster. Consider using multiple Ray clusters.r   Fr   )r    r   run_control_loop_step	Exceptionr   	exceptionr{   sleepr   control_loop_duration_gauge_sr   r   record_loop_durationnum_control_loops_gaugenum_control_loopsr   sleep_duration_gauge_slast_sleep_duration_s)	r   recovering_timeout	num_loops
start_timeloop_start_timeeloop_durationsleep_start_timesleep_durationrg   rg   rh   r     s@   
z ServeController.run_control_loopr/  r-  r.  c                    s  z| j   W n ty   td Y nw | jr/z|   W n ty.   td Y nw | j sJt		 | |krJt
d| d | j  d }z?t		 }| j }t		 | }| j| | j| | j s|s| j  |dkrtjdt		 | ddd	d
id W n ty   td Y nw z%t		 }| j }|s|r|   t		 | }	| j|	 | j|	 W n ty   td Y nw z|   W n ty   td Y nw t		 }
|   t		 |
 }| j| | j| | jr7| j r7z t		 }| jj| jd t		 | }| j| | j| W n ty6   td Y nw |d
u rJ| j| j | j  B  | j!ra| j" }t#$| t#%| &  d S d S )Nz+Exception updating cluster node info cache.zException during shutdown.z Replicas still recovering after z@s, setting done recovering event to broadcast long poll updates.r   z&Finished recovering deployments after z.2fzs.r   Fr   z$Exception updating deployment state.z%Exception updating application state.z4Exception emitting deployment autoscaling snapshots.)proxy_nodeszException updating proxy state.)'r`   r   r$  r   r%  r   shutdownr}   r   r   r   r   r   dsm_update_duration_gauge_sr   record_dsm_update_durationr   r   r   asm_update_duration_gauge_srecord_asm_update_durationr   r   node_update_duration_gauge_srecord_node_update_durationr   r   proxy_update_duration_gauge_srecord_proxy_update_durationr   drop_stale_handle_metricsget_alive_replica_actor_idsget_alive_proxy_actor_idsr   get_ingress_replicas_infor4   update_portsprune!_get_node_id_to_alive_replica_ids)r   r/  r-  r.  any_recoveringdsm_update_start_timedsm_durationasm_update_start_timeany_target_state_changedasm_durationnode_update_start_timenode_update_durationproxy_update_start_timeproxy_update_durationingress_replicas_info_listrg   rg   rh   r#  .  s   






z%ServeController.run_control_loop_stepc                 C   s   t jddd| _t jddd| _t jddd| _t jdd	d| _t jd
dd| _t jddd| _t jdddd| _| j	dt
  i t jdddd| _t jdddd| _d S )N'serve_controller_node_update_duration_sz:The control loop time spent on collecting proxy node info.)description.serve_controller_proxy_state_update_duration_sz4The control loop time spent on updating proxy state.3serve_controller_deployment_state_update_duration_sz9The control loop time spent on updating deployment state.4serve_controller_application_state_update_duration_sz:The control loop time spent on updating application state.!serve_controller_sleep_duration_sz.The duration of the last control loop's sleep.(serve_controller_control_loop_duration_sz&The duration of the last control loop."serve_controller_num_control_loopszpThe number of control loops performed by the controller. Increases monotonically over the controller's lifetime.)rm   )rR  tag_keysrm   *serve_autoscaling_replica_metrics_delay_mszpTime taken for the replica metrics to be reported to the controller. High values may indicate a busy controller.r   )serve_autoscaling_handle_metrics_delay_mszoTime taken for the handle metrics to be reported to the controller. High values may indicate a busy controller.r   )rU   Gauger;  r=  r7  r9  r+  r'  r)  set_default_tagsrr   rs   r   r   r   r   rg   rg   rh   r     sT   z,ServeController._create_control_loop_metricsc                 C   sD   |   \}}}|| _|d ur tjdddid | j||d d S d S )Nz!Recovered config from checkpoint.r   Fr   )deployment_time)_read_config_checkpointr   r   r   apply_config)r   r^  serve_configtarget_capacity_directionrg   rg   rh   r     s   z.ServeController._recover_state_from_checkpointc                 C   sD   | j t}|dur t|\}}}}|tt| |d|fS dS )a  Reads the current Serve config checkpoint.

        The Serve config checkpoint stores active application configs and
        other metadata.

        Returns:

        If the GCS contains a checkpoint, tuple of:
            1. A deployment timestamp.
            2. A Serve config. This Serve config is reconstructed from the
                active application states. It may not exactly match the
                submitted config (e.g. the top-level http options may be
                different).
            3. The target_capacity direction calculated after the Serve
               was submitted.

        If the GCS doesn't contain a checkpoint, returns (0, None, None).
        N)applicationstarget_capacity)        NN)ry   r   CONFIG_CHECKPOINT_KEYr   r   rP   listr  )r   
checkpointr^  rd  rb  config_checkpoints_dictrg   rg   rh   r_    s    
	z'ServeController._read_config_checkpointc                 C   r   )z_Used for testing.

        Returned dictionary maps deployment names to replica infos.
        )r   get_running_replica_infosr   rg   rg   rh   _all_running_replicas  s   
z%ServeController._all_running_replicasc                 C      | j S )z^Returns the actor details for this controller.

        Currently used for test only.
        )r   r   rg   rg   rh   get_actor_details  s   z!ServeController.get_actor_detailsc                 C   s,   z| j   W S  ty   td  w )a%  Returns comprehensive health metrics for the controller.

        This method provides detailed performance metrics to help diagnose
        controller health issues, especially as cluster size increases.

        Returns:
            Dictionary containing health metrics including:
            - Control loop performance (iteration speed, durations)
            - Event loop health (task count, scheduling delay)
            - Component update latencies
            - Autoscaling metrics latency (handle/replica)
            - Memory usage
        z/Exception collecting controller health metrics.)r   collect_metricsr  r$  r   r%  r   rg   rg   rh   get_health_metrics  s   
z"ServeController.get_health_metricsrj   c                 C   s   | j du rdS | j  |S )zReturns the proxy details for the proxy on the given node.

        Currently used for test only. Will return None if the proxy doesn't exist on
        the given node.
        N)r   get_proxy_detailsr   )r   rj   rg   rg   rh   rp  *  s   
z!ServeController.get_proxy_detailsr   c                 C   s.   | j   D ]\}}||kr|j  S qdS )zcReturns the deployment timestamp for the given app.

        Currently used for test only.
        N)r   list_app_statusesr   deployment_timestamp)r   r   	_app_nameapp_status_inforg   rg   rh   get_deployment_timestamps5  s   
z)ServeController.get_deployment_timestampsdeployment_namec                 C   s   | j || S )zjReturns the deployment details for the app and deployment.

        Currently used for test only.
        )r   r  )r   r   rv  rg   rg   rh   get_deployment_detailsA  s   
z&ServeController.get_deployment_detailsc                 C      | j du rt S | j  S )z$Return the HTTP proxy configuration.N)r   r=   
get_configr   rg   rg   rh   get_http_configL     

zServeController.get_http_configc                 C   rx  )z$Return the gRPC proxy configuration.N)r   r?   get_grpc_configr   rg   rg   rh   r|  R  r{  zServeController.get_grpc_configc                 C   sN   | j du rdS |  }|jdkr$|jdu rdS dt|j|j |j S |jS )z+Return the root url for the serve instance.N zhttp://)r   rz  root_urlhostr   port	root_path)r   http_configrg   rg   rh   get_root_urlX  s   


zServeController.get_root_urlc                 C   s   | j tdu S )zReturns whether the config checkpoint has been deleted.

        Get the config checkpoint from the kv store. If it is None, then it has been
        deleted.
        N)ry   r   rf  r   rg   rg   rh   config_checkpoint_deletedg  s   z)ServeController.config_checkpoint_deletedactor_handlec                 C   s   |j  }|| j|< dS )aI  Register an actor to be killed on serve.shutdown().

        This allows deployments to register auxiliary actors (like caches,
        coordinators, etc.) that should be cleaned up when Serve shuts down.
        The actors must use lifetime="detached" to survive replica restarts,
        but will be explicitly killed during serve.shutdown().

        Note: Registered actors are NOT persisted across controller restarts.
        For full persistence, use controller-managed deployment-scoped actors
        (see https://github.com/ray-project/ray/issues/60359).

        If the same actor is registered multiple times (e.g., from multiple
        router instances sharing a tree actor via get_if_exists=True), it will
        only be stored once.

        Args:
            actor_handle: The actor handle to register for cleanup.
        N)	_actor_idhexr   )r   r  rm   rg   rg   rh    _register_shutdown_cleanup_actoro  s   
z0ServeController._register_shutdown_cleanup_actorc              	   C   s8   | j  D ]}z	tj|dd W q ty   Y qw dS )z0Kill all actors registered for shutdown cleanup.T
no_restartN)r   r  rr   killr$  )r   rf   rg   rg   rh   _kill_registered_cleanup_actors  s   z/ServeController._kill_registered_cleanup_actorsc                 C   s  | j sdS | jdu rt | _tjdddid | jt | jt | j	
  | j
  | j
  | jr;| j
  |  }| j	 }| j }| j }| jdu pW| j }|r}|r}|r}|r}|r}|   tjdddid t j}tj|dd dS t | j d	kr|stjt d
ddid |stjdddid |stjdddid |stjdddid |stjdddid dS dS dS )am  Shuts down the serve instance completely.

        This method will only be triggered when `self._shutting_down` is true. It
        deletes the kv store for config checkpoints, sets application state to deleting,
        delete all deployments, and shuts down all proxies. Once all these
        resources are released, it then kills the controller actor.
        NzController shutdown started.r   Fr   z1All resources have shut down, controller exiting.Tr  r"  z not yet deletedzapplication not yet shutdownzdeployment not yet shutdownzendpoint not yet shutdownzproxy_state not yet shutdown)r   r   r   r   r   ry   deleterf  r   r   r6  r   r   r   r  is_ready_for_shutdownr  r   rr   rs   current_actorr  )r   r  application_is_shutdowndeployment_is_shutdownendpoint_is_shutdownproxy_state_is_shutdown_controller_actorrg   rg   rh   r6    s   











zServeController.shutdownname_to_deployment_args_listname_to_application_argsc                 C   s   i }|  D ]-\}}g }|D ] }t|}||j|j|j|j|j|	dr)|j
ndd q|||< qi }	|  D ]\}}
t|
|	|< q:| j||	 | j  dS )a0  
        Takes in a list of dictionaries that contain deployment arguments.
        If same app name deployed, old application will be overwritten.

        Args:
            name: Application name.
            deployment_args_list: List of serialized deployment information,
                where each item in the list is bytes representing the serialized
                protobuf `DeploymentArgs` object. `DeploymentArgs` contains all the
                information for the single deployment.
            name_to_application_args: Dictionary mapping application names to serialized
                application arguments, where each item is bytes representing the serialized
                protobuf `ApplicationArgs` object. `ApplicationArgs` contains the information
                for the application.
        route_prefixN)rv  deployment_config_proto_bytesreplica_config_proto_bytesdeployer_job_idingressr  )r   rB   
FromStringr  rv  r  replica_configr  r  HasFieldr  rA   r   deploy_appssave_checkpoint)r   r  r  name_to_deployment_argsrd   deployment_args_listdeployment_args_deserializeddeployment_args_bytesargs%name_to_application_args_deserializedapplication_args_bytesrg   rg   rh   deploy_applications  s0   


z#ServeController.deploy_applicationsrd   r  application_argsc                 C   s   |  ||i||i dS )av  
        Deploy a single application
        (as deploy_applications(), but it only takes a single name and deployment args).
        This primarily exists as a shim to avoid
        changing Java code in https://github.com/ray-project/ray/pull/49168,
        and could be removed if the Java code was refactored
        to use the new bulk deploy_applications API.
        N)r  )r   rd   r  r  rg   rg   rh   deploy_application  s   z"ServeController.deploy_applicationre  configr^  c              	   C   s   t jd |st }i }|  \}}}t||| jd| _t| j|j	| j |j	| _|j
D ]}|jdu r<|jr<|j|_|jdd}|||j< q.| jtt|| j| j|f | jj|j
|| j| jd | j  dS )zApply the config described in `ServeDeploySchema`.

        This will upgrade the applications to the goal state specified in the
        config.

        If `deployment_time` is not provided, `time.time()` is used.
        v2)curr_config
new_configcurr_target_capacity_directionNTexclude_unset)r^  rd  rb  )r7   API_VERSIONrecordr   r_  #calculate_target_capacity_directionr   log_target_capacity_changer   rd  rc  ra   r  rd   ry   r   rf  r   r   r   apply_app_configsr  )r   r  r^  new_config_checkpoint_r  
app_configapp_config_dictrg   rg   rh   r`    sL   
zServeController.apply_configr}  c                 C   sn   t ||d}| j|}|du r%|rd| dnd}td| d| d| j|}t| |d	}| S )
a  Get the current information about a deployment.

        Args:
            name: the name of the deployment.

        Returns:
            DeploymentRoute's protobuf serialized bytes

        Raises:
            KeyError: If the deployment doesn't exist.
        r	  Nz in application ''r}  zDeployment 'z' does not existr^   )deployment_infor   )	r   r   get_deploymentKeyErrorr   get_endpoint_routerC   to_protor   )r   rd   r   idr  app_msgr   deployment_routerg   rg   rh   get_deployment_info\  s   z#ServeController.get_deployment_infoc                    s    fdd j   D S )zGets the current information about all deployments.

        Returns:
            Dict(deployment_id, (DeploymentInfo, route))
        c                    s"   i | ]\}}|| j |fqS rg   )r   r  )re   r  r   r   rg   rh   r   }  r   z=ServeController.list_deployments_internal.<locals>.<dictcomp>)r   get_deployment_infosr   r   rg   r   rh   list_deployments_internalu  s   
z)ServeController.list_deployments_internalc                 C   s   | j  |}|r|jS dS )a  Get the deployment config for the given deployment id.

        Args:
            deployment_id: The deployment id to get the config for.

        Returns:
            A deployment config object if the deployment id exist,
            None otherwise.
        N)r   r  r   r  )r   r   r  rg   rg   rh   get_deployment_config  s   
z%ServeController.get_deployment_configc                 C   s   | j j S )z6Gets the current list of all deployments' identifiers.)r   r   keysr   rg   rg   rh   list_deployment_ids  s   z#ServeController.list_deployment_idstarget_num_replicasc                 C   sX   |j }| j|std| d| j|s#td|j d| d| j|| dS )aA  Update the target number of replicas for a deployment.

        Args:
            deployment_id: The deployment to update.
            target_num_replicas: The new target number of replicas.

        Raises:
            ExternalScalerDisabledError: If external_scaler_enabled is set to False for the application.
        zApplication 'z' not foundz'Cannot update replicas for deployment 'z' in application 'a	  '. The external scaling API can only be used when 'external_scaler_enabled' is set to true in the application configuration. Current value: external_scaler_enabled=false. To use this API, redeploy your application with 'external_scaler_enabled: true' in the config.N)	r   r   does_app_exist
ValueErrorget_external_scaler_enabledr+   rd   r   set_target_num_replicas)r   r   r  r   rg   rg   rh   update_deployment_replicas  s   
	z*ServeController.update_deployment_replicassourcec                 C   s  |   }|  }i }| jj|d}|r|  ni }| D ]2\}}t|| j|| ||j	|j
|j||| j|| j|| j|| j|d||< qt|jdd}	t|jdd}
t| j| jt|j|	|
| jrv| j nd||  djddS )a  Gets details on all applications on the cluster and system-level info.

        The information includes application and deployment statuses, config options,
        error messages, etc.

        Args:
            source: If provided, returns application
                statuses for applications matching this API type.
                Defaults to None, which means all applications are returned.

        Returns:
            Dict that follows the format of the schema ServeInstanceDetails.
        )r  )rd   r  	docs_pathstatusmessagelast_deployed_time_sdeployed_app_configr  deploymentsexternal_scaler_enableddeployment_topologyTr  N)rd  controller_infoproxy_locationrX   rW   proxiesrc  target_groups)rz  r|  r   rq  get_app_configsr   rG   get_route_prefixget_docs_pathr  r  rr  r   get_app_sourcer  r  get_deployment_topologyrI   	parse_objr  rT   rQ   r   r   r>   _from_deployment_moder   r   rp  get_target_groups'_get_user_facing_json_serializable_dict)r   r  r  grpc_configrc  app_statusesapp_configsr   rt  rX   rW   rg   rg   rh   get_serve_instance_details  sX   


z*ServeController.get_serve_instance_detailsc                 C   s^   g }| j  r-|ttjd| j tjd t|  r-|ttj	d| j tj	d |S )z*Get target groups for proxy-based routing./)protocolr  targets)
r   rp  r  rS   r   HTTPget_targetsr;   r|  GRPC)r   r  rg   rg   rh   _get_proxy_target_groups  s(   
	z(ServeController._get_proxy_target_groupsFfrom_proxy_managerc                    s      } js	|S |du rdd  j  D }n|g} fdd|D }|s*|S g }|D ]} j|} ||}|rD|| q.| || q.|S )a  Get target groups for direct ingress deployments.

        This returns target groups that point directly to replica ports
        rather than proxy ports when direct ingress is enabled.

        Following situations are possible:
        1. Direct ingress is not enabled. In this case, we just return the
        target groups from the proxy implementation.
        2. Direct ingress is enabled and there are no applications. In this case,
        we return target groups for proxy. Serve controller is running but there
        are no applications to route traffic to.
        3. Direct ingress is enabled and there are applications. All applications
        have atleast one running replica. In this case, we return target groups
        for all applications with targets pointing to the running replicas.
        4. Direct ingress is enabled and there are applications. Some applications
        have no running replicas. In this case, for applications that have no
        running replicas, we return target groups for proxy and for applications
        that have running replicas, we return target groups for direct ingress.
        If there are multiple applications with no running replicas, we return
        one target group per application with unique route prefix.
        Nc                 S   s   g | ]\}}|qS rg   rg   )re   rs  r  rg   rg   rh   ri   <  s    z5ServeController.get_target_groups.<locals>.<listcomp>c                    s    g | ]} j |d ur|qS r   )r   r  re   appr   rg   rh   ri   E  s
    )	r  r   r   rq  r   r  _get_target_groups_for_appextend3_get_target_groups_for_app_with_no_running_replicas)r   r   r  proxy_target_groupsappsr  r  app_target_groupsrg   r   rh   r    s2   
z!ServeController.get_target_groupsc                    s`   | j |}t||d}| j|}|sg S |j}dd | j |g D   fdd|D S )z7Get running replica details for a specific application.)r   rd   c                 S   s   h | ]}|j jqS rg   )r   r   )re   replica_inforg   rg   rh   	<setcomp>j  s    zVServeController._get_running_replica_details_for_ingress_deployment.<locals>.<setcomp>c                    s   g | ]	}|j  v r|qS rg   )r   re   replica_detailrunning_replica_idsrg   rh   ri   p  s
    
zWServeController._get_running_replica_details_for_ingress_deployment.<locals>.<listcomp>)r   get_ingress_deployment_namer   r   rw  replicasrj  r   )r   r   ingress_deployment_namer   r  replica_detailsrg   r  rh   3_get_running_replica_details_for_ingress_deployment^  s   


zCServeController._get_running_replica_details_for_ingress_deploymentr  c                 C   sz   |  |}|s	g S g }| |tj}|r |ttj|||d t|  r;| |tj}|r;|ttj|||d |S )a  
        Create HTTP and gRPC target groups for a specific application.

        This function can return empty list if there are no running replicas.
        Or replicas have not fully initialized yet, where their ports are not
        allocated yet.
        r  r  r  r   )	r  _get_targets_for_protocolr   r  r  rS   r;   r|  r  )r   r   r  r  r  http_targetsgrpc_targetsrg   rg   rh   r  v  s@   
	z*ServeController._get_target_groups_for_appc                 C   s\   g }| j tj}| j tj}|r|ttj|||d |r,|ttj|||d |S )z
        For applications that have no running replicas, we return target groups
        for proxy. This will allow applications to be discoverable via the
        proxy in situations where their replicas have scaled down to 0.
        r  )r   r  r   r  r  r  rS   )r   r  r   r  r  r  rg   rg   rh   r    s,   zCServeController._get_target_groups_for_app_with_no_running_replicasr  r  c                    s    fdd|D S )z?Create targets for a specific protocol from a list of replicas.c                    s6   g | ]} | rt|j| |j|jd qS ))ipr  instance_idrd   )_is_port_allocatedrR   rk   	_get_portrl   rn   r  r  r   rg   rh   ri     s    

z=ServeController._get_targets_for_protocol.<locals>.<listcomp>rg   )r   r  r  rg   r  rh   r    s   z)ServeController._get_targets_for_protocolc                 C   sX   t t}| jj D ]}|j }|D ]}|j}|d u rq|jj	}|| 
| qq
|S r   )r   r   r   r   r  r   r   actor_node_idr   r   r  )r   node_id_to_alive_replica_idsdsr  r   rj   replica_unique_idrg   rg   rh   rE    s   
z1ServeController._get_node_id_to_alive_replica_idsr   c                 C   s   t |}|||S )z;Allocate an HTTP port for a replica in direct ingress mode.)r4   get_node_managerallocate_port)r   rj   r   r  node_managerrg   rg   rh   allocate_replica_port  s   
z%ServeController.allocate_replica_portr  
block_portc                 C   s   t |}||||| dS )z:Release an HTTP port for a replica in direct ingress mode.N)r4   r  release_port)r   rj   r   r  r  r  r  rg   rg   rh   release_replica_port  s   
	z$ServeController.release_replica_portr  c                 C      t |j}||j|S )zGet the port for a replica.)r4   r  rj   get_portr   r   r  r  r  rg   rg   rh   r       zServeController._get_portc                 C   r  )z-Check if the port for a replica is allocated.)r4   r  rj   is_port_allocatedr   r  rg   rg   rh   r    r  z"ServeController._is_port_allocatedc                 C   s2   | j |}| j |}t|||d}|  S )zReturn application status
        Args:
            name: application name. If application name doesn't exist, app_status
                  is NOT_STARTED.
        )rd   
app_statusdeployment_statuses)r   get_app_status_infoget_deployments_statusesr   r  r   )r   rd   r  r  status_inforg   rg   rh   get_serve_status  s   z ServeController.get_serve_statusr  c                 C   s"   g }|D ]
}| | | q|S r   )r  r#  )r   r  statusesrd   rg   rg   rh   get_serve_statuses  s   z"ServeController.get_serve_statusesc                 C   s(   g }| j  D ]
}|| | q|S r   )r   rq  r  r#  )r   r$  rd   rg   rg   rh   list_serve_statuses$  s   z#ServeController.list_serve_statusesc                 C   s<   | j t}|d u ri S t|\}}}}dd | D S )Nc                 S   s   i | ]
\}}|t |qS rg   )rO   r  )re   r  r  rg   rg   rh   r   0  s    
z3ServeController.get_app_configs.<locals>.<dictcomp>)ry   r   rf  r   r   r   )r   rh  r  ri  rg   rg   rh   r  *  s   zServeController.get_app_configsc                 C   r   )a  Get the external_scaler_enabled flag value for an application.

        This is a helper method specifically for Java tests to verify the flag
        is correctly set, since Java cannot deserialize Python Pydantic objects.

        Args:
            app_name: Name of the application.

        Returns:
            True if external_scaler_enabled is set for the application, False otherwise.
        )r   r  r   r   rg   rg   rh   r  5  s   z+ServeController.get_external_scaler_enabledc                 C   s   | j  }dd |D S )z6Gets deployment status bytes for all live deployments.c                 S   s   g | ]}|   qS rg   )r  r   )re   r  rg   rg   rh   ri   F  s    z?ServeController.get_all_deployment_statuses.<locals>.<listcomp>)r   get_deployment_statuses)r   r$  rg   rg   rh   get_all_deployment_statusesC  s   
z+ServeController.get_all_deployment_statusesc                 C   s2   t ||d}| j|g}|sdS |d   S )zGet deployment status by deployment name.

        Args:
            name: Deployment name.
            app_name: Application name. Default is "" because 1.x
                deployments go through this API.
        r	  Nr   )r   r   r(  r  r   )r   rd   r   r  r  rg   rg   rh   get_deployment_statusH  s
   z%ServeController.get_deployment_statusc                 C   r   )zqDocs path for application.

        Currently, this is the OpenAPI docs path for FastAPI-integrated applications.)r   r  )r   rd   rg   rg   rh   r  Y  s   zServeController.get_docs_pathc                 C   r   )zName of the ingress deployment in an application.

        Returns:
            Ingress deployment name (str): if the application exists.
            None: if the application does not exist.
        )r   r   r'  rg   rg   rh   r   _  s   z+ServeController.get_ingress_deployment_namec                 C   s$   |D ]}| j | q| j   dS )zhDelete applications based on names

        During deletion, the application status is DELETING
        N)r   
delete_appr  )r   r  rd   rg   rg   rh   delete_appsh  s   zServeController.delete_appsr   c                 C   s   | j | dS )zRecord replica routing information for a replica.

        Args:
            info: RequestRoutingInfo including deployment name, replica tag,
                multiplex model ids, and routing stats.
        N)r   record_request_routing_info)r   r   rg   rg   rh   r-  r  s   z+ServeController.record_request_routing_infoc                 C   r   )a	  Get the current rank mapping for all replicas in a deployment.
        Args:
            deployment_id: The deployment ID to get ranks for.
        Returns:
            Dictionary mapping replica_id to ReplicaRank object (with rank, node_rank, local_rank).
        )r   _get_replica_ranks_mappingr   rg   rg   rh   r.  {  s   	z*ServeController._get_replica_ranks_mappingTr   c                    s$   d| _ |sdS | j I dH  dS )a  Set the shutting down flag on controller to signal shutdown in
        run_control_loop().

        This is used to signal to the controller that it should proceed with shutdown
        process, so it can shut down gracefully. It also waits until the shutdown
        event is triggered if wait is true.

        Raises:
            RayActorError: if wait is True, the caller waits until the controller
                is killed, which raises a RayActorError.
        TN)r   r   r   )r   r   rg   rg   rh   graceful_shutdown  s
   z!ServeController.graceful_shutdownc                 C   s0   d}t jD ]}t|tjjr|jj}q| j|fS )z5Get the logging configuration (for testing purposes).N)r   handlers
isinstanceloggingMemoryHandlertargetbaseFilenamerY   )r   rp   handlerrg   rg   rh   _get_logging_config  s   

z#ServeController._get_logging_configc                 C   rl  )z=Gets the controller's scale direction (for testing purposes).)r   r   rg   rg   rh   _get_target_capacity_direction  s   z.ServeController._get_target_capacity_direction)r   N)re  )r}  r   )NF)F)T)r__name__
__module____qualname____doc__r=   rJ   r   r?   r   r   r   intr   r   r   r   r   r   r   r   r   r   r   r   r   bytesr   r   r   r   r   r   r   r  r   r   r   r   floatr#  r   r   r	   rP   r   r_  r   r   rk  rN   rm  ro  rK   rp  ru  rH   rw  rz  r|  r  boolr  r  r  r6  r  r  r`  r  r'   r  r   r  r  r  rF   r  rS   r  r  rL   r  r  r  r   rR   r  r   rE  r  r  r  r  r"   r#  r%  r&  rO   r  r  r)  r
   r*  r  r   r   r,  r   r-  rM   r.  r/  r7  r8  rg   rg   rg   rh   rV   v   s   
 
"






)
k7
*

H

3

A


 J
B

2
 






	
	

rV   r  r  r  r   c                 C   s   d}d}| dur>t | |r>| j}|j}||kr|}|S |du r(|dur(tj}|S |du r0d}|S ||k r9tj}|S tj}|S |jdurHtj}|S d}|S )zCCompares two Serve configs to calculate the next scaling direction.N)applications_matchrd  r   DOWNUP)r  r  r  curr_target_capacitynext_target_capacity_directionnext_target_capacityrg   rg   rh   r    s0   

r  config1config2c                 C   s(   dd | j D }dd |j D }||kS )zzChecks whether the applications in config1 and config2 match.

    Two applications match if they have the same name.
    c                 S      h | ]}|j qS rg   rd   r  rg   rg   rh   r        z%applications_match.<locals>.<setcomp>c                 S   rI  rg   rJ  r  rg   rg   rh   r    rK  )rc  )rG  rH  config1_app_namesconfig2_app_namesrg   rg   rh   rA    s   rA  rD  rF  rE  c              	   C   sL   | |kr$t |trtd|j  d|  d| d dS td dS dS )z$Logs changes in the target_capacity.zTarget capacity scaling z from z to r^   z.Target capacity entering 100% at steady state.N)r1  r   r   r   valuelower)rD  rF  rE  rg   rg   rh   r    s   
r  )r{   r2  r   r   r   collectionsr   typingr   r   r   r   r   r   r	   r
   rr   ray._common.network_utilsr   ray._common.utilsr   ray._rayletr   	ray.actorr   $ray.serve._private.application_stater   r   $ray.serve._private.autoscaling_stater   ray.serve._private.commonr   r   r   r   r   r   r   r   r   ray.serve._private.configr   ray.serve._private.constantsr   r   r   r   r    r!   r"   r#   r$   4ray.serve._private.controller_health_metrics_trackerr%   ray.serve._private.default_implr&   "ray.serve._private.deployment_infor'   #ray.serve._private.deployment_stater(   r)   !ray.serve._private.endpoint_stater*   ray.serve._private.exceptionsr+   ray.serve._private.grpc_utilr,   ray.serve._private.http_utilr-    ray.serve._private.logging_utilsr.   r/   r0   r1   ray.serve._private.long_pollr2   r3   $ray.serve._private.node_port_managerr4   ray.serve._private.proxy_stater5   #ray.serve._private.storage.kv_storer6   ray.serve._private.usager7   ray.serve._private.utilsr8   r9   r:   r;   ray.serve.configr<   r=   r>   r?   ray.serve.generated.serve_pb2r@   rA   rB   rC   rD   r   rE   ray.serve.schemarF   rG   rH   rI   rJ   rK   rL   rM   rN   rO   rP   rQ   rR   rS   rT   ray.utilrU   	getLoggerr   #_CRASH_AFTER_CHECKPOINT_PROBABILITYrf  r   rV   r?  r  r@  rA  r  rg   rg   rg   rh   <module>   s    (,, D
            >
#