o
    biȁ                     @   s.  d dl Z d dlZd dlZd dlZd dlmZmZ d dlmZ d dl	m
Z
mZmZmZmZmZ d dlmZmZ d dlmZmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dl m!Z! e
rld dl"m#Z#m$Z$ e %e&Z'ej()dddkZ*eefZ+G dd dZ,G dd deZ-G dd de-Z.dS )    N)ABCabstractmethod)defaultdict)TYPE_CHECKINGCallableDictIterableListOptional)ExecutionOptionsExecutionResources)PhysicalOperatorReportsExtraResourceUsage)AllToAllOperator)InputDataBuffer)ZipOperator)memory_string)DataContextlog_once)OpStateTopologyRAY_DATA_DEBUG_RESOURCE_MANAGER01c                   @   s  e Zd ZdZdZdZdZdddedeg e	f d	e
fd
dZdd ZdddddefddZdd Zde	fddZde	fddZde	fddZde	fddZdede	fdd Zdedefd!d"Zdefd#d$Zed-d&d'Zdedee fd(d)Zdedee	 fd*d+Zd,S ).ResourceManagerz@A class that manages the resource usage of a streaming executor.
         ?g      ?topologyr   optionsget_total_resourcesdata_contextc                 C   s   || _ || _|| _t | _d| _t | _t | _t | _	i | _
i | _i | _tt| _tt| _t| _i | _i | _d | _|jrTtdd |D }|rTt| |j| _|jd ur\|jn	|  rc| jn| j| _|   d S )Nr   c                 s   s    | ]}|  V  qd S N)%implements_accurate_memory_accounting.0op r'   a/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/execution/resource_manager.py	<genexpr>Z   s    
z+ResourceManager.__init__.<locals>.<genexpr>) 	_topology_options_get_total_resourcesr   zero_global_limits_global_limits_last_update_time_global_usage_global_running_usage_global_pending_usage
_op_usages_op_running_usages_op_pending_usagesr   int_mem_op_internal_mem_op_outputsDEBUG_RESOURCE_MANAGER_debug_downstream_fraction_downstream_object_store_memory_op_resource_allocatorop_resource_reservation_enabledallReservationOpResourceAllocatorop_resource_reservation_ratio+override_object_store_memory_limit_fractionop_resource_allocator_enabled*DEFAULT_OBJECT_STORE_MEMORY_LIMIT_FRACTION9DEFAULT_OBJECT_STORE_MEMORY_LIMIT_FRACTION_NO_RESERVATION#_object_store_memory_limit_fraction)_warn_about_object_store_memory_if_needed)selfr   r   r    r!   should_enabler'   r'   r(   __init__6   s@   







zResourceManager.__init__c              
   C   s   ddl }ddlm} ddlm} | sdS | }|dd}|dd}|dkrP|| }|dk rR|drTt	| d	|d
d|d dd|d dd dS dS dS dS )zDWarn if object store memory is configured below 50% of total memory.r   N)WARN_PREFIXr   memoryobject_store_memoryr   $ray_data_object_store_memory_warningz. Ray's object store is configured to use only z.1%z of available memory (g    eA.1fz
GB out of a!  GB total). For optimal Ray Data performance, we recommend setting the object store to at least 50% of available memory. You can do this by setting the 'object_store_memory' parameter when calling ray.init() or by setting the RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION environment variable.)
rayray.data.contextrK   ray.util.debugr   is_initializedcluster_resourcesgetloggerwarning)rH   rP   rK   r   rT   total_memoryrM   object_store_fractionr'   r'   r(   rG   n   s0   
z9ResourceManager._warn_about_object_store_memory_if_neededr&   r   stater   returnc                 C   sj   t |trdS |jjpd}||jj7 }| }|jD ]}||jj|jj 7 }q|| j	|< || j
|< || S )Nr   )
isinstancer   metrics"obj_store_mem_pending_task_outputsobj_store_mem_internal_outqueueoutqueue_memory_usageoutput_dependenciesobj_store_mem_internal_inqueue!obj_store_mem_pending_task_inputsr7   r8   )rH   r&   rZ   mem_op_internalmem_op_outputsnext_opr'   r'   r(   _estimate_object_store_memory   s   



