"""Functions for compatibility with asyncio."""

from __future__ import annotations

import asyncio
import concurrent
import concurrent.futures
import contextlib
import threading
from typing import Any, AsyncIterator, Callable, Coroutine, TypeVar

_T = TypeVar("_T")


def run(fn: Callable[[], Coroutine[Any, Any, _T]]) -> _T:
    """Run `fn` in an asyncio loop in a new thread.

    This must always be used instead of `asyncio.run` which fails if there is
    an active `asyncio` event loop in the current thread. Since `wandb` was not
    originally designed with `asyncio` in mind, using `asyncio.run` would break
    users who were calling `wandb` methods from an `asyncio` loop.

    Note that due to starting a new thread, this is slightly slow.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        runner = CancellableRunner()
        future = executor.submit(runner.run, fn)

        try:
            return future.result()

        finally:
            runner.cancel()


class RunnerCancelledError(Exception):
    """The `CancellableRunner.run()` invocation was cancelled."""


class CancellableRunner:
    """Runs an asyncio event loop allowing cancellation.

    The `run()` method is like `asyncio.run()`. The `cancel()` method may
    be used in a different thread, for instance in a `finally` block, to cancel
    all tasks, and it is a no-op if `run()` completed.

    Without this, it is impossible to make `asyncio.run()` stop if it runs
    in a non-main thread. In particular, a KeyboardInterrupt causes the
    ThreadPoolExecutor above to block until the asyncio thread completes,
    but there is no way to tell the asyncio thread to cancel its work.
    A second KeyboardInterrupt makes ThreadPoolExecutor give up while the
    asyncio thread still runs in the background, with terrible effects if it
    prints to the user's terminal.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()

        self._is_cancelled = False
        self._started = False
        self._done = False

        self._loop: asyncio.AbstractEventLoop | None = None
        self._cancel_event: asyncio.Event | None = None

    def run(self, fn: Callable[[], Coroutine[Any, Any, _T]]) -> _T:
        """Run a coroutine in asyncio, cancelling it on `cancel()`.

        Returns:
            The result of the coroutine returned by `fn`.

        Raises:
            RunnerCancelledError: If `cancel()` is called.
        """
        return asyncio.run(self._run_or_cancel(fn))

    async def _run_or_cancel(
        self,
        fn: Callable[[], Coroutine[Any, Any, _T]],
    ) -> _T:
        with self._lock:
            if self._is_cancelled:
                raise RunnerCancelledError()

            self._loop = asyncio.get_running_loop()
            self._cancel_event = asyncio.Event()
            self._started = True

        cancellation_task = asyncio.create_task(self._cancel_event.wait())
        fn_task = asyncio.create_task(fn())

        try:
            await asyncio.wait(
                [cancellation_task, fn_task],
                return_when=asyncio.FIRST_COMPLETED,
            )

            if fn_task.done():
                return fn_task.result()
            else:
                raise RunnerCancelledError()

        finally:
            # NOTE: asyncio.run() cancels all tasks after the main task exits,
            #   but this is not documented, so we cancel them explicitly here
            #   as well. It also blocks until canceled tasks complete.
            cancellation_task.cancel()
            fn_task.cancel()

            with self._lock:
                self._done = True

    def cancel(self) -> None:
        """Cancel all asyncio work started by `run()`."""
        with self._lock:
            if self._is_cancelled:
                return
            self._is_cancelled = True

            if self._done or not self._started:
                # If the runner already finished, no need to cancel it.
                #
                # If the runner hasn't started the loop yet, then it will not
                # as we already set _is_cancelled.
                return

            assert self._loop
            assert self._cancel_event
            self._loop.call_soon_threadsafe(self._cancel_event.set)


