o
    i*H                     @   sJ  d Z ddlZddlZddlZddlZddlmZmZmZ ddl	Z	ddl
mZ ddlmZ ddlmZ ddlmZmZmZmZmZmZmZmZmZ ddlmZ dd	lmZ dd
lm Z  ddl!m"Z" ddl#m$Z$ ddl%m&Z&m'Z' ddl(m)Z) G dd de'Z*G dd deZ+G dd dZ,G dd de"Z-G dd de$Z.G dd de&Z/dS )a  WebSocket client transport implementation for Pipecat.

This module provides a WebSocket client transport that enables bidirectional
communication over WebSocket connections, with support for audio streaming,
frame serialization, and connection management.
    N)	AwaitableCallableOptional)logger)	BaseModel)connect)	CancelFrameEndFrameFrameInputAudioRawFrameInputTransportMessageFrameOutputAudioRawFrameOutputTransportMessageFrame!OutputTransportMessageUrgentFrame
StartFrame)FrameProcessorSetup)FrameSerializer)ProtobufFrameSerializer)BaseInputTransport)BaseOutputTransport)BaseTransportTransportParams)BaseTaskManagerc                   @   sF   e Zd ZU dZdZeed< dZee	e
e
f  ed< dZee ed< dS )WebsocketClientParamszConfiguration parameters for WebSocket client transport.

    Parameters:
        add_wav_header: Whether to add WAV headers to audio frames.
        serializer: Frame serializer for encoding/decoding messages.
    Tadd_wav_headerNadditional_headers
serializer)__name__
__module____qualname____doc__r   bool__annotations__r   r   dictstrr   r    r%   r%   W/home/ubuntu/.local/lib/python3.10/site-packages/pipecat/transports/websocket/client.pyr   -   s
   
 r   c                   @   s^   e Zd ZU dZeejged f ed< eejged f ed< eejej	ged f ed< dS )WebsocketClientCallbacksa  Callback functions for WebSocket client events.

    Parameters:
        on_connected: Called when WebSocket connection is established.
        on_disconnected: Called when WebSocket connection is closed.
        on_message: Called when a message is received from the WebSocket.
    Non_connectedon_disconnected
on_message)
r   r   r   r    r   
websocketsWebSocketClientProtocolr   r"   Datar%   r%   r%   r&   r'   :   s
   
  r'   c                   @   s   e Zd ZdZdedededefddZede	fd	d
Z
de	fddZdd Zdd ZdejdefddZedefddZedefddZdd Zdd ZdS )WebsocketClientSessionzManages a WebSocket client connection session.

    Handles connection lifecycle, message sending/receiving, and provides
    callback mechanisms for connection events.
    uriparams	callbackstransport_namec                 C   s.   || _ || _|| _|| _d| _d| _d| _dS )a4  Initialize the WebSocket client session.

        Args:
            uri: The WebSocket URI to connect to.
            params: Configuration parameters for the session.
            callbacks: Callback functions for session events.
            transport_name: Name of the parent transport for logging.
        r   N)_uri_params
_callbacks_transport_name_leave_counter_task_manager
_websocket)selfr/   r0   r1   r2   r%   r%   r&   __init__O   s   
zWebsocketClientSession.__init__returnc                 C   s   | j st| j d| j S )zGet the task manager for this session.

        Returns:
            The task manager instance.

        Raises:
            Exception: If task manager is not initialized.
        zM::WebsocketClientSession: TaskManager not initialized (pipeline not started?))r8   	Exceptionr6   r:   r%   r%   r&   task_managerg   s
   

z#WebsocketClientSession.task_managerr?   c                    s$   |  j d7  _ | js|| _dS dS )zSet up the session with a task manager.

        Args:
            task_manager: The task manager to use for session tasks.
           N)r7   r8   )r:   r?   r%   r%   r&   setupw   s
   
