o
    iJ                  
   @   s~  d Z ddlZddlZddlZddlZddlmZmZmZ ddl	m
Z
 ddlmZ ddlmZmZmZmZmZmZmZmZmZmZmZ ddlmZ ddlmZ dd	lmZ dd
lm Z  ddl!m"Z"m#Z# zddl$Z$ddl%m&Z' ddl(m)Z) W n  e*y Z+ ze
,de+  e
,d e-de+ dZ+[+ww G dd de#Z.G dd deZ/G dd deZ0G dd de Z1G dd de"Z2dS )zWebSocket server transport implementation for Pipecat.

This module provides WebSocket server transport functionality for real-time
audio and data streaming, including client connection management, session
handling, and frame serialization.
    N)	AwaitableCallableOptional)logger)	BaseModel)CancelFrameClientConnectedFrameEndFrameFrameInputAudioRawFrameInputTransportMessageFrameInterruptionFrameOutputAudioRawFrameOutputTransportMessageFrame!OutputTransportMessageUrgentFrame
StartFrame)FrameDirection)FrameSerializer)BaseInputTransport)BaseOutputTransport)BaseTransportTransportParams)serve)StatezException: zLIn order to use websockets, you need to `pip install pipecat-ai[websocket]`.zMissing module: c                   @   s>   e Zd ZU dZdZeed< dZee	 ed< dZ
ee ed< dS )WebsocketServerParamsa  Configuration parameters for WebSocket server transport.

    Parameters:
        add_wav_header: Whether to add WAV headers to audio frames.
        serializer: Frame serializer for message encoding/decoding.
        session_timeout: Timeout in seconds for client sessions.
    Fadd_wav_headerN
serializersession_timeout)__name__
__module____qualname____doc__r   bool__annotations__r   r   r   r   int r%   r%   W/home/ubuntu/.local/lib/python3.10/site-packages/pipecat/transports/websocket/server.pyr   4   s
   
 r   c                   @   sn   e Zd ZU dZeejged f ed< eejged f ed< eejged f ed< eg ed f ed< dS )WebsocketServerCallbacksa  Callback functions for WebSocket server events.

    Parameters:
        on_client_connected: Called when a client connects to the server.
        on_client_disconnected: Called when a client disconnects from the server.
        on_session_timeout: Called when a client session times out.
        on_websocket_ready: Called when the WebSocket server is ready to accept connections.
    Non_client_connectedon_client_disconnectedon_session_timeouton_websocket_ready)	r   r   r    r!   r   
websocketsWebSocketServerProtocolr   r#   r%   r%   r%   r&   r'   B   s   
 	r'   c                
       s   e Zd ZdZdededededef
 fddZ	d	e
f fd
dZd	ef fddZd	ef fddZ fddZdd ZdejfddZdejdefddZ  ZS )WebsocketServerInputTransportzWebSocket server input transport for receiving client data.

    Handles incoming WebSocket connections, message processing, and client
    session management including timeout monitoring and connection lifecycle.
    	transporthostportparams	callbacksc                    sX   t  j|fi | || _|| _|| _|| _|| _d| _d| _d| _	t
 | _d| _dS )a  Initialize the WebSocket server input transport.

        Args:
            transport: The parent transport instance.
            host: Host address to bind the WebSocket server to.
            port: Port number to bind the WebSocket server to.
            params: WebSocket server configuration parameters.
            callbacks: Callback functions for WebSocket events.
            **kwargs: Additional arguments passed to parent class.
        NF)super__init__
_transport_host_port_params
_callbacks
_websocket_server_task_monitor_taskasyncioEvent_stop_server_event_initialized)selfr/   r0   r1   r2   r3   kwargs	__class__r%   r&   r5   Y   s   

z&WebsocketServerInputTransport.__init__framec                    sj   t  |I dH  | jrdS d| _| jjr | jj|I dH  | js+| |  | _| 	|I dH  dS )zStart the WebSocket server and initialize components.

        Args:
            frame: The start frame containing initialization parameters.
        NT)
