o
    i'                     @   s   d Z ddlmZ ddlmZ ddlZddlmZ ddlZddlm	Z	 e	e
ZG dd deZG d	d
 d
eZG dd deZG dd dejZG dd dejZddeeef fddZdS )z
This file contains shared utilities for tracing streams in LLMobs integrations. Integrations should
implement a StreamHandler and / or AsyncStreamHandler subclass to be passed into the make_traced_stream
factory function along with the stream to wrap.
    )ABC)abstractmethodN)Union)
get_loggerc                   @   s:   e Zd Zdd Zdd Zdd Zdd Zedd
dZd	S )BaseStreamHandlerc                 K   s8   || _ || _|| _|| _|| _||fg| _|  | _d S N)integrationprimary_spanrequest_argsrequest_kwargsoptionsspansinitialize_chunk_storagechunks)selfr   spanargskwargsr    r   d/home/ubuntu/.local/lib/python3.10/site-packages/ddtrace/llmobs/_integrations/base_stream_handler.py__init__   s   zBaseStreamHandler.__init__c                 C   s   g S r   r   r   r   r   r   r      s   z*BaseStreamHandler.initialize_chunk_storagec                 C   s   | j ||f dS )z
        Add a span to the list of spans to be finished when the stream ends. This is useful for integrations that
        need to create multiple spans for a single stream like LiteLLM.
        N)r   append)r   r   r   r   r   r   add_span"   s   zBaseStreamHandler.add_spanc                 C   s   | j r| j jt   dS dS )z
        Handle exceptions that occur during streaming.

        Default implementation sets exception info on the primary span.

        Args:
            exception: The exception that occurred
        N)r	   set_exc_infosysexc_infor   	exceptionr   r   r   handle_exception)   s   	z"BaseStreamHandler.handle_exceptionNc                 C      t d)a*  
        Finalize the stream and complete all spans.

        This method is called when the stream ends (successfully or with error).
        Implementations should:
        1. Process accumulated chunks into final response
        2. Set appropriate span tags
        3. Finish all spans
        z3finalize_stream must be implemented by the subclassNotImplementedErrorr   r   r   r   finalize_stream5   s   z!BaseStreamHandler.finalize_streamr   )	__name__
__module____qualname__r   r   r   r   r   r#   r   r   r   r   r      s    
r   c                   @      e Zd ZdZedddZdS )StreamHandlera  
    Instances of StreamHandler and AsyncStreamHandler contain the logic for initializing chunk storage, processing
    chunks, handling exceptions, and finalizing a stream. The only methods that need to be implemented are
    process_chunk (a callback function that is called for each chunk in the stream) and finalize_stream (a
    callback function that is called when the stream ends). All other methods are optional and can be
    overridden if needed (for example, extra exception processing logic in handle_exception).

    Note that it is possible to pass in extra arguments via the options argument in case you need to access other
    information within the stream handler that is not covered by the existing arguments.
    Nc                 C   r    )a;  
        Process a single chunk from the stream.

        This method is called for each chunk as it's received.
        Implementations should extract and store relevant data.

        Args:
            chunk: The chunk object from the stream
            iterator: The sync iterator object from the stream
        1process_chunk must be implemented by the subclassr!   r   chunkiteratorr   r   r   process_chunkO   s   zStreamHandler.process_chunkr   r$   r%   r&   __doc__r   r-   r   r   r   r   r(   C   s    r(   c                   @   r'   )AsyncStreamHandlerz)
    Async version of StreamHandler.
    Nc                    s
   t d)a<  
        Process a single chunk from the stream.

        This method is called for each chunk as it's received.
        Implementations should extract and store relevant data.

        Args:
            chunk: The chunk object from the stream
            iterator: The async iterator object from the stream
        r)   r!   r*   r   r   r   r-   c   s   z AsyncStreamHandler.process_chunkr   r.   r   r   r   r   r0   ^   s    r0   c                       T   e Zd ZdZddef fddZdd Zdd	 Zd
