import asyncio
import csv
from dataclasses import dataclass
from dataclasses import field
import inspect
import json
import math
import os
import sys
import time
from typing import Any
from typing import Callable
from typing import Literal
from typing import Optional
from typing import Sequence
from typing import Union
from typing import cast
import urllib.parse

import ddtrace
from ddtrace import config
from ddtrace import patch
from ddtrace._trace.apm_filter import APMTracingEnabledFilter
from ddtrace._trace.context import Context
from ddtrace._trace.span import Span
from ddtrace._trace.tracer import Tracer
from ddtrace.constants import ERROR_MSG
from ddtrace.constants import ERROR_STACK
from ddtrace.constants import ERROR_TYPE
from ddtrace.ext import SpanTypes
from ddtrace.internal import atexit
from ddtrace.internal import core
from ddtrace.internal import forksafe
from ddtrace.internal.compat import ensure_text
from ddtrace.internal.logger import get_logger
from ddtrace.internal.native import generate_128bit_trace_id
from ddtrace.internal.native import rand64bits
from ddtrace.internal.remoteconfig.worker import remoteconfig_poller
from ddtrace.internal.service import Service
from ddtrace.internal.service import ServiceStatusError
from ddtrace.internal.telemetry import get_config as _get_config
from ddtrace.internal.telemetry import telemetry_writer
from ddtrace.internal.telemetry.constants import TELEMETRY_APM_PRODUCT
from ddtrace.internal.threads import RLock
from ddtrace.internal.utils.formats import asbool
from ddtrace.internal.utils.formats import format_trace_id
from ddtrace.internal.utils.formats import parse_tags_str
from ddtrace.llmobs import _constants as constants
from ddtrace.llmobs import _telemetry as telemetry
from ddtrace.llmobs._constants import AGENT_MANIFEST
from ddtrace.llmobs._constants import ANNOTATIONS_CONTEXT_ID
from ddtrace.llmobs._constants import DECORATOR
from ddtrace.llmobs._constants import DEFAULT_PROJECT_NAME
from ddtrace.llmobs._constants import DEFAULT_PROMPTS_CACHE_TTL
from ddtrace.llmobs._constants import DEFAULT_PROMPTS_TIMEOUT
from ddtrace.llmobs._constants import DISPATCH_ON_GUARDRAIL_SPAN_START
from ddtrace.llmobs._constants import DISPATCH_ON_LLM_SPAN_FINISH
from ddtrace.llmobs._constants import DISPATCH_ON_LLM_TOOL_CHOICE
from ddtrace.llmobs._constants import DISPATCH_ON_OPENAI_AGENT_SPAN_FINISH
from ddtrace.llmobs._constants import DISPATCH_ON_TOOL_CALL
from ddtrace.llmobs._constants import DISPATCH_ON_TOOL_CALL_OUTPUT_USED
from ddtrace.llmobs._constants import EXPERIMENT_CONFIG
from ddtrace.llmobs._constants import EXPERIMENT_CSV_FIELD_MAX_SIZE
from ddtrace.llmobs._constants import EXPERIMENT_DATASET_NAME_KEY
from ddtrace.llmobs._constants import EXPERIMENT_EXPECTED_OUTPUT
from ddtrace.llmobs._constants import EXPERIMENT_ID_KEY
from ddtrace.llmobs._constants import EXPERIMENT_NAME_KEY
from ddtrace.llmobs._constants import EXPERIMENT_PROJECT_ID_KEY
from ddtrace.llmobs._constants import EXPERIMENT_PROJECT_NAME_KEY
from ddtrace.llmobs._constants import EXPERIMENT_RUN_ID_KEY
from ddtrace.llmobs._constants import EXPERIMENT_RUN_ITERATION_KEY
from ddtrace.llmobs._constants import EXPERIMENTS_INPUT
from ddtrace.llmobs._constants import EXPERIMENTS_OUTPUT
from ddtrace.llmobs._constants import INPUT_DOCUMENTS
from ddtrace.llmobs._constants import INPUT_MESSAGES
from ddtrace.llmobs._constants import INPUT_PROMPT
from ddtrace.llmobs._constants import INPUT_VALUE
from ddtrace.llmobs._constants import INSTRUMENTATION_METHOD_ANNOTATED
from ddtrace.llmobs._constants import INTEGRATION
from ddtrace.llmobs._constants import LLMOBS_STRUCT
from ddtrace.llmobs._constants import LLMOBS_TRACE_ID
from ddtrace.llmobs._constants import MCP_TOOL_CALL_INTENT
from ddtrace.llmobs._constants import METADATA
from ddtrace.llmobs._constants import METRICS
from ddtrace.llmobs._constants import ML_APP
from ddtrace.llmobs._constants import MODEL_NAME
from ddtrace.llmobs._constants import MODEL_PROVIDER
from ddtrace.llmobs._constants import OUTPUT_DOCUMENTS
from ddtrace.llmobs._constants import OUTPUT_MESSAGES
from ddtrace.llmobs._constants import OUTPUT_VALUE
from ddtrace.llmobs._constants import PARENT_ID_KEY
from ddtrace.llmobs._constants import PROMPT_TRACKING_INSTRUMENTATION_METHOD
from ddtrace.llmobs._constants import PROPAGATED_LLMOBS_TRACE_ID_KEY
from ddtrace.llmobs._constants import PROPAGATED_ML_APP_KEY
from ddtrace.llmobs._constants import PROPAGATED_PARENT_ID_KEY
from ddtrace.llmobs._constants import ROOT_PARENT_ID
from ddtrace.llmobs._constants import SESSION_ID
from ddtrace.llmobs._constants import SPAN_KIND
from ddtrace.llmobs._constants import SPAN_LINKS
from ddtrace.llmobs._constants import SPAN_START_WHILE_DISABLED_WARNING
from ddtrace.llmobs._constants import TAGS
from ddtrace.llmobs._constants import TOOL_DEFINITIONS
from ddtrace.llmobs._context import LLMObsContextProvider
from ddtrace.llmobs._evaluators.runner import EvaluatorRunner
from ddtrace.llmobs._experiment import AsyncEvaluatorType
from ddtrace.llmobs._experiment import AsyncSummaryEvaluatorType
from ddtrace.llmobs._experiment import AsyncTaskType
from ddtrace.llmobs._experiment import BaseAsyncEvaluator
from ddtrace.llmobs._experiment import BaseAsyncSummaryEvaluator
from ddtrace.llmobs._experiment import BaseEvaluator
from ddtrace.llmobs._experiment import BaseSummaryEvaluator
from ddtrace.llmobs._experiment import ConfigType
from ddtrace.llmobs._experiment import Dataset
from ddtrace.llmobs._experiment import DatasetRecord
from ddtrace.llmobs._experiment import DatasetRecordInputType
from ddtrace.llmobs._experiment import EvaluatorType
from ddtrace.llmobs._experiment import Experiment
from ddtrace.llmobs._experiment import ExperimentResult
from ddtrace.llmobs._experiment import JSONType
from ddtrace.llmobs._experiment import Project
from ddtrace.llmobs._experiment import SummaryEvaluatorType
from ddtrace.llmobs._experiment import SyncExperiment
from ddtrace.llmobs._experiment import TaskType
from ddtrace.llmobs._experiment import _deep_eval_async_evaluator_wrapper
from ddtrace.llmobs._experiment import _deep_eval_evaluator_wrapper
from ddtrace.llmobs._experiment import _get_base_url
from ddtrace.llmobs._experiment import _is_deep_eval_evaluator
from ddtrace.llmobs._prompt_optimization import PromptOptimization
from ddtrace.llmobs._prompt_optimization import validate_dataset
from ddtrace.llmobs._prompt_optimization import validate_dataset_split
from ddtrace.llmobs._prompt_optimization import validate_evaluators
from ddtrace.llmobs._prompt_optimization import validate_optimization_task
from ddtrace.llmobs._prompt_optimization import validate_task
from ddtrace.llmobs._prompt_optimization import validate_test_dataset
from ddtrace.llmobs._prompts import ManagedPrompt
from ddtrace.llmobs._prompts.cache import WarmCache
from ddtrace.llmobs._prompts.manager import PromptManager
from ddtrace.llmobs._utils import AnnotationContext
from ddtrace.llmobs._utils import LinkTracker
from ddtrace.llmobs._utils import _batched
from ddtrace.llmobs._utils import _get_llmobs_data_metastruct
from ddtrace.llmobs._utils import _get_ml_app
from ddtrace.llmobs._utils import _get_nearest_llmobs_ancestor
from ddtrace.llmobs._utils import _get_parent_prompt
from ddtrace.llmobs._utils import _get_session_id
from ddtrace.llmobs._utils import _get_span_kind
from ddtrace.llmobs._utils import _get_span_name
from ddtrace.llmobs._utils import _is_evaluation_span
from ddtrace.llmobs._utils import _validate_prompt
from ddtrace.llmobs._utils import add_span_link
from ddtrace.llmobs._utils import enforce_message_role
from ddtrace.llmobs._utils import get_span_links
from ddtrace.llmobs._utils import safe_json
from ddtrace.llmobs._writer import LLMObsEvalMetricWriter
from ddtrace.llmobs._writer import LLMObsEvaluationMetricEvent
from ddtrace.llmobs._writer import LLMObsExperimentsClient
from ddtrace.llmobs._writer import LLMObsSpanData
from ddtrace.llmobs._writer import LLMObsSpanEvent
from ddtrace.llmobs._writer import LLMObsSpanWriter
from ddtrace.llmobs._writer import should_use_agentless
from ddtrace.llmobs.types import ExportedLLMObsSpan
from ddtrace.llmobs.types import Message
from ddtrace.llmobs.types import Prompt
from ddtrace.llmobs.types import PromptFallback
from ddtrace.llmobs.types import _ErrorField
from ddtrace.llmobs.types import _Meta
from ddtrace.llmobs.types import _MetaIO
from ddtrace.llmobs.types import _SpanField
from ddtrace.llmobs.utils import Documents
from ddtrace.llmobs.utils import Messages
from ddtrace.llmobs.utils import extract_tool_definitions
from ddtrace.propagation.http import HTTPPropagator
from ddtrace.version import __version__


log = get_logger(__name__)


# Integrations LLM Observability can auto-instrument, mapping the LLMObs
# integration name to the ddtrace patch-module name handed to ``patch()``.
SUPPORTED_LLMOBS_INTEGRATIONS = {
    "anthropic": "anthropic",
    "bedrock": "botocore",
    "openai": "openai",
    "langchain": "langchain",
    "google_adk": "google_adk",
    "google_genai": "google_genai",
    "vertexai": "vertexai",
    "langgraph": "langgraph",
    "litellm": "litellm",
    "crewai": "crewai",
    "openai_agents": "openai_agents",
    "mcp": "mcp",
    "pydantic_ai": "pydantic_ai",
    "claude_agent_sdk": "claude_agent_sdk",
    # requests/concurrent frameworks for distributed injection/extraction
    "requests": "requests",
    "httpx": "httpx",
    "urllib3": "urllib3",
    "grpc": "grpc",
    "flask": "flask",
    "starlette": "starlette",
    "fastapi": "fastapi",
    "aiohttp": "aiohttp",
    "asyncio": "asyncio",
    "futures": "futures",
}

# Constants for validation
# Parameter names every experiment task callable must accept.
_TASK_REQUIRED_PARAMS = {"input_data", "config"}
# Parameter names every per-row evaluator callable must accept.
_EVALUATOR_REQUIRED_PARAMS = ("input_data", "output_data", "expected_output")
# Parameter names every summary evaluator callable must accept.
_SUMMARY_EVALUATOR_REQUIRED_PARAMS = (
    "inputs",
    "outputs",
    "expected_outputs",
    "evaluators_results",
)


def _validate_task_signature(task: Callable, is_async: bool) -> None:
    """Raise ``TypeError`` unless ``task`` is a valid experiment task callable.

    The task must be callable, must be a coroutine function when ``is_async``
    is set, and must accept the required ``input_data``/``config`` parameters.
    """
    if not callable(task):
        raise TypeError("task must be a callable function.")
    if is_async and not asyncio.iscoroutinefunction(task):
        raise TypeError("task must be an async function (coroutine function).")
    accepted = inspect.signature(task).parameters
    if any(name not in accepted for name in _TASK_REQUIRED_PARAMS):
        raise TypeError("Task function must have 'input_data' and 'config' parameters.")


def _validate_evaluator_signature(evaluator: Any, is_async: bool) -> None:
    """Raise ``TypeError`` unless ``evaluator`` is usable as an experiment evaluator.

    Accepts evaluator base-class instances (async experiments additionally
    accept async evaluators), deep-eval evaluators, or any callable exposing
    the required evaluator parameters.
    """
    if is_async:
        # async experiment allows both sync and async evaluators
        valid_base_classes: tuple[type, ...] = (BaseEvaluator, BaseAsyncEvaluator)
    else:
        valid_base_classes = (BaseEvaluator,)

    if isinstance(evaluator, valid_base_classes) or _is_deep_eval_evaluator(evaluator):
        return

    if not callable(evaluator):
        message = (
            f"Evaluator {evaluator} must be callable or an instance of BaseEvaluator/BaseAsyncEvaluator."
            if is_async
            else f"Evaluator {evaluator} must be callable or an instance of BaseEvaluator."
        )
        raise TypeError(message)

    accepted = inspect.signature(evaluator).parameters
    if any(name not in accepted for name in _EVALUATOR_REQUIRED_PARAMS):
        raise TypeError("Evaluator function must have parameters {}.".format(tuple(_EVALUATOR_REQUIRED_PARAMS)))


def _validate_summary_evaluator_signature(evaluator: Any, is_async: bool) -> None:
    """Raise ``TypeError`` unless ``evaluator`` is usable as a summary evaluator.

    Accepts summary-evaluator base-class instances (async experiments
    additionally accept async summary evaluators) or any callable exposing
    the required summary-evaluator parameters.
    """
    if is_async:
        # async experiment allows both sync and async summary evaluators
        valid_base_classes: tuple[type, ...] = (BaseSummaryEvaluator, BaseAsyncSummaryEvaluator)
    else:
        valid_base_classes = (BaseSummaryEvaluator,)

    if isinstance(evaluator, valid_base_classes):
        return

    if not callable(evaluator):
        message = (
            f"Summary evaluator {evaluator} must be callable "
            "or an instance of BaseSummaryEvaluator/BaseAsyncSummaryEvaluator."
            if is_async
            else f"Summary evaluator {evaluator} must be callable or an instance of BaseSummaryEvaluator."
        )
        raise TypeError(message)

    accepted = inspect.signature(evaluator).parameters
    if any(name not in accepted for name in _SUMMARY_EVALUATOR_REQUIRED_PARAMS):
        raise TypeError(
            "Summary evaluator function must have parameters {}.".format(tuple(_SUMMARY_EVALUATOR_REQUIRED_PARAMS))
        )


class LLMObsExportSpanError(Exception):
    """Error raised when exporting a span fails."""


class LLMObsAnnotateSpanError(Exception):
    """Error raised when annotating a span fails."""


class LLMObsSubmitEvaluationError(Exception):
    """Error raised when submitting an evaluation fails."""


class LLMObsInjectDistributedHeadersError(Exception):
    """Error raised when injecting distributed headers fails."""


class LLMObsActivateDistributedHeadersError(Exception):
    """Error raised when activating distributed headers fails."""


@dataclass
class LLMObsSpan:
    """Mutable view of an LLMObs span handed to user span processors.

    Instances are passed to the ``span_processor`` callable supplied to the
    `enable` or `register_processor` methods; the processor may mutate the
    input/output message lists or return ``None`` to drop the span.

    Example::
        def span_processor(span: LLMObsSpan) -> Optional[LLMObsSpan]:
            # Modify input/output
            if span.get_tag("omit_span") == "1":
                return None
            if span.get_tag("no_input") == "1":
                span.input = []
            return span
    """

    input: list[Message] = field(default_factory=list)
    output: list[Message] = field(default_factory=list)
    _tags: dict[str, str] = field(default_factory=dict)

    def get_tag(self, key: str) -> Optional[str]:
        """Return the value of tag ``key``, or ``None`` when the tag is unset.

        :param str key: The key of the tag to look up.
        :rtype: Optional[str]
        """
        return self._tags[key] if key in self._tags else None


def _build_llmobs_span(
    span_kind: str,
    llmobs_input: _MetaIO,
    llmobs_output: _MetaIO,
) -> tuple[LLMObsSpan, Literal["value", "messages", ""], Literal["value", "messages", ""]]:
    """Build an LLMObsSpan populated for the user span processor.

    Input and output are routed symmetrically: a raw value is wrapped in a
    single role-less message, and for LLM spans a structured message list
    takes precedence. Returns (llmobs_span, input_type, output_type).
    """

    def _route(io_struct: _MetaIO) -> tuple[Literal["value", "messages", ""], list[Message]]:
        # Structured messages win on LLM spans; otherwise fall back to the
        # serialized raw value (if any).
        routed_kind: Literal["value", "messages", ""] = ""
        routed_messages: list[Message] = []
        raw_value = io_struct.get(LLMOBS_STRUCT.VALUE)
        if raw_value is not None:
            routed_kind = "value"
            routed_messages = [Message(content=safe_json(raw_value, ensure_ascii=False) or "", role="")]
        structured = io_struct.get(LLMOBS_STRUCT.MESSAGES)
        if span_kind == "llm" and structured is not None:
            routed_kind = "messages"
            routed_messages = enforce_message_role(structured)
        return routed_kind, routed_messages

    llmobs_span = LLMObsSpan()
    input_type, routed = _route(llmobs_input)
    if input_type:
        llmobs_span.input = routed
    output_type, routed = _route(llmobs_output)
    if output_type:
        llmobs_span.output = routed
    return llmobs_span, input_type, output_type


