o
    io                  
   @   s  d Z ddlZddlZddlZddlZddlZddlmZ ddlm	Z	m
Z
mZmZ ddlZddlmZ ddlmZ ddlmZmZmZ ddlmZ dd	lmZmZ dd
lmZmZ ddlmZ ddl m!Z! ddl"m#Z# zddl$m%Z% ddl&m'Z' ddl(m)Z* ddl+m,Z, W n  e-y Z. ze/de.  e/d e0de. dZ.[.ww dZ1G dd deZ2G dd deZ3G dd dZ4dS )a  HeyGen implementation for Pipecat.

This module provides integration with the HeyGen platform for creating conversational
AI applications with avatars. It manages conversation sessions and provides real-time
audio/video streaming capabilities through the HeyGen API.
    N)Enum)	AwaitableCallableOptionalUnion)logger)	BaseModel)AudioRawFrameImageRawFrame
StartFrame)FrameProcessorSetup)	HeyGenApiNewSessionRequest)LiveAvatarApiLiveAvatarNewSessionRequest)StandardSessionResponse)TransportParams)BaseTaskManager)rtc)VideoBufferType)connect)ConnectionClosedOKzException: zEIn order to use HeyGen, you need to `pip install pipecat-ai[heygen]`.zMissing module: i]  c                   @   s   e Zd ZdZdZdZdS )ServiceTypezEnum for HeyGen service types.INTERACTIVE_AVATARLIVE_AVATARN)__name__
__module____qualname____doc__r   r    r   r   R/home/ubuntu/.local/lib/python3.10/site-packages/pipecat/services/heygen/client.pyr   6   s    r   c                   @   sR   e Zd ZU dZeg ed f ed< eeged f ed< eeged f ed< dS )HeyGenCallbacksa  Callback handlers for HeyGen events.

    Parameters:
        on_connected: Called when the bot connects to the LiveKit room.
        on_participant_connected: Called when a participant connects.
        on_participant_disconnected: Called when a participant disconnects.
    Non_connectedon_participant_connectedon_participant_disconnected)r   r   r   r   r   r   __annotations__strr   r   r   r    r!   =   s
   
 r!   c                   @   s  e Zd ZdZdddddedejdedee	e
ef  d	ee d
ededdfddZdd ZdeddfddZdQddZdededdfddZdQddZdd Zdd Zd eddfd!d"ZdQd#d$Zd%eddfd&d'Zd(eddfd)d*ZdQd+d,ZdQd-d.Z dQd/d0Z!e"defd1d2Z#e"defd3d4Z$d5e%d(eddfd6d7Z&d8d9 Z'd:d; Z(d(eddfd<d=Z)d>eddfd?d@Z*d>eddfdAdBZ+dCe,j-fdDdEZ.dCe,j/fdFdGZ0dHdI Z1dJdK Z2dLdM Z3dNe4j5fdOdPZ6dS )RHeyGenClienta  A client for interacting with HeyGen's Interactive Avatar Realtime API.

    This client manages both WebSocket and LiveKit connections for real-time avatar streaming,
    handling bi-directional audio/video communication and avatar control. It implements the API defined in
    https://docs.heygen.com/docs/interactive-avatar-realtime-api

    The client manages the following connections:
    1. WebSocket connection for avatar control and audio streaming
    2. LiveKit connection for receiving avatar video and audio

    Attributes:
        HEY_GEN_SAMPLE_RATE (int): The required sample rate for HeyGen's audio processing (24000 Hz)
    NF)session_requestservice_typeconnect_as_userapi_keysessionparamsr(   r)   	callbacksr*   returnc                C   s<  |dur|nt j| _|dur5|dur5|t jkr$t|ts$td d}n|t jkr5t|ts5td d}|du rK| jt jkrFtddd}ntdd}| jt jkrYt	||d	| _
