import itertools
from typing import ClassVar  # noqa:F401
from typing import Optional  # noqa:F401

import ddtrace
from ddtrace.internal import atexit
from ddtrace.internal.constants import EXPERIMENTAL_FEATURES
from ddtrace.internal.threads import Lock
from ddtrace.vendor.dogstatsd import DogStatsd

from .. import periodic
from ..dogstatsd import get_dogstatsd_client
from ..logger import get_logger
from .constants import DEFAULT_RUNTIME_METRICS
from .constants import DEFAULT_RUNTIME_METRICS_INTERVAL
from .metric_collectors import GCRuntimeMetricCollector
from .metric_collectors import PSUtilRuntimeMetricCollector
from .tag_collectors import PlatformTagCollector
from .tag_collectors import PlatformTagCollectorV2
from .tag_collectors import ProcessTagCollector
from .tag_collectors import TracerTagCollector


log = get_logger(__name__)


class RuntimeCollectorsIterable(object):
    """Iterable that chains together the output of a fixed set of collectors.

    Subclasses are expected to provide two class attributes:

    - ``COLLECTORS``: collector classes, each instantiated with no arguments.
    - ``ENABLED``: default value handed to every collector's ``collect()``.
    """

    def __init__(self, enabled=None):
        # DEV: any falsy ``enabled`` (``None``, an empty collection, ...) falls
        # back to the class-level default.
        self._enabled = enabled if enabled else self.ENABLED
        # Collectors are created once here and reused on every iteration.
        self._collectors = [collector_cls() for collector_cls in self.COLLECTORS]

    def __iter__(self):
        # Each collector is queried lazily, in declaration order, and its
        # results are flattened into a single stream.
        for collector in self._collectors:
            yield from collector.collect(self._enabled)

    def __repr__(self):
        return f"{self.__class__.__name__}(enabled={self._enabled})"


class PlatformTags(RuntimeCollectorsIterable):
    """Iterable over the tags produced by ``PlatformTagCollector``."""

    COLLECTORS = [PlatformTagCollector]
    # DEV: `None` means to allow all tags generated by PlatformTagCollector
    ENABLED = None


class PlatformTagsV2(RuntimeCollectorsIterable):
    """Iterable over the tags produced by ``PlatformTagCollectorV2``."""

    COLLECTORS = [PlatformTagCollectorV2]
    # DEV: `None` means to allow all tags generated by PlatformTagCollectorV2
    ENABLED = None


class TracerTags(RuntimeCollectorsIterable):
    """Iterable over the tags produced by ``TracerTagCollector``."""

    COLLECTORS = [TracerTagCollector]
    # DEV: `None` means to allow all tags generated by TracerTagCollector
    ENABLED = None


class ProcessTags(RuntimeCollectorsIterable):
    """Iterable over the tags produced by ``ProcessTagCollector``."""

    COLLECTORS = [ProcessTagCollector]
    # DEV: `None` means to allow all tags generated by ProcessTagCollector
    ENABLED = None


class RuntimeMetrics(RuntimeCollectorsIterable):
    """Iterable over the metrics produced by the GC and psutil collectors."""

    COLLECTORS = [
        GCRuntimeMetricCollector,
        PSUtilRuntimeMetricCollector,
    ]
    # Default value handed to each collector's `collect()` when the caller
    # does not supply its own `enabled` value.
    ENABLED = DEFAULT_RUNTIME_METRICS