def _build_span_meta(
    span: Span,
    llmobs_span: LLMObsSpan,
    llmobs_meta: _Meta,
    span_kind: str,
    input_type: Literal["value", "messages", ""],
    output_type: Literal["value", "messages", ""],
) -> _Meta:
    """Build and return the full meta dict for a span event.

    Combines the meta captured on the span with the (possibly user-processor
    mutated) I/O carried on ``llmobs_span``. ``input_type``/``output_type``
    record how I/O was routed by ``_build_llmobs_span`` so it can be written
    back to the matching meta field ("messages" list vs single "value").
    """
    llmobs_input: _MetaIO = llmobs_meta.get(LLMOBS_STRUCT.INPUT) or _MetaIO()
    llmobs_output: _MetaIO = llmobs_meta.get(LLMOBS_STRUCT.OUTPUT) or _MetaIO()
    meta = _Meta(
        span=_SpanField(kind=span_kind),
        input=llmobs_input,
        output=llmobs_output,
        model_name=llmobs_meta.get(LLMOBS_STRUCT.MODEL_NAME) or "",
        model_provider=(llmobs_meta.get(LLMOBS_STRUCT.MODEL_PROVIDER) or "custom").lower(),
        metadata=llmobs_meta.get(LLMOBS_STRUCT.METADATA) or {},
        tool_definitions=llmobs_meta.get(LLMOBS_STRUCT.TOOL_DEFINITIONS) or [],
        intent=str(llmobs_meta.get(LLMOBS_STRUCT.INTENT) or ""),
        error=_ErrorField(
            message=span.get_tag(ERROR_MSG) or "",
            stack=span.get_tag(ERROR_STACK) or "",
            type=span.get_tag(ERROR_TYPE) or "",
        ),
    )

    # Prompts are only supported on LLM spans: drop them elsewhere, and for
    # LLM spans without their own prompt inherit the parent's prompt.
    input_prompt = llmobs_input.get(LLMOBS_STRUCT.PROMPT)
    if input_prompt is not None and span_kind != "llm":
        log.warning("Dropping prompt on non-LLM span kind, annotating prompts is only supported for LLM span kinds.")
        meta["input"].pop(LLMOBS_STRUCT.PROMPT, None)
    elif input_prompt is None and span_kind == "llm":
        parent_prompt = _get_parent_prompt(span)
        if parent_prompt is not None:
            meta["input"]["prompt"] = parent_prompt

    # expected_output only applies to experiment spans running inside an experiment.
    expected_output = llmobs_meta.get(LLMOBS_STRUCT.EXPECTED_OUTPUT)
    if span.context.get_baggage_item(EXPERIMENT_ID_KEY) and span_kind == "experiment" and expected_output is not None:
        meta["expected_output"] = expected_output

    # Write the user-processor-visible I/O back onto meta in the same form
    # it was routed out.
    if input_type == "messages":
        meta["input"]["messages"] = llmobs_span.input
    elif input_type == "value" and llmobs_span.input:
        meta["input"]["value"] = llmobs_span.input[0].get("content", "")
    if output_type == "messages":
        meta["output"]["messages"] = llmobs_span.output
    elif output_type == "value" and llmobs_span.output:
        meta["output"]["value"] = llmobs_span.output[0].get("content", "")

    return meta


