import abc
import binascii
from collections import defaultdict
import functools
import gzip
import os
import sys
import threading
from typing import TYPE_CHECKING
from typing import Any
from typing import Callable
from typing import Optional
from typing import TextIO
import weakref

from ddtrace import config
from ddtrace.internal import forksafe
from ddtrace.internal.dist_computing.utils import in_ray_job
from ddtrace.internal.hostname import get_hostname
import ddtrace.internal.native as native
from ddtrace.internal.runtime import get_runtime_id
from ddtrace.internal.settings._agent import config as agent_config
from ddtrace.internal.settings.asm import ai_guard_config
from ddtrace.internal.settings.asm import config as asm_config
from ddtrace.internal.utils.retry import fibonacci_backoff_with_jitter
from ddtrace.version import __version__

from ...constants import _KEEP_SPANS_RATE_KEY
from .. import compat
from .. import periodic
from .. import service
from .._encoding import BufferFull
from .._encoding import BufferItemTooLarge
from ..agent import get_connection
from ..constants import _HTTPLIB_NO_TRACE_REQUEST
from ..dogstatsd import get_dogstatsd_client
from ..encoding import JSONEncoderV2
from ..gitmetadata import get_git_tags
from ..logger import get_logger
from ..serverless import has_aws_lambda_agent_extension
from ..serverless import in_aws_lambda
from ..serverless import in_azure_function
from ..serverless import in_gcp_function
from ..service import ServiceStatusError
from ..sma import SimpleMovingAverage
from ..utils.formats import parse_tags_str
from ..utils.http import Response
from ..utils.http import verify_url
from ..utils.time import StopWatch
from .writer_client import WRITER_CLIENTS
from .writer_client import AgentlessWriterClient
from .writer_client import AgentWriterClientV4
from .writer_client import WriterClientBase


if TYPE_CHECKING:  # pragma: no cover
    from ddtrace.trace import Span  # noqa:F401
    from ddtrace.vendor.dogstatsd import DogStatsd

    from .utils.http import ConnectionType  # noqa:F401


log = get_logger(__name__)

# Interval (seconds) presumably used to rate-limit repeated error logs —
# not referenced in this module's visible code; TODO confirm usage elsewhere.
LOG_ERR_INTERVAL = 60


def _safelog(log_func: Callable[..., None], msg: str, *args, **kwargs) -> None:
    """
    Safely log a message, handling closed I/O streams gracefully.

    During interpreter shutdown or when test frameworks (like pytest) close
    captured stdout/stderr streams, logging calls may fail with:
        ValueError: I/O operation on closed file

    This can happen when a background thread (e.g., the periodic writer thread)
    attempts to log after the main process has started shutting down but before
    the Python interpreter has begun finalization (Py_IsFinalizing()).

    This wrapper catches such errors and attempts to print to stderr as a
    fallback. If stderr is also closed, the message is silently dropped.

    Args:
        log_func: The logger method to call (e.g., log.debug, log.warning, log.error)
        msg: The log message format string
        *args: Arguments for the format string
        **kwargs: Keyword arguments passed to the logger (e.g., exc_info, extra)
    """
    try:
        log_func(msg, *args, **kwargs)
    except ValueError:
        try:
            formatted_msg = msg % args if args else msg
            print(f"[ddtrace] I/O closed, could not log: {formatted_msg}", file=sys.stderr)
        except ValueError:
            pass


class NoEncodableSpansError(Exception):
    """Signals that an encoder could not serialize any span of a trace."""


# The window size should be chosen so that the look-back period is
# greater-equal to the agent API's timeout. Although most tracers have a
# 2s timeout, the java tracer has a 10s timeout, so we set the window size
# to 10 buckets of 1s duration.
# Used as the bucket count of the drop-rate SimpleMovingAverage in HTTPWriter.
DEFAULT_SMA_WINDOW = 10


def make_weak_method_hook(bound_method):
    """
    Return a callable that proxies *bound_method* through a weak reference.

    The returned hook holds only a ``weakref`` to the method's instance, so it
    does not prolong the instance's lifetime. Invoking the hook after the
    instance has been collected is a silent no-op (returns ``None``).

    Raises:
        TypeError: if *bound_method* is not a bound method.
    """
    owner = getattr(bound_method, "__self__", None)
    if owner is None:
        raise TypeError("make_weak_method_hook expects a bound method")

    unbound = bound_method.__func__
    owner_ref = weakref.ref(owner)

    @functools.wraps(unbound)
    def hook(*args, **kwargs):
        target = owner_ref()
        if target is not None:
            return unbound(target, *args, **kwargs)
        # Instance already garbage-collected: drop the call.

    return hook


def _human_size(nbytes: float) -> str:
    """Return a human-readable size."""
    i = 0
    suffixes = ["B", "KB", "MB", "GB", "TB"]
    while nbytes >= 1000 and i < len(suffixes) - 1:
        nbytes /= 1000.0
        i += 1
    f = ("%.2f" % nbytes).rstrip("0").rstrip(".")
    return "%s%s" % (f, suffixes[i])


class TraceWriter(metaclass=abc.ABCMeta):
    """Abstract interface for anything that can receive finished traces."""

    # TODO: `appsec_enabled` is used by ASM to dynamically enable ASM at runtime.
    #       Find an alternative way to do this without having to pass the parameter/recreating the writer
    @abc.abstractmethod
    def recreate(self, appsec_enabled: Optional[bool] = None) -> "TraceWriter":
        """Return a fresh writer configured like this one."""

    @abc.abstractmethod
    def stop(self, timeout: Optional[float] = None) -> None:
        """Shut the writer down, waiting up to ``timeout`` seconds."""

    @abc.abstractmethod
    def write(self, spans: Optional[list["Span"]] = None) -> None:
        """Accept a single trace (a list of spans) for delivery."""

    @abc.abstractmethod
    def flush_queue(self) -> None:
        """Force delivery of everything buffered so far."""


class LogWriter(TraceWriter):
    """Trace writer that serializes traces as JSON and prints them to a stream.

    Each trace is emitted as one line on ``out`` and flushed immediately, so
    there is nothing to stop or flush asynchronously.
    """

    def __init__(
        self,
        out: TextIO = sys.stdout,
    ) -> None:
        self.encoder = JSONEncoderV2()
        self.out = out

    def recreate(self, appsec_enabled: Optional[bool] = None) -> "LogWriter":
        """Return a new :class:`LogWriter` targeting the same output stream."""
        return self.__class__(out=self.out)

    def stop(self, timeout: Optional[float] = None) -> None:
        """No-op: a log writer holds no background resources to release."""
        return

    def write(self, spans: Optional[list["Span"]] = None) -> None:
        """Encode ``spans`` as a single-trace payload and emit one line."""
        if not spans:
            return
        payload = self.encoder.encode_traces([spans])
        self.out.write(payload + "\n")
        self.out.flush()

    def flush_queue(self) -> None:
        """No-op: every ``write`` call is flushed immediately."""
        pass


