o
    biE                     @   sf   d dl Z d dlmZmZmZmZmZmZmZm	Z	 d dl
Z
eegdf Zeegdf ZG dd dZdS )    N)AnyCallableDictIterableOptionalSetTupleUnionc                   @   s   e Zd ZdZddefddZ		ddejdee	 d	ee
 fd
dZ		ddeej dee	 d	ee
 fddZdejfddZdeej fddZedefddZdejfddZ		ddeeeef  dee fddZdS ) RayEventManageraO  Event manager for Ray futures.

    The event manager can be used to track futures and invoke callbacks when
    they resolve.

    Futures are tracked with :meth:`track_future`. Future can then be awaited with
    :meth:`wait`. When futures successfully resolve, they trigger an optional
    ``on_result`` callback that can be passed to :meth:`track_future`. If they
    fail, they trigger an optional ``on_error`` callback.

    Args:
        shuffle_futures: If True, futures will be shuffled before awaited. This
            will avoid implicit prioritization of futures within Ray.
    Tshuffle_futuresc                 C   s   || _ i | _d S N)_shuffle_futures_tracked_futures)selfr    r   ]/home/ubuntu/.local/lib/python3.10/site-packages/ray/air/execution/_internal/event_manager.py__init__   s   zRayEventManager.__init__Nfuture	on_resulton_errorc                 C   s   ||f| j |< dS )a  Track a single future and invoke callbacks on resolution.

        Control has to be yielded to the event manager for the callbacks to
        be invoked, either via :meth:`wait` or via :meth:`resolve_future`.

        Args:
            future: Ray future to await.
            on_result: Callback to invoke when the future resolves successfully.
            on_error: Callback to invoke when the future fails.

        N)r   )r   r   r   r   r   r   r   track_future"   s   zRayEventManager.track_futurefuturesc                 C   s   |D ]
}| j |||d qdS )a  Track multiple futures and invoke callbacks on resolution.

        Control has to be yielded to the event manager for the callbacks to
        be invoked, either via :meth:`wait` or via :meth:`resolve_future`.

        Args:
            futures: Ray futures to await.
            on_result: Callback to invoke when the future resolves successfully.
            on_error: Callback to invoke when the future fails.

        )r   r   N)r   )r   r   r   r   r   r   r   r   track_futures5   s   zRayEventManager.track_futuresc                 C   s   | j |d dS )zRemove future from tracking.

        The future will not be awaited anymore, and it will not trigger any callbacks.

        Args:
            future: Ray futures to discard.
        N)r   pop)r   r   r   r   r   discard_futureI   s   zRayEventManager.discard_futurereturnc                 C   
   t | jS )z)Get futures tracked by the event manager.)setr   r   r   r   r   get_futuresS      
zRayEventManager.get_futuresc                 C   r   r   )lenr   r   r   r   r   num_futuresW   r    zRayEventManager.num_futuresc              
   C   s   z
| j |\}}W n ty } z	td| d|d}~ww zt|}W n tyC } z|r6|| n|W Y d}~dS d}~ww |rL|| dS dS )a  Resolve a single future.

        This method will block until the future is available. It will then
        trigger the callback associated to the future and the event (success
        or error), if specified.

        Args:
            future: Ray future to resolve.

        zFuture z' is not tracked by this RayEventManagerN)r   r   KeyError
ValueErrorrayget	Exception)r   r   r   r   eresultr   r   r   resolve_future[   s,   

zRayEventManager.resolve_future   timeoutnum_resultsc                 C   sX   t |  }| jrt| |pt|}tjt |||d\}}|D ]}| | q"dS )a8  Wait up to ``timeout`` seconds for ``num_results`` futures to resolve.

        If ``timeout=None``, this method will block until all `num_results`` futures
        resolve. If ``num_results=None``, this method will await all tracked futures.

        For every future that resolves, the respective associated callbacks will be
        invoked.

        Args:
            timeout: Timeout in second to wait for futures to resolve.
            num_results: Number of futures to await. If ``None``, will wait for
                all tracked futures to resolve.

        )r,   num_returnsN)	listr   r   randomshuffler!   r%   waitr*   )r   r,   r-   r   ready_r   r   r   r   r2   x   s   
zRayEventManager.wait)T)NN)Nr+   )__name__
__module____qualname____doc__boolr   r%   	ObjectRefr   _ResultCallback_ErrorCallbackr   r   r   r   r   r   propertyintr"   r*   r	   floatr2   r   r   r   r   r
   
   sB    


r
   )r0   typingr   r   r   r   r   r   r   r	   r%   r;   r'   r<   r
   r   r   r   r   <module>   s    (