o
    `۷i                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlmZm	Z	 d dl
mZmZmZmZmZmZ d dlmZ d dlmZ d dlmZmZ d dlmZmZmZmZ d dlmZ d d	l m!Z! d d
l"m#Z# d dl$m%Z% d dl&m'Z(m)Z*m+Z, d dl-m.Z.m/Z/m0Z0m1Z2 d dl3Z3d dl4m5Z5 d dl6m7Z7 d dl8m9Z9m:Z: d dl;m<Z< d dl=m1Z1 d dl>m?Z? e@eAZBdZCdZDeEdZFG dd de#ZGe	dg dZHde1fddZIG dd dZJG d d! d!ZKG d"d# d#ZLG d$d% d%ZMG d&d' d'ejNZOdS )(    N)defaultdict
namedtuple)AnyDictListSetTupleUnion)MetricDescriptorType)ValueDouble)aggregationmeasure)CountAggregationDataDistributionAggregationDataLastValueAggregationDataSumAggregationData)StatsExporter)StatsRecorder)View)ViewManager)tag_keytag_map	tag_value)CounterMetricFamilyGaugeMetricFamilyHistogramMetricFamilyMetricbuild_address)env_bool)WORKER_ID_TAG_KEYMetricCardinality)	GcsClient)r   )_is_invalid_metric_nameRAY_WORKER_TIMEOUT_SCOREz[^a-zA-Z0-9]c                   @   sR   e Zd ZdZdee fddZedd Zedd Z	ed	d
 Z
edd ZdS )GaugezGauge representation of opencensus view.

    This class is used to collect process metrics from the reporter agent.
    Cpp metrics should be collected in a different way.
    tagsc                 C   sX   t |rtd| dt|||| _|| _dd |D }t|||| jt	 | _
d S )NzInvalid metric name: z. Metric will be discarded and data will not be collected or published. Metric names can only contain letters, numbers, _, and :. Metric names cannot start with numbers.c                 S   s   g | ]}t |qS  )tag_key_moduleTagKey).0tagr(   r(   P/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/_private/metrics_agent.py
<listcomp>J   s    z"Gauge.__init__.<locals>.<listcomp>)r#   
ValueErrormeasure_module
MeasureInt_measure_descriptionr   r   r   LastValueAggregation_view)selfnamedescriptionunitr'   r(   r(   r-   __init__@   s   

zGauge.__init__c                 C      | j S N)r2   r6   r(   r(   r-   r   O      zGauge.measurec                 C   r;   r<   )r5   r=   r(   r(   r-   viewS   r>   z
Gauge.viewc                 C   s   | j jS r<   )r   r7   r=   r(   r(   r-   r7   W   s   z
Gauge.namec                 C   r;   r<   )r3   r=   r(   r(   r-   r8   [   r>   zGauge.descriptionN)__name__
__module____qualname____doc__r   strr:   propertyr   r?   r7   r8   r(   r(   r(   r-   r&   9   s    


r&   Record)gaugevaluer'   metricc                 C   s|   | j jds	dS td| j j| j _| jD ]%}|jD ]}|dr:|j}|j	j
j}t|dkr:|d dkr:d|d< qqdS )a  
    Fix the inbound `opencensus.proto.metrics.v1.Metric` protos to make it acceptable
    by opencensus.stats.DistributionAggregationData.

    - metric name: gRPC OpenCensus metrics have names with slashes and dots, e.g.
    `grpc.io/client/server_latency`[1]. However Prometheus metric names only take
    alphanums,underscores and colons[2]. We santinize the name by replacing non-alphanum
    chars to underscore, like the official opencensus prometheus exporter[3].
    - distribution bucket bounds: The Metric proto asks distribution bucket bounds to
    be > 0 [4]. However, gRPC OpenCensus metrics have their first bucket bound == 0 [1].
    This makes the `DistributionAggregationData` constructor to raise Exceptions. This
    applies to all bytes and milliseconds (latencies). The fix: we update the initial 0
    bounds to be 0.000_000_1. This will not affect the precision of the metrics, since
    we don't expect any less-than-1 bytes, or less-than-1-nanosecond times.

    [1] https://github.com/census-instrumentation/opencensus-specs/blob/master/stats/gRPC.md#units  # noqa: E501
    [2] https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
    [3] https://github.com/census-instrumentation/opencensus-cpp/blob/50eb5de762e5f87e206c011a4f930adb1a1775b1/opencensus/exporters/stats/prometheus/internal/prometheus_utils.cc#L39 # noqa: E501
    [4] https://github.com/census-instrumentation/opencensus-proto/blob/master/src/opencensus/proto/metrics/v1/metrics.proto#L218 # noqa: E501
    zgrpc.io/N_distribution_valuer   gHz>)metric_descriptorr7   
startswithRE_NON_ALPHANUMSsub
timeseriespointsHasFieldrK   bucket_optionsexplicitboundslen)rI   seriespoint
dist_valuebucket_boundsr(   r(   r-   fix_grpc_metricc   s   



r[   c                	   @   s   e Zd Zdedededee fddZedd Zed	d
 Zedd Z	edd Z
edd Zdd ZdedefddZdefddZdS )OpencensusProxyMetricr7   descr9   
label_keysc                 C   s"   || _ || _|| _|| _i | _dS )z>Represents the OpenCensus metrics that will be proxy exported.N)_name_desc_unit_label_keys_data)r6   r7   r]   r9   r^   r(   r(   r-   r:      s
   
zOpencensusProxyMetric.__init__c                 C   r;   r<   )r_   r=   r(   r(   r-   r7      r>   zOpencensusProxyMetric.namec                 C   r;   r<   )r`   r=   r(   r(   r-   r]      r>   zOpencensusProxyMetric.descc                 C   r;   r<   )ra   r=   r(   r(   r-   r9      r>   zOpencensusProxyMetric.unitc                 C   r;   r<   )rb   r=   r(   r(   r-   r^      r>   z OpencensusProxyMetric.label_keysc                 C   r;   r<   rc   r=   r(   r(   r-   data   r>   zOpencensusProxyMetric.datac                 C   s&   t | jdkottt| j tS )z8Check if the metric is a distribution aggreation metric.r   )rV   rc   
isinstancenextitervaluesr   r=   r(   r(   r-    is_distribution_aggregation_data   s   z6OpencensusProxyMetric.is_distribution_aggregation_datalabel_valuesre   c                 C   s   || j |< dS )zAdd the data to the metric.

        Args:
            label_values: The label values of the metric.
            data: The data to be added.
        Nrd   )r6   rk   re   r(   r(   r-   add_data   s   zOpencensusProxyMetric.add_datarI   c           
      C   s   |j }t|dkrdS |D ]j}tdd |jD }|jD ]Z}|jjtjkr+t	|j
}nF|jjtjkr9tt|j}n8|jjtjkrGtt|j}n*|jjtjkrm|j}dd |jD }|jjj}	t|j|j |j|j||	}ntd|| j|< qqdS )zzParse the Opencensus Protobuf and store the data.

        The data can be accessed via `data` API once recorded.
        r   Nc                 s   s    | ]}|j V  qd S r<   rH   )r+   valr(   r(   r-   	<genexpr>   s    z/OpencensusProxyMetric.record.<locals>.<genexpr>c                 S      g | ]}|j qS r(   )count)r+   bucketr(   r(   r-   r.          z0OpencensusProxyMetric.record.<locals>.<listcomp>zSummary is not supported)rP   rV   tuplerk   rQ   rL   typer
   CUMULATIVE_INT64r   int64_valueCUMULATIVE_DOUBLEr   r   double_valueGAUGE_DOUBLEr   CUMULATIVE_DISTRIBUTIONrK   bucketsrS   rT   rU   r   sumrq   sum_of_squared_deviationr/   rc   )
