o
    i*t                     @   sT  d Z ddlZddlmZ ddlmZmZmZmZm	Z	 ddl
Z
ddlmZ ddlmZ ddlmZ ddlmZmZmZmZmZmZmZmZmZmZmZ dd	lmZmZm Z  dd
l!m"Z" ddl#m$Z$ ddl%m&Z&m'Z' ddl(m)Z)m*Z*m+Z+ G dd dZ,G dd deZ-G dd de*Z.G dd dZ/G dd de"Z0G dd de$Z1G dd de&Z2dS )a  Tavus transport implementation for Pipecat.

This module provides integration with the Tavus platform for creating conversational
AI applications with avatars. It manages conversation sessions and provides real-time
audio/video streaming capabilities through the Tavus API.
    N)partial)Any	AwaitableCallableMappingOptional)	AudioData)logger)	BaseModel)BotConnectedFrameCancelFrameClientConnectedFrameEndFrameFrameInputAudioRawFrameInterruptionFrameOutputAudioRawFrameOutputTransportMessageFrame!OutputTransportMessageUrgentFrame
StartFrame)FrameDirectionFrameProcessorFrameProcessorSetup)BaseInputTransport)BaseOutputTransport)BaseTransportTransportParams)DailyCallbacksDailyParamsDailyTransportClientc                   @   sf   e Zd ZdZdZdZdZdedej	fddZ
d	ed
edefddZdefddZd
edefddZdS )TavusApizHelper class for interacting with the Tavus API (v2).

    Provides methods for creating and managing conversations with Tavus avatars,
    including conversation lifecycle management and persona information retrieval.
    zhttps://tavusapi.com/v2zdev-conversationTestTavusTransportapi_keysessionc                 C   s*   || _ || _d| j d| _td| _dS )zInitialize the TavusApi client.

        Args:
            api_key: Tavus API key for authentication.
            session: An aiohttp session for making HTTP requests.
        zapplication/json)zContent-Typez	x-api-keyTAVUS_SAMPLE_ROOM_URLN)_api_key_session_headersosgetenv_dev_room_url)selfr"   r#    r,   V/home/ubuntu/.local/lib/python3.10/site-packages/pipecat/transports/tavus/transport.py__init__:   s   zTavusApi.__init__
replica_id
persona_idreturnc              	      s   | j r| j| j dS td| d|  | j d}||d}| jj|| j|d4 I dH  }|  |	 I dH }td|  |W  d  I dH  S 1 I dH sUw   Y  dS )	a?  Create a new conversation with the specified replica and persona.

        Args:
            replica_id: ID of the replica to use in the conversation.
            persona_id: ID of the persona to use in the conversation.

        Returns:
            Dictionary containing conversation_id and conversation_url.
        )conversation_idconversation_urlz%Creating Tavus conversation: replica=z
, persona=z/conversations)r/   r0   )headersjsonNzCreated Tavus conversation: )
r*   MOCK_CONVERSATION_IDr	   debugBASE_URLr&   postr'   raise_for_statusr5   )r+   r/   r0   urlpayloadrresponser,   r,   r-   create_conversationG   s    
0zTavusApi.create_conversationr2   c              	      s   |du s
|| j krdS | j d| d}| jj|| jd4 I dH }|  td|  W d  I dH  dS 1 I dH s@w   Y  dS )zqEnd an existing conversation.

        Args:
            conversation_id: ID of the conversation to end.
        Nz/conversations/z/endr4   zEnded Tavus conversation )r6   r8   r&   r9   r'   r:   r	   r7   )r+   r2   r;   r=   r,   r,   r-   end_conversationc   s   .zTavusApi.end_conversationc              	      s   | j dur	| jS | j d| }| jj|| jd4 I dH "}|  | I dH }t	d|  |d W  d  I dH  S 1 I dH sFw   Y  dS )zGet the name of a persona by ID.

        Args:
            persona_id: ID of the persona to retrieve.

        Returns:
            The name of the persona.
        Nz
/personas/r@   zFetched Tavus persona: persona_name)
r*   MOCK_PERSONA_NAMEr8   r&   getr'   r:   r5   r	   r7   )r+   r0   r;   r=   r>   r,   r,   r-   get_persona_nameq   s   
	0zTavusApi.get_persona_nameN)__name__
