o
    bi                     @   s   U d dl Z d dlZd dlZd dlmZmZ d dlZd dlmZ d dl	m
Z
 dZed ZdZejd ddd	G d
d dZe Zejed< dd ZdS )    N)DictList)DataContext)NodeAffinitySchedulingStrategy<      g333333?)num_cpusmax_restartsmax_task_retriesc                   @   sX   e Zd ZdZdd Zdd Zdee defdd	Z	d
d Z
dee fddZdd ZdS )AutoscalingRequestera  Actor to make resource requests to autoscaler for the datasets.

    The resource requests are set to timeout after RESOURCE_REQUEST_TIMEOUT seconds.
    For those live requests, we keep track of the last request made for each execution,
    which overrides all previous requests it made; then sum the requested amounts
    across all executions as the final request to the autoscaler.
    c                    sB   i  _ t _t j _ fdd}tj|dd _	 j	
  d S )Nc                      s    	 t t t jj  qN)timesleepPURGE_INTERVALrayget_self_handlepurge_expired_requestsremote selfr   f/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/execution/autoscaling_requester.pypurge_thread_run)   s   
z7AutoscalingRequester.__init__.<locals>.purge_thread_runT)targetdaemon)_resource_requestsRESOURCE_REQUEST_TIMEOUT_timeoutr   get_runtime_contextcurrent_actorr   	threadingThread_purge_threadstart)r   r   r   r   r   __init__    s   zAutoscalingRequester.__init__c                 C   s    |    tjjj|  d d S N)bundles)_purger   
autoscalersdkrequest_resources_aggregate_requestsr   r   r   r   r   3   s   z+AutoscalingRequester.purge_expired_requestsreqexecution_idc                 C   s8   |    |t | j f| j|< tjjj|  d d S r'   )	r)   r   r   r   r   r*   r+   r,   r-   )r   r.   r/   r   r   r   r,   7   s
   
z&AutoscalingRequester.request_resourcesc                 C   s<   t   }t| j D ]\}\}}||k r| j| qd S r   )r   listr   itemspop)r   nowk_tr   r   r   r)   D   s   zAutoscalingRequester._purgereturnc                 C   s   g }| j  D ]\}\}}|| qdd }||}|dkrBt }d|v rB||d krBtt|d  | }|ddig|  |S )Nc                 S   s&   d}| D ]}d|v r||d 7 }q|S )Nr   CPUr   )r.   r	   rr   r   r   get_cpusP   s   z:AutoscalingRequester._aggregate_requests.<locals>.get_cpusr   r8      )r   r1   extendr   cluster_resourcesmathceilARTIFICIAL_CPU_SCALING_FACTOR)r   r.   r5   r9   r:   r	   totaldeltar   r   r   r-   K   s   z(AutoscalingRequester._aggregate_requestsc                 C   s
   || _ dS )z&Set the timeout. This is for test onlyN)r   )r   ttlr   r   r   _test_set_timeouth   s   
z&AutoscalingRequester._test_set_timeoutN)__name__
__module____qualname____doc__r&   r   r   r   strr,   r)   r-   rD   r   r   r   r   r      s    r   _autoscaling_requester_lockc                  C   sb   t  } | j}tt  dd}t tj	dddd|d
 W  d    S 1 s*w   Y  d S )NF)softr   Tdetached)name	namespaceget_if_existslifetimescheduling_strategy)r   get_currentrQ   r   r   r    get_node_idrJ   r   optionsr   )ctxrQ   r   r   r   )get_or_create_autoscaling_requester_actorr   s    
$rV   )r>   r"   r   typingr   r   r   ray.data.contextr   ray.util.scheduling_strategiesr   r   r   r@   r   r   RLockrJ   __annotations__rV   r   r   r   r   <module>   s   
 X