o
    >iE                     @   sP  d Z ddlZddlZddlZddlZddlmZmZmZ ddl	m
Z
 ddlmZ ddlmZmZmZmZmZmZmZ ddlmZmZmZmZmZmZmZ ee Z!G dd	 d	e"eZ#e
G d
d dZ$de"de"de"de"fddZ%de"de&de"de"fddZ'de&de&de&de"fddZ(de"dee"ef de"fddZ)d#dee" fdd Z*d!d" Z+dS )$aM  
WebSocket handler for TTS streaming.

Provides bidirectional WebSocket communication for TTS:
- Client sends JSON request with text and parameters
- Server streams audio chunks as binary frames
- Client can send control messages (cancel, ping)

Protocol:
1. Client connects to /v1/tts/ws
2. Client sends JSON: {"text": "...", "speaker": "...", ...}
3. Server sends binary audio chunks
4. Server sends JSON: {"event": "complete", "metrics": {...}}
5. Connection closes

Control messages:
- {"event": "cancel"} - Stop generation
- {"event": "ping"} - Keep-alive (server responds with pong)
    N)OptionalDictAny)	dataclass)Enum)
get_loggerset_request_contextclear_request_contextlog_request_receivedlog_first_audio_emittedlog_request_completedlog_request_failed)record_request_receivedrecord_request_completedrecord_request_failedrecord_ttfb
record_rtfrecord_audio_durationrecord_chunks_sentc                   @   s4   e Zd ZdZdZdZdZdZdZdZ	dZ
d	Zd
ZdS )WSMessageTypezWebSocket message types.requestcancelpingaudio_chunkheaderprogresscompleteerrorpongN)__name__
__module____qualname____doc__REQUESTCANCELPINGAUDIO_CHUNKHEADERPROGRESSCOMPLETEERRORPONG r,   r,   $veena3modal/api/websocket_handler.pyr   3   s    r   c                   @   s   e Zd ZU dZeed< dZeed< dZeed< dZ	e
ed< d	Zeed
< dZe
ed< dZeed< dZee
 ed< dZeed< dZeed< dZeed< dZe
ed< edeeef dd fddZdee fddZdS )	WSRequestzParsed WebSocket TTS request.textmale1speaker皙?temperature2   top_k      ?top_p   
max_tokens?repetition_penaltyNseedFchunkingT	normalizewavformat>  sample_ratedatareturnc                 C   s~   | | dd| dd| dd| dd| d	d
| dd| dd| d| dd| dd| dd| dddS )zParse request from JSON dict.r/    r1   r0   r3   r2   r5   r4   r7   r6   r9   r8   r;   r:   r<   r=   Fr>   Tr@   r?   rB   rA   )r/   r1   r3   r5   r7   r9   r;   r<   r=   r>   r@   rB   )get)clsrC   r,   r,   r-   	from_dictS   s   










zWSRequest.from_dictc                 C   sP   | j r| j  s
dS t| j dkrdt| j  dS | jdkr&d| j dS dS )	z2Validate request, return error message if invalid.z$Text is required and cannot be emptyiP  zText too long: z chars (max 50000)r?   z2WebSocket streaming only supports WAV format (got )N)r/   striplenr@   )selfr,   r,   r-   validatee   s   
zWSRequest.validate)r   r    r!   r"   str__annotations__r1   r3   floatr5   intr7   r9   r;   r<   r   r=   boolr>   r@   rB   classmethodr   r   rH   rM   r,   r,   r,   r-   r.   C   s"   
 r.   codemessage
