o
    `۷iX                     @   s   d dl Z d dlmZ d dlmZmZ d dlmZ d dlm	Z	m
Z
 d dlmZ d dlmZmZmZ dZG d	d
 d
eZefdedee defddZefdedede	fddZdedefddZdS )    N)Counter)List
NamedTuple)	GcsClient)ClusterStatusStats)ClusterStatusParser)ClusterResourceStateGetClusterResourceStateReplyGetClusterStatusReply
   c                   @   s   e Zd ZU eed< eed< dS )ResourceRequest	resourceslabel_selectorN)__name__
__module____qualname__dict__annotations__ r   r   K/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/autoscaler/v2/sdk.pyr      s   
 r   gcs_address
to_requesttimeoutc                    s   t | dks
J dg }|D ]$}t|tsJ dt| |di }|di }|t|| q|}dd  t fdd	|D }g }g }	g }
| D ]\\}}}|t| |	t| |
| qNt	| j
||	|
|d
 dS )a@  Request resources from the autoscaler.

    This will add a cluster resource constraint to GCS. GCS will asynchronously
    pass the constraint to the autoscaler, and the autoscaler will try to provision the
    requested minimal bundles in `to_request`.

    If the cluster already has `to_request` resources, this will be an no-op.
    Future requests submitted through this API will overwrite the previous requests.

    Args:
        gcs_address: The GCS address to query.
        to_request: A list of resource requests to request the cluster to have.
            Each resource request is a tuple of resources and a label_selector
            to apply per-bundle. e.g.: [{"resources": {"CPU": 1, "GPU": 1}, "label_selector": {"accelerator-type": "A100"}}]
        timeout: Timeout in seconds for the request to be timeout

    r   GCS address is not specified.z)Internal Error: Expected a dict, but got r   r   c                 S   s   t | j t | j fS N)	frozensetr   itemsr   )rr   r   r   keyfunc;   s   z*request_cluster_resources.<locals>.keyfuncc                 3   s    | ]} |V  qd S r   r   ).0r   r   r   r   	<genexpr>A   s    z,request_cluster_resources.<locals>.<genexpr>	timeout_sN)len
isinstancer   typegetappendr   r   r   r   #request_cluster_resource_constraint)r   r   r   
normalizedr   r   selectorgrouped_requestsbundleslabel_selectorscountsbundlecountr   r!   r   request_cluster_resources   s0   
r3   returnc                 C   s^   t | dks
J dt }t| j|d}t }t }|| tj|t|| |ddS )z
    Get the cluster status from the autoscaler.

    Args:
        gcs_address: The GCS address to query.
        timeout: Timeout in seconds for the request to be timeout

    Returns:
        A ClusterStatus object.
    r   r   r#   )gcs_request_time_srequest_ts_s)stats)	r%   timer   get_cluster_statusr   ParseFromStringr   from_get_cluster_status_replyr   )r   r   req_time	str_reply
reply_timereplyr   r   r   r9   Q   s   
r9   
gcs_clientc                 C   s   |   }t }|| |jS )z
    Get the cluster resource state from GCS.
    Args:
        gcs_client: The GCS client to query.
    Returns:
        A ClusterResourceState object
    Raises:
        Exception: If the request times out or failed.
    )get_cluster_resource_stater
   r:   cluster_resource_state)r@   r=   r?   r   r   r   rA   l   s   

rA   )r8   collectionsr   typingr   r   ray._rayletr   ray.autoscaler.v2.schemar   r   ray.autoscaler.v2.utilsr   !ray.core.generated.autoscaler_pb2r	   r
   r   DEFAULT_RPC_TIMEOUT_Sr   strr   intr3   r9   rA   r   r   r   r   <module>   s4    
<
