o
    bi:                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlmZm	Z	 d dl
mZ d dlmZmZmZmZmZmZ d dlmZ d dlmZ d dlmZmZ d dlmZmZmZmZ d d	l m!Z! d d
l"m#Z# d dl$m%Z% d dl&m'Z' d dl(m)Z*m+Z,m-Z. d dl/m0Z0m1Z1m2Z2m3Z4 d dl5Z5d dl6m7Z7m8Z8 d dl9m:Z: d dl;m3Z3 d dl<m=Z= e>e?Z@dZAdZBeCdZDdZEG dd de%ZFe	dg dZGde3fddZHG dd deIeZJG d d! d!ZKG d"d# d#ZLG d$d% d%ZMG d&d' d'ZNG d(d) d)ejOZPdS )*    N)defaultdict
namedtuple)Enum)AnyDictListSetTupleUnion)MetricDescriptorType)ValueDouble)aggregationmeasure)CountAggregationDataDistributionAggregationDataLastValueAggregationDataSumAggregationData)StatsExporter)StatsRecorder)View)ViewManager)tag_keytag_map	tag_value)CounterMetricFamilyGaugeMetricFamilyHistogramMetricFamilyMetric)RAY_METRIC_CARDINALITY_LEVELenv_bool)	GcsClient)r   )_is_invalid_metric_nameRAY_WORKER_TIMEOUT_SCOREz[^a-zA-Z0-9]WorkerIdc                   @   sR   e Zd ZdZdee fddZedd Zedd Z	ed	d
 Z
edd ZdS )GaugezGauge representation of opencensus view.

    This class is used to collect process metrics from the reporter agent.
    Cpp metrics should be collected in a different way.
    tagsc                 C   sX   t |rtd| dt|||| _|| _dd |D }t|||| jt	 | _
d S )NzInvalid metric name: z. Metric will be discarded and data will not be collected or published. Metric names can only contain letters, numbers, _, and :. Metric names cannot start with numbers.c                 S   s   g | ]}t |qS  )tag_key_moduleTagKey).0tagr'   r'   N/home/ubuntu/.local/lib/python3.10/site-packages/ray/_private/metrics_agent.py
<listcomp>H   s    z"Gauge.__init__.<locals>.<listcomp>)r!   
ValueErrormeasure_module
MeasureInt_measure_descriptionr   r   r   LastValueAggregation_view)selfnamedescriptionunitr&   r'   r'   r,   __init__>   s   

zGauge.__init__c                 C      | j S N)r1   r5   r'   r'   r,   r   M      zGauge.measurec                 C   r:   r;   )r4   r<   r'   r'   r,   viewQ   r=   z
Gauge.viewc                 C   s   | j jS r;   )r   r6   r<   r'   r'   r,   r6   U   s   z
Gauge.namec                 C   r:   r;   )r2   r<   r'   r'   r,   r7   Y   r=   zGauge.descriptionN)__name__
__module____qualname____doc__r   strr9   propertyr   r>   r6   r7   r'   r'   r'   r,   r%   7   s    


r%   Record)gaugevaluer&   metricc                 C   s|   | j jds	dS td| j j| j _| jD ]%}|jD ]}|dr:|j}|j	j
j}t|dkr:|d dkr:d|d< qqdS )a  
    Fix the inbound `opencensus.proto.metrics.v1.Metric` protos to make it acceptable
    by opencensus.stats.DistributionAggregationData.

    - metric name: gRPC OpenCensus metrics have names with slashes and dots, e.g.
    `grpc.io/client/server_latency`[1]. However Prometheus metric names only take
    alphanums,underscores and colons[2]. We santinize the name by replacing non-alphanum
    chars to underscore, like the official opencensus prometheus exporter[3].
    - distribution bucket bounds: The Metric proto asks distribution bucket bounds to
    be > 0 [4]. However, gRPC OpenCensus metrics have their first bucket bound == 0 [1].
    This makes the `DistributionAggregationData` constructor to raise Exceptions. This
    applies to all bytes and milliseconds (latencies). The fix: we update the initial 0
    bounds to be 0.000_000_1. This will not affect the precision of the metrics, since
    we don't expect any less-than-1 bytes, or less-than-1-nanosecond times.

    [1] https://github.com/census-instrumentation/opencensus-specs/blob/master/stats/gRPC.md#units  # noqa: E501
    [2] https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
    [3] https://github.com/census-instrumentation/opencensus-cpp/blob/50eb5de762e5f87e206c011a4f930adb1a1775b1/opencensus/exporters/stats/prometheus/internal/prometheus_utils.cc#L39 # noqa: E501
    [4] https://github.com/census-instrumentation/opencensus-proto/blob/master/src/opencensus/proto/metrics/v1/metrics.proto#L218 # noqa: E501
    zgrpc.io/N_distribution_valuer   gHz>)metric_descriptorr6   
