o
    i&                     @   sf  d dl Z d dlmZ d dlZd dlmZ d dlmZ d dlmZ d dlm	Z	 d dl
mZ eeZdd	 Zd
d Zdd ZG dd dZG dd deeZG dd deeZdd Zdd Zdeeeef ef fddZdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Z d'edefd(d)Z!d'edefd*d+Z"d'edefd,d-Z#d'edefd.d/Z$d'edefd0d1Z%dS )2    N)Any)
get_logger)AsyncStreamHandler)StreamHandler)make_traced_stream	_get_attrc                 c   s0    | D ]}|j dkr|jj dkr|jjV  qd S Ncontent_block_delta
text_deltatypedeltatexttraced_streamchunk r   a/home/ubuntu/.local/lib/python3.10/site-packages/ddtrace/contrib/internal/anthropic/_streaming.py_text_stream_generator   s   
r   c                 C  s:   | 2 z3 d H W }|j dkr|jj dkr|jjV  q6 d S r	   r   r   r   r   r   _async_text_stream_generator   s   
r   c                 C   sl   dd }dd }t |st|rt|t| ||||d}|S t|s&t|r4t|t| ||||d}|S dS )a  
    Creates a traced stream with a callback that adds a text_stream attribute
    to the underlying stream object when it is created.

    Overrides the `text_stream` attribute to trace yielded chunks; otherwise,
    the underlying stream will bypass the wrapper tracing code
    c                 S      t | | _d S N)r   text_streamstreamr   r   r   add_text_stream%      z1handle_streamed_response.<locals>.add_text_streamc                 S   r   r   )r   r   r   r   r   r   add_async_text_stream(   r   z7handle_streamed_response.<locals>.add_async_text_stream)on_stream_createdN)
_is_stream_is_stream_managerr   AnthropicStreamHandler_is_async_stream_is_async_stream_managerAnthropicAsyncStreamHandler)integrationrespargskwargsspanr   r   r   r   r   r   handle_streamed_response   s   	r+   c                   @      e Zd ZdddZdS )BaseAnthropicStreamHandlerNc                 C   s(   t | j| j| j| j| j | j  d S r   )_process_finished_streamr&   primary_spanrequest_argsrequest_kwargschunksfinish)self	exceptionr   r   r   finalize_stream:   s   z*BaseAnthropicStreamHandler.finalize_streamr   )__name__
__module____qualname__r6   r   r   r   r   r-   9       r-   c                   @   r,   )r"   Nc                 C   s   | j | d S r   r2   appendr4   r   iteratorr   r   r   process_chunkB   s   z$AnthropicStreamHandler.process_chunkr   r7   r8   r9   r?   r   r   r   r   r"   A   r:   r"   c                   @   r,   )r%   Nc                    s   | j | d S r   r;   r=   r   r   r   r?   G   s   z)AnthropicAsyncStreamHandler.process_chunkr   r@   r   r   r   r   r%   F   r:   r%   c                 C   sD   zt |}| j|g ||d W d S  ty!   tjddd Y d S w )N)r(   r)   responsez3Error processing streamed completion/chat response.T)exc_info)_construct_messagellmobs_set_tags	Exceptionlogwarning)r&   r*   r(   r)   streamed_chunksresp_messager   r   r   r.   K   s   r.   c                 C   s    dg i}| D ]}t ||}q|S )zIteratively build up a response message from streamed chunks.

    The resulting message dictionary is of form:
      {"content": [{"type": [TYPE], "text": "[TEXT]"}], "role": "...", "finish_reason": "...", "usage": ...}
    content)_extract_from_chunk)rH   messager   r   r   r   rC   T   s   rC   returnc                 C   s>   t tttttd}t| dd}||}|dur|| |}|S )zJConstructs a chat message dictionary from streamed chunks given chunk type)message_startcontent_block_startr
   content_block_stopmessage_deltaerrorr    N)_on_message_start_chunk_on_content_block_start_chunk_on_content_block_delta_chunk_on_content_block_stop_chunk_on_message_delta_chunk_on_error_chunkr   get)r   rL   TRANSFORMATIONS_BY_BLOCK_TYPE