request_idrD   c                 C   s   t tjj| ||ddS )zCreate JSON error message.rT   rU   rV   )eventr   )jsondumpsr   r*   valuerW   r,   r,   r-   create_error_messagep   s   r\   rB   model_versionc                 C   s   t tjj| |d|dS )z/Create JSON header message (sent before audio).r?   )rX   rV   rB   r@   r]   )rY   rZ   r   r'   r[   rV   rB   r]   r,   r,   r-   create_header_message|   s   r_   chunks_sent
bytes_sent
elapsed_msc                 C   s   t tjj| ||dS )zCreate JSON progress message.)rX   r`   ra   rb   )rY   rZ   r   r(   r[   r`   ra   rb   r,   r,   r-   create_progress_message   s   rd   metricsc                 C   s   t tjj| |dS )zCreate JSON completion message.)rX   rV   re   )rY   rZ   r   r)   r[   rV   re   r,   r,   r-   create_complete_message   s
   rg   api_keyc           *         s  ddl m}m} ddlm} ddlm} ddlm} ddl	m
} tt td  I dH  td	  d
 d}zzW| r|r| }	||}
|	|
}|jst|jp_d|jpcddI dH  jddI dH  W W || t  dS | r| }||
\}}}|stddt|d  ddI dH  jddI dH  W W || t  dS ztj ! ddI dH }t"#|}W nb tj$y   tdddI dH  jddI dH  Y W W || t  dS  t"j%y0 } z*tdd| dI dH  jddI dH  W Y d}~W W || t  dS d}~ww |&dd}|dkrUt"'ddiI dH  ! I dH }t"#|}t()|}| }|rtd |dI dH  jddI dH  W W || t  dS ||j*}t+t,|j-|d!d"d# t.|d!d"d$ |/ std%d&dI dH  jd'dI dH  W W || t  dS |0 }|r|j1nd(}z	|2 I dH }W nF |j3y } z8t4d)d*t|d+ t5d)d*|d, td*d-dI dH  jd.dI dH  W Y d}~W W || t  dS d}~ww t6|j7|d/I dH  t88 }d}d}d}i } fd0d1}t9| } z|j-}!|j:rezdd2l;m<}" |"|!}!W n
 t=yd   Y nw |j>|!||j?|j@|jA|jB|jC|jD|jE|d
d32 zi3 dH W \}#}$ rtd4   nV|$}|d7 }|t,|#7 }|du rt88 }t|| d }%tF|%t,|#d5 tG|%d |d!d6 H|#I dH  |d7 dkrtI||tt88 | d d8I dH  q|6 W | J  z| I dH  W n tjKy   Y nw || n| J  z| I dH  W n tjKy   Y nw || w t88 | }&|&d9d}'|'dkr7|&|' nd}(||tL|'d:tL|&d:|rOt|| d ndtL|(d; d<})tM|)d=I dH   stNd>t|&d |'|(|d? tOd>|&|d!d@ tP|(|dA tQ|'|dB tR||dC jddI dH  W nP tSy } zCtTdD|  t4dEdFt|d+ t5dEdFdG ztdHt|dI dH  jd'dI dH  W n
 tSy   Y nw W Y d}~nd}~ww W || t  dS W || t  dS || t  w )Iz
    Handle WebSocket TTS streaming connection.
    
    This is the main handler called from FastAPI WebSocket route.
    
    Args:
        websocket: FastAPI WebSocket instance
        api_key: Optional API key for authentication
    r   )get_api_validatorhash_api_key)get_rate_limiter)FeatureFlags)resolve_speaker_name)tts_runtime)rV   NzWebSocket connection accepted: FAUTH_FAILEDzAuthentication failedrW   i  )rT   RATE_LIMIT_EXCEEDEDz!Rate limit exceeded. Retry after    z	 seconds.g      >@timeoutTIMEOUTz%No request received within 30 secondsi  INVALID_JSONzInvalid JSON: i  rX   r   r   r   VALIDATION_ERRORTr?   )rV   text_lengthr1   streamr@   )r1   rx   r@   MODEL_NOT_LOADEDzTTS model not initializedi  unknowni  STREAMING_OVERLOADED)rV   status_code
