from __future__ import annotations

import os
import threading

from wandb import env
from wandb.errors import AuthenticationError, UsageError, term
from wandb.sdk import wandb_setup

from . import prompt, wbnetrc
from .auth import Auth, AuthApiKey, AuthIdentityTokenFile, AuthWithSource
from .host_url import HostUrl

# Held around reads and writes of `_session_auth` by the functions below.
_session_auth_lock = threading.Lock()
# Credentials currently in use for the session; None when nothing is set.
_session_auth: Auth | None = None


def session_credentials(*, host: str | HostUrl) -> Auth | None:
    """Look up the credentials currently set for the session.

    Args:
        host: The W&B server URL the credentials must belong to.

    Returns:
        The session credentials when they are set and their host is the
        same URL as `host`; None otherwise.
    """
    with _session_auth_lock:
        current = _session_auth

        if not current:
            return None

        # Credentials set for another host are never handed out.
        if not current.host.is_same_url(host):
            return None

        return current


def _locked_set_session_auth(
    auth: Auth | None,
    *,
    update_settings: bool = True,
) -> None:
    """Replace the session credentials and mirror them into settings.

    Assigns the module-level `_session_auth` and, unless disabled, copies
    the credentials onto the global settings object. Mirroring into settings
    is a refactoring step while auth is moved out of settings.

    This function does not take `_session_auth_lock` itself; the callers in
    this module invoke it while holding the lock.

    Args:
        auth: The new credentials, or None to clear them.
        update_settings: Defaults to true. If false, skips updating the global
            settings (which may cause them to be loaded).

    Raises:
        NotImplementedError: If settings are being updated and `auth` is
            neither None, an AuthApiKey, nor an AuthIdentityTokenFile.
    """
    global _session_auth
    _session_auth = auth

    if update_settings:
        settings = wandb_setup.singleton().settings

        if isinstance(auth, AuthApiKey):
            settings.api_key = auth.api_key
            settings.identity_token_file = None
            settings.base_url = str(auth.host)

        elif isinstance(auth, AuthIdentityTokenFile):
            settings.api_key = None
            settings.identity_token_file = str(auth.path)
            settings.base_url = str(auth.host)

        elif auth is None:
            # Clearing credentials leaves base_url untouched.
            settings.api_key = None
            settings.identity_token_file = None

        else:
            raise NotImplementedError(str(auth))


def unauthenticate_session(*, update_settings: bool = True) -> Auth | None:
    """Drop whatever credentials the session currently holds.

    Args:
        update_settings: Defaults to true. If false, skips updating the global
            settings (which may cause them to be loaded).

    Returns:
        The credentials that were set before the call, or None if there
        were none.
    """
    with _session_auth_lock:
        # Capture the old value before it is overwritten.
        previous = _session_auth
        _locked_set_session_auth(None, update_settings=update_settings)

    return previous


def authenticate_session(
    *,
    host: str | HostUrl,
    source: str,
    no_offline: bool = False,
    no_create: bool = False,
    input_timeout: float | None = None,
    referrer: str = "models",
    relogin: bool = False,
) -> Auth | None:
    """Return the session credentials for a host, configuring them if needed.

    Unless `relogin` is set, credentials are looked up in this order:
    the session credentials already set for `host`, then system credentials
    (environment variables or the .netrc file). If neither yields anything,
    or if `relogin` is set, the user is prompted interactively.

    The return value is only None if the user selected offline mode in
    the interactive prompt.

    Args:
        host: The W&B server URL.
        source: The source to include in printed messages,
            like "wandb.init()".
        no_offline: If true, do not show an offline option in interactive
            prompts.
        no_create: If true, do not show a new account option in interactive
            prompts.
        input_timeout: A timeout for interactive prompts to avoid hanging
            the process if we incorrectly identify it as interactive.
        referrer: Referrer parameter to add to printed URLs for analytics.
        relogin: If true, skips existing session and system credentials
            and forces an interactive prompt.

    Raises:
        TimeoutError: If an interactive prompt is shown and input_timeout expires.
        AuthenticationError: If credentials are found but have an invalid format.
        UsageError: If interactive prompting is needed but unavailable.
    """
    if not isinstance(host, HostUrl):
        host = HostUrl(host)

    if not relogin:
        # System credentials are only consulted when the session has none
        # for this host.
        existing = session_credentials(host=host) or _use_system_auth(
            host=host,
            source=source,
        )
        if existing:
            return existing

    try:
        prompted = _use_prompted_auth(
            host=host,
            no_offline=no_offline,
            no_create=no_create,
            referrer=referrer,
            input_timeout=input_timeout,
        )
    except term.NotATerminalError:
        # Hide the terminal error; the actionable problem is the missing key.
        raise UsageError(
            "No API key configured. Use `wandb login` to log in."
        ) from None

    return prompted