startswithRE_NON_ALPHANUMSsub
timeseriespointsHasFieldrJ   bucket_optionsexplicitboundslen)rH   seriespoint
dist_valuebucket_boundsr'   r'   r,   fix_grpc_metrica   s   



rZ   c                   @   s   e Zd ZdZdZdZdS )MetricCardinalityLevelzCardinality level of the metric.

    This is used to determine the cardinality level of the metric.
    The cardinality level is used to determine the type of the metric.
    legacyrecommendedN)r?   r@   rA   rB   LEGACYRECOMMENDEDr'   r'   r'   r,   r[      s    r[   c                	   @   s   e Zd Zdedededee fddZedd Zed	d
 Zedd Z	edd Z
edd Zdd ZdedefddZdefddZdS )OpencensusProxyMetricr6   descr8   
label_keysc                 C   s"   || _ || _|| _|| _i | _dS )z>Represents the OpenCensus metrics that will be proxy exported.N)_name_desc_unit_label_keys_data)r5   r6   ra   r8   rb   r'   r'   r,   r9      s
   
zOpencensusProxyMetric.__init__c                 C   r:   r;   )rc   r<   r'   r'   r,   r6      r=   zOpencensusProxyMetric.namec                 C   r:   r;   )rd   r<   r'   r'   r,   ra      r=   zOpencensusProxyMetric.descc                 C   r:   r;   )re   r<   r'   r'   r,   r8      r=   zOpencensusProxyMetric.unitc                 C   r:   r;   )rf   r<   r'   r'   r,   rb      r=   z OpencensusProxyMetric.label_keysc                 C   r:   r;   rg   r<   r'   r'   r,   data   r=   zOpencensusProxyMetric.datac                 C   s&   t | jdkottt| j tS )z8Check if the metric is a distribution aggreation metric.r   )rU   rg   
isinstancenextitervaluesr   r<   r'   r'   r,    is_distribution_aggregation_data   s   z6OpencensusProxyMetric.is_distribution_aggregation_datalabel_valuesri   c                 C   s   || j |< dS )zAdd the data to the metric.

        Args:
            label_values: The label values of the metric.
            data: The data to be added.
        Nrh   )r5   ro   ri   r'   r'   r,   add_data   s   zOpencensusProxyMetric.add_datarH   c           
      C   s   |j }t|dkrdS |D ]j}tdd |jD }|jD ]Z}|jjtjkr+t	|j
}nF|jjtjkr9tt|j}n8|jjtjkrGtt|j}n*|jjtjkrm|j}dd |jD }|jjj}	t|j|j |j|j||	}ntd|| j|< qqdS )zzParse the Opencensus Protobuf and store the data.

        The data can be accessed via `data` API once recorded.
        r   Nc                 s   s    | ]}|j V  qd S r;   rG   )r*   valr'   r'   r,   	<genexpr>   s    z/OpencensusProxyMetric.record.<locals>.<genexpr>c                 S      g | ]}|j qS r'   )count)r*   bucketr'   r'   r,   r-          z0OpencensusProxyMetric.record.<locals>.<listcomp>zSummary is not supported)rO   rU   tuplero   rP   rK   typer   CUMULATIVE_INT64r   int64_valueCUMULATIVE_DOUBLEr   r   double_valueGAUGE_DOUBLEr   CUMULATIVE_DISTRIBUTIONrJ   bucketsrR   rS   rT   r   sumru   sum_of_squared_deviationr.   rg   )
r5   rH   rO   rV   labelsrW   ri   rX   counts_per_bucketrY   r'   r'   r,   record   sB   


