o
    i                     @   s2  d Z ddlZddlZddlmZ ddlmZmZmZm	Z	m
Z
 ddlmZ ddlmZ ddlmZ ddlmZ dd	lmZmZmZmZmZmZmZmZmZmZmZmZm Z m!Z! dd
l"m#Z#m$Z$ ddl%m&Z& ddl'm(Z( ddl)m*Z*m+Z+ ddl,m-Z- zddl.m/Z/ ddl0m1Z2 ddl3m4Z4m5Z5m6Z6 W n  e7y Z8 ze9de8  e9d e:de8 dZ8[8ww ddddddddddddd Z;eG d!d" d"eZ<eG d#d$ d$eZ=eG d%d& d&e<Z>eG d'd( d(e=Z?G d)d* d*e+Z@G d+d, d,eZAG d-d. d.ZBG d/d0 d0e&ZCG d1d2 d2e(ZDG d3d4 d4e*ZEdS )5zLiveKit transport implementation for Pipecat.

This module provides comprehensive LiveKit real-time communication integration
including audio streaming, data messaging, participant management, and room
event handling for conversational AI applications.
    N)	dataclass)Any	AwaitableCallableListOptional)logger)	BaseModel)create_stream_resampler)VADAnalyzer)AudioRawFrameBotConnectedFrameCancelFrameClientConnectedFrameEndFrameImageRawFrameOutputAudioRawFrameOutputDTMFFrameOutputDTMFUrgentFrameOutputTransportMessageFrame!OutputTransportMessageUrgentFrame
StartFrameUserAudioRawFrameUserImageRawFrame)FrameDirectionFrameProcessorSetup)BaseInputTransport)BaseOutputTransport)BaseTransportTransportParams)BaseTaskManager)rtc)video_frame_pb2)retrystop_after_attemptwait_exponentialzException: zGIn order to use LiveKit, you need to `pip install pipecat-ai[livekit]`.zMissing module:                         	   
      )0123456789*#c                   @   "   e Zd ZU dZdZee ed< dS )"LiveKitOutputTransportMessageFrameFrame for transport messages in LiveKit rooms.

    Parameters:
        participant_id: Optional ID of the participant this message is for/from.
    Nparticipant_id__name__
__module____qualname____doc__r@   r   str__annotations__ rH   rH   X/home/ubuntu/.local/lib/python3.10/site-packages/pipecat/transports/livekit/transport.pyr>   H      
 r>   c                   @   r=   )(LiveKitOutputTransportMessageUrgentFrameFrame for urgent transport messages in LiveKit rooms.

    Parameters:
        participant_id: Optional ID of the participant this message is for/from.
    Nr@   rA   rH   rH   rH   rI   rK   S   rJ   rK   c                           e Zd ZdZ fddZ  ZS )LiveKitTransportMessageFramer?   c                    Z   t    dd l}|  |d |jdtdd W d    d S 1 s&w   Y  d S )Nr   alwayszLiveKitTransportMessageFrame is deprecated and will be removed in a future version. Instead, use LiveKitOutputTransportMessageFrame.r'   
stacklevelsuper__post_init__warningscatch_warningssimplefilterwarnDeprecationWarningselfrV   	__class__rH   rI   rU   f      


"z*LiveKitTransportMessageFrame.__post_init__rB   rC   rD   rE   rU   __classcell__rH   rH   r]   rI   rN   ^       rN   c                       rM   )"LiveKitTransportMessageUrgentFramerL   c                    rO   )Nr   rP   zLiveKitTransportMessageUrgentFrame is deprecated and will be removed in a future version. Instead, use LiveKitOutputTransportMessageUrgentFrame.r'   rQ   rS   r[   r]   rH   rI   rU   |   r_   z0LiveKitTransportMessageUrgentFrame.__post_init__r`   rH   rH   r]   rI   rc   t   rb   rc   c                   @   s   e Zd ZdZdS )LiveKitParamszConfiguration parameters for LiveKit transport.

    Inherits all parameters from TransportParams without additional configuration.
    N)rB   rC   rD   rE   rH   rH   rH   rI   rd      s    rd   c                   @   s   e Zd ZU dZeg ed f ed< eg ed f ed< eg ed f ed< eeged f ed< eeged f ed< eeged f ed< eeged f ed	< eeged f ed