class LLMObs(Service):
    # Singleton instance of the service (class-level, set when enabled).
    _instance = None  # type: LLMObs
    # Class-wide switch for whether LLM Observability is active.
    enabled = False
    # Datadog application key, passed to the experiments client.
    _app_key: str = os.getenv("DD_APP_KEY", "")
    # Default project name used for experiments/datasets.
    _project_name: str = os.getenv("DD_LLMOBS_PROJECT_NAME", DEFAULT_PROJECT_NAME)

    def __init__(
        self,
        tracer: Optional[Tracer] = None,
        span_processor: Optional[Callable[[LLMObsSpan], Optional[LLMObsSpan]]] = None,
    ) -> None:
        """Initialize the LLMObs service and its writers/runners.

        :param tracer: Tracer used for span creation/activation; defaults to
            the global ``ddtrace.tracer``.
        :param span_processor: Optional user hook invoked with each finished
            ``LLMObsSpan``; it may mutate the span or return ``None`` to drop it.
        """
        super(LLMObs, self).__init__()
        self.tracer = tracer or ddtrace.tracer
        self._llmobs_context_provider = LLMObsContextProvider()
        self._user_span_processor = span_processor
        # Agentless submission defaults to enabled unless explicitly configured.
        agentless_enabled = config._llmobs_agentless_enabled if config._llmobs_agentless_enabled is not None else True
        # Writer for LLMObs span events.
        self._llmobs_span_writer = LLMObsSpanWriter(
            interval=float(os.getenv("_DD_LLMOBS_WRITER_INTERVAL", 1.0)),
            timeout=float(os.getenv("_DD_LLMOBS_WRITER_TIMEOUT", 5.0)),
            is_agentless=agentless_enabled,
        )
        # Writer for evaluation metric events.
        self._llmobs_eval_metric_writer = LLMObsEvalMetricWriter(
            interval=float(os.getenv("_DD_LLMOBS_WRITER_INTERVAL", 1.0)),
            timeout=float(os.getenv("_DD_LLMOBS_WRITER_TIMEOUT", 5.0)),
            is_agentless=agentless_enabled,
        )
        # Background runner that evaluates finished LLM spans.
        self._evaluator_runner = EvaluatorRunner(
            interval=float(os.getenv("_DD_LLMOBS_EVALUATOR_INTERVAL", 1.0)),
            llmobs_service=self,
        )
        # Client for datasets & experiments APIs.
        self._dne_client = LLMObsExperimentsClient(
            interval=float(os.getenv("_DD_LLMOBS_WRITER_INTERVAL", 1.0)),
            timeout=float(os.getenv("_DD_LLMOBS_WRITER_TIMEOUT", 5.0)),
            _app_key=self._app_key,
            _default_project=Project(name=self._project_name, _id=""),
            is_agentless=True,  # agent proxy doesn't seem to work for experiments
        )

        # Re-initialize writer/runner state in forked child processes.
        forksafe.register(self._child_after_fork)

        self._link_tracker = LinkTracker()
        # Pending annotation contexts: (context id, annotation id, kwargs).
        self._annotations: list[tuple[str, str, dict[str, Any]]] = []
        self._annotation_context_lock = RLock()

    def _on_span_start(self, span: Span) -> None:
        """Span-start hook: activate LLMObs context for enabled LLM spans."""
        if not self.enabled or span.span_type != SpanTypes.LLM:
            return
        self._activate_llmobs_span(span)
        telemetry.record_span_started()
        self._do_annotations(span)

    def _on_span_finish(self, span: Span) -> None:
        """Span-finish hook: submit enabled LLM spans to LLMObs."""
        if not self.enabled or span.span_type != SpanTypes.LLM:
            return
        self._submit_llmobs_span(span)
        telemetry.record_span_created(span)

    def _submit_llmobs_span(self, span: Span) -> None:
        """Generate and submit an LLMObs span event to be sent to LLMObs.

        Event-generation errors are logged rather than raised. Regardless of
        enqueue outcome, successfully-built "llm" span events that are not
        themselves evaluation spans are also handed to the evaluator runner.
        """
        span_event = None
        try:
            span_event = self._llmobs_span_event(span)
            if span_event is None:
                # Span was dropped (e.g. by a user span processor).
                return
            self._llmobs_span_writer.enqueue(span_event)
        except (KeyError, TypeError, ValueError):
            log.error(
                "Error generating LLMObs span event for span %s, likely due to malformed span",
                span,
                exc_info=True,
            )
        finally:
            # Only evaluate non-evaluation "llm" spans with a built event.
            if span_event and span._get_ctx_item(SPAN_KIND) == "llm" and not _is_evaluation_span(span):
                if self._evaluator_runner:
                    self._evaluator_runner.enqueue(span_event, span)

    def _llmobs_span_event(self, span: Span) -> Optional[LLMObsSpanEvent]:
        """Generate an LLMObs span event, preferring the meta_struct payload
        and falling back to the legacy span-context (_store) path."""
        llmobs_data = _get_llmobs_data_metastruct(span)
        if not llmobs_data:
            # Legacy path: data was recorded via span ctx items.
            return self._build_span_event_from_ctx_items(span)
        return self._build_span_event_from_meta_struct(span, llmobs_data)

    def _apply_user_span_processor(self, llmobs_span: LLMObsSpan, llmobs_data: LLMObsSpanData) -> Optional[LLMObsSpan]:
        """Run the user-supplied span processor over ``llmobs_span``.

        Returns the (possibly mutated) span, ``None`` when the processor asks
        for the span to be dropped, or the original span unchanged when the
        processor raises — errors are logged, never propagated. A telemetry
        metric records every invocation along with its error status.
        """
        processor = self._user_span_processor
        if processor is None:
            return llmobs_span
        had_error = False
        try:
            llmobs_span._tags = cast(dict[str, str], llmobs_data.get(LLMOBS_STRUCT.TAGS, {}))
            processed = processor(llmobs_span)
            if processed is None:
                return None
            if isinstance(processed, LLMObsSpan):
                return processed
            raise TypeError("User span processor must return an LLMObsSpan or None, got %r" % type(processed))
        except Exception as e:
            had_error = True
            log.error("Error in LLMObs span processor (%r): %r", processor, e)
            return llmobs_span
        finally:
            telemetry.record_llmobs_user_processor_called(had_error)

    def _build_span_event_from_meta_struct(self, span: Span, llmobs_data: LLMObsSpanData) -> Optional[LLMObsSpanEvent]:
        """Build a span event from the meta_struct payload attached to the span.

        Returns ``None`` when a user span processor drops the span.

        :raises KeyError: when the span has no span kind recorded.
        :raises ValueError: when the ML app or LLMObs trace ID is missing.
        """
        llmobs_meta = llmobs_data.get(LLMOBS_STRUCT.META) or _Meta()
        llmobs_input = llmobs_meta.get(LLMOBS_STRUCT.INPUT) or _MetaIO()
        llmobs_output = llmobs_meta.get(LLMOBS_STRUCT.OUTPUT) or _MetaIO()

        span_kind = _get_span_kind(span)
        if not span_kind:
            raise KeyError("Span kind not found in span context")

        ml_app = _get_ml_app(span)
        if ml_app is None:
            raise ValueError(
                "ML app is required for sending LLM Observability data. "
                "Ensure this configuration is set before running your application."
            )
        span._set_ctx_item(ML_APP, ml_app)

        parent_id = llmobs_data.get(LLMOBS_STRUCT.PARENT_ID) or ROOT_PARENT_ID
        llmobs_trace_id = llmobs_data.get(LLMOBS_STRUCT.TRACE_ID)
        if llmobs_trace_id is None:
            raise ValueError("Failed to extract LLMObs trace ID from span context.")

        if span_kind == "llm":
            # Let integrations run their LLM-span-finish hooks before I/O is read.
            core.dispatch(DISPATCH_ON_LLM_SPAN_FINISH, (span,))

        llmobs_span, input_type, output_type = _build_llmobs_span(span_kind, llmobs_input, llmobs_output)
        user_processed_span = self._apply_user_span_processor(llmobs_span, llmobs_data)
        if user_processed_span is None:
            # User processor requested the span be dropped.
            return None
        llmobs_span = user_processed_span

        # Wait to build meta until after user processors apply and potentially mutate I/O
        meta = _build_span_meta(span, llmobs_span, llmobs_meta, span_kind, input_type, output_type)
        metrics = llmobs_data.get(LLMOBS_STRUCT.METRICS) or {}
        session_id = _get_session_id(span)
        tags = self._llmobs_tags(span, ml_app, session_id, True, llmobs_data)
        span_links = get_span_links(span)
        _dd_attrs = {
            "span_id": str(span.span_id),
            "trace_id": format_trace_id(span.trace_id),
            "apm_trace_id": format_trace_id(span.trace_id),
        }
        # Spans produced inside an experiment run are scoped separately.
        if span.context.get_baggage_item(EXPERIMENT_ID_KEY):
            _dd_attrs["scope"] = "experiments"

        llmobs_span_event: LLMObsSpanEvent = {
            "trace_id": llmobs_trace_id,
            "span_id": str(span.span_id),
            "parent_id": parent_id,
            "name": _get_span_name(span),
            "start_ns": span.start_ns,
            "duration": cast(int, span.duration_ns),
            "status": "error" if span.error else "ok",
            "meta": meta,
            "metrics": metrics,
            "session_id": session_id or "",
            "tags": tags,
            "span_links": span_links,
            "_dd": _dd_attrs,
        }

        experiment_config = llmobs_data.get(LLMOBS_STRUCT.CONFIG)
        if experiment_config:
            llmobs_span_event["config"] = experiment_config

        return llmobs_span_event

    def _build_span_event_from_ctx_items(self, span: Span) -> Optional[LLMObsSpanEvent]:
        """Build span event from ctx_item data (legacy path for backward compatibility).

        Mirrors ``_build_span_event_from_meta_struct`` but reads each field
        from span context items. Returns ``None`` when a user span processor
        drops the span.

        :raises KeyError: when the span has no span kind recorded.
        :raises ValueError: when the ML app or LLMObs trace ID is missing.
        """
        span_kind = span._get_ctx_item(SPAN_KIND)
        if not span_kind:
            raise KeyError("Span kind not found in span context")

        if span_kind == "llm":
            # Let integrations run their LLM-span-finish hooks before I/O is read.
            core.dispatch(DISPATCH_ON_LLM_SPAN_FINISH, (span,))

        llmobs_span = LLMObsSpan()
        _dd_attrs = {
            "span_id": str(span.span_id),
            "trace_id": format_trace_id(span.trace_id),
            "apm_trace_id": format_trace_id(span.trace_id),
        }

        meta = _Meta(span=_SpanField(kind=span_kind), input=_MetaIO(), output=_MetaIO())
        # Model info only applies to llm/embedding spans.
        if span_kind in ("llm", "embedding") and span._get_ctx_item(MODEL_NAME) is not None:
            meta["model_name"] = span._get_ctx_item(MODEL_NAME) or ""
            meta["model_provider"] = (span._get_ctx_item(MODEL_PROVIDER) or "custom").lower()
        metadata = span._get_ctx_item(METADATA) or {}
        if span_kind == "agent" and span._get_ctx_item(AGENT_MANIFEST) is not None:
            # Store the agent manifest under the reserved "_dd" metadata key,
            # preserving any existing "_dd" dict contents.
            metadata_dd = _dd_val if isinstance(_dd_val := metadata.get("_dd"), dict) else {}
            metadata_dd["agent_manifest"] = span._get_ctx_item(AGENT_MANIFEST)
            metadata["_dd"] = metadata_dd
        meta["metadata"] = metadata

        # Track how I/O was routed so it can be written back after the user
        # span processor potentially mutates llmobs_span.
        input_type: Literal["value", "messages", ""] = ""
        output_type: Literal["value", "messages", ""] = ""
        if span._get_ctx_item(INPUT_VALUE) is not None:
            input_type = "value"
            llmobs_span.input = [
                Message(
                    content=safe_json(span._get_ctx_item(INPUT_VALUE), ensure_ascii=False) or "",
                    role="",
                )
            ]

        # Spans produced inside an experiment run are scoped separately and
        # may carry experiment-specific I/O and expected output.
        if span.context.get_baggage_item(EXPERIMENT_ID_KEY):
            _dd_attrs["scope"] = "experiments"
            if span_kind == "experiment":
                expected_output = span._get_ctx_item(EXPERIMENT_EXPECTED_OUTPUT)
                if expected_output:
                    meta["expected_output"] = expected_output

                input_data = span._get_ctx_item(EXPERIMENTS_INPUT)
                if input_data:
                    meta["input"] = input_data

                output_data = span._get_ctx_item(EXPERIMENTS_OUTPUT)
                if output_data:
                    meta["output"] = output_data

        # Structured messages take precedence over raw values on LLM spans.
        input_messages = span._get_ctx_item(INPUT_MESSAGES)
        if span_kind == "llm" and input_messages is not None:
            input_type = "messages"
            llmobs_span.input = cast(list[Message], enforce_message_role(input_messages))

        if span._get_ctx_item(OUTPUT_VALUE) is not None:
            output_type = "value"
            llmobs_span.output = [
                Message(
                    content=safe_json(span._get_ctx_item(OUTPUT_VALUE), ensure_ascii=False) or "",
                    role="",
                )
            ]

        output_messages = span._get_ctx_item(OUTPUT_MESSAGES)
        if span_kind == "llm" and output_messages is not None:
            output_type = "messages"
            llmobs_span.output = cast(list[Message], enforce_message_role(output_messages))

        # Documents: inputs for embedding spans, outputs for retrieval spans.
        if span_kind == "embedding" and span._get_ctx_item(INPUT_DOCUMENTS) is not None:
            meta["input"]["documents"] = span._get_ctx_item(INPUT_DOCUMENTS) or []
        if span_kind == "retrieval" and span._get_ctx_item(OUTPUT_DOCUMENTS) is not None:
            meta["output"]["documents"] = span._get_ctx_item(OUTPUT_DOCUMENTS) or []

        # Prompts are only supported on LLM spans; LLM spans without their own
        # prompt inherit the nearest LLMObs ancestor's prompt (checking the
        # ancestor's meta_struct payload first, then its ctx items).
        if span._get_ctx_item(INPUT_PROMPT) is not None:
            prompt_json_str = span._get_ctx_item(INPUT_PROMPT)
            if span_kind != "llm":
                log.warning(
                    "Dropping prompt on non-LLM span kind, annotating prompts is only supported for LLM span kinds."
                )
            else:
                prompt_dict = cast(Prompt, prompt_json_str)
                meta["input"]["prompt"] = prompt_dict
        elif span_kind == "llm":
            parent_span = _get_nearest_llmobs_ancestor(span)
            if parent_span is not None:
                parent_llmobs_data = _get_llmobs_data_metastruct(parent_span)
                if parent_llmobs_data:
                    parent_llmobs_input = parent_llmobs_data.get(LLMOBS_STRUCT.META, {}).get(LLMOBS_STRUCT.INPUT, {})
                    parent_prompt = (
                        parent_llmobs_input.get(LLMOBS_STRUCT.PROMPT) if isinstance(parent_llmobs_input, dict) else None
                    )
                else:
                    parent_prompt = parent_span._get_ctx_item(INPUT_PROMPT)
                if parent_prompt is not None:
                    meta["input"]["prompt"] = parent_prompt

        if span._get_ctx_item(TOOL_DEFINITIONS) is not None:
            meta["tool_definitions"] = span._get_ctx_item(TOOL_DEFINITIONS) or []
        intent = span._get_ctx_item(MCP_TOOL_CALL_INTENT)
        if intent is not None:
            meta["intent"] = str(intent)
        if span.error:
            meta["error"] = _ErrorField(
                message=span.get_tag(ERROR_MSG) or "",
                stack=span.get_tag(ERROR_STACK) or "",
                type=span.get_tag(ERROR_TYPE) or "",
            )

        # Run the user span processor; errors are logged (not raised) and the
        # span continues unmodified. Telemetry records every invocation.
        if self._user_span_processor:
            error = False
            try:
                # NOTE(review): the TAGS ctx item may be None here, whereas the
                # meta_struct path (`_apply_user_span_processor`) defaults it to
                # {} — confirm TAGS is always set before processors call get_tag.
                llmobs_span._tags = cast(dict[str, str], span._get_ctx_item(TAGS))
                user_llmobs_span = self._user_span_processor(llmobs_span)
                if user_llmobs_span is None:
                    return None
                if not isinstance(user_llmobs_span, LLMObsSpan):
                    raise TypeError(
                        "User span processor must return an LLMObsSpan or None, got %r" % type(user_llmobs_span)
                    )
                llmobs_span = user_llmobs_span
            except Exception as e:
                log.error(
                    "Error in LLMObs span processor (%r): %r",
                    self._user_span_processor,
                    e,
                )
                error = True
            finally:
                telemetry.record_llmobs_user_processor_called(error)

        # Write the (possibly mutated) I/O back in the same form it was routed.
        if llmobs_span.input is not None:
            if input_type == "messages":
                meta["input"]["messages"] = llmobs_span.input
            elif input_type == "value":
                meta["input"]["value"] = llmobs_span.input[0].get("content", "")
        if llmobs_span.output is not None:
            if output_type == "messages":
                meta["output"]["messages"] = llmobs_span.output
            elif output_type == "value":
                meta["output"]["value"] = llmobs_span.output[0].get("content", "")

        # Drop empty I/O sections from the event payload.
        if not meta["input"]:
            meta.pop("input")
        if not meta["output"]:
            meta.pop("output")
        metrics = span._get_ctx_item(METRICS) or {}
        ml_app = _get_ml_app(span)

        if ml_app is None:
            raise ValueError(
                "ML app is required for sending LLM Observability data. "
                "Ensure this configuration is set before running your application."
            )

        span._set_ctx_item(ML_APP, ml_app)
        parent_id = span._get_ctx_item(PARENT_ID_KEY) or ROOT_PARENT_ID

        llmobs_trace_id = span._get_ctx_item(LLMOBS_TRACE_ID)
        if llmobs_trace_id is None:
            raise ValueError("Failed to extract LLMObs trace ID from span context.")

        llmobs_span_event: LLMObsSpanEvent = {
            "trace_id": format_trace_id(llmobs_trace_id),
            "span_id": str(span.span_id),
            "parent_id": parent_id,
            "name": _get_span_name(span),
            "start_ns": span.start_ns,
            "duration": cast(int, span.duration_ns),
            "status": "error" if span.error else "ok",
            "meta": meta,
            "metrics": metrics,
            "tags": [],
            "_dd": _dd_attrs,
        }
        session_id = _get_session_id(span)
        if session_id is not None:
            span._set_ctx_item(SESSION_ID, session_id)
            llmobs_span_event["session_id"] = session_id

        llmobs_span_event["tags"] = self._llmobs_tags(span, ml_app, session_id, False, None)

        span_links = span._get_ctx_item(SPAN_LINKS)
        if isinstance(span_links, list) and span_links:
            llmobs_span_event["span_links"] = span_links

        experiment_config = span._get_ctx_item(EXPERIMENT_CONFIG)
        if experiment_config:
            llmobs_span_event["config"] = experiment_config

        return llmobs_span_event

    @staticmethod
    def _llmobs_tags(
        span: Span,
        ml_app: str,
        session_id: Optional[str] = None,
        use_meta_struct: bool = False,
        llmobs_data: Optional[LLMObsSpanData] = None,
    ) -> list[str]:
        """Build the list of ``"key:value"`` tag strings for an LLMObs span event.

        Combines, in order: global ``config.tags``, standard tracer metadata
        (version/env/service/ml_app/...), error info, session id, the span's
        integration and pre-existing tags, and experiment metadata propagated
        via trace baggage (only where not already set).
        """
        collected: dict[str, Any] = dict(config.tags)
        collected.update(
            {
                "version": config.version or "",
                "env": config.env or "",
                "service": span.service or "",
                "source": "integration",
                "ml_app": ml_app,
                "ddtrace.version": __version__,
                "language": "python",
                "error": span.error,
            }
        )
        error_type = span.get_tag(ERROR_TYPE)
        if error_type:
            collected["error_type"] = error_type
        if session_id:
            collected["session_id"] = session_id

        # Pull integration + user tags either from the meta struct payload or
        # from the span's context items, depending on the caller's mode.
        preexisting: Optional[dict[str, str]]
        if use_meta_struct and llmobs_data:
            struct_tags = llmobs_data.get(LLMOBS_STRUCT.TAGS, {})
            if struct_tags.get("integration"):
                collected["integration"] = struct_tags.get("integration")
            preexisting = struct_tags
        else:
            if span._get_ctx_item(INTEGRATION):
                collected["integration"] = span._get_ctx_item(INTEGRATION)
            preexisting = span._get_ctx_item(TAGS)

        if _is_evaluation_span(span):
            collected[constants.RUNNER_IS_INTEGRATION_SPAN_TAG] = "ragas"
        if preexisting is not None:
            collected.update(preexisting)

        # set experiment tags on children spans if the tags do not already exist
        experiment_baggage_tags = (
            (EXPERIMENT_ID_KEY, "experiment_id"),
            (EXPERIMENT_RUN_ID_KEY, "run_id"),
            (EXPERIMENT_RUN_ITERATION_KEY, "run_iteration"),
            (EXPERIMENT_DATASET_NAME_KEY, "dataset_name"),
            (EXPERIMENT_PROJECT_NAME_KEY, "project_name"),
            (EXPERIMENT_PROJECT_ID_KEY, "project_id"),
            (EXPERIMENT_NAME_KEY, "experiment_name"),
        )
        for baggage_key, tag_key in experiment_baggage_tags:
            baggage_value = span.context.get_baggage_item(baggage_key)
            if baggage_value and tag_key not in collected:
                collected[tag_key] = baggage_value

        return [f"{key}:{value}" for key, value in collected.items()]

    def _do_annotations(self, span: Span) -> None:
        """Apply queued annotation-context kwargs to ``span`` when its context matches.

        Only annotation entries registered under the currently-active
        annotation context id are applied.
        """
        # Only LLM spans are handled; skipping others avoids the warning log
        # that `annotate` would emit for non-LLM span kinds.
        if span.span_type != SpanTypes.LLM:
            return
        active_ctx = self._instance.tracer.current_trace_context()
        if active_ctx is None:
            return
        active_ctx_id = active_ctx.get_baggage_item(ANNOTATIONS_CONTEXT_ID)
        with self._annotation_context_lock:
            matching_kwargs = (
                annotation_kwargs
                for _, ctx_id, annotation_kwargs in self._instance._annotations
                if ctx_id == active_ctx_id
            )
            for annotation_kwargs in matching_kwargs:
                self.annotate(span, **annotation_kwargs, _suppress_span_kind_error=True)

    def _child_after_fork(self) -> None:
        # Runs in a forked child process (registered/unregistered via
        # `forksafe` -- see `_stop_service`). The writers and evaluator runner
        # hold per-process state, so each is recreated fresh for the child.
        self._llmobs_span_writer = self._llmobs_span_writer.recreate()
        self._llmobs_eval_metric_writer = self._llmobs_eval_metric_writer.recreate()
        self._evaluator_runner = self._evaluator_runner.recreate()
        # Drop the class-level prompt manager cache; presumably it is rebuilt
        # lazily on first use in the child -- TODO confirm.
        LLMObs._prompt_manager = None
        if self.enabled:
            self._start_service()

    def _start_service(self) -> None:
        """Start the LLMObs writers and the evaluator runner, best-effort.

        A ``ServiceStatusError`` (e.g. already started) is swallowed with a
        debug log so enabling remains non-fatal.
        """
        try:
            for writer in (self._llmobs_span_writer, self._llmobs_eval_metric_writer):
                writer.start()
        except ServiceStatusError:
            log.debug("Error starting LLMObs writers")

        try:
            self._evaluator_runner.start()
        except ServiceStatusError:
            log.debug("Error starting evaluator runner")

    def _stop_service(self) -> None:
        """Stop LLMObs background services and detach every core event listener.

        Mirrors the registrations performed in ``enable``; listener removal
        order matches the original registration order.
        """
        try:
            self._evaluator_runner.stop()
            # flush remaining evaluation spans & evaluations
            self._instance._llmobs_span_writer.periodic()
            self._instance._llmobs_eval_metric_writer.periodic()
        except ServiceStatusError:
            log.debug("Error stopping evaluator runner")

        try:
            self._llmobs_span_writer.stop()
            self._llmobs_eval_metric_writer.stop()
        except ServiceStatusError:
            log.debug("Error stopping LLMObs writers")

        # Remove listener hooks for span events, context propagation, and
        # span-link tracking.
        registered_listeners = (
            ("trace.span_start", self._on_span_start),
            ("trace.span_finish", self._on_span_finish),
            ("http.span_inject", self._inject_llmobs_context),
            ("http.activate_distributed_headers", self._activate_llmobs_distributed_context_soft_fail),
            ("threading.submit", self._current_trace_context),
            ("threading.execution", self._llmobs_context_provider.activate),
            ("asyncio.create_task", self._on_asyncio_create_task),
            ("asyncio.execute_task", self._on_asyncio_execute_task),
            (DISPATCH_ON_LLM_TOOL_CHOICE, self._link_tracker.on_llm_tool_choice),
            (DISPATCH_ON_TOOL_CALL, self._link_tracker.on_tool_call),
            (DISPATCH_ON_TOOL_CALL_OUTPUT_USED, self._link_tracker.on_tool_call_output_used),
            (DISPATCH_ON_GUARDRAIL_SPAN_START, self._link_tracker.on_guardrail_span_start),
            (DISPATCH_ON_LLM_SPAN_FINISH, self._link_tracker.on_llm_span_finish),
            (DISPATCH_ON_OPENAI_AGENT_SPAN_FINISH, self._link_tracker.on_openai_agent_span_finish),
        )
        for event_name, handler in registered_listeners:
            core.reset_listeners(event_name, handler)

        forksafe.unregister(self._child_after_fork)

    @classmethod
    def enable(
        cls,
        ml_app: Optional[str] = None,
        integrations_enabled: bool = True,
        agentless_enabled: Optional[bool] = None,
        instrumented_proxy_urls: Optional[set[str]] = None,
        site: Optional[str] = None,
        api_key: Optional[str] = None,
        app_key: Optional[str] = None,
        project_name: Optional[str] = None,
        env: Optional[str] = None,
        service: Optional[str] = None,
        span_processor: Optional[Callable[[LLMObsSpan], Optional[LLMObsSpan]]] = None,
        _tracer: Optional[Tracer] = None,
        _auto: bool = False,
    ) -> None:
        """
        Enable LLM Observability tracing.

        :param str ml_app: The name of your ml application.
        :param bool integrations_enabled: set to `true` to enable LLM integrations.
        :param bool agentless_enabled: set to `true` to disable sending data that requires a Datadog Agent.
        :param set[str] instrumented_proxy_urls: A set of instrumented proxy URLs to help detect when to emit LLM spans.
        :param str site: Your datadog site.
        :param str api_key: Your datadog api key.
        :param str app_key: Your datadog application key.
        :param str project_name: Your project name used for experiments.
        :param str env: Your environment name.
        :param str service: Your service name.
        :param Callable[[LLMObsSpan], Optional[LLMObsSpan]] span_processor: A function that takes an LLMObsSpan and
            returns an LLMObsSpan or None. If None is returned, the span will be omitted and not sent to LLMObs.

        ``_tracer`` and ``_auto`` are internal: ``_tracer`` substitutes the tracer
        used by the new instance, and ``_auto`` flags automatic enablement
        (forwarded to telemetry).

        :raises ValueError: If agentless mode is enabled and DD_API_KEY or
            DD_SITE is not configured.
        """
        # Enabling is idempotent: a repeated call is a debug-logged no-op.
        if cls.enabled:
            log.debug("%s already enabled", cls.__name__)
            return

        cls._warn_if_litellm_was_imported()

        # An explicit DD_LLMOBS_ENABLED=false/0 wins over a programmatic enable().
        if os.getenv("DD_LLMOBS_ENABLED") and not asbool(os.getenv("DD_LLMOBS_ENABLED")):
            log.debug("LLMObs.enable() called when DD_LLMOBS_ENABLED is set to false or 0, not starting LLMObs service")
            return
        # grab required values for LLMObs; explicit arguments take precedence
        # over any previously-set global configuration
        config._dd_site = site or config._dd_site
        config._dd_api_key = api_key or config._dd_api_key
        cls._app_key = app_key or cls._app_key
        cls._project_name = project_name or cls._project_name or DEFAULT_PROJECT_NAME
        config.env = env or config.env
        config.service = service or config.service
        config._llmobs_ml_app = ml_app or config._llmobs_ml_app
        config._llmobs_instrumented_proxy_urls = instrumented_proxy_urls or config._llmobs_instrumented_proxy_urls

        # `error` is reported to telemetry in the `finally` block below. Note
        # it is only set for the two agentless validation failures; any other
        # exception is recorded with error=None.
        error = None
        start_ns = time.time_ns()
        try:
            config._llmobs_agentless_enabled = should_use_agentless(
                user_defined_agentless_enabled=(
                    agentless_enabled if agentless_enabled is not None else config._llmobs_agentless_enabled
                )
            )

            if config._llmobs_agentless_enabled:
                # validate required values for agentless LLMObs
                if not config._dd_api_key:
                    error = "missing_api_key"
                    raise ValueError(
                        "DD_API_KEY is required for sending LLMObs data when agentless mode is enabled. "
                        "Ensure this configuration is set before running your application."
                    )
                if not config._dd_site:
                    error = "missing_site"
                    raise ValueError(
                        "DD_SITE is required for sending LLMObs data when agentless mode is enabled. "
                        "Ensure this configuration is set before running your application."
                    )
                # Remote config needs an agent; disable it unless the user
                # explicitly opted in via DD_REMOTE_CONFIGURATION_ENABLED.
                if not os.getenv("DD_REMOTE_CONFIGURATION_ENABLED"):
                    config._remote_config_enabled = False
                    log.debug("Remote configuration disabled because DD_LLMOBS_AGENTLESS_ENABLED is set to true.")
                    remoteconfig_poller.disable()

                # Since the API key can be set programmatically and TelemetryWriter is already initialized by now,
                # we need to force telemetry to use agentless configuration
                telemetry_writer.enable_agentless_client(True)

            if integrations_enabled:
                cls._patch_integrations()

            # override the default _instance with a new tracer
            cls._instance = cls(tracer=_tracer, span_processor=span_processor)

            # Add APM trace filter to drop all APM traces when DD_APM_TRACING_ENABLED is falsy
            apm_filter = APMTracingEnabledFilter()
            cls._instance.tracer._span_aggregator.dd_processors.append(apm_filter)

            cls.enabled = True
            cls._instance.start()

            # Register hooks for span events; `_stop_service` removes these
            # same listeners on disable.
            core.on("trace.span_start", cls._instance._on_span_start)
            core.on("trace.span_finish", cls._instance._on_span_finish)
            core.on("http.span_inject", cls._inject_llmobs_context)
            core.on(
                "http.activate_distributed_headers",
                cls._activate_llmobs_distributed_context_soft_fail,
            )
            core.on("threading.submit", cls._instance._current_trace_context, "llmobs_ctx")
            core.on("threading.execution", cls._instance._llmobs_context_provider.activate)
            core.on("asyncio.create_task", cls._instance._on_asyncio_create_task)
            core.on("asyncio.execute_task", cls._instance._on_asyncio_execute_task)

            # Span-link tracking hooks (tool choice / tool call / agent finish).
            core.on(
                DISPATCH_ON_LLM_TOOL_CHOICE,
                cls._instance._link_tracker.on_llm_tool_choice,
            )
            core.on(DISPATCH_ON_TOOL_CALL, cls._instance._link_tracker.on_tool_call)
            core.on(
                DISPATCH_ON_TOOL_CALL_OUTPUT_USED,
                cls._instance._link_tracker.on_tool_call_output_used,
            )

            core.on(
                DISPATCH_ON_GUARDRAIL_SPAN_START,
                cls._instance._link_tracker.on_guardrail_span_start,
            )
            core.on(
                DISPATCH_ON_LLM_SPAN_FINISH,
                cls._instance._link_tracker.on_llm_span_finish,
            )
            core.on(
                DISPATCH_ON_OPENAI_AGENT_SPAN_FINISH,
                cls._instance._link_tracker.on_openai_agent_span_finish,
            )

            # Ensure services are flushed/stopped at interpreter exit.
            atexit.register(cls.disable)
            telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.LLMOBS, True)

            log.debug(
                "%s enabled; instrumented_proxy_urls: %s",
                cls.__name__,
                config._llmobs_instrumented_proxy_urls,
            )

            llmobs_info = {
                "llmobs_enabled": cls.enabled,
                "llmobs_ml_app": config._llmobs_ml_app,
                "integrations_enabled": integrations_enabled,
                "llmobs_agentless_enabled": config._llmobs_agentless_enabled,
            }
            log.debug("LLMObs configurations: %s", llmobs_info)

        finally:
            # Telemetry is recorded on both success and failure paths.
            telemetry.record_llmobs_enabled(
                error,
                config._llmobs_agentless_enabled,
                config._dd_site,
                start_ns,
                _auto,
                config._llmobs_instrumented_proxy_urls,
                config._llmobs_ml_app,
            )

    @staticmethod
    def _warn_if_litellm_was_imported() -> None:
        if "litellm" in sys.modules:
            import litellm

            if not getattr(litellm, "_datadog_patch", False):
                log.warning(
                    "LLMObs.enable() called after litellm was imported but before it was patched. "
                    "This may cause tracing issues if you are importing patched methods like 'litellm.completion' "
                    "directly. To ensure proper tracing, either run your application with ddtrace-run, "
                    "call ddtrace.patch_all() before importing litellm, or "
                    "enable LLMObs before importing other modules."
                )

    def _on_asyncio_create_task(self, task_data: dict[str, Any]) -> None:
        """Propagates llmobs active trace context across asyncio tasks."""
        task_data["llmobs_ctx"] = self._current_trace_context()

    def _on_asyncio_execute_task(self, task_data: dict[str, Any]) -> None:
        """Activates llmobs active trace context across asyncio task execution."""
        llmobs_ctx = task_data.get("llmobs_ctx")
        if llmobs_ctx is not None:
            self._llmobs_context_provider.activate(llmobs_ctx)

    @classmethod
    def publish_evaluator(
        cls,
        evaluator: BaseEvaluator,
        ml_app: str,
        eval_name: Optional[str] = None,
        variable_mapping: Optional[dict[str, str]] = None,
    ) -> dict[str, str]:
        """Publish a custom evaluator definition to LLM Observability.

        :raises ValueError: If LLMObs has not been enabled.
        :return: A dict containing ``"ui_url"``, the UI link to the evaluator.
        """
        instance = cls._instance
        if not instance or not instance.enabled:
            raise ValueError("LLMObs is not enabled. Ensure LLM Observability is enabled via `LLMObs.enable(...)`")

        payload = evaluator._build_publish_payload(
            ml_app=ml_app, eval_name=eval_name, variable_mapping=variable_mapping
        )
        instance._dne_client.publish_custom_evaluator(payload)

        # Build a deep-link into the evaluations UI for the published evaluator.
        query_params = {"evalName": payload["eval_name"], "applicationName": ml_app.strip()}
        query = urllib.parse.urlencode(query_params)
        base_url = _get_base_url()
        return {"ui_url": f"{base_url}/llm/evaluations/custom?{query}"}

    @classmethod
    def pull_dataset(
        cls,
        dataset_name: str,
        project_name: Optional[str] = None,
        version: Optional[int] = None,
        tags: Optional[list[str]] = None,
    ) -> Dataset:
        """Pull an existing dataset, with its records, from LLM Observability.

        :param dataset_name: Name of the dataset to pull.
        :param project_name: Project the dataset belongs to; defaults to the
            project name configured via ``LLMObs.enable()``.
        :param version: Dataset version to pull; latest when omitted.
        :param tags: Optional list of "key:value" tag strings to filter by.
        :raises ValueError: If ``tags`` is not a list of strings.
        :return: The pulled Dataset.
        """
        # Validate both the container type and the element types up front, so
        # the caller gets one clear error instead of an opaque backend failure.
        # (Previously only the container type was checked, despite the message
        # promising "a list of strings".)
        if tags is not None and (not isinstance(tags, list) or not all(isinstance(t, str) for t in tags)):
            raise ValueError(
                "tags must be a list of strings in the format of tag key value pairs. "
                'Example: tags=["key1:value1", "key2:value2"]'
            )
        ds = cls._instance._dne_client.dataset_get_with_records(
            dataset_name, (project_name or cls._project_name), version, tags
        )
        return ds

    @classmethod
    def create_dataset(
        cls,
        dataset_name: str,
        project_name: Optional[str] = None,
        description: str = "",
        records: Optional[list[DatasetRecord]] = None,
        bulk_upload: bool = False,
        deduplicate: bool = True,
    ) -> Dataset:
        """Creates a Dataset to run Experiments on.

        :param dataset_name: The name of the dataset.
        :param project_name: The name of the project to save the dataset to.
        :param description: The description of the dataset.
        :param records: Optional records to initialize the dataset with.
        :param deduplicate:
            Whether to deduplicate the records or not. If bulk_upload is True, deduplication occurs
            within the uploaded data, not existing data already stored on the server.
        :param bulk_upload:
            - True:
                Uploads all records in a single request. This method does not support deduplication
                against existing data and is best suited for initial uploads.
            - False:
                Splits the data into batches and uploads them individually. This method supports
                deduplication against existing records but does not provide transactional guarantees
                when the same dataset is modified concurrently by multiple clients.
        """
        if records is None:
            records = []
        # The dataset is created server-side first; records are uploaded after.
        ds = cls._instance._dne_client.dataset_create(dataset_name, project_name, description)

        if len(records) > 0:
            if bulk_upload:
                for record in records:
                    ds.append(record)
                ds.push(deduplicate=deduplicate, bulk_upload=True)
            else:
                # Batch count is derived from the length of the JSON-serialized
                # payload relative to BATCH_UPDATE_THRESHOLD -- presumably a
                # payload-size heuristic; TODO confirm the threshold's units.
                num_batches = math.ceil(len(safe_json(records)) / ds.BATCH_UPDATE_THRESHOLD)
                batch_size = math.ceil(len(records) / num_batches)
                log.debug(
                    "batched upload num_batches :%d, batch_size: %d",
                    num_batches,
                    batch_size,
                )
                create_new_version = True  # whether the server should attempt to bump the data version or not
                for record_batch in _batched(records, batch_size):
                    for record in record_batch:
                        ds.append(record)
                    data_changed = ds._push(
                        deduplicate=deduplicate,
                        create_new_version=create_new_version,
                        bulk_upload=False,
                    )
                    if data_changed:
                        # Since we are batching a single upload, we should only bump the version at most once
                        create_new_version = False

        return ds

    @classmethod
    def create_dataset_from_csv(
        cls,
        csv_path: str,
        dataset_name: str,
        input_data_columns: list[str],
        expected_output_columns: Optional[list[str]] = None,
        metadata_columns: Optional[list[str]] = None,
        csv_delimiter: str = ",",
        description: str = "",
        project_name: Optional[str] = None,
        deduplicate: bool = True,
    ) -> Dataset:
        """Create a dataset from the rows of a CSV file.

        :param csv_path: Path to the CSV file to read.
        :param dataset_name: Name of the dataset to create.
        :param input_data_columns: CSV columns mapped to each record's input data.
        :param expected_output_columns: CSV columns mapped to each record's expected output.
        :param metadata_columns: CSV columns mapped to each record's metadata.
        :param csv_delimiter: Field delimiter used by the CSV file.
        :param description: Description of the created dataset.
        :param project_name: Project to save the dataset to.
        :param deduplicate: Whether the server should deduplicate uploaded records.
        :raises ValueError: If the CSV is empty/headerless, or any requested
            column is missing from the CSV header.
        :return: The created Dataset.
        """
        if expected_output_columns is None:
            expected_output_columns = []
        if metadata_columns is None:
            metadata_columns = []

        # Store the original field size limit to restore it later
        original_field_size_limit = csv.field_size_limit()
        csv.field_size_limit(EXPERIMENT_CSV_FIELD_MAX_SIZE)  # 10mb

        records = []
        try:
            with open(csv_path, mode="r") as csvfile:
                content = csvfile.readline().strip()
                if not content:
                    raise ValueError("CSV file appears to be empty or header is missing.")

                csvfile.seek(0)

                rows = csv.DictReader(csvfile, delimiter=csv_delimiter)
                if rows.fieldnames is None:
                    raise ValueError("CSV file appears to be empty or header is missing.")

                header_columns = set(rows.fieldnames)
                # Report exactly which requested columns are absent from the
                # header. (Fixes a bug where metadata_columns was compared
                # against itself, so the metadata error always reported [].)
                missing_input_columns = [col for col in input_data_columns if col not in header_columns]
                missing_output_columns = [col for col in expected_output_columns if col not in header_columns]
                missing_metadata_columns = [col for col in metadata_columns if col not in header_columns]

                if missing_input_columns:
                    raise ValueError(f"Input columns not found in CSV header: {missing_input_columns}")
                if missing_output_columns:
                    raise ValueError(f"Expected output columns not found in CSV header: {missing_output_columns}")
                if missing_metadata_columns:
                    raise ValueError(f"Metadata columns not found in CSV header: {missing_metadata_columns}")

                for row in rows:
                    records.append(
                        DatasetRecord(
                            input_data={col: row[col] for col in input_data_columns},
                            expected_output={col: row[col] for col in expected_output_columns},
                            metadata={col: row[col] for col in metadata_columns},
                            tags=[],
                            record_id="",
                            canonical_id=None,
                        )
                    )

        finally:
            # Always restore the original field size limit
            csv.field_size_limit(original_field_size_limit)

        ds = cls._instance._dne_client.dataset_create(dataset_name, project_name, description)
        for r in records:
            ds.append(r)
        if len(ds) > 0:
            cls._instance._dne_client.dataset_bulk_upload(ds._id, ds._records, deduplicate=deduplicate)
        return ds

    @classmethod
    def _delete_dataset(cls, dataset_id: str) -> None:
        """Delete the dataset identified by ``dataset_id`` (internal helper)."""
        dne_client = cls._instance._dne_client
        return dne_client.dataset_delete(dataset_id)

    @classmethod
    def _prompt_optimization(
        cls,
        name: str,
        task: Callable[[DatasetRecordInputType, Optional[ConfigType]], JSONType],
        optimization_task: Callable[[str, str, ConfigType], str],
        dataset: Dataset,
        evaluators: Sequence[EvaluatorType],
        summary_evaluators: Sequence[SummaryEvaluatorType],
        labelization_function: Optional[Callable[[dict[str, Any]], str]],
        compute_score: Callable[[dict[str, dict[str, Any]]], float],
        config: ConfigType,
        project_name: Optional[str] = None,
        tags: Optional[dict[str, str]] = None,
        max_iterations: int = 5,
        stopping_condition: Optional[Callable[[dict[str, dict[str, Any]]], bool]] = None,
        dataset_split: Union[bool, tuple[float, ...]] = False,
        test_dataset: Optional[str] = None,
    ) -> PromptOptimization:
        """Initialize a PromptOptimization to iteratively improve prompts using experiments.

        PromptOptimization runs a baseline experiment with an initial prompt, then uses an
        optimization task to iteratively suggest improvements based on evaluation results.

        :param name: The name of the prompt optimization run.
        :param task: The task function to execute on the dataset. Must accept parameters ``input_data``
                     (dict with input data for the task) and ``config`` (configuration dictionary).
                     Should return the task output as a JSON-serializable value.
        :param optimization_task: Function that calls an LLM to generate improved prompts.
                                  Must accept parameters ``system_prompt`` (str), ``user_prompt`` (str),
                                  and ``config`` (dict). Should return the new prompt (str).
                                  The system_prompt contains optimization instructions
                                  loaded from template, and user_prompt contains the current prompt with
                                  evaluation examples.
        :param dataset: The dataset to run experiments on, created with ``LLMObs.create_dataset()``
                       or ``LLMObs.pull_dataset()``.
        :param evaluators: A list of evaluators to measure task performance. Can be either
                          class-based evaluators (inheriting from BaseEvaluator) or function-based
                          evaluators that accept (input_data, output_data, expected_output) parameters.
                          Should return a JSON-serializable value or an EvaluatorResult with the evaluation results.
        :param summary_evaluators: list of summary evaluators (REQUIRED). Can be either
                                   class-based evaluators (inheriting from BaseSummaryEvaluator) or function-based
                                   evaluators that accept (inputs: list, outputs: list, expected_outputs: list,
                                   evaluations: dict) and return aggregated metrics.
        :param labelization_function: Function to generate labels from individual experiment results (REQUIRED).
                                     Takes an individual result dict (containing "evaluations" key) and returns
                                     a string label. Used to categorize examples shown to the optimization LLM.
                                     Example: ``lambda r: "Very good" if r["evaluations"]["score"] >= 0.8 else "Bad"``
        :param compute_score: Function to compute the score for each iteration (REQUIRED).
                             Takes summary_evaluations dict from the experiment result and returns a float score.
                             Used to determine which iteration performed best.
        :param config: Configuration dictionary for the optimization. Must contain:
                      - ``prompt`` (mandatory): Initial prompt template
                      - ``model_name`` (optional): Model to use for task execution
                      - ``evaluation_output_format`` (optional): the output format wanted
                      - ``runs`` (optional): The number of times to run the experiment, or, run the task for every
                                             dataset record the defined number of times.
                      Additional config values are passed through to the task function.
        :param project_name: The name of the project to organize optimization runs. Defaults to the
                            project name set in ``LLMObs.enable()``.
        :param tags: A dictionary of string key-value tag pairs to associate with the optimization.
        :param max_iterations: Maximum number of optimization iterations to run. Default is 5.
        :param stopping_condition: Optional function to determine when to stop optimization early.
                                   Takes summary_evaluations dict from the experiment result and returns True if
                                   optimization should stop.
        :param dataset_split: Controls dataset splitting. Accepts:
            - ``False`` (default): No splitting, use full dataset for everything.
            - ``True``: Split with default ratios (60/20/20 without test_dataset, 80/20 with).
            - ``(train, valid, test)`` tuple: Custom 3-way split ratios. Must sum to 1.0.
              Cannot be combined with ``test_dataset``.
            - ``(train, valid)`` tuple: Custom 2-way split ratios. Must sum to 1.0.
              Requires ``test_dataset`` for the test set.
        :param test_dataset: Optional name of a separate test dataset. When provided, the dataset is
                            pulled automatically, the main dataset is split into train/valid (80/20),
                            and the test dataset is used for the final unbiased score.
                            Implicitly enables dataset splitting.
        :return: PromptOptimization object. Call ``.run()`` to execute the optimization.
        :raises TypeError: If task, optimization_task, evaluators, or dataset have incorrect types
                          or signatures.
        :raises ValueError: If config is missing required keys.

        Example::

            def my_task(input_data, config):
                prompt = config["prompt"]
                question = input_data["question"]
                # Use prompt and question to generate answer
                return {"answer": generate_answer(prompt, question)}

            def optimization_task(system_prompt, user_prompt, config):
                # Call LLM to analyze results and generate improved prompt
                import openai
                client = openai.OpenAI(api_key="your-api-key")
                response = client.chat.completions.create(
                    model="gpt-4",
                    messages=[
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": user_prompt},
                    ],
                    response_format={"type": "json_object"},
                )
                new_prompt = json.loads(response.choices[0].message.content)["new_prompt"]
                return new_prompt

            def accuracy_evaluator(input_data, output_data, expected_output):
                is_correct = output_data["answer"] == expected_output["answer"]
                return "correct" if is_correct else "incorrect"

            def summary_evaluator(inputs, outputs, expected_outputs, evaluations):
                correct_count = sum(1 for e in evaluations if e == "correct")
                return {"accuracy": correct_count / len(evaluations)}

            def compute_score(summary_evaluations):
                return summary_evaluations["summary_evaluator"]["value"]["accuracy"]

            def labelization_function(individual_result):
                eval_value = individual_result["evaluations"]["accuracy_evaluator"]["value"]
                return "Correct answer" if eval_value == "correct" else "Incorrect answer"

            dataset = LLMObs.create_dataset(name="qa_pairs", ...)
            opt = LLMObs._prompt_optimization(
                name="optimize_qa",
                task=my_task,
                optimization_task=optimization_task,
                dataset=dataset,
                evaluators=[accuracy_evaluator],
                labelization_function=labelization_function,
                compute_score=compute_score,
                summary_evaluators=[summary_evaluator],
                config={
                    "prompt": "Answer the question: {question}",
                    "model_name": "gpt-3.5-turbo",
                    "runs": 10
                }
            )
            results = opt.run()
        """
        validate_task(task)
        validate_optimization_task(optimization_task)
        validate_dataset(dataset)
        validate_test_dataset(test_dataset)
        validate_dataset_split(dataset_split, test_dataset)
        validate_evaluators(evaluators)

        pulled_test_dataset = None
        if test_dataset is not None:
            pulled_test_dataset = cls.pull_dataset(dataset_name=test_dataset, project_name=project_name)

        return PromptOptimization(
            name=name,
            task=task,
            optimization_task=optimization_task,
            dataset=dataset,
            evaluators=evaluators,
            project_name=project_name or cls._project_name,
            config=config,
            summary_evaluators=summary_evaluators,
            compute_score=compute_score,
            labelization_function=labelization_function,
            _llmobs_instance=cls._instance,
            tags=tags,
            max_iterations=max_iterations,
            stopping_condition=stopping_condition,
            dataset_split=dataset_split,
            test_dataset=pulled_test_dataset,
        )

    @classmethod
    def experiment(
        cls,
        name: str,
        task: TaskType,
        dataset: Dataset,
        evaluators: Sequence[EvaluatorType],
        description: str = "",
        project_name: Optional[str] = None,
        tags: Optional[dict[str, str]] = None,
        config: Optional[ConfigType] = None,
        summary_evaluators: Optional[Sequence[SummaryEvaluatorType]] = None,
        runs: Optional[int] = 1,
    ) -> SyncExperiment:
        """Create a synchronous Experiment that runs ``task`` over ``dataset`` and scores it with ``evaluators``.

        :param name: The name of the experiment.
        :param task: The task function to run. Must accept parameters ``input_data`` and ``config``.
        :param dataset: The dataset to run the experiment on, created with LLMObs.pull/create_dataset().
        :param evaluators: A list of evaluator functions or BaseEvaluator instances to evaluate the task output.
                           Function-based evaluators must accept parameters ``input_data``, ``output_data``,
                           and ``expected_output``; class-based evaluators must inherit from BaseEvaluator
                           and implement the evaluate method.
        :param description: A description of the experiment.
        :param project_name: The name of the project to save the experiment to.
        :param tags: A dictionary of string key-value tag pairs to associate with the experiment.
        :param config: A configuration dictionary describing the experiment.
        :param summary_evaluators: A list of summary evaluator functions or BaseSummaryEvaluator instances that
                                   reduce the task results and evaluations to a single value.
                                   Function-based summary evaluators must accept parameters ``inputs``,
                                   ``outputs``, ``expected_outputs``, ``evaluators_results``; class-based ones
                                   must inherit from BaseSummaryEvaluator and implement the evaluate method.
        :param runs: How many times to run the task over every dataset record.
        :raises TypeError: If the dataset, evaluators, or summary evaluators have the wrong type.
        :returns: A SyncExperiment object; call ``.run()`` to execute it.
        """
        _validate_task_signature(task, is_async=False)
        if not isinstance(dataset, Dataset):
            raise TypeError("Dataset must be an LLMObs Dataset object.")
        if not evaluators:
            raise TypeError("Evaluators must be a list of callable functions or BaseEvaluator instances.")
        # Validate each evaluator and wrap DeepEval-style ones so they conform
        # to the standard evaluator interface.
        prepared_evaluators = []
        for evaluator in evaluators:
            _validate_evaluator_signature(evaluator, is_async=False)
            if _is_deep_eval_evaluator(evaluator):
                prepared_evaluators.append(_deep_eval_evaluator_wrapper(evaluator))
            else:
                prepared_evaluators.append(evaluator)
        if summary_evaluators:
            if not all(
                callable(summary_evaluator) or isinstance(summary_evaluator, BaseSummaryEvaluator)
                for summary_evaluator in summary_evaluators
            ):
                raise TypeError(
                    "Summary evaluators must be a list of callable functions or BaseSummaryEvaluator instances."
                )
            for summary_evaluator in summary_evaluators:
                _validate_summary_evaluator_signature(summary_evaluator, is_async=False)
        return SyncExperiment(
            name,
            task,
            dataset,
            prepared_evaluators,
            project_name=project_name or cls._project_name,
            tags=tags,
            description=description,
            config=config,
            _llmobs_instance=cls._instance,
            summary_evaluators=summary_evaluators,
            runs=runs,
        )

    @classmethod
    def async_experiment(
        cls,
        name: str,
        task: AsyncTaskType,
        dataset: Dataset,
        evaluators: Sequence[Union[EvaluatorType, AsyncEvaluatorType]],
        description: str = "",
        project_name: Optional[str] = None,
        tags: Optional[dict[str, str]] = None,
        config: Optional[ConfigType] = None,
        summary_evaluators: Optional[Sequence[Union[SummaryEvaluatorType, AsyncSummaryEvaluatorType]]] = None,
        runs: Optional[int] = 1,
    ) -> Experiment:
        """Create an async Experiment that runs an async ``task`` over ``dataset`` with evaluators.

        Async counterpart of experiment(): tasks, evaluators, and summary evaluators may be
        async. Sync evaluators are also accepted and run via asyncio.to_thread().

        :param name: The name of the experiment.
        :param task: The async task function. Must accept parameters ``input_data`` and ``config``.
        :param dataset: The dataset to run the experiment on, created with LLMObs.pull/create_dataset().
        :param evaluators: A list of evaluator functions or BaseEvaluator/BaseAsyncEvaluator instances.
                           Function-based evaluators must accept parameters ``input_data``, ``output_data``,
                           and ``expected_output``; class-based evaluators must implement the evaluate method.
        :param description: A description of the experiment.
        :param project_name: The name of the project to save the experiment to.
        :param tags: A dictionary of string key-value tag pairs to associate with the experiment.
        :param config: A configuration dictionary describing the experiment.
        :param summary_evaluators: A list of summary evaluator functions or BaseSummaryEvaluator/
                                   BaseAsyncSummaryEvaluator instances (sync or async).
                                   Function-based summary evaluators must accept parameters ``inputs``,
                                   ``outputs``, ``expected_outputs``, ``evaluators_results``.
        :param runs: The number of times to run the experiment.
        :raises TypeError: If the dataset or evaluators have the wrong type.
        :returns: An Experiment object; call ``.run()`` to execute it.
        """
        _validate_task_signature(task, is_async=True)
        if not isinstance(dataset, Dataset):
            raise TypeError("Dataset must be an LLMObs Dataset object.")
        if not evaluators:
            raise TypeError(
                "Evaluators must be a list of callable functions, BaseEvaluator, or BaseAsyncEvaluator instances."
            )
        # Validate each evaluator and wrap DeepEval-style ones with the async adapter.
        prepared_evaluators = []
        for evaluator in evaluators:
            _validate_evaluator_signature(evaluator, is_async=True)
            if _is_deep_eval_evaluator(evaluator):
                prepared_evaluators.append(_deep_eval_async_evaluator_wrapper(evaluator))
            else:
                prepared_evaluators.append(evaluator)
        for summary_evaluator in summary_evaluators or ():
            _validate_summary_evaluator_signature(summary_evaluator, is_async=True)
        return Experiment(
            name,
            task,
            dataset,
            prepared_evaluators,
            project_name=project_name or cls._project_name,
            tags=tags,
            description=description,
            config=config,
            _llmobs_instance=cls._instance,
            summary_evaluators=summary_evaluators,
            runs=runs,
        )

    @classmethod
    def _distributed_experiment(
        cls,
        name: str,
        dataset: Dataset,
        description: str = "",
        project_name: Optional[str] = None,
        tags: Optional[dict[str, str]] = None,
        config: Optional[ConfigType] = None,
        runs: Optional[int] = 1,
    ) -> Experiment:
        """Create and register an Experiment shell for distributed execution.

        The experiment is built with a no-op task and no evaluators — actual task
        execution happens elsewhere (see ``_run_for_experiment``). It is registered
        with the backend immediately, without enforcing a unique name, so remote
        workers can look it up by ID.
        """
        distributed_experiment = Experiment(
            name,
            Experiment._NO_OP_TASK,
            dataset,
            [],
            project_name=project_name or cls._project_name,
            tags=tags,
            description=description,
            config=config,
            _llmobs_instance=cls._instance,
            runs=runs,
            is_distributed=True,
        )
        distributed_experiment._setup_experiment(
            "LLMObs is not enabled. Ensure LLM Observability is enabled via `LLMObs.enable(...)`",
            ensure_unique=False,
        )
        return distributed_experiment

    @classmethod
    def _run_for_experiment(
        cls,
        experiment_id: str,
        task: Callable[[DatasetRecordInputType, Optional[ConfigType]], JSONType],
        dataset_records: list[DatasetRecord],
        evaluators: Sequence[Union[EvaluatorType, AsyncEvaluatorType]],
        jobs: int = 1,
        raise_errors: bool = False,
        run_iteration: Optional[int] = 0,
        tags: Optional[dict[str, str]] = None,
    ) -> tuple[Experiment, ExperimentResult]:
        """Fetch an existing experiment by ID and run one task iteration over the given records.

        Used by distributed experiment workers: the experiment shell is retrieved from
        the backend and rehydrated with the locally-supplied task, records, and
        evaluators before running.

        :raises ValueError: If LLMObs is not enabled.
        :returns: A tuple of the rehydrated Experiment and the iteration's results.
        """
        if not cls._instance or not cls._instance.enabled:
            raise ValueError("LLMObs is not enabled. Ensure LLM Observability is enabled via `LLMObs.enable(...)`")
        experiment = cls._instance._dne_client.experiment_get(experiment_id)
        experiment._llmobs_instance = cls._instance
        experiment._dataset._records = dataset_records
        experiment._task = task
        experiment._evaluators = evaluators

        coro = experiment._run_task_single_iteration(jobs, raise_errors, run_iteration)
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No event loop running in this thread: drive the coroutine directly.
            return experiment, asyncio.run(coro)
        # We are inside a running event loop, so asyncio.run() cannot be called
        # here; run the coroutine on a fresh loop in a dedicated worker thread.
        import concurrent.futures

        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            return experiment, pool.submit(asyncio.run, coro).result()

    @classmethod
    def register_processor(cls, processor: Optional[Callable[[LLMObsSpan], Optional[LLMObsSpan]]] = None) -> None:
        """Install (or remove) a user processor invoked on every LLMObs span.

        The processor runs before the span is sent to LLMObs and may modify it, for
        example to redact or rewrite the input/output. Returning None from the
        processor omits the span entirely so it is never sent.

        Call ``register_processor(None)`` to deregister the current processor.

        :param processor: A function that takes an LLMObsSpan and returns an LLMObsSpan or None.
                         If None is returned, the span will be omitted and not sent to LLMObs.
        """
        cls._instance._user_span_processor = processor

    @classmethod
    def _integration_is_enabled(cls, integration: str) -> bool:
        """Return True if ``integration`` is a supported LLMObs integration whose module is patched."""
        patched_module = SUPPORTED_LLMOBS_INTEGRATIONS.get(integration)
        return patched_module is not None and patched_module in ddtrace._monkey._get_patched_modules()

    @classmethod
    def disable(cls) -> None:
        """Disable LLM Observability: stop the service, reset state, and notify telemetry.

        Safe to call when LLMObs is not enabled (logs a debug message and returns).
        """
        if not cls.enabled:
            log.debug("%s not enabled", cls.__name__)
            return
        log.debug("Disabling %s", cls.__name__)
        # Remove the shutdown hook so disable() does not run again at interpreter exit.
        atexit.unregister(cls.disable)

        cls._instance.stop()
        cls.enabled = False
        # Drop the prompt manager; a later use will lazily re-initialize it
        # via _ensure_prompt_manager().
        cls._prompt_manager = None
        telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.LLMOBS, False)

        log.debug("%s disabled", cls.__name__)

    def _tag_span_links(self, span, span_links):
        """Attach ``span_links`` to ``span``'s context, merging with any existing links.

        Links that point at ``span`` itself, or that belong to a different trace,
        are filtered out.

        :param span: The span whose context receives the links.
        :param span_links: A list of span link dicts with ``span_id`` and ``trace_id`` keys.
        """
        if not span_links:
            return
        # Export the span once instead of re-exporting it for every candidate link:
        # export_span() records telemetry on each call, so calling it inside the
        # comprehension (twice per link) did redundant work and inflated telemetry.
        exported_span = LLMObs.export_span(span)
        if exported_span is None:
            # export_span() returns None when LLMObs is disabled; nothing to tag.
            return
        span_links = [
            span_link
            for span_link in span_links
            if span_link["span_id"] != exported_span["span_id"]
            and span_link["trace_id"] == exported_span["trace_id"]
        ]
        current_span_links = span._get_ctx_item(SPAN_LINKS)
        if current_span_links:
            span_links = current_span_links + span_links
        span._set_ctx_item(SPAN_LINKS, span_links)

    @classmethod
    def annotation_context(
        cls,
        tags: Optional[dict[str, Any]] = None,
        prompt: Optional[Union[dict, Prompt]] = None,
        name: Optional[str] = None,
        _linked_spans: Optional[list[ExportedLLMObsSpan]] = None,
    ) -> AnnotationContext:
        """
        Sets specified attributes on all LLMObs spans created while the returned AnnotationContext is active.
        Annotations are applied in the order in which annotation contexts are entered.

        :param tags: Dictionary of JSON serializable key-value tag pairs to set or update on the LLMObs span
                     regarding the span's context.
        :param prompt: A dictionary that represents the prompt used for an LLM call in the following form:
                        `{
                            "id": "...",
                            "version": "...",
                            "chat_template": [{"content": "...", "role": "..."}, ...],
                            "variables": {"variable_1": "...", ...},
                            "tags": {"key1": "value1", "key2": "value2"},
                        }`
                        Can also be set using the `ddtrace.llmobs.utils.Prompt` constructor class.
                        For managed prompts, use `prompt.to_annotation_dict(**variables)`.
                        - This argument is only applicable to LLM spans.
                        - The dictionary may contain optional keys relevant to Templates and RAG applications:
                            `rag_context_variables` - a list of variable key names that contain ground
                                                        truth context information
                            `rag_query_variables` - a list of variable key names that contains query
                                                        information for an LLM call
        :param name: set to override the span name for any spans annotated within the returned context.
        """
        # id to track an annotation for registering / de-registering
        annotation_id = rand64bits()
        # Track context we create so we can clean up _reactivate on exit.
        # Using a dict as a mutable container to share state between closures.
        state = {"created_context": None}

        def get_annotations_context_id():
            # Resolve the context ID that ties this annotation to spans started
            # under the same trace context. Callers hold _annotation_context_lock.
            current_ctx = cls._instance.tracer.current_trace_context()
            # default the context id to the annotation id
            ctx_id = annotation_id
            if current_ctx is None:
                # No context exists - create one and enable reactivation so spans finishing
                # within this annotation_context don't clear the context for subsequent operations
                current_ctx = Context(is_remote=False)
                current_ctx.set_baggage_item(ANNOTATIONS_CONTEXT_ID, ctx_id)
                current_ctx._reactivate = True
                cls._instance.tracer.context_provider.activate(current_ctx)
                state["created_context"] = current_ctx
            elif not current_ctx.get_baggage_item(ANNOTATIONS_CONTEXT_ID):
                # Existing context without an annotation ID yet: claim it with ours.
                current_ctx.set_baggage_item(ANNOTATIONS_CONTEXT_ID, ctx_id)
            else:
                # Existing context already carries an annotation ID: reuse it so
                # nested annotation contexts share the same grouping.
                ctx_id = current_ctx.get_baggage_item(ANNOTATIONS_CONTEXT_ID)
            return ctx_id

        def register_annotation():
            # Record this annotation's attributes so they can be applied to
            # LLMObs spans started while the context is active.
            with cls._instance._annotation_context_lock:
                ctx_id = get_annotations_context_id()
                cls._instance._annotations.append(
                    (
                        annotation_id,
                        ctx_id,
                        {
                            "tags": tags,
                            "prompt": prompt,
                            "_name": name,
                            "_linked_spans": _linked_spans,
                        },
                    )
                )

        def deregister_annotation():
            # Remove the annotation registered above (matched by annotation_id).
            with cls._instance._annotation_context_lock:
                for i, (key, _, _) in enumerate(cls._instance._annotations):
                    if key == annotation_id:
                        cls._instance._annotations.pop(i)
                        break
                else:
                    log.debug("Failed to pop annotation context")
            # Disable reactivation on context we created to prevent it from being
            # restored after exiting the annotation_context block
            if state["created_context"] is not None:
                state["created_context"]._reactivate = False
                # DEV: Deactivate the context we created so subsequent annotation_contexts
                # don't see the stale context with the old ANNOTATIONS_CONTEXT_ID.
                # Only deactivate if this context is still the active one (not a Span).
                current_active = cls._instance.tracer.context_provider.active()
                if current_active is state["created_context"]:
                    cls._instance.tracer.context_provider.activate(None)

        return AnnotationContext(register_annotation, deregister_annotation)

    # Lazily-created singleton used by the prompt registry APIs; initialized
    # on first use via _ensure_prompt_manager() and cleared by disable().
    _prompt_manager: Optional[PromptManager] = None
    # Guards get-or-create access to _prompt_manager across threads.
    _prompt_manager_lock = RLock()

    @classmethod
    def _ensure_prompt_manager(cls) -> PromptManager:
        """Return the shared prompt manager, creating it under the lock on first use."""
        with cls._prompt_manager_lock:
            if cls._prompt_manager is None:
                cls._prompt_manager = cls._initialize_prompt_manager()
            return cls._prompt_manager

    @classmethod
    def get_prompt(
        cls,
        prompt_id: str,
        label: Optional[Literal["development", "production"]] = None,
        fallback: PromptFallback = None,
    ) -> ManagedPrompt:
        """
        Fetch a prompt template from the Datadog Prompt Registry.

        :param prompt_id: The unique identifier of the prompt in the registry.
        :param label: Deployment label ("development" or "production"). When omitted,
                      the latest version is returned.
        :param fallback: Used when the prompt cannot be fetched (cold start + API failure).
                         May be a template string, a message list, a Prompt dict, or a
                         callable returning any of those.

        :returns: A ManagedPrompt object with template and rendering methods.
        :raises ValueError: If the prompt cannot be fetched and no fallback is provided.

        Example::

            # Latest version
            prompt = LLMObs.get_prompt("greeting")
            messages = prompt.format(user="Alice")

            # With explicit label and fallback
            prompt = LLMObs.get_prompt(
                "greeting",
                label="production",
                fallback="Hello {{user}}, how can I help?"
            )

            # Use with annotation_context for observability; pass the same
            # variables to both format() and to_annotation_dict()
            prompt = LLMObs.get_prompt("greeting")
            variables = {"user": "Alice"}
            with LLMObs.annotation_context(prompt=prompt.to_annotation_dict(**variables)):
                openai.chat.completions.create(
                    messages=prompt.format(**variables)
                )
        """
        return cls._ensure_prompt_manager().get_prompt(prompt_id, label, fallback)

    @classmethod
    def clear_prompt_cache(cls, hot: bool = True, warm: bool = True) -> None:
        """Clear the prompt cache.

        Args:
            hot: If True, clear the hot (in-memory) cache. Defaults to True.
            warm: If True, clear the warm (file-based) cache. Defaults to True.
        """
        manager = cls._prompt_manager
        if manager is not None:
            manager.clear_cache(hot=hot, warm=warm)
            return
        if warm:
            # No manager yet: the warm (file) cache may still exist on disk, so
            # clear it directly without initializing the manager.
            WarmCache(cache_dir=_get_config("DD_LLMOBS_PROMPTS_CACHE_DIR")).clear()

    @classmethod
    def refresh_prompt(
        cls,
        prompt_id: str,
        label: Optional[Literal["development", "production"]] = None,
    ) -> Optional[ManagedPrompt]:
        """Force a synchronous re-fetch of one prompt from the registry, updating both caches.

        Args:
            prompt_id: The prompt identifier.
            label: The prompt label. If not provided, returns the latest version.

        Returns:
            The refreshed prompt, or None if fetch failed.
        """
        return cls._ensure_prompt_manager().refresh_prompt(prompt_id, label)

    @classmethod
    def _initialize_prompt_manager(cls) -> PromptManager:
        """Build a PromptManager from the configured API key, site, and cache settings.

        :raises ValueError: If no Datadog API key is configured.
        """
        api_key = config._dd_api_key
        if not api_key:
            raise ValueError("DD_API_KEY is required for the Prompt Registry")

        # Allow an explicit origin override; otherwise derive the API URL from the site.
        base_url = _get_config("DD_LLMOBS_OVERRIDE_ORIGIN") or f"https://api.{config._dd_site}"

        return PromptManager(
            api_key=api_key,
            base_url=base_url,
            cache_ttl=_get_config("DD_LLMOBS_PROMPTS_CACHE_TTL", DEFAULT_PROMPTS_CACHE_TTL, float),
            file_cache_enabled=_get_config("DD_LLMOBS_PROMPTS_FILE_CACHE_ENABLED", False, asbool),
            cache_dir=_get_config("DD_LLMOBS_PROMPTS_CACHE_DIR"),
            timeout=_get_config("DD_LLMOBS_PROMPTS_TIMEOUT", DEFAULT_PROMPTS_TIMEOUT, float),
        )

    @classmethod
    def flush(cls) -> None:
        """
        Flushes any remaining spans and evaluation metrics to the LLMObs backend.

        Flush failures are logged, not raised; the outcome is reported to telemetry.
        """
        if cls.enabled is False:
            log.warning("flushing when LLMObs is disabled. No spans or evaluation metrics will be sent.")
            return

        # Track the most recent failure mode (if any) for the telemetry record.
        flush_error = None

        try:
            cls._instance._evaluator_runner.periodic()
        except Exception:
            flush_error = "evaluator_flush_error"
            log.warning("Failed to run evaluator runner.", exc_info=True)

        try:
            cls._instance._llmobs_span_writer.periodic()
            cls._instance._llmobs_eval_metric_writer.periodic()
        except Exception:
            flush_error = "writer_flush_error"
            log.warning("Failed to flush LLMObs spans and evaluation metrics.", exc_info=True)

        telemetry.record_user_flush(flush_error)

    @staticmethod
    def _patch_integrations() -> None:
        """
        Patch LLM integrations. Ensure that we do not ignore DD_TRACE_<MODULE>_ENABLED or DD_PATCH_MODULES settings.
        """
        # Start from all supported modules enabled; botocore is limited to the
        # bedrock submodules.
        to_patch: dict[str, Union[list[str], bool]] = {
            module: ["bedrock-runtime", "bedrock-agent-runtime"] if module == "botocore" else True
            for module in SUPPORTED_LLMOBS_INTEGRATIONS.values()
        }
        # Per-module env toggles override the defaults.
        for module in to_patch:
            env_var = "DD_TRACE_%s_ENABLED" % module.upper()
            if env_var in os.environ:
                to_patch[module] = asbool(os.environ[env_var])
        # DD_PATCH_MODULES overrides take highest precedence for supported modules.
        for module, enabled in parse_tags_str(os.getenv("DD_PATCH_MODULES")).items():
            if module in SUPPORTED_LLMOBS_INTEGRATIONS.values():
                to_patch[module] = asbool(enabled)
        patch(raise_errors=True, **to_patch)
        log.debug("Patched LLM integrations: %s", [k for k, v in to_patch.items() if v])

    @classmethod
    def export_span(cls, span: Optional[Span] = None) -> Optional[ExportedLLMObsSpan]:
        """Returns a simple representation of a span to export its span and trace IDs.
        If no span is provided, the current active LLMObs-type span will be used.

        :param span: The span to export. Defaults to the currently active LLMObs span.
        :returns: An ExportedLLMObsSpan with ``span_id`` and ``trace_id`` strings, or
                  None if LLMObs is disabled.
        :raises LLMObsExportSpanError: If no span is active, the span is not an
                                       LLMObs-generated span, or the object is not a valid span.
        """
        if not cls.enabled:
            log.warning("LLMObs.export_span() called when LLMObs is disabled. No span will be exported.")
            return None
        if span is None:
            span = cls._instance._current_span()
            if span is None:
                # Record the failed attempt before raising; span is None here.
                telemetry.record_span_exported(span, "no_active_span")
                raise LLMObsExportSpanError(
                    "No span provided and no active LLMObs-generated span found. "
                    "Ensure you pass the span explicitly using LLMObs.export_span(span=<your_span>) "
                    "when exporting from a different thread or async task than where the span was created."
                )
        # error doubles as the telemetry failure tag; None means success.
        error = None
        try:
            if span.span_type != SpanTypes.LLM:
                error = "invalid_span"
                raise LLMObsExportSpanError("Span must be an LLMObs-generated span.")
            return ExportedLLMObsSpan(
                span_id=str(span.span_id),
                # Prefer the LLMObs trace ID stored on the span; fall back to the APM trace ID.
                trace_id=format_trace_id(span._get_ctx_item(LLMOBS_TRACE_ID) or span.trace_id),
            )
        except (TypeError, AttributeError):
            # A non-Span object (or one missing expected attributes) was passed;
            # replace the raw traceback with a clear error (from None).
            error = "invalid_span"
            raise LLMObsExportSpanError("Failed to export span. Span must be a valid Span object.") from None
        finally:
            # Always record the export attempt (success or the failure mode) to telemetry.
            telemetry.record_span_exported(span, error)

    def _current_span(self) -> Optional[Span]:
        """Returns the currently active LLMObs-generated span.
        Note that there may be an active span represented by a context object
        (i.e. a distributed trace) which will not be returned by this method.
        """
        candidate = self._llmobs_context_provider.active()
        if isinstance(candidate, Span):
            return candidate
        return None

    def _current_trace_context(self) -> Optional[Context]:
        """Returns the context for the current LLMObs trace.

        When the active item is a Span, its context is returned with the LLMObs
        trace ID stored in the context meta so it can be propagated downstream.
        """
        active = self._llmobs_context_provider.active()
        if isinstance(active, Context):
            return active
        if isinstance(active, Span):
            context = active.context
            # BUG FIX: the fallback previously read `str(ctx_item) or str(trace_id)`,
            # but str(None) == "None" is truthy, so the fallback to the APM trace ID
            # never fired and the literal string "None" could be propagated.
            # Apply the `or` to the raw values before stringifying.
            llmobs_trace_id = active._get_ctx_item(LLMOBS_TRACE_ID) or active.trace_id
            context._meta[PROPAGATED_LLMOBS_TRACE_ID_KEY] = str(llmobs_trace_id)
            return context
        return None

    def _activate_llmobs_span(self, span: Span) -> None:
        """Propagate the llmobs parent span's ID as the new span's parent ID and activate the new span."""
        llmobs_parent = self._llmobs_context_provider.active()
        if llmobs_parent:
            span._set_ctx_item(PARENT_ID_KEY, str(llmobs_parent.span_id))
            # The parent's LLMObs trace ID lives in different places depending on its
            # type: a local Span stores it as a ctx item, while a propagated Context
            # carries it as a meta entry (presumably set during distributed
            # extraction — confirm against the propagator).
            parent_llmobs_trace_id = (
                llmobs_parent._get_ctx_item(LLMOBS_TRACE_ID)
                if isinstance(llmobs_parent, Span)
                else llmobs_parent._meta.get(PROPAGATED_LLMOBS_TRACE_ID_KEY)
            )
            # Normalize to int (meta values are strings); fall back to the parent's
            # APM trace ID when no LLMObs trace ID was recorded.
            llmobs_trace_id = (
                int(parent_llmobs_trace_id) if parent_llmobs_trace_id is not None else llmobs_parent.trace_id
            )
            span._set_ctx_item(LLMOBS_TRACE_ID, llmobs_trace_id)
        else:
            # No LLMObs parent: this span starts a new LLMObs trace.
            span._set_ctx_item(PARENT_ID_KEY, ROOT_PARENT_ID)
            span._set_ctx_item(LLMOBS_TRACE_ID, generate_128bit_trace_id())
        self._llmobs_context_provider.activate(span)

    def _start_span(
        self,
        operation_kind: str,
        name: Optional[str] = None,
        session_id: Optional[str] = None,
        model_name: Optional[str] = None,
        model_provider: Optional[str] = None,
        ml_app: Optional[str] = None,
        _decorator: bool = False,
    ) -> Span:
        """Start an APM span of type LLM and tag it with LLMObs context items.

        :param operation_kind: The LLMObs span kind (e.g. "llm", "tool"); also used as the
                               span resource and as the default span name.
        :param name: Span name override; defaults to ``operation_kind``.
        :param session_id: Session ID to tag; when omitted, resolved via ``_get_session_id``.
        :param model_name: Model name to tag (model-based span kinds).
        :param model_provider: Model provider to tag (model-based span kinds).
        :param ml_app: ML application name; when omitted, resolved via ``_get_ml_app``.
        :param _decorator: True when the span is being started by a tracing decorator.
        :raises ValueError: If no ml_app name can be resolved.
        :returns: The started Span (plain, untagged, when LLMObs is disabled).
        """
        if name is None:
            name = operation_kind
        span = self.tracer.trace(name, resource=operation_kind, span_type=SpanTypes.LLM)

        # When LLMObs is disabled, return the bare APM span so callers' context
        # managers still work, and skip all LLMObs tagging below.
        if not self.enabled:
            return span

        # NOTE(review): SPAN_KIND is set here and again in the _set_ctx_items call
        # below — presumably the early set is needed so _get_session_id/_get_ml_app
        # can observe it; confirm before deduplicating.
        span._set_ctx_item(SPAN_KIND, operation_kind)
        if model_name is not None:
            span._set_ctx_item(MODEL_NAME, model_name)
        if model_provider is not None:
            span._set_ctx_item(MODEL_PROVIDER, model_provider)
        session_id = session_id if session_id is not None else _get_session_id(span)
        if session_id is not None:
            span._set_ctx_item(SESSION_ID, session_id)

        ml_app = ml_app if ml_app is not None else _get_ml_app(span)
        if ml_app is None:
            raise ValueError(
                "ml_app is required for sending LLM Observability data. "
                "Ensure the name of your LLM application is set via `DD_LLMOBS_ML_APP` or `LLMObs.enable(ml_app='...')`"
                "before running your application."
            )
        span._set_ctx_items({DECORATOR: _decorator, SPAN_KIND: operation_kind, ML_APP: ml_app})
        log.debug(
            "Starting LLMObs span: %s, span_kind: %s, ml_app: %s",
            name,
            operation_kind,
            ml_app,
        )
        return span

    @classmethod
    def llm(
        cls,
        model_name: Optional[str] = None,
        name: Optional[str] = None,
        model_provider: Optional[str] = None,
        session_id: Optional[str] = None,
        ml_app: Optional[str] = None,
        _decorator: bool = False,
    ) -> Span:
        """
        Trace an invocation call to an LLM where inputs and outputs are represented as text.

        :param str model_name: The name of the invoked LLM; defaults to "custom" when not provided.
        :param str name: The name of the traced operation; defaults to "llm" when not provided.
        :param str model_provider: The name of the invoked LLM provider (ex: openai, bedrock);
                                   defaults to "custom" when not provided.
        :param str session_id: The ID of the underlying user session. Required for tracking sessions.
        :param str ml_app: The name of the ML application that the agent is orchestrating. If not provided,
                           the default value will be set to the value of `DD_LLMOBS_ML_APP`.

        :returns: The Span object representing the traced operation.
        """
        if cls.enabled is False:
            log.warning(SPAN_START_WHILE_DISABLED_WARNING)
        return cls._instance._start_span(
            "llm",
            name,
            model_name="custom" if model_name is None else model_name,
            model_provider="custom" if model_provider is None else model_provider,
            session_id=session_id,
            ml_app=ml_app,
            _decorator=_decorator,
        )

    @classmethod
    def tool(
        cls,
        name: Optional[str] = None,
        session_id: Optional[str] = None,
        ml_app: Optional[str] = None,
        _decorator: bool = False,
    ) -> Span:
        """
        Start a span tracing a call to an external interface or API.

        :param str name: Name for the traced operation; defaults to "tool" when omitted.
        :param str session_id: ID of the underlying user session. Required for tracking sessions.
        :param str ml_app: Name of the ML application the span belongs to; falls back to the
                           value of `DD_LLMOBS_ML_APP` when omitted.

        :returns: The Span object representing the traced operation.
        """
        if cls.enabled is False:
            log.warning(SPAN_START_WHILE_DISABLED_WARNING)
        span_kwargs = {
            "name": name,
            "session_id": session_id,
            "ml_app": ml_app,
            "_decorator": _decorator,
        }
        return cls._instance._start_span("tool", **span_kwargs)

    @classmethod
    def task(
        cls,
        name: Optional[str] = None,
        session_id: Optional[str] = None,
        ml_app: Optional[str] = None,
        _decorator: bool = False,
    ) -> Span:
        """
        Start a span tracing a standalone non-LLM operation that makes no external request.

        :param str name: Name for the traced operation; defaults to "task" when omitted.
        :param str session_id: ID of the underlying user session. Required for tracking sessions.
        :param str ml_app: Name of the ML application the span belongs to; falls back to the
                           value of `DD_LLMOBS_ML_APP` when omitted.

        :returns: The Span object representing the traced operation.
        """
        if cls.enabled is False:
            log.warning(SPAN_START_WHILE_DISABLED_WARNING)
        return cls._instance._start_span(
            "task",
            name,
            session_id=session_id,
            ml_app=ml_app,
            _decorator=_decorator,
        )

    @classmethod
    def agent(
        cls,
        name: Optional[str] = None,
        session_id: Optional[str] = None,
        ml_app: Optional[str] = None,
        _decorator: bool = False,
    ) -> Span:
        """
        Start a span tracing a dynamic workflow where an embedded language model (agent)
        decides which sequence of actions to take.

        :param str name: Name for the traced operation; defaults to "agent" when omitted.
        :param str session_id: ID of the underlying user session. Required for tracking sessions.
        :param str ml_app: Name of the ML application the span belongs to; falls back to the
                           value of `DD_LLMOBS_ML_APP` when omitted.

        :returns: The Span object representing the traced operation.
        """
        if cls.enabled is False:
            log.warning(SPAN_START_WHILE_DISABLED_WARNING)
        return cls._instance._start_span(
            "agent",
            name,
            session_id=session_id,
            ml_app=ml_app,
            _decorator=_decorator,
        )

    @classmethod
    def workflow(
        cls,
        name: Optional[str] = None,
        session_id: Optional[str] = None,
        ml_app: Optional[str] = None,
        _decorator: bool = False,
    ) -> Span:
        """
        Start a span tracing a predefined or static sequence of operations.

        :param str name: Name for the traced operation; defaults to "workflow" when omitted.
        :param str session_id: ID of the underlying user session. Required for tracking sessions.
        :param str ml_app: Name of the ML application the span belongs to; falls back to the
                           value of `DD_LLMOBS_ML_APP` when omitted.

        :returns: The Span object representing the traced operation.
        """
        if cls.enabled is False:
            log.warning(SPAN_START_WHILE_DISABLED_WARNING)
        span_kwargs = {
            "name": name,
            "session_id": session_id,
            "ml_app": ml_app,
            "_decorator": _decorator,
        }
        return cls._instance._start_span("workflow", **span_kwargs)

    @classmethod
    def embedding(
        cls,
        model_name: Optional[str] = None,
        name: Optional[str] = None,
        model_provider: Optional[str] = None,
        session_id: Optional[str] = None,
        ml_app: Optional[str] = None,
        _decorator: bool = False,
    ) -> Span:
        """
        Start a span tracing a call to an embedding model or function that creates an embedding.

        :param str model_name: Name of the invoked embedding model; defaults to "custom"
                               when omitted.
        :param str name: Name for the traced operation; defaults to "embedding" when omitted.
        :param str model_provider: Name of the invoked LLM provider (ex: openai, bedrock);
                                   defaults to "custom" when omitted.
        :param str session_id: ID of the underlying user session. Required for tracking sessions.
        :param str ml_app: Name of the ML application the span belongs to; falls back to the
                           value of `DD_LLMOBS_ML_APP` when omitted.

        :returns: The Span object representing the traced operation.
        """
        if cls.enabled is False:
            log.warning(SPAN_START_WHILE_DISABLED_WARNING)
        # Only substitute the defaults for a true None; an explicit empty string is kept as-is.
        resolved_model = "custom" if model_name is None else model_name
        resolved_provider = "custom" if model_provider is None else model_provider
        return cls._instance._start_span(
            "embedding",
            name,
            model_name=resolved_model,
            model_provider=resolved_provider,
            session_id=session_id,
            ml_app=ml_app,
            _decorator=_decorator,
        )

    @classmethod
    def retrieval(
        cls,
        name: Optional[str] = None,
        session_id: Optional[str] = None,
        ml_app: Optional[str] = None,
        _decorator: bool = False,
    ) -> Span:
        """
        Trace a vector search operation involving a list of documents being returned from an external knowledge base.

        :param str name: The name of the traced operation. If not provided, a default value of "retrieval" will be set.
        :param str session_id: The ID of the underlying user session. Required for tracking sessions.
        :param str ml_app: The name of the ML application that the agent is orchestrating. If not provided, the default
                           value will be set to the value of `DD_LLMOBS_ML_APP`.

        :returns: The Span object representing the traced operation.
        """
        if cls.enabled is False:
            log.warning(SPAN_START_WHILE_DISABLED_WARNING)
        return cls._instance._start_span(
            "retrieval",
            name=name,
            session_id=session_id,
            ml_app=ml_app,
            _decorator=_decorator,
        )

    @classmethod
    def _experiment(
        cls,
        name: Optional[str] = None,
        session_id: Optional[str] = None,
        ml_app: Optional[str] = None,
        experiment_id: Optional[str] = None,
        run_id: Optional[str] = None,
        run_iteration: Optional[int] = None,
        dataset_name: Optional[str] = None,
        project_name: Optional[str] = None,
        project_id: Optional[str] = None,
        experiment_name: Optional[str] = None,
    ) -> Span:
        """
        Trace an LLM experiment, only used internally by the experiments SDK.

        :param str name: The name of the traced operation. If not provided, a default value of "experiment"
                         will be set.
        :param str session_id: The ID of the underlying user session. Required for tracking sessions.
        :param str ml_app: The name of the ML application that the agent is orchestrating. If not provided, the default
                           value will be set to the value of `DD_LLMOBS_ML_APP`.
        :param str experiment_id: The ID of the experiment to associate with this span and its children.
        :param str run_id: The ID of the experiment run to associate with this span and its children.
        :param int run_iteration: The iteration number of the experiment run to associate with this span.
        :param str dataset_name: The name of the dataset used by the experiment.
        :param str project_name: The name of the project the experiment belongs to.
        :param str project_id: The ID of the project the experiment belongs to.
        :param str experiment_name: The name of the experiment.
        :returns: The Span object representing the traced operation.
        """
        if cls.enabled is False:
            log.warning(SPAN_START_WHILE_DISABLED_WARNING)
        span = cls._instance._start_span("experiment", name=name, session_id=session_id, ml_app=ml_app)

        # Propagate experiment metadata through baggage so that child spans inherit it.
        if experiment_id:
            span.context.set_baggage_item(EXPERIMENT_ID_KEY, experiment_id)

        if run_id:
            span.context.set_baggage_item(EXPERIMENT_RUN_ID_KEY, run_id)

        # run_iteration may legitimately be 0, so compare against None rather than truthiness.
        if run_iteration is not None:
            span.context.set_baggage_item(EXPERIMENT_RUN_ITERATION_KEY, run_iteration)

        if dataset_name:
            span.context.set_baggage_item(EXPERIMENT_DATASET_NAME_KEY, dataset_name)

        if project_id:
            span.context.set_baggage_item(EXPERIMENT_PROJECT_ID_KEY, project_id)

        if project_name:
            span.context.set_baggage_item(EXPERIMENT_PROJECT_NAME_KEY, project_name)

        if experiment_name:
            span.context.set_baggage_item(EXPERIMENT_NAME_KEY, experiment_name)

        return span

    @classmethod
    def annotate(
        cls,
        span: Optional[Span] = None,
        prompt: Optional[dict] = None,
        input_data: Optional[Any] = None,
        output_data: Optional[Any] = None,
        metadata: Optional[dict[str, Any]] = None,
        metrics: Optional[dict[str, Any]] = None,
        tags: Optional[dict[str, Any]] = None,
        tool_definitions: Optional[list[dict[str, Any]]] = None,
        _name: Optional[str] = None,
        _linked_spans: Optional[list[ExportedLLMObsSpan]] = None,
        _suppress_span_kind_error: bool = False,
    ) -> None:
        """
        Sets metadata, inputs, outputs, tags, and metrics as provided for a given LLMObs span.
        Note that with the exception of tags, this method will override any existing values for the provided fields.

        :param Span span: Span to annotate. If no span is provided, the current active span will be used.
                          Must be an LLMObs-type span, i.e. generated by the LLMObs SDK.
        :param prompt: A dictionary that represents the prompt used for an LLM call in the following form:
                        `{
                            "id": "...",
                            "template": "...",
                            "chat_template": [{"content": "...", "role": "..."}, ...])
                            "version": "...",
                            "variables": {"variable_1": "...", ...},
                            tags": {"tag_1": "...", ...},
                        }`.
                        Can also be set using the `ddtrace.llmobs.utils.Prompt` constructor class.
                        - This argument is only applicable to LLM spans.
                        - The dictionary may contain two optional keys relevant to RAG applications:
                            `rag_context_variables` - a list of variable key names that contain ground
                                                        truth context information
                            `rag_query_variables` - a list of variable key names that contains query
                                                        information for an LLM call
        :param input_data: A single input string, dictionary, or a list of dictionaries based on the span kind:
                           - llm spans: accepts a string, or a dictionary of form {"content": "...", "role": "...",
                                        "tool_calls": ..., "tool_results": ...}, where "tool_calls" are an optional
                                        list of tool call dictionaries with required keys: "name", "arguments", and
                                        optional keys: "tool_id", "type", and "tool_results" are an optional list of
                                        tool result dictionaries with required key: "result", and optional keys:
                                        "name", "tool_id", "type" for function calling scenarios.
                           - embedding spans: accepts a string, list of strings, or a dictionary of form
                                              {"text": "...", ...} or a list of dictionaries with the same signature.
                           - other: any JSON serializable type.
        :param output_data: A single output string, dictionary, or a list of dictionaries based on the span kind:
                           - llm spans: accepts a string, or a dictionary of form {"content": "...", "role": "...",
                                        "tool_calls": ...}, where "tool_calls" are an optional list of tool call
                                        dictionaries with required keys: "name", "arguments", and optional keys:
                                        "tool_id", "type" for function calling scenarios.
                           - retrieval spans: a dictionary containing any of the key value pairs
                                              {"name": str, "id": str, "text": str, "score": float},
                                              or a list of dictionaries with the same signature.
                           - other: any JSON serializable type.
        :param metadata: Dictionary of JSON serializable key-value metadata pairs relevant to the input/output operation
                         described by the LLMObs span.
        :param tags: Dictionary of JSON serializable key-value tag pairs to set or update on the LLMObs span
                     regarding the span's context.
        :param tool_definitions: list of tool definition dictionaries for tool calling scenarios.
                            - This argument is only applicable to LLM spans.
                            - Each tool definition is a dictionary containing a required "name" (string),
                                   and optional "description" (string) and "schema" (JSON serializable dictionary) keys.
        :param metrics: Dictionary of JSON serializable key-value metric pairs,
                        such as `{prompt,completion,total}_tokens`.
        """
        # `error` is a telemetry code describing the first validation failure (None on success);
        # it is always recorded in the `finally` block, even when an exception propagates.
        error = None
        try:
            if span is None:
                span = cls._instance._current_span()
                if span is None:
                    error = "invalid_span_no_active_spans"
                    raise LLMObsExportSpanError(
                        "No span provided and no active LLMObs-generated span found. "
                        "Ensure you pass the span explicitly using LLMObs.annotate(span=<your_span>, ...) "
                        "when annotating from a different thread or async task than where the span was created."
                    )
            if span.span_type != SpanTypes.LLM:
                error = "invalid_span_type"
                raise LLMObsExportSpanError("Span must be an LLMObs-generated span.")
            if span.finished:
                error = "invalid_finished_span"
                raise LLMObsAnnotateSpanError("Cannot annotate a finished span.")
            if metadata is not None:
                if not isinstance(metadata, dict):
                    error = "invalid_metadata"
                    raise LLMObsAnnotateSpanError("metadata must be a dictionary")
                else:
                    cls._set_dict_attribute(span, METADATA, metadata)
            if metrics is not None:
                if not isinstance(metrics, dict) or not all(isinstance(v, (int, float)) for v in metrics.values()):
                    error = "invalid_metrics"
                    raise LLMObsAnnotateSpanError("metrics must be a dictionary of string key - numeric value pairs.")
                else:
                    cls._set_dict_attribute(span, METRICS, metrics)
            if tags is not None:
                if not isinstance(tags, dict):
                    error = "invalid_tags"
                    raise LLMObsAnnotateSpanError(
                        "span tags must be a dictionary of string key - primitive value pairs."
                    )
                else:
                    # A "session_id" tag also updates the span's session ID context item.
                    session_id = tags.get("session_id")
                    if session_id:
                        span._set_ctx_item(SESSION_ID, str(session_id))
                    cls._set_dict_attribute(span, TAGS, tags)
            if tool_definitions is not None:
                validated_tool_definitions = extract_tool_definitions(tool_definitions)
                if validated_tool_definitions:
                    span._set_ctx_item(TOOL_DEFINITIONS, validated_tool_definitions)
            span_kind = span._get_ctx_item(SPAN_KIND)
            if _name is not None:
                span.name = _name
            if prompt is not None:
                try:
                    validated_prompt = _validate_prompt(prompt, strict_validation=False)
                    cls._set_dict_attribute(span, INPUT_PROMPT, validated_prompt)
                    cls._set_dict_attribute(
                        span,
                        TAGS,
                        {PROMPT_TRACKING_INSTRUMENTATION_METHOD: INSTRUMENTATION_METHOD_ANNOTATED},
                    )
                except (ValueError, TypeError) as e:
                    error = "invalid_prompt"
                    # Build a single message string; passing two args to the exception
                    # constructor would render str(exc) as a tuple.
                    raise LLMObsAnnotateSpanError("Failed to validate prompt with error: {}".format(e))
            if (
                not span_kind and not _suppress_span_kind_error
            ):  # TODO(sabrenner): we should figure out how to remove this check for annotation contexts
                raise LLMObsAnnotateSpanError("Span kind not specified, skipping annotation for input/output data")

            # Input/output tagging is dispatched on the span kind; the _tag_* helpers
            # return an (error message, telemetry error code) pair, both None on success.
            annotation_error_message = None
            if input_data is not None or output_data is not None:
                if span_kind == "llm":
                    annotation_error_message, error = cls._tag_llm_io(
                        span, input_messages=input_data, output_messages=output_data
                    )
                elif span_kind == "embedding":
                    annotation_error_message, error = cls._tag_embedding_io(
                        span, input_documents=input_data, output_text=output_data
                    )
                elif span_kind == "retrieval":
                    annotation_error_message, error = cls._tag_retrieval_io(
                        span, input_text=input_data, output_documents=output_data
                    )
                elif span_kind == "experiment":
                    cls._tag_freeform_io(span, input_value=input_data, output_value=output_data)
                else:
                    cls._tag_text_io(span, input_value=input_data, output_value=output_data)
            if _linked_spans and isinstance(_linked_spans, list):
                for linked_span in _linked_spans:
                    # for now, assume all span links are output to input as we do not currently use this for anything
                    add_span_link(
                        span,
                        linked_span.get("span_id", ""),
                        linked_span.get("trace_id", ""),
                        "output",
                        "input",
                    )
            if annotation_error_message:
                raise LLMObsAnnotateSpanError(annotation_error_message)
        finally:
            telemetry.record_llmobs_annotate(span, error)

    @classmethod
    def _tag_llm_io(cls, span, input_messages=None, output_messages=None) -> tuple[Optional[str], Optional[str]]:
        """Tag input/output messages on an LLM-kind span.

        Maps to the span's `meta.{input,output}.messages` fields. Returns a
        (user-facing error message, telemetry error code) pair; both are None on success.
        A parse failure on the input messages returns immediately without touching the output.
        """
        for raw, ctx_key, err_msg in (
            (input_messages, INPUT_MESSAGES, "Failed to parse input messages."),
            (output_messages, OUTPUT_MESSAGES, "Failed to parse output messages."),
        ):
            if raw is None:
                continue
            try:
                parsed = raw if isinstance(raw, Messages) else Messages(raw)
                if parsed.messages:
                    span._set_ctx_item(ctx_key, parsed.messages)
            except TypeError:
                return err_msg, "invalid_io_messages"
        return None, None

    @classmethod
    def _tag_embedding_io(cls, span, input_documents=None, output_text=None) -> tuple[Optional[str], Optional[str]]:
        """Tag input documents and output text on an embedding-kind span.

        Maps to the span's `meta.{input,output}.text` fields. Returns a
        (user-facing error message, telemetry error code) pair; both are None on success.
        """
        if input_documents is not None:
            try:
                docs = input_documents if isinstance(input_documents, Documents) else Documents(input_documents)
                if docs.documents:
                    span._set_ctx_item(INPUT_DOCUMENTS, docs.documents)
            except TypeError:
                return "Failed to parse input documents.", "invalid_embedding_io"
        if output_text is not None:
            # Output is stored as plain text regardless of its original type.
            span._set_ctx_item(OUTPUT_VALUE, str(output_text))
        return None, None

    @classmethod
    def _tag_retrieval_io(cls, span, input_text=None, output_documents=None) -> tuple[Optional[str], Optional[str]]:
        """Tag input text and output documents on a retrieval-kind span.

        Maps to the span's `meta.{input,output}.text` fields. Returns a
        (user-facing error message, telemetry error code) pair; both are None on success.
        """
        if input_text is not None:
            span._set_ctx_item(INPUT_VALUE, safe_json(input_text))
        if output_documents is not None:
            try:
                docs = output_documents if isinstance(output_documents, Documents) else Documents(output_documents)
                if docs.documents:
                    span._set_ctx_item(OUTPUT_DOCUMENTS, docs.documents)
            except TypeError:
                return "Failed to parse output documents.", "invalid_retrieval_io"
        return None, None

    @classmethod
    def _tag_text_io(cls, span, input_value=None, output_value=None):
        """Tag input/output values on non-LLM kind spans.

        Maps to the span's `meta.{input,output}.values` fields; values are serialized
        with safe_json before being stored.
        """
        for value, ctx_key in ((input_value, INPUT_VALUE), (output_value, OUTPUT_VALUE)):
            if value is not None:
                span._set_ctx_item(ctx_key, safe_json(value))

    @classmethod
    def _tag_freeform_io(cls, span, input_value=None, output_value=None):
        """Tags input/output values for experiment spans.

        Will be mapped to span's `meta.{input,output}` fields.
        This is meant to be non-restrictive on the user's data: experiment spans allow
        arbitrary structured or unstructured I/O values, so values are stored as-is
        without serialization.
        """
        if input_value is not None:
            span._set_ctx_item(EXPERIMENTS_INPUT, input_value)
        if output_value is not None:
            span._set_ctx_item(EXPERIMENTS_OUTPUT, output_value)

    @staticmethod
    def _set_dict_attribute(span: Span, key, value: dict[str, Any]) -> None:
        """Merge *value* into the dict stored under *key* on the given LLMObs span.

        When nothing is stored yet, the new dict is set as-is; otherwise the new
        entries are merged into (and override matching keys of) the existing dict.
        """
        merged = span._get_ctx_item(key) or {}
        merged.update(value)
        span._set_ctx_item(key, merged)

    @classmethod
    def submit_evaluation(
        cls,
        label: str,
        metric_type: str,
        value: Union[str, int, float, bool],
        span: Optional[dict] = None,
        span_with_tag_value: Optional[dict[str, str]] = None,
        tags: Optional[dict[str, str]] = None,
        ml_app: Optional[str] = None,
        timestamp_ms: Optional[int] = None,
        metadata: Optional[dict[str, object]] = None,
        assessment: Optional[str] = None,
        reasoning: Optional[str] = None,
    ) -> None:
        """
        Submits a custom evaluation metric for a given span.

        :param str label: The name of the evaluation metric.
        :param str metric_type: The type of the evaluation metric. One of "categorical", "score", "boolean".
        :param value: The value of the evaluation metric.
                      Must be a string (categorical), integer (score), float (score), or boolean (boolean).
        :param dict span: A dictionary of shape {'span_id': str, 'trace_id': str} uniquely identifying
                            the span associated with this evaluation.
        :param dict span_with_tag_value: A dictionary with the format {'tag_key': str, 'tag_value': str}
                            uniquely identifying the span associated with this evaluation.
        :param tags: A dictionary of string key-value pairs to tag the evaluation metric with.
        :param str ml_app: The name of the ML application
        :param int timestamp_ms: The unix timestamp in milliseconds when the evaluation metric result was generated.
                                    If not set, the current time will be used.
        :param dict metadata: A JSON serializable dictionary of key-value metadata pairs relevant to the
                                evaluation metric.
        :param str assessment: An assessment of this evaluation. Must be either "pass" or "fail".
        :param str reasoning: An explanation of the evaluation result.
        """
        if cls.enabled is False:
            log.debug(
                "LLMObs.submit_evaluation() called when LLMObs is not enabled. ",
                "Evaluation metric data will not be sent.",
            )
            return

        error = None
        join_on = {}
        try:
            has_exactly_one_joining_key = (span is not None) ^ (span_with_tag_value is not None)

            if not has_exactly_one_joining_key:
                error = "provided_both_span_and_tag_joining_key"
                raise ValueError(
                    "Exactly one of `span` or `span_with_tag_value` must be specified to submit an evaluation metric."
                )

            if span is not None:
                if (
                    not isinstance(span, dict)
                    or not isinstance(span.get("span_id"), str)
                    or not isinstance(span.get("trace_id"), str)
                ):
                    error = "invalid_span"
                    raise TypeError(
                        "`span` must be a dictionary containing both span_id and trace_id keys. "
                        "LLMObs.export_span() can be used to generate this dictionary from a given span."
                    )
                join_on["span"] = span
            elif span_with_tag_value is not None:
                if (
                    not isinstance(span_with_tag_value, dict)
                    or not isinstance(span_with_tag_value.get("tag_key"), str)
                    or not isinstance(span_with_tag_value.get("tag_value"), str)
                ):
                    error = "invalid_joining_key"
                    raise TypeError(
                        "`span_with_tag_value` must be a dict with keys 'tag_key' and 'tag_value' "
                        "containing string values"
                    )
                join_on["tag"] = {
                    "key": span_with_tag_value.get("tag_key"),
                    "value": span_with_tag_value.get("tag_value"),
                }

            timestamp_ms = timestamp_ms if timestamp_ms else int(time.time() * 1000)

            if not isinstance(timestamp_ms, int) or timestamp_ms < 0:
                error = "invalid_timestamp"
                raise ValueError("timestamp_ms must be a non-negative integer. Evaluation metric data will not be sent")

            if not label:
                error = "invalid_metric_label"
                raise ValueError("label must be the specified name of the evaluation metric.")

            if "." in label:
                error = "invalid_label_value"
                raise ValueError("label value must not contain a '.'.")

            metric_type = metric_type.lower()
            if metric_type not in ("categorical", "score", "boolean", "json"):
                error = "invalid_metric_type"
                raise ValueError("metric_type must be one of 'categorical', 'score', 'boolean', or 'json'.")

            if metric_type == "categorical" and not isinstance(value, str):
                error = "invalid_metric_value"
                raise TypeError("value must be a string for a categorical metric.")
            if metric_type == "score" and not isinstance(value, (int, float)):
                error = "invalid_metric_value"
                raise TypeError("value must be an integer or float for a score metric.")
            if metric_type == "boolean" and not isinstance(value, bool):
                error = "invalid_metric_value"
                raise TypeError("value must be a boolean for a boolean metric.")
            if metric_type == "json" and not isinstance(value, dict):
                error = "invalid_metric_value"
                raise TypeError("value must be a dict for a json metric.")

            if tags is not None and not isinstance(tags, dict):
                raise LLMObsSubmitEvaluationError("tags must be a dictionary of string key-value pairs.")

            ml_app = ml_app if ml_app else config._llmobs_ml_app
            if not ml_app:
                error = "missing_ml_app"
                raise LLMObsSubmitEvaluationError(
                    "ML App name is required for sending evaluation metrics. Evaluation metric data will not be sent. "
                    "Ensure this configuration is set before running your application."
                )

            evaluation_tags = {
                "ddtrace.version": __version__,
                "ml_app": ml_app,
            }

            if tags:
                for k, v in tags.items():
                    try:
                        evaluation_tags[ensure_text(k)] = ensure_text(v)
                    except TypeError:
                        error = "invalid_tags"
                        raise LLMObsSubmitEvaluationError(
                            "Failed to parse tags. Tags for evaluation metrics must be strings."
                        )

            # Auto-add source:otel tag when OTel tracing is enabled
            # This allows the backend to wait for OTel span conversion
            if config._otel_trace_enabled:
                evaluation_tags["source"] = "otel"

            evaluation_metric: LLMObsEvaluationMetricEvent = {
                "join_on": join_on,
                "label": str(label),
                "metric_type": metric_type,
                "timestamp_ms": timestamp_ms,
                "{}_value".format(metric_type): value,  # type: ignore
                "ml_app": ml_app,
                "tags": ["{}:{}".format(k, v) for k, v in evaluation_tags.items()],
            }

            if assessment:
                if not isinstance(assessment, str) or assessment not in (
                    "pass",
                    "fail",
                ):
                    error = "invalid_assessment"
                    raise LLMObsSubmitEvaluationError(
                        "Failed to parse assessment. assessment must be either 'pass' or 'fail'."
                    )
                else:
                    evaluation_metric["assessment"] = assessment
            if reasoning:
                if not isinstance(reasoning, str):
                    error = "invalid_reasoning"
                    raise LLMObsSubmitEvaluationError("Failed to parse reasoning. reasoning must be a string.")
                else:
                    evaluation_metric["reasoning"] = reasoning

            if metadata:
                if not isinstance(metadata, dict):
                    error = "invalid_metadata"
                    raise LLMObsSubmitEvaluationError("metadata must be json serializable dictionary.")
                else:
                    metadata = safe_json(metadata)
                    if metadata and isinstance(metadata, str):
                        evaluation_metric["metadata"] = json.loads(metadata)

            cls._instance._llmobs_eval_metric_writer.enqueue(evaluation_metric)
        finally:
            telemetry.record_llmobs_submit_evaluation(join_on, metric_type, error)

    @classmethod
    def _inject_llmobs_context(cls, span_context: Context, request_headers: dict[str, str]) -> None:
        """Copy the active LLMObs parent ID, trace ID, and ml_app onto ``span_context``'s
        propagated metadata so downstream services can continue the LLMObs trace.
        No-op when LLMObs is disabled.
        """
        if cls.enabled is False:
            return

        active = cls._instance._llmobs_context_provider.active()
        if isinstance(active, Span):
            ctx = active.context
        else:
            ctx = active

        if ctx is not None:
            parent_id = str(ctx.span_id)
        else:
            parent_id = ROOT_PARENT_ID

        if isinstance(active, Span):
            # A live span carries its ml_app and LLMObs trace ID as context items.
            ml_app = active._get_ctx_item(ML_APP)
            llmobs_trace_id = active._get_ctx_item(LLMOBS_TRACE_ID)
        elif ctx is not None:
            # A bare context only has propagated metadata; fall back to the configured
            # ml_app / a freshly generated trace ID when the keys are absent.
            ml_app = ctx._meta.get(PROPAGATED_ML_APP_KEY) or config._llmobs_ml_app
            llmobs_trace_id = ctx._meta.get(PROPAGATED_LLMOBS_TRACE_ID_KEY) or generate_128bit_trace_id()
        else:
            # Nothing active at all: start a new LLMObs trace from configuration.
            ml_app = config._llmobs_ml_app
            llmobs_trace_id = generate_128bit_trace_id()

        span_context._meta[PROPAGATED_PARENT_ID_KEY] = parent_id
        span_context._meta[PROPAGATED_LLMOBS_TRACE_ID_KEY] = str(llmobs_trace_id)

        if ml_app is not None:
            span_context._meta[PROPAGATED_ML_APP_KEY] = ml_app

    @classmethod
    def inject_distributed_headers(cls, request_headers: dict[str, str], span: Optional[Span] = None) -> dict[str, str]:
        """Injects the span's distributed context into the given request headers.

        :param request_headers: Dictionary of request headers, updated in place with propagation headers.
        :param span: Span whose context is injected; defaults to the currently active span.
        :returns: The ``request_headers`` dictionary (also mutated in place).
        :raises LLMObsInjectDistributedHeadersError: If ``request_headers`` is not a dict, no span is
            provided or active, or the provided span is not a valid ``Span`` object.
        """
        if cls.enabled is False:
            log.warning(
                "LLMObs.inject_distributed_headers() called when LLMObs is not enabled. "
                "Distributed context will not be injected."
            )
            return request_headers
        error = None
        try:
            if not isinstance(request_headers, dict):
                error = "invalid_request_headers"
                raise LLMObsInjectDistributedHeadersError(
                    "request_headers must be a dictionary of string key-value pairs."
                )
            if span is None:
                span = cls._instance.tracer.current_span()
            if span is None:
                error = "no_active_span"
                raise LLMObsInjectDistributedHeadersError("No span provided and no currently active span found.")
            if not isinstance(span, Span):
                # Tag the failure reason for telemetry, consistent with the other error paths above
                # (previously this branch raised without recording an error).
                error = "invalid_span"
                raise LLMObsInjectDistributedHeadersError(
                    "span must be a valid Span object. Distributed context will not be injected."
                )
            HTTPPropagator.inject(span.context, request_headers)
            return request_headers
        finally:
            # Telemetry is emitted on both the success (error=None) and failure paths.
            telemetry.record_inject_distributed_headers(error)

    @classmethod
    def _activate_llmobs_distributed_context_soft_fail(cls, request_headers: dict[str, str], context: Context) -> None:
        """Best-effort variant of ``_activate_llmobs_distributed_context``: with ``_soft_fail=True``,
        missing trace/span IDs are logged as a warning instead of raising.
        """
        cls._activate_llmobs_distributed_context(request_headers, context, _soft_fail=True)

    @classmethod
    def _activate_llmobs_distributed_context(
        cls, request_headers: dict[str, str], context: Context, _soft_fail: bool = False
    ) -> None:
        """Activate an LLMObs context reconstructed from extracted distributed-tracing headers.

        :param request_headers: Raw request headers (kept for interface parity with callers).
        :param context: APM :class:`Context` extracted from the headers.
        :param _soft_fail: When True, log a warning instead of raising on missing trace/span IDs.
        :raises LLMObsActivateDistributedHeadersError: If trace/span IDs are missing and
            ``_soft_fail`` is False.
        """
        error = None
        try:
            if cls.enabled is False:
                return
            if not context.trace_id or not context.span_id:
                error = "missing_context"
                if _soft_fail:
                    log.warning("Failed to extract trace/span ID from request headers.")
                    return
                raise LLMObsActivateDistributedHeadersError("Failed to extract trace/span ID from request headers.")
            _parent_id = context._meta.get(PROPAGATED_PARENT_ID_KEY)
            if _parent_id is None:
                error = "missing_parent_id"
                log.debug("Failed to extract LLMObs parent ID from request headers.")
                return
            try:
                parent_id = int(_parent_id)
            except ValueError:
                error = "invalid_parent_id"
                log.warning("Failed to parse LLMObs parent ID from request headers.")
                return
            parent_llmobs_trace_id = context._meta.get(PROPAGATED_LLMOBS_TRACE_ID_KEY)
            if parent_llmobs_trace_id is None:
                # No LLMObs trace ID was propagated: fall back to the APM trace ID, record the
                # condition for telemetry, but still activate a context below (this path was
                # previously duplicated verbatim; behavior is unchanged).
                error = "missing_parent_llmobs_trace_id"
                log.debug(
                    "Failed to extract LLMObs trace ID from request headers. Expected string, got None. "
                    "Defaulting to the corresponding APM trace ID."
                )
                parent_llmobs_trace_id = context.trace_id
            llmobs_context = Context(trace_id=context.trace_id, span_id=parent_id)
            llmobs_context._meta[PROPAGATED_LLMOBS_TRACE_ID_KEY] = str(parent_llmobs_trace_id)
            cls._instance._llmobs_context_provider.activate(llmobs_context)
        finally:
            # Telemetry is emitted on every exit path, including the fallback above.
            telemetry.record_activate_distributed_headers(error)

    @classmethod
    def activate_distributed_headers(cls, request_headers: dict[str, str]) -> None:
        """
        Activates distributed tracing headers for the current request.

        :param request_headers: A dictionary containing the headers for the current request.
        """
        if cls.enabled is False:
            log.warning(
                "LLMObs.activate_distributed_headers() called when LLMObs is not enabled. "
                "Distributed context will not be activated."
            )
            return
        # Extract the propagated APM context once, then activate it for both the APM
        # tracer and the LLMObs context provider (hard-fail mode).
        extracted = HTTPPropagator.extract(request_headers)
        instance = cls._instance
        instance.tracer.context_provider.activate(extracted)
        instance._activate_llmobs_distributed_context(request_headers, extracted, _soft_fail=False)


# Initialize the default module-level LLMObs instance; the classmethod API above
# dispatches through `cls._instance`.
LLMObs._instance = LLMObs()