zOpencensusProxyMetric.recordN)r?   r@   rA   rC   r   r9   rD   r6   ra   r8   rb   ri   rn   r	   r   rp   r   r   r'   r'   r'   r,   r`      s    




	r`   c                   @   sR   e Zd ZdefddZedeeef fddZedd Z	d	e
e fd
dZdS )	Componentidc                 C   s   || _ t | _i | _dS )zyRepresent a component that requests to proxy export metrics

        Args:
            id: Id of this component.
        N)r   time	monotonic_last_reported_time_metrics)r5   r   r'   r'   r,   r9      s   

zComponent.__init__returnc                 C   r:   )zAReturn the metrics requested to proxy export from this component.)r   r<   r'   r'   r,   metrics   s   zComponent.metricsc                 C   r:   r;   )r   r<   r'   r'   r,   last_reported_time  r=   zComponent.last_reported_timer   c                 C   sn   t  | _|D ]-}t| |j}|j}dd |jD }|| jvr,t||j	|j
|| j|< | j| | qdS )zParse the Opencensus protobuf and store metrics.

        Metrics can be accessed via `metrics` API for proxy export.

        Args:
            metrics: A list of Opencensus protobuf for proxy export.
        c                 S   rt   r'   )key)r*   	label_keyr'   r'   r,   r-     rw   z$Component.record.<locals>.<listcomp>N)r   r   r   rZ   rK   r6   rb   r   r`   r7   r8   r   )r5   r   rH   
descriptorr6   rb   r'   r'   r,   r     s   


zComponent.recordN)r?   r@   rA   rC   r9   rD   r   r`   r   r   r   r   r   r'   r'   r'   r,   r      s    
r   c                   @   s   e Zd Zd#dedefddZd$dee defd	d
Zdd Z	dededee dede
ej dedeeee f ddfddZdefddZdefddZdeeeeef  deeeef fddZdee dee fdd Zd!d" ZdS )%OpenCensusProxyCollector<   	namespacecomponent_timeout_sc                 C   s,   t  | _|| _|| _i | _tdd| _dS )a)  Prometheus collector implementation for opencensus proxy export.

        Prometheus collector requires to implement `collect` which is
        invoked whenever Prometheus queries the endpoint.

        The class is thread-safe.

        Args:
            namespace: Prometheus namespace.
        RAY_EXPORT_COUNTER_AS_GAUGETN)	threadingLock_components_lock_component_timeout_s
_namespace_componentsr   _export_counter_as_gauge)r5   r   r   r'   r'   r,   r9     s
   
z!OpenCensusProxyCollector.__init__Nr   worker_id_hexc                 C   s`   |st n|}| j || jvrt|| j|< | j| | W d   dS 1 s)w   Y  dS )a.  Record the metrics reported from the component that reports it.

        Args:
            metrics: A list of opencensus protobuf to proxy export metrics.
            worker_id_hex: A worker id that reports these metrics.
                If None, it means they are reported from Raylet or GCS.
        N)GLOBAL_COMPONENT_KEYr   r   r   r   )r5   r   r   r   r'   r'   r,   r   >  s   
"zOpenCensusProxyCollector.recordc                 C   s   | j @ g }g }| j D ]\}}t |j }|| jkr+|| t	d
|| q|D ]}|| j| q.|W  d   S 1 sFw   Y  dS )zClean up stale components.

        Stale means the component is dead or unresponsive.

        Stale components won't be reported to Prometheus anymore.
        zSMetrics from a worker ({}) is cleaned up due to timeout. Time since last report {}sN)r   r   itemsr   r   r   r   appendloggerinfoformatpop)r5   stale_componentsstale_component_idsr   	componentelapsedr'   r'   r,   clean_stale_componentsL  s    

$z/OpenCensusProxyCollector.clean_stale_componentsmetric_namemetric_descriptionrb   metric_unitsro   agg_datametrics_mapr   c                 C   sh  | j  sJ | j d| }t|t|ksJ ||fdd |D }t|trK||}|s?t||||d}	|	g}|||< |d j||j	d dS t|t
r||}|set|||d}	|	g}|||< |d j||jd | jsu	 dS |d	r}	 dS t|d
krt|d| d| |d}	||	 t|dksJ |d
 j||jd dS t|tr|jt|jksJ g }