< eeged f ed< eeeged f ed< eeged f ed< dS )LiveKitCallbacksa  Callback handlers for LiveKit events.

    Parameters:
        on_connected: Called when connected to the LiveKit room.
        on_disconnected: Called when disconnected from the LiveKit room.
        on_participant_connected: Called when a participant joins the room.
        on_participant_disconnected: Called when a participant leaves the room.
        on_audio_track_subscribed: Called when an audio track is subscribed.
        on_audio_track_unsubscribed: Called when an audio track is unsubscribed.
        on_data_received: Called when data is received from a participant.
        on_first_participant_joined: Called when the first participant joins.
    Non_connectedon_disconnectedon_before_disconnecton_participant_connectedon_participant_disconnectedon_audio_track_subscribedon_audio_track_unsubscribedon_video_track_subscribedon_video_track_unsubscribedon_data_receivedon_first_participant_joined)	rB   rC   rD   rE   r   r   rG   rF   bytesrH   rH   rH   rI   re      s   
 re   c                   @   s`  e Zd ZdZdedededededefdd	Zed
efddZ	ed
e
jfddZdefddZdd ZdefddZeededdddddd Zdd  Zdbd"ed#ee fd$d%Zd&efd'd(Zd)e
jd
efd*d+Zd
ee fd,d-Zd#ed
e fd.d/Z!d0efd1d2Z"d#efd3d4Z#d#efd5d6Z$d7e
j%fd8d9Z&d7e
j%fd:d;Z'd<e
j(d=e
j)d7e
j%fd>d?Z*d<e
j(d=e
j)d7e
j%fd@dAZ+d"e
j,fdBdCZ-dDdE Z.dFdG Z/d7e
j%fdHdIZ0d7e
j%fdJdKZ1d<e
j(d=e
j)d7e
j%fdLdMZ2d<e
j(d=e
j)d7e
j%fdNdOZ3d"e
j,fdPdQZ4dRdS Z5dbdTdUZ6dVe
j7d#efdWdXZ8dYdZ Z9d[e
j:d#efd\d]Z;d^d_ Z<d`da Z=d!S )cLiveKitTransportClientzCore client for interacting with LiveKit rooms.

    Manages the connection to LiveKit rooms and handles all low-level API interactions
    including room management, audio streaming, data messaging, and event handling.
    urltoken	room_nameparams	callbackstransport_namec                 C   s   || _ || _|| _|| _|| _|| _d| _d| _d| _d| _	d| _
d| _i | _t | _i | _t | _d| _d| _t | _dS )a  Initialize the LiveKit transport client.

        Args:
            url: LiveKit server URL to connect to.
            token: Authentication token for the room.
            room_name: Name of the LiveKit room to join.
            params: Configuration parameters for the transport.
            callbacks: Event callback handlers.
            transport_name: Name identifier for the transport.
        N Fr   )_url_token
_room_name_params
_callbacks_transport_name_room_participant_id
_connected_disconnect_counter_audio_source_audio_track_audio_tracksasyncioQueue_audio_queue_video_tracks_video_queue_other_participant_has_joined_task_managerLock_async_lock)r\   rs   rt   ru   rv   rw   rx   rH   rH   rI   __init__   s&   

zLiveKitTransportClient.__init__returnc                 C      | j S )zvGet the participant ID for this client.

        Returns:
            The participant ID assigned by LiveKit.
        )r   r\   rH   rH   rI   r@         z%LiveKitTransportClient.participant_idc                 C   s   | j s
t|  d| j S )zGet the LiveKit room instance.

        Returns:
            The LiveKit room object.

        Raises:
            Exception: If room object is not available.
        z-: missing room object (pipeline not started?))r   	Exceptionr   rH   rH   rI   room   s   
zLiveKitTransportClient.roomsetupc                    s   | j rdS |j| _ tj| j  d| _| jd| j | jd| j	 | jd| j
 | jd| j | jd| j | jd| j | jd	| j dS )
zSetup the client with task manager and room initialization.

        Args:
            setup: The frame processor setup configuration.
        N)loopparticipant_connectedparticipant_disconnectedtrack_subscribedtrack_unsubscribeddata_received	connecteddisconnected)r   task_managerr!   Roomget_event_loopr   r   on!_on_participant_connected_wrapper$_on_participant_disconnected_wrapper_on_track_subscribed_wrapper_on_track_unsubscribed_wrapper_on_data_received_wrapper_on_connected_wrapper_on_disconnected_wrapperr\   r   rH   rH   rI   r      s   zLiveKitTransportClient.setupc                    s   |   I dH  dS )zCleanup client resources.N)
disconnectr   rH   rH   rI   cleanup  s   zLiveKitTransportClient.cleanupframec                    s   | j jp|j| _dS )zStart the client and initialize audio components.

        Args:
            frame: The start frame containing initialization parameters.
        N)r}   audio_out_sample_rate_out_sample_rater\   r   rH   rH   rI   start  s   zLiveKitTransportClient.startr(   r&   r)   r/   )
multiplierminmax)stopwaitc                    s  | j 4 I dH  | jr|  jd7  _	 W d  I dH  dS td| j  zt| jj| j| j	t
jdddI dH  d| _|  jd7  _| jjj| _td| j  t
| j| jj| _t
jd| j| _t
 }t
