o
    -wi                     @  s   d Z ddlmZ ddlZddlZddlZddlZddlZddlm	Z	m
Z
mZmZmZmZ edZdd	d
ZG dd deZG dd dZG dd dZejdddZejdddZdS )z)Functions for compatibility with asyncio.    )annotationsN)AnyAsyncIteratorCallable	CoroutineIteratorTypeVar_Tfn%Callable[[], Coroutine[Any, Any, _T]]returnc              	   C  sd   t jjdd!}t }||j| }z| W |  W  d   S |  w 1 s+w   Y  dS )a  Run `fn` in an asyncio loop in a new thread.

    This must always be used instead of `asyncio.run` which fails if there is
    an active `asyncio` event loop in the current thread. Since `wandb` was not
    originally designed with `asyncio` in mind, using `asyncio.run` would break
    users who were calling `wandb` methods from an `asyncio` loop.

    Note that due to starting a new thread, this is slightly slow.
       )max_workersN)
concurrentfuturesThreadPoolExecutor_Runnersubmitrunresultcancel)r
   executorrunnerfuture r   Y/home/ubuntu/sommelier/.venv/lib/python3.10/site-packages/wandb/sdk/lib/asyncio_compat.pyr      s   

r   c                   @  s   e Zd ZdZdS )_RunnerCancelledErrorz-The `_Runner.run()` invocation was cancelled.N)__name__
__module____qualname____doc__r   r   r   r   r   $   s    r   c                   @  s8   e Zd ZdZdddZdd	d
ZdddZdddZdS )r   a  Runs an asyncio event loop allowing cancellation.

    This is like `asyncio.run()`, except it provides a `cancel()` method
    meant to be called in a `finally` block.

    Without this, it is impossible to make `asyncio.run()` stop if it runs
    in a non-main thread. In particular, a KeyboardInterrupt causes the
    ThreadPoolExecutor above to block until the asyncio thread completes,
    but there is no way to tell the asyncio thread to cancel its work.
    A second KeyboardInterrupt makes ThreadPoolExecutor give up while the
    asyncio thread still runs in the background, with terrible effects if it
    prints to the user's terminal.
    r   Nonec                 C  s,   t  | _d| _d| _d| _d | _d | _d S )NF)	threadingLock_lock_is_cancelled_started_done_loop_cancel_eventselfr   r   r   __init__7   s   

z_Runner.__init__r
   r   r	   c                 C  s   t | |S )zRun a coroutine in asyncio, cancelling it on `cancel()`.

        Returns:
            The result of the coroutine returned by `fn`.

        Raises:
            _RunnerCancelledError: If `cancel()` is called.
        )asyncior   _run_or_cancel)r+   r
   r   r   r   r   A   s   	z_Runner.runc                   s  | j  | jrt t | _t | _d| _W d    n1 s"w   Y  t	| j
 }t	| }z7tj
||gtjdI d H  | rj| W |  |  | j  d| _W d    S 1 sdw   Y  S t |  |  | j  d| _W d    w 1 sw   Y  w )NTreturn_when)r$   r%   r   r-   get_running_loopr(   Eventr)   r&   create_taskwaitFIRST_COMPLETEDdoner   r   r'   )r+   r
   cancellation_taskfn_taskr   r   r   r.   L   s8   

z_Runner._run_or_cancelc                 C  s   | j 9 | jr	 W d   dS d| _| js| js"	 W d   dS | js'J | js,J | j| jj W d   dS 1 s?w   Y  dS )z+Cancel all asyncio work started by `run()`.NT)r$   r%   r'   r&   r(   r)   call_soon_threadsafesetr*   r   r   r   r   m   s   

"z_Runner.cancelNr   r!   r
   r   r   r	   )r   r   r   r    r,   r   r.   r   r   r   r   r   r   (   s    



!r   c                   @  s8   e Zd ZdZdddZddd	Zdd
dZdddZdS )	TaskGroupz'Object that `open_task_group()` yields.r   r!   c                 C  s
   g | _ d S )N)_tasksr*   r   r   r   r,      s   
zTaskGroup.__init__coroCoroutine[Any, Any, Any]c                 C  s   | j t| dS )zSchedule a task in the group.

        Args:
            coro: The return value of the `async` function defining the task.
        N)r>   appendr-   r3   )r+   r?   r   r   r   
start_soon   s   zTaskGroup.start_soonc              	     sV   t j| jtjjdI dH \}}|D ]}z
|  }r|W q t jy(   Y qw dS )zBlock until all tasks complete.

        Raises:
            Exception: If one or more tasks raises an exception, one of these
                is raised arbitrarily.
        r/   N)r-   r4   r>   r   r   FIRST_EXCEPTION	exceptionCancelledError)r+   r6   _taskexcr   r   r   	_wait_all   s   zTaskGroup._wait_allc                 C  s   | j D ]}|  qdS )zCancel all tasks.N)r>   r   )r+   rG   r   r   r   _cancel_all   s   

zTaskGroup._cancel_allNr;   )r?   r@   r   r!   )r   r   r   r    r,   rB   rI   rJ   r   r   r   r   r=      s    


r=   AsyncIterator[TaskGroup]c                  C s6   t  } z| V  |  I dH  W |   dS |   w )ae  Create a task group.

    `asyncio` gained task groups in Python 3.11.

    This is an async context manager, meant to be used with `async with`.
    On exit, it blocks until all subtasks complete. If any subtask fails, or if
    the current task is cancelled, it cancels all subtasks in the group and
    raises the subtask's exception. If multiple subtasks fail simultaneously,
    one of their exceptions is chosen arbitrarily.

    NOTE: Subtask exceptions do not propagate until the context manager exits.
    This means that the task group cannot cancel code running inside the
    `async with` block .
    N)r=   rI   rJ   )
task_groupr   r   r   open_task_group   s   rM   r?   r@   Iterator[None]c                 c  s\    t | }zdV  W | r|  }r||  dS | r)|  }r)||  w )zSchedule a task, cancelling it when exiting the context manager.

    If the given coroutine raises an exception, that exception is raised
    when exiting the context manager.
    N)r-   r3   r6   rD   r   )r?   rG   rD   r   r   r   cancel_on_exit   s   

rO   r<   )r   rK   )r?   r@   r   rN   )r    
__future__r   r-   r   concurrent.futures
contextlibr"   typingr   r   r   r   r   r   r	   r   	Exceptionr   r   r=   asynccontextmanagerrM   contextmanagerrO   r   r   r   r   <module>   s"     
X*