d}t|jD ]\}}||j| 7 }t||g}|
| q|
d|j	g ||}|st|||d}	|	g}|||< |d j||
|jd dS t|tr+||}|st|||d}	|	g}|||< |d j||jd dS tdt| )a  to_metric translate the data that OpenCensus create
        to Prometheus format, using Prometheus Metric object.

        This method is from Opencensus Prometheus Exporter.

        Args:
            metric_name: Name of the metric.
            metric_description: Description of the metric.
            label_keys: The fixed label keys of the metric.
            metric_units: Units of the metric.
            label_values: The values of `label_keys`.
            agg_data: `opencensus.stats.aggregation_data.AggregationData` object.
                Aggregated data that needs to be converted as Prometheus samples
            metrics_map: The converted metric is added to this map.

        rI   c                 S   s   g | ]}|r|nd qS ) r'   )r*   tvr'   r'   r,   r-     s    zBOpenCensusProxyCollector.to_prometheus_metrics.<locals>.<listcomp>)r6   documentationr8   r   r   )r   rG   N)r6   r   r   _total   z(DEPRECATED, use z_total metric instead)    z+Inf)r   r   	sum_valuezunsupported aggregation type )r   lockedr   rU   rj   r   getr   
add_metric
count_datar   sum_datar   endswithr   r   r   rT   sorted	enumerater   rC   r   r   r   rG   r.   ry   )r5   r   r   rb   r   ro   r   r   r   rH   r   	cum_countiiboundrv   r'   r'   r,   to_prometheus_metricsc  s   







z.OpenCensusProxyCollector.to_prometheus_metricsc                 C   s   t  S r;   )r   lowerr<   r'   r'   r,   %_get_metric_cardinality_level_setting  s   z>OpenCensusProxyCollector._get_metric_cardinality_level_settingc                 C   s(   zt |  W S  ty   t j Y S w )zGet the cardinality level of the core metric.

        This is used to determine set of metric labels. Some high cardinality labels
        such as `WorkerId` and `Name` will be removed on low cardinality level.
        )r[   r   r.   r^   r<   r'   r'   r,   _get_metric_cardinality_level  s
   
z6OpenCensusProxyCollector._get_metric_cardinality_leveldatasc                 C   s   t |dksJ |d }t|trtttdd |D S t|tr-ttdd |D S t|tr>tttdd |D S tdt| dt dt dt d	| d
)Nr   c                 S   rt   r'   rq   r*   ri   r'   r'   r,   r-   	  rw   zCOpenCensusProxyCollector._aggregate_metric_data.<locals>.<listcomp>c                 S   rt   r'   )r   r   r'   r'   r,   r-     rw   c                 S   rt   r'   )r   r   r'   r'   r,   r-     rw   zUnsupported aggregation type z. Supported types are z, z.Got .)	rU   rj   r   r   r   r   r   r.   ry   )r5   r   sampler'   r'   r,   _aggregate_metric_data  s.   


z/OpenCensusProxyCollector._aggregate_metric_dataper_worker_metricsc           	      C   s   t t|d}|rt|jvr|S |jt}tt}|D ]}|j D ]\}}||d| ||d d   	| q#qt
|j|j|j|jd| |j|d d  d}| D ]\}}||| | qY|gS )a  Collect per-worker metrics, aggregate them into per-node metrics and convert
        them to Prometheus format.

        Args:
            per_worker_metrics: A list of per-worker metrics for the same metric name.
        Returns:
            A list of per-node metrics for the same metric name, with the high
            cardinality labels removed and the values aggregated.
        Nr   )r6   ra   r8   rb   )rk   rl   WORKER_ID_TAG_KEYrb   indexr   listri   r   r   r`   r6   ra   r8   rp   r   )	r5   r   rH   worker_id_label_indexlabel_value_to_dataro   ri   aggregated_metricr   r'   r'   r,   '_aggregate_with_recommended_cardinality  s:   


z@OpenCensusProxyCollector._aggregate_with_recommended_cardinalityc                 c   s
   | j h g }tt}|  }| j D ]!}|j D ]}|tjkr/|	 s/||j
 | q|| qq| D ]
}|| | q:i }|D ]}|j D ]\}}	| |j
|j|j|j||	| qPqIW d   n1 sow   Y  | D ]
}
|
D ]}|V  q|qxdS )a,  Collect fetches the statistics from OpenCensus
        and delivers them as Prometheus Metrics.
        Collect is invoked every time a prometheus.Gatherer is run
        for example when the HTTP endpoint is invoked by Prometheus.

        This method is required as a Prometheus Collector.
        N)r   r   r   r   r   rm   r   r[   r_   rn   r6   r   extendr   ri   r   r   ra   rb   r8   )r5   open_cencus_metricsto_lower_cardinalitycardinality_levelr   rH   r   prometheus_metrics_mapro   ri   r   r'   r'   r,   collectP  sR   
.z OpenCensusProxyCollector.collect)r   r;   )r?   r@   rA   rC   intr9   r   r   r   r   r	   tag_value_moduleTagValuer   r   PrometheusMetricr   r   r[   r   r
   r   r   r   r   r`   r   r   r'   r'   r'   r,   r     sJ    	
 

