o
    $i                     @   s.   d dl mZmZmZmZmZ G dd dZdS )    )AnyCallableListOptionalTuplec                   @   s   e Zd ZdZdddedeed gdf  fddZdd	 Zd
d Z	e
defddZe
defddZdeee  fddZdddZdS )Barriera  Barrier to collect results and process them in bulk.

    A barrier can be used to collect multiple results and process them in bulk once
    a certain count or a timeout is reached.

    For instance, if ``max_results=N``, the ``on_completion`` callback will be
    invoked once :meth:`arrive` has been called ``N`` times.

    The completion callback will only be invoked once, even if more results
    arrive after completion. The collected results can be resetted
    with :meth:`reset`, after which the callback may be invoked again.

    The completion callback should expect one argument, which is the barrier
    object that completed.

    Args:
        max_results: Maximum number of results to collect before a call to
            :meth:`wait` resolves or the :meth:`on_completion` callback is invoked.
        on_completion: Callback to invoke when ``max_results`` results
            arrived at the barrier.

    N)on_completionmax_resultsr   c                C   s   || _ d| _|| _g | _d S )NF)_max_results
_completed_on_completion_results)selfr	   r    r   `/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/air/execution/_internal/barrier.py__init__   s   
zBarrier.__init__c                 G   s,   t |dkr
|d }| j| |   dS )a,  Notify barrier that a result successfully arrived.

        This will count against the ``max_results`` limit. The received result
        will be included in a call to :meth:`get_results`.

        Args:
            *data: Result data to be cached. Can be obtained via :meth:`get_results`.

           r   N)lenr   append_check_completion)r   datar   r   r   arrive+   s   
zBarrier.arrivec                 C   s8   | j rd S | j| jkrd| _ | jr| |  d S d S d S )NT)r   num_resultsr
   r   r   r   r   r   r   ;   s   zBarrier._check_completionreturnc                 C      | j S )z)Returns True if the barrier is completed.)r   r   r   r   r   	completedG   s   zBarrier.completedc                 C   s
   t | jS )z(Number of received (successful) results.)r   r   r   r   r   r   r   L   s   
zBarrier.num_resultsc                 C   r   )z Return list of received results.)r   r   r   r   r   get_resultsQ   s   zBarrier.get_resultsc                 C   s   d| _ g | _dS )a  Reset barrier, removing all received results.

        Resetting the barrier will reset the completion status. When ``max_results``
        is set and enough new events arrive after resetting, the
        :meth:`on_completion` callback will be invoked again.
        FN)r   r   r   r   r   r   resetU   s   
zBarrier.reset)r   N)__name__
__module____qualname____doc__intr   r   r   r   r   propertyboolr   r   r   r   r   r   r   r   r   r   r   r      s     
r   N)typingr   r   r   r   r   r   r   r   r   r   <module>   s    