z-ResourceManager._estimate_object_store_memoryc           	      C   s  t ddd| _t ddd| _t ddd| _| j  | j  | j  | j  | j	  d}t
| j}t| j D ]\}}|  | }| }| }|jrUJ |jrZJ |jr_J | |||_| |||_t|try||  || j|< || j|< || j|< | j|| _| j|| _| j|| _d| td|d  }|d7 }td|| j|< | jj| j	|< |j|j_q<| jdur| j  dS dS )zRecalculate resource usages.r         ?   N)r   r0   r1   r2   r3   clearr4   r5   r;   r<   lenr*   reverseditemsupdate_resource_usagecurrent_processor_usagerunning_processor_usagepending_processor_usagerM   rg   r\   r   addextra_resource_usagemaxmin_metricsobj_store_mem_usedr=   update_usages)	rH   num_ops_so_farnum_ops_totalr&   rZ   op_usageop_running_usageop_pending_usagefr'   r'   r(   rx      sZ   













zResourceManager.update_usagesc                 C      | j S )z5Return the global resource usage at the current time.)r0   rH   r'   r'   r(   get_global_usage      z ResourceManager.get_global_usagec                 C   r   )z=Return the global running resource usage at the current time.)r1   r   r'   r'   r(   get_global_running_usage   r   z(ResourceManager.get_global_running_usagec                 C   r   )z=Return the global pending resource usage at the current time.)r2   r   r'   r'   r(   get_global_pending_usage   r   z(ResourceManager.get_global_pending_usagec                 C   sh   t   | j | jk r| jS t   | _| jj}| jj}|  }| j}| j	|9  _	|
||| _| jS )a  Return the global resource limits at the current time.

        This method autodetects any unspecified execution resource limits based on the
        current cluster size, refreshing these values periodically to support cluster
        autoscaling.
        )timer/   GLOBAL_LIMITS_UPDATE_INTERVAL_Sr.   r+   resource_limitsexclude_resourcesr,   rF   rM   ru   subtract)rH   default_limitsexcludetotal_resourcesdefault_mem_fractionr'   r'   r(   get_global_limits   s   
z!ResourceManager.get_global_limitsc                 C   s
   | j | S )zDReturn the resource usage of the given operator at the current time.)r3   rH   r&   r'   r'   r(   get_op_usage
     
zResourceManager.get_op_usagec                 C   s  | j | jdd}| j | jr|d| j | jdd7 }|d| j |   d7 }| jr|dt| j|  dt| j|  d7 }t| j	t
r|| j	jv r| j	j| }|d	|jd7 }|d
|jd7 }|d|  7 }t| j	j|d}|d| d7 }|S )zbReturn a human-readable string representation of the resource usage of
        the given operator.rO   z CPUz, z GPUz object storez (in=z,out=)z, budget=(cpu=z,gpu=z,obj_store=r   )r4   cpugpuobject_store_memory_strr:   r   r7   r8   r\   r=   r@   _op_budgets_output_budgetsrU   )rH   r&   	usage_strbudgetreserved_for_outputr'   r'   r(   get_op_usage_str  s0   
z ResourceManager.get_op_usage_strc                 C   s
   | j duS )z.Return whether OpResourceAllocator is enabled.Nr=   r   r'   r'   r(   rC   +  r   z-ResourceManager.op_resource_allocator_enabledOpResourceAllocatorc                 C   s   | j dusJ | j S )zReturn the OpResourceAllocator.Nr   r   r'   r'   r(   op_resource_allocator/  s   z%ResourceManager.op_resource_allocatorc                 C      | j du rdS | j |S zqReturn the maximum bytes of pending task outputs can be read for
        the given operator. None means no limit.N)r=   max_task_output_bytes_to_readr   r'   r'   r(   r   5     
z-ResourceManager.max_task_output_bytes_to_readc                 C   r   z_Return the budget for the given operator, or None if the operator
        has unlimited budget.N)r=   