zWebsocketClientSession.setupc                    s   | j rdS z)t| jd| jjdI dH | _ | j|  | j d| _	| j
| j I dH  W dS  tyB   td| j  Y dS w )z Connect to the WebSocket server.N
   )r/   open_timeoutr   z.::WebsocketClientSession::_client_task_handlerzTimeout connecting to )r9   websocket_connectr3   r4   r   r?   create_task_client_task_handlerr6   _client_taskr5   r(   TimeoutErrorr   errorr>   r%   r%   r&   r      s"   
zWebsocketClientSession.connectc                    sR   |  j d8  _ | jr| j dkrdS | j| jI dH  | j I dH  d| _dS )z%Disconnect from the WebSocket server.r@   r   N)r7   r9   r?   cancel_taskrG   closer>   r%   r%   r&   
disconnect   s   
z!WebsocketClientSession.disconnectmessagec                    s   d}z;z| j r| j |I dH  d}W W |S W W |S  ty> } zt|  d|jj d| d W Y d}~W |S d}~ww |     Y S )zwSend a message through the WebSocket connection.

        Args:
            message: The message data to send.
        FNTz exception sending data:  ())r9   sendr=   r   rI   	__class__r   )r:   rM   resulter%   r%   r&   rP      s   ,zWebsocketClientSession.sendc                 C      | j r| j jtjjkS dS )zCheck if the WebSocket is currently connected.

        Returns:
            True if the WebSocket is in connected state.
        F)r9   stater+   StateOPENr>   r%   r%   r&   is_connected      z#WebsocketClientSession.is_connectedc                 C   rT   )zCheck if the WebSocket is currently closing.

        Returns:
            True if the WebSocket is in the process of closing.
        F)r9   rU   r+   rV   CLOSINGr>   r%   r%   r&   
is_closing   rY   z!WebsocketClientSession.is_closingc                    s   z| j 2 z3 dH W }| j| j |I dH  q6 W n" ty; } zt|  d|jj d| d W Y d}~nd}~ww | j| j I dH  dS )z7Handle incoming messages from the WebSocket connection.Nz exception receiving data: rN   rO   )	r9   r5   r*   r=   r   rI   rQ   r   r)   )r:   rM   rS   r%   r%   r&   rF      s   ,z+WebsocketClientSession._client_task_handlerc                 C   s   | j  dS )z6String representation of the WebSocket client session.z::WebsocketClientSession)r6   r>   r%   r%   r&   __str__   s   zWebsocketClientSession.__str__N)r   r   r   r    r$   r   r'   r;   propertyr   r?   rA   r   rL   r+   r-   r!   rP   rX   r[   rF   r\   r%   r%   r%   r&   r.   H   s.    

r.   c                       s   e Zd ZdZdededef fddZdef fdd	Z	d
e
f fddZd
ef fddZd
ef fddZ fddZdd Z  ZS )WebsocketClientInputTransportzWebSocket client input transport for receiving frames.

    Handles incoming WebSocket messages, deserializes them to frames,
    and pushes them downstream in the processing pipeline.
    	transportsessionr0   c                    s(   t  | || _|| _|| _d| _dS )a  Initialize the WebSocket client input transport.

        Args:
            transport: The parent transport instance.
            session: The WebSocket session to use for communication.
            params: Configuration parameters for the transport.
        FN)superr;   
