o
    ci                     @   s   d dl Z d dlZd dlZd dlmZ d dlmZmZ d dlm	Z	m
Z
mZmZmZmZ d dlmZmZ eeZeG dd dZG dd	 d	Zed
dG dd dZG dd dZdS )    N)defaultdict)	dataclassfield)CallableDefaultDictDictHashableListOptional)*METRICS_PUSHER_GRACEFUL_SHUTDOWN_TIMEOUT_SSERVE_LOGGER_NAMEc                   @   s   e Zd ZU eed< eed< dS )_MetricsTask	task_func
interval_sN)__name__
__module____qualname__r   __annotations__float r   r   T/home/ubuntu/.local/lib/python3.10/site-packages/ray/serve/_private/metrics_utils.pyr      s   
 r   c                   @   s   e Zd ZdZdddeeegdf  fddZede	j
fdd	Zd
d ZdefddZdedededdfddZdd Zdd ZdS )MetricsPusherz+Periodically runs registered asyncio tasks.N)async_sleepr   c                C   s&   |pt j| _t | _t | _d | _d S N)asynciosleep_async_sleepdict_tasks_async_tasks_stop_event)selfr   r   r   r   __init__   s   
zMetricsPusher.__init__returnc                 C   s   | j d u r
t | _ | j S r   )r    r   Eventr!   r   r   r   
stop_event&   s   

zMetricsPusher.stop_eventc                 C   s   | j   d S r   )r&   clearr%   r   r   r   start-      zMetricsPusher.startnamec              
      s   t | j }	 | rdS z	| j|   W n ty6 } zt	d| d|  W Y d}~nd}~ww t | 
| j| j}t j||gt jdI dH  | sX|  q
)zPeriodically runs `task_func` every `interval_s` until `stop_event` is set.

        If `task_func` raises an error, an exception will be logged.
        TNzFailed to run metrics task 'z': )return_when)r   create_taskr&   waitdoner   r   	Exceptionlogger	exceptionr   r   FIRST_COMPLETEDcancel)r!   r*   wait_for_stop_evente
sleep_taskr   r   r   metrics_task0   s*   "zMetricsPusher.metrics_taskr   r   c                 C   sF   t ||| j|< || jvs| j|  r!t| || j|< dS dS )zRegister a task under the provided name, or update it.

        This method is idempotent - if a task is already registered with
        the specified name, it will update it with the most recent info.
        N)r   r   r   r.   r   r,   r7   )r!   r*   r   r   r   r   r   register_or_update_taskK   s   z%MetricsPusher.register_or_update_taskc                 C   s"   | j   | j  | j  d S r   )r&   setr   r'   r   r%   r   r   r   
stop_tasks[   s   

zMetricsPusher.stop_tasksc                    sH   | j   | jrtjt| j tdI dH  | j	  | j	  dS )zkShutdown metrics pusher gracefully.

        This method will ensure idempotency of shutdown call.
        )timeoutN)
r&   r9   r   r   r-   listvaluesr   r   r'   r%   r   r   r   graceful_shutdown`   s   

zMetricsPusher.graceful_shutdown)r   r   r   __doc__r
   r   intr"   propertyr   r$   r&   r(   strr7   r8   r:   r>   r   r   r   r   r      s*    

r   T)orderc                   @   s(   e Zd ZU eed< eddZeed< dS )TimeStampedValue	timestampF)comparevalueN)r   r   r   r   r   r   rG   r   r   r   r   rD   q   s   
 rD   c                
   @   s   e Zd ZdZdd Zdeeef defddZdefd	d
Z	dedede
e fddZ	ddedededee fddZ	ddededefddZdS )InMemoryMetricsStorez-A very simple, in memory time series databasec                 C   s   t t| _d S r   )r   r<   datar%   r   r   r   r"   z   r)   zInMemoryMetricsStore.__init__data_pointsrE   c                 C   s0   |  D ]\}}tj| j| t||d qdS )aN  Push new data points to the store.

        Args:
            data_points: dictionary containing the metrics values. The
              key should uniquely identify this time series
              and to be used to perform aggregation.
            timestamp: the unix epoch timestamp the metrics are
              collected at.
        axN)itemsbisectinsortrI   rD   )r!   rJ   rE   r*   rG   r   r   r   add_metrics_point}   s   
z&InMemoryMetricsStore.add_metrics_pointstart_timestamp_sc                 C   sR   t | j D ]\}}t|dks|d j|k r| j|= q| ||| j|< qdS )a  Prune keys and compact data that are outdated.

        For keys that haven't had new data recorded after the timestamp,
        remove them from the database.
        For keys that have, compact the datapoints that were recorded
        before the timestamp.
        r   N)r<   rI   rN   lenrE   _get_datapoints)r!   rR   key
datapointsr   r   r   prune_keys_and_compact_data   s
   
z0InMemoryMetricsStore.prune_keys_and_compact_datarV   window_start_timestamp_sr#   c                 C   s,   | j | }tj|t|ddd}||d S )z<Get all data points given key after window_start_timestamp_sr   )rE   rG   rK   N)rI   rO   rD   )r!   rV   rY   rW   idxr   r   r   rU      s   
z$InMemoryMetricsStore._get_datapointsT
do_compactc                 C   sD   |  ||}|r|| j|< t|dkrdS tdd |D t| S )a  Perform a window average operation for metric `key`

        Args:
            key: the metric name.
            window_start_timestamp_s: the unix epoch timestamp for the
              start of the window. The computed average will use all datapoints
              from this timestamp until now.
            do_compact: whether or not to delete the datapoints that's
              before `window_start_timestamp_s` to save memory. Default is
              true.
        Returns:
            The average of all the datapoints for the key on and after time
            window_start_timestamp_s, or None if there are no such points.
        r   Nc                 s       | ]}|j V  qd S r   rG   .0pointr   r   r   	<genexpr>       z6InMemoryMetricsStore.window_average.<locals>.<genexpr>)rU   rI   rT   sumr!   rV   rY   r[   points_after_idxr   r   r   window_average   s   
z#InMemoryMetricsStore.window_averagec                 C   s0   |  ||}|r|| j|< tdd |D ddS )ap  Perform a max operation for metric `key`.

        Args:
            key: the metric name.
            window_start_timestamp_s: the unix epoch timestamp for the
              start of the window. The computed average will use all datapoints
              from this timestamp until now.
            do_compact: whether or not to delete the datapoints that's
              before `window_start_timestamp_s` to save memory. Default is
              true.
        Returns:
            Max value of the data points for the key on and after time
            window_start_timestamp_s, or None if there are no such points.
        c                 s   r\   r   r]   r^   r   r   r   ra      rb   z+InMemoryMetricsStore.max.<locals>.<genexpr>N)default)rU   rI   maxrd   r   r   r   rh      s   
zInMemoryMetricsStore.maxN)T)r   r   r   r?   r"   r   r   r   rQ   rX   r	   rU   boolr
   rf   rh   r   r   r   r   rH   w   s<    

rH   )r   rO   loggingcollectionsr   dataclassesr   r   typingr   r   r   r   r	   r
   ray.serve._private.constantsr   r   	getLoggerr0   r   r   rD   rH   r   r   r   r   <module>   s     
[