r4   startrA   r9   r   setupr<   create_task_server_task_handlerset_transport_readyrB   rF   rD   r%   r&   rG      s   z#WebsocketServerInputTransport.startc                    s\   t  |I dH  | j  | jr| | jI dH  d| _| jr,| jI dH  d| _dS dS )zStop the WebSocket server and cleanup resources.

        Args:
            frame: The end frame signaling transport shutdown.
        N)r4   stopr@   setr=   cancel_taskr<   rL   rD   r%   r&   rM      s   

z"WebsocketServerInputTransport.stopc                    sX   t  |I dH  | jr| | jI dH  d| _| jr*| | jI dH  d| _dS dS )zCancel the WebSocket server and stop all processing.

        Args:
            frame: The cancel frame signaling immediate cancellation.
        N)r4   cancelr=   rO   r<   rL   rD   r%   r&   rP      s   
z$WebsocketServerInputTransport.cancelc                    &   t   I dH  | j I dH  dS z'Cleanup resources and parent transport.Nr4   cleanupr6   rB   rD   r%   r&   rT         z%WebsocketServerInputTransport.cleanupc              	      s   t d| j d| j  t| j| j| j4 I dH }| j I dH  | j	 I dH  W d  I dH  dS 1 I dH s=w   Y  dS )z7Handle WebSocket server startup and client connections.zStarting websocket server on :N)
r   infor7   r8   websocket_serve_client_handlerr:   r+   r@   wait)rB   serverr%   r%   r&   rJ      s   .z2WebsocketServerInputTransport._server_task_handler	websocketc                    s  t d|j  | jr| j I dH  t d || _| j|I dH  | js9| j	j
r9| | || j	j
| _zE|2 z?3 dH W }| j	jsGq<| j	j|I dH }|sTq<t|trb| |I dH  q<t|trs| jt|jdI dH  q<| |I dH  q<6 W n" ty } zt |  d|jj d| d W Y d}~nd}~ww | j|I dH  | j I dH  d| _t d|j d	 dS )
z<Handle individual client connections and message processing.zNew client connection from Nz/Only one client connected, using new connection)messagez exception receiving data:  ()zClient z disconnected)r   rX   remote_addressr;   closewarningr:   r(   r=   r9   r   rI   _monitor_websocketr   deserialize
isinstancer   push_audio_framer   broadcast_framer^   
push_frame	ExceptionerrorrE   r   r)   )rB   r]   r^   rF   er%   r%   r&   rZ      s@   


,z-WebsocketServerInputTransport._client_handlerr   c                    sb   zt |I dH  |jtjur| j|I dH  W dS W dS  t jy0   t	d|j
   w )z1Monitor WebSocket connection for session timeout.NzMonitoring task cancelled for: )r>   sleepstater   CLOSEDr:   r*   CancelledErrorr   rX   ra   )rB   r]   r   r%   r%   r&   rd      s   z0WebsocketServerInputTransport._monitor_websocket)r   r   r    r!   r   strr$   r   r'   r5   r   rG   r	   rM   r   rP   rT   rJ   r,   r-   rZ   rd   __classcell__r%   r%   rD   r&   r.   R   s0    '.r.   c                       s   e Zd ZdZdedef fddZdeej	 fddZ
d	ef fd
dZd	ef fddZd	ef fddZ fddZd	edef fddZd	eeB fddZd	edefddZd	efddZdd Z  ZS )WebsocketServerOutputTransportzWebSocket server output transport for sending data to clients.

    Handles outgoing frame serialization, audio streaming with timing control,
    and client connection management for WebSocket communication.
    r/   r2   c                    s<   t  j|fi | || _|| _d| _d| _d| _d| _dS )a  Initialize the WebSocket server output transport.

        Args:
            transport: The parent transport instance.
            params: WebSocket server configuration parameters.
            **kwargs: Additional arguments passed to parent class.
        Nr   F)r4   r5   r6   r9   r;   _send_interval_next_send_timerA   )rB   r/   r2   rC   rD   r%   r&   r5      s   