nt||d	| _
d| _d| _d| _|| _d
| _d
| _d| _|| _|| _d| _d| _d| _d| _d| _d| _d
| _d
| _d| _d| _|| _dS )a  Initialize the HeyGen client.

        Args:
            api_key: HeyGen API key for authentication
            session: HTTP client session for API requests
            params: Transport configuration parameters
            session_request: Configuration for the HeyGen session (optional)
            service_type: Type of service to use
            callbacks: Callback handlers for HeyGen events
            connect_as_user: Whether to connect using the user token or not (default: False)
        NzeService type is LIVE_AVATAR but session_request is not SessionTokenRequest. Ignoring session_request.zjService type is INTERACTIVE_AVATAR but session_request is not NewSessionRequest. Ignoring session_request.Shawn_Therapist_publicv2)	avatar_idversionz$1c690fe7-23e0-49f9-bfba-14344450285b)r2   )r,   r   F        ) r   r   _service_typer   
isinstancer   r   warningr   r   _apir   _heyGen_session
_websocket_task_manager_params_in_sample_rate_out_sample_rate
_connected_session_request
_callbacks_event_queue_event_task_video_task_audio_task_video_frame_callback_audio_frame_callback_send_interval_next_send_time_audio_seconds_sent_transport_ready_connect_as_user)selfr+   r,   r-   r(   r)   r.   r*   r   r   r    __init__Z   sb   
zHeyGenClient.__init__c                    s   | j | jI d H | _td| jj  td| jj  td| jj  td| jj	  t
d| jj d| jj	  t
d d S )NzHeyGen sessionId: zHeyGen realtime_endpoint: zHeyGen livekit URL: zHeyGen livekit token: z5Full Link: https://meet.livekit.io/custom?liveKitUrl=z&token=zHeyGen session started)r8   new_sessionr@   r9   r   debug
session_idws_urllivekit_urlaccess_tokeninforM   r   r   r    _initialize   s   zHeyGenClient._initializesetupc              
      s   | j durtd dS |j| _z|  I dH  t | _| j	| 
| j|  d| _W dS  tyQ } ztd|  |  I dH  W Y d}~dS d}~ww )zSetup the client and initialize the conversation.

        Establishes a new session with HeyGen's API if one doesn't exist.

        Args:
            setup: The frame processor setup configuration.
        Nz"heygen_session already initializedz::event_callback_taskzFailed to setup HeyGenClient: )r9   r   rP   task_managerr;   rW   asyncioQueuerB   create_task_callback_task_handlerrC   	Exceptionerrorcleanup)rM   rX   er   r   r    rX      s"   



zHeyGenClient.setupc              
      s   z2| j dur| j| j jI dH  d| _ d| _| jr.| jr1| j| jI dH  d| _W dS W dS W dS  tyN } zt	
d|  W Y d}~dS d}~ww )zgCleanup client resources.

        Closes the active HeyGen session and resets internal state.
        NFzException during cleanup: )r9   r8   close_sessionrQ   r?   rC   r;   cancel_taskr^   r   r_   rM   ra   r   r   r    r`      s   
zHeyGenClient.cleanupframeaudio_chunk_sizec                    s   | j rtd dS td | jjp|j| _| jjp|j| _|| j d | _td| j  | 	 I dH  | 
 I dH  | | jj dS )a|  Start the client and establish all necessary connections.

        Initializes WebSocket and LiveKit connections using the provided configuration.
        Sets up audio processing with the specified sample rates.

        Args:
            frame: Initial configuration frame containing audio parameters
            audio_chunk_size: Audio chunk size for output processing
        zheygen client already startedNzHeyGenClient starting   zHeyGenClient send_interval: )r:   r   rP   r<   audio_in_sample_rater=   audio_out_sample_rater>   rH   _ws_connect_livekit_connect_call_event_callbackrA   r"   )rM   re   rf   r   r   r    start   s   


zHeyGenClient.startc                    s:   t d |  I dH  |  I dH  |  I dH  dS )zStop the client and terminate all connections.

        Disconnects from WebSocket and LiveKit endpoints, and performs cleanup.
        zHeyGenVideoService stoppingN)r   rP   _ws_disconnect_livekit_disconnectr`   rV   r   r   r    stop  s
   
zHeyGenClient.stopc              
      s   z,| j rtd W dS td t| jjdI dH | _ d| _| jj| 	 dd| _
W dS  tyM } zt|  d|  d| _ W Y d}~dS d}~ww )	z%Connect to HeyGen websocket endpoint.z"HeyGenClient ws already connected!NzHeyGenClient ws connecting)uriTHeyGenClient_Websocketnamez initialization error: )r:   r   rP   websocket_connectr9   rR   r?   r;   r\   _ws_receive_task_handler_receive_taskr^   r_   rd   r   r   r    rj     s$   