jj|_| jj| j|I dH  | j I dH  |  }|r| jsd| _| j |d	 I dH  W n t!y } zt"d
| j d|   d}~ww W d  I dH  dS 1 I dH sw   Y  dS )z-Connect to the LiveKit room with retry logic.Nr&   zConnecting to T)auto_subscribe)optionszConnected to zpipecat-audior   zError connecting to : )#r   r   r   r   infor|   r   connectrz   r{   r!   RoomOptionslocal_participantsidr   AudioSourcer   r}   audio_out_channelsr   LocalAudioTrackcreate_audio_trackr   TrackPublishOptionsTrackSourceSOURCE_MICROPHONEsourcepublish_trackr~   rf   get_participantsr   rp   r   error)r\   r   participantserH   rH   rI   r     sN   



.zLiveKitTransportClient.connectc              	      s   | j 4 I dH T |  jd8  _| jr| jdkr$	 W d  I dH  dS td| j  | j I dH  | j	 I dH  d| _td| j  | j
 I dH  W d  I dH  dS 1 I dH sbw   Y  dS )z!Disconnect from the LiveKit room.Nr&   r   zDisconnecting from FDisconnected from )r   r   r   r   r   r|   r~   rh   r   r   rg   r   rH   rH   rI   r   B  s   .z!LiveKitTransportClient.disconnectNdatar@   c              
      s   | j sdS z"|r| jjj|d|gdI dH  W dS | jjj|ddI dH  W dS  tyC } ztd|  W Y d}~dS d}~ww )zSend data to participants in the room.

        Args:
            data: The data bytes to send.
            participant_id: Optional specific participant to send to.
        NT)reliabledestination_identities)r   zError sending data: )r   r   r   publish_datar   r   r   )r\   r   r@   r   rH   rH   rI   	send_dataR  s   z LiveKitTransportClient.send_datadigitc              
      s   | j sdS |tvrtd|  dS t| }z| jjj||dI dH  W dS  tyE } ztd| d|  W Y d}~dS d}~ww )zkSend DTMF tone to the room.

        Args:
            digit: The DTMF digit to send (0-9, \*, #).
        NzInvalid DTMF digit: )coder   zError sending DTMF tone r   )	r   DTMF_CODE_MAPr   warningr   r   publish_dtmfr   r   )r\   r   r   r   rH   rH   rI   	send_dtmff  s   $z LiveKitTransportClient.send_dtmfaudio_framec              
      sb   | j r| js	dS z| j|I dH  W dS  ty0 } ztd|  W Y d}~dS d}~ww )zxPublish an audio frame to the room.

        Args:
            audio_frame: The LiveKit audio frame to publish.
        FNTzError publishing audio: )r   r   capture_framer   r   r   )r\   r   r   rH   rH   rI   publish_audioz  s   z$LiveKitTransportClient.publish_audioc                 C   s   dd | j j D S )hGet list of participant IDs in the room.

        Returns:
            List of participant IDs.
        c                 S   s   g | ]}|j qS rH   )r   ).0prH   rH   rI   
<listcomp>  s    z;LiveKitTransportClient.get_participants.<locals>.<listcomp>)r   remote_participantsvaluesr   rH   rH   rI   r     s   z'LiveKitTransportClient.get_participantsc                    s.   | j j|}|r|j|j|j|jdS i S )Get metadata for a specific participant.

        Args:
            participant_id: ID of the participant to get metadata for.

        Returns:
            Dictionary containing participant metadata.
        )idnamemetadatais_speaking)r   r   getr   r   r   r   )r\   r@   participantrH   rH   rI   get_participant_metadata  s   	z/LiveKitTransportClient.get_participant_metadatar   c                    s   | j j|I dH  dS zmSet metadata for the local participant.

        Args:
            metadata: Metadata string to set.
        N)r   r   set_metadatar\   r   rH   rH   rI   set_participant_metadata  s   z/LiveKitTransportClient.set_participant_metadatac                    F   | j j|}|r|j D ]}|jdkr|dI dH  qdS dS )~Mute a specific participant's audio tracks.

        Args:
            participant_id: ID of the participant to mute.
        audioFNr   r   r   tracksr   kindset_enabledr\   r@   r   trackrH   rH   rI   mute_participant     