__module____qualname____doc__r8   r6   rC   straiohttpClientSessionr.   dictr?   rA   rE   r,   r,   r,   r-   r    /   s    r    c                   @   sn   e Zd ZU dZeeeef ged f e	d< eeeef ged f e	d< eeeef eged f e	d< dS )TavusCallbacksa  Callback handlers for Tavus events.

    Parameters:
        on_joined: Called when the bot joins the Daily room.
        on_participant_joined: Called when a participant joins the conversation.
        on_participant_left: Called when a participant leaves the conversation.
    N	on_joinedon_participant_joinedon_participant_left)
rF   rG   rH   rI   r   r   rJ   r   r   __annotations__r,   r,   r,   r-   rN      s
   
 $rN   c                   @   s6   e Zd ZU dZdZeed< dZeed< dZeed< dS )TavusParamsa(  Configuration parameters for the Tavus transport.

    Parameters:
        audio_in_enabled: Whether to enable audio input from participants.
        audio_out_enabled: Whether to enable audio output to participants.
        microphone_out_enabled: Whether to enable microphone output track.
    Taudio_in_enabledaudio_out_enabledFmicrophone_out_enabledN)	rF   rG   rH   rI   rT   boolrR   rU   rV   r,   r,   r,   r-   rS      s
   
 rS   c                   @   s\  e Zd ZdZe dddededededed	ed
ejddfddZ	defddZ
defddZdd Zdd Zdd Zdd ZdefddZdefdd Zd!d" Z	#	$	%dDd&ed'ed(ed)ed*ef
d+d,Z	-	.	/dEd&ed'ed0ed1ed2ef
d3d4ZdeeB fd5d6Zedefd7d8Zedefd9d:ZdFd;d<ZdGd=d>Z de!de"fd?d@Z#dAefdBdCZ$dS )HTavusTransportClienta  Transport client that integrates Pipecat with the Tavus platform.

    A transport client that integrates a Pipecat Bot with the Tavus platform by managing
    conversation sessions using the Tavus API.

    This client uses `TavusApi` to interact with the Tavus backend services. When a conversation
    is started via `TavusApi`, Tavus provides a `roomURL` that can be used to connect the Pipecat Bot
    into the same virtual room where the TavusBot is operating.
    pipecat-stream)paramsr0   bot_namerZ   	callbacksr"   r/   r0   r#   r1   Nc                C   s:   || _ t||| _|| _|| _d| _d| _|| _|| _dS )a  Initialize the Tavus transport client.

        Args:
            bot_name: The name of the Pipecat bot instance.
            params: Optional parameters for Tavus operation.
            callbacks: Callback handlers for Tavus-related events.
            api_key: API key for authenticating with Tavus API.
            replica_id: ID of the replica to use in the Tavus conversation.
            persona_id: ID of the Tavus persona. Defaults to "pipecat-stream",
                which signals Tavus to use the TTS voice of the Pipecat bot
                instead of a Tavus persona voice.
            session: The aiohttp session for making async HTTP requests.
        N)		_bot_namer    _api_replica_id_persona_id_conversation_id_client
