o
    biHm                  
   @   s  d dl Z d dlZd dlZd dlZd dlZd dlmZmZ d dlm	Z	m
Z
mZ d dlZd dlmZ d dlZd dlm  mZ d dlmZmZmZ d dlmZ d dlmZmZmZ d dlmZmZ d d	l m!Z!m"Z" d d
l#m$Z$ d dl%m&Z& d dl'm(Z( d dl)m*Z*m+Z+m,Z,m-Z-m.Z. e/e0Z1dZ2dZ3dd Z4dd Z5ddefddZ6	dde7de
e7e	f dee
e7e	f  fddZ8G dd de(Z9dS )    N)OrderedDictdefaultdict)AnyDictList)ServiceResource)"CLOUDWATCH_AGENT_INSTALLED_AMI_TAGCLOUDWATCH_AGENT_INSTALLED_TAGCloudwatchHelperbootstrap_aws)boto_exception_handlerclient_cacheresource_cache)cf
cli_logger)BOTO_CREATE_MAX_RETRIESBOTO_MAX_RETRIES)LogTimer)NodeLaunchException)NodeProvider)TAG_RAY_CLUSTER_NAMETAG_RAY_LAUNCH_CONFIGTAG_RAY_NODE_KINDTAG_RAY_NODE_NAMETAG_RAY_USER_NODE_TYPE   c                 C   s   t | v r| t  | d< | t = | S )z=Convert the Ray node name tag to the AWS-specific 'Name' tag.Namer   tags r!   ]/home/ubuntu/.local/lib/python3.10/site-packages/ray/autoscaler/_private/aws/node_provider.pyto_aws_format,      r#   c                 C   s   d| v r| d | t < | d= | S )z=Convert the AWS-specific 'Name' tag to the Ray node name tag.r   r   r   r!   r!   r"   from_aws_format5   r$   r%   returnc                 C   s   |pi }t d| |fi |S )z3Make client, retrying requests up to `max_retries`.ec2)r   regionmax_retriesaws_credentialsr!   r!   r"   make_ec2_resource>   s   r,   r)   r+   c                 C   st   g }|pi }t d| tfi |}| }|t|d  d|v r8|j|d d}|t|d  d|v s"|S )aq  Get all instance-types/resources available in the user's AWS region.
    Args:
        region: the region of the AWS provider. e.g., "us-west-2".
    Returns:
        final_instance_types: a list of instances. An example of one element in
        the list:
            {'InstanceType': 'm5a.xlarge', 'ProcessorInfo':
            {'SupportedArchitectures': ['x86_64'], 'SustainedClockSpeedInGhz':
            2.5},'VCpuInfo': {'DefaultVCpus': 4, 'DefaultCores': 2,
            'DefaultThreadsPerCore': 2, 'ValidCores': [2],
            'ValidThreadsPerCore': [1, 2]}, 'MemoryInfo': {'SizeInMiB': 16384},
            ...}

    r'   InstanceTypes	NextToken)r.   )r   r   describe_instance_typesextendcopydeepcopy)r)   r+   final_instance_typesr'   instance_typesr!   r!   r"   list_ec2_instancesD   s   r5   c                   @   s
  e Zd ZdZdd Zdd Zdd Zdd	 Zd
d Zdd Z	dd Z
dd Zdd Zdd Zdeeef fddZedeeeef  deeeef  ddfddZdd Zd d! Zd"d# Zd$d% Zd&d' Zd(d) Zed*d+ Zed,eeef deeef fd-d.ZdS )/AWSNodeProvideri  c                 C   s   t | || |dd| _|d}t|d t|d| _t|d d|d| _i | _t	t
| _d| _t | _| j  t | _| j  t | _t | _i | _d S )Ncache_stopped_nodesTr+   r)   r(   r   )r   __init__getr7   r,   r   r'   ec2_fail_fast	tag_cacher   dicttag_cache_pendingbatch_thread_count	threadingEventbatch_update_donesetready_for_new_batchLocktag_cache_lock
count_lockcached_nodes)selfprovider_configcluster_namer+   r!   r!   r"   r8   f   s.   