z'WebsocketServerOutputTransport.__init__r]   c                    s,   | j r| j  I dH  td || _ dS )zSet the active client WebSocket connection.

        Args:
            websocket: The WebSocket connection to set as active, or None to clear.
        Nz-Only one client allowed, using new connection)r;   rb   r   rc   rB   r]   r%   r%   r&   set_client_connection  s
   

z4WebsocketServerOutputTransport.set_client_connectionrF   c                    sf   t  |I dH  | jrdS d| _| jjr | jj|I dH  | j| j d | _| 	|I dH  dS )zStart the output transport and initialize components.

        Args:
            frame: The start frame containing initialization parameters.
        NT   )
r4   rG   rA   r9   r   rH   audio_chunk_sizesample_ratert   rK   rL   rD   r%   r&   rG   #  s   z$WebsocketServerOutputTransport.startc                    (   t  |I dH  | |I dH  dS )zStop the output transport and send final frame.

        Args:
            frame: The end frame signaling transport shutdown.
        N)r4   rM   _write_framerL   rD   r%   r&   rM   5     z#WebsocketServerOutputTransport.stopc                    r{   )zCancel the output transport and send cancellation frame.

        Args:
            frame: The cancel frame signaling immediate cancellation.
        N)r4   rP   r|   rL   rD   r%   r&   rP   >  r}   z%WebsocketServerOutputTransport.cancelc                    rQ   rR   rS   rU   rD   r%   r&   rT   G  rV   z&WebsocketServerOutputTransport.cleanup	directionc                    s>   t  ||I dH  t|tr| |I dH  d| _dS dS )zProcess frames and handle interruption timing.

        Args:
            frame: The frame to process.
            direction: The direction of frame flow in the pipeline.
        Nr   )r4   process_framerf   r   r|   ru   )rB   rF   r~   rD   r%   r&   r   L  s   

z,WebsocketServerOutputTransport.process_framec                    s   |  |I dH  dS )z}Send a transport message frame to the client.

        Args:
            frame: The transport message frame to send.
        N)r|   rL   r%   r%   r&   send_messageY  s   z+WebsocketServerOutputTransport.send_messagereturnc              	      s   | j sdS t|j| j| jjd}| jjrct B}t	
|d}|d ||j ||j ||j W d   n1 sBw   Y  t| |j|jd}|}W d   n1 s^w   Y  | |I dH  |  I dH  dS )zWrite an audio frame to the WebSocket client with timing control.

        Args:
            frame: The output audio frame to write.

        Returns:
            True if the audio frame was written successfully, False otherwise.
        F)audiorz   num_channelswbrx   N)rz   r   T)r;   r   r   rz   r9   audio_out_channelsr   ioBytesIOwaveopensetsampwidthsetnchannelsr   setframeratewriteframesgetvaluer|   _write_audio_sleep)rB   rF   bufferwf	wav_framer%   r%   r&   write_audio_framec  s4   	

z0WebsocketServerOutputTransport.write_audio_framec                    s   | j jsdS z!| j j|I dH }|r#| jr&| j|I dH  W dS W dS W dS  tyK } zt|  d|jj	 d| d W Y d}~dS d}~ww )z3Serialize and send a frame to the WebSocket client.Nz exception sending data: r_   r`   )
r9   r   	serializer;   sendrj   r   rk   rE   r   )rB   rF   payloadrl   r%   r%   r&   r|     s   
.z+WebsocketServerOutputTransport._write_framec                    sZ   t  }td| j| }t|I dH  |dkr#t  | j | _dS |  j| j7  _dS )z>Simulate audio device timing by sleeping between audio chunks.r   N)time	monotonicmaxru   r>   rm   rt   )rB   current_timesleep_durationr%   r%   r&   r     s   z1WebsocketServerOutputTransport._write_audio_sleep)r   r   r    r!   r   r   r5   r   r,   r-   rw   r   rG   r	   rM   r   rP   rT   r
   r   r   r   r   r   r   r"   r   r|   r   rr   r%   r%   rD   r&   rs      s    		