z'LiveKitTransportClient.mute_participantc                    r   )Unmute a specific participant's audio tracks.

        Args:
            participant_id: ID of the participant to unmute.
        r   TNr   r   rH   rH   rI   unmute_participant  r   z)LiveKitTransportClient.unmute_participantr   c                 C      | j | ||  d dS )z)Wrapper for participant connected events.z!::_async_on_participant_connectedN)r   create_task_async_on_participant_connectedr\   r   rH   rH   rI   r        z8LiveKitTransportClient._on_participant_connected_wrapperc                 C   r   )z,Wrapper for participant disconnected events.z$::_async_on_participant_disconnectedN)r   r   "_async_on_participant_disconnectedr   rH   rH   rI   r     r   z;LiveKitTransportClient._on_participant_disconnected_wrapperr   publicationc                 C   "   | j | ||||  d dS )z$Wrapper for track subscribed events.z::_async_on_track_subscribedN)r   r   _async_on_track_subscribedr\   r   r  r   rH   rH   rI   r        z3LiveKitTransportClient._on_track_subscribed_wrapperc                 C   r  )z&Wrapper for track unsubscribed events.z::_async_on_track_unsubscribedN)r   r   _async_on_track_unsubscribedr  rH   rH   rI   r     r  z5LiveKitTransportClient._on_track_unsubscribed_wrapperc                 C   r   )z!Wrapper for data received events.z::_async_on_data_receivedN)r   r   _async_on_data_receivedr\   r   rH   rH   rI   r     r   z0LiveKitTransportClient._on_data_received_wrapperc                 C      | j |  |  d dS )zWrapper for connected events.z::_async_on_connectedN)r   r   _async_on_connectedr   rH   rH   rI   r     s   z,LiveKitTransportClient._on_connected_wrapperc                 C   r	  )z Wrapper for disconnected events.z::_async_on_disconnectedN)r   r   _async_on_disconnectedr   rH   rH   rI   r     s   z/LiveKitTransportClient._on_disconnected_wrapperc                    sP   t d|j  | j|jI dH  | js&d| _| j|jI dH  dS dS )$Handle participant connected events.zParticipant connected: NT)r   r   identityr~   ri   r   r   rp   r   rH   rH   rI   r     s   z6LiveKitTransportClient._async_on_participant_connectedc                    sF   t d|j  | j|jI dH  t|  dkr!d| _dS dS )'Handle participant disconnected events.zParticipant disconnected: Nr   F)	r   r   r  r~   rj   r   lenr   r   r   rH   rH   rI   r     s   
z9LiveKitTransportClient._async_on_participant_disconnectedc                    s   |j tjjkr;td|j d|j  || j|j< t|}| j	
| ||j|  d | j|jI dH  dS |j tjjkrytd|j d|j  || j|j< | jjrmt|}| j	
| ||j|  d | j|jI dH  dS dS )zHandle track subscribed events.zAudio track subscribed: z from participant z::_process_audio_streamNzVideo track subscribed: z::_process_video_stream)r   r!   	TrackKind
KIND_AUDIOr   r   r   r   AudioStreamr   r   _process_audio_streamr~   rk   
KIND_VIDEOr   r}   video_in_enabledVideoStream_process_video_streamrm   )r\   r   r  r   audio_streamvideo_streamrH   rH   rI   r    s*   

z1LiveKitTransportClient._async_on_track_subscribedc                    sl   t d|j d|j  |jtjjkr!| j	|jI dH  dS |jtjj
kr4| j|jI dH  dS dS )z!Handle track unsubscribed events.zTrack unsubscribed: z from N)r   r   r   r  r   r!   r  r  r~   rl   r  rn   r  rH   rH   rI   r  *  s   z3LiveKitTransportClient._async_on_track_unsubscribedc                    s    | j |j|jjI dH  dS )Handle data received events.N)r~   ro   r   r   r   r  rH   rH   rI   r  7  s   z.LiveKitTransportClient._async_on_data_receivedc                    s   | j  I dH  dS )zHandle connected events.N)r~   rf   r   rH   rH   rI   r
  ;     z*LiveKitTransportClient._async_on_connectedc                    s4   d| _ td| j d|  | j I dH  dS )zHandle disconnected events.Fr   z
. Reason: N)r   r   r   r|   r~   rg   )r\   reasonrH   rH   rI   r  ?  s   z-LiveKitTransportClient._async_on_disconnectedr  c                    b   t d|  |2 z"3 dH W }t|tjr#| j||fI dH  qt dt|  q6 dS )z1Process incoming audio stream from a participant.z0Started processing audio stream for participant N Received unexpected event type: )	r   r   
isinstancer!   AudioFrameEventr   putr   type)r\   r  r@   eventrH   rH   rI   r  E     z,LiveKitTransportClient._process_audio_streamc                 C  $   	 | j  I dH \}}||fV  q)z(Get the next audio frame from the queue.TN)r   r   r\   r   r@   rH   rH   rI   get_next_audio_frameN  
   
z+LiveKitTransportClient.get_next_audio_framer  c                    r  )z1Process incoming video stream from a participant.z0Started processing video stream for participant Nr  )	r   r   r  r!   VideoFrameEventr   r!  r   r"  )r\   r  r@   r#  rH   rH   rI   r  T  r$  z,LiveKitTransportClient._process_video_streamc                 C  r%  )z(Get the next video frame from the queue.TN)r   r   r&  rH   rH   rI   get_next_video_frame]  r(  z+LiveKitTransportClient.get_next_video_framec                 C   s   | j  dS )z6String representation of the LiveKit transport client.z::LiveKitTransportClient)r   r   rH   rH   rI   __str__c  s   zLiveKitTransportClient.__str__N)>rB   rC   rD   rE   rF   rd   re   r   propertyr@   r!   r   r   r   r   r   r   r   r#   r$   r%   r   r   rq   r   r   r   
AudioFrameboolr   r   r   dictr   r   r   r   RemoteParticipantr   r   TrackRemoteTrackPublicationr   r   
DataPacketr   r   r   r   r   r  r  r  r
  r  r  r  r'  r  r  r*  r+  rH   rH   rH   rI   rr      s    