_callbacks_params)r+   r[   rZ   r\   r"   r/   r0   r#   r,   r,   r-   r.      s   
zTavusTransportClient.__init__c                    s,   | j | j| jI dH }|d | _|d S )z4Initialize the conversation and return the room URL.Nr2   r3   )r^   r?   r_   r`   ra   )r+   r>   r,   r,   r-   _initialize   s   
z TavusTransportClient._initializesetupc              
      s@  | j durtd| j   dS z|  I dH }td"i dt| jdd| jd| jdt| jddt| jddt| jdd	t| jd	d
t| jd
dt| jddt| jddt| jddt| jddt| jddt| jddt| jddt| jddt| jddt| jddt| jddt| jdd| j	j
d| j	jdt| jddt| jddt| jddt| jddt| jddt| jddt| jd}t|dd | j|| j| _| j|I dH  W dS  ty } ztd!|  | j| j I dH  d| _ W Y d}~dS d}~ww )#zSetup the client and initialize the conversation.

        Args:
            setup: The frame processor setup configuration.
        Nz!Conversation ID already defined: on_active_speaker_changedrO   on_lefton_before_leaveon_erroron_app_messageon_call_state_updatedon_client_connectedon_client_disconnectedon_dialin_connectedon_dialin_readyon_dialin_stoppedon_dialin_erroron_dialin_warningon_dialout_answeredon_dialout_connectedon_dialout_stoppedon_dialout_erroron_dialout_warningon_dtmf_eventrP   rQ   on_participant_updatedon_transcription_messageon_recording_startedon_recording_stoppedon_recording_erroron_transcription_stoppedon_transcription_errorPipecatz&Failed to setup TavusTransportClient: r,   )ra   r	   r7   re   r   r   _on_handle_callback
_on_joined_on_leftrc   rP   rQ   r   rd   r]   rb   rf   	Exceptionerrorr^   rA   )r+   rf   room_urldaily_callbackser,   r,   r-   rf      s   
	
 #%zTavusTransportClient.setupc              
      sP   z| j  I dH  W dS  ty' } ztd|  W Y d}~dS d}~ww )zCleanup client resources.NzException during cleanup: )rb   cleanupr   r	   r   )r+   r   r,   r,   r-   r     s   zTavusTransportClient.cleanupc                    s"   t d | j|I dH  dS )zHandle joined event.zTavusTransportClient joined!N)r	   r7   rc   rO   r+   datar,   r,   r-   r     s   
zTavusTransportClient._on_joinedc                    s   t d dS )zHandle left event.zTavusTransportClient left!N)r	   r7   r+   r,   r,   r-   r     s   zTavusTransportClient._on_leftc                    s"   t d| d| d|  dS )zHandle generic callback events.z[Callback] z called with args=z	, kwargs=N)r	   trace)r+   
event_nameargskwargsr,   r,   r-   r     s    z(TavusTransportClient._on_handle_callbackc                    s   | j | jI dH S )zjGet the persona name from the API.

        Returns:
            The name of the current persona.
        N)r^   rE   r`   r   r,   r,   r-   rE      s   z%TavusTransportClient.get_persona_nameframec                    s2   t d | j|I dH  | j I dH  dS )zStart the client and join the room.

        Args:
            frame: The start frame containing initialization parameters.
        z#TavusTransportClient start invoked!N)r	   r7   rb   startjoinr+   r   r,   r,   r-   r   (  s   
zTavusTransportClient.startc                    s0   | j  I dH  | j| jI dH  d| _dS )z)Stop the client and end the conversation.N)rb   leaver^   rA   ra   r   r,   r,   r-   stop2  s   
zTavusTransportClient.stop   cameraRGBparticipant_idcallback	frameratevideo_sourcecolor_formatc                        | j |||||I dH  dS )ao  Capture video from a participant.

        Args:
            participant_id: ID of the participant to capture video from.
            callback: Callback function to handle video frames.
            framerate: Desired framerate for video capture.
            video_source: Video source to capture from.
            color_format: Color format for video frames.
        N)rb   capture_participant_video)r+   r   r   r   r   r   r,   r,   r-   r   8     