chunk_typetransformationr   r   r   rK   `   s   

rK   c                 C   s   t | dd}|rFt |dd}t |dd}|r||d< |rFdt |ddi|d< t |dd }t |dd }|d ur<||d d< |d urF||d d< |S )	NrL   rS   roleusageinput_tokensr   cache_creation_input_tokenscache_read_input_tokensr   )r   rL   chunk_message
chunk_rolechunk_usagecache_write_tokenscache_read_tokensr   r   r   rT   r   s   rT   c                 C   sv   t | dd}|r9t |dd}|dkr$t |dd}|d d|d |S |dkr9t |dd}|d d|dd	 |S )
Ncontent_blockrS   r   r   rJ   )r   r   tool_usename)r   rj   input)r   r<   )r   rL   chunk_content_blockchunk_content_block_typechunk_content_block_textchunk_content_block_namer   r   r   rU      s   rU   c                 C   s   | ds|S t| dd}|rOt|dd}|r#|d d d  |7  < t|dd}|rOt|dddkrOd	|d d vrCd|d d d	< |d d d	  |7  < |S )
zAppend new content from delta events to current message.content block
    Note: Anthropic beta streaming can emit content_block_delta without a corresponding
    content_block_start. Guard to avoid IndexError which breaks span construction.
    rJ   r   rS   r   partial_jsonr   input_json_deltark   )rZ   r   )r   rL   delta_blockchunk_content_textchunk_content_jsonr   r   r   rV      s   
rV   c                 C   sX   | ds|S t|d d dd}|dkr*t|d d dd}t||d d d< |S )zFinalize the current content block, parsing tool_use input JSON into a dict.
    Anthropic beta streaming can emit content_block_stop without a corresponding
    content_block_start. Guard to avoid IndexError which breaks span construction.
    rJ   rp   r   rS   ri   rk   z{})rZ   r   jsonloads)r   rL   content_type
input_jsonr   r   r   rW      s   
rW   c                 C   s   t | dd}t |dd}|r||d< t | di }|rK|dddd}t |dd|d< t |d	d }t |d
d }|d ur?||d	< |d urG||d
< ||d< |S )Nr   rS   stop_reasonfinish_reasonr_   r   )output_tokensr`   r|   ra   rb   )r   rZ   )r   rL   rs   chunk_finish_reasonre   message_usagecache_creation_tokensrg   r   r   r   rX      s    rX   c                 C   sN   t | dr%i |d< t | jdr| jj|d d< t | jdr%| jj|d d< |S )NrR   r   rL   )r   rR   r   rL   )r   rL   r   r   r   rY      s   
rY   r'   c                 C   .   dD ]}t t|rt| tt|r dS qdS )N)StreamBetaMessageStreamTFhasattr	anthropic
isinstancegetattrr'   attrr   r   r   r       
   r    c                 C   r   )N)AsyncStreamBetaAsyncMessageStreamTFr   r   r   r   r   r#      r   r#   c                 C   r   )N)MessageStreamManagerBetaMessageStreamManagerTFr   r   r   r   r   r!      r   r!   c                 C   r   )N)AsyncMessageStreamManagerBetaAsyncMessageStreamManagerTFr   r   r   r   r   r$      r   r$   c                 C   s    t | pt| pt| pt| S r   )r    r#   r!   r$   )r'   r   r   r   is_streaming_operation   s    r   )&rv   typingr   r   ddtrace.internal.loggerr   0ddtrace.llmobs._integrations.base_stream_handlerr   r   r   ddtrace.llmobs._utilsr   r7   rF   r   r   r+   r-   r"   r%   r.   rC   tupledictstrboolrK   rT   rU   rV   rW   rX   rY   r    r#   r!   r$   r   r   r   r   r   <module>   s:    	