get_budgetr   r'   r'   r(   r   <  r   zResourceManager.get_budgetN)r[   r   )__name__
__module____qualname____doc__r   rD   rE   r   r   r   r   rJ   rG   r6   rg   rx   r   r   r   r   r   r   strr   boolrC   propertyr   r
   r   r   r'   r'   r'   r(   r   &   sD    

8
Br   c                   @   s^   e Zd ZdZdefddZedd Zedede	e
 fd	d
Zedede	e fddZdS )r   a  An interface for dynamic operator resource allocation.

    This interface allows dynamically allocating available resources to each operator,
    limiting how many tasks each operator can submit, and how much data each operator
    can read from its running tasks.
    resource_managerc                 C   s
   || _ d S r"   )_resource_manager)rH   r   r'   r'   r(   rJ   L  s   
zOpResourceAllocator.__init__c                 C      dS )z#Callback to update resource usages.Nr'   r   r'   r'   r(   rx   O  s   z!OpResourceAllocator.update_usagesr&   r[   c                 C   r   r   r'   r   r'   r'   r(   r   T     z1OpResourceAllocator.max_task_output_bytes_to_readc                 C   r   r   r'   r   r'   r'   r(   r   Z  r   zOpResourceAllocator.get_budgetN)r   r   r   r   r   rJ   r   rx   r   r
   r6   r   r   r   r'   r'   r'   r(   r   D  s    
r   c                       s   e Zd ZdZG dd dZdedef fddZded	e	fd
dZ
d	ee fddZdd Zded	ee fddZded	e	fddZded	efddZded	ee fddZded	ee fddZded	ee fddZdd Z  ZS )r@   a  An OpResourceAllocator implementation that reserves resources for each operator.

    This class reserves memory and CPU resources for eligible operators, and considers
    runtime resource usages to limit the resources that each operator can use.

    It works in the following way:
    1. An operator is eligible for resource reservation, if it has enabled throttling
       and hasn't completed. Ineligible operators are not throttled, but
       their usage will be accounted for their upstream eligible operators. E.g., for
       such a dataset "map1->limit->map2->streaming_split", we'll treat "map1->limit" as
       a group and "map2->streaming_split" as another group.
    2. For each eligible operator, we reserve `reservation_ratio * global_resources /
        num_eligible_ops` resources, half of which is reserved only for the operator
        outputs, excluding pending task outputs.
    3. Non-reserved resources are shared among all operators.
    4. In each scheduling iteration, each eligible operator will get "remaining of their
       own reserved resources" + "remaining of shared resources / num_eligible_ops"
       resources.

    The `reservation_ratio` is set to 50% by default. Users can tune this value to
    adjust how aggressive or conservative the resource allocation is. A higher value
    will make the resource allocation more even, but may lead to underutilization and
    worse performance. And vice versa.
    c                   @   sH   e Zd ZdZdZdZdZdd Zdefdd	Z	e
ded
efddZdS )z+ReservationOpResourceAllocator.IdleDetectora  Utility class for detecting idle operators.

        Note, stalling can happen when there are less resources than Data executor
        expects. E.g., when some resources are preempted by non-Data code, see
        `test_no_deadlock_on_resource_contention` as an example.

        This class is used to detect potential stalling and allow the execution
        to make progress.
        g      $@g      N@Fc                 C   s*   t t| _t dd | _t dd | _d S )Nc                   S      t   S r"   r   r'   r'   r'   r(   <lambda>      zFReservationOpResourceAllocator.IdleDetector.__init__.<locals>.<lambda>c                   S   r   r"   r   r'   r'   r'   r(   r     r   )r   r6   last_num_outputslast_output_timelast_detection_timer   r'   r'   r(   rJ     s   
