o
    biB                  	   @   s  d dl Z d dlZd dlZd dlZd dlmZ d dlmZmZm	Z	 d dl
Z
d dlmZmZmZmZ d dlmZmZ d dlmZ eeZdZdZd	Zd
ZdZdZdZdZdZ dZ!dZ"G dd dZ#dee$ef dee$ef fddZ%de$dee$ef fddZ&dee$ef fddZ'dee$ef dee$ef fddZ(dee$ef d e)dee$ef fd!d"Z*dee$ef d e)dee$e+f fd#d$Z,d%ee$e$f d&ee$ee$e$f f d'e$de+fd(d)Z-d%ee$e$f d&ee$ee$e$f f de	e+ fd*d+Z.d%ee$e$f d&ee$ee$e$f f d'e$de	e+ fd,d-Z/d.ee$e+f d&ee$ee$e$f f de	e+ fd/d0Z0d1e$de+fd2d3Z1d%ee$ef d'e$dee$e+f fd4d5Z2dS )6    N)chain)AnyDictOptional)DISABLE_LAUNCH_CONFIG_CHECK_KEYDISABLE_NODE_UPDATERS_KEYFOREGROUND_NODE_LAUNCH_KEYWORKER_LIVENESS_CHECK_KEY)node_providerutils)validate_configautoscalerOptionsidleTimeoutSecondsupscalingMode
AggressiveDefaultConservative   z!cloud.google.com/gke-tpu-topologyz$cloud.google.com/gke-tpu-accelerator	headgroupc                   @   s6   e Zd ZdZdd Zdd Zdeeef fddZ	d	S )
AutoscalingConfigProducera  Produces an autoscaling config by reading data from the RayCluster CR.

    Used to fetch the autoscaling config at the beginning of each autoscaler iteration.

    In the context of Ray deployment on Kubernetes, the autoscaling config is an
    internal interface.

    The autoscaling config carries the strict subset of RayCluster CR data required by
    the autoscaler to make scaling decisions; in particular, the autoscaling config does
    not carry pod configuration data.

    This class is the only public object in this file.
    c                 C   s   t j|d| _d| | _d S )N)	namespacezrayclusters/)r
   KubernetesHttpApiClientkubernetes_api_client_ray_cr_path)selfray_cluster_nameray_cluster_namespace r   f/home/ubuntu/.local/lib/python3.10/site-packages/ray/autoscaler/_private/kuberay/autoscaling_config.py__init__6   s   z"AutoscalingConfigProducer.__init__c                 C   s   |   }t|}|S )N)#_fetch_ray_cr_from_k8s_with_retries&_derive_autoscaling_config_from_ray_cr)r   ray_crautoscaling_configr   r   r   __call__<   s   z"AutoscalingConfigProducer.__call__returnc                 C   sv   t dtd D ]1}z
| j| jW   S  tjy8 } z|tk r+td t	
t n|dW Y d}~qd}~ww t)zFetch the RayCluster CR by querying the K8s API server.

        Retry on HTTPError for robustness, in particular to protect autoscaler
        initialization.
           z1Failed to fetch RayCluster CR from K8s. Retrying.N)rangeMAX_RAYCLUSTER_FETCH_TRIESr   getr   requests	HTTPErrorlogger	exceptiontimesleepRAYCLUSTER_FETCH_RETRY_SAssertionError)r   ier   r   r   r    A   s   
z=AutoscalingConfigProducer._fetch_ray_cr_from_k8s_with_retriesN)
__name__
__module____qualname____doc__r   r$   r   strr   r    r   r   r   r   r   '   s
    r   r"   r%   c           	      C   s   t | d d }t| d }tdd | D }t }| d ti }t|v r/|t d }nd}|tt	kr;d}n|tt
