o
    ciI@                     @   s   d dl Z d dlZd dlmZ d dlmZmZmZmZ d dl	m
Z
mZmZmZ d dlmZmZ d dlmZ d dlmZ e eZeG dd	 d	ZeG d
d dZG dd dZG dd dZdS )    N)	dataclass)DictListOptionalSet)DeploymentHandleSourceDeploymentID	ReplicaIDTargetCapacityDirection)&RAY_SERVE_MIN_HANDLE_METRICS_TIMEOUT_SSERVE_LOGGER_NAME)DeploymentInfo)"get_capacity_adjusted_num_replicasc                   @   sj   e Zd ZU dZee ed< eed< eed< e	e
ef ed< eed< edefdd	Zedefd
dZdS )HandleMetricReporta  Report from a deployment handle on queued and ongoing requests.

    Args:
        actor_id: If the deployment handle (from which this metric was
            sent) lives on an actor, the actor ID of that actor.
        handle_source: Describes what kind of entity holds this
            deployment handle: a Serve proxy, a Serve replica, or
            unknown.
        queued_requests: The current number of queued requests at the
            handle, i.e. requests that haven't been assigned to any
            replica yet.
        running_requests: A map of replica ID to the average number of
            requests, assigned through the handle, running at that
            replica.
        timestamp: The time at which this report was received.
    actor_idhandle_sourcequeued_requestsrunning_requests	timestampreturnc                 C   s   | j t| j  S )z,Total number of queued and running requests.)r   sumr   valuesself r   X/home/ubuntu/.local/lib/python3.10/site-packages/ray/serve/_private/autoscaling_state.pytotal_requests/   s   z!HandleMetricReport.total_requestsc                 C   s   | j tjtjfv S )aQ  Whether the handle source is a Serve actor.

        More specifically, this returns whether a Serve actor tracked
        by the controller holds the deployment handle that sent this
        report. If the deployment handle lives on a driver, a Ray task,
        or an actor that's not a Serve replica, then this returns False.
        )r   r   PROXYREPLICAr   r   r   r   is_serve_component_source4   s   	z,HandleMetricReport.is_serve_component_sourceN)__name__
__module____qualname____doc__r   str__annotations__r   floatr   r	   propertyr   boolr   r   r   r   r   r      s   
 r   c                   @   s"   e Zd ZU dZeed< eed< dS )ReplicaMetricReportzReport from a replica on ongoing requests.

    Args:
        running_requests: Average number of running requests at the
            replica.
        timestamp: The time at which this report was received.
    r   r   N)r    r!   r"   r#   r&   r%   r   r   r   r   r)   C   s   
 r)   c                   @   s  e Zd ZdZdefddZdededefdd	Zd
e	fddZ
defddZdefddZdee	 fddZdefddZdedefddZd
e	dee deddfddZded ee d!ed"ed#ee	ef deddfd$d%Zd&ee ddfd'd(Z	)d/ded*edefd+d,Zdefd-d.ZdS )0AutoscalingStatez,Manages autoscaling for a single deployment.deployment_idc                 C   s>   || _ t | _t | _d | _d | _d | _g | _d | _d | _	d S N)
_deployment_iddict_handle_requests_replica_requests_deployment_info_config_policy_running_replicas_target_capacity_target_capacity_directionr   r+   r   r   r   __init__T   s   
zAutoscalingState.__init__infocurr_target_num_replicasr   c                 C   sl   |j j}| jdu s| j|r|jdur|j}n|}|| _|| _| j | _|j| _	|j
| _i | _| |S )zyRegisters an autoscaling deployment's info.

        Returns the number of replicas the target should be set to.
        N)deployment_configautoscaling_configr1   config_changedinitial_replicasr2   
get_policyr3   target_capacityr5   target_capacity_directionr6   _policy_stateapply_bounds)r   r9   r:   configtarget_num_replicasr   r   r   registerf   s   