error_codeerror_message)r|   r}   r1   z,Streaming capacity exhausted. Retry shortly.i  r^   c                     s   z[ sZz<t j ddI d H } t| }|ddkr+d td  W W d S |ddkr?t	ddiI d H  W n t j
yJ   Y q tyT   Y W d S w  rW d S W d S  tyf   Y d S w )	Ng?rr   rX   r   TzClient requested cancel: r   r   )asynciowait_forreceive_textrY   loadsrF   loggerinfo	send_textrZ   TimeoutError	Exception)msgrC   	cancelledrV   	websocketr,   r-   listen_for_cancelI  s2   
z/handle_websocket_tts.<locals>.listen_for_cancel)normalize_text)r/   r1   r3   r5   r7   r9   r;   r<   enable_chunkingadmission_leaserelease_admission_leasezGeneration cancelled: )rV   ttfb_mschunk_size_bytes)ttfb_secondsr1   rx   
   rc   audio_duration_seconds      )rV   r`   total_bytesr   total_time_secondsr   rtfr   rf      )rV   r|   total_duration_msr   r   r`   )r|   duration_secondsr1   rx   )r   r1   )r   r1   )chunksr1   zWebSocket error: i  WEBSOCKET_ERROR)r|   r}   INTERNAL_ERROR)Uveena3modal.api.authri   rj   veena3modal.api.rate_limiterrk   veena3modal.api.error_handlersrl   veena3modal.api.schemasrm   veena3modal.servicesrn   rN   uuiduuid4r   acceptr   r   is_auth_enabledrM   is_validr   r\   r}   r~   closerelease_streaming_slotr	   is_rate_limiting_enabledcheckrQ   r   r   r   rY   r   r   JSONDecodeErrorrF   rZ   r.   rH   r1   r
   rK   r/   metrics_request_receivedis_initializedget_runtimer]   acquire_streaming_slotStreamingAdmissionErrorr   metrics_request_failedr_   rB   timecreate_taskr>   &veena3modal.processing.text_normalizerr   ImportErrorgenerate_speech_streamingr3   r5   r7   r9   r;   r<   r=   r   r   
send_bytesrd   r   CancelledErrorroundrg   r   metrics_request_completedr   r   r   r   	exception)*r   rh   ri   rj   rk   rl   rm   rn   r   	validatorkey_hashauth_resultrate_limiterallowed	remainingreset_afterraw_messagerU   e
event_typereqvalidation_errorr1   runtimer]   exc
start_timer`   r   first_chunk_timefinal_metricsr   cancel_taskr/   r   r   re   r   
total_timeaudio_durationr   completion_metricsr,   r   r-   handle_websocket_tts   sB  


  

    

   
y
  
q
 



 
]
 

 
F
  
-
 
,





r   c                 C   sJ   ddl m}m} | d|dddfd|dtt fdd	}td
 dS )z
    Add WebSocket routes to FastAPI app.
    
    Call this from create_app() to enable WebSocket support.
    
    Args:
        app: FastAPI application instance
    r   )	WebSocketQueryz
/v1/tts/wsNrh   )aliasr   c                    s:   | j dd}|dr|dd }t| |I dH  dS )a  
        WebSocket endpoint for TTS streaming.
        
        Connect with optional api_key query parameter:
        ws://host/v1/tts/ws?api_key=your-key
        
        Then send JSON request:
        {"text": "Hello world", "speaker": "male1"}
        
        Receive:
        - JSON header message
        - Binary audio chunks
        - JSON progress messages (every 10 chunks)
        - JSON completion message
        authorizationrE   zBearer    N)headersrF   
startswithr   )r   rh   auth_headerr,   r,   r-   websocket_tts  s
   
z+add_websocket_routes.<locals>.websocket_ttsz%WebSocket route /v1/tts/ws registered)fastapir   r   r   r   rN   r   r   )appr   r   r   r,   r,   r-   add_websocket_routes  s   	
r   )N),r"   r   rY   r   r   typingr   r   r   dataclassesr   enumr   veena3modal.shared.loggingr   r   r	   r
   r   r   r   veena3modal.shared.metricsr   r   r   r   r   r   r   r   r   r   r   r   rN   r   r.   r\   rQ   r_   rd   rg   r   r   r,   r,   r,   r-   <module>   sX    $$	
,



  E