z4ReservationOpResourceAllocator.IdleDetector.__init__r&   c                 C   s|   t   }|| j|  | jkr<|jj}|| j| kr*|| j|< || j|< || j|< dS || j|< | ||| j|   dS dS NTF)r   r   DETECTION_INTERVAL_Sr]   num_task_outputs_generatedr   r   "print_warning_if_idle_for_too_long)rH   r&   cur_timecur_num_outputsr'   r'   r(   detect_idle  s   



z7ReservationOpResourceAllocator.IdleDetector.detect_idle	idle_timec                 C   s:   || j k s| jr
dS d| _d| d| d}t| dS )z4Print a warning if an operator is idle for too long.NTz	Operator z# is running but has no outputs for a   seconds. Execution may be slower than expected.
Ignore this warning if your UDF is expected to be slow. Otherwise, this can happen when there are fewer cluster resources available to Ray Data than expected. If you have non-Data tasks or actors running in the cluster, exclude their resources from Ray Data with `DataContext.get_current().execution_options.exclude_resources`. This message will only print once.)WARN_ON_IDLE_TIME_S_warn_printedrV   rW   )clsr&   r   msgr'   r'   r(   r     s   zNReservationOpResourceAllocator.IdleDetector.print_warning_if_idle_for_too_longN)r   r   r   r   r   r   r   rJ   r   r   classmethodfloatr   r'   r'   r'   r(   IdleDetector{  s    r   r   reservation_ratioc                    sf   t  | || _d| j  krdksJ  J i | _i | _t | _i | _i | _	i | _
|  | _d S )Ng        rh   )superrJ   _reservation_ratio_op_reserved_reserved_for_op_outputsr   r-   _total_sharedr   r   _reserved_min_resourcesr   _idle_detector)rH   r   r   	__class__r'   r(   rJ     s   	
z'ReservationOpResourceAllocator.__init__r&   r[   c                 C   s   |   o	|  S )z2Whether the op is eligible for memory reservation.)throttling_disabledexecution_finishedr   r'   r'   r(   _is_op_eligible  s   
z.ReservationOpResourceAllocator._is_op_eligiblec                    s    fdd j jD S )Nc                    s   g | ]	}  |r|qS r'   )r   r$   r   r'   r(   
<listcomp>  s
    
zDReservationOpResourceAllocator._get_eligible_ops.<locals>.<listcomp>)r   r*   r   r'   r   r(   _get_eligible_ops  s   
z0ReservationOpResourceAllocator._get_eligible_opsc                 C   sV  | j  }|  }| j  | j  | j  | }t|dkr$d S |	| j
t| }t|D ]s\}}tddt|jd d}| \}}	||}
|
|}
|
|	}
|
|j|ddrgd| j|< n"d| j|< tdd|j}
|dkrtdt|  rtd| d	 |
| j|< |j| j|< |
|}||}|t }q2|| _d S )
Nr      ri   T)ignore_object_store_memoryFlow_resource_warning_z7Cluster resources are not engough to run any task from z8. The job may hang forever unless the cluster scales up.)r   r   r   r   rj   r   r   copyrk   scaler   	enumerater   rt   rM   min_max_resource_requirementsr   ru   rr   satisfies_limitr   idrV   rW   r-   r   )rH   global_limitseligible_ops	remainingdefault_reservedindexr&   reserved_for_outputsmin_resource_usagemax_resource_usagereserved_for_tasksop_total_reservedr'   r'   r(   _update_reservation  sJ   








	




z2ReservationOpResourceAllocator._update_reservationc                 C   s   | j |d S r"   )r   rU   r   r'   r'   r(   r   +  s   z)ReservationOpResourceAllocator.get_budgetc                 C   s6   |  |D ]}| j| s dS | j|r dS qdS r   )_get_downstream_eligible_opsr   r   r   rH   r&   rf   r'   r'   r(   -_should_unblock_streaming_output_backpressure.  s   
zLReservationOpResourceAllocator._should_unblock_streaming_output_backpressurec                    s0    j j| }|t fdd |D 7 }|S )zsGet the outputs memory usage of the given operator, including the downstream
        ineligible operators.
        c                 3   s    | ]
} j |jV  qd S r"   )r   r   rM   r%   rf   r   r'   r(   r)   I  s
    
