o
    i                     @   s   d dl mZ d dlmZ d dlmZ d dlmZ d dlmZ d dlm	Z	 d dl
mZ d dl
mZ eeZG d	d
 d
ZG dd dee	ZG dd deeZdd Zdd Zdd Zdd ZdddZdS )    )defaultdict)AsyncGenerator)	Generator)
get_logger)AsyncStreamHandler)StreamHandler0openai_construct_completion_from_streamed_chunks-openai_construct_message_from_streamed_chunksc                   @   s   e Zd Zdd ZdddZdS )BaseOpenAIStreamHandlerc                 C   s   t tS N)r   list)self r   Y/home/ubuntu/.local/lib/python3.10/site-packages/ddtrace/contrib/internal/openai/utils.pyinitialize_chunk_storage   s   z0BaseOpenAIStreamHandler.initialize_chunk_storageNc              	   C   s4   |st | j| j| j| j| jdd | j  d S )Noperation_type )_process_finished_streamintegrationprimary_spanrequest_kwargschunksoptionsgetfinish)r   	exceptionr   r   r   finalize_stream   s   z'BaseOpenAIStreamHandler.finalize_streamr   )__name__
__module____qualname__r   r   r   r   r   r   r      s    r   c                   @       e Zd ZdddZdddZdS )OpenAIStreamHandlerNc                 C   s    |  || t| j|| j d S r   _extract_token_chunk_loop_handlerr   r   r   chunkiteratorr   r   r   process_chunk    s   z!OpenAIStreamHandler.process_chunkc              	   C   st   | j dsdS t|d}|sdS |d }t|ddsdS z| }| jd d| W dS  ttfy9   Y dS w zYAttempt to extract the token chunk (last chunk in the stream) from the streamed response.z_dd.auto_extract_token_chunkNchoicesr   finish_reason)r   _get_ctx_itemgetattr__next__r   insertStopIterationGeneratorExitr   r(   r)   r,   choiceusage_chunkr   r   r   r%   $   s   
z(OpenAIStreamHandler._extract_token_chunkr   r   r    r!   r*   r%   r   r   r   r   r#          
r#   c                   @   r"   )OpenAIAsyncStreamHandlerNc                    s(   |  ||I d H  t| j|| j d S r   r$   r'   r   r   r   r*   9   s   z&OpenAIAsyncStreamHandler.process_chunkc              	      s|   | j ds	dS t|d}|sdS |d }t|ddsdS z| I dH }| jd d| W dS  ttfy=   Y dS w r+   )r   r.   r/   	__anext__r   r1   StopAsyncIterationr3   r4   r   r   r   r%   =   s   
z-OpenAIAsyncStreamHandler._extract_token_chunkr   r7   r   r   r   r   r9   8   r8   r9   c                 C   s   | sdS d| dd  S )z
    Returns `sk-...XXXX`, where XXXX is the last 4 characters of the provided OpenAI API key.
    This mimics how OpenAI UI formats the API key.
    Nzsk-...%sr   )openai_api_keyr   r   r   _format_openai_api_keyN   s   r>   c                 C   4   dd l }t| trdS t|drt| |jrdS dS )Nr   TStreamF)openai
isinstancer   hasattrr@   resprA   r   r   r   _is_generatorY      
rF   c                 C   r?   )Nr   TAsyncStreamF)rA   rB   r   rC   rH   rD   r   r   r   _is_async_generatore   rG   rI   c                 C   s   |  ds-t|dr|jdrt|dd}t|dd}nt|dd}|dur-| d| t|dd}|dur?|d d| t|d	g D ]
}||j | qEt|d
dr`|d d| dS dS )z
    Sets the openai model tag and appends the chunk to the correct index in the streamed_chunks list.
    When handling a streamed chat/completion/responses,
    this function is called for each chunk in the streamed response.
    zopenai.response.modeltypez	response.responseNmodelr   r   r,   usage)	get_tagrC   rJ   
startswithr/   _set_tag_strr1   indexappend)spanr(   streamed_chunksrK   rL   r5   r   r   r   r&   q   s   
r&   r   c                 C   s   t |tr|dkr| }z7|dkr!|r|d r|d d nd }n|dkr-dd |D }n|dkr8dd |D }| j|g |||d W d S  tyU   tjd	d
d Y d S w )NrK   r   
completionc                 S      g | ]}t |qS r   r   .0r5   r   r   r   
<listcomp>       z,_process_finished_stream.<locals>.<listcomp>chatc                 S   rV   r   r
   rW   r   r   r   rY      rZ   )argskwargsrK   	operationz3Error processing streamed completion/chat response.T)exc_info)rB   dictvaluesllmobs_set_tags	Exceptionlogwarning)r   rS   r]   rT   r   formatted_completionsr   r   r   r      s&   
r   N)r   )collectionsr   typingr   r   ddtrace.internal.loggerr   0ddtrace.llmobs._integrations.base_stream_handlerr   r   "ddtrace.llmobs._integrations.utilsr	   r   r   rd   r   r#   r9   r>   rF   rI   r&   r   r   r   r   r   <module>   s"    