def use_explicit_auth(auth: Auth, *, source: str) -> None:
    """Install caller-supplied credentials as the session credentials.

    Does nothing if the session already holds credentials equal to `auth`.
    Otherwise prints a warning when existing credentials are being replaced,
    or a log message when none were set, and then stores `auth`.

    Args:
        auth: Credentials to use.
        source: The source to include in the printed message,
            like "wandb.init()".
    """
    with _session_auth_lock:
        current = _session_auth

        if current == auth:
            return

        if not current:
            term.termlog(
                f"[{source}] Using explicit session credentials for {auth.host}."
            )
        else:
            term.termwarn(
                f"[{source}] Changing session credentials to explicit value"
                + f" for {auth.host}."
            )

        _locked_set_session_auth(auth)


def _use_system_auth(*, host: HostUrl, source: str) -> Auth | None:
    """Load (or reload) session credentials from external sources.

    Loads credentials from environment variables or the .netrc file,
    checking environment variables first. If credentials are found, they
    become the session credentials. If none are found, the session
    credentials are unchanged.

    Args:
        host: The W&B server URL.
        source: The source to include in the printed message,
            like "wandb.init()".

    Raises:
        AuthenticationError: If a source of credentials is found but has an
            invalid format.

    Returns:
        The newly loaded credentials, or None if no external source
        provided any.
    """
    found = (
        _try_env_auth(host=host)  #
        or wbnetrc.read_netrc_auth_with_source(host=host)
    )

    if not found:
        # Return None rather than the current session credentials: those may
        # have been set for a different host, and the caller would otherwise
        # treat them as valid for `host`.
        return None

    with _session_auth_lock:
        term.termlog(
            f"[{source}] Loaded credentials for {found.auth.host}"
            + f" from {found.source}."
        )
        _locked_set_session_auth(found.auth)

    return found.auth


def _try_env_auth(*, host: HostUrl) -> AuthWithSource | None:
    """Build credentials from environment variables, if any are set.

    Reads the variables named by `env.API_KEY` and
    `env.IDENTITY_TOKEN_FILE`. Empty values are treated as unset.

    Args:
        host: The W&B server URL to attach to the credentials.

    Returns:
        The credentials together with the name of the environment variable
        they came from, or None if neither variable is set.

    Raises:
        AuthenticationError: If both variables are set, or if constructing
            the API key credentials raises an AuthenticationError.
    """
    key_value = os.getenv(env.API_KEY)
    token_path = os.getenv(env.IDENTITY_TOKEN_FILE)

    if key_value and token_path:
        raise AuthenticationError(
            f"Both {env.API_KEY} and {env.IDENTITY_TOKEN_FILE} are set,"
            + " which is not allowed."
        )

    if not key_value:
        if not token_path:
            return None

        return AuthWithSource(
            auth=AuthIdentityTokenFile(host=host, path=token_path),
            source=env.IDENTITY_TOKEN_FILE,
        )

    try:
        return AuthWithSource(
            auth=AuthApiKey(host=host, api_key=key_value),
            source=env.API_KEY,
        )
    except AuthenticationError as e:
        # Re-raise with the variable name so the user knows which input to fix.
        raise AuthenticationError(f"{env.API_KEY} invalid: {e}") from None


def _use_prompted_auth(
    *,
    host: HostUrl,
    no_offline: bool,
    no_create: bool,
    referrer: str,
    input_timeout: float | None = None,
) -> Auth | None:
    """Ask the user for an API key and make it the session credentials.

    Prompts via `prompt.prompt_and_save_api_key`. If the prompt yields no
    key (offline mode selected), the session credentials are cleared.

    Args:
        host: The W&B server URL.
        no_offline: If true, do not show an option to skip logging in.
        no_create: If true, do not show an option to create a new account.
        referrer: Referrer parameter to include in printed URLs for analytics.
        input_timeout: How long to wait for user input before timing out.

    Returns:
        The session credentials after the update; None when they were
        cleared.

    Raises:
        NotATerminalError: If interactive prompting is not possible.
        TimeoutError: If input_timeout expires.
    """
    entered_key = prompt.prompt_and_save_api_key(
        host=host,
        no_offline=no_offline,
        no_create=no_create,
        referrer=referrer,
        input_timeout=input_timeout,
    )

    with _session_auth_lock:
        # A falsy key means offline mode was selected, so clear credentials.
        new_auth = (
            AuthApiKey(host=host, api_key=entered_key) if entered_key else None
        )
        _locked_set_session_auth(new_auth)

        return _session_auth
