o
    i`V                  
   @   s  d Z ddlZddlZddlZddlZddlZddlmZmZmZ ddl	m
Z
 ddlmZ ddlmZmZmZmZmZmZmZmZmZmZmZ ddlmZ ddlmZ dd	lmZ dd
lm Z  ddl!m"Z"m#Z# zddl$m%Z% ddl&m'Z' W n  e(y Z) ze
*de)  e
*d e+de) dZ)[)ww G dd de#Z,G dd deZ-G dd dZ.G dd dZ/G dd deZ0G dd de Z1G dd de"Z2dS )a  FastAPI WebSocket transport implementation for Pipecat.

This module provides WebSocket-based transport for real-time audio/video streaming
using FastAPI and WebSocket connections. Supports binary and text serialization
with configurable session timeouts and WAV header generation.
    N)	AwaitableCallableOptional)logger)	BaseModel)CancelFrameClientConnectedFrameEndFrameFrameInputAudioRawFrameInputTransportMessageFrameInterruptionFrameOutputAudioRawFrameOutputTransportMessageFrame!OutputTransportMessageUrgentFrame
StartFrame)FrameDirection)FrameSerializer)BaseInputTransport)BaseOutputTransport)BaseTransportTransportParams)	WebSocket)WebSocketStatezException: zTIn order to use FastAPI websockets, you need to `pip install pipecat-ai[websocket]`.zMissing module: c                   @   sN   e Zd ZU dZdZeed< dZee	 ed< dZ
ee ed< dZee ed< dS )FastAPIWebsocketParamsa  Configuration parameters for FastAPI WebSocket transport.

    Parameters:
        add_wav_header: Whether to add WAV headers to audio frames.
        serializer: Frame serializer for encoding/decoding messages.
        session_timeout: Session timeout in seconds, None for no timeout.
        fixed_audio_packet_size: Optional fixed-size packetization for raw PCM audio payloads.
            Useful when the remote WebSocket media endpoint requires strict audio framing.
    Fadd_wav_headerN
serializersession_timeoutfixed_audio_packet_size)__name__
__module____qualname____doc__r   bool__annotations__r   r   r   r   intr    r&   r&   X/home/ubuntu/.local/lib/python3.10/site-packages/pipecat/transports/websocket/fastapi.pyr   6   s   
 
r   c                   @   sT   e Zd ZU dZeeged f ed< eeged f ed< eeged f ed< dS )FastAPIWebsocketCallbacksa"  Callback functions for WebSocket events.

    Parameters:
        on_client_connected: Called when a client connects to the WebSocket.
        on_client_disconnected: Called when a client disconnects from the WebSocket.
        on_session_timeout: Called when a session timeout occurs.
    Non_client_connectedon_client_disconnectedon_session_timeout)r   r    r!   r"   r   r   r   r$   r&   r&   r&   r'   r(   G   s
   
 r(   c                   @   s8   e Zd ZdZdefddZdd ZdeeB fdd	Z	d
S )_WebSocketMessageIteratorzGAsync iterator for WebSocket messages that yields both binary and text.	websocketc                 C   s
   || _ d S N)
_websocketselfr-   r&   r&   r'   __init__X   s   
z"_WebSocketMessageIterator.__init__c                 C   s   | S r.   r&   r1   r&   r&   r'   	__aiter__[   s   z#_WebSocketMessageIterator.__aiter__returnc                    s^   | j  I d H }|d dkrtd|v r|d d ur|d S d|v r-|d d ur-|d S t)Ntypezwebsocket.disconnectbytestext)r/   receiveStopAsyncIteration)r1   messager&   r&   r'   	__anext__^   s   z#_WebSocketMessageIterator.__anext__N)
r   r    r!   r"   r   r2   r4   r7   strr<   r&   r&   r&   r'   r,   U   s
    r,   c                   @   s   e Zd ZdZdedefddZdefddZd	e	j
eeB  fd
dZdeeB fddZdd Zdd Zdd Zdd Zdd Zed	efddZed	efddZdS )FastAPIWebsocketClientzWebSocket client wrapper for handling connections and message passing.

    Manages WebSocket state, message sending/receiving, and connection lifecycle
    with support for both binary and text message types.
    r-   	callbacksc                 C   s   || _ d| _|| _d| _dS )zInitialize the WebSocket client.

        Args:
            websocket: The FastAPI WebSocket connection.
            callbacks: Event callback functions.
        Fr   N)r/   _closing
_callbacks_leave_counter)r1   r-   r?   r&   r&   r'   r2   p   s   
zFastAPIWebsocketClient.__init___c                    s   |  j d7  _ dS )z]Set up the WebSocket client.

        Args:
            _: The start frame (unused).
           N)rB   )r1   rC   r&   r&   r'   setup|   s   zFastAPIWebsocketClient.setupr5   c                 C   s
   t | jS )zGet an async iterator for receiving WebSocket messages.

        Returns:
            An async iterator yielding bytes or strings.
        )r,   r/   r3   r&   r&   r'   r9      s   
zFastAPIWebsocketClient.receivedatac                    s   z$|   r#t|tr| j|I dH  W dS | j|I dH  W dS W dS  tyl } z;t|  d|j	j
 d| d| jj  | jjtjkrZ| jsatd d| _W Y d}~dS W Y d}~dS W Y d}~dS d}~ww )zySend data through the WebSocket connection.

        Args:
            data: The data to send (string or bytes).
        N exception sending data:  (z), application_state: z'Closing already disconnected websocket!T)	_can_send
isinstancer7   r/   
send_bytes	send_text	Exceptionr   error	__class__r   application_stater   DISCONNECTED
is_closingwarningr@   )r1   rF   er&   r&   r'   send   s&   
 
zFastAPIWebsocketClient.sendc              
      s   |  j d8  _ | j dkrdS | jrA| jsCd| _z| j I dH  W dS  ty@ } zt|  d|  W Y d}~dS d}~ww dS dS )z Disconnect the WebSocket client.rD   r   NTz( exception while closing the websocket: )	rB   is_connectedrR   r@   r/   closerM   r   rN   )r1   rT   r&   r&   r'   
disconnect   s   
"z!FastAPIWebsocketClient.disconnectc                       | j | jI dH  dS )z)Trigger the client disconnected callback.N)rA   r*   r/   r3   r&   r&   r'   trigger_client_disconnected      z2FastAPIWebsocketClient.trigger_client_disconnectedc                    rY   )z&Trigger the client connected callback.N)rA   r)   r/   r3   r&   r&   r'   trigger_client_connected   r[   z/FastAPIWebsocketClient.trigger_client_connectedc                    rY   )z$Trigger the client timeout callback.N)rA   r+   r/   r3   r&   r&   r'   trigger_client_timeout   r[   z-FastAPIWebsocketClient.trigger_client_timeoutc                 C   s   | j o| j S )z0Check if data can be sent through the WebSocket.)rV   rR   r3   r&   r&   r'   rI      s   z FastAPIWebsocketClient._can_sendc                 C   s   | j jtjkS )zCheck if the WebSocket is currently connected.

        Returns:
            True if the WebSocket is in connected state.
        )r/   client_stater   	CONNECTEDr3   r&   r&   r'   rV      s   z#FastAPIWebsocketClient.is_connectedc                 C      | j S )zCheck if the WebSocket is currently closing.

        Returns:
            True if the WebSocket is in the process of closing.
        )r@   r3   r&   r&   r'   rR      s   z!FastAPIWebsocketClient.is_closingN)r   r    r!   r"   r   r(   r2   r   rE   typingAsyncIteratorr7   r=   r9   rU   rX   rZ   r\   r]   rI   propertyr#   rV   rR   r&   r&   r&   r'   r>   i   s    r>   c                       s   e Zd ZdZdededef fddZdef fdd	Z	d
d Z
def fddZdef fddZ fddZdd Zdd Z  ZS )FastAPIWebsocketInputTransportzInput transport for FastAPI WebSocket connections.

    Handles incoming WebSocket messages, deserializes frames, and manages
    connection monitoring with optional session timeouts.
    	transportclientparamsc                    s<   t  j|fi | || _|| _|| _d| _d| _d| _dS )a$  Initialize the WebSocket input transport.

        Args:
            transport: The parent transport instance.
            client: The WebSocket client wrapper.
            params: Transport configuration parameters.
            **kwargs: Additional arguments passed to parent class.
        NF)superr2   
_transport_client_params_receive_task_monitor_websocket_task_initializedr1   re   rf   rg   kwargsrO   r&   r'   r2      s   
z'FastAPIWebsocketInputTransport.__init__framec                    s   t  |I dH  | jrdS d| _| j|I dH  | jjr)| jj|I dH  | js8| jjr8| 	| 
 | _| j I dH  | t I dH  | jsT| 	|  | _| |I dH  dS )zStart the input transport and begin message processing.

        Args:
            frame: The start frame containing initialization parameters.
        NT)rh   startrn   rj   rE   rk   r   rm   r   create_task_monitor_websocketr\   
push_framer   rl   _receive_messagesset_transport_readyr1   rr   rq   r&   r'   rs      s   z$FastAPIWebsocketInputTransport.startc                    sF   | j r| | j I dH  d| _ | jr!| | jI dH  d| _dS dS )zStop all running tasks.N)rm   cancel_taskrl   r3   r&   r&   r'   _stop_tasks  s   
z*FastAPIWebsocketInputTransport._stop_tasksc                    6   t  |I dH  |  I dH  | j I dH  dS )zStop the input transport and cleanup resources.

        Args:
            frame: The end frame signaling transport shutdown.
        N)rh   stopr{   rj   rX   ry   rq   r&   r'   r}        z#FastAPIWebsocketInputTransport.stopc                    r|   )zCancel the input transport and stop all processing.

        Args:
            frame: The cancel frame signaling immediate cancellation.
        N)rh   cancelr{   rj   rX   ry   rq   r&   r'   r      r~   z%FastAPIWebsocketInputTransport.cancelc                    &   t   I dH  | j I dH  dS zClean up transport resources.Nrh   cleanupri   r3   rq   r&   r'   r   *     z&FastAPIWebsocketInputTransport.cleanupc                    s   zH| j  2 z?3 dH W }| jjsq| jj|I dH }|sqt|tr-| |I dH  qt|tr>| j	t|j
dI dH  q| |I dH  q6 W n" tyk } zt|  d|jj d| d W Y d}~nd}~ww | j jsz| j  I dH  dS dS )z3Main message receiving loop for WebSocket messages.N)r;   z exception receiving data: rH   ))rj   r9   rk   r   deserializerJ   r   push_audio_framer   broadcast_framer;   rv   rM   r   rN   rO   r   rR   rZ   )r1   r;   rr   rT   r&   r&   r'   rw   /  s*   

,z0FastAPIWebsocketInputTransport._receive_messagesc                    s*   t | jjI dH  | j I dH  dS )zeWait for self._params.session_timeout seconds, if the websocket is still open, trigger timeout event.N)asynciosleeprk   r   rj   r]   r3   r&   r&   r'   ru   I  s   z1FastAPIWebsocketInputTransport._monitor_websocket)r   r    r!   r"   r   r>   r   r2   r   rs   r{   r	   r}   r   r   r   rw   ru   __classcell__r&   r&   rq   r'   rd      s     	

rd   c                       s   e Zd ZdZdededef fddZdef fdd	Z	de
f fd
dZdef fddZ fddZdedef fddZdeeB fddZdedefddZdefddZdd Z  ZS )FastAPIWebsocketOutputTransportzOutput transport for FastAPI WebSocket connections.

    Handles outgoing frame serialization, audio streaming with timing simulation,
    and WebSocket message transmission with optional WAV header generation.
    re   rf   rg   c                    sD   t  j|fi | || _|| _|| _d| _d| _t | _d| _	dS )a%  Initialize the WebSocket output transport.

        Args:
            transport: The parent transport instance.
            client: The WebSocket client wrapper.
            params: Transport configuration parameters.
            **kwargs: Additional arguments passed to parent class.
        r   FN)
rh   r2   ri   rj   rk   _send_interval_next_send_time	bytearray_audio_send_bufferrn   ro   rq   r&   r'   r2   V  s   
z(FastAPIWebsocketOutputTransport.__init__rr   c                    sx   t  |I dH  | jrdS d| _| j|I dH  | jjr)| jj|I dH  | j| j d | _	| 
|I dH  dS )zStart the output transport and initialize timing.

        Args:
            frame: The start frame containing initialization parameters.
        NT   )rh   rs   rn   rj   rE   rk   r   audio_chunk_sizesample_rater   rx   ry   rq   r&   r'   rs   ~  s   z%FastAPIWebsocketOutputTransport.startc                    8   t  |I dH  | |I dH  | j I dH  dS )zStop the output transport and cleanup resources.

        Args:
            frame: The end frame signaling transport shutdown.
        N)rh   r}   _write_framerj   rX   ry   rq   r&   r'   r}        z$FastAPIWebsocketOutputTransport.stopc                    r   )zCancel the output transport and stop all processing.

        Args:
            frame: The cancel frame signaling immediate cancellation.
        N)rh   r   r   rj   rX   ry   rq   r&   r'   r     r   z&FastAPIWebsocketOutputTransport.cancelc                    r   r   r   r3   rq   r&   r'   r     r   z'FastAPIWebsocketOutputTransport.cleanup	directionc                    sP   t  ||I dH  t|tr&| jjr| j  | |I dH  d| _	dS dS )zProcess outgoing frames with special handling for interruptions.

        Args:
            frame: The frame to process.
            direction: The direction of frame flow in the pipeline.
        Nr   )
rh   process_framerJ   r   rk   r   r   clearr   r   )r1   rr   r   rq   r&   r'   r     s   


z-FastAPIWebsocketOutputTransport.process_framec                    s   |  |I dH  dS )zoSend a transport message frame.

        Args:
            frame: The transport message frame to send.
        N)r   ry   r&   r&   r'   send_message  s   z,FastAPIWebsocketOutputTransport.send_messager5   c              	      s   | j js	| j jsdS t|j| j| jjd}| jjrht	
 B}t|d}|d ||j ||j ||j W d   n1 sGw   Y  t| |j|jd}|}W d   n1 scw   Y  | |I dH  |  I dH  dS )zWrite an audio frame to the WebSocket with timing simulation.

        Args:
            frame: The output audio frame to write.

        Returns:
            True if the audio frame was written successfully, False otherwise.
        F)audior   num_channelswbr   N)r   r   T)rj   rR   rV   r   r   r   rk   audio_out_channelsr   ioBytesIOwaveopensetsampwidthsetnchannelsr   setframeratewriteframesgetvaluer   _write_audio_sleep)r1   rr   bufferwf	wav_framer&   r&   r'   write_audio_frame  s4   	

z1FastAPIWebsocketOutputTransport.write_audio_framec                    s  | j js	| j jsdS | jjsdS zY| jj|I dH }|rh| jj}|r\t|tt	fr\| j
t| t| j
|krYt| j
d| }| j
d|= | j |I dH  t| j
|ks:W dS | j |I dH  W dS W dS  ty } zt|  d|jj d| d W Y d}~dS d}~ww )z1Serialize and send a frame through the WebSocket.NrG   rH   r   )rj   rR   rV   rk   r   	serializer   rJ   r7   r   r   extendlenrU   rM   r   rN   rO   r   )r1   rr   payloadpacket_byteschunkrT   r&   r&   r'   r     s.   .z,FastAPIWebsocketOutputTransport._write_framec                    sZ   t  }td| j| }t|I dH  |dkr#t  | j | _dS |  j| j7  _dS )z7Simulate audio playback timing with appropriate delays.r   N)time	monotonicmaxr   r   r   r   )r1   current_timesleep_durationr&   r&   r'   r   
  s   z2FastAPIWebsocketOutputTransport._write_audio_sleep)r   r    r!   r"   r   r>   r   r2   r   rs   r	   r}   r   r   r   r
   r   r   r   r   r   r   r#   r   r   r   r   r&   r&   rq   r'   r   O  s(    (



'r   c                
       st   e Zd ZdZ		ddededee dee f fddZd	e	fd
dZ
d	efddZdd Zdd Zdd Z  ZS )FastAPIWebsocketTransportax  FastAPI WebSocket transport for real-time audio/video streaming.

    Provides bidirectional WebSocket communication with frame serialization,
    session management, and event handling for client connections and timeouts.

    Event handlers available:

    - on_client_connected(transport, websocket): Client WebSocket connected
    - on_client_disconnected(transport, websocket): Client WebSocket disconnected
    - on_session_timeout(transport, websocket): Session timed out

    Example::

        @transport.event_handler("on_client_connected")
        async def on_client_connected(transport, websocket):
            ...
    Nr-   rg   
input_nameoutput_namec                    s   t  j||d || _t| j| j| jd| _t|| j| _	t
| | j	| j| jd| _t| | j	| j| jd| _| d | d | d dS )a4  Initialize the FastAPI WebSocket transport.

        Args:
            websocket: The FastAPI WebSocket connection.
            params: Transport configuration parameters.
            input_name: Optional name for the input processor.
            output_name: Optional name for the output processor.
        )r   r   )r)   r*   r+   )namer)   r*   r+   N)rh   r2   rk   r(   _on_client_connected_on_client_disconnected_on_session_timeoutrA   r>   rj   rd   _input_name_inputr   _output_name_output_register_event_handler)r1   r-   rg   r   r   rq   r&   r'   r2   )  s"   

z"FastAPIWebsocketTransport.__init__r5   c                 C   r`   )zqGet the input transport processor.

        Returns:
            The WebSocket input transport instance.
        )r   r3   r&   r&   r'   inputQ     zFastAPIWebsocketTransport.inputc                 C   r`   )zsGet the output transport processor.

        Returns:
            The WebSocket output transport instance.
        )r   r3   r&   r&   r'   outputY  r   z FastAPIWebsocketTransport.outputc                       |  d|I dH  dS )zHandle client connected event.r)   N_call_event_handlerr0   r&   r&   r'   r   a     z.FastAPIWebsocketTransport._on_client_connectedc                    r   )z!Handle client disconnected event.r*   Nr   r0   r&   r&   r'   r   e  r   z1FastAPIWebsocketTransport._on_client_disconnectedc                    r   )zHandle session timeout event.r+   Nr   r0   r&   r&   r'   r   i  r   z-FastAPIWebsocketTransport._on_session_timeout)NN)r   r    r!   r"   r   r   r   r=   r2   rd   r   r   r   r   r   r   r   r&   r&   rq   r'   r     s$    (r   )3r"   r   r   r   ra   r   r   r   r   logurur   pydanticr   pipecat.frames.framesr   r   r	   r
   r   r   r   r   r   r   r   "pipecat.processors.frame_processorr   #pipecat.serializers.base_serializerr   pipecat.transports.base_inputr   pipecat.transports.base_outputr   !pipecat.transports.base_transportr   r   fastapir   starlette.websocketsr   ModuleNotFoundErrorrT   rN   rM   r   r(   r,   r>   rd   r   r   r&   r&   r&   r'   <module>   sD   4lz H