'
-




		rr   c                       s   e Zd ZdZdededef fddZede	e
 fdd	Zd
ef fddZd
ef fddZd
ef fddZdef fddZ fddZdedefddZdd Zdd Zdejdefdd Zd!ejdefd"d#Z   Z!S )$LiveKitInputTransportzHandles incoming media streams and events from LiveKit rooms.

    Processes incoming audio streams from room participants and forwards them
    as Pipecat frames, including audio resampling and VAD integration.
    	transportclientrv   c                    sF   t  j|fi | || _|| _d| _d| _|j| _t | _	d| _
dS )a  Initialize the LiveKit input transport.

        Args:
            transport: The parent transport instance.
            client: LiveKitTransportClient instance.
            params: Configuration parameters.
            **kwargs: Additional arguments passed to parent class.
        NF)rT   r   
_transport_client_audio_in_task_video_in_taskvad_analyzer_vad_analyzerr
   
_resampler_initializedr\   r6  r7  rv   kwargsr]   rH   rI   r   o  s   
zLiveKitInputTransport.__init__r   c                 C   r   )zzGet the Voice Activity Detection analyzer.

        Returns:
            The VAD analyzer instance if configured.
        )r=  r   rH   rH   rI   r<    r   z"LiveKitInputTransport.vad_analyzerr   c                    s   t  |I dH  | jrdS d| _| j|I dH  | j I dH  | js2| jjr2| | 	 | _| j
sA| jjrA| |  | _
| |I dH  td dS )zStart the input transport and connect to LiveKit room.

        Args:
            frame: The start frame containing initialization parameters.
        NTzLiveKitInputTransport started)rT   r   r?  r9  r   r:  r}   audio_in_enabledr   _audio_in_task_handlerr;  r  _video_in_task_handlerset_transport_readyr   r   r   r]   rH   rI   r     s   zLiveKitInputTransport.startc                    sb   t  |I dH  | j I dH  | jr| | jI dH  | jr*| | jI dH  td dS )zStop the input transport and disconnect from LiveKit room.

        Args:
            frame: The end frame signaling transport shutdown.
        NzLiveKitInputTransport stopped)	rT   r   r9  r   r:  cancel_taskr;  r   r   r   r]   rH   rI   r     s   zLiveKitInputTransport.stopc                    sp   t  |I dH  | j I dH  | jr"| jjr"| | jI dH  | jr4| jj	r6| | jI dH  dS dS dS )zCancel the input transport and disconnect from LiveKit room.

        Args:
            frame: The cancel frame signaling immediate cancellation.
        N)
rT   cancelr9  r   r:  r}   rB  rF  r;  r  r   r]   rH   rI   rG    s   zLiveKitInputTransport.cancelr   c                    *   t  |I dH  | j|I dH  dS )zSetup the input transport with shared client setup.

        Args:
            setup: The frame processor setup configuration.
        NrT   r   r9  r   r]   rH   rI   r        zLiveKitInputTransport.setupc                    &   t   I dH  | j I dH  dS )z-Cleanup input transport and shared resources.NrT   r   r8  r   r]   rH   rI   r        zLiveKitInputTransport.cleanupmessagesenderc                    s"   t ||d}| |I dH  dS )zPush an application message as an urgent transport frame.

        Args:
            message: The message data to send.
            sender: ID of the message sender.
        rN  r@   N)rK   
push_frame)r\   rN  rO  r   rH   rH   rI   push_app_message  s   z&LiveKitInputTransport.push_app_messagec                    s   t d | j }|2 z/3 dH W }|r<|\}}| |I dH }t|jdkr)qt||j|j|j	d}| 
|I dH  q6 dS )z/Handle incoming audio frames from participants.zAudio input task startedNr   )user_idr   sample_ratenum_channels)r   r   r9  r'  !_convert_livekit_audio_to_pipecatr  r   r   rT  rU  push_audio_frame)r\   audio_iterator
audio_dataaudio_frame_eventr@   pipecat_audio_frameinput_audio_framerH   rH   rI   rC    s(   


z,LiveKitInputTransport._audio_in_task_handlerc                    s   t d | j }|2 z03 dH W }|r=|\}}| j|dI dH }t|jdkr*qt||j|j|j	d}| 
|I dH  q6 dS )z/Handle incoming video frames from participants.zVideo input task startedN)video_frame_eventr   )rS  imagesizeformat)r   r   r9  r*  !_convert_livekit_video_to_pipecatr  r^  r   r_  r`  push_video_frame)r\   video_iterator
video_datar]  r@   pipecat_video_frameinput_video_framerH   rH   rI   rD    s(   

z,LiveKitInputTransport._video_in_task_handlerrZ  c                    s:   |j }| j|j |j| jI dH }t|| j|jdS )z3Convert LiveKit audio frame to Pipecat audio frame.N)r   rT  rU  )r   r>  resampler   tobytesrT  r   rU  )r\   rZ  r   rY  rH   rH   rI   rV  
  s   
z7LiveKitInputTransport._convert_livekit_audio_to_pipecatr]  c                    s.   |j tjj}t|j|j|jfdd}|S )z3Convert LiveKit video frame to Pipecat video frame.RGB)r^  r_  r`  )	r   convertproto_video_frameVideoBufferTypeRGB24r   r   widthheight)r\   r]  	rgb_frameimage_framerH   rH   rI   ra    s   
z7LiveKitInputTransport._convert_livekit_video_to_pipecat)"rB   rC   rD   rE   r   rr   rd   r   r-  r   r   r<  r   r   r   r   r   rG  r   r   r   r   rF   rR  rC  rD  r!   r   r   rV  r)  r   ra  ra   rH   rH   r]   rI   r5  h  s:    	

