o
    bi                     @   s  d dl Z d dlmZmZmZmZmZmZ d dlm	Z	 d dl
mZmZ d dlmZ e eZedZedZeeee e	gee f eee e	egee f edee f f ZeG d	d
 d
ZeG dd deZG dd deZdeeef defddZdS )    N)AnyCallableIterableOptionalTypeVarUnion)TaskContext)BlockUserDefinedFunction)DeveloperAPITU.c                   @   s   e Zd ZdS )ComputeStrategyN)__name__
__module____qualname__ r   r   N/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/compute.pyr      s    r   c                   @   sB   e Zd Z	ddee fddZdedefddZde	fd	d
Z
dS )TaskPoolStrategyNsizec                 C   s$   |dur|dk rt d||| _dS )zConstruct TaskPoolStrategy for a Dataset transform.

        Args:
            size: Specify the maximum size of the task pool.
        N   z`size` must be >= 1)
ValueErrorr   )selfr   r   r   r   __init__!   s   


zTaskPoolStrategy.__init__otherreturnc                 C   s(   t |tr| j|jkp|dko| jd u S )Ntasks)
isinstancer   r   r   r   r   r   r   __eq__/   s   zTaskPoolStrategy.__eq__c                 C   s   d| j  dS )NzTaskPoolStrategy(size=))r   r   r   r   r   __repr__4   s   zTaskPoolStrategy.__repr__N)r   r   r   r   intr   r   boolr   strr"   r   r   r   r   r      s    
r   c                
   @   sf   e Zd ZdZddddddee dee dee dee fdd	Zd
edefddZ	de
fddZdS )ActorPoolStrategya  Specify the compute strategy for a Dataset transform.

    ActorPoolStrategy specifies that an autoscaling pool of actors should be used
    for a given Dataset transform. This is useful for stateful setup of callable
    classes.

    For a fixed-sized pool of size ``n``, specify ``compute=ActorPoolStrategy(size=n)``.
    To autoscale from ``m`` to ``n`` actors, specify
    ``ActorPoolStrategy(min_size=m, max_size=n)``.

    To increase opportunities for pipelining task dependency prefetching with
    computation and avoiding actor startup delays, set max_tasks_in_flight_per_actor
    to 2 or greater; to try to decrease the delay due to queueing of tasks on the worker
    actors, set max_tasks_in_flight_per_actor to 1.
    N)r   min_sizemax_sizemax_tasks_in_flight_per_actorr   r(   r)   r*   c                C   s   |dur|dk rt d||dus|durt d|}|}|dur*|dk r*t d||dur>|du r4d}||kr>t d|||durK|dk rKt d||pNd| _|pUtd| _|| _d	| _d
| _dS )a  Construct ActorPoolStrategy for a Dataset transform.

        Args:
            size: Specify a fixed size actor pool of this size. It is an error to
                specify both `size` and `min_size` or `max_size`.
            min_size: The minimum size of the actor pool.
            max_size: The maximum size of the actor pool.
            max_tasks_in_flight_per_actor: The maximum number of tasks to concurrently
                send to a single actor worker. Increasing this will increase
                opportunities for pipelining task dependency prefetching with
                computation and avoiding actor startup delays, but will also increase
                queueing delay.
        Nr   zsize must be >= 1z>min_size and max_size cannot be set at the same time as `size`zmin_size must be >= 1zmin_size must be <= max_sizez1max_tasks_in_flight_per_actor must be >= 1, got: infr   g?)r   r(   floatr)   r*   num_workersready_to_total_workers_ratio)r   r   r(   r)   r*   r   r   r   r   I   s6   



zActorPoolStrategy.__init__r   r   c                 C   s.   t |to| j|jko| j|jko| j|jkS r#   )r   r'   r(   r)   r*   r   r   r   r   r   |   s   

zActorPoolStrategy.__eq__c                 C   s.   d| j  d| j d| j d| j d| j dS )NzActorPoolStrategy(min_size=z, max_size=z , max_tasks_in_flight_per_actor=z)num_workers=z, ready_to_total_workers_ratio=r    )r(   r)   r*   r-   r.   r!   r   r   r   r"      s   
zActorPoolStrategy.__repr__)r   r   r   __doc__r   r$   r   r   r%   r   r&   r"   r   r   r   r   r'   8   s"    
3r'   compute_specr   c                 C   sT   t | ttfstd|  d| r| dkrt S | dkrt S t | tr&| S td)NzXIn Ray 2.5, the compute spec must be either TaskPoolStrategy or ActorPoolStrategy, was: .r   actorsz;compute must be one of [`tasks`, `actors`, ComputeStrategy])r   r   r'   r   r   )r0   r   r   r   get_compute   s   
r3   )loggingtypingr   r   r   r   r   r   'ray.data._internal.execution.interfacesr   ray.data.blockr	   r
   ray.util.annotationsr   	getLoggerr   loggerr   r   BlockTransformr   r   r'   r&   r3   r   r   r   r   <module>   s(     
V