7r   c                   @   s   e Zd Z	ddededefddZddee fdd	Z	d
e
dedefddZddee defddZddee defddZdd ZdS )MetricsAgentNview_managerstats_recorderstats_exporterc                 C   sl   t  | _|| _|| _|| _d| _| jdu rd| _n| j| t| jj	j
tttdd| _t | _dS )a+  A class to record and export metrics.

        The class exports metrics in 2 different ways.
        - Directly record and export metrics using OpenCensus.
        - Proxy metrics from other core components
            (e.g., raylet, GCS, core workers).

        This class is thread-safe.
        Nx   )r   )r   r   _lockr   r   r   proxy_exporter_collectorregister_exporterr   optionsr   r   osgetenvr"   set_registered_views)r5   r   r   r   r'   r'   r,   r9     s   

zMetricsAgent.__init__recordsc                 C   s   |pi }| j W | js	 W d   dS |D ]?}|j}|j}|j}z| ||i || W q tyU } ztd|j	 d| d|d|d|
 W Y d}~qd}~ww W d   dS 1 saw   Y  dS )z7Directly record and export stats from the same process.NzFailed to record metric z with value z with tags z and global tags 	 due to: )
r   r   rF   rG   r&   _record_gauge	Exceptionr   errorr6   )r5   r   global_tagsr   rF   rG   r&   er'   r'   r,   record_and_export  s&   ""zMetricsAgent.record_and_exportrF   rG   r&   c                 C   s  |j | jvr| j|j | j|j  | j }t	 }|
 D ]W\}}zt|}W n tyI }	 ztd| d|j  d|	 |	d }	~	ww zt|}
W n  tyq }	 ztd| d| d|j  d|	 |	d }	~	ww |||
 q!||j| || d S )NzFailed to create tag key z for metric r   zFailed to create tag value z	 for key )r6   r   r   register_viewr>   addr   new_measurement_maptag_map_moduleTagMapr   r(   r)   r.   r   r   r   r   insertmeasure_float_putr   r   )r5   rF   rG   r&   measurement_mapr   r   tag_valr   r   r   r'   r'   r,   r     s6   
zMetricsAgent._record_gauger   r   c                 C   sN   | j  | js	 W d   dS W d   n1 sw   Y  | || dS )a{  Proxy export metrics specified by a Opencensus Protobuf.

        This API is used to export metrics emitted from
        core components.

        Args:
            metrics: A list of protobuf Metric defined from OpenCensus.
            worker_id_hex: The worker ID it proxies metrics export. None
                if the metric is not from a worker (i.e., raylet, GCS).
        N)r   r   _proxy_export_metricsr5   r   r   r'   r'   r,   proxy_export_metrics  s   z!MetricsAgent.proxy_export_metricsc                 C   s   | j || d S r;   )r   r   r   r'   r'   r,   r     s   z"MetricsAgent._proxy_export_metricsc                 C   sL   | j  | js	 W d   dS W d   n1 sw   Y  | j  dS )zClean dead worker's metrics.

        Worker metrics are cleaned up and won't be exported once
        it is considered as dead.

        This method has to be periodically called by a caller.
        N)r   r   r   r   r<   r'   r'   r,   clean_all_dead_worker_metrics  s   z*MetricsAgent.clean_all_dead_worker_metricsr;   )r?   r@   rA   r   r   r   r9   r   rE   r   r%   floatdictr   r   rC   r   r   r   r'   r'   r'   r,   r     s    