r5  c                       s   e Zd ZdZdededef fddZdef fdd	Z	de
f fd
dZdef fddZdef fddZ fddZdeeB fddZdedefddZdefddZdeeB fddZdedejfddZ  ZS )LiveKitOutputTransportzHandles outgoing media streams and events to LiveKit rooms.

    Manages sending audio frames and data messages to LiveKit room participants,
    including audio format conversion for LiveKit compatibility.
    r6  r7  rv   c                    s*   t  j|fi | || _|| _d| _dS )a  Initialize the LiveKit output transport.

        Args:
            transport: The parent transport instance.
            client: LiveKitTransportClient instance.
            params: Configuration parameters.
            **kwargs: Additional arguments passed to parent class.
        FN)rT   r   r8  r9  r?  r@  r]   rH   rI   r   /  s   
zLiveKitOutputTransport.__init__r   c                    sd   t  |I dH  | jrdS d| _| j|I dH  | j I dH  | |I dH  td dS )zStart the output transport and connect to LiveKit room.

        Args:
            frame: The start frame containing initialization parameters.
        NTzLiveKitOutputTransport started)rT   r   r?  r9  r   rE  r   r   r   r]   rH   rI   r   E  s   zLiveKitOutputTransport.startc                    s2   t  |I dH  | j I dH  td dS )zStop the output transport and disconnect from LiveKit room.

        Args:
            frame: The end frame signaling transport shutdown.
        NzLiveKitOutputTransport stopped)rT   r   r9  r   r   r   r   r]   rH   rI   r   W  s   zLiveKitOutputTransport.stopc                    s(   t  |I dH  | j I dH  dS )zCancel the output transport and disconnect from LiveKit room.

        Args:
            frame: The cancel frame signaling immediate cancellation.
        N)rT   rG  r9  r   r   r]   rH   rI   rG  a  s   zLiveKitOutputTransport.cancelr   c                    rH  )zSetup the output transport with shared client setup.

        Args:
            setup: The frame processor setup configuration.
        NrI  r   r]   rH   rI   r   j  rJ  zLiveKitOutputTransport.setupc                    rK  )z.Cleanup output transport and shared resources.NrL  r   r]   rH   rI   r   s  rM  zLiveKitOutputTransport.cleanupc                    sf   |j }t|trtj|dd}t|ttfr&| j|	 |j
I dH  dS | j|	 I dH  dS )zySend a transport message to participants.

        Args:
            frame: The transport message frame to send.
        F)ensure_asciiN)rN  r  r0  jsondumpsr>   rK   r9  r   encoder@   )r\   r   rN  rH   rH   rI   send_messagex  s   
z#LiveKitOutputTransport.send_messager   c                    s    |  |j}| j|I dH S )zWrite an audio frame to the LiveKit room.

        Args:
            frame: The audio frame to write.

        Returns:
            True if the audio frame was written successfully, False otherwise.
        N)!_convert_pipecat_audio_to_livekitr   r9  r   )r\   r   livekit_audiorH   rH   rI   write_audio_frame  s   	z(LiveKitOutputTransport.write_audio_framec                 C   s   dS )zLiveKit supports native DTMF via telephone events.

        Returns:
            True, as LiveKit supports native DTMF transmission.
        TrH   r   rH   rH   rI   _supports_native_dtmf  s   z,LiveKitOutputTransport._supports_native_dtmfc                    s   | j |jjI dH  dS )zUse LiveKit's native publish_dtmf method for telephone events.

        Args:
            frame: The DTMF frame to write.
        N)r9  r   buttonvaluer   rH   rH   rI   _write_dtmf_native  s   z)LiveKitOutputTransport._write_dtmf_nativepipecat_audioc                 C   s4   d}t || }|| jj }tj|| j| jj|dS )z2Convert Pipecat audio data to LiveKit audio frame.r'   )r   rT  rU  samples_per_channel)r  r}   r   r!   r.  rT  )r\   r  bytes_per_sampletotal_samplesr  rH   rH   rI   rx    s   z8LiveKitOutputTransport._convert_pipecat_audio_to_livekit) rB   rC   rD   rE   r   rr   rd   r   r   r   r   r   r   rG  r   r   r   r   r   rw  r   r/  rz  r{  r   r   r~  rq   r!   r.  rx  ra   rH   rH   r]   rI   rr  (  s*    
		