zHeyGenClient._ws_connectc              
      s   | j rEz| j I dH }t|}| |I dH  W n$ ty%   Y dS  ty? } zt	d|  W Y d}~dS d}~ww | j sdS dS )z#Handle incoming WebSocket messages.Nz$Error processing WebSocket message: )
r?   r:   recvjsonloads_handle_ws_server_eventr   r^   r   r_   )rM   messageparsed_messagera   r   r   r    rv     s   
z%HeyGenClient._ws_receive_task_handlereventc                    s<   | d}|dkrtd|  dS td|  dS )z&Handle an event from HeyGen websocket.typezagent.statez'HeyGenClient ws received agent status: z(HeyGenClient ws received unknown event: N)getr   rP   trace)rM   r~   
event_typer   r   r    r{   ,  s
   
z$HeyGenClient._handle_ws_server_eventc              
      s~   z9zd| _ | jr| j I dH  W n ty. } zt|  d|  W Y d}~nd}~ww W d| _dS W d| _dS d| _w )z*Disconnect from HeyGen websocket endpoint.FNz disconnect error: )r?   r:   closer^   r   r_   rd   r   r   r    rn   4  s    
zHeyGenClient._ws_disconnectr|   c              
      st   | j st|  d dS z| jr!| jt|I dH  W dS W dS  ty9 } z
td|  |d}~ww )z#Send a message to HeyGen websocket.z$ websocket is not connected anymore.Nz+Error sending message to HeyGen websocket: )	r?   r   rP   r:   sendry   dumpsr^   r_   )rM   r|   ra   r   r   r    _ws_send?  s   zHeyGenClient._ws_sendevent_idc                    s.   t d |   | d|dI dH  dS )zInterrupt the avatar's current action.

        Stops the current animation/speech and returns the avatar to idle state.
        Useful for handling user interruptions during avatar speech.
        zHeyGenClient interruptzagent.interruptr   r   N)r   rP   _reset_audio_timingr   rM   r   r   r   r    	interruptK  s   
zHeyGenClient.interruptc                    s.   t d | dtt dI dH  dS )zStart the avatar's listening animation.

        Triggers visual cues indicating the avatar is listening to user input.
        z"HeyGenClient start_agent_listeningzagent.start_listeningr   N)r   rP   r   r&   uuiduuid4rV   r   r   r    start_agent_listeningZ  s   

z"HeyGenClient.start_agent_listeningc                    s$   |  dtt dI dH  dS )zoStop the avatar's listening animation.

        Returns the avatar to idle state from listening state.
        zagent.stop_listeningr   N)r   r&   r   r   rV   r   r   r    stop_agent_listeningg  s   
z!HeyGenClient.stop_agent_listeningc                 C   s
   d| _ dS )zHIndicates that the output transport is ready and able to receive frames.TN)rK   rV   r   r   r    transport_readys  s   
zHeyGenClient.transport_readyc                 C      | j S )z`Get the output sample rate.

        Returns:
            The output sample rate in Hz.
        )r>   rV   r   r   r    out_sample_ratew     zHeyGenClient.out_sample_ratec                 C   r   )z^Get the input sample rate.

        Returns:
            The input sample rate in Hz.
        )r=   rV   r   r   r    in_sample_rate  r   zHeyGenClient.in_sample_rateaudioc                    s<   t |d}| d||dI dH  |  I dH  dS )zSend audio data to the agent speak.

        Args:
            audio: Audio data in base64 encoded format
            event_id: Unique identifier for the event
        zutf-8zagent.speak)r   r   r   N)base64	b64encodedecoder   _write_audio_sleep)rM   r   r   audio_base64r   r   r    agent_speak  s   
zHeyGenClient.agent_speakc                 C   s   d| _ d| _dS )z%Reset audio timing control variables.r4   r   N)rJ   rI   rV   r   r   r    r     s   
z HeyGenClient._reset_audio_timingc                    s   | j dk r|  j | j7  _ t | j | _dS t }td| j| }|dkr:t|I dH  |  j| j7  _dS t | j | _dS )z7Simulate audio playback timing with appropriate delays.g      @Nr   )rJ   rH   time	monotonicrI   maxrZ   sleep)rM   current_timesleep_durationr   r   r    r     s   
zHeyGenClient._write_audio_sleepc                    s$   |    | d|dI dH  dS )zSend signaling that the agent has finished speaking.

        Args:
            event_id: Unique identifier for the event
        zagent.speak_endr   N)r   r   r   r   r   r    agent_speak_end  s   zHeyGenClient.agent_speak_endparticipant_idc                    s   t d| d| j  || _| jdurt d dS | jr`|| jjv rb| jj| }|j	 D ]3}|j