1r   c                       sH   e Zd ZdZ fddZdd Zdd Zdd	 Zd
d Zdd Z	  Z
S ) PrometheusServiceDiscoveryWriteraF  A class to support Prometheus service discovery.

    It supports file-based service discovery. Checkout
    https://prometheus.io/docs/guides/file-sd/ for more details.

    Args:
        gcs_address: Gcs address for this cluster.
        temp_dir: Temporary directory used by
            Ray to store logs and metadata.
    c                    sF   t jjj|d ddd}|| _t jjj| || _d| _	t
   d S )NTF)allow_cluster_id_nilfetch_cluster_id_if_nil   )ray_rayletGcsClientOptionscreategcs_address_privatestate_initialize_global_statetemp_dir&default_service_discovery_flush_periodsuperr9   )r5   r	  r  gcs_client_options	__class__r'   r,   r9     s   z)PrometheusServiceDiscoveryWriter.__init__c                 C   sx   t  }dd |D }t| jd}|dd}|r!||d |dd}|r1||d tdd	i|d
gS )z4Return the content for Prometheus service discovery.c                 S   s,   g | ]}|d  du rd |d |d qS )aliveTz{}:{}NodeManagerAddressMetricsExportPort)r   )r*   noder'   r'   r,   r-   &  s
    zOPrometheusServiceDiscoveryWriter.get_file_discovery_content.<locals>.<listcomp>)addresss   AutoscalerMetricsAddressNzutf-8s   DashboardMetricsAddressjobr  )r   targets)	r  nodesr    r	  internal_kv_getr   decodejsondumps)r5   r  metrics_export_addresses
gcs_clientautoscaler_addrdashboard_addrr'   r'   r,   get_file_discovery_content#  s   z;PrometheusServiceDiscoveryWriter.get_file_discovery_contentc                 C   sT   |   }t|d}||   W d    n1 sw   Y  t||   d S )Nw)get_temp_file_nameopenwriter#  r   replaceget_target_file_name)r5   temp_file_name	json_filer'   r'   r,   r'  6  s
   z&PrometheusServiceDiscoveryWriter.writec                 C   s   t j| jtjjjS r;   )r   pathjoinr  r  r
  ray_constants!PROMETHEUS_SERVICE_DISCOVERY_FILEr<   r'   r'   r,   r)  C  s   z5PrometheusServiceDiscoveryWriter.get_target_file_namec                 C   s   t j| jddtjjjS )Nz{}_{}tmp)	r   r,  r-  r  r   r  r
  r.  r/  r<   r'   r'   r,   r%  H  s   
z3PrometheusServiceDiscoveryWriter.get_temp_file_namec              
   C   st   	 z|    W n+ ty2 } ztd|   tt  td|  W Y d }~nd }~ww t	| j
 q)NTz,Writing a service discovery file, {},failed.zError message: )r'  r   r   warningr   r)  	traceback
format_excr   sleepr  )r5   r   r'   r'   r,   runP  s   
z$PrometheusServiceDiscoveryWriter.run)r?   r@   rA   rB   r9   r#  r'  r)  r%  r5  __classcell__r'   r'   r  r,   r    s    r  )Qr  loggingr   rer   r   r2  collectionsr   r   enumr   typingr   r   r   r   r	   r
   +opencensus.metrics.export.metric_descriptorr   opencensus.metrics.export.valuer   opencensus.statsr   r   r/   !opencensus.stats.aggregation_datar   r   r   r   opencensus.stats.base_exporterr   opencensus.stats.stats_recorderr   opencensus.stats.viewr   opencensus.stats.view_managerr   opencensus.tagsr   r(   r   r   r   r   prometheus_client.corer   r   r   r   r   r  ray._private.ray_constantsr   r   ray._rayletr    ray.core.generated.metrics_pb2ray.util.metricsr!   	getLoggerr?   r   r"   r   compilerM   r   r%   rE   rZ   rC   r[   r`   r   r   r   Threadr  r'   r'   r'   r,   <module>   sT     

'&^.  o 