rr  c                       s  e Zd ZdZ			dFdedededee dee dee f fd	d
ZdefddZ	de
fddZedefddZdefddZdee fddZdedefddZdefddZdefddZdefdd Zd!d" Zd#d$ Zd%d& Zdefd'd(Zdefd)d*Zdefd+d,Zdefd-d.Zdefd/d0Zdefd1d2Zd3e defd4d5Z!dGd6edee fd7d8Z"dGd6edee fd9d:Z#d;d< Z$d=d> Z%d?d@ Z&dAefdBdCZ'defdDdEZ(  Z)S )HLiveKitTransporta  Transport implementation for LiveKit real-time communication.

    Provides comprehensive LiveKit integration including audio streaming, data
    messaging, participant management, and room event handling for conversational
    AI applications.

    Event handlers available:

    - on_connected: Called when the bot connects to the room.
    - on_disconnected: Called when the bot disconnects from the room.
    - on_before_disconnect: [sync] Called just before the bot disconnects.
    - on_call_state_updated: Called when the call state changes. Args: (state: str)
    - on_first_participant_joined: Called when the first participant joins.
      Args: (participant_id: str)
    - on_participant_connected: Called when a participant connects.
      Args: (participant_id: str)
    - on_participant_disconnected: Called when a participant disconnects.
      Args: (participant_id: str)
    - on_participant_left: Called when a participant leaves.
      Args: (participant_id: str, reason: str)
    - on_audio_track_subscribed: Called when an audio track is subscribed.
      Args: (participant_id: str)
    - on_audio_track_unsubscribed: Called when an audio track is unsubscribed.
      Args: (participant_id: str)
    - on_video_track_subscribed: Called when a video track is subscribed.
      Args: (participant_id: str)
    - on_video_track_unsubscribed: Called when a video track is unsubscribed.
      Args: (participant_id: str)
    - on_data_received: Called when data is received from a participant.
      Args: (data: bytes, participant_id: str)

    Example::

        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant_id):
            await task.queue_frame(TTSSpeakFrame("Hello!"))

        @transport.event_handler("on_participant_disconnected")
        async def on_participant_disconnected(transport, participant_id):
            await task.queue_frame(EndFrame())
    Nrs   rt   ru   rv   
input_nameoutput_namec                    s   t  j||d t| j| j| j| j| j| j| j	| j
| j| j| jd}|p&t | _t|||| j|| j| _d| _d| _| d | d | d | d | d | d	 | d
 | d | d | d | d | d | jddd dS )a  Initialize the LiveKit transport.

        Args:
            url: LiveKit server URL to connect to.
            token: Authentication token for the room.
            room_name: Name of the LiveKit room to join.
            params: Configuration parameters for the transport.
            input_name: Optional name for the input transport.
            output_name: Optional name for the output transport.
        )r  r  )rf   rg   rh   ri   rj   rk   rl   rm   rn   ro   rp   Nrf   rg   ri   rj   rk   rl   rm   rn   ro   rp   on_participant_lefton_call_state_updatedrh   T)sync)rT   r   re   _on_connected_on_disconnected_on_before_disconnect_on_participant_connected_on_participant_disconnected_on_audio_track_subscribed_on_audio_track_unsubscribed_on_video_track_subscribed_on_video_track_unsubscribed_on_data_received_on_first_participant_joinedrd   r}   rr   r   r9  _input_output_register_event_handler)r\   rs   rt   ru   rv   r  r  rw   r]   rH   rI   r     sB   











zLiveKitTransport.__init__r   c                 C   $   | j st| | j| j| jd| _ | j S )zGet the input transport for receiving media and events.

        Returns:
            The LiveKit input transport instance.
        r   )r  r5  r9  r}   _input_namer   rH   rH   rI   input  
   zLiveKitTransport.inputc                 C   r  )zGet the output transport for sending media and events.

        Returns:
            The LiveKit output transport instance.
        r  )r  rr  r9  r}   _output_namer   rH   rH   rI   output$  r  zLiveKitTransport.outputc                 C   s   | j jS )zyGet the participant ID for this transport.

        Returns:
            The participant ID assigned by LiveKit.
        )r9  r@   r   rH   rH   rI   r@   0  s   zLiveKitTransport.participant_idr   c                    s&   | j r| j |tjI dH  dS dS )zlSend an audio frame to the LiveKit room.

        Args:
            frame: The audio frame to send.
        N)r  queue_framer   
DOWNSTREAMr   rH   rH   rI   
send_audio9  s   zLiveKitTransport.send_audioc                 C   s
   | j  S )r   )r9  r   r   rH   rH   rI   r   B  s   