tjjkr_|jdur_t d|j  tj|j| jd}| jj| |dd| _ dS q0dS dS dS )	zCapture audio frames from the HeyGen avatar.

        Args:
            participant_id: Identifier of the participant to capture audio from
            callback: Async function to handle received audio frames
        zcapture_participant_audio: z, sample_rate: NLTrying to capture more than one audio stream. It is currently not supported.z+Starting audio capture for existing track: tracksample_rateHeyGenClient_Receive_Audiors   )r   rP   r=   rG   rE   r7   _livekit_roomremote_participantstrack_publicationsvalueskindr   	TrackKind
KIND_AUDIOr   sidAudioStreamr;   r\   _process_audio_frames)rM   r   callbackparticipant	track_pubaudio_streamr   r   r    capture_participant_audio  s2   

z&HeyGenClient.capture_participant_audioc                    s   t d|  || _| jdurt d dS | jrY|| jjv r[| jj| }|j D ]0}|j	t
jjkrX|jdurXt d|j  t
|j}| jj| |dd| _ dS q,dS dS dS )zCapture video frames from the HeyGen avatar.

        Args:
            participant_id: Identifier of the participant to capture video from
            callback: Async function to handle received video frames
        zcapture_participant_video: Nr   z+Starting video capture for existing track: HeyGenClient_Receive_Videors   )r   rP   rF   rD   r7   r   r   r   r   r   r   r   
KIND_VIDEOr   r   VideoStreamr;   r\   _process_video_frames)rM   r   r   r   r   video_streamr   r   r    capture_participant_video  s*   

z&HeyGenClient.capture_participant_videostreamc                    s   zvzKt d |2 z@3 dH W }z |j}t|j}t||jdd}| jr/| jr/| |I dH  W q
 t	yJ } zt 
d|  W Y d}~q
d}~ww 6 W n t	yg } zt 
d|  W Y d}~nd}~ww W t d dS W t d dS t d w )z)Process audio frames from LiveKit stream.z"Starting audio frame processing...N   )r   r   num_channelszError processing audio frame: zError processing audio frames: zAudio frame processing ended.)r   rP   re   bytesdatar	   r   rK   rG   r^   r_   )rM   r   frame_eventaudio_frame
audio_datara   r   r   r    r     s8   

z"HeyGenClient._process_audio_framesc                    s"  zz^t d |2 zS3 dH W }z3|j}|jtjkr |tj}tt|j	|j
|jfdd}|jd |_| jrB| jrB| |I dH  W q
 ty] } zt d|  W Y d}~q
d}~ww 6 W n tyz } zt d|  W Y d}~nd}~ww W t d dS W t d dS t d w )	z)Process video frames from LiveKit stream.z"Starting video frame processing...NRGB)imagesizeformati  z)Error processing individual video frame: zError processing video frames: zVideo frame processing ended.)r   rP   re   r   r   RGB24convertr
   r   r   widthheighttimestamp_usptsrK   rF   r^   r_   )rM   r   r   video_frameimage_framera   r   r   r    r     s<   

z"HeyGenClient._process_video_framesc           	   
      s  zt d jj  t  _ jddtjf fdd} jddtj	dtj
