import abc
import fnmatch
import glob
import logging
import os
import queue
import time
from typing import TYPE_CHECKING, Any, Mapping, MutableMapping, MutableSet, Optional

from wandb import util
from wandb.sdk.lib.filesystem import GlobStr
from wandb.sdk.lib.paths import LogicalPath

if TYPE_CHECKING:
    import wandb.vendor.watchdog_0_9_0.observers.api as wd_api
    import wandb.vendor.watchdog_0_9_0.observers.polling as wd_polling
    import wandb.vendor.watchdog_0_9_0.watchdog.events as wd_events
    from wandb.sdk.internal.file_pusher import FilePusher
    from wandb.sdk.internal.settings_static import SettingsStatic
    from wandb.sdk.lib.filesystem import PolicyName
else:
    wd_polling = util.vendor_import("wandb_watchdog.observers.polling")
    wd_events = util.vendor_import("wandb_watchdog.events")

PathStr = str  # TODO(spencerpearson): would be nice to use Path here


logger = logging.getLogger(__name__)


class FileEventHandler(abc.ABC):
    def __init__(
        self,
        file_path: PathStr,
        save_name: LogicalPath,
        file_pusher: "FilePusher",
        *args: Any,
        **kwargs: Any,
    ) -> None:
        self.file_path = file_path
        # Convert windows paths to unix paths
        self.save_name = LogicalPath(save_name)
        self._file_pusher = file_pusher
        self._last_sync: Optional[float] = None

    @property
    @abc.abstractmethod
    def policy(self) -> "PolicyName":
        raise NotImplementedError

    @abc.abstractmethod
    def on_modified(self, force: bool = False) -> None:
        raise NotImplementedError

    @abc.abstractmethod
    def finish(self) -> None:
        raise NotImplementedError

    def on_renamed(self, new_path: PathStr, new_name: LogicalPath) -> None:
        self.file_path = new_path
        self.save_name = new_name
        self.on_modified()


class PolicyNow(FileEventHandler):
    """This policy only uploads files now."""

    def on_modified(self, force: bool = False) -> None:
        # only upload if we've never uploaded or when .save is called
        if self._last_sync is None or force:
            self._file_pusher.file_changed(self.save_name, self.file_path)
            self._last_sync = os.path.getmtime(self.file_path)

    def finish(self) -> None:
        pass

    @property
    def policy(self) -> "PolicyName":
        return "now"


class PolicyEnd(FileEventHandler):
    """This policy only updates at the end of the run."""

    def on_modified(self, force: bool = False) -> None:
        pass

    # TODO: make sure we call this
    def finish(self) -> None:
        # We use copy=False to avoid possibly expensive copies, and because
        # user files shouldn't still be changing at the end of the run.
        self._last_sync = os.path.getmtime(self.file_path)
        self._file_pusher.file_changed(self.save_name, self.file_path, copy=False)

    @property
    def policy(self) -> "PolicyName":
        return "end"