zAutoscalingState.register
replica_idc                 C   s   || j v r| j |= d S d S r,   )r0   )r   rG   r   r   r   on_replica_stopped}   s   
z#AutoscalingState.on_replica_stoppedc                 C   s8   | j jd ur| jtjkrt| j j| jS t| j j| jS r,   )r2   r>   r6   r
   UPr   r5   min_replicasr   r   r   r   get_num_replicas_lower_bound   s   z-AutoscalingState.get_num_replicas_lower_boundc                 C   s   t | jj| jS r,   )r   r2   max_replicasr5   r   r   r   r   get_num_replicas_upper_bound   s   z-AutoscalingState.get_num_replicas_upper_boundrunning_replicasc                 C   s
   || _ dS )z=Update cached set of running replica IDs for this deployment.N)r4   )r   rN   r   r   r   update_running_replica_ids   s   
z+AutoscalingState.update_running_replica_ids&num_replicas_running_at_target_versionc                 C   s   |  ||kS )zWhether or not this deployment is within the autoscaling bounds.

        Returns: True if the number of running replicas for the current
            deployment version is within the autoscaling bounds. False
            otherwise.
        )rC   )r   rP   r   r   r   is_within_bounds   s   	z!AutoscalingState.is_within_boundsnum_replicasc                 C   s   t |  t|  |S )zqClips a replica count with current autoscaling bounds.

        This takes into account target capacity.
        )maxrK   minrM   )r   rR   r   r   r   rC      s   zAutoscalingState.apply_bounds
window_avgsend_timestampNc                 C   s@   |du rdS || j vs|| j | jkrt||d| j |< dS dS )z8Records average number of ongoing requests at a replica.N)r   r   )r0   r   r)   )r   rG   rU   rV   r   r   r   "record_request_metrics_for_replica   s   
z3AutoscalingState.record_request_metrics_for_replica	handle_idr   r   r   r   c                C   s:   || j vs|| j | jkrt|||||d| j |< dS dS )zgRecords average number of queued and running requests at a handle for this
        deployment.
        )r   r   r   r   r   N)r/   r   r   )r   rX   r   r   r   r   rV   r   r   r   !record_request_metrics_for_handle   s   
z2AutoscalingState.record_request_metrics_for_handlealive_serve_actor_idsc                 C   s   t d| jj t}t| j D ]_\}}|jr<|jdur<|j|vr<| j|= |j	dkr;t
d| d|j d|j	 d qt |j |kro| j|= |j	dkro|j}|rYd| d	nd
}t
d| d	| d|dd|j	 d	 qdS )Drops handle metrics that are no longer valid.

        This includes handles that live on Serve Proxy or replica actors
        that have died AND handles from which the controller hasn't
        received an update for too long.
           Nr   zDropping metrics for handle 'z%' because the Serve actor it was on (z) is no longer alive. It had z ongoing requestsz
on actor 'z'  z#Dropping stale metrics for handle 'z#because no update was received for z.1fzs. Ongoing requests was: .)rS   r2   metrics_interval_sr   listr/   itemsr   r   r   loggerdebugtimer   r9   )r   rZ   	timeout_srX   handle_metricr   
actor_infor   r   r   drop_stale_handle_metrics   sB   




z*AutoscalingState.drop_stale_handle_metricsF_skip_bound_checkc              	   C   s@   | j ||  t| j| j|  |  | jd}|r|S | |S )a  Decide the target number of replicas to autoscale to.

        The decision is based off of the number of requests received
        for this deployment. After the decision number of replicas is
        returned by the policy, it is then bounded by the bounds min
        and max adjusted by the target capacity and returned. If
        `_skip_bound_check` is True, then the bounds are not applied.
        )r:   total_num_requestsnum_running_replicasrD   capacity_adjusted_min_replicascapacity_adjusted_max_replicaspolicy_state)	r3   get_total_num_requestslenr4   r2   rK   rM   rB   rC   )r   r:   ri   decision_num_replicasr   r   r   get_decision_num_replicas  s   

z*AutoscalingState.get_decision_num_replicasc                 C   sx   d}| j D ]}|| jv r|| j| j7 }q|dk}| j D ]}||j7 }|s9| j D ]}||jv r8||j| 7 }q*q|S )a  Get average total number of requests aggregated over the past
        `look_back_period_s` number of seconds.

        If there are 0 running replicas, then returns the total number
        of requests queued at handles

        This code assumes that the metrics are either emmited on handles
        or on replicas, but not both. Its the responsibility of the writer
        to ensure enclusivity of the metrics.
        r   )r4   r0   r   r/   r   r   )r   r   idmetrics_collected_on_replicasrf   r   r   r   ro      s   




