o
    `۷i8                     @   s   d dl Z d dlZd dlmZ d dlmZ d dlmZmZ d dl	m
Z
 d dlmZmZ d dlmZmZmZmZ d dlmZ e eZd	eeef d
eeef deeef fddZdefdee defddZG dd dZdS )    N)Counter)reduce)DictList)PlacementGroupTableData)*AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE!AUTOSCALER_REPORT_PER_NODE_STATUS)	DictCountLoadMetricsSummaryNodeIPResourceDict)PlacementStrategydict1dict2returnc                 C   s2   |   }| D ]\}}|||d ||< q|S )zqAdd the values in two dictionaries.

    Returns:
        dict: A new dictionary (inputs remain unmodified).
    r   )copyitemsget)r   r   new_dictkv r   Z/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/autoscaler/_private/load_metrics.pyadd_resources   s   r   dictsc                    sR    du rdd  t  fdd| D }g }| D ]\}}||||f q|S )a$  Count a list of dictionaries (or unhashable types).

    This is somewhat annoying because mutable data structures aren't hashable,
    and set/dict keys must be hashable.

    Args:
        dicts (List[D]): A list of dictionaries to be counted.
        serializer (D -> S): A custom serialization function. The output type S
            must be hashable. The default serializer converts a dictionary into
            a frozenset of KV pairs.
        deserializer (S -> U): A custom deserialization function. See the
            serializer for information about type S. For dictionaries U := D.

    Returns:
        List[Tuple[U, int]]: Returns a list of tuples. Each entry in the list
            is a tuple containing a unique entry from `dicts` and its
            corresponding frequency count.
    Nc                 S   s   t |  S N)	frozensetr   )dr   r   r   <lambda>7   s    zfreq_of_dicts.<locals>.<lambda>c                 3   s    | ]} |V  qd S r   r   ).0r   
serializerr   r   	<genexpr>9   s    z freq_of_dicts.<locals>.<genexpr>)r   r   append)r   r!   deserializerfreqsas_listas_setcountr   r    r   freq_of_dicts#   s   r)   c                   @   s  e Zd ZdZdd Zdd Z			d0deded	eeef d
eeef de	de
eee	f  de
eee	f  de
e fddZdd Zde
e fddZdd Zdeeef fddZdd Zdd Zd1d d!Zd"d# Zd$d% Zdefd&d'Zd(d) Zd*d+ Zd,d- Zd.d/ ZdS )2LoadMetricszContainer for cluster load metrics.

    Metrics here are updated from raylet heartbeats. The autoscaler
    queries these metrics to determine when to scale up, and which nodes
    can be removed.
    c                 C   s:   i | _ i | _i | _i | _g | _g | _g | _g | _i | _d S r   )	last_heartbeat_time_by_ipstatic_resources_by_ipdynamic_resources_by_ipnode_id_by_ipwaiting_bundlesinfeasible_bundlespending_placement_groupsresource_requestsray_nodes_last_used_time_by_ipselfr   r   r   __init__H   s   
zLoadMetrics.__init__c                 C   s
   t | jS )zA load metrics instance is Falsey iff the autoscaler process
        has not received a resource message from the GCS.
        )boolr.   r4   r   r   r   __bool__S   s   
zLoadMetrics.__bool__Nipnode_idstatic_resourcesdynamic_resourcesnode_idle_duration_sr/   r0   r1   c	                 C   s   || j |< || j|< |sg }|sg }|sg }| }	| j |  D ]\}
}|
|	vr-d|	|
< q!|	| j|< t }|| | j|< || j|< || _|| _	|| _
d S )N        )r,   r.   r   r   r-   timer3   r+   r/   r0   r1   )r5   r9   r:   r;   r<   r=   r/   r0   r1   dynamic_resources_updateresource_namecapacitynowr   r   r   updateY   s(   




zLoadMetrics.updatec                 C   s2   |d usJ dt d| t | j|< d S )NzIP should be known at this timez*Node {} is newly setup, treating as active)loggerdebugformatr?   r+   )r5   r9   r   r   r   mark_active   s   zLoadMetrics.mark_active
active_ipsc                    s^   t    fdd}|| jdd || jdd || jdd || jdd || jdd dS )a  The Raylet ips stored by LoadMetrics are obtained by polling
        the GCS in Monitor.update_load_metrics().

        On the other hand, the autoscaler gets a list of node ips from
        its NodeProvider.

        This method removes from LoadMetrics the ips unknown to the autoscaler.

        Args:
            active_ips (List[str]): The node ips known to the autoscaler.
        c                    sf   t |   }|D ]}|rtd| d | |= q|r)|r)tdt||  |t | @ r1J d S )NzLoadMetrics: Removed ip: .z7LoadMetrics: Removed {} stale ip mappings: {} not in {})setrE   inforG   len)mapping
should_logunwanted_ipsunwanted_iprI   r   r   prune   s   
z+LoadMetrics.prune_active_ips.<locals>.pruneT)rO   FN)rK   r3   r,   r.   r-   r+   )r5   rI   rS   r   rR   r   prune_active_ips   s   zLoadMetrics.prune_active_ipsc                 C   s
   | j  S )ad  Return a list of node resources (static resource sizes).

        Example:
            >>> from ray.autoscaler._private.load_metrics import LoadMetrics
            >>> metrics = LoadMetrics(...) # doctest: +SKIP
            >>> metrics.get_node_resources() # doctest: +SKIP
            [{"CPU": 1}, {"CPU": 4, "GPU": 8}]  # for two different nodes
        )r,   valuesr4   r   r   r   get_node_resources   s   
	zLoadMetrics.get_node_resourcesr   c                 C      | j S )ah  Return a dict of node resources for every node ip.

        Example:
            >>> from ray.autoscaler._private.load_metrics import LoadMetrics
            >>> metrics = LoadMetrics(...)  # doctest: +SKIP
            >>> metrics.get_static_node_resources_by_ip()  # doctest: +SKIP
            {127.0.0.1: {"CPU": 1}, 127.0.0.2: {"CPU": 4, "GPU": 8}}
        )r,   r4   r   r   r   get_static_node_resources_by_ip   s   	z+LoadMetrics.get_static_node_resources_by_ipc                 C   rW   r   )r-   r4   r   r   r   get_resource_utilization      z$LoadMetrics.get_resource_utilizationc           	      C   s   i }i }| j  D ]9\}}| j| }| D ]+\}}|||  }||vr,d||< d||< ||  |7  < ||  |7  < td|}qq	||fS )Nr>   r   )r,   r   r-   max)	r5   resources_usedresources_totalr9   max_resourcesavail_resourcesresource_idamountusedr   r   r   _get_resource_usage   s   
	zLoadMetrics._get_resource_usageTc                 C   s,   |r| j d t | jd t  S | j | j S r   )r/   r   r0   )r5   clipr   r   r   get_resource_demand_vector   s   z&LoadMetrics.get_resource_demand_vectorc                 C   rW   r   )r2   r4   r   r   r   get_resource_requests   rZ   z!LoadMetrics.get_resource_requestsc                 C   rW   r   )r1   r4   r   r   r   get_pending_placement_groups   rZ   z(LoadMetrics.get_pending_placement_groupsc                 C   sp   | j rtt| j  ni }dt|dd}d|v r'|dt|d 7 }d|v r6|dt|d 7 }|S )zqReturn a concise string of cluster size to report to event logs.

        For example, "3 CPUs, 4 GPUs".
        z{} CPUsCPUr   GPUz	, {} GPUsTPUz	, {} TPUs)r,   r   r   rU   rG   intr   )r5   total_resourcesoutr   r   r   resources_avail_summary   s   z#LoadMetrics.resources_avail_summaryc                 C   sL  | j rtt| j  ni }| jrtt| j ni }i }|D ]%}|dv r5|| }|| }|| |f||< q|| }|||  |f||< qt| jdd}t|  }dd }	dd }
t|  |	|
d}t| j }d }t	ri }| j
 D ]&\}}| j |i }i ||< |
 D ]\}}|||d	 |f|| |< qqut||||||d
S )N)memoryobject_store_memoryF)rd   c                 S   s   t dd | jD }|| jfS )Nc                 s   s    | ]
}t |j V  qd S r   )r   unit_resourcesr   )r   bundler   r   r   r"     s    
zJLoadMetrics.summary.<locals>.placement_group_serializer.<locals>.<genexpr>)tuplebundlesstrategy)pgrt   r   r   r   placement_group_serializer  s   
z7LoadMetrics.summary.<locals>.placement_group_serializerc                 S   s*   t tt| d }t|t| d dS )Nr      )rt   ru   )listmapdictr)   r   Name)pg_tuplert   r   r   r   placement_group_deserializer  s   z9LoadMetrics.summary.<locals>.placement_group_deserializer)r!   r$   r   )usageresource_demand	pg_demandrequest_demand
node_typesusage_by_node)r-   r   r   rU   r,   r)   re   rf   rg   r   r   r   r
   )r5   available_resourcesrl   
usage_dictkeytotal	availablesummarized_demand_vectorsummarized_resource_requestsrw   r~   summarized_placement_groupsnodes_summaryr   r9   totalsresourcer   r   r   summary   s^   
zLoadMetrics.summaryc                 C   s.   |d urt |tsJ |dd |D | _d S )Nc                 S   s   g | ]
}t |d kr|qS )r   )rM   )r   requestr   r   r   
<listcomp>>  s    z5LoadMetrics.set_resource_requests.<locals>.<listcomp>)
isinstancery   r2   )r5   requested_resourcesr   r   r   set_resource_requests;  s
   z!LoadMetrics.set_resource_requestsc                 C   s$   dd dd t|   D  S )Nz - z
 - c                 S   s   g | ]
\}}d  ||qS )z{}: {})rG   )r   r   r   r   r   r   r   D  s    z+LoadMetrics.info_string.<locals>.<listcomp>)joinsorted_infor   r4   r   r   r   info_stringB  s   zLoadMetrics.info_stringc              	      s2  |   \t fdd| j D }fdd| j D }t| j dd dd d }fdd	|D }d
d  d fddtD d|rWt	t
|nd|rft	tt|t| nd|rot	t|ndd|r{t	t
|nd|rt	tt|t| nd|rt	t|nd|dS )Nc                       g | ]} | qS r   r   r   trC   r   r   r   K      z%LoadMetrics._info.<locals>.<listcomp>c                    r   r   r   r   r   r   r   r   L  r   c                 S   s   | d S )Nrx   r   )pairr   r   r   r   N  s    z#LoadMetrics._info.<locals>.<lambda>)r      c                    s   i | ]	\}}| | qS r   r   )r   r9   r   r   r   r   
<dictcomp>P  s    z%LoadMetrics._info.<locals>.<dictcomp>c                 S   s&   | dv rd t|d dS t|dS )N)rp   ro   z{} GiBi   @   )rG   round)r   valuer   r   r   format_resourceR  s   
z*LoadMetrics._info.<locals>.format_resourcez, c              	      s8   g | ]}| d sd ||  || |qS )znode:z{}/{} {})
startswithrG   )r   rid)r   r]   r\   r   r   r   Z  s    zMin={} Mean={} Max={})ResourceUsageNodeIdleSecondsTimeSinceLastHeartbeatMostDelayedHeartbeats)rc   r?   r3   rU   r+   r   r   r   rG   rk   minfloatsumrM   r[   )r5   
idle_timesheartbeat_timesmost_delayed_heartbeatsr   )r   rC   r]   r\   r   r   G  s<   zLoadMetrics._info)NNN)T)__name__
__module____qualname____doc__r6   r8   strbytesr   r   r   r   rD   rH   rT   rV   r   r   rX   rY   rc   re   rf   rg   rn   r   r   r   r   r   r   r   r   r*   @   sL    

	
&#
Gr*   )loggingr?   collectionsr   	functoolsr   typingr   r   ray._private.gcs_utilsr   !ray.autoscaler._private.constantsr   r   ray.autoscaler._private.utilr	   r
   r   r   ray.core.generated.common_pb2r   	getLoggerr   rE   r   r   r   r{   r)   r*   r   r   r   r   <module>   s    
.