class HTTPWriter(periodic.PeriodicService, TraceWriter):
    """Writer to an arbitrary HTTP intake endpoint.

    Traces are buffered per client encoder and flushed periodically (or on
    every ``write`` when ``sync_mode`` is enabled) over a lazily created,
    optionally reused HTTP connection.
    """

    # Base URL of the intake this writer sends payloads to.
    intake_url: str

    RETRY_ATTEMPTS = 3
    HTTP_METHOD = "PUT"
    STATSD_NAMESPACE = "tracer"

    def __init__(
        self,
        intake_url: str,
        clients: list[WriterClientBase],
        processing_interval: Optional[float] = None,
        # Match the payload size since there is no functionality
        # to flush dynamically.
        buffer_size: Optional[int] = None,
        max_payload_size: Optional[int] = None,
        timeout: Optional[float] = None,
        dogstatsd: Optional["DogStatsd"] = None,
        sync_mode: bool = False,
        reuse_connections: Optional[bool] = None,
        headers: Optional[dict[str, str]] = None,
        report_metrics: bool = True,
        use_gzip: bool = False,
    ) -> None:
        """Create an HTTP writer flushing every ``processing_interval`` seconds."""
        if processing_interval is None:
            processing_interval = config._trace_writer_interval_seconds
        if timeout is None:
            timeout = agent_config.trace_agent_timeout_seconds
        super(HTTPWriter, self).__init__(interval=processing_interval)
        self.intake_url = intake_url
        self._intake_accepts_gzip = use_gzip
        self._buffer_size = buffer_size
        self._max_payload_size = max_payload_size
        self._headers = headers or {}
        self._timeout = timeout

        self._clients = clients
        self.dogstatsd = dogstatsd
        # Internal trace counters used for the drop-rate estimate.
        self._metrics: dict[str, int] = defaultdict(int)
        self._report_metrics = report_metrics
        self._drop_sma = SimpleMovingAverage(DEFAULT_SMA_WINDOW)
        self._sync_mode = sync_mode
        self._conn: Optional["ConnectionType"] = None
        # The connection has to be locked since there exists a race between
        # the periodic thread of HTTPWriter and other threads that might
        # force a flush with `flush_queue()`.
        self._conn_lck: threading.RLock = threading.RLock()

        self._send_payload_with_backoff = fibonacci_backoff_with_jitter(  # type: ignore[assignment]
            attempts=self.RETRY_ATTEMPTS,
            initial_wait=0.618 * self.interval / (1.618**self.RETRY_ATTEMPTS) / 2,
            until=lambda result: isinstance(result, Response),
        )(self._send_payload)

        self._reuse_connections = (
            config._trace_writer_connection_reuse if reuse_connections is None else reuse_connections
        )

    def _intake_endpoint(self, client=None):
        """Return the full intake URL (base URL + endpoint path) for ``client``."""
        return "{}/{}".format(self._intake_url(client), client.ENDPOINT if client else self._endpoint)

    @property
    def _endpoint(self):
        """Endpoint path of the primary (first) client."""
        return self._clients[0].ENDPOINT

    @property
    def _encoder(self):
        """Encoder of the primary (first) client."""
        return self._clients[0].encoder

    def _intake_url(self, client=None):
        """Return the base intake URL, preferring a client-specific override."""
        if client and hasattr(client, "_intake_url"):
            return client._intake_url
        return self.intake_url

    def _metrics_dist(self, name: str, count: int = 1, tags: Optional[list] = None) -> None:
        """Emit a dogstatsd distribution metric when health metrics are enabled."""
        if not self._report_metrics:
            return
        if config._health_metrics_enabled and self.dogstatsd:
            self.dogstatsd.distribution("datadog.%s.%s" % (self.STATSD_NAMESPACE, name), count, tags=tags)

    def _set_drop_rate(self) -> None:
        """Fold this interval's accepted/sent counters into the drop-rate SMA."""
        accepted = self._metrics["accepted_traces"]
        sent = self._metrics["sent_traces"]
        encoded = sum([len(client.encoder) for client in self._clients])
        # The number of dropped traces is the number of accepted traces minus the number of traces in the encoder
        # This calculation is a best effort. Due to race conditions it may result in a slight underestimate.
        dropped = max(accepted - sent - encoded, 0)  # dropped spans should never be negative
        self._drop_sma.set(dropped, accepted)
        self._metrics["sent_traces"] = 0  # reset sent traces for the next interval
        self._metrics["accepted_traces"] = encoded  # sets accepted traces to number of spans in encoders

    def _set_keep_rate(self, trace):
        """Record the current keep rate (1 - drop rate) on the trace's first span."""
        if trace:
            trace[0]._metrics[_KEEP_SPANS_RATE_KEY] = (
                1.0 - self._drop_sma.get()
            )  # PERF: avoid setting via Span.set_metric

    def _reset_connection(self) -> None:
        """Close and discard the cached HTTP connection, if any."""
        with self._conn_lck:
            if self._conn:
                self._conn.close()
                self._conn = None

    def _put(self, data: bytes, headers: dict[str, str], client: WriterClientBase, no_trace: bool) -> Response:
        """Send ``data`` to ``client``'s endpoint and return the parsed response.

        The HTTP connection is created lazily and cached; it is reset whenever
        an exception occurs, and after every request if connection reuse is
        disabled. ``no_trace`` marks the request so it is not traced itself.
        """
        sw = StopWatch()
        sw.start()
        with self._conn_lck:
            if self._conn is None:
                _safelog(
                    log.debug,
                    "creating new intake connection to %s with timeout %d",
                    self.intake_url,
                    self._timeout,
                )
                self._conn = get_connection(self._intake_url(client), self._timeout)
                setattr(self._conn, _HTTPLIB_NO_TRACE_REQUEST, no_trace)
            try:
                # Merge client headers with request headers
                final_headers = {}
                if hasattr(client, "_headers") and client._headers:
                    final_headers.update(client._headers)
                final_headers.update(headers)

                _safelog(
                    log.debug,
                    "Sending request: Method=%s Endpoint=%s Headers=%s PayloadSize=%s",
                    self.HTTP_METHOD,
                    client.ENDPOINT,
                    final_headers,
                    _human_size(len(data)),
                )
                self._conn.request(
                    self.HTTP_METHOD,
                    client.ENDPOINT,
                    data,
                    final_headers,
                )
                resp = self._conn.getresponse()
                t = sw.elapsed()
                # A send that took longer than the flush interval is worth a warning.
                if t >= self.interval:
                    log_func = log.warning
                else:
                    log_func = log.debug
                _safelog(
                    log_func,
                    "Got response: %d %s sent %s in %.5fs to %s",
                    resp.status,
                    resp.reason,
                    _human_size(len(data)),
                    t,
                    self._intake_endpoint(client),
                )
                # Read response body inside try block to ensure connection
                # is reset if this from_http_response call throws an exception
                # (e.g. IncompleteRead)
                return Response.from_http_response(resp)
            except Exception:
                # Always reset the connection when an exception occurs
                self._reset_connection()
                raise
            finally:
                # Reset the connection if reusing connections is disabled.
                if not self._reuse_connections:
                    self._reset_connection()

    def _get_finalized_headers(self, count: int, client: WriterClientBase) -> dict[str, str]:
        """Build the request headers: writer headers + content type + client headers.

        NOTE(review): client headers are merged again inside ``_put`` — the
        duplication appears harmless but confirm before relying on it.
        """
        headers = self._headers.copy()
        headers.update({"Content-Type": client.encoder.content_type})
        if hasattr(client, "_headers"):
            headers.update(client._headers)
        return headers

    def _send_payload(self, payload: bytes, count: int, client: WriterClientBase) -> Response:
        """Send one encoded payload of ``count`` traces and record send metrics."""
        headers = self._get_finalized_headers(count, client)

        self._metrics_dist("http.requests")

        response = self._put(payload, headers, client, no_trace=True)

        if response.status >= 400:
            self._metrics_dist("http.errors", tags=["type:%s" % response.status])
        else:
            self._metrics_dist("http.sent.bytes", len(payload))
            self._metrics["sent_traces"] += count

        # 404/415 are handled by subclasses (API downgrade); other 4xx/5xx are dropped here.
        if response.status not in (404, 415) and response.status >= 400:
            msg = "failed to send traces to intake at %s: HTTP error status %s, reason %s"
            log_args: tuple[Any, Any, Any] = (
                self._intake_endpoint(client),
                response.status,
                response.reason,
            )
            # Append the payload if requested
            if config._trace_writer_log_err_payload:
                msg += ", payload %s"
                # If the payload is bytes then hex encode the value before logging
                if isinstance(payload, bytes):
                    log_args += (binascii.hexlify(payload).decode(),)  # type: ignore
                else:
                    log_args += (payload,)

            _safelog(log.error, msg, *log_args, extra={"send_to_telemetry": False})
            self._metrics_dist("http.dropped.bytes", len(payload))
            self._metrics_dist("http.dropped.traces", count)
        return response

    def write(self, spans=None):
        """Buffer ``spans`` on every client; flush immediately in sync mode."""
        for client in self._clients:
            self._write_with_client(client, spans=spans)
        if self._sync_mode:
            self.flush_queue()

    def _write_with_client(self, client: WriterClientBase, spans: Optional[list["Span"]] = None) -> None:
        """Encode ``spans`` into ``client``'s buffer, tracking drop metrics."""
        if spans is None:
            return

        if self._sync_mode is False:
            # Start the HTTPWriter on first write.
            try:
                if self.status != service.ServiceStatus.RUNNING:
                    self.start()

            except service.ServiceStatusError:
                pass

        self._metrics_dist("writer.accepted.traces")
        self._metrics["accepted_traces"] += 1
        self._set_keep_rate(spans)

        try:
            client.encoder.put(spans)
        except BufferItemTooLarge as e:
            payload_size = e.args[0]
            _safelog(
                log.warning,
                "trace (%db) larger than payload buffer item limit (%db), dropping",
                payload_size,
                client.encoder.max_item_size,
            )
            self._metrics_dist("buffer.dropped.traces", 1, tags=["reason:t_too_big"])
            self._metrics_dist("buffer.dropped.bytes", payload_size, tags=["reason:t_too_big"])
        except BufferFull as e:
            payload_size = e.args[0]
            _safelog(
                log.warning,
                "trace buffer (%s traces %db/%db) cannot fit trace of size %db, dropping (writer status: %s)",
                len(client.encoder),
                client.encoder.size,
                client.encoder.max_size,
                payload_size,
                self.status.value,
            )
            self._metrics_dist("buffer.dropped.traces", 1, tags=["reason:full"])
            self._metrics_dist("buffer.dropped.bytes", payload_size, tags=["reason:full"])
        except NoEncodableSpansError:
            self._metrics_dist("buffer.dropped.traces", 1, tags=["reason:incompatible"])
        else:
            self._metrics_dist("buffer.accepted.traces", 1)
            self._metrics_dist("buffer.accepted.spans", len(spans))

    def flush_queue(self, raise_exc: bool = False):
        """Flush all clients' buffers, then refresh the drop-rate estimate."""
        try:
            for client in self._clients:
                self._flush_queue_with_client(client, raise_exc=raise_exc)
        finally:
            self._set_drop_rate()

    def _flush_queue_with_client(self, client: WriterClientBase, raise_exc: bool = False) -> None:
        """Encode ``client``'s buffered traces and send each resulting payload."""
        n_traces = len(client.encoder)
        try:
            if not (encoded_traces := client.encoder.encode()):
                return

        except Exception:
            # FIXME(munir): if client.encoder raises an Exception n_traces may not be accurate due to race conditions
            _safelog(log.error, "failed to encode trace with encoder %r", client.encoder, exc_info=True)
            self._metrics_dist("encoder.dropped.traces", n_traces)
            return

        for payload in encoded_traces:
            encoded_data, n_traces = payload
            self._flush_single_payload(encoded_data, n_traces, client=client, raise_exc=raise_exc)

    def _flush_single_payload(
        self, encoded: Optional[bytes], n_traces: int, client: WriterClientBase, raise_exc: bool = False
    ) -> None:
        """Optionally gzip-compress and send one payload, with retries."""
        if encoded is None:
            return

        # Should gzip the payload if intake accepts it
        if self._intake_accepts_gzip:
            try:
                original_size = len(encoded)
                # Replace the value to send with the gzipped the value
                encoded = gzip.compress(encoded, compresslevel=6)
                _safelog(log.debug, "Original size in bytes: %s, Compressed size: %s", original_size, len(encoded))

                # And add the header
                # NOTE(review): mutates the shared header dict, so the gzip header
                # persists for all later payloads once set — confirm intentional.
                self._headers["Content-Encoding"] = "gzip"
            except Exception:
                _safelog(log.error, "failed to compress traces with encoder %r", client.encoder, exc_info=True)
                self._metrics_dist("encoder.dropped.traces", n_traces)
                return

        try:
            self._send_payload_with_backoff(encoded, n_traces, client)
        except Exception:
            self._metrics_dist("http.errors", tags=["type:err"])
            self._metrics_dist("http.dropped.bytes", len(encoded))
            self._metrics_dist("http.dropped.traces", n_traces)
            if raise_exc:
                raise
            else:
                _safelog(
                    log.error,
                    "failed to send, dropping %d traces to intake at %s after %d retries",
                    n_traces,
                    self._intake_endpoint(client),
                    self.RETRY_ATTEMPTS,
                    exc_info=True,
                    extra={"send_to_telemetry": False},
                )
        finally:
            # NOTE(review): this also runs after a failed send, so "sent" metrics
            # can overcount on errors — confirm this is intentional.
            self._metrics_dist("http.sent.bytes", len(encoded))
            self._metrics_dist("http.sent.traces", n_traces)

    def periodic(self):
        """Periodic-service hook: flush without propagating send failures."""
        self.flush_queue(raise_exc=False)

    def _stop_service(
        self,
        timeout: Optional[float] = None,
    ) -> None:
        """Stop the periodic service and wait for its thread to exit."""
        # FIXME: don't join() on stop(), let the caller handle this
        super(HTTPWriter, self)._stop_service()
        self.join(timeout=timeout)

    def on_shutdown(self):
        """Final flush on shutdown, then release the cached connection."""
        try:
            self.periodic()
        finally:
            self._reset_connection()