r6   rI   rP   rW   labelsrX   re   rY   counts_per_bucketrZ   r(   r(   r-   record   sB   


zOpencensusProxyMetric.recordN)r@   rA   rB   rD   r   r:   rE   r7   r]   r9   r^   re   rj   r   r   rl   r   r   r(   r(   r(   r-   r\      s    




	r\   c                   @   sR   e Zd ZdefddZedeeef fddZedd Z	d	e
e fd
dZdS )	Componentidc                 C   s   || _ t | _i | _dS )zyRepresent a component that requests to proxy export metrics

        Args:
            id: Id of this component.
        N)r   time	monotonic_last_reported_time_metrics)r6   r   r(   r(   r-   r:      s   

zComponent.__init__returnc                 C   r;   )zAReturn the metrics requested to proxy export from this component.)r   r=   r(   r(   r-   metrics   s   zComponent.metricsc                 C   r;   r<   )r   r=   r(   r(   r-   last_reported_time   r>   zComponent.last_reported_timer   c                 C   sn   t  | _|D ]-}t| |j}|j}dd |jD }|| jvr,t||j	|j
|| j|< | j| | qdS )zParse the Opencensus protobuf and store metrics.

        Metrics can be accessed via `metrics` API for proxy export.

        Args:
            metrics: A list of Opencensus protobuf for proxy export.
        c                 S   rp   r(   )key)r+   	label_keyr(   r(   r-   r.     rs   z$Component.record.<locals>.<listcomp>N)r   r   r   r[   rL   r7   r^   r   r\   r8   r9   r   )r6   r   rI   
descriptorr7   r^   r(   r(   r-   r      s   


zComponent.recordN)r@   rA   rB   rD   r:   rE   r   r\   r   r   r   r   r   r(   r(   r(   r-   r      s    
r   c                   @   s   e Zd ZddedefddZd dee defd	d
Zdd Z	dededee dede
ej dedeeee f ddfddZdeeeeef  deeeef fddZdee dee fddZdd ZdS )!OpenCensusProxyCollector<   	namespacecomponent_timeout_sc                 C   s,   t  | _|| _|| _i | _tdd| _dS )a)  Prometheus collector implementation for opencensus proxy export.

        Prometheus collector requires to implement `collect` which is
        invoked whenever Prometheus queries the endpoint.

        The class is thread-safe.

        Args:
            namespace: Prometheus namespace.
        RAY_EXPORT_COUNTER_AS_GAUGETN)	threadingLock_components_lock_component_timeout_s
_namespace_componentsr   _export_counter_as_gauge)r6   r   r   r(   r(   r-   r:     s
   
z!OpenCensusProxyCollector.__init__Nr   worker_id_hexc                 C   s`   |st n|}| j || jvrt|| j|< | j| | W d   dS 1 s)w   Y  dS )a.  Record the metrics reported from the component that reports it.

        Args:
            metrics: A list of opencensus protobuf to proxy export metrics.
            worker_id_hex: A worker id that reports these metrics.
                If None, it means they are reported from Raylet or GCS.
        N)GLOBAL_COMPONENT_KEYr   r   r   r   )r6   r   r   r   r(   r(   r-   r   5  s   
"zOpenCensusProxyCollector.recordc                 C   s   | j @ g }g }| j D ]\}}t |j }|| jkr+|| t	d
|| q|D ]}|| j| q.|W  d   S 1 sFw   Y  dS )zClean up stale components.

        Stale means the component is dead or unresponsive.

        Stale components won't be reported to Prometheus anymore.
        zSMetrics from a worker ({}) is cleaned up due to timeout. Time since last report {}sN)r   r   itemsr   r   r   r   appendloggerinfoformatpop)r6   stale_componentsstale_component_idsr   	componentelapsedr(   r(   r-   clean_stale_componentsC  s    

$z/OpenCensusProxyCollector.clean_stale_componentsmetric_namemetric_descriptionr^   metric_unitsrk   agg_datametrics_mapr   c                 C   sh  | j  sJ | j d| }t|t|ksJ ||fdd |D }t|trK||}|s?t||||d}	|	g}|||< |d j||j	d dS t|t
r||}|set|||d}	|	g}|||< |d j||jd | jsu	 dS |d	r}	 dS t|d
krt|d| d| |d}	||	 t|dksJ |d
 j||jd dS t|tr|jt|jksJ g }
d}t|jD ]\}}||j| 7 }t||g}|
| q|
d|j	g ||}|st|||d}	|	g}|||< |d j||
|jd dS t|tr+||}|st|||d}	|	g}|||< |d j||jd dS tdt| )a  to_metric translate the data that OpenCensus create
        to Prometheus format, using Prometheus Metric object.

        This method is from Opencensus Prometheus Exporter.

        Args:
            metric_name: Name of the metric.
            metric_description: Description of the metric.
            label_keys: The fixed label keys of the metric.
            metric_units: Units of the metric.
            label_values: The values of `label_keys`.
            agg_data: `opencensus.stats.aggregation_data.AggregationData` object.
                Aggregated data that needs to be converted as Prometheus samples
            metrics_map: The converted metric is added to this map.

        rJ   c                 S   s   g | ]}|r|nd qS ) r(   )r+   tvr(   r(   r-   r.   z  s    zBOpenCensusProxyCollector.to_prometheus_metrics.<locals>.<listcomp>)r7   documentationr9   r   r   )r   rH   N)r7   r   r   _total   z(DEPRECATED, use z_total metric instead)    z+Inf)r   r|   	sum_valuezunsupported aggregation type )r   lockedr   rV   rf   r   getr   
add_metric
count_datar   sum_datar   endswithr   r   r   rU   sorted	enumerater   rD   r   r}   r   rH   r/   ru   )r6   r   r   r^   r   rk   r   r   r   rI   r|   	cum_countiiboundrr   r(   r(   r-   to_prometheus_metricsZ  s   







z.OpenCensusProxyCollector.to_prometheus_metricsdatasc                 C   s   t |dksJ |d }t|trtttdd |D S t|tr-ttdd |D S t|tr>tttdd |D S tdt| dt dt dt d	| d
)Nr   c                 S   rp   r(   rm   r+   re   r(   r(   r-   r.     rs   zCOpenCensusProxyCollector._aggregate_metric_data.<locals>.<listcomp>c                 S   rp   r(   )r   r   r(   r(   r-   r.     rs   c                 S   rp   r(   )r   r   r(   r(   r-   r.     rs   zUnsupported aggregation type z. Supported types are z, z.Got .)	rV   rf   r   r   r}   r   r   r/   ru   )r6   r   sampler(   r(   r-   _aggregate_metric_data  s.   


z/OpenCensusProxyCollector._aggregate_metric_dataper_worker_metricsc           	      C   s   t t|d}|rt|jvr|S |jt}tt}|D ]}|j D ]\}}||d| ||d d   	| q#qt
|j|j|j|jd| |j|d d  d}| D ]\}}||| | qY|gS )a  Collect per-worker metrics, aggregate them into per-node metrics and convert
        them to Prometheus format.

        Args:
            per_worker_metrics: A list of per-worker metrics for the same metric name.
        Returns:
            A list of per-node metrics for the same metric name, with the high
            cardinality labels removed and the values aggregated.
        Nr   )r7   r]   r9   r^   )rg   rh   r    r^   indexr   listre   r   r   r\   r7   r]   r9   rl   r   )	r6   r   rI   worker_id_label_indexlabel_value_to_datark   re   aggregated_metricr   r(   r(   r-   '_aggregate_with_recommended_cardinality  s:   


z@OpenCensusProxyCollector._aggregate_with_recommended_cardinalityc                 c   s
   | j h g }tt}t }| j D ]!}|j D ]}|tjkr/|	 s/||j
 | q|| qq| D ]
}|| | q:i }|D ]}|j D ]\}}	| |j
|j|j|j||	| qPqIW d   n1 sow   Y  | D ]
}
|
D ]}|V  q|qxdS )a,  Collect fetches the statistics from OpenCensus
        and delivers them as Prometheus Metrics.
        Collect is invoked every time a prometheus.Gatherer is run
        for example when the HTTP endpoint is invoked by Prometheus.

        This method is required as a Prometheus Collector.
        N)r   r   r   r!   get_cardinality_levelr   ri   r   RECOMMENDEDrj   r7   r   extendr   re   r   r   r]   r^   r9   )r6   open_cencus_metricsto_lower_cardinalitycardinality_levelr   rI   r   prometheus_metrics_maprk   re   r   r(   r(   r-   collect9  sR   
.z OpenCensusProxyCollector.collect)r   r<   )r@   rA   rB   rD   intr:   r   r   r   r   r   tag_value_moduleTagValuer   r   PrometheusMetricr   r	   r   r   r   r   r\   r   r   r(   r(   r(   r-   r     sF    	
 

7r   c                   @   s   e Zd Z	ddededefddZddee fdd	Z	d
e
dedefddZddee defddZddee defddZdd ZdS )MetricsAgentNview_managerstats_recorderstats_exporterc                 C   sl   t  | _|| _|| _|| _d| _| jdu rd| _n| j| t| jj	j
tttdd| _t | _dS )a+  A class to record and export metrics.

        The class exports metrics in 2 different ways.
        - Directly record and export metrics using OpenCensus.
        - Proxy metrics from other core components
            (e.g., raylet, GCS, core workers).

        This class is thread-safe.
        Nx   )r   )r   r   _lockr   r   r   proxy_exporter_collectorregister_exporterr   optionsr   r   osgetenvr$   set_registered_views)r6   r   r   r   r(   r(   r-   r:   u  s   

zMetricsAgent.__init__recordsc                 C   s   |pi }| j W | js	 W d   dS |D ]?}|j}|j}|j}z| ||i || W q tyU } ztd|j	 d| d|d|d|
 W Y d}~qd}~ww W d   dS 1 saw   Y  dS )z7Directly record and export stats from the same process.NzFailed to record metric z with value z with tags z and global tags 	 due to: )
r   r   rG   rH   r'   _record_gauge	Exceptionr   errorr7   )r6   r   global_tagsr   rG   rH   r'   er(   r(   r-   record_and_export  s&   ""zMetricsAgent.record_and_exportrG   rH   r'   c                 C   s  |j | jvr| j|j | j|j  | j }t	 }|
 D ]W\}}zt|}W n tyI }	 ztd| d|j  d|	 |	d }	~	ww zt|}
W n  tyq }	 ztd| d| d|j  d|	 |	d }	~	ww |||
 q!||j| || d S )NzFailed to create tag key z for metric r   zFailed to create tag value z	 for key )r7   r   r   register_viewr?   addr   new_measurement_maptag_map_moduleTagMapr   r)   r*   r/   r   r   r   r   insertmeasure_float_putr   r   )r6   rG   rH   r'   measurement_mapr   r   tag_valr   r   r   r(   r(   r-   r     s6   
zMetricsAgent._record_gauger   r   c                 C   sN   | j  | js	 W d   dS W d   n1 sw   Y  | || dS )a{  Proxy export metrics specified by a Opencensus Protobuf.

        This API is used to export metrics emitted from
        core components.

        Args:
            metrics: A list of protobuf Metric defined from OpenCensus.
            worker_id_hex: The worker ID it proxies metrics export. None
                if the metric is not from a worker (i.e., raylet, GCS).
        N)r   r   _proxy_export_metricsr6   r   r   r(   r(   r-   proxy_export_metrics  s   z!MetricsAgent.proxy_export_metricsc                 C   s   | j || d S r<   )r   r   r   r(   r(   r-   r     s   z"MetricsAgent._proxy_export_metricsc                 C   sL   | j  | js	 W d   dS W d   n1 sw   Y  | j  dS )zClean dead worker's metrics.

        Worker metrics are cleaned up and won't be exported once
        it is considered as dead.

        This method has to be periodically called by a caller.
        N)r   r   r   r   r=   r(   r(   r-   clean_all_dead_worker_metrics  s   z*MetricsAgent.clean_all_dead_worker_metricsr<   )r@   rA   rB   r   r   r   r:   r   rF   r   r&   floatdictr   r   rD   r   r   r   r(   r(   r(   r-   r   t  s    
1r   c                       sP   e Zd ZdZ fddZdd Zdd Zdd	 Zd
d Zdd Z	dd Z
  ZS ) PrometheusServiceDiscoveryWriteraF  A class to support Prometheus service discovery.

    It supports file-based service discovery. Checkout
    https://prometheus.io/docs/guides/file-sd/ for more details.

    Args:
        gcs_address: Gcs address for this cluster.
        temp_dir: Temporary directory used by
            Ray to store logs and metadata.
    c                    sV   t jjj|d ddd}|| _t jjj| || _d| _	g | _
t | _t   d S )NTF)allow_cluster_id_nilfetch_cluster_id_if_nil   )ray_rayletGcsClientOptionscreategcs_address_privatestate_initialize_global_statetemp_dir&default_service_discovery_flush_period latest_service_discovery_contentr   RLock_content_locksuperr:   )r6   r  r  gcs_client_options	__class__r(   r-   r:     s   
z)PrometheusServiceDiscoveryWriter.__init__c                 C   s0   | j  | jW  d   S 1 sw   Y  dS )z3Return the latest stored service discovery content.N)r  r	  r=   r(   r(   r-   $get_latest_service_discovery_content  s   $zEPrometheusServiceDiscoveryWriter.get_latest_service_discovery_contentc                 C   s   t  }dd |D }t| jd}|dd}|r!||d |dd}|r1||d dd	i|d
g}| j || _W d   n1 sJw   Y  t	
|S )z4Return the content for Prometheus service discovery.c                 S   s*   g | ]}|d  du rt |d |d qS )aliveTNodeManagerAddressMetricsExportPortr   )r+   noder(   r(   r-   r.     s
    zOPrometheusServiceDiscoveryWriter.get_file_discovery_content.<locals>.<listcomp>)addresss   AutoscalerMetricsAddressNzutf-8s   DashboardMetricsAddressjobr   )r   targets)r   nodesr"   r  internal_kv_getr   decoder  r	  jsondumps)r6   r  metrics_export_addresses
gcs_clientautoscaler_addrdashboard_addrcontentr(   r(   r-   get_file_discovery_content  s    
z;PrometheusServiceDiscoveryWriter.get_file_discovery_contentc                 C   sT   |   }t|d}||   W d    n1 sw   Y  t||   d S )Nw)get_temp_file_nameopenwriter"  r   replaceget_target_file_name)r6   temp_file_name	json_filer(   r(   r-   r&  *  s
   z&PrometheusServiceDiscoveryWriter.writec                 C   s   t j| jtjjjS r<   )r   pathjoinr  r   r  ray_constants!PROMETHEUS_SERVICE_DISCOVERY_FILEr=   r(   r(   r-   r(  7  s   z5PrometheusServiceDiscoveryWriter.get_target_file_namec                 C   s   t j| jddtjjjS )Nz{}_{}tmp)	r   r+  r,  r  r   r   r  r-  r.  r=   r(   r(   r-   r$  <  s   
z3PrometheusServiceDiscoveryWriter.get_temp_file_namec              
   C   st   	 z|    W n+ ty2 } ztd|   tt  td|  W Y d }~nd }~ww t	| j
 q)NTz,Writing a service discovery file, {},failed.zError message: )r&  r   r   warningr   r(  	traceback
format_excr   sleepr  )r6   r   r(   r(   r-   runD  s   
z$PrometheusServiceDiscoveryWriter.run)r@   rA   rB   rC   r:   r  r"  r&  r(  r$  r4  __classcell__r(   r(   r  r-   r     s    r   )Pr  loggingr   rer   r   r1  collectionsr   r   typingr   r   r   r   r   r	   +opencensus.metrics.export.metric_descriptorr
   opencensus.metrics.export.valuer   opencensus.statsr   r   r0   !opencensus.stats.aggregation_datar   r   r   r   opencensus.stats.base_exporterr   opencensus.stats.stats_recorderr   opencensus.stats.viewr   opencensus.stats.view_managerr   opencensus.tagsr   r)   r   r   r   r   prometheus_client.corer   r   r   r   r   r   ray._common.network_utilsr   ray._private.ray_constantsr   )ray._private.telemetry.metric_cardinalityr    r!   ray._rayletr"   ray.core.generated.metrics_pb2ray.util.metricsr#   	getLoggerr@   r   r$   r   compilerN   r&   rF   r[   r\   r   r   r   Threadr   r(   r(   r(   r-   <module>   sR     

'&^.  a 