'rs   c                       s   e Zd ZdZ				ddedededee d	ee f
 fd
dZde	fddZ
defddZdd Zdd Zdd Zdd Z  ZS )WebsocketServerTransporta  WebSocket server transport for bidirectional real-time communication.

    Provides a complete WebSocket server implementation with separate input and
    output transports, client connection management, and event handling for
    real-time audio and data streaming applications.

    Event handlers available:

    - on_client_connected(transport, websocket): Client WebSocket connected
    - on_client_disconnected(transport, websocket): Client WebSocket disconnected
    - on_session_timeout(transport, websocket): Session timed out
    - on_websocket_ready(transport): WebSocket server is ready to accept connections

    Example::

        @transport.event_handler("on_client_connected")
        async def on_client_connected(transport, websocket):
            ...
    	localhost="  Nr2   r0   r1   
input_nameoutput_namec                    sz   t  j||d || _|| _|| _t| j| j| j| j	d| _
d| _d| _d| _| d | d | d | d dS )a  Initialize the WebSocket server transport.

        Args:
            params: WebSocket server configuration parameters.
            host: Host address to bind the server to. Defaults to "localhost".
            port: Port number to bind the server to. Defaults to 8765.
            input_name: Optional name for the input processor.
            output_name: Optional name for the output processor.
        )r   r   )r(   r)   r*   r+   Nr(   r)   r*   r+   )r4   r5   r7   r8   r9   r'   _on_client_connected_on_client_disconnected_on_session_timeout_on_websocket_readyr:   _input_outputr;   _register_event_handler)rB   r2   r0   r1   r   r   rD   r%   r&   r5     s"   


z!WebsocketServerTransport.__init__r   c                 C   s,   | j st| | j| j| j| j| jd| _ | j S )zGet the input transport for receiving client data.

        Returns:
            The WebSocket server input transport instance.
        name)r   r.   r7   r8   r9   r:   _input_namerU   r%   r%   r&   input  s
   zWebsocketServerTransport.inputc                 C   s    | j st| | j| jd| _ | j S )zGet the output transport for sending data to clients.

        Returns:
            The WebSocket server output transport instance.
        r   )r   rs   r9   _output_namerU   r%   r%   r&   output  s
   
zWebsocketServerTransport.outputc                    s\   | j r'| j |I dH  | d|I dH  | jr%| jt I dH  dS dS td dS )z Handle client connection events.Nr(   <A WebsocketServerTransport output is missing in the pipeline)r   rw   _call_event_handlerr   ri   r   r   rk   rv   r%   r%   r&   r     s   z-WebsocketServerTransport._on_client_connectedc                    s>   | j r| j dI dH  | d|I dH  dS td dS )z#Handle client disconnection events.Nr)   r   )r   rw   r   r   rk   rv   r%   r%   r&   r      s
   z0WebsocketServerTransport._on_client_disconnectedc                    s   |  d|I dH  dS )z%Handle client session timeout events.r*   Nr   rv   r%   r%   r&   r     s   z,WebsocketServerTransport._on_session_timeoutc                    s   |  dI dH  dS )z%Handle WebSocket server ready events.r+   Nr   rU   r%   r%   r&   r     s   z,WebsocketServerTransport._on_websocket_ready)r   r   NN)r   r   r    r!   r   rq   r$   r   r5   r.   r   rs   r   r   r   r   r   rr   r%   r%   rD   r&   r     s.    '
r   )3r!   r>   r   r   r   typingr   r   r   logurur   pydanticr   pipecat.frames.framesr   r   r	   r
   r   r   r   r   r   r   r   "pipecat.processors.frame_processorr   #pipecat.serializers.base_serializerr   pipecat.transports.base_inputr   pipecat.transports.base_outputr   !pipecat.transports.base_transportr   r   r,   websockets.asyncio.serverr   rY   websockets.protocolr   ModuleNotFoundErrorrl   rk   rj   r   r'   r.   rs   r   r%   r%   r%   r&   <module>   s>   4
 & ,