krEd	}n|ttkrOd	}nd	}|| d d
 t||||d|}t| |S )Nmetadatar   specc                 s   s    | ]}|d  V  qdS )max_workersNr   ).0	node_typer   r   r   	<genexpr>`   s    
z9_derive_autoscaling_config_from_ray_cr.<locals>.<genexpr>g      N@g      ?r&   i  name)providercluster_namehead_node_typeavailable_node_typesr;   idle_timeout_minutesupscaling_speed)_generate_provider_config/_generate_available_node_types_from_ray_cr_specsumvalues*_generate_legacy_autoscaling_config_fieldsr)   AUTOSCALER_OPTIONS_KEYIDLE_SECONDS_KEYUPSCALING_KEYUPSCALING_VALUE_CONSERVATIVEUPSCALING_VALUE_DEFAULTUPSCALING_VALUE_AGGRESSIVE_HEAD_GROUP_NAMEr   )	r"   provider_configrC   global_max_workerslegacy_autoscaling_fieldsautoscaler_optionsrD   rE   r#   r   r   r   r!   W   s>   
r!   r   c                 C   s   ddd| t dtdtdtdiS )zGenerates the `provider` field of the autoscaling config, which carries data
    required to instantiate the KubeRay node provider.
    typekuberayr   TF)r   r   r   r	   )r   r   r   r   rF      s   rF   c                   C   s   i g dg g g g g g i d
S )zEGenerates legacy autoscaling config fields required for compatibiliy.F)
file_mountscluster_synced_filesfile_mounts_sync_continuouslyinitialization_commandssetup_commandshead_setup_commandsworker_setup_commandshead_start_ray_commandsworker_start_ray_commandsauthr   r   r   r   r   rJ      s   rJ   ray_cr_specc                 C   s*   | d }t t|ddidd | d D S )z[Formats autoscaler "available_node_types" field based on the Ray CR's group
    specs.
    headGroupSpecTis_headc                 S   s   i | ]}|d  t |ddqS )	groupNameFrd   )_node_type_from_group_spec)r<   worker_group_specr   r   r   
<dictcomp>   s    zC_generate_available_node_types_from_ray_cr_spec.<locals>.<dictcomp>workerGroupSpecs)rQ   rg   )rb   rc   r   r   r   rG      s   rG   
group_specre   c                 C   sp   |rd }}n| d |  dd }| d |  dd }t| |}||i |d}|  t}|dur6t||d< |S )	z/Converts CR group spec to autoscaler node type.r   minReplicas
numOfHostsr&   maxReplicas)min_workersr;   node_config	resourcesNidle_timeout_s)r)   "_get_ray_resources_from_group_specrL   float)rk   re   ro   r;   rq   r=   rr   r   r   r   rg      s   


	rg   c                 C   sj  |  di }| d d d d  di }|rtn| d }t|||}t|||}t||}t||}t||}	i }
t|ts@J ||
d< |d	urL||
d
< |d	urd|vrX||
d< 	 d| d d v rt	| d d d v rt
| d d d v r| d d d t	 }| d d d t
 }t||}|rd|
d| d< ntdt	 dt
 d |	d	ur|	|
d< |
| |
S )a'  
    Infers Ray resources from rayStartCommands and K8s limits.
    The resources extracted are used in autoscaling calculations.

    TODO: Expose a better interface in the RayCluster CRD for Ray resource annotations.
    For now, we take the rayStartParams as the primary source of truth.
    rayStartParamstemplater:   
containersr   rq   rf   CPUNGPUTPUnodeSelectorr&   zTPU-z-headzPods using TPUs require both `z` and `z` node selectors. See https://docs.ray.io/en/latest/cluster/kubernetes/user-guides/tpu.html#configuring-ray-pods-for-tpu-usage and https://cloud.google.com/kubernetes-engine/docs/how-to/tpus.memory)r)   rQ   _get_num_cpus_get_num_gpus_get_custom_resources_get_num_tpus_get_memory
isinstanceintGKE_TPU_TOPOLOGY_LABELGKE_TPU_ACCELERATOR_LABELr   tpu_node_selectors_to_typer,   errorupdate)rk   re   ray_start_paramsk8s_resources
group_namenum_cpusnum_gpuscustom_resource_dictnum_tpusr|   rq   topologyacceleratoraccelerator_typer   r   r   rs      sN   




rs   r   r   r   c                 C   sl   d| v r
t | d S d|di v r|d d }t|S d|di v r.|d d }t|S td| d)zgGet CPU annotation from ray_start_params or k8s_resources,
    with priority for ray_start_params.
    znum-cpuscpulimitsr*   z6Autoscaler failed to detect `CPU` resources for group zY.
Set the `--num-cpus` rayStartParam and/or the CPU resource limit for the Ray container.)r   r)   _round_up_k8s_quantity
ValueError)r   r   r   cpu_quantityr   r   r   r}   ,  s   
r}   c                 C   s`   d| v r
t | d S d|di v r|d d }t|S d|di v r.|d d }t|S dS )sGet memory resource annotation from ray_start_params or k8s_resources,
    with priority for ray_start_params.
    r|   r   r*   N)r   r)   r   )r   r   memory_quantityr   r   r   r   E  s   r   c                 C   sf   d| v r
t | d S t|di  |di  D ]\}}|dr0t|}|dkr0|  S qdS )r   znum-gpusr   r*   gpur   N)r   r   r)   itemsendswithr   )r   r   r   keyresource_quantityr   r   r   r   r~   V  s   	
r~   r   c                 C   sP   d| v r| d S dD ]}| |i  d}|dur%t|}|dkr%|  S q
dS )zGet TPU custom resource annotation from custom_resource_dict in ray_start_params,
    or k8s_resources, with priority for custom_resource_dict.
    rz   )r   r*   zgoogle.com/tpuNr   )r)   r   )r   r   typtpu_resource_quantityr   r   r   r   r   s  s   r   quantityc                 C   s    t | }|jtjd}t|S )zRounds a Kubernetes resource quantity up to the nearest integer.

    Args:
        quantity: Resource quantity as a string in the canonical K8s form.

    Returns:
        The quantity, rounded up, as an integer.
    )rounding)r   parse_quantityto_integral_valuedecimalROUND_UPr   )r   resource_decimalroundedr   r   r   r     s   
	r   c              
   C   s   d| vri S | d }z0|dd  dd}t|}t|ts!J | D ]\}}t|ts0J t|ts7J q%W |S  tyQ } zt	
d| d |d}~ww )	a?  Format custom resources based on the `resources` Ray start param.

    Currently, the value of the `resources` field must
    be formatted as follows:
    '"{"Custom1": 1, "Custom2": 5}"'.

    This method first converts the input to a correctly formatted
    json string and then loads that json string to a dict.
    rq   r&   \ z1Error reading `resource` rayStartParam for group z. For the correct format, refer to example configuration at https://github.com/ray-project/ray/blob/master/python/ray/autoscaler/kuberay/ray-cluster.complete.yaml.N)replacejsonloadsr   dictr   r8   r   	Exceptionr,   r   )r   r   resources_stringresources_jsonrq   r   valuer3   r   r   r   r     s&   

r   )3r   r   loggingr.   	itertoolsr   typingr   r   r   r*   !ray.autoscaler._private.constantsr   r   r   r	   ray.autoscaler._private.kuberayr
   r   ray.autoscaler._private.utilr   	getLoggerr4   r,   rK   rL   rM   rP   rO   rN   r(   r0   r   r   rQ   r   r8   r!   rF   rJ   rG   boolrg   r   rs   r}   r   r~   r   r   r   r   r   r   r   <module>   s    
"07








O









