o
    bi9                     @   s   d dl Z d dlZd dlmZ d dlmZ d dlmZmZ d dl	m
Z
 d dlmZmZ d dlmZmZmZmZ d dlmZ e eZd	eeef d
eeef deeef fddZdefdee defddZG dd dZdS )    N)Counter)reduce)DictList)PlacementGroupTableData)*AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE!AUTOSCALER_REPORT_PER_NODE_STATUS)	DictCountLoadMetricsSummaryNodeIPResourceDict)PlacementStrategydict1dict2returnc                 C   s2   |   }| D ]\}}|||d ||< q|S )zqAdd the values in two dictionaries.

    Returns:
        dict: A new dictionary (inputs remain unmodified).
    r   )copyitemsget)r   r   new_dictkv r   X/home/ubuntu/.local/lib/python3.10/site-packages/ray/autoscaler/_private/load_metrics.pyadd_resources   s   r   dictsc                    sR    du rdd  t  fdd| D }g }| D ]\}}||||f q|S )a$  Count a list of dictionaries (or unhashable types).

    This is somewhat annoying because mutable data structures aren't hashable,
    and set/dict keys must be hashable.

    Args:
        dicts (List[D]): A list of dictionaries to be counted.
        serializer (D -> S): A custom serialization function. The output type S
            must be hashable. The default serializer converts a dictionary into
            a frozenset of KV pairs.
        deserializer (S -> U): A custom deserialization function. See the
            serializer for information about type S. For dictionaries U := D.

    Returns:
        List[Tuple[U, int]]: Returns a list of tuples. Each entry in the list
            is a tuple containing a unique entry from `dicts` and its
            corresponding frequency count.
    Nc                 S   s   t |  S N)	frozensetr   )dr   r   r   <lambda>7   s    zfreq_of_dicts.<locals>.<lambda>c                 3   s    | ]} |V  qd S r   r   ).0r   
serializerr   r   	<genexpr>9   s    z freq_of_dicts.<locals>.<genexpr>)r   r   append)r   r!   deserializerfreqsas_listas_setcountr   r    r   freq_of_dicts#   s   r)   c                   @   s  e Zd ZdZdd Zdd Z				d4ded	ed
eeef deeef de	de
eee	f  de
eee	f  de
e defddZdd Zdd Zde
e fddZdd Zdeeef fddZdd  Zd!d" Zd5d$d%Zd&d' Zd(d) Zdefd*d+Zd,d- Zd.d/ Zd0d1 Zd2d3 ZdS )6LoadMetricszContainer for cluster load metrics.

    Metrics here are updated from raylet heartbeats. The autoscaler
    queries these metrics to determine when to scale up, and which nodes
    can be removed.
    c                 C   s@   i | _ i | _i | _i | _g | _g | _g | _g | _d| _i | _	d S )NF)
last_heartbeat_time_by_ipstatic_resources_by_ipdynamic_resources_by_ipraylet_id_by_ipwaiting_bundlesinfeasible_bundlespending_placement_groupsresource_requestscluster_full_of_actors_detectedray_nodes_last_used_time_by_ipselfr   r   r   __init__H   s   
zLoadMetrics.__init__c                 C   s
   t | jS )zA load metrics instance is Falsey iff the autoscaler process
        has not received a resource message from the GCS.
        )boolr.   r5   r   r   r   __bool__T   s   
zLoadMetrics.__bool__NFip	raylet_idstatic_resourcesdynamic_resourcesnode_idle_duration_sr/   r0   r1   r3   c
                 C   s   || j |< || j|< |	| _|sg }|sg }|sg }| }
| j |  D ]\}}||
vr0d|
|< q$|
| j|< t }|| | j|< || j|< || _	|| _
|| _d S )N        )r,   r.   r3   r   r   r-   timer4   r+   r/   r0   r1   )r6   r:   r;   r<   r=   r>   r/   r0   r1   r3   dynamic_resources_updateresource_namecapacitynowr   r   r   updateZ   s*   




zLoadMetrics.updatec                 C   s2   |d usJ dt d| t | j|< d S )NzIP should be known at this timez*Node {} is newly setup, treating as active)loggerdebugformatr@   r+   r6   r:   r   r   r   mark_active   s   zLoadMetrics.mark_activec                 C   s
   || j v S r   )r+   rI   r   r   r   	is_active   s   