class RuntimeWorker(periodic.PeriodicService):
    """Worker thread for collecting and writing runtime metrics to a DogStatsd client.

    At most one worker is tracked at class level: ``enable()`` creates and
    starts it, ``disable()`` stops it. Both take the class-level lock.
    """

    enabled = False
    _instance = None  # type: ClassVar[Optional[RuntimeWorker]]
    _lock = Lock()

    def __init__(self, interval=DEFAULT_RUNTIME_METRICS_INTERVAL, tracer=None, dogstatsd_url=None) -> None:
        """Set up the DogStatsd client, the metric collectors and the static tags.

        :param interval: Period handed to ``PeriodicService``.
        :param tracer: Tracer to associate with the worker; defaults to ``ddtrace.tracer``.
        :param dogstatsd_url: DogStatsd URL; when falsy, the agent configuration value is used.
        """
        super().__init__(interval=interval)
        self.dogstatsd_url: Optional[str] = dogstatsd_url
        client_url = self.dogstatsd_url or ddtrace.internal.settings._agent.config.dogstatsd_url
        self._dogstatsd_client: DogStatsd = get_dogstatsd_client(client_url)
        self.tracer: ddtrace.trace.Tracer = tracer or ddtrace.tracer
        self._runtime_metrics: RuntimeMetrics = RuntimeMetrics()

        # The experimental feature enables sending runtime metrics as gauges
        # (instead of distributions with a new metric name).
        use_gauges = EXPERIMENTAL_FEATURES.RUNTIME_METRICS in ddtrace.config._experimental_features_enabled
        self.send_metric = self._dogstatsd_client.gauge if use_gauges else self._dogstatsd_client.distribution

        # V2 enables tagging runtime metrics with runtime-id (as well as all the v1 tags).
        platform_tags_cls = PlatformTagsV2 if ddtrace.config._runtime_metrics_runtime_id_enabled else PlatformTags
        self._platform_tags = self._format_tags(platform_tags_cls())

        self._process_tags: list[str] = list(ProcessTags())

    @classmethod
    def disable(cls) -> None:
        """Stop the tracked worker, if there is one, and clear the class-level state."""
        with cls._lock:
            worker = cls._instance
            if worker is None:
                return

            worker.stop()
            # DEV: Join with a timeout to avoid locking on shutdown. This seems
            # to be required on some occasions by Python 2.7, where deadlocks
            # appear when some functionalities (e.g. platform.architecture)
            # end up calling
            #    _execute_child (/usr/lib/python2.7/subprocess.py:1023)
            # which continuously attempts to read via
            #    _eintr_retry_call (/usr/lib/python2.7/subprocess.py:125)
            # the eventual cause of the deadlock.
            worker.join(1)

            cls._instance = None
            cls.enabled = False

    @classmethod
    def enable(
        cls,
        tracer: Optional[ddtrace.trace.Tracer] = None,
        dogstatsd_url: Optional[str] = None,
    ) -> None:
        """Create and start the worker unless one is already tracked.

        :param tracer: Forwarded to the worker constructor.
        :param dogstatsd_url: Forwarded to the worker constructor.
        """
        with cls._lock:
            if cls._instance is not None:
                # Already enabled: nothing to do.
                return

            worker = cls(DEFAULT_RUNTIME_METRICS_INTERVAL, tracer, dogstatsd_url)
            worker.start()

            # Make sure the worker gets stopped when the interpreter exits.
            atexit.register(cls.disable)

            cls._instance = worker
            cls.enabled = True

    def flush(self) -> None:
        """Refresh the client's constant tags and send every collected runtime metric."""
        # Tracer tags are re-collected on every flush so that runtime metrics
        # carry up-to-date tags (ex: service, env, version).
        tags = [
            *self._format_tags(TracerTags()),
            *self._platform_tags,
            *self._process_tags,
        ]
        log.debug("Sending runtime metrics with the following tags: %s", tags)
        self._dogstatsd_client.constant_tags = tags

        with self._dogstatsd_client:
            for name, value in self._runtime_metrics:
                log.debug("Sending ddtrace runtime metric %s:%s", name, value)
                self.send_metric(name, value)

    def _format_tags(self, tags: RuntimeCollectorsIterable) -> list[str]:
        """Render ``(key, value)`` pairs as ``"key:value"`` strings."""
        # DEV: DogStatsd expects tags in the form ['key1:value1', 'key2:value2', ...]
        return [f"{key}:{value}" for key, value in tags]

    # Both the periodic tick and the shutdown hook perform a flush.
    periodic = flush
    on_shutdown = flush