zAWSNodeProvider.__init__c                 C   s   t |}dddgddt| jgdg}| D ]\}}|d||gd qtd t| jj	j
|d}W d    n1 sCw   Y  |D ]}|j| jv rSqJtdd	 |jD | j|j< qJd
d	 |D | _dd |D S )Ninstance-state-namependingrunningr   Valuestag:{}z+Failed to fetch running instances from AWS.Filtersc                 S      i | ]	}|d  |d qS KeyValuer!   .0xr!   r!   r"   
<dictcomp>       z8AWSNodeProvider.non_terminated_nodes.<locals>.<dictcomp>c                 S      i | ]}|j |qS r!   idrX   noder!   r!   r"   rZ          c                 S      g | ]}|j qS r!   r]   r_   r!   r!   r"   
<listcomp>       z8AWSNodeProvider.non_terminated_nodes.<locals>.<listcomp>)r#   formatr   rJ   itemsappendr   listr'   	instancesfilterr^   r;   r%   r    rG   )rH   tag_filtersfilterskvnodesr`   r!   r!   r"   non_terminated_nodes   s2   

z$AWSNodeProvider.non_terminated_nodesc                 C   s   |  |}|jd dkS )Nr   rM   _get_cached_nodestaterH   node_idr`   r!   r!   r"   
is_running   s   
zAWSNodeProvider.is_runningc                 C   s   |  |}|jd }|dvS )Nr   )rM   rL   rq   )rH   ru   r`   rs   r!   r!   r"   is_terminated   s   

zAWSNodeProvider.is_terminatedc                 C   sR   | j  | j| }| j|i }t|fi |W  d    S 1 s"w   Y  d S N)rE   r;   r=   r9   r<   )rH   ru   d1d2r!   r!   r"   	node_tags   s
   
$zAWSNodeProvider.node_tagsc                 C   $   |  |}|jd u r| |}|jS rx   )rr   public_ip_address	_get_nodert   r!   r!   r"   external_ip      


zAWSNodeProvider.external_ipc                 C   r|   rx   )rr   private_ip_addressr~   rt   r!   r!   r"   internal_ip   r   zAWSNodeProvider.internal_ipc                 C   sD  d}| j $ | jsd}| j  | j  | j  | j| | W d    n1 s,w   Y  |rTtt	 | j  | 
  | j  W d    n1 sOw   Y  | j |  jd7  _W d    n1 siw   Y  | j  | j" |  jd8  _| jdkr| j  W d    d S W d    d S 1 sw   Y  d S )NFTr   r   )rE   r=   rC   waitclearrA   updatetimesleepTAG_BATCH_DELAY_update_node_tagsrB   rF   r>   )rH   ru   r    is_batching_threadr!   r!   r"   set_node_tags   s4   


	


"zAWSNodeProvider.set_node_tagsc                 C   s`   t t}| j D ]\}}| D ]	}|| | q| j| | q	t t| _| | d S rx   )	r   rh   r=   rf   rg   r;   r   r<   _create_tags)rH   batch_updatesru   r    rY   r!   r!   r"   r      s   
z!AWSNodeProvider._update_node_tagsc              	   C   s~   |  D ]8\\}}}d|||}td| |tkrd}| jjjj|||dgd W d    n1 s7w   Y  qd S )NzSet tag {}={} on {}zAWSNodeProvider: {}r   rT   )	ResourcesTags)rf   re   r   r   r'   metaclientcreate_tags)rH   r   rm   rn   node_idsmr!   r!   r"   r      s   

zAWSNodeProvider._create_tagsr&   c                 C   s  t tt| }i }| jrdddgddt| jgddt	|t	 gddt
|t
 gdg}t|v rG|dt|t gd t| jjj|dd| }dd	 |D }d
