o
    $iP$                     @   s
  d dl Z d dlmZmZmZmZmZmZ d dlm	Z	 d dl
mZmZ d dlmZmZ e eZedZedZeeee e	gee f eee e	egee f edee f f ZeG d	d
 d
ZeG dd deZeG dd deZdeeef defddZdS )    N)AnyCallableIterableOptionalTypeVarUnion)TaskContext)BlockUserDefinedFunction)DeveloperAPI	PublicAPITU.c                   @   s   e Zd ZdS )ComputeStrategyN)__name__
__module____qualname__ r   r   W/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/data/_internal/compute.pyr      s    r   c                   @   sF   e Zd ZdZ	ddee fddZdedefdd	Z	de
fd
dZdS )TaskPoolStrategya;  Specify the task-based compute strategy for a Dataset transform.

    TaskPoolStrategy executes dataset transformations using Ray tasks that are
    scheduled through a pool. Provide ``size`` to cap the number of concurrent
    tasks; leave it unset to allow Ray Data to scale the task count
    automatically.
    Nsizec                 C   s$   |dur|dk rt d||| _dS )zConstruct TaskPoolStrategy for a Dataset transform.

        Args:
            size: Specify the maximum size of the task pool.
        N   z`size` must be >= 1)
ValueErrorr   )selfr   r   r   r   __init__)   s   


zTaskPoolStrategy.__init__otherreturnc                 C   s(   t |tr| j|jkp|dko| jd u S )Ntasks)
isinstancer   r   r   r   r   r   r   __eq__7   s   zTaskPoolStrategy.__eq__c                 C   s   d| j  dS )NzTaskPoolStrategy(size=))r   r   r   r   r   __repr__<   s   zTaskPoolStrategy.__repr__N)r   r   r   __doc__r   intr   r   boolr    strr#   r   r   r   r   r      s    

r   c                   @   sv   e Zd ZdZddddddddee dee dee dee d	ee d
efddZdedefddZ	de
fddZdS )ActorPoolStrategyae	  Specify the actor-based compute strategy for a Dataset transform.

    ActorPoolStrategy specifies that an autoscaling pool of actors should be used
    for a given Dataset transform. This is useful for stateful setup of callable
    classes.

    For a fixed-sized pool of size ``n``, use ``ActorPoolStrategy(size=n)``.

    To autoscale from ``m`` to ``n`` actors, use
    ``ActorPoolStrategy(min_size=m, max_size=n)``.

    To autoscale from ``m`` to ``n`` actors, with an initial size of ``initial``, use
    ``ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial)``.

    To increase opportunities for pipelining task dependency prefetching with
    computation and avoiding actor startup delays, set max_tasks_in_flight_per_actor
    to 2 or greater; to try to decrease the delay due to queueing of tasks on the worker
    actors, set max_tasks_in_flight_per_actor to 1.

    The `enable_true_multi_threading` argument primarily exists to prevent GPU OOM issues with multi-threaded actors.
    The life cycle of an actor task involves 3 main steps:

        1. Batching Inputs
        2. Running actor UDF
        3. Batching Outputs

    The `enable_true_multi_threading` flag affects step 2. If set to `True`, then the UDF can be run concurrently.
    By default, it is set to `False`, so at most 1 actor UDF is running at a time per actor. The `max_concurrency`
    flag on `ray.remote` affects steps 1 and 3. Below is a matrix summary:

    - [`enable_true_multi_threading=False or True`, `max_concurrency=1`] = 1 actor task running per actor. So at most 1
        of steps 1, 2, or 3 is running at any point in time.
    - [`enable_true_multi_threading=False`, `max_concurrency>1`] = multiple tasks running per actor
      (respecting GIL) but UDF runs 1 at a time. This is useful for doing CPU and GPU work,
      where you want to use a large batch size but want to hide the overhead of *batching*
      the inputs. In this case, CPU *batching* is done concurrently, while GPU *inference*
      is done 1 at a time. Concretely, steps 1 and 3 can have multiple threads, while step 2 is done serially.
    - [`enable_true_multi_threading=True`, `max_concurrency>1`] = multiple tasks running per actor.
      Unlike bullet #3 ^, the UDF runs concurrently (respecting GIL). No restrictions on steps 1, 2, or 3

    NOTE: `enable_true_multi_threading` does not apply to async actors
    NF)r   min_sizemax_sizeinitial_sizemax_tasks_in_flight_per_actorenable_true_multi_threadingr   r*   r+   r,   r-   r.   c                C   s<  |dur#|dk rt d||dus|dus|durt d|}|}|}|dur0|dk r0t d||durD|du r:d}||krDt d|||durQ|dk rQt d||pTd| _|p[td| _|dur|| jk rrt d	| d
| j d| jtdkr|| jkrt d	| d| j d|p| j| _|| _d| _d| _|| _dS )a  Construct ActorPoolStrategy for a Dataset transform.

        Args:
            size: Specify a fixed size actor pool of this size. It is an error to
                specify both `size` and `min_size` or `max_size`.
            min_size: The minimum size of the actor pool.
            max_size: The maximum size of the actor pool.
            initial_size: The initial number of actors to start with. If not specified,
                defaults to min_size. Must be between min_size and max_size.
            max_tasks_in_flight_per_actor: The maximum number of tasks to concurrently
                send to a single actor worker. Increasing this will increase
                opportunities for pipelining task dependency prefetching with
                computation and avoiding actor startup delays, but will also increase
                queueing delay.
            enable_true_multi_threading: If enable_true_multi_threading=True, no more than 1 actor task
                runs per actor. Otherwise, respects the `max_concurrency` argument.
        Nr   zsize must be >= 1zMmin_size, max_size, and initial_size cannot be set at the same time as `size`zmin_size must be >= 1zmin_size must be <= max_sizez1max_tasks_in_flight_per_actor must be >= 1, got: infzinitial_size (z) must be >= min_size (r!   z) must be <= max_size (r   g?)	r   r*   floatr+   r,   r-   num_workersready_to_total_workers_ratior.   )r   r   r*   r+   r,   r-   r.   r   r   r   r   m   sN   




zActorPoolStrategy.__init__r   r   c                 C   sF   t |to"| j|jko"| j|jko"| j|jko"| j|jko"| j|jkS r$   )r   r)   r*   r+   r,   r.   r-   r   r   r   r   r       s   



zActorPoolStrategy.__eq__c                 C   s>   d| j  d| j d| j d| j d| j d| j d| j dS )	NzActorPoolStrategy(min_size=z, max_size=z, initial_size=z , max_tasks_in_flight_per_actor=z)num_workers=z, enable_true_multi_threading=z, ready_to_total_workers_ratio=r!   )r*   r+   r,   r-   r1   r.   r2   r"   r   r   r   r#      s   
zActorPoolStrategy.__repr__)r   r   r   r%   r   r&   r'   r   r   r    r(   r#   r   r   r   r   r)   @   s.    .
I
r)   compute_specr   c                 C   sT   t | ttfstd|  d| r| dkrt S | dkrt S t | tr&| S td)NzXIn Ray 2.5, the compute spec must be either TaskPoolStrategy or ActorPoolStrategy, was: .r   actorsz;compute must be one of [`tasks`, `actors`, ComputeStrategy])r   r   r)   r   r   )r3   r   r   r   get_compute   s   
r6   )loggingtypingr   r   r   r   r   r   'ray.data._internal.execution.interfacesr   ray.data.blockr	   r
   ray.util.annotationsr   r   	getLoggerr   loggerr   r   BlockTransformr   r   r)   r(   r6   r   r   r   r   <module>   s,     
  