o
    b²“i‡  ã                   @   s.   d dl mZmZmZmZmZ G dd„ dƒZdS )é    )ÚAnyÚCallableÚListÚOptionalÚTuplec                   @   sŠ   e Zd ZdZddœdedeed gdf  fdd„Zdd	„ Zd
d„ Z	e
defdd„ƒZe
defdd„ƒZdeee  fdd„Zddd„ZdS )ÚBarrieraž  Barrier to collect results and process them in bulk.

    A barrier can be used to collect multiple results and process them in bulk once
    a certain count or a timeout is reached.

    For instance, if ``max_results=N``, the ``on_completion`` callback will be
    invoked once :meth:`arrive` has been called ``N`` times.

    The completion callback will only be invoked once, even if more results
    arrive after completion. The collected results can be resetted
    with :meth:`reset`, after which the callback may be invoked again.

    The completion callback should expect one argument, which is the barrier
    object that completed.

    Args:
        max_results: Maximum number of results to collect before a call to
            :meth:`wait` resolves or the :meth:`on_completion` callback is invoked.
        on_completion: Callback to invoke when ``max_results`` results
            arrived at the barrier.

    N)Úon_completionÚmax_resultsr   c                C   s   || _ d| _|| _g | _d S )NF)Ú_max_resultsÚ
_completedÚ_on_completionÚ_results)Úselfr	   r   © r   úW/home/ubuntu/.local/lib/python3.10/site-packages/ray/air/execution/_internal/barrier.pyÚ__init__   s   
zBarrier.__init__c                 G   s,   t |ƒdkr
|d }| j |¡ |  ¡  dS )a,  Notify barrier that a result successfully arrived.

        This will count against the ``max_results`` limit. The received result
        will be included in a call to :meth:`get_results`.

        Args:
            *data: Result data to be cached. Can be obtained via :meth:`get_results`.

        é   r   N)Úlenr   ÚappendÚ_check_completion)r   Údatar   r   r   Úarrive+   s   
zBarrier.arrivec                 C   s8   | j rd S | j| jkrd| _ | jr|  | ¡ d S d S d S )NT)r   Únum_resultsr
   r   ©r   r   r   r   r   ;   s   ûzBarrier._check_completionÚreturnc                 C   ó   | j S )z)Returns True if the barrier is completed.)r   r   r   r   r   Ú	completedG   s   zBarrier.completedc                 C   s
   t | jƒS )z(Number of received (successful) results.)r   r   r   r   r   r   r   L   s   
zBarrier.num_resultsc                 C   r   )z Return list of received results.)r   r   r   r   r   Úget_resultsQ   s   zBarrier.get_resultsc                 C   s   d| _ g | _dS )a  Reset barrier, removing all received results.

        Resetting the barrier will reset the completion status. When ``max_results``
        is set and enough new events arrive after resetting, the
        :meth:`on_completion` callback will be invoked again.
        FN)r   r   r   r   r   r   ÚresetU   s   
zBarrier.reset)r   N)Ú__name__Ú
__module__Ú__qualname__Ú__doc__Úintr   r   r   r   r   ÚpropertyÚboolr   r   r   r   r   r   r   r   r   r   r   r      s     üþ
ür   N)Útypingr   r   r   r   r   r   r   r   r   r   Ú<module>   s    