dtjf fd	d
} jddtj	dtj
dtjfdd} jddtjf fdd} jsb jjn jj} j jj|I dH  t d jj  t d jjj  t dt jj   jj D ]1}t d|j d|j    jj|j |j D ]}t d|j d|j d|j  qqW dS  ty } zt d|  d _W Y d}~dS d}~ww )zConnect to LiveKit room.z-HeyGenClient livekit connecting to room URL: participant_connectedr   c              	      sb   t d| j d| j  | j D ]}t d|j d|j d|j  q  j	j
| j d S )NzParticipant connected - SID: , Identity: zAvailable track - SID: , Kind: , Name: )r   rP   r   identityr   r   r   rt   rl   rA   r#   )r   r   rV   r   r    r#   9  s   
z?HeyGenClient._livekit_connect.<locals>.on_participant_connectedtrack_subscribedr   publicationc                    s   | j tjjkr- jd ur- jd u r-td|j  t	| } j
j |dd _d S | j tjjkr] jd ur_ jd u ratd|j  tj|  jd} j
j |dd _d S d S d S d S )Nz+Creating video stream processor for track: r   rs   z+Creating audio stream processor for track: r   r   )r   r   r   r   rF   rD   r   rP   r   r   r;   r\   r   r   rG   rE   r   r=   r   )r   r   r   r   r   rV   r   r    on_track_subscribedF  s&   






z:HeyGenClient._livekit_connect.<locals>.on_track_subscribedtrack_unsubscribedc                 S   s   t d|j d| j  d S )NzTrack unsubscribed - SID: r   )r   rP   r   r   )r   r   r   r   r   r    on_track_unsubscribeda  s   z<HeyGenClient._livekit_connect.<locals>.on_track_unsubscribedparticipant_disconnectedc                    s0   t d| j d| j    jj| j d S )Nz Participant disconnected - SID: r   )r   rP   r   r   rl   rA   r$   )r   rV   r   r    r$   i  s   
zBHeyGenClient._livekit_connect.<locals>.on_participant_disconnectedNz(Successfully connected to LiveKit room: zLocal participant SID: zNumber of remote participants: zExisting participant - SID: r   zExisting track - SID: r   r   zLiveKit initialization error: )r   rP   r9   rS   r   Roomr   onRemoteParticipantTrackRemoteTrackPublicationrL   livekit_agent_tokenrT   r   rt   local_participantr   lenr   r   r   rl   rA   r#   r   r   r^   r_   )	rM   r#   r   r   r$   rT   r   r   ra   r   rV   r    rk   1  sl   







zHeyGenClient._livekit_connectc              
      s   zCt d | jr| j| jI dH  d| _| jr'| j| jI dH  d| _| jrBt d | j I dH  d| _t d W dS W dS  ty_ } zt 	d|  W Y d}~dS d}~ww )zDisconnect from LiveKit room.zStarting LiveKit disconnect...NzDisconnecting from LiveKit roomz+Successfully disconnected from LiveKit roomzLiveKit disconnect error: )
r   rP   rD   r;   rc   rE   r   
disconnectr^   r_   rd   r   r   r    ro     s&   

z HeyGenClient._livekit_disconnectc                 G   s   | j |g|R  dS )z,Queue an event callback for async execution.N)rB   
put_nowait)rM   r   argsr   r   r    rl     s   z!HeyGenClient._call_event_callbackqueuec                    s.   	 |  I dH ^}}|| I dH  |  q)z1Handle queued callbacks from the specified queue.TN)r   	task_done)rM   r   r   r   r   r   r    r]     s   z#HeyGenClient._callback_task_handler)r/   N)7r   r   r   r   r&   aiohttpClientSessionr   r   r   r   r   r   r!   boolrN   rW   r   rX   r`   r   intrm   rp   rj   rv   dictr{   rn   r   r   r   r   r   propertyr   r   r   r   r   r   r   r   r   r   r   r   r   r   rk   ro   rl   rZ   r[   r]   r   r   r   r    r'   K   sh    	

]





_r'   )5r   rZ   r   ry   r   r   enumr   typingr   r   r   r   r   logurur   pydanticr   pipecat.frames.framesr	   r
   r   "pipecat.processors.frame_processorr   .pipecat.services.heygen.api_interactive_avatarr   r   &pipecat.services.heygen.api_liveavatarr   r    pipecat.services.heygen.base_apir   !pipecat.transports.base_transportr   "pipecat.utils.asyncio.task_managerr   livekitr   "livekit.rtc._proto.video_frame_pb2r   websockets.asyncio.clientr   ru   websockets.exceptionsr   ModuleNotFoundErrorra   r_   r^   HEY_GEN_SAMPLE_RATEr   r!   r'   r   r   r   r    <module>   sB   