z!LiveKitTransport.get_participantsr@   c                    s   | j |I dH S )r   N)r9  r   r\   r@   rH   rH   rI   r   J  s   	z)LiveKitTransport.get_participant_metadatar   c                       | j |I dH  dS r   )r9  r   r   rH   rH   rI   r   U     zLiveKitTransport.set_metadatac                    r  )r   N)r9  r   r  rH   rH   rI   r   ]  r  z!LiveKitTransport.mute_participantc                    r  )r   N)r9  r   r  rH   rH   rI   r   e  r  z#LiveKitTransport.unmute_participantc                    s4   |  dI dH  | jr| jt I dH  dS dS )zHandle room connected events.rf   N)_call_event_handlerr  rQ  r   r   rH   rH   rI   r  m  s
   zLiveKitTransport._on_connectedc                       |  dI dH  dS )z Handle room disconnected events.rg   Nr  r   rH   rH   rI   r  s  r  z!LiveKitTransport._on_disconnectedc                    r  )z(Handle before disconnection room events.rh   Nr  r   rH   rH   rI   r  w  r  z&LiveKitTransport._on_before_disconnectc                    s6   |  d|I dH  | jr| jt I dH  dS dS )r  ri   N)r  r  rQ  r   r  rH   rH   rI   r  {  s
   z*LiveKitTransport._on_participant_connectedc                    s,   |  d|I dH  |  d|dI dH  dS )r  rj   Nr  r   r  r  rH   rH   rI   r    s   z-LiveKitTransport._on_participant_disconnectedc                    R   |  d|I dH  | jjj|}|r%|j D ]}| j|j|| qdS dS )z%Handle audio track subscribed events.rk   N)	r  r9  r   r   r   audio_tracksr   r   r   r\   r@   r   r  rH   rH   rI   r       z+LiveKitTransport._on_audio_track_subscribedc                       |  d|I dH  dS )z'Handle audio track unsubscribed events.rl   Nr  r  rH   rH   rI   r       z-LiveKitTransport._on_audio_track_unsubscribedc                    r  )z%Handle video track subscribed events.rm   N)	r  r9  r   r   r   video_tracksr   r   r   r  rH   rH   rI   r    r  z+LiveKitTransport._on_video_track_subscribedc                    r  )z'Handle video track unsubscribed events.rn   Nr  r  rH   rH   rI   r    r  z-LiveKitTransport._on_video_track_unsubscribedr   c                    s8   | j r| j | |I dH  | d||I dH  dS )r  Nro   )r  rR  decoder  )r\   r   r@   rH   rH   rI   r    s   z"LiveKitTransport._on_data_receivedrN  c                    .   | j rt||d}| j |I dH  dS dS )zSend a message to participants in the room.

        Args:
            message: The message string to send.
            participant_id: Optional specific participant to send to.
        rP  N)r  r>   rw  r\   rN  r@   r   rH   rH   rI   rw       zLiveKitTransport.send_messagec                    r  )zSend an urgent message to participants in the room.

        Args:
            message: The urgent message string to send.
            participant_id: Optional specific participant to send to.
        rP  N)r  rK   rw  r  rH   rH   rI   send_message_urgent  r  z$LiveKitTransport.send_message_urgentc                       dS )zXHandle room events.

        Args:
            event: The room event to handle.
        NrH   r\   r#  rH   rH   rI   on_room_event     zLiveKitTransport.on_room_eventc                    r  )zfHandle participant events.

        Args:
            event: The participant event to handle.
        NrH   r  rH   rH   rI   on_participant_event  r  z%LiveKitTransport.on_participant_eventc                    r  )zZHandle track events.

        Args:
            event: The track event to handle.
        NrH   r  rH   rH   rI   on_track_event  r  zLiveKitTransport.on_track_eventstatec                    r  )z Handle call state update events.r  Nr  )r\   r  rH   rH   rI   _on_call_state_updated  r  z'LiveKitTransport._on_call_state_updatedc                    r  )z'Handle first participant joined events.rp   Nr  r  rH   rH   rI   r    r  z-LiveKitTransport._on_first_participant_joined)NNNr,  )*rB   rC   rD   rE   rF   r   rd   r   r5  r  rr  r  r-  r@   r   r  r   r   r0  r   r   r   r   r  r  r  r  r  r  r  r  r  rq   r  rw  r  r  r  r  r  r  ra   rH   rH   r]   rI   r    sZ    /8	

			r  )FrE   r   rt  dataclassesr   typingr   r   r   r   r   logurur   pydanticr	   pipecat.audio.utilsr
   pipecat.audio.vad.vad_analyzerr   pipecat.frames.framesr   r   r   r   r   r   r   r   r   r   r   r   r   r   "pipecat.processors.frame_processorr   r   pipecat.transports.base_inputr   pipecat.transports.base_outputr   !pipecat.transports.base_transportr   r   "pipecat.utils.asyncio.task_managerr    livekitr!   livekit.rtc._protor"   rk  tenacityr#   r$   r%   ModuleNotFoundErrorr   r   r   r   r>   rK   rN   rc   rd   re   rr   r5  rr  r  rH   rH   rH   rI   <module>   sr   @


	   = A 