class AgentResponse(object):
    """Parsed body of a successful trace-agent response.

    Attributes:
        rate_by_service: Per-service sampling rates as returned by the agent
            (the ``rate_by_service`` field of the response JSON).
    """

    def __init__(self, rate_by_service: dict[str, float]) -> None:
        self.rate_by_service = rate_by_service

    def __repr__(self) -> str:
        # Aid debugging: show the sampling rates the agent handed back.
        return "%s(rate_by_service=%r)" % (type(self).__name__, self.rate_by_service)


class AgentWriterInterface(metaclass=abc.ABCMeta):
    """Extra contract implemented by writers that talk to the Datadog agent."""

    # Base URL the writer sends payloads to.
    intake_url: str
    # Trace API version in use (e.g. "v0.4", "v0.5").
    _api_version: str
    # Whether writes are flushed synchronously.
    _sync_mode: bool

    @abc.abstractmethod
    def set_test_session_token(self, token: Optional[str]) -> None:
        """Attach (or clear) the test-session token sent with each payload."""

    @abc.abstractmethod
    def flush_queue(self, raise_exc: bool = False) -> None:
        """Deliver buffered traces, optionally propagating send failures."""


class AgentWriter(HTTPWriter, AgentWriterInterface):
    """
    The Datadog Agent supports (at the time of writing this) receiving trace
    payloads up to 50MB. A trace payload is just a list of traces and the agent
    expects a trace to be complete. That is, all spans with the same trace_id
    should be in the same trace.
    """

    RETRY_ATTEMPTS = 3
    HTTP_METHOD = "PUT"
    STATSD_NAMESPACE = "tracer"

    def __init__(
        self,
        intake_url: str,
        processing_interval: Optional[float] = None,
        # Match the payload size since there is no functionality
        # to flush dynamically.
        buffer_size: Optional[int] = None,
        max_payload_size: Optional[int] = None,
        timeout: Optional[float] = None,
        dogstatsd: Optional["DogStatsd"] = None,
        report_metrics: bool = True,
        sync_mode: bool = False,
        api_version: Optional[str] = None,
        reuse_connections: Optional[bool] = None,
        headers: Optional[dict[str, str]] = None,
        response_callback: Optional[Callable[[AgentResponse], None]] = None,
    ) -> None:
        """Create an agent writer, selecting the trace API version to use.

        Raises:
            ValueError: if ``buffer_size`` or ``max_payload_size`` is not positive.
            RuntimeError: if v0.5 is explicitly requested on Windows.
        """
        if processing_interval is None:
            processing_interval = config._trace_writer_interval_seconds
        if timeout is None:
            timeout = agent_config.trace_agent_timeout_seconds
        if buffer_size is not None and buffer_size <= 0:
            raise ValueError("Writer buffer size must be positive")
        if max_payload_size is not None and max_payload_size <= 0:
            raise ValueError("Max payload size must be positive")
        # Default to v0.4 if we are on Windows since there is a known compatibility issue
        # https://github.com/DataDog/dd-trace-py/issues/4829
        # DEV: sys.platform on windows should be `win32` or `cygwin`, but using `startswith`
        #      as a safety precaution.
        #      https://docs.python.org/3/library/sys.html#sys.platform
        is_windows = sys.platform.startswith("win") or sys.platform.startswith("cygwin")

        default_api_version = "v0.5"
        if (
            is_windows
            or in_gcp_function()
            or in_azure_function()
            or asm_config._asm_enabled
            or asm_config._iast_enabled
            or ai_guard_config._ai_guard_enabled
        ):
            default_api_version = "v0.4"

        # Explicit argument wins over the config setting, which wins over the default.
        self._api_version = api_version or config._trace_api or default_api_version

        if agent_config.trace_native_span_events:
            log.warning("Setting api version to v0.4; DD_TRACE_NATIVE_SPAN_EVENTS is not compatible with v0.5")
            self._api_version = "v0.4"

        if is_windows and self._api_version == "v0.5":
            raise RuntimeError(
                "There is a known compatibility issue with v0.5 API and Windows, "
                "please see https://github.com/DataDog/dd-trace-py/issues/4829 for more details."
            )

        buffer_size = buffer_size or config._trace_writer_buffer_size
        max_payload_size = max_payload_size or config._trace_writer_payload_size
        # Fall back to the newest supported version if the chosen one is unknown.
        if self._api_version not in WRITER_CLIENTS:
            log.warning(
                "Unsupported api version: '%s'. The supported versions are: %r",
                self._api_version,
                ", ".join(sorted(WRITER_CLIENTS.keys())),
            )
            self._api_version = sorted(WRITER_CLIENTS.keys())[-1]
        client = WRITER_CLIENTS[self._api_version](buffer_size, max_payload_size)

        _headers = {
            "Datadog-Meta-Lang": "python",
            "Datadog-Meta-Lang-Version": compat.PYTHON_VERSION,
            "Datadog-Meta-Lang-Interpreter": compat.PYTHON_INTERPRETER,
            "Datadog-Meta-Tracer-Version": __version__,
            "Datadog-Client-Computed-Top-Level": "yes",
        }
        if headers:
            _headers.update(headers)

        _headers.update({"Content-Type": client.encoder.content_type})
        additional_header_str = os.environ.get("_DD_TRACE_WRITER_ADDITIONAL_HEADERS")
        if additional_header_str is not None:
            _headers.update(parse_tags_str(additional_header_str))
        self._response_cb = response_callback
        self._report_metrics = report_metrics
        super(AgentWriter, self).__init__(
            intake_url=intake_url,
            clients=[client],
            processing_interval=processing_interval,
            buffer_size=buffer_size,
            max_payload_size=max_payload_size,
            timeout=timeout,
            dogstatsd=dogstatsd,
            sync_mode=sync_mode,
            reuse_connections=reuse_connections,
            headers=_headers,
            report_metrics=report_metrics,
        )

    def recreate(self, appsec_enabled: Optional[bool] = None) -> HTTPWriter:
        """Return a new writer with this writer's settings, stopping this one first.

        When ``appsec_enabled`` is true the new writer is forced onto the v0.4
        API so that AppSec metadata is encoded.
        """
        try:
            # Stop the writer to ensure it is not running while we reconfigure it.
            self.stop()
        except ServiceStatusError:
            # Writers like AgentWriter may not start until the first trace is encoded.
            # Stopping them before that will raise a ServiceStatusError.
            pass

        # Ensure AppSec metadata is encoded by setting the API version to v0.4.
        api_version = "v0.4" if appsec_enabled else self._api_version

        return self.__class__(
            intake_url=self.intake_url,
            processing_interval=self._interval,
            buffer_size=self._buffer_size,
            max_payload_size=self._max_payload_size,
            timeout=self._timeout,
            dogstatsd=self.dogstatsd,
            sync_mode=self._sync_mode,
            api_version=api_version,
            headers=self._headers,
            report_metrics=self._report_metrics,
            response_callback=self._response_cb,
        )

    @property
    def _agent_endpoint(self):
        """Full URL of the agent endpoint for the primary client."""
        return self._intake_endpoint(client=None)

    def _downgrade(self, response, client):
        """Handle a 404/415 from the agent by downgrading from v0.5 to v0.4.

        The current payload is discarded since it was encoded for the old API.
        """
        if client.ENDPOINT == "v0.5/traces":
            self._clients = [AgentWriterClientV4(self._buffer_size, self._max_payload_size)]
            # Since we have to change the encoding in this case, the payload
            # would need to be converted to the downgraded encoding before
            # sending it, but we chuck it away instead.
            _safelog(
                log.warning,
                "Calling endpoint '%s' but received %s; downgrading API. "
                "Dropping trace payload due to the downgrade to an incompatible API version (from v0.5 to v0.4). To "
                "avoid this from happening in the future, either ensure that the Datadog agent has a v0.5/traces "
                "endpoint available, or explicitly set the trace API version to, e.g., v0.4.",
                client.ENDPOINT,
                response.status,
            )
        else:
            _safelog(
                log.error,
                "unsupported endpoint '%s': received response %s from intake (%s)",
                client.ENDPOINT,
                response.status,
                self.intake_url,
            )

    def _send_payload(self, payload, count, client) -> Response:
        """Send a payload; downgrade on 404/415, forward sampling rates on success."""
        response = super(AgentWriter, self)._send_payload(payload, count, client)
        if response.status in [404, 415]:
            self._downgrade(response, client)
        elif response.status < 400:
            if self._response_cb:
                raw_resp = response.get_json()
                if raw_resp and "rate_by_service" in raw_resp:
                    self._response_cb(
                        AgentResponse(
                            rate_by_service=raw_resp["rate_by_service"],
                        )
                    )
        return response

    def start(self):
        """Start the writer and, if enabled, AppSec remote configuration."""
        super(AgentWriter, self).start()
        try:
            # appsec remote config should be enabled/started after the global tracer and configs
            # are initialized
            if asm_config._asm_rc_enabled:
                from ddtrace.appsec._remoteconfiguration import enable_appsec_rc

                enable_appsec_rc()
        except service.ServiceStatusError:
            pass

    def _get_finalized_headers(self, count: int, client: WriterClientBase) -> dict[str, str]:
        """Add the trace-count header the agent expects on each request."""
        headers = super(AgentWriter, self)._get_finalized_headers(count, client)
        headers["X-Datadog-Trace-Count"] = str(count)
        return headers

    def set_test_session_token(self, token: Optional[str]) -> None:
        """Set (or clear, with ``None``) the test-session token header."""
        self._headers["X-Datadog-Test-Session-Token"] = token or ""


