o
    i-)                     @   s   d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZ d dl	Z
d dlmZ d dlmZ d dlmZ d dlmZmZ d d	lmZmZmZmZmZmZ d d
lmZ d dlmZ d dlm Z  e e!Z"G dd dZ#dS )    N)AsyncGenerator)
HTTPStatus)uuid4)	WebSocket)WebSocketDisconnect)envs)ErrorResponse	UsageInfo)
ErrorEventInputAudioBufferAppendInputAudioBufferCommitSessionCreatedTranscriptionDeltaTranscriptionDone)OpenAIServingRealtime)VLLMValidationError)init_loggerc                   @   s   e Zd ZdZdedefddZdd Zded	B d
d	e	B fddZ
defddZd
eejd	f fddZdd Zdedejee  fddZdeeB eB fddZd deded	B fddZdd Zd	S )!RealtimeConnectionaB  Manages WebSocket lifecycle and state for realtime transcription.

    This class handles:
    - WebSocket connection lifecycle (accept, receive, send, close)
    - Event routing (session.update, append, commit)
    - Audio buffering via asyncio.Queue
    - Generation task management
    - Error handling and cleanup
    	websocketservingc                 C   sB   || _ dt  | _|| _t | _d | _d| _d| _	t
j| _d S )Nzws-F)r   r   connection_idr   asyncioQueueaudio_queuegeneration_task_is_connected_is_model_validatedr   VLLM_MAX_AUDIO_CLIP_FILESIZE_MB_max_audio_filesize_mb)selfr   r    r    a/home/ubuntu/vllm_env/lib/python3.10/site-packages/vllm/entrypoints/openai/realtime/connection.py__init__+   s   
zRealtimeConnection.__init__c              
      sf  | j  I dH  td| j d| _| t I dH  zzO	 | j  I dH }zt	
|}| |I dH  W n5 t	jyI   | ddI dH  Y n# tyk } ztd| | t|dI dH  W Y d}~nd}~ww q ty   td| j d	| _Y n ty } ztd
| W Y d}~nd}~ww W |  I dH  dS W |  I dH  dS |  I dH  w )zMain connection loop.Nz!WebSocket connection accepted: %sTzInvalid JSONinvalid_jsonzError handling event: %sprocessing_errorzWebSocket disconnected: %sFz"Unexpected error in connection: %s)r   acceptloggerdebugr   r   sendr   receive_textjsonloadshandle_eventJSONDecodeError
send_error	Exception	exceptionstrr   cleanup)r   messageeventer    r    r!   handle_connection7   s<   
"

"z$RealtimeConnection.handle_connectionmodelNreturnc                 C   s.   | j |rd S | j jd| ddtjddS )NzThe model `z` does not exist.NotFoundErrorr7   )r3   err_typestatus_codeparam)r   _is_model_supportedcreate_error_responser   	NOT_FOUND)r   r7   r    r    r!   _check_modelS   s   
zRealtimeConnection._check_modelr4   c           	   
      s  | d}|dkrtd| | |d  d| _dS |dkrtdi |}z;t|j}t	j
|t	jdt	jd }t|d	 | jkrPtd
dt|d	 dt|dkrZtd| j| W dS  ty } ztd| | ddI dH  W Y d}~dS d}~ww |dkr| jsd}| |dI dH  tdi |}|jr| jd dS |  I dH  dS | d| dI dH  dS )zRoute events to handlers.

        Supported event types:
        - session.update: Configure model
        - input_audio_buffer.append: Add audio chunk to queue
        - input_audio_buffer.commit: Start transcription generation
        typezsession.updatezSession updated: %sr7   Tzinput_audio_buffer.append)dtypeg      @i   zMaximum file size exceededaudio_filesize_mb)	parametervaluer   zCan't process empty audio.zFailed to decode audio: %szInvalid audio datainvalid_audioNzinput_audio_buffer.commitzWModel not validated. Make sure to validate the model by sending a session.update event.model_not_validatedzUnknown event type: unknown_eventr    )getr&   r'   r@   r   r   base64	b64decodeaudionp
