o
    bi}!                     @   s@  d dl Z d dlmZ d dlmZmZ d dlmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZ d d	lmZmZmZ d d
lmZmZ d dlmZ d dlmZmZ d dlmZ d dlmZ d dl m!Z! d dl"m#Z# d dl$m%Z% d dl&m'Z' d dl(m)Z) d dl*m+Z+ d dl,m-Z- d dl.m/Z/ d dl0m1Z1 e 2e3Z4G dd dZ5dS )    N)Queue)ListOptional)	GcsClient)_get_node_provider)AutoscalerEventLogger)KubeRayProvider)ReadOnlyProvider)AutoscalingConfigIConfigReaderProvider)InstanceManagerInstanceUpdatedSubscriber)InstanceStorage)ICloudInstanceProviderNodeProviderAdapter)RayInstaller)
Reconciler)InMemoryStorage)CloudInstanceUpdater)
RayStopper)ThreadedRayInstaller)AutoscalerMetricsReporter)ResourceDemandScheduler)get_cluster_resource_state)AutoscalingState)urlsplitc                   @   sz   e Zd Z		ddedededee dee ddfdd	Z	d
e
defddZdededed
e
fddZdee fddZdS )
AutoscalerNsession_nameconfig_reader
gcs_clientevent_loggermetrics_reporterreturnc                 C   s   || _ | }td|   || _d| _d| _t | _	t | _
|| _|| _| || | j||| j| jd t| j| _dS )a=  
        Args:
            session_name: The name of the ray session.
            config_reader: The config reader.
            gcs_client: The GCS client.
            event_logger: The event logger for emitting cluster events.
            metrics_reporter: The metrics reporter for emitting cluster metrics.
        zUsing Autoscaling Config: 
N)r   configcloud_providerr    )_config_readerget_cached_autoscaling_configloggerinfodump_gcs_client_cloud_instance_provider_instance_managerr   _ray_stop_errors_queue_ray_install_errors_queue_event_logger_metrics_reporter_init_cloud_instance_provider_init_instance_managerr   
_scheduler)selfr   r   r    r!   r"   r$    r6   P/home/ubuntu/.local/lib/python3.10/site-packages/ray/autoscaler/v2/autoscaler.py__init__0   s$   zAutoscaler.__init__r$   c                 C   s   |  }|d dkr| |d< t|d|| _dS |jtjkr/| jj	|d< t
|d| _dS t||d}t||d| _dS )	z
        Initialize the cloud provider, and its dependencies (the v1 node provider)

        Args:
            config: The autoscaling config.
            config_reader: The config reader.

        typekuberayhead_node_typecluster_namegcs_address)provider_config)v1_providerr   N)get_provider_configget_head_node_typer   
get_configr,   providerr   	READ_ONLYr+   addressr	   r   r   )r5   r$   r   r>   node_provider_v1r6   r6   r7   r2   W   s(   
z(Autoscaler._init_cloud_instance_providerr%   c              	   C   s   t |t d}g }|t|d |t|| jd | sKt|trKt	d| j
j j}|dus5J d|t||t|j|d| j| pGdd	 t||d
| _dS )zH
        Initialize the instance manager, and its dependencies.
        )
cluster_idstorage)r%   )r    error_queuez//NzInvalid GCS address format)rC   r$   2   )head_node_ipinstance_storageray_installerrI   max_concurrent_installs)rL   "instance_status_update_subscribers)r   r   appendr   r   r.   disable_node_updaters
isinstancer   r   r+   rE   hostnamer   r   r?   r/   get_max_num_worker_nodesr   r-   )r5   r   r%   r    r$   rL   subscribersrK   r6   r6   r7   r3   y   s<   

z!Autoscaler._init_instance_managerc                 C   s   zQg }| j  s|| j   | j  rg }| j s)|| j  | j rt| j}| j  | j	 }t
j| j| j| j|| j | j |||| jd
W S  tyi } zt| W Y d}~dS d}~ww )a  Update the autoscaling state of the cluster by reconciling the current
        state of the cluster resources, the cloud providers as well as instance
        update subscribers with the desired state.

        Returns:
            AutoscalingState: The new autoscaling state of the cluster or None if
            the state is not updated.

        Raises:
            None: No exception.
        )
instance_manager	schedulerr%   ray_cluster_resource_statenon_terminated_cloud_instancescloud_provider_errorsray_install_errorsray_stop_errorsautoscaling_configr"   N)r.   emptyrP   getr/   r   r+   r&   !refresh_cached_autoscaling_configr'   r   	reconciler-   r4   r,   get_non_terminatedpoll_errorsr1   	Exceptionr(   	exception)r5   r\   r[   rX   r]   er6   r6   r7   update_autoscaling_state   s:   







z#Autoscaler.update_autoscaling_state)NN)__name__
__module____qualname__strr   r   r   r   r   r8   r
   r2   r   r3   r   rg   r6   r6   r6   r7   r   /   sB    
'
"
0r   )6loggingqueuer   typingr   r   ray._rayletr   !ray.autoscaler._private.providersr   ray.autoscaler.v2.event_loggerr   Iray.autoscaler.v2.instance_manager.cloud_providers.kuberay.cloud_providerr   Kray.autoscaler.v2.instance_manager.cloud_providers.read_only.cloud_providerr	   )ray.autoscaler.v2.instance_manager.configr
   r   r   3ray.autoscaler.v2.instance_manager.instance_managerr   r   3ray.autoscaler.v2.instance_manager.instance_storager   0ray.autoscaler.v2.instance_manager.node_providerr   r   0ray.autoscaler.v2.instance_manager.ray_installerr   -ray.autoscaler.v2.instance_manager.reconcilerr   *ray.autoscaler.v2.instance_manager.storager   Eray.autoscaler.v2.instance_manager.subscribers.cloud_instance_updaterr   :ray.autoscaler.v2.instance_manager.subscribers.ray_stopperr   Eray.autoscaler.v2.instance_manager.subscribers.threaded_ray_installerr   "ray.autoscaler.v2.metrics_reporterr   ray.autoscaler.v2.schedulerr   ray.autoscaler.v2.sdkr   !ray.core.generated.autoscaler_pb2r   urllib.parser   	getLoggerrh   r(   r   r6   r6   r6   r7   <module>   s2    