class AgentlessTraceWriter(HTTPWriter):
    """
    HTTP writer for agentless JSON span intake. Used when _DD_APM_TRACING_AGENTLESS_ENABLED is true.
    """

    HTTP_METHOD = "POST"
    # Base URL for the agentless trace JSON intake (EvP / track_type:spans).
    INTAKE_HOST = "public-trace-http-intake.logs"
    # Agentless payloads must be under 15 MB.
    MAX_BUFFER_SIZE = 15 << 20  # 15 MB

    def __init__(
        self,
        intake_url: str,
        api_key: str,
        processing_interval: Optional[float] = None,
        buffer_size: Optional[int] = None,
        max_payload_size: Optional[int] = None,
        timeout: Optional[float] = None,
        dogstatsd: Optional["DogStatsd"] = None,
        report_metrics: bool = True,
        sync_mode: bool = False,
        reuse_connections: Optional[bool] = None,
    ) -> None:
        """Create an agentless writer authenticating with ``api_key``."""
        # Cap the buffer at the intake's maximum accepted payload size.
        buffer_size = min(buffer_size or config._trace_writer_buffer_size, self.MAX_BUFFER_SIZE)
        max_payload_size = max_payload_size or config._trace_writer_payload_size
        client = AgentlessWriterClient(buffer_size, max_payload_size)
        headers = {
            "Content-Type": client.encoder.content_type,
            "dd-api-key": api_key,
            "Datadog-Meta-Lang": "python",
            "Datadog-Meta-Lang-Version": compat.PYTHON_VERSION,
            "Datadog-Meta-Lang-Interpreter": compat.PYTHON_INTERPRETER,
            "Datadog-Meta-Tracer-Version": __version__,
        }
        super(AgentlessTraceWriter, self).__init__(
            intake_url=intake_url,
            clients=[client],
            processing_interval=processing_interval,
            buffer_size=buffer_size,
            max_payload_size=max_payload_size,
            timeout=timeout,
            dogstatsd=dogstatsd,
            sync_mode=sync_mode,
            reuse_connections=reuse_connections,
            headers=headers,
            report_metrics=report_metrics,
        )

    def recreate(self, appsec_enabled: Optional[bool] = None) -> "AgentlessTraceWriter":
        """Return a new writer with this writer's settings, stopping this one first.

        ``appsec_enabled`` is accepted for interface compatibility but has no
        effect on the agentless intake.
        """
        try:
            self.stop()
        except ServiceStatusError:
            pass
        return self.__class__(
            intake_url=self.intake_url,
            api_key=self._headers["dd-api-key"],
            processing_interval=self._interval,
            buffer_size=self._buffer_size,
            max_payload_size=self._max_payload_size,
            timeout=self._timeout,
            dogstatsd=self.dogstatsd,
            sync_mode=self._sync_mode,
            reuse_connections=self._reuse_connections,
            report_metrics=self._report_metrics,
        )