frombufferint16astypefloat32lenr   r   r   
put_nowaitr/   errorr.   r   finalstart_generation)	r   r4   
event_typeappend_eventaudio_bytesaudio_arrayr5   err_msgcommit_eventr    r    r!   r,   ^   sR   


 
zRealtimeConnection.handle_eventc                 C  s(   	 | j  I dH }|du rdS |V  q)z2Generator that yields audio chunks from the queue.TN)r   rI   )r   audio_chunkr    r    r!   audio_stream_generator   s   z)RealtimeConnection.audio_stream_generatorc                    sb   | j dur| j  std dS |  }tjtt   }| j	
||}t| ||| _ dS )z(Start the transcription generation task.Nz/Generation already in progress, ignoring commit)r   doner&   warningr^   r   r   listintr   transcribe_realtimecreate_task_run_generation)r   audio_streaminput_streamstreaming_input_genr    r    r!   rV      s   


z#RealtimeConnection.start_generationrh   rg   c              
      s  d| j  dt  }d}d}d}zddlm}m} |jdd|jdd	}	| jjj	||	|d
}
|
2 zI3 dH W }|j
rtt|j
dkrt|sJ|jrJt|j}|j
d j}||7 }|t|j
d j | t|dI dH  |t|j
d j7 }| jsy nq06 t|||| d}| t||dI dH  | j s| j  | j rW dS W dS  ty } ztd| | t|dI dH  W Y d}~dS d}~ww )a  Run the generation and stream results back to the client.

        This method:
        1. Creates sampling parameters from session config
        2. Passes the streaming input generator to engine.generate()
        3. Streams transcription.delta events as text is generated
        4. Sends final transcription.done event with usage stats
        5. Feeds generated token IDs back to input_stream for next iteration
        6. Cleans up the audio queue
        zrt-- r   )RequestOutputKindSamplingParamsg           T)temperature
max_tokensoutput_kind
skip_clone)promptsampling_params
request_idN)delta)prompt_tokenscompletion_tokenstotal_tokens)textusagezError in generation: %sr$   )r   r   vllm.sampling_paramsrk   rl   from_optionalDELTAr   engine_clientgenerateoutputsrR   prompt_token_idsry   rS   ra   	token_idsr(   r   r   r	   r   r   empty
get_nowaitr/   r&   r0   r.   r1   )r   rh   rg   rt   	full_textprompt_token_ids_lencompletion_tokens_lenrk   rl   rs   
result_genoutputru   rz   r5   r    r    r!   re      sZ   




$z"RealtimeConnection._run_generationc                    s    |  }| j|I dH  dS )zSend event to client.N)model_dump_jsonr   	send_text)r   r4   datar    r    r!   r(     s   zRealtimeConnection.sendr3   codec                    s(   t ||d}| j| I dH  dS )zSend error event to client.)rT   r   N)r
   r   r   r   )r   r3   r   error_eventr    r    r!   r.   	  s   zRealtimeConnection.send_errorc                    s:   | j d | jr| j s| j  td| j dS )zCleanup resources.NzConnection cleanup complete: %s)r   rS   r   r_   cancelr&   r'   r   )r   r    r    r!   r2     s
   
zRealtimeConnection.cleanup)N)__name__
__module____qualname____doc__r   r   r"   r6   r1   r   r@   dictr,   r   rM   ndarrayr^   rV   r   r   ra   rb   re   r   r   r   r(   r.   r2   r    r    r    r!   r       s$    
;
M

r   )$r   rJ   r*   collections.abcr   httpr   uuidr   numpyrM   fastapir   starlette.websocketsr   vllmr   'vllm.entrypoints.openai.engine.protocolr   r	   )vllm.entrypoints.openai.realtime.protocolr
   r   r   r   r   r   (vllm.entrypoints.openai.realtime.servingr   vllm.exceptionsr   vllm.loggerr   r   r&   r   r    r    r    r!   <module>   s"    