o
    i                     @   s   d dl Z d dlZd dlmZ d dlmZ d dlmZ d dlmZ G dd dZG dd	 d	eeZ	G d
d deeZ
dd Zdd Zdd Zdd ZdS )    N)core)AsyncStreamHandler)StreamHandler)make_traced_streamc                   @   s   e Zd Zdd ZdddZdS )BaseLangchainStreamHandlerc                 C   s.   | j | | jdd }|r|| d S d S )Nchunk_callback)chunksappendoptionsget)selfchunkr    r   \/home/ubuntu/.local/lib/python3.10/site-packages/ddtrace/contrib/internal/langchain/utils.py_process_chunk   s
   z)BaseLangchainStreamHandler._process_chunkNc                 C   s.   | j dd }|r|| j| j | j  d S )Non_span_finish)r
   r   primary_spanr   finish)r   	exceptionr   r   r   r   finalize_stream   s   z*BaseLangchainStreamHandler.finalize_streamN)__name__
__module____qualname__r   r   r   r   r   r   r   
   s    r   c                   @      e Zd ZdddZdS )LangchainStreamHandlerNc                 C   s   |  | d S r   r   r   r   iteratorr   r   r   process_chunk   s   z$LangchainStreamHandler.process_chunkr   r   r   r   r   r   r   r   r   r          r   c                   @   r   )LangchainAsyncStreamHandlerNc                    s   |  | d S r   r   r   r   r   r   r      s   z)LangchainAsyncStreamHandler.process_chunkr   r    r   r   r   r   r"      r!   r"   c              
   K   s   |j  d|jj |d|d}	|	| | jdi |	}
|
dd ||
 z.||i |}t|||}t|rGt	|t
| |
||||dW S t	|t| |
||||dW S  tyg   |
jt   |
   w )N.T)operation_idinterface_typesubmit_to_llmobsinstancezlangchain.request.stream)r   r   r   )r   	__class__r   updatetraceset_tag_get_chunk_callbackinspect
isasyncgenr   r"   r   	Exceptionset_exc_infosysexc_infor   )integrationfuncr'   argskwargsr%   on_span_startedon_span_finishedextra_optionsr
   spanrespr   r   r   r   shared_stream"   s<   

r<   c                 C   sB   t d| ||f}g }| D ]}|r|jr||j qt|S )Nzlangchain.stream.chunk.callback)r   dispatch_with_resultsvaluesvaluer	   _build_chunk_callback)r%   r5   r6   results	callbacksresultr   r   r   r,   Q   s   

r,   c                    s    st S  fdd}|S )Nc                    s    D ]}||  q| S r   r   )r   callbackrB   r   r   _chunk_callback`   s   
z._build_chunk_callback.<locals>._chunk_callback)_no_op_callback)rB   rF   r   rE   r   r@   \   s   r@   c                 C   s   d S r   r   )r   r   r   r   rG   h   s   rG   )r-   r1   ddtrace.internalr   0ddtrace.llmobs._integrations.base_stream_handlerr   r   r   r   r   r"   r<   r,   r@   rG   r   r   r   r   <module>   s    /