class NativeWriter(periodic.PeriodicService, TraceWriter, AgentWriterInterface):
    """Writer using a native trace exporter to send traces to an agent.

    Spans are buffered in a per-API-version encoder client and periodically
    flushed to the agent through a native ``TraceExporter``. The exporter owns
    its own worker threads, which must be stopped before a ``fork()`` (see
    ``before_fork_hook``) and on shutdown (see ``_stop_service``/``on_shutdown``).
    """

    STATSD_NAMESPACE = "tracer"

    def __init__(
        self,
        intake_url: str,
        processing_interval: Optional[float] = None,
        compute_stats_enabled: bool = False,
        # Match the payload size since there is no functionality
        # to flush dynamically.
        buffer_size: Optional[int] = None,
        max_payload_size: Optional[int] = None,
        dogstatsd: Optional["DogStatsd"] = None,
        sync_mode: bool = False,
        api_version: Optional[str] = None,
        report_metrics: bool = True,
        response_callback: Optional[Callable[[AgentResponse], None]] = None,
        test_session_token: Optional[str] = None,
        # Mark stats as computed, without computing them, skipping trace exporter stats computation.
        # This setting overrides the `compute_stats_enabled` parameter.
        stats_opt_out: Optional[bool] = False,
    ) -> None:
        if processing_interval is None:
            processing_interval = config._trace_writer_interval_seconds
        if buffer_size is not None and buffer_size <= 0:
            raise ValueError("Writer buffer size must be positive")
        if max_payload_size is not None and max_payload_size <= 0:
            raise ValueError("Max payload size must be positive")

        # Default to v0.4 if we are on Windows since there is a known compatibility issue
        # https://github.com/DataDog/dd-trace-py/issues/4829
        # DEV: sys.platform on windows should be `win32` or `cygwin`, but using `startswith`
        #      as a safety precaution.
        #      https://docs.python.org/3/library/sys.html#sys.platform
        is_windows = sys.platform.startswith("win") or sys.platform.startswith("cygwin")

        default_api_version = "v0.5"
        if (
            is_windows
            or in_gcp_function()
            or in_azure_function()
            or asm_config._asm_enabled
            or asm_config._iast_enabled
            or ai_guard_config._ai_guard_enabled
        ):
            default_api_version = "v0.4"

        # Explicit argument wins over DD_TRACE_API_VERSION, which wins over the default.
        self._api_version = api_version or config._trace_api or default_api_version

        if agent_config.trace_native_span_events:
            log.warning("Setting api version to v0.4; DD_TRACE_NATIVE_SPAN_EVENTS is not compatible with v0.5")
            self._api_version = "v0.4"

        if is_windows and self._api_version == "v0.5":
            raise RuntimeError(
                "There is a known compatibility issue with v0.5 API and Windows, "
                "please see https://github.com/DataDog/dd-trace-py/issues/4829 for more details."
            )

        buffer_size = buffer_size or config._trace_writer_buffer_size
        max_payload_size = max_payload_size or config._trace_writer_payload_size
        if self._api_version not in WRITER_CLIENTS:
            log.warning(
                "Unsupported api version: '%s'. The supported versions are: %r",
                self._api_version,
                ", ".join(sorted(WRITER_CLIENTS.keys())),
            )
            # Fall back to the newest supported version.
            self._api_version = sorted(WRITER_CLIENTS.keys())[-1]
        client = WRITER_CLIENTS[self._api_version](buffer_size, max_payload_size)

        # Allow the test session token to be injected via the additional-headers
        # escape hatch used by the test agent.
        additional_header_str = os.environ.get("_DD_TRACE_WRITER_ADDITIONAL_HEADERS")
        if test_session_token is None and additional_header_str is not None:
            additional_header = parse_tags_str(additional_header_str)
            if "X-Datadog-Test-Session-Token" in additional_header:
                test_session_token = additional_header["X-Datadog-Test-Session-Token"]

        super(NativeWriter, self).__init__(interval=processing_interval)
        self.intake_url = intake_url
        self._buffer_size = buffer_size
        self._max_payload_size = max_payload_size
        self._test_session_token = test_session_token

        self._clients = [client]
        self.dogstatsd = dogstatsd
        self._metrics: dict[str, int] = defaultdict(int)
        self._report_metrics = report_metrics
        self._drop_sma = SimpleMovingAverage(DEFAULT_SMA_WINDOW)
        self._sync_mode = sync_mode
        self._compute_stats_enabled = compute_stats_enabled
        self._response_cb = response_callback
        self._stats_opt_out = stats_opt_out

        # Use a weak hook so the fork registry does not keep this writer alive.
        before_fork_hook = make_weak_method_hook(self.before_fork_hook)
        self._fork_hook = before_fork_hook
        forksafe.register_before_fork(before_fork_hook)

        self._exporter = self._create_exporter()

    def before_fork_hook(self):
        """
        This hook is used to shut down the native runtime before forking when the service is not running.
        When the PeriodicService is running, the native runtime is shut down by the PeriodicThread logic.
        """
        if self.status != service.ServiceStatus.RUNNING:
            self._exporter.stop_worker()

    def __del__(self):
        # Best-effort cleanup of the fork hook so it does not outlive the
        # writer. The hasattr guard protects against partially-constructed
        # instances whose __init__ raised before _fork_hook was assigned.
        if hasattr(self, "_fork_hook") and self._fork_hook:
            forksafe.unregister_before_fork(self._fork_hook)

    def _create_exporter(self) -> native.TraceExporter:
        """
        Create a new TraceExporter with the current configuration.
        :return: A configured TraceExporter instance.
        """
        _, commit_sha, _ = get_git_tags()

        builder = (
            native.TraceExporterBuilder()
            .set_url(self.intake_url)
            .set_hostname(get_hostname())
            .set_language("python")
            .set_language_version(compat.PYTHON_VERSION)
            .set_language_interpreter(compat.PYTHON_INTERPRETER)
            .set_tracer_version(__version__)
            .set_git_commit_sha(commit_sha)
            .set_client_computed_top_level()
            .set_input_format(self._api_version)
            .set_output_format(self._api_version)
        )
        if config.service:
            builder.set_service(config.service)
        if config.env:
            builder.set_env(config.env)
        if config.version:
            builder.set_app_version(config.version)
        if self._test_session_token is not None:
            builder.set_test_session_token(self._test_session_token)
        if self._stats_opt_out:
            # Report stats as already computed without computing them.
            builder.set_client_computed_stats()
        elif self._compute_stats_enabled:
            stats_interval = float(os.getenv("_DD_TRACE_STATS_WRITER_INTERVAL") or 10.0)
            bucket_size_ns: int = int(stats_interval * 1e9)
            builder.enable_stats(bucket_size_ns)

        # TODO (APMSP-2204): Enable telemetry for all platforms, currently only enabled for Linux.
        if config._telemetry_enabled and sys.platform.startswith("linux"):
            heartbeat_ms = int(
                config._telemetry_heartbeat_interval * 1000
            )  # Convert DD_TELEMETRY_HEARTBEAT_INTERVAL to milliseconds
            builder.enable_telemetry(heartbeat_ms, get_runtime_id())
        if config._health_metrics_enabled:
            builder.enable_health_metrics()

        return builder.build()

    def set_test_session_token(self, token: Optional[str]) -> None:
        """
        Set the test session token and recreate the exporter with the new configuration.
        :param token: The test session token to use for authentication.
        """
        self._test_session_token = token
        self._exporter.stop_worker()
        self._exporter = self._create_exporter()

    def recreate(self, appsec_enabled: Optional[bool] = None) -> "NativeWriter":
        """Stop this writer and return a new one with the same configuration.

        :param appsec_enabled: when truthy, force the v0.4 API so AppSec
            metadata can be encoded.
        """
        # Ensure AppSec metadata is encoded by setting the API version to v0.4.
        try:
            # Stop the writer to ensure it is not running while we reconfigure it.
            self.stop()
        except ServiceStatusError:
            # Writers like AgentWriter may not start until the first trace is encoded.
            # Stopping them before that will raise a ServiceStatusError.
            pass

        api_version = "v0.4" if appsec_enabled else self._api_version
        return self.__class__(
            intake_url=self.intake_url,
            processing_interval=self._interval,
            compute_stats_enabled=self._compute_stats_enabled,
            buffer_size=self._buffer_size,
            max_payload_size=self._max_payload_size,
            dogstatsd=self.dogstatsd,
            sync_mode=self._sync_mode,
            api_version=api_version,
            report_metrics=self._report_metrics,
            response_callback=self._response_cb,
            test_session_token=self._test_session_token,
            stats_opt_out=self._stats_opt_out,
        )

    def _downgrade(self, status, client):
        """Downgrade from the v0.5 endpoint to v0.4 after a 404/415 response.

        The in-flight payload cannot be re-encoded, so it is dropped.
        """
        if client.ENDPOINT == "v0.5/traces":
            self._clients = [AgentWriterClientV4(self._buffer_size, self._max_payload_size)]
            self._api_version = "v0.4"
            # The exporter formats are bound at build time, so rebuild it.
            self._exporter = self._create_exporter()

            # Since we have to change the encoding in this case, the payload
            # would need to be converted to the downgraded encoding before
            # sending it, but we chuck it away instead.
            _safelog(
                log.warning,
                "Calling endpoint '%s' but received %s; downgrading API. "
                "Dropping trace payload due to the downgrade to an incompatible API version (from v0.5 to v0.4). To "
                "avoid this from happening in the future, either ensure that the Datadog agent has a v0.5/traces "
                "endpoint available, or explicitly set the trace API version to, e.g., v0.4.",
                client.ENDPOINT,
                status,
            )
        else:
            _safelog(
                log.error,
                "unsupported endpoint '%s': received response %s from intake (%s)",
                client.ENDPOINT,
                status,
                self.intake_url,
            )

    def _intake_endpoint(self, client=None):
        # Full URL for the given client's endpoint (or the first client's).
        return "{}/{}".format(self.intake_url, client.ENDPOINT if client else self._endpoint)

    @property
    def _endpoint(self):
        # Endpoint path of the primary client, e.g. "v0.5/traces".
        return self._clients[0].ENDPOINT

    @property
    def _encoder(self):
        # Encoder of the primary client.
        return self._clients[0].encoder

    def _metrics_dist(self, name: str, count: int = 1, tags: Optional[list] = None) -> None:
        """Emit a dogstatsd distribution metric when health metrics are enabled."""
        if not self._report_metrics:
            return
        if config._health_metrics_enabled and self.dogstatsd:
            self.dogstatsd.distribution("datadog.%s.%s" % (self.STATSD_NAMESPACE, name), count, tags=tags)

    def _set_drop_rate(self) -> None:
        """Update the drop-rate moving average from per-interval counters."""
        accepted = self._metrics["accepted_traces"]
        sent = self._metrics["sent_traces"]
        encoded = sum([len(client.encoder) for client in self._clients])
        # The number of dropped traces is the number of accepted traces minus the number of traces in the encoder
        # This calculation is a best effort. Due to race conditions it may result in a slight underestimate.
        dropped = max(accepted - sent - encoded, 0)  # dropped spans should never be negative
        self._drop_sma.set(dropped, accepted)
        self._metrics["sent_traces"] = 0  # reset sent traces for the next interval
        self._metrics["accepted_traces"] = encoded  # sets accepted traces to number of spans in encoders

    def _set_keep_rate(self, trace):
        """Record the current keep rate on the trace's root span."""
        if trace:
            # PERF: avoid setting via Span.set_metric
            trace[0]._metrics[_KEEP_SPANS_RATE_KEY] = 1.0 - self._drop_sma.get()

    def _send_payload(self, payload: bytes, count: int, client: WriterClientBase):
        """Send an encoded payload of ``count`` traces through the native exporter.

        A 404/415 response triggers an API downgrade and the payload is
        dropped; any other request error propagates to the caller. On success,
        the agent's ``rate_by_service`` response is forwarded to the response
        callback, if one is configured.
        """
        # Pre-bind so the downgrade path below (which swallows the exception
        # without assigning) cannot trigger an UnboundLocalError when a
        # response callback is configured.
        response_body = None
        try:
            response_body = self._exporter.send(payload)
        except native.RequestError as e:
            try:
                # Request errors are formatted as "Error code: {code}, Response: {response}"
                code = int(str(e).split(",")[0].split(":", maxsplit=1)[1])
            except Exception:  # if the error message is invalid we want to log the full error
                raise e
            if code == 404 or code == 415:
                self._downgrade(code, client)
            else:
                raise e
        finally:
            self._metrics["sent_traces"] += count

        if self._response_cb and response_body is not None:
            response = Response(body=response_body)
            raw_resp = response.get_json()

            if raw_resp and "rate_by_service" in raw_resp:
                self._response_cb(
                    AgentResponse(
                        rate_by_service=raw_resp["rate_by_service"],
                    )
                )

    def write(self, spans: Optional[list["Span"]] = None) -> None:
        """Enqueue a trace for every client; flush immediately in sync mode."""
        for client in self._clients:
            self._write_with_client(client, spans=spans)
        if self._sync_mode:
            self.flush_queue()

    def _write_with_client(self, client: WriterClientBase, spans: Optional[list["Span"]] = None) -> None:
        """Put ``spans`` in the client's encoder buffer, tracking drop metrics."""
        if spans is None:
            return

        if self._sync_mode is False:
            # Start the Writer on first write.
            try:
                if self.status != service.ServiceStatus.RUNNING:
                    self.start()

            except service.ServiceStatusError:
                _safelog(log.warning, "failed to start writer service")

        self._metrics_dist("writer.accepted.traces")
        self._metrics["accepted_traces"] += 1
        self._set_keep_rate(spans)

        try:
            client.encoder.put(spans)
        except BufferItemTooLarge as e:
            payload_size = e.args[0]
            _safelog(
                log.warning,
                "trace (%db) larger than payload buffer item limit (%db), dropping",
                payload_size,
                client.encoder.max_item_size,
            )
            self._metrics_dist("buffer.dropped.traces", 1, tags=["reason:t_too_big"])
            self._metrics_dist("buffer.dropped.bytes", payload_size, tags=["reason:t_too_big"])
        except BufferFull as e:
            payload_size = e.args[0]
            _safelog(
                log.warning,
                "trace buffer (%s traces %db/%db) cannot fit trace of size %db, dropping (writer status: %s)",
                len(client.encoder),
                client.encoder.size,
                client.encoder.max_size,
                payload_size,
                self.status.value,
            )
            self._metrics_dist("buffer.dropped.traces", 1, tags=["reason:full"])
            self._metrics_dist("buffer.dropped.bytes", payload_size, tags=["reason:full"])
        except NoEncodableSpansError:
            self._metrics_dist("buffer.dropped.traces", 1, tags=["reason:incompatible"])
        else:
            self._metrics_dist("buffer.accepted.traces", 1)
            self._metrics_dist("buffer.accepted.spans", len(spans))

    def flush_queue(self, raise_exc: bool = False):
        """Flush every client's buffer, then refresh the drop-rate estimate."""
        try:
            for client in self._clients:
                self._flush_queue_with_client(client, raise_exc=raise_exc)
        finally:
            self._set_drop_rate()

    def _flush_queue_with_client(self, client: WriterClientBase, raise_exc: bool = False) -> None:
        """Encode the client's buffered traces and send each resulting payload."""
        n_traces = len(client.encoder)
        try:
            if not (encoded_traces := client.encoder.encode()):
                return
        except Exception:
            # FIXME(munir): if client.encoder raises an Exception n_traces may not be accurate due to race conditions
            _safelog(log.error, "failed to encode trace with encoder %r", client.encoder, exc_info=True)
            self._metrics_dist("encoder.dropped.traces", n_traces)
            return

        for payload in encoded_traces:
            encoded_data, n_traces = payload
            self._flush_single_payload(encoded_data, n_traces, client=client, raise_exc=raise_exc)

    def _flush_single_payload(
        self, encoded: Optional[bytes], n_traces: int, client: WriterClientBase, raise_exc: bool = False
    ) -> None:
        """Send one encoded payload, logging (or re-raising) any send failure."""
        if encoded is None:
            return
        try:
            self._send_payload(encoded, n_traces, client)
        except Exception as e:
            if raise_exc:
                raise

            msg = "failed to send, dropping %d traces to intake at %s: %s"
            log_args = (
                n_traces,
                self._intake_endpoint(client),
                str(e),
            )
            # Append the payload if requested
            if config._trace_writer_log_err_payload:
                msg += ", payload %s"
                log_args += (binascii.hexlify(encoded).decode(),)  # type: ignore

            _safelog(log.error, msg, *log_args, extra={"send_to_telemetry": False})

    def periodic(self):
        # Called by the PeriodicService worker; never raises on send failures.
        self.flush_queue(raise_exc=False)

    def _stop_service(
        self,
        timeout: Optional[float] = None,
    ) -> None:
        """Stop the periodic service and the native exporter worker threads."""
        try:
            # FIXME: don't join() on stop(), let the caller handle this
            super(NativeWriter, self)._stop_service()
            self.join(timeout=timeout)
        # Native threads should be stopped even if the writer is not running
        finally:
            self._exporter.stop_worker()
            if self._fork_hook:
                forksafe.unregister_before_fork(self._fork_hook)
                self._fork_hook = None

    def _start_service(self, *args, **kwargs):
        """Start the periodic worker and patch its pre-fork handling.

        The worker's ``_before_fork`` is replaced so the native exporter's
        threads are stopped after the periodic thread has joined, keeping the
        child process free of inherited native workers.
        """
        super()._start_service(*args, **kwargs)

        def _before_fork(worker: periodic.PeriodicThread) -> None:
            # Run the base-class pre-fork logic and join the thread before
            # shutting down the native worker.
            super(periodic.PeriodicThread, worker)._before_fork()
            super(periodic.PeriodicThread, worker).join()
            self._exporter.stop_worker()

        assert self._worker is not None  # nosec

        self._worker._before_fork = _before_fork.__get__(self._worker, type(self._worker))

    def on_shutdown(self):
        """Flush remaining traces and shut the exporter down with a timeout."""
        try:
            self.periodic()
        finally:
            self._exporter.shutdown(3_000_000_000)  # 3 seconds timeout