zWReservationOpResourceAllocator._get_op_outputs_usage_with_downstream.<locals>.<genexpr>)r   r8   sum_get_downstream_ineligible_ops)rH   r&   op_outputs_usager'   r   r(   %_get_op_outputs_usage_with_downstreamB  s
   
zDReservationOpResourceAllocator._get_op_outputs_usage_with_downstreamc                 C   s   || j vrd S | j | j}| |}|t| j| | d7 }t|r*|| j|< d S t|}|dks4J |dkr?| 	|r?d}|| j|< |S )Nr   ri   )
r   rM   r   rt   r   mathisinfr   r6   r   )rH   r&   resr   r'   r'   r(   r   O  s   




z<ReservationOpResourceAllocator.max_task_output_bytes_to_readc                 c   s2    |j D ]}| |s|V  | |E dH  qdS )zGet the downstream ineligible operators of the given operator.

        E.g.,
          - "cur_map->downstream_map" will return an empty list.
          - "cur_map->limit1->limit2->downstream_map" will return [limit1, limit2].
        N)ra   r   r   r   r'   r'   r(   r   a  s   
	
z=ReservationOpResourceAllocator._get_downstream_ineligible_opsc                 c   s4    |j D ]}| |r|V  q| |E dH  qdS )a   Get the downstream eligible operators of the given operator, ignoring
        intermediate ineligible operators.

        E.g.,
          - "cur_map->downstream_map" will return [downstream_map].
          - "cur_map->limit1->limit2->downstream_map" will return [downstream_map].
        N)ra   r   r   r   r'   r'   r(   r   o  s   


z;ReservationOpResourceAllocator._get_downstream_eligible_opsc                 C   s  |    | j  |  }t|dkrd S | j}|D ]K}d}|| jj| 7 }| |}|t	|| j
|  d7 }| j| }||_| j| }||	t }|| j|< ||	t }	||	}q|	t }tt|D ]V\}
}|dt||
  }| | j| |	t }| s|||r||}||}| sJ ||||f| j| || j|< td| j| _qs|D ]}tdd |jD rtd| j| _qd S )Nr   rh   infc                 s   s    | ]}t |tV  qd S r"   )r\   MATERIALIZING_OPERATORSr   r'   r'   r(   r)     s
    
z?ReservationOpResourceAllocator.update_usages.<locals>.<genexpr>)r   r   rj   r   rk   r   r   r7   r   rt   r   r   r   rM   r   r   r   r-   r   rl   r   incremental_resource_usagerr   is_zeror   is_non_negativer   r   anyra   )rH   r   remaining_sharedr&   op_mem_usager   r{   op_reservedop_reserved_remainingop_reserved_exceededi	op_shared	to_borrowr'   r'   r(   rx     sb   









z,ReservationOpResourceAllocator.update_usages)r   r   r   r   r   r   r   rJ   r   r   r   r	   r   r   r
   r   r   r   r   r6   r   r   r   r   rx   __classcell__r'   r'   r   r(   r@   a  s4    ?	E


r@   )/loggingr   osr   abcr   r   collectionsr   typingr   r   r   r   r	   r
   9ray.data._internal.execution.interfaces.execution_optionsr   r   9ray.data._internal.execution.interfaces.physical_operatorr   r   =ray.data._internal.execution.operators.base_physical_operatorr   8ray.data._internal.execution.operators.input_data_bufferr   3ray.data._internal.execution.operators.zip_operatorr   !ray.data._internal.execution.utilr   rQ   r   rR   r   5ray.data._internal.execution.streaming_executor_stater   r   	getLoggerr   rV   environrU   r9   r   r   r   r@   r'   r'   r'   r(   <module>   s2     
   