d |D }|rtdt| td- |D ]"}tdd |jD | j|j< |jd dkrtd|j |  qvW d   n1 sw   Y  | jjjj|d |D ]}	| |	| q|t|8 }i }
|r|  |||}
|}|!|
 |S )zCreates instances.

        Returns dict mapping instance id to ec2.Instance object for the created
        instances.
        rK   stoppedstoppingrN   rP   rQ   Nc                 S   rb   r!   r]   rX   nr!   r!   r"   rc   &  rd   z/AWSNodeProvider.create_node.<locals>.<listcomp>c                 S   r\   r!   r]   r   r!   r!   r"   rZ   '  ra   z/AWSNodeProvider.create_node.<locals>.<dictcomp>zsReusing nodes {}. To disable reuse, set `cache_stopped_nodes: False` under `provider` in the cluster configuration.zStopping instances to reusec                 S   rS   rT   r!   rW   r!   r!   r"   rZ   5  r[   r   zWaiting for instance {} to stopInstanceIds)"r   sortedr1   r2   rf   r7   re   r   rJ   r   r   r   rg   rh   r'   ri   rj   r   printrender_listgroupr%   r    r;   r^   rs   wait_until_stoppedr   r   start_instancesr   len_create_noder   )rH   node_configr    countreused_nodes_dictrl   reuse_nodesreuse_node_idsr`   ru   created_nodes_dictall_created_nodesr!   r!   r"   create_node   sf   		
zAWSNodeProvider.create_node	tag_specsuser_tag_specsNc                 C   s   |D ]?}|d dkr<|d D ],}d}| d d D ]}|d |d kr,d}|d |d<  nq|s:| d d  |g7  < qq| |g7 } qd	S )
aZ  
        Merges user-provided node config tag specifications into a base
        list of node provider tag specifications. The base list of
        node provider tag specs is modified in-place.

        This allows users to add tags and override values of existing
        tags with their own, and only applies to the resource type
        "instance". All other resource types are appended to the list of
        tag specs.

        Args:
            tag_specs (List[Dict[str, Any]]): base node provider tag specs
            user_tag_specs (List[Dict[str, Any]]): user's node config tag specs
        ResourceTypeinstancer   Fr   rU   TrV   Nr!   )r   r   user_tag_specuser_tagexiststagr!   r!   r"   _merge_tag_specsH  s    
z AWSNodeProvider._merge_tag_specsc                 C   sn  i }t |}| }t| jdg}| D ]\}}|||d qt| jdr9| 	|}	|	r9|
tddg d|dg}
|dg }t|
| |d}|d||
d	 d
}i }ttt|}td|d D ]}zud|v r|d }|dd  t||d< n||t|  }||d< ||d< | jjd i |}dd |D }tjd||d, |D ]}d}|jr|jd p|}tjd|jt|jd |dd qW d    W  |S 1 sw   Y  W  |S  tj j!y4 } zC|d7 }||kr$zt"|j#d d |j#d d t$% d}W n t&y   t'(d| Y nw tj)d|d nt(d| W Y d }~qkd }~ww |S )!NrT   agentTruer   )r   r   TagSpecifications	SubnetIdsr   )MinCountMaxCountr   r   NetworkInterfacesSecurityGroupIdsnetwork_interfacesSubnetId	subnet_idc                 S   r\   r!   r]   r   r!   r!   r"   rZ     ra   z0AWSNodeProvider._create_node.<locals>.<dictcomp>zLaunched {} nodes)_tagsrL   MessagezLaunched instance {}r   )rs   infoErrorCode)categorydescriptionsrc_exc_infozCouldn't parse exception.z2Failed to launch instances. Max attempts exceeded.)excz3create_instances: Attempt failed with {}, retrying.r!   )*r#   r1   r   rJ   rf   rg   r
   cloudwatch_config_existsrI   _check_ami_cwa_installationr0   r	   r9   r6   r   popr   maxr   r   rangestrr:   create_instancesr   r   state_reasonr   instance_idr<   rs   botocore
exceptionsClientErrorr   responsesysexc_info	Exceptionloggerwarningabort)rH   r   r    r   r   conf	tag_pairsrm   rn   cwa_installedr   r   
subnet_ids
subnet_idxcli_logger_tags	max_triesattemptnet_ifsr   createdr   r   r   r!   r!   r"   r   i  s   


	


zAWSNodeProvider._create_nodec                 C   sf   |  |}| jr-|jrtdtd | |  d S tdtd | |  d S |  d S )NzTerminating instance {} ,(cannot stop spot instances, only terminate)zStopping instance {} f(to terminate instead, set `cache_stopped_nodes: False` under `provider` in the cluster configuration))	rr   r7   spot_instance_request_idr   r   r   dimmed	terminatestoprt   r!   r!   r"   terminate_node  s(   
	zAWSNodeProvider.terminate_nodec                 C   sj   | j jjj|d gd}d}|d}|r3t|dks%J dt| d|d d	d
}t|v r3d}|S )NImageId)ImageIdsFImagesr   z9Expected to find only 1 AMI with the given ID, but found .r   r    T)r'   r   r   describe_imagesr9   r   r   )rH   configr   r   images
image_namer!   r!   r"   r     s   
z+AWSNodeProvider._check_ami_cwa_installationc                 C   s  |sd S | j jjj}| j jjj}|g |g i}| jr\g }g }|D ]}| |jr-||g7 }q||g7 }q|rCt	dt
d t| |rSt	dt
d t| |||< |||< n|||< | jd urh| jnt|}| D ]\}	}
tdt|
|D ]}|	|
|||  d q|qpd S )NzStopping instances {} r   zTerminating instances {} r   r   r   )r'   r   r   terminate_instancesstop_instancesr7   rr   r   r   r   r   r   r   max_terminate_nodesr   rf   r   )rH   r   terminate_instances_funcstop_instances_funcnodes_to_terminatespot_idson_demand_idsru   r   terminate_funcro   startr!   r!   r"   terminate_nodes  sP   


zAWSNodeProvider.terminate_nodesc              	   C   s   |  i  || jv r| j| S tttdD ],}t| jjj|gd}t	|dkr/|d   S t
d|t	|t|d t tt qtd|)z7Refresh and get info for this node, updating the cache.r   r   r   zAttempt to fetch EC2 instances that have instance ID {}. Got {} matching EC2 instances. Will retry after {} second. This is retry number {}, and the maximum number of retries is {}.zInvalid instance id {})rp   rG   r   r   r   rh   r'   ri   rj   r   r   r   LIST_RETRY_DELAY_SECr   r   AssertionErrorre   )rH   ru   attemptsmatchesr!   r!   r"   r~   M  s"   


zAWSNodeProvider._get_nodec                 C   s   || j v r
| j | S | |S )z>Return node info from cache if possible, otherwise fetches it.)rG   r~   )rH   ru   r!   r!   r"   rr   i  s   


z AWSNodeProvider._get_cached_nodec                 C   s   t | S rx   r   )cluster_configr!   r!   r"   bootstrap_configp  s   z AWSNodeProvider.bootstrap_configr  c                 C   s  d| vr| S t | } t| d d | d d}dd |D }| d }| d }|D ]}|| d d	 }||v r|| d
 d }d|i}||krg|| d d }	t|	d d }	dtj }
t|	|
 }||d< tjj	
 D ]}|||}|||}|r||| < |rd|d| < qm||| di  ||| di kr||| d< td|| q*td| d | d d  d | S )z=Fills out missing "resources" field for available_node_types.available_node_typesproviderr)   r+   c                 S   s   i | ]}|d  |qS )InstanceTyper!   )rX   r   r!   r!   r"   rZ     s    
zJAWSNodeProvider.fillout_available_node_types_resources.<locals>.<dictcomp>head_node_typer   r  VCpuInfoDefaultVCpusCPU
MemoryInfo	SizeInMiBi   r   memoryzaccelerator_type:	resourcesz#Updating the resources of {} to {}.zInstance type z! is not available in AWS region: r   )r1   r2   r5   r9   intray_constants&DEFAULT_OBJECT_STORE_MEMORY_PROPORTIONray_privateacceleratorsget_all_accelerator_managers!get_ec2_instance_num_accelerators!get_ec2_instance_accelerator_typeget_resource_namer   r   debugre   
ValueError)r  instances_listinstances_dictr  r	  	node_typeinstance_typecpusautodetected_resourcesmemory_totalpropmemory_resourcesaccelerator_managernum_acceleratorsaccelerator_typer!   r!   r"   &fillout_available_node_types_resourcest  s   







z6AWSNodeProvider.fillout_available_node_types_resources)__name__
__module____qualname__r   r8   rp   rv   rw   r{   r   r   r   r   r   r   r   r   r   staticmethodr   r   r   r   r   r   r~   rr   r  r)  r!   r!   r!   r"   r6   c   sF    !%L };


r6   rx   ):r1   loggingr   r?   r   collectionsr   r   typingr   r   r   r   boto3.resources.baser   r  ray._private.ray_constantsr  r  8ray.autoscaler._private.aws.cloudwatch.cloudwatch_helperr   r	   r
   "ray.autoscaler._private.aws.configr   !ray.autoscaler._private.aws.utilsr   r   r   "ray.autoscaler._private.cli_loggerr   r   !ray.autoscaler._private.constantsr   r   !ray.autoscaler._private.log_timerr   $ray.autoscaler.node_launch_exceptionr   ray.autoscaler.node_providerr   ray.autoscaler.tagsr   r   r   r   r   	getLoggerr*  r   r   r   r#   r%   r,   r   r5   r6   r!   r!   r!   r"   <module>   sF    
		