def _use_log_writer() -> bool:
    """Return whether the LogWriter should be used by default.

    The LogWriter is only required in AWS Lambdas where the Datadog Agent
    extension is unavailable and no agent location has been configured.
    """
    agent_location_configured = (
        os.environ.get("DD_AGENT_HOST")
        or os.environ.get("DATADOG_TRACE_AGENT_HOSTNAME")
        or os.environ.get("DD_TRACE_AGENT_URL")
    )
    if agent_location_configured:
        # If one of these variables are set, we definitely have an agent
        return False
    if in_aws_lambda() and has_aws_lambda_agent_extension():
        # If the Agent Lambda extension is available then an AgentWriter is used.
        return False
    if in_gcp_function() or in_azure_function():
        return False
    return in_aws_lambda()


def _use_sync_mode() -> bool:
    """Return whether an `AgentWriter` should default to synchronous mode.

    Sync mode is desirable only in environments where the process may be
    terminated before a background flush completes:

    - AWS Lambdas with the Datadog agent extension installed: traces must be
      sent synchronously so all of them arrive before the Lambda terminates.
    - Google Cloud Functions and Azure Functions, which use a mini-agent spun
      up by the tracer and are subject to the same abrupt termination.
    - Ray jobs, whose worker/actor processes can be killed at any time.
    """
    if in_aws_lambda() and has_aws_lambda_agent_extension():
        return True
    return in_gcp_function() or in_azure_function() or in_ray_job()