_transport_sessionr4   _initializedr:   r_   r`   r0   rQ   r%   r&   r;      s
   
z&WebsocketClientInputTransport.__init__rA   c                    ,   t  |I dH  | j|jI dH  dS )zSet up the input transport with the frame processor setup.

        Args:
            setup: The frame processor setup configuration.
        Nra   rA   rc   r?   r:   rA   rf   r%   r&   rA         z#WebsocketClientInputTransport.setupframec                    sd   t  |I dH  | jrdS d| _| jjr | jj|I dH  | j I dH  | |I dH  dS )zStart the input transport and initialize the WebSocket connection.

        Args:
            frame: The start frame containing initialization parameters.
        NT)	ra   startrd   r4   r   rA   rc   r   set_transport_readyr:   rk   rf   r%   r&   rl      s   z#WebsocketClientInputTransport.startc                    (   t  |I dH  | j I dH  dS )zStop the input transport and disconnect from WebSocket.

        Args:
            frame: The end frame signaling transport shutdown.
        Nra   stoprc   rL   rn   rf   r%   r&   rq   	     z"WebsocketClientInputTransport.stopc                    ro   )zCancel the input transport and disconnect from WebSocket.

        Args:
            frame: The cancel frame signaling immediate cancellation.
        Nra   cancelrc   rL   rn   rf   r%   r&   rt     rr   z$WebsocketClientInputTransport.cancelc                    &   t   I dH  | j I dH  dS )z'Clean up the input transport resources.Nra   cleanuprb   r>   rf   r%   r&   rw        z%WebsocketClientInputTransport.cleanupc                    s   | j jsdS | j j|I dH }|sdS t|tr(| j jr(| |I dH  dS t|tr:| jt|j	dI dH  dS | 
|I dH  dS )zHandle incoming WebSocket messages.

        Args:
            websocket: The WebSocket connection that received the message.
            message: The received message data.
        N)rM   )r4   r   deserialize
isinstancer   audio_in_enabledpush_audio_framer   broadcast_framerM   
push_frame)r:   	websocketrM   rk   r%   r%   r&   r*      s   
z(WebsocketClientInputTransport.on_message)r   r   r   r    r   r.   r   r;   r   rA   r   rl   r	   rq   r   rt   rw   r*   __classcell__r%   r%   rf   r&   r^      s    			r^   c                       s   e Zd ZdZdededef fddZdef fdd	Z	d
e
f fddZd
ef fddZd
ef fddZ fddZd
eeB fddZd
edefddZd
efddZdd Z  ZS )WebsocketClientOutputTransportzWebSocket client output transport for sending frames.

    Handles outgoing frames, serializes them for WebSocket transmission,
    and manages audio streaming with proper timing simulation.
    r_   r`   r0   c                    s4   t  | || _|| _|| _d| _d| _d| _dS )a  Initialize the WebSocket client output transport.

        Args:
            transport: The parent transport instance.
            session: The WebSocket session to use for communication.
            params: Configuration parameters for the transport.
        r   FN)ra   r;   rb   rc   r4   _send_interval_next_send_timerd   re   rf   r%   r&   r;   ;  s   
z'WebsocketClientOutputTransport.__init__rA   c                    rg   )zSet up the output transport with the frame processor setup.

        Args:
            setup: The frame processor setup configuration.
        Nrh   ri   rf   r%   r&   rA   Y  rj   z$WebsocketClientOutputTransport.setuprk   c                    sv   t  |I dH  | jrdS d| _| j| j d | _| jjr)| jj|I dH  | j	
 I dH  | |I dH  dS )zStart the output transport and initialize the WebSocket connection.

        Args:
            frame: The start frame containing initialization parameters.
        NT   )ra   rl   rd   audio_chunk_sizesample_rater   r4   r   rA   rc   r   rm   rn   rf   r%   r&   rl   b  s   z$WebsocketClientOutputTransport.startc                    ro   )zStop the output transport and disconnect from WebSocket.

        Args:
            frame: The end frame signaling transport shutdown.
        Nrp   rn   rf   r%   r&   rq   u  rr   z#WebsocketClientOutputTransport.stopc                    ro   )zCancel the output transport and disconnect from WebSocket.

        Args:
            frame: The cancel frame signaling immediate cancellation.
        Nrs   rn   rf   r%   r&   rt   ~  rr   z%WebsocketClientOutputTransport.cancelc                    ru   )z(Clean up the output transport resources.Nrv   r>   rf   r%   r&   rw     rx   z&WebsocketClientOutputTransport.cleanupc                    s   |  |I dH  dS )zSend a transport message through the WebSocket.

        Args:
            frame: The transport message frame to send.
        N)_write_framern   r%   r%   r&   send_message  s   z+WebsocketClientOutputTransport.send_messager<   c              	      s   | j js	| j jsdS t|j| j| jjd}| jjrht	
 B}t|d}|d ||j ||j ||j W d   n1 sGw   Y  t| |j|jd}|}W d   n1 scw   Y  | |I dH  |  I dH  dS )zWrite an audio frame to the WebSocket with optional WAV header.

        Args:
            frame: The output audio frame to write.

        Returns:
            True if the audio frame was written successfully, False otherwise.
        F)audior   num_channelswbr   N)r   r   T)rc   r[   rX   r   r   r   r4   audio_out_channelsr   ioBytesIOwaveopensetsampwidthsetnchannelsr   setframeratewriteframesgetvaluer   _write_audio_sleep)r:   rk   bufferwf	wav_framer%   r%   r&   write_audio_frame  s4   	