d Zdd Z	e
dd Z  ZS )TracedStreama  
    The TracedStream and AsyncTracedStream classes are wrappers around the underlying stream object that deal with
    iterating over the stream and calling the stream handler to process chunks, handle exceptions, and finalize the
    stream. Because each library's streamed response is different, these traced stream classes are meant to be generic
    enough to work with iterables, iterators, generators, and context managers.
    Nhandlerc                    $   t  | || _|| _| j| _dS )a  
        Wrap a stream object to trace the stream.

        Args:
            wrapped: The stream object to wrap
            handler: The StreamHandler instance to use for processing chunks
            on_stream_created: In the case that the stream is created by a stream manager, this
                callback function will be called when the underlying stream is created in case
                modifications to the stream object are needed
        N)superr   _self_handler_self_on_stream_created__wrapped___self_stream_iterr   wrappedr3   on_stream_created	__class__r   r   r   z      zTracedStream.__init__c              
   c   sz    d }z2z| j D ]}| j|| j  |V  qW n ty, } z	|}| j|  d }~ww W | j| d S | j| w r   )r9   r6   r-   	Exceptionr   r#   r   excr+   er   r   r   __iter__   s   
zTracedStream.__iter__c              
   C   sl   z| j  }| j|| j  |W S  ty   | j    ty5 } z| j| | j|  d }~ww r   )r9   __next__r6   r-   StopIterationr#   r@   r   r   r+   rC   r   r   r   rE      s   

zTracedStream.__next__c                 C   sB   | j  }|| j u r| S || _t|| j| j}| jr| | |S )a  
        Enter the context of the stream.

        If the stream is wrapped by a stream manager, the stream manager will be entered and the
        underlying stream will be wrapped in a TracedStream object. We retain a reference to the
        underlying stream object to be consumed.

        If the stream is not wrapped by a stream manager, the stream will be returned as is.
        )r8   	__enter__r9   r2   r6   r7   r   resulttraced_streamr   r   r   rH      s   



zTracedStream.__enter__c                 C   s   | j |||S r   )r8   __exit__r   exc_typeexc_valexc_tbr   r   r   rL      s   zTracedStream.__exit__c                 C      | j S r   r6   r   r   r   r   r3         zTracedStream.handlerr   )r$   r%   r&   r/   r(   r   rD   rE   rH   rL   propertyr3   __classcell__r   r   r=   r   r2   r   s    r2   c                       r1   )TracedAsyncStreamz(
    Async version of TracedStream.
    Nr3   c                    r4   )a  
        Wrap an async stream object to trace the stream.

        Args:
            wrapped: The stream object to wrap
            handler: The AsyncStreamHandler instance to use for processing chunks
            on_stream_created: In the case that the stream is created by a stream manager, this
                callback function will be called when the underlying stream is created in case
                modifications to the stream object are needed
        N)r5   r   r6   r7   r8   _self_async_stream_iterr:   r=   r   r   r      r?   zTracedAsyncStream.__init__c              
   C  s   d }z:z| j 2 z3 d H W }| j|| j I d H  |V  q6 W n ty4 } z	|}| j|  d }~ww W | j| d S | j| w r   )rW   r6   r-   r@   r   r#   rA   r   r   r   	__aiter__   s   zTracedAsyncStream.__aiter__c              
      sz   z| j  I d H }| j|| j I d H  |W S  ty$   | j    ty< } z| j| | j|  d }~ww r   )rW   	__anext__r6   r-   StopAsyncIterationr#   r@   r   rG   r   r   r   rY      s   
zTracedAsyncStream.__anext__c                    sJ   | j  I dH }|| j u r| S || _t|| j| j}| jr#| | |S )a  
        Enter the context of the stream.

        If the stream is wrapped by a stream manager, the stream manager will be entered and the
        underlying stream will be wrapped in a TracedAsyncStream object. We retain a reference to the
        underlying stream object to be consumed.

        If the stream is not wrapped by a stream manager, the stream will be returned as is.
        N)r8   
__aenter__rW   rV   r6   r7   rI   r   r   r   r[      s   


zTracedAsyncStream.__aenter__c                    s   | j |||I d H S r   )r8   	__aexit__rM   r   r   r   r\     s   zTracedAsyncStream.__aexit__c                 C   rQ   r   rR   r   r   r   r   r3     rS   zTracedAsyncStream.handlerr   )r$   r%   r&   r/   r0   r   rX   rY   r[   r\   rT   r3   rU   r   r   r=   r   rV      s    rV   r3   c                 C   s"   t |trt| ||S t| ||S )zf
    Create a TracedStream or TracedAsyncStream object from a stream object and a stream handler.
    )
isinstancer0   rV   r2   )r;   r3   r<   r   r   r   make_traced_stream  s   
r^   r   )r/   abcr   r   r   typingr   wraptddtrace.internal.loggerr   r$   logr   r(   r0   ObjectProxyr2   rV   r^   r   r   r   r   <module>   s    /NK