z'AutoscalingState.get_total_num_requests)F)r    r!   r"   r#   r   r8   r   intrF   r	   rH   rK   rM   r   rO   rQ   rC   r   r&   rW   r$   r   r   rY   r   rh   r(   rr   ro   r   r   r   r   r*   Q   sX    

	
*
r*   c                   @   s"  e Zd ZdZdd Zdedededefdd	Zdefd
dZ	dede
e fddZdefddZdeeef fddZdededefddZdedefddZdededefddZdedee deddfddZded ed!ee d"ed#ed$eeef deddfd%d&Zd'ee ddfd(d)ZdS )*AutoscalingStateManagerzManages all things autoscaling related.

    Keeps track of request metrics for each deployment and decides on
    the target number of replicas to autoscale to based on those metrics.
    c                 C   s
   i | _ d S r,   _autoscaling_statesr   r   r   r   r8   E  s   
z AutoscalingStateManager.__init__r+   r9   r:   r   c                 C   s6   |j jsJ || jvrt|| j|< | j| ||S )z%Register autoscaling deployment info.)r;   r<   rx   r*   rF   )r   r+   r9   r:   r   r   r   register_deploymentH  s   

z+AutoscalingStateManager.register_deploymentc                 C   s   | j |d dS )z Remove deployment from tracking.N)rx   popr7   r   r   r   deregister_deploymentV  s   z-AutoscalingStateManager.deregister_deploymentrN   c                 C   s   | j | | d S r,   )rx   rO   )r   r+   rN   r   r   r   rO   Z  s   
z2AutoscalingStateManager.update_running_replica_idsrG   c                 C   s(   |j }|| jv r| j| | d S d S r,   )r+   rx   rH   )r   rG   r+   r   r   r   rH   a  s   
z*AutoscalingStateManager.on_replica_stoppedc                    s    fdd j D S )Nc                    s   i | ]}|  |qS r   )ro   ).0r+   r   r   r   
<dictcomp>g  s    
z7AutoscalingStateManager.get_metrics.<locals>.<dictcomp>rw   r   r   r   r   get_metricsf  s   
z#AutoscalingStateManager.get_metricsc                 C   s   | j | j|dS )N)r:   )rx   rr   )r   r+   r:   r   r   r   get_target_num_replicasl  s   
z/AutoscalingStateManager.get_target_num_replicasc                 C   s   | j |  S r,   )rx   ro   r7   r   r   r   ro   s  s   z.AutoscalingStateManager.get_total_num_requestsrP   c                 C   s   | j | |S r,   )rx   rQ   )r   r+   rP   r   r   r   rQ   v  s   
z(AutoscalingStateManager.is_within_boundsrU   rV   Nc                 C   s.   |j }|| jv r| j| j|||d d S d S )N)rG   rU   rV   )r+   rx   rW   )r   rG   rU   rV   r+   r   r   r   rW   }  s   


z:AutoscalingStateManager.record_request_metrics_for_replicarX   r   r   r   r   c                C   s.   || j v r| j | j||||||d dS dS )z,Update request metric for a specific handle.)rX   r   r   r   r   rV   N)rx   rY   )r   r+   rX   r   r   r   r   rV   r   r   r   rY     s   


z9AutoscalingStateManager.record_request_metrics_for_handlerZ   c                 C   s   | j  D ]}|| qdS )r[   N)rx   r   rh   )r   rZ   autoscaling_stater   r   r   rh     s   z1AutoscalingStateManager.drop_stale_handle_metrics)r    r!   r"   r#   r8   r   r   ru   ry   r{   r   r	   rO   rH   r   r&   r~   r   ro   r(   rQ   r   rW   r$   r   rY   r   rh   r   r   r   r   rv   >  s|    





	

rv   )loggingrd   dataclassesr   typingr   r   r   r   ray.serve._private.commonr   r   r	   r
   ray.serve._private.constantsr   r   "ray.serve._private.deployment_infor   ray.serve._private.utilsr   	getLoggerrb   r   r)   r*   rv   r   r   r   r   <module>   s     
, n