z0WebsocketClientOutputTransport.write_audio_framec                    sT   | j js	| j jsdS | jjsdS | jj|I dH }|r(| j |I dH  dS dS )z3Write a frame to the WebSocket after serialization.N)rc   r[   rX   r4   r   	serializerP   )r:   rk   payloadr%   r%   r&   r     s   z+WebsocketClientOutputTransport._write_framec                    sZ   t  }td| j| }t|I dH  |dkr#t  | j | _dS |  j| j7  _dS )z1Simulate audio playback timing with sleep delays.r   N)time	monotonicmaxr   asynciosleepr   )r:   current_timesleep_durationr%   r%   r&   r     s   z1WebsocketClientOutputTransport._write_audio_sleep)r   r   r   r    r   r.   r   r;   r   rA   r   rl   r	   rq   r   rt   rw   r   r   r   r   r!   r   r
   r   r   r   r%   r%   rf   r&   r   4  s(    			

'r   c                       sf   e Zd ZdZ	ddedee f fddZdefdd	Z	de
fd
dZdd Zdd Zdd Z  ZS )WebsocketClientTransporta  WebSocket client transport for bidirectional communication.

    Provides a complete WebSocket client transport implementation with
    input and output capabilities, connection management, and event handling.

    Event handlers available:

    - on_connected(transport): Connected to WebSocket server
    - on_disconnected(transport): Disconnected from WebSocket server

    Example::

        @transport.event_handler("on_connected")
        async def on_connected(transport):
            ...
    Nr/   r0   c                    st   t    |p	t | _| jjpt | j_t| j| j| j	d}t
|| j|| j| _d| _d| _| d | d dS )zInitialize the WebSocket client transport.

        Args:
            uri: The WebSocket URI to connect to.
            params: Optional configuration parameters for the transport.
        )r(   r)   r*   Nr(   r)   )ra   r;   r   r4   r   r   r'   _on_connected_on_disconnected_on_messager.   namerc   _input_output_register_event_handler)r:   r/   r0   r1   rf   r%   r&   r;     s   

z!WebsocketClientTransport.__init__r<   c                 C      | j st| | j| j| _ | j S )zGet the input transport for receiving frames.

        Returns:
            The WebSocket client input transport instance.
        )r   r^   rc   r4   r>   r%   r%   r&   input     zWebsocketClientTransport.inputc                 C   r   )zGet the output transport for sending frames.

        Returns:
            The WebSocket client output transport instance.
        )r   r   rc   r4   r>   r%   r%   r&   output  r   zWebsocketClientTransport.outputc                       |  d|I dH  dS )z.Handle WebSocket connection established event.r(   N_call_event_handlerr:   r   r%   r%   r&   r        z&WebsocketClientTransport._on_connectedc                    r   )z)Handle WebSocket connection closed event.r)   Nr   r   r%   r%   r&   r     r   z)WebsocketClientTransport._on_disconnectedc                    s$   | j r| j ||I dH  dS dS )z"Handle incoming WebSocket message.N)r   r*   )r:   r   rM   r%   r%   r&   r   "  s   z$WebsocketClientTransport._on_message)N)r   r   r   r    r$   r   r   r;   r^   r   r   r   r   r   r   r   r%   r%   rf   r&   r     s    

r   )0r    r   r   r   r   typingr   r   r   r+   logurur   pydantic.mainr   websockets.asyncio.clientr   rD   pipecat.frames.framesr   r	   r
   r   r   r   r   r   r   "pipecat.processors.frame_processorr   #pipecat.serializers.base_serializerr   pipecat.serializers.protobufr   pipecat.transports.base_inputr   pipecat.transports.base_outputr   !pipecat.transports.base_transportr   r   "pipecat.utils.asyncio.task_managerr   r   r'   r.   r^   r   r   r%   r%   r%   r&   <module>   s4   , 
c "