class TaskGroup:
    """Object that `open_task_group()` yields."""

    def __init__(self) -> None:
        self._tasks: list[asyncio.Task[None]] = []

    def start_soon(self, coro: Coroutine[Any, Any, Any]) -> None:
        """Schedule a task in the group.

        Args:
            coro: The return value of the `async` function defining the task.
        """
        self._tasks.append(asyncio.create_task(coro))

    async def _wait_all(self, *, race: bool, timeout: float | None) -> None:
        """Block until tasks complete.

        Args:
            race: If true, blocks until the first task completes and then
                cancels the rest. Otherwise, waits for all tasks or until
                the first exception.
            timeout: How long to wait.

        Raises:
            TimeoutError: If the timeout expires.
            Exception: If one or more tasks raises an exception, one of these
                is raised arbitrarily.
        """
        if not self._tasks:
            return

        if race:
            return_when = asyncio.FIRST_COMPLETED
        else:
            return_when = asyncio.FIRST_EXCEPTION

        done, pending = await asyncio.wait(
            self._tasks,
            timeout=timeout,
            return_when=return_when,
        )

        if not done:
            raise TimeoutError(f"Timed out after {timeout} seconds.")

        # If any of the finished tasks raised an exception, pick the first one.
        for task in done:
            if exc := task.exception():
                raise exc

        # Wait for remaining tasks to clean up, then re-raise any exceptions
        # that arise. Note that pending is only non-empty when race=True.
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
        for task in pending:
            if task.cancelled():
                continue
            if exc := task.exception():
                raise exc

    async def _cancel_all(self) -> None:
        """Cancel all tasks.

        Blocks until cancelled tasks complete to allow them to clean up.
        Ignores exceptions.
        """
        for task in self._tasks:
            # NOTE: It is safe to cancel tasks that have already completed.
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)


@contextlib.asynccontextmanager
async def open_task_group(
    *,
    exit_timeout: float | None = None,
    race: bool = False,
) -> AsyncIterator[TaskGroup]:
    """Create a task group.

    `asyncio` gained task groups in Python 3.11.

    This is an async context manager, meant to be used with `async with`.
    On exit, it blocks until all subtasks complete. If any subtask fails, or if
    the current task is cancelled, it cancels all subtasks in the group and
    raises the subtask's exception. If multiple subtasks fail simultaneously,
    one of their exceptions is chosen arbitrarily.

    NOTE: Subtask exceptions do not propagate until the context manager exits.
    This means that the task group cannot cancel code running inside the
    `async with` block .

    Args:
        exit_timeout: An optional timeout in seconds. When exiting the
            context manager, if tasks don't complete in this time,
            they are cancelled and a TimeoutError is raised.
        race: If true, all pending tasks are cancelled once any task
            in the group completes. Prefer to use the race() function instead.

    Raises:
        TimeoutError: if exit_timeout is specified and tasks don't finish
            in time.
    """
    task_group = TaskGroup()

    try:
        yield task_group
        await task_group._wait_all(race=race, timeout=exit_timeout)
    finally:
        await task_group._cancel_all()


@contextlib.asynccontextmanager
async def cancel_on_exit(coro: Coroutine[Any, Any, Any]) -> AsyncIterator[None]:
    """Schedule a task, cancelling it when exiting the context manager.

    If the context manager exits successfully but the given coroutine raises
    an exception, that exception is reraised. The exception is suppressed
    if the context manager raises an exception.
    """

    async def stop_immediately():
        pass

    async with open_task_group(race=True) as group:
        group.start_soon(stop_immediately())
        group.start_soon(coro)
        yield


async def race(*coros: Coroutine[Any, Any, Any]) -> None:
    """Wait until the first completed task.

    After any coroutine completes, all others are cancelled.
    If the current task is cancelled, all coroutines are cancelled too.

    If coroutines complete simultaneously and any one of them raises
    an exception, an arbitrary one is propagated. Similarly, if any coroutines
    raise exceptions during cancellation, one of them propagates.

    Args:
        coros: Coroutines to race.
    """
    async with open_task_group(race=True) as tg:
        for coro in coros:
            tg.start_soon(coro)
