from __future__ import annotations

import logging
import secrets
import string
import threading
from typing import Awaitable, Callable

from wandb.proto import wandb_internal_pb2 as pb
from wandb.proto import wandb_server_pb2 as spb
from wandb.sdk.lib import asyncio_manager

from .mailbox_handle import MailboxHandle
from .response_handle import MailboxResponseHandle

_logger = logging.getLogger(__name__)


class MailboxClosedError(Exception):
    """The mailbox has been closed and cannot be used."""


class Mailbox:
    """Matches service responses to requests.

    The mailbox can set an address on a server request and create a handle for
    waiting for a response to that record. Responses are delivered by calling
    `deliver()`. The `close()` method abandons all handles in case the
    service process becomes unreachable.
    """

    def __init__(
        self,
        asyncer: asyncio_manager.AsyncioManager,
        cancel: Callable[[str], Awaitable[None]],
    ) -> None:
        """Create a mailbox.

        Args:
            asyncer: Asyncio runner for scheduling async operations.
            cancel: A callback that can be used to cancel a request by ID.
        """
        self._asyncer = asyncer
        self._cancel = cancel

        self._handles: dict[str, MailboxResponseHandle] = {}
        self._handles_lock = threading.Lock()
        self._closed = False

    def require_response(
        self,
        request: spb.ServerRequest | pb.Record,
    ) -> MailboxHandle[spb.ServerResponse]:
        """Set a response address on a request.

        Args:
            request: The request on which to set a request ID or mailbox slot.
                This is mutated. An address must not already be set.

        Returns:
            A handle for waiting for the response to the request.

        Raises:
            MailboxClosedError: If the mailbox has been closed, in which case
                no new responses are expected to be delivered and new handles
                cannot be created.
        """
        if isinstance(request, spb.ServerRequest):
            if (address := request.request_id) or (
                address := request.record_publish.control.mailbox_slot
            ):
                raise ValueError(f"Request already has an address ({address})")

            address = self._new_address()
            request.request_id = address
            if request.HasField("record_publish"):
                request.record_publish.control.mailbox_slot = address
            if request.HasField("record_communicate"):
                request.record_communicate.control.mailbox_slot = address
        else:
            if address := request.control.mailbox_slot:
                raise ValueError(f"Request already has an address ({address})")

            address = self._new_address()
            request.control.mailbox_slot = address

        with self._handles_lock:
            if self._closed:
                raise MailboxClosedError()

            handle = MailboxResponseHandle(
                address,
                asyncer=self._asyncer,
                cancel=self._cancel,
            )
            self._handles[address] = handle

        return handle

    def _new_address(self) -> str:
        """Returns an unused address for a request.

        Assumes `_handles_lock` is held.
        """

        def generate():
            return "".join(
                secrets.choice(string.ascii_lowercase + string.digits)
                for _ in range(12)
            )

        address = generate()

        # Being extra cautious. This loop will almost never be entered.
        while address in self._handles:
            address = generate()

        return address

    async def deliver(self, response: spb.ServerResponse) -> None:
        """Deliver a response from the service.

        If the response address is invalid, this does nothing.
        It is a no-op if the mailbox has been closed.
        """
        address = response.request_id
        if not address:
            kind: str | None = response.WhichOneof("server_response_type")
            if kind == "result_communicate":
                result_type = response.result_communicate.WhichOneof("result_type")
                kind = f"result_communicate.{result_type}"

            _logger.error(f"Received response with no mailbox slot: {kind}")
            return

        with self._handles_lock:
            # NOTE: If the mailbox is closed, this returns None because
            # we clear the dict.
            handle = self._handles.pop(address, None)

        # It is not an error if there is no handle for the address:
        # handles can be abandoned if the result is no longer needed.
        if handle:
            await handle.deliver(response)

    def close(self) -> None:
        """Indicate no further responses will be delivered.

        Abandons all handles.
        """
        with self._handles_lock:
            self._closed = True

            _logger.info(
                f"Closing mailbox, abandoning {len(self._handles)} handles.",
            )

            for handle in self._handles.values():
                handle.abandon()
            self._handles.clear()