zLoadMetrics.is_active
active_ipsc                    s^   t    fdd}|| jdd || jdd || jdd || jdd || jdd dS )a  The Raylet ips stored by LoadMetrics are obtained by polling
        the GCS in Monitor.update_load_metrics().

        On the other hand, the autoscaler gets a list of node ips from
        its NodeProvider.

        This method removes from LoadMetrics the ips unknown to the autoscaler.

        Args:
            active_ips (List[str]): The node ips known to the autoscaler.
        c                    sf   t |   }|D ]}|rtd| d | |= q|r)|r)tdt||  |t | @ r1J d S )NzLoadMetrics: Removed ip: .z7LoadMetrics: Removed {} stale ip mappings: {} not in {})setrF   inforH   len)mapping
should_logunwanted_ipsunwanted_iprL   r   r   prune   s   
z+LoadMetrics.prune_active_ips.<locals>.pruneT)rR   FN)rN   r4   r,   r.   r-   r+   )r6   rL   rV   r   rU   r   prune_active_ips   s   zLoadMetrics.prune_active_ipsc                 C   s
   | j  S )ad  Return a list of node resources (static resource sizes).

        Example:
            >>> from ray.autoscaler._private.load_metrics import LoadMetrics
            >>> metrics = LoadMetrics(...) # doctest: +SKIP
            >>> metrics.get_node_resources() # doctest: +SKIP
            [{"CPU": 1}, {"CPU": 4, "GPU": 8}]  # for two different nodes
        )r,   valuesr5   r   r   r   get_node_resources   s   
	zLoadMetrics.get_node_resourcesr   c                 C      | j S )ah  Return a dict of node resources for every node ip.

        Example:
            >>> from ray.autoscaler._private.load_metrics import LoadMetrics
            >>> metrics = LoadMetrics(...)  # doctest: +SKIP
            >>> metrics.get_static_node_resources_by_ip()  # doctest: +SKIP
            {127.0.0.1: {"CPU": 1}, 127.0.0.2: {"CPU": 4, "GPU": 8}}
        )r,   r5   r   r   r   get_static_node_resources_by_ip   s   	z+LoadMetrics.get_static_node_resources_by_ipc                 C   rZ   r   )r-   r5   r   r   r   get_resource_utilization      z$LoadMetrics.get_resource_utilizationc           	      C   s   i }i }| j  D ]9\}}| j| }| D ]+\}}|||  }||vr,d||< d||< ||  |7  < ||  |7  < td|}qq	||fS )Nr?   r   )r,   r   r-   max)	r6   resources_usedresources_totalr:   max_resourcesavail_resourcesresource_idamountusedr   r   r   _get_resource_usage   s   
	zLoadMetrics._get_resource_usageTc                 C   s,   |r| j d t | jd t  S | j | j S r   )r/   r   r0   )r6   clipr   r   r   get_resource_demand_vector   s   z&LoadMetrics.get_resource_demand_vectorc                 C   rZ   r   )r2   r5   r   r   r   get_resource_requests   r]   z!LoadMetrics.get_resource_requestsc                 C   rZ   r   )r1   r5   r   r   r   get_pending_placement_groups   r]   z(LoadMetrics.get_pending_placement_groupsc                 C   sp   | j rtt| j  ni }dt|dd}d|v r'|dt|d 7 }d|v r6|dt|d 7 }|S )zqReturn a concise string of cluster size to report to event logs.

        For example, "3 CPUs, 4 GPUs".
        z{} CPUsCPUr   GPUz	, {} GPUsTPUz	, {} TPUs)r,   r   r   rX   rH   intr   )r6   total_resourcesoutr   r   r   resources_avail_summary   s   z#LoadMetrics.resources_avail_summaryc                 C   sL  | j rtt| j  ni }| jrtt| j ni }i }|D ]%}|dv r5|| }|| }|| |f||< q|| }|||  |f||< qt| jdd}t|  }dd }	dd }
t|  |	|
d}t| j }d }t	ri }| j
 D ]&\}}| j |i }i ||< |
 D ]\}}|||d	 |f|| |< qqut||||||d