z.TavusTransportClient.capture_participant_video
microphone>     audio_sourcesample_ratecallback_interval_msc                    r   )a  Capture audio from a participant.

        Args:
            participant_id: ID of the participant to capture audio from.
            callback: Callback function to handle audio data.
            audio_source: Audio source to capture from.
            sample_rate: Desired sample rate for audio capture.
            callback_interval_ms: Interval between audio callbacks in milliseconds.
        N)rb   capture_participant_audio)r+   r   r   r   r   r   r,   r,   r-   r   M  r   z.TavusTransportClient.capture_participant_audioc                       | j |I dH  dS )eSend a message to participants.

        Args:
            frame: The message frame to send.
        N)rb   send_messager   r,   r,   r-   r   b  s   z!TavusTransportClient.send_messagec                 C      | j jS )z`Get the output sample rate.

        Returns:
            The output sample rate in Hz.
        )rb   out_sample_rater   r,   r,   r-   r   l     z$TavusTransportClient.out_sample_ratec                 C   r   )z^Get the input sample rate.

        Returns:
            The input sample rate in Hz.
        )rb   in_sample_rater   r,   r,   r-   r   u  r   z#TavusTransportClient.in_sample_ratec                    s*   t dd| jdd}| |I dH  dS )z.Send an interrupt message to the conversation.conversationzconversation.interrupt)message_type
event_typer2   )messageN)r   ra   r   )r+   transport_framer,   r,   r-   send_interrupt_message~  s   z+TavusTransportClient.send_interrupt_messagec                    s&   | j sdS | j j||dI dH  dS )Update subscription settings for participants.

        Args:
            participant_settings: Per-participant subscription settings.
            profile_settings: Global subscription profile settings.
        Nparticipant_settingsprofile_settingsrb   update_subscriptionsr+   r   r   r,   r,   r-   r     s   z)TavusTransportClient.update_subscriptionsc                    s   | j sdS | j |I dH S )zWrite an audio frame to the transport.

        Args:
            frame: The audio frame to write.

        Returns:
            True if the audio frame was written successfully, False otherwise.
        FN)rb   write_audio_framer   r,   r,   r-   r     s   	z&TavusTransportClient.write_audio_framedestinationc                    s"   | j sdS | j |I dH  dS )zRegister an audio destination for output.

        Args:
            destination: The destination identifier to register.
        Nrb   register_audio_destinationr+   r   r,   r,   r-   r     s   z/TavusTransportClient.register_audio_destination)r   r   r   )r   r   r   )r1   NNN)%rF   rG   rH   rI   rS   rJ   rN   rK   rL   r.   re   r   rf   r   r   r   r   rE   r   r   r   r   intr   r   r   r   r   propertyr   r   r   r   r   rW   r   r   r,   r,   r,   r-   rX      s    	

!9







rX   c                       s   e Zd ZdZdedef fddZdef fddZ fd	d
Z	de
f fddZdef fddZdef fddZdd ZdededefddZ  ZS )TavusInputTransportzInput transport for receiving audio and events from Tavus conversations.

    Handles incoming audio streams from participants and manages audio capture
    from the Daily room connected to the Tavus conversation.
    clientrZ   c                    s*   t  j|fi | || _|| _d| _dS )zInitialize the Tavus input transport.

        Args:
            client: The Tavus transport client instance.
            params: Transport configuration parameters.
            **kwargs: Additional arguments passed to parent class.
        FN)superr.   rb   rd   _initializedr+   r   rZ   r   	__class__r,   r-   r.     s   
zTavusInputTransport.__init__rf   c                    *   t  |I dH  | j|I dH  dS )znSetup the input transport.

        Args:
            setup: The frame processor setup configuration.
        Nr   rf   rb   r+   rf   r   r,   r-   rf        zTavusInputTransport.setupc                    &   t   I dH  | j I dH  dS )z"Cleanup input transport resources.Nr   r   rb   r   r   r,   r-   r        zTavusInputTransport.cleanupr   c                    sJ   t  |I dH  | jrdS d| _| j|I dH  | |I dH  dS )z{Start the input transport.

        Args:
            frame: The start frame containing initialization parameters.
        NT)r   r   r   rb   set_transport_readyr   r   r,   r-   r     s   zTavusInputTransport.startc                    (   t  |I dH  | j I dH  dS )zpStop the input transport.

        Args:
            frame: The end frame signaling transport shutdown.
        Nr   r   rb   r   r   r,   r-   r        zTavusInputTransport.stopc                    (   t  |I dH  | j I dH  dS )zyCancel the input transport.

        Args:
            frame: The cancel frame signaling immediate cancellation.
        Nr   cancelrb   r   r   r   r,   r-   r     r   zTavusInputTransport.cancelc                    sH   | j jr"td|d   | jj|d | j| jjdI dH  dS dS )zStart capturing audio from a participant.

        Args:
            participant: The participant to capture audio from.
        z;TavusTransportClient start capturing audio for participant id)r   r   r   N)rd   rT   r	   inforb   r   _on_participant_audio_datar   r+   participantr,   r,   r-   start_capturing_audio  s   z)TavusInputTransport.start_capturing_audior   audior   c                    s0   t |j|j|jd}||_| |I dH  dS )z'Handle received participant audio data.)r   r   num_channelsN)r   audio_framesr   r   transport_sourcepush_audio_frame)r+   r   r   r   r   r,   r,   r-   r   
  s   z.TavusInputTransport._on_participant_audio_data)rF   rG   rH   rI   rX   r   r.   r   rf   r   r   r   r   r   r   r   r   rJ   r   r   __classcell__r,   r,   r   r-   r     s(    			r   c                       s   e Zd ZdZdedef fddZdef fddZ fd	d
Z	de
f fddZdef fddZdef fddZdeeB fddZdedef fddZdd ZdedefddZdefddZ  ZS )TavusOutputTransportzOutput transport for sending audio and events to Tavus conversations.

    Handles outgoing audio streams to participants and manages the custom
    audio track expected by the Tavus platform.
    r   rZ   c                    s0   t  j|fi | || _|| _d| _d| _dS )zInitialize the Tavus output transport.

        Args:
            client: The Tavus transport client instance.
            params: Transport configuration parameters.
            **kwargs: Additional arguments passed to parent class.
        FstreamN)r   r.   rb   rd   r   _transport_destinationr   r   r,   r-   r.     s
   
zTavusOutputTransport.__init__rf   c                    r   )zoSetup the output transport.

        Args:
            setup: The frame processor setup configuration.
        Nr   r   r   r,   r-   rf   4  r   zTavusOutputTransport.setupc                    r   )z#Cleanup output transport resources.Nr   r   r   r,   r-   r   =  r   zTavusOutputTransport.cleanupr   c                    sd   t  |I dH  | jrdS d| _| j|I dH  | jr(| j| jI dH  | |I dH  dS )z|Start the output transport.

        Args:
            frame: The start frame containing initialization parameters.
        NT)r   r   r   rb   r   r   r   r   r   r,   r-   r   B  s   zTavusOutputTransport.startc                    r   )zqStop the output transport.

        Args:
            frame: The end frame signaling transport shutdown.
        Nr   r   r   r,   r-   r   V  r   zTavusOutputTransport.stopc                    r   )zzCancel the output transport.

        Args:
            frame: The cancel frame signaling immediate cancellation.
        Nr   r   r   r,   r-   r   _  r   zTavusOutputTransport.cancelc                    s(   t d|  | j|I dH  dS )r   z%TavusOutputTransport sending message N)r	   r   rb   r   r   r,   r,   r-   r   h  s   z!TavusOutputTransport.send_message	directionc                    s6   t  ||I dH  t|tr|  I dH  dS dS )zProcess frames and handle interruptions.

        Args:
            frame: The frame to process.
            direction: The direction of frame flow in the pipeline.
        N)r   process_frame
isinstancer   _handle_interruptions)r+   r   r   r   r,   r-   r   s  s
   
z"TavusOutputTransport.process_framec                    s   | j  I dH  dS )z8Handle interruption events by sending interrupt message.N)rb   r   r   r,   r,   r-   r   ~  s   z*TavusOutputTransport._handle_interruptionsr1   c                    s   | j |_| j|I dH S )zWrite an audio frame to the Tavus transport.

        Args:
            frame: The audio frame to write.

        Returns:
            True if the audio frame was written successfully, False otherwise.
        N)r   transport_destinationrb   r   r   r,   r,   r-   r     s   
z&TavusOutputTransport.write_audio_framer   c                    r   )zwRegister an audio destination.

        Args:
            destination: The destination identifier to register.
        Nr   r   r,   r,   r-   r     s   z/TavusOutputTransport.register_audio_destination)rF   rG   rH   rI   rX   r   r.   r   rf   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rW   r   rJ   r   r   r,   r,   r   r-   r     s&    			
r   c                       s   e Zd ZdZde ddfdedejdededed	ed
ee dee f fddZ	dd Z
dd Zdd Zd ddZdefddZdefddZdefddZdefddZ  ZS )!TavusTransporta  Transport implementation for Tavus video calls.

    When used, the Pipecat bot joins the same virtual room as the Tavus Avatar and the user.
    This is achieved by using `TavusTransportClient`, which initiates the conversation via
    `TavusApi` and obtains a room URL that all participants connect to.

    Event handlers available:

    - on_connected(transport, data): Bot connected to the room
    - on_client_connected(transport, participant): Participant connected to the session
    - on_client_disconnected(transport, participant): Participant disconnected from the session

    Example::

        @transport.event_handler("on_client_connected")
        async def on_client_connected(transport, participant):
            ...
    rY   Nr[   r#   r"   r/   r0   rZ   
input_nameoutput_namec	           
   	      sv   t  j||d || _t| j| j| jd}	td|	|||||d| _d| _	d| _
d| _| d | d | d dS )	aq  Initialize the Tavus transport.

        Args:
            bot_name: The name of the Pipecat bot.
            session: aiohttp session used for async HTTP requests.
            api_key: Tavus API key for authentication.
            replica_id: ID of the replica model used for voice generation.
            persona_id: ID of the Tavus persona. Defaults to "pipecat-stream"
                to use the Pipecat TTS voice.
            params: Optional Tavus-specific configuration parameters.
            input_name: Optional name for the input transport.
            output_name: Optional name for the output transport.
        )r   r   )rO   rP   rQ   r   )r[   r\   r"   r/   r0   r#   rZ   Non_connectedrm   rn   )r   r.   rd   rN   r   _on_participant_joined_on_participant_leftrX   rb   _input_output_tavus_participant_id_register_event_handler)
r+   r[   r#   r"   r/   r0   rZ   r   r   r\   r   r,   r-   r.     s,   	

zTavusTransport.__init__c                    6   |  d|I dH  | jr| jt I dH  dS dS )zHandle bot joined room event.r   N)_call_event_handlerr   
push_framer   r   r,   r,   r-   r     
   zTavusTransport._on_joinedc                    sB   | j  I dH }|di dd|kr| |I dH  dS dS )zHandle participant left events.Nr   userName )rb   rE   rD   _on_client_disconnected)r+   r   reasonrB   r,   r,   r-   r     s
   z#TavusTransport._on_participant_leftc                    s   | j  I dH }|di dd|kr|d | _dS | |I dH  | jrAtd| j d | j| jdd	d
iiidI dH  | jrO| j	|I dH  dS dS )z!Handle participant joined events.Nr   r   r   r   z	Ignoring z's microphonemediar   unsubscribed)r   )
rb   rE   rD   r   _on_client_connectedr	   r7   r   r   r   )r+   r   rB   r,   r,   r-   r     s    z%TavusTransport._on_participant_joinedc                    s   | j j||dI dH  dS )r   r   Nr   r   r,   r,   r-   r     s
   z#TavusTransport.update_subscriptionsr1   c                 C      | j st| j| jd| _ | j S )zGet the input transport for receiving media and events.

        Returns:
            The Tavus input transport instance.
        r   rZ   )r   r   rb   rd   r   r,   r,   r-   input     zTavusTransport.inputc                 C   r  )zGet the output transport for sending media and events.

        Returns:
            The Tavus output transport instance.
        r  )r   r   rb   rd   r   r,   r,   r-   output  r  zTavusTransport.outputr   c                    r   )zHandle client connected events.rm   N)r   r   r   r   r   r,   r,   r-   r  !  r   z#TavusTransport._on_client_connectedc                    s   |  d|I dH  dS )z"Handle client disconnected events.rn   N)r   r   r,   r,   r-   r   '  s   z&TavusTransport._on_client_disconnectedr   )rF   rG   rH   rI   rS   rJ   rK   rL   r   r.   r   r   r   r   r   r  r  r   r  r   r   r,   r,   r   r-   r     s>    	3


r   )3rI   r(   	functoolsr   typingr   r   r   r   r   rK   daily.dailyr   logurur	   pydanticr
   pipecat.frames.framesr   r   r   r   r   r   r   r   r   r   r   "pipecat.processors.frame_processorr   r   r   pipecat.transports.base_inputr   pipecat.transports.base_outputr   !pipecat.transports.base_transportr   r   "pipecat.transports.daily.transportr   r   r   r    rN   rS   rX   r   r   r   r,   r,   r,   r-   <module>   s0   4V  g 