class PolicyLive(FileEventHandler):
    """Event handler that uploads respecting throttling.

    Uploads files every RATE_LIMIT_SECONDS, which changes as the size increases to deal
    with throttling.
    """

    RATE_LIMIT_SECONDS = 15
    unit_dict = dict(util.POW_10_BYTES)
    # Wait to upload until size has increased 20% from last upload
    RATE_LIMIT_SIZE_INCREASE = 1.2

    def __init__(
        self,
        file_path: PathStr,
        save_name: LogicalPath,
        file_pusher: "FilePusher",
        settings: Optional["SettingsStatic"] = None,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        super().__init__(file_path, save_name, file_pusher, *args, **kwargs)
        self._last_uploaded_time: Optional[float] = None
        self._last_uploaded_size: int = 0
        if settings is not None:
            if settings.x_live_policy_rate_limit is not None:
                self.RATE_LIMIT_SECONDS = settings.x_live_policy_rate_limit
            self._min_wait_time: Optional[float] = settings.x_live_policy_wait_time
        else:
            self._min_wait_time = None

    @property
    def current_size(self) -> int:
        return os.path.getsize(self.file_path)

    @classmethod
    def min_wait_for_size(cls, size: int) -> float:
        if size < 10 * cls.unit_dict["MB"]:
            return 60
        elif size < 100 * cls.unit_dict["MB"]:
            return 5 * 60
        elif size < cls.unit_dict["GB"]:
            return 10 * 60
        else:
            return 20 * 60

    def should_update(self) -> bool:
        if self._last_uploaded_time is not None:
            # Check rate limit by time elapsed
            time_elapsed = time.time() - self._last_uploaded_time
            # if more than 15 seconds has passed potentially upload it
            if time_elapsed < self.RATE_LIMIT_SECONDS:
                return False

            # Check rate limit by size increase
            if float(self._last_uploaded_size) > 0:
                size_increase = self.current_size / float(self._last_uploaded_size)
                if size_increase < self.RATE_LIMIT_SIZE_INCREASE:
                    return False
            return time_elapsed > (
                self._min_wait_time or self.min_wait_for_size(self.current_size)
            )

        # if the file has never been uploaded, we'll upload it
        return True

    def on_modified(self, force: bool = False) -> None:
        if self.current_size == 0:
            return
        if self._last_sync == os.path.getmtime(self.file_path):
            return
        if force or self.should_update():
            self.save_file()

    def save_file(self) -> None:
        self._last_sync = os.path.getmtime(self.file_path)
        self._last_uploaded_time = time.time()
        self._last_uploaded_size = self.current_size
        self._file_pusher.file_changed(self.save_name, self.file_path)

    def finish(self) -> None:
        self.on_modified(force=True)

    @property
    def policy(self) -> "PolicyName":
        return "live"


class DirWatcher:
    def __init__(
        self,
        settings: "SettingsStatic",
        file_pusher: "FilePusher",
        file_dir: Optional[PathStr] = None,
    ) -> None:
        self._file_count = 0
        self._dir = file_dir or settings.files_dir
        self._settings = settings
        self._savename_file_policies: MutableMapping[LogicalPath, PolicyName] = {}
        self._user_file_policies: Mapping[PolicyName, MutableSet[GlobStr]] = {
            "end": set(),
            "live": set(),
            "now": set(),
        }
        self._file_pusher = file_pusher
        self._file_event_handlers: MutableMapping[LogicalPath, FileEventHandler] = {}
        self._file_observer = wd_polling.PollingObserver()
        self._file_observer.schedule(
            self._per_file_event_handler(), self._dir, recursive=True
        )
        self._file_observer.start()
        logger.info("watching files in: %s", settings.files_dir)

    @property
    def emitter(self) -> Optional["wd_api.EventEmitter"]:
        try:
            return next(iter(self._file_observer.emitters))
        except StopIteration:
            return None

    def update_policy(self, path: GlobStr, policy: "PolicyName") -> None:
        # When we're dealing with one of our own media files, there's no need
        # to store the policy in memory.  _get_file_event_handler will always
        # return PolicyNow.  Using the path makes syncing historic runs much
        # faster if the name happens to include glob escapable characters.  In
        # the future we may add a flag to "files" records that indicates it's
        # policy is not dynamic and doesn't need to be stored / checked.
        save_name = LogicalPath(
            os.path.relpath(os.path.join(self._dir, path), self._dir)
        )
        if save_name.startswith("media/"):
            pass
        elif path == glob.escape(path):
            self._savename_file_policies[save_name] = policy
        else:
            self._user_file_policies[policy].add(path)

        for src_path in glob.glob(os.path.join(self._dir, path)):
            save_name = LogicalPath(os.path.relpath(src_path, self._dir))
            feh = self._get_file_event_handler(src_path, save_name)
            # handle the case where the policy changed
            if feh.policy != policy:
                try:
                    del self._file_event_handlers[save_name]
                except KeyError:
                    # TODO: probably should do locking, but this handles moved files for now
                    pass
                feh = self._get_file_event_handler(src_path, save_name)
            feh.on_modified(force=True)

    def _per_file_event_handler(self) -> "wd_events.FileSystemEventHandler":
        """Create a Watchdog file event handler that does different things for every file."""
        file_event_handler = wd_events.PatternMatchingEventHandler()
        file_event_handler.on_created = self._on_file_created
        file_event_handler.on_modified = self._on_file_modified
        file_event_handler.on_moved = self._on_file_moved
        file_event_handler._patterns = [os.path.join(self._dir, os.path.normpath("*"))]
        # Ignore hidden files/folders
        #  TODO: what other files should we skip?
        file_event_handler._ignore_patterns = [
            "*.tmp",
            "*.wandb",
            "wandb-summary.json",
            os.path.join(self._dir, ".*"),
            os.path.join(self._dir, "*/.*"),
        ]
        for glb in self._settings.ignore_globs:
            file_event_handler._ignore_patterns.append(os.path.join(self._dir, glb))

        return file_event_handler

    def _on_file_created(self, event: "wd_events.FileCreatedEvent") -> None:
        logger.info("file/dir created: %s", event.src_path)
        if os.path.isdir(event.src_path):
            return None
        self._file_count += 1
        # We do the directory scan less often as it grows
        if self._file_count % 100 == 0:
            emitter = self.emitter
            if emitter:
                emitter._timeout = int(self._file_count / 100) + 1
        save_name = LogicalPath(os.path.relpath(event.src_path, self._dir))
        self._get_file_event_handler(event.src_path, save_name).on_modified()

    # TODO(spencerpearson): this pattern repeats so many times we should have a method/function for it
    # def _save_name(self, path: PathStr) -> LogicalPath:
    #     return LogicalPath(os.path.relpath(path, self._dir))

    def _on_file_modified(self, event: "wd_events.FileModifiedEvent") -> None:
        logger.info(f"file/dir modified: {event.src_path}")
        if os.path.isdir(event.src_path):
            return None
        save_name = LogicalPath(os.path.relpath(event.src_path, self._dir))
        self._get_file_event_handler(event.src_path, save_name).on_modified()

    def _on_file_moved(self, event: "wd_events.FileMovedEvent") -> None:
        # TODO: test me...
        logger.info(f"file/dir moved: {event.src_path} -> {event.dest_path}")
        if os.path.isdir(event.dest_path):
            return None
        old_save_name = LogicalPath(os.path.relpath(event.src_path, self._dir))
        new_save_name = LogicalPath(os.path.relpath(event.dest_path, self._dir))

        # We have to move the existing file handler to the new name
        handler = self._get_file_event_handler(event.src_path, old_save_name)
        self._file_event_handlers[new_save_name] = handler
        del self._file_event_handlers[old_save_name]

        handler.on_renamed(event.dest_path, new_save_name)

    def _get_file_event_handler(
        self, file_path: PathStr, save_name: LogicalPath
    ) -> FileEventHandler:
        """Get or create an event handler for a particular file.

        file_path: the file's actual path
        save_name: its path relative to the run directory (aka the watch directory)
        """
        # Always return PolicyNow for any of our media files.
        if save_name.startswith("media/"):
            return PolicyNow(file_path, save_name, self._file_pusher, self._settings)
        if save_name not in self._file_event_handlers:
            # TODO: we can use PolicyIgnore if there are files we never want to sync
            if "tfevents" in save_name or "graph.pbtxt" in save_name:
                self._file_event_handlers[save_name] = PolicyLive(
                    file_path, save_name, self._file_pusher, self._settings
                )
            elif save_name in self._savename_file_policies:
                policy_name = self._savename_file_policies[save_name]
                make_handler = (
                    PolicyLive
                    if policy_name == "live"
                    else PolicyNow
                    if policy_name == "now"
                    else PolicyEnd
                )
                self._file_event_handlers[save_name] = make_handler(
                    file_path, save_name, self._file_pusher, self._settings
                )
            else:
                make_handler = PolicyEnd
                for policy, globs in self._user_file_policies.items():
                    if policy == "end":
                        continue
                    # Convert set to list to avoid RuntimeError's
                    # TODO: we may need to add locks
                    for g in list(globs):
                        paths = glob.glob(os.path.join(self._dir, g))
                        if any(save_name in p for p in paths):
                            if policy == "live":
                                make_handler = PolicyLive
                            elif policy == "now":
                                make_handler = PolicyNow
                self._file_event_handlers[save_name] = make_handler(
                    file_path, save_name, self._file_pusher, self._settings
                )
        return self._file_event_handlers[save_name]

    def finish(self) -> None:
        logger.info("shutting down directory watcher")
        try:
            # avoid hanging if we crashed before the observer was started
            if self._file_observer.is_alive():
                # rather unfortunately we need to manually do a final scan of the dir
                # with `queue_events`, then iterate through all events before stopping
                # the observer to catch all files written.  First we need to prevent the
                # existing thread from consuming our final events, then we process them
                self._file_observer._timeout = 0
                self._file_observer._stopped_event.set()
                self._file_observer.join()
                self.emitter.queue_events(0)  # type: ignore[union-attr]
                while True:
                    try:
                        self._file_observer.dispatch_events(
                            self._file_observer.event_queue, 0
                        )
                    except queue.Empty:
                        break
                # Calling stop unschedules any inflight events so we handled them above
                self._file_observer.stop()
        # TODO: py2 TypeError: PyCObject_AsVoidPtr called with null pointer
        except TypeError:
            pass
        # TODO: py3 SystemError: <built-in function stop> returned an error
        except SystemError:
            pass

        # Ensure we've at least noticed every file in the run directory. Sometimes
        # we miss things because asynchronously watching filesystems isn't reliable.
        logger.info("scan: %s", self._dir)

        for dirpath, _, filenames in os.walk(self._dir):
            for fname in filenames:
                file_path = os.path.join(dirpath, fname)
                save_name = LogicalPath(os.path.relpath(file_path, self._dir))
                ignored = False
                for glb in self._settings.ignore_globs:
                    if len(fnmatch.filter([save_name], glb)) > 0:
                        ignored = True
                        logger.info("ignored: %s matching glob %s", save_name, glb)
                        break
                if ignored:
                    continue
                logger.info("scan save: %s %s", file_path, save_name)
                self._get_file_event_handler(file_path, save_name).finish()