S )N)memoryobject_store_memoryF)rg   c                 S   s   t dd | jD }|| jfS )Nc                 s   s    | ]
}t |j V  qd S r   )r   unit_resourcesr   )r   bundler   r   r   r"     s    
zJLoadMetrics.summary.<locals>.placement_group_serializer.<locals>.<genexpr>)tuplebundlesstrategy)pgrw   r   r   r   placement_group_serializer  s   
z7LoadMetrics.summary.<locals>.placement_group_serializerc                 S   s*   t tt| d }t|t| d dS )Nr      )rw   rx   )listmapdictr)   r   Name)pg_tuplerw   r   r   r   placement_group_deserializer  s   z9LoadMetrics.summary.<locals>.placement_group_deserializer)r!   r$   r   )usageresource_demand	pg_demandrequest_demand
node_typesusage_by_node)r-   r   r   rX   r,   r)   rh   ri   rj   r   r   r   r
   )r6   available_resourcesro   
usage_dictkeytotal	availablesummarized_demand_vectorsummarized_resource_requestsrz   r   summarized_placement_groupsnodes_summaryr   r:   totalsresourcer   r   r   summary   s^   
zLoadMetrics.summaryc                 C   s.   |d urt |tsJ |dd |D | _d S )Nc                 S   s   g | ]
}t |d kr|qS )r   )rP   )r   requestr   r   r   
<listcomp>D  s    z5LoadMetrics.set_resource_requests.<locals>.<listcomp>)
isinstancer|   r2   )r6   requested_resourcesr   r   r   set_resource_requestsA  s
   z!LoadMetrics.set_resource_requestsc                 C   s$   dd dd t|   D  S )Nz - z
 - c                 S   s   g | ]
\}}d  ||qS )z{}: {})rH   )r   r   r   r   r   r   r   J  s    z+LoadMetrics.info_string.<locals>.<listcomp>)joinsorted_infor   r5   r   r   r   info_stringH  s   zLoadMetrics.info_stringc              	      s2  |   \t fdd| j D }fdd| j D }t| j dd dd d }fdd	|D }d
d  d fddtD d|rWt	t
|nd|rft	tt|t| nd|rot	t|ndd|r{t	t
|nd|rt	tt|t| nd|rt	t|nd|dS )Nc                       g | ]} | qS r   r   r   trD   r   r   r   Q      z%LoadMetrics._info.<locals>.<listcomp>c                    r   r   r   r   r   r   r   r   R  r   c                 S   s   | d S )Nr{   r   )pairr   r   r   r   T  s    z#LoadMetrics._info.<locals>.<lambda>)r      c                    s   i | ]	\}}| | qS r   r   )r   r:   r   r   r   r   
<dictcomp>V  s    z%LoadMetrics._info.<locals>.<dictcomp>c                 S   s&   | dv rd t|d dS t|dS )N)rs   rr   z{} GiBi   @   )rH   round)r   valuer   r   r   format_resourceX  s   
z*LoadMetrics._info.<locals>.format_resourcez, c              	      s8   g | ]}| d sd ||  || |qS )znode:z{}/{} {})
startswithrH   )r   rid)r   r`   r_   r   r   r   `  s    zMin={} Mean={} Max={})ResourceUsageNodeIdleSecondsTimeSinceLastHeartbeatMostDelayedHeartbeats)rf   r@   r4   rX   r+   r   r   r   rH   rn   minfloatsumrP   r^   )r6   
idle_timesheartbeat_timesmost_delayed_heartbeatsr   )r   rD   r`   r_   r   r   M  s<   zLoadMetrics._info)NNNF)T)__name__
__module____qualname____doc__r7   r9   strbytesr   r   r   r   r8   rE   rJ   rK   rW   rY   r   r   r[   r\   rf   rh   ri   rj   rq   r   r   r   r   r   r   r   r   r*   @   sT    

	

(#
Gr*   )loggingr@   collectionsr   	functoolsr   typingr   r   ray._private.gcs_utilsr   !ray.autoscaler._private.constantsr   r   ray.autoscaler._private.utilr	   r
   r   r   ray.core.generated.common_pb2r   	getLoggerr   rF   r   r   r   r~   r)   r*   r   r   r   r   <module>   s    
.