def create_trace_writer(response_callback: Optional[Callable[[AgentResponse], None]] = None) -> TraceWriter:
    """Construct the trace writer suited to the current environment.

    Selection order: LogWriter (agent-less AWS Lambda), AgentlessTraceWriter
    (agentless mode with an API key), then NativeWriter or AgentWriter
    depending on whether the native writer is enabled.
    """
    if _use_log_writer():
        return LogWriter()

    if config._trace_agentless_enabled:
        if config._dd_api_key:
            intake_url = "https://{}.{}".format(AgentlessTraceWriter.INTAKE_HOST, config._dd_site)
            verify_url(intake_url)
            return AgentlessTraceWriter(
                intake_url=intake_url,
                api_key=config._dd_api_key,
                dogstatsd=get_dogstatsd_client(agent_config.dogstatsd_url),
                sync_mode=_use_sync_mode(),
                report_metrics=not asm_config._apm_opt_out,
            )
        # Agentless mode requires an API key; fall back to agent-based writing.
        log.warning("APM Agentless enabled but DD_API_KEY is not set. Agentless mode will be disabled.")

    verify_url(agent_config.trace_agent_url)

    if config._trace_writer_native:
        return NativeWriter(
            intake_url=agent_config.trace_agent_url,
            dogstatsd=get_dogstatsd_client(agent_config.dogstatsd_url),
            sync_mode=_use_sync_mode(),
            compute_stats_enabled=config._trace_compute_stats,
            report_metrics=not asm_config._apm_opt_out,
            response_callback=response_callback,
            stats_opt_out=asm_config._apm_opt_out,
        )

    extra_headers: dict[str, str] = {}
    if config._trace_compute_stats or asm_config._apm_opt_out:
        extra_headers["Datadog-Client-Computed-Stats"] = "yes"

    return AgentWriter(
        intake_url=agent_config.trace_agent_url,
        dogstatsd=get_dogstatsd_client(agent_config.dogstatsd_url),
        sync_mode=_use_sync_mode(),
        headers=extra_headers,
        report_metrics=not asm_config._apm_opt_out,
        response_callback=response_callback,
    )
