o
    i@                  
   @   s  d Z ddlZddlZddlZddlmZmZ ddlmZ ddl	m
Z
mZmZmZmZ ddlmZ ddlmZ ddlmZ dd	lmZmZmZmZmZmZmZmZmZmZm Z m!Z!m"Z"m#Z#m$Z$m%Z%m&Z&m'Z'm(Z(m)Z)m*Z* dd
l+m,Z, ddl-m.Z. ddl/m0Z0m1Z1 ddl2m3Z3 ddl4m5Z5 ddl6m7Z7 ddl8m9Z9m:Z: ddl;m<Z<m=Z=m>Z>m?Z? ddl@mAZA ddlBmCZC zddlDmEZF W n  eGy ZH zeIdeH  eId eJdeH dZH[Hww eG dd dZKeG dd de=ZLG dd de:ZMdS )zGrok Realtime Voice Agent LLM service implementation with WebSocket support.

Based on xAI's Grok Voice Agent API documentation:
https://docs.x.ai/docs/guides/voice/agent
    N)	dataclassfield)fields)AnyDictMappingOptionalType)logger)ToolsSchema)GrokRealtimeLLMAdapter)AggregationTypeBotStoppedSpeakingFrameCancelFrameEndFrameFrameInputAudioRawFrameInterruptionFrameLLMContextFrameLLMFullResponseEndFrameLLMFullResponseStartFrameLLMMessagesAppendFrameLLMSetToolsFrameLLMTextFrame
StartFrameTranscriptionFrameTTSAudioRawFrameTTSStartedFrameTTSStoppedFrameTTSTextFrameUserStartedSpeakingFrameUserStoppedSpeakingFrame)LLMTokenUsage)
LLMContext)LLMAssistantAggregatorParamsLLMUserAggregatorParams)LLMContextAggregatorPair)OpenAILLMContext)FrameDirection)FunctionCallFromLLM
LLMService)	NOT_GIVENLLMSettings	_NotGivenis_given)time_now_iso8601   )events)connectzException: zJIn order to use Grok Realtime, you need to `pip install pipecat-ai[grok]`.zMissing module: c                   @   s6   e Zd ZU dZeed< eed< eed< dZeed< dS )CurrentAudioResponseao  Tracks the current audio response from the assistant.

    Parameters:
        item_id: Unique identifier for the audio response item.
        content_index: Index of the audio content within the item.
        start_time_ms: Timestamp when the audio response started in milliseconds.
        total_size: Total size of audio data received in bytes. Defaults to 0.
    item_idcontent_indexstart_time_msr   
total_sizeN)__name__
__module____qualname____doc__str__annotations__intr7    r?   r?   V/home/ubuntu/.local/lib/python3.10/site-packages/pipecat/services/grok/realtime/llm.pyr3   M   s   
 	r3   c                       s   e Zd ZU dZedd dZejeB e	d< e
ddd	Zd
ddeeef f fddZeded deeef ddfddZ  ZS )GrokRealtimeLLMSettingsa  Settings for GrokRealtimeLLMService.

    Parameters:
        session_properties: Grok Realtime session properties (voice, audio config,
            tools, etc.).  ``instructions`` is synced bidirectionally with the
            top-level ``system_instruction`` field.
    c                   C   s   t S )N)r+   r?   r?   r?   r@   <lambda>i   s    z GrokRealtimeLLMSettings.<lambda>)default_factorysession_propertiessettingsGrokRealtimeLLMService.Settingsc                 C   s.   t | jsdS | j}t | jr| j|_dS dS )zBPush top-level ``system_instruction`` into ``session_properties``.N)r.   rD   system_instructioninstructions)rE   spr?   r?   r@   _sync_top_level_to_spn   s   

z-GrokRealtimeLLMSettings._sync_top_level_to_spdeltareturnc                    sd   t  |}d|v r+t| jr+| j}d|vr+|jdur+| j}|j| _|| jkr+||d< | |  |S )a   Merge a delta, keeping ``system_instruction`` in sync with SP.

        When the delta contains ``session_properties``, it **replaces** the
        stored SP wholesale (matching legacy behaviour).  Top-level field
        values always take precedence over conflicting SP values.
        rD   rG   N)superapply_updater.   rD   rH   rG   rJ   )selfrK   changedrI   old_si	__class__r?   r@   rN   y   s   	

z$GrokRealtimeLLMSettings.apply_updateclsc                 C   s   dd t | D dh }i }i }i }ttjj }| D ]!\}}| j||}	|	|v r2|||	< q|	|v r;|||	< q|||< q|rLtjdi ||d< | di |}
||
_	|
S )a1  Build a delta from a plain dict, routing SP keys into ``session_properties``.

        Keys that correspond to ``SessionProperties`` fields are collected into
        a nested ``session_properties`` value.  ``model`` is always routed to
        the top-level field.  Unknown keys go to ``extra``.
        c                 S   s   h | ]}|j qS r?   )name).0fr?   r?   r@   	<setcomp>   s    z7GrokRealtimeLLMSettings.from_mapping.<locals>.<setcomp>extrarD   Nr?   )
dataclass_fieldssetr1   SessionPropertiesmodel_fieldskeysitems_aliasesgetrY   )rT   rE   own_field_namestopsp_dictrY   sp_keyskeyvalue	canonicalinstancer?   r?   r@   from_mapping   s"   


z$GrokRealtimeLLMSettings.from_mapping)rE   rF   )r8   r9   r:   r;   r   rD   r1   r\   r-   r=   staticmethodrJ   r   r<   r   rN   classmethodr	   r   rj   __classcell__r?   r?   rR   r@   rA   ^   s    
 

rA   c                       s~  e Zd ZU dZeZeed< eZdddddde	de	d	e
ej d
e
e def
 fddZdefddZdefddZde	de
e fddZdefddZdefddZdd Zdd Zdd  Zd!d" Z	#d{d$ed%ed&edefd'd(Zd)d* Zd+ed,efd-d.Zd/ef fd0d1Zd/ef fd2d3Zd/e f fd4d5Z!d/e"de#f fd6d7Z$d8e%fd9d:Z&d;d< Z'd=ej(fd>d?Z)d@dA Z*dBdC Z+dDdE Z, fdFdGZ-dHdI Z.dJdK Z/dLdM Z0dNdO Z1dPdQ Z2dRdS Z3dTdU Z4dVdW Z5dXdY Z6dZd[ Z7d\d] Z8d^e	fd_d`Z9dadb Z:dcdd Z;dedf Z<dgdh Z=didj Z>dkdl Z?dmefdndoZ@dpdq ZAdre	dse	fdtduZBeC eD dvd8eEdweCdxeDdeFfdydzZG  ZHS )|GrokRealtimeLLMServicea  Grok Realtime Voice Agent LLM service providing real-time audio and text communication.

    Implements the Grok Voice Agent API with WebSocket communication for low-latency
    bidirectional audio and text interactions. Supports function calling, conversation
    management, and real-time transcription.

    Features:
        - Real-time audio streaming (PCM, PCMU, PCMA formats)
        - Configurable sample rates (8kHz to 48kHz for PCM)
        - Multiple voice options (Ara, Rex, Sal, Eve, Leo)
        - Built-in tools (web_search, x_search, file_search)
        - Custom function calling
        - Server-side VAD (Voice Activity Detection)
    	_settingszwss://api.x.ai/v1/realtimeNF)base_urlrD   rE   start_audio_pausedapi_keyrp   rD   rE   rq   c                   s   | j dddddddddddt d}|dur*td| j d ||_|jdur*|j|_| j | |dur9|| t	 j
d	||d| || _|| _|| _d| _d| _d| _d| _d| _d| _d| _d| _d| _i | _i | _t | _| d | d dS )
a  Initialize the Grok Realtime Voice Agent LLM service.

        Args:
            api_key: xAI API key for authentication.
            base_url: WebSocket base URL for the realtime API.
                Defaults to "wss://api.x.ai/v1/realtime".
            session_properties: Configuration properties for the realtime session.
                If None, uses default SessionProperties with voice "Ara".

                .. deprecated:: 0.0.105
                    Use ``settings=GrokRealtimeLLMService.Settings(session_properties=...)``
                    instead.

                To set a different voice, configure it in session_properties:

                    session_properties = events.SessionProperties(voice="Rex")

                Available voices: Ara, Rex, Sal, Eve, Leo.
            settings: Runtime-updatable settings for this service.
            start_audio_paused: Whether to start with audio input paused. Defaults to False.
            **kwargs: Additional arguments passed to parent LLMService.
        NF)modelrG   temperature
max_tokenstop_ptop_kfrequency_penaltypresence_penaltyseedfilter_incomplete_user_turnsuser_turn_completion_configrD   rD   )rp   rE   Ton_conversation_item_createdon_conversation_item_updatedr?   )Settingsr1   r\   _warn_deprecated_paramrD   rH   rG   rJ   rN   rM   __init__rr   rp   _audio_input_paused
_websocket_receive_task_context_llm_needs_conversation_setup_disconnecting_api_session_ready_run_llm_when_api_session_ready_current_assistant_response_current_audio_response_messages_added_manually_pending_function_callsr[   _completed_tool_calls_register_event_handler)rO   rr   rp   rD   rE   rq   kwargsdefault_settingsrR   r?   r@   r      sb   !


zGrokRealtimeLLMService.__init__rL   c                 C   s   dS )zCheck if the service can generate usage metrics.

        Returns:
            True if metrics generation is supported.
        Tr?   rO   r?   r?   r@   can_generate_metrics3  s   z+GrokRealtimeLLMService.can_generate_metricspausedc                 C   s
   || _ dS )zzSet whether audio input is paused.

        Args:
            paused: True to pause audio input, False to resume.
        N)r   )rO   r   r?   r?   r@   set_audio_input_paused;  s   
z-GrokRealtimeLLMService.set_audio_input_paused	directionc                 C   s`   | j jjsdS |dkr| j jjjn| j jjj}|r.|jr.t|jdr&|jjS |jjdv r.dS dS )a!  Get manually configured sample rate for input or output.

        Args:
            direction: Either "input" or "output".

        Returns:
            Configured sample rate or None if not manually configured.
            For PCMU/PCMA formats, returns 8000 Hz (G.711 standard).
        Ninputrate)z
audio/pcmuz
audio/pcmai@  )	ro   rD   audior   outputformathasattrr   type)rO   r   audio_configr?   r?   r@   _get_configured_sample_rateC  s   



z2GrokRealtimeLLMService._get_configured_sample_ratec                 C   s   |  d}|du rtd|S )zGet the output sample rate from session properties.

        Returns:
            Output sample rate in Hz.

        Note:
            This assumes start() has been called, which guarantees
            session_properties.audio.output exists.
        r   Nz"Output sample rate not configured.)r   RuntimeError)rO   r   r?   r?   r@   _get_output_sample_rate`  s   

z.GrokRealtimeLLMService._get_output_sample_ratec                 C   s   | j jjr| j jjjdkS dS )z$Check if server-side VAD is enabled.
server_vadF)ro   rD   turn_detectionr   r   r?   r?   r@   _is_turn_detection_enabledo  s   
z1GrokRealtimeLLMService._is_turn_detection_enabledc                    s   |   s| t I dH  | t I dH  |  I dH  |  I dH  | jr>| t	 I dH  | t
 I dH  dS dS )z-Handle user interruption of assistant speech.N)r   send_client_eventr1   InputAudioBufferClearEventResponseCancelEvent _truncate_current_audio_responsestop_all_metricsr   
push_framer   r   r   r?   r?   r@   _handle_interruptionu  s   z+GrokRealtimeLLMService._handle_interruptionc                       dS )z#Handle user started speaking event.Nr?   rO   framer?   r?   r@   _handle_user_started_speaking     z4GrokRealtimeLLMService._handle_user_started_speakingc                    s:   |   s| t I dH  | t I dH  dS dS )z#Handle user stopped speaking event.N)r   r   r1   InputAudioBufferCommitEventResponseCreateEventr   r?   r?   r@   _handle_user_stopped_speaking  s
   z4GrokRealtimeLLMService._handle_user_stopped_speakingc                    s   d| _ dS )z"Handle bot stopped speaking event.N)r   r   r?   r?   r@   _handle_bot_stopped_speaking  s   
z3GrokRealtimeLLMService._handle_bot_stopped_speaking   total_bytessample_ratebytes_per_samplec                 C   s,   |du r|   }|| }|| }t|d S )zGCalculate audio duration in milliseconds based on PCM audio parameters.N  )r   r>   )rO   r   r   r   samplesduration_secondsr?   r?   r@   _calculate_audio_duration_ms  s
   z3GrokRealtimeLLMService._calculate_audio_duration_msc              
      sP   | j sdS zd| _ W dS  ty' } ztd|  W Y d}~dS d}~ww )zTruncates the current audio response.

        Note: Grok may not support truncation events like OpenAI.
        This is a best-effort cleanup.
        Nz-Audio truncation cleanup failed (non-fatal): )r   	Exceptionr
   warningrO   er?   r?   r@   r     s   z7GrokRealtimeLLMService._truncate_current_audio_responseinput_sample_rateoutput_sample_ratec                 C   s`   | j j}|jst |_|jjstjtj|dd|j_|jjs.tj	tj|dd|j_dS dS )a)  Ensure session_properties.audio has input and output configs.

        Fills in any missing audio configuration using the given sample rates.

        Args:
            input_sample_rate: Sample rate for audio input (Hz).
            output_sample_rate: Sample rate for audio output (Hz).
        )r   )r   N)
ro   rD   r   r1   AudioConfigurationr   
AudioInputPCMAudioFormatr   AudioOutput)rO   r   r   propsr?   r?   r@   _ensure_audio_config  s   	



z+GrokRealtimeLLMService._ensure_audio_configr   c                    s6   t  |I dH  | |j|j |  I dH  dS )zStart the service and establish WebSocket connection.

        Args:
            frame: The start frame triggering service initialization.
        N)rM   startr   audio_in_sample_rateaudio_out_sample_rate_connectr   rR   r?   r@   r     s   zGrokRealtimeLLMService.startc                    &   t  |I dH  |  I dH  dS )zStop the service and close WebSocket connection.

        Args:
            frame: The end frame triggering service shutdown.
        N)rM   stop_disconnectr   rR   r?   r@   r        zGrokRealtimeLLMService.stopc                    r   )zCancel the service and close WebSocket connection.

        Args:
            frame: The cancel frame triggering service cancellation.
        N)rM   cancelr   r   rR   r?   r@   r     r   zGrokRealtimeLLMService.cancelc                    s  t  ||I dH  t|trnpt|tr | |jI dH  nat|tr1| js0| 	|I dH  nPt|t
r>|  I dH  nCt|trL| |I dH  n5t|trZ| |I dH  n't|trg|  I dH  nt|tru| |I dH  nt|tr|  I dH  | ||I dH  dS )zProcess incoming frames from the pipeline.

        Args:
            frame: The frame to process.
            direction: The direction of frame flow in the pipeline.
        N)rM   process_frame
isinstancer   r   _handle_contextcontextr   r   _send_user_audior   r   r    r   r!   r   r   r   r   _handle_messages_appendr   _send_session_updater   )rO   r   r   rR   r?   r@   r     s.   








z$GrokRealtimeLLMService.process_framer   c                    sN   | j s|| _ | jddI dH  |  I dH  dS || _ | jddI dH  dS )zHandle LLM context updates.Fsend_new_resultsNT)r   !_process_completed_function_calls_create_response)rO   r   r?   r?   r@   r      s   z&GrokRealtimeLLMService._handle_contextc                    s   t d dS )z)Handle appending messages to the context.z<LLMMessagesAppendFrame not yet implemented for Grok RealtimeN)r
   r   r   r?   r?   r@   r   
  s   z.GrokRealtimeLLMService._handle_messages_appendeventc                    s   |  |jddI dH  dS )zuSend a client event to the Grok Voice Agent API.

        Args:
            event: The client event to send.
        T)exclude_noneN)_ws_send
model_dump)rO   r   r?   r?   r@   r     s   z(GrokRealtimeLLMService.send_client_eventc              
      s   z"| j rW dS t| jdd| j idI dH | _ | |  | _W dS  tyF } z| jd| |dI dH  d| _ W Y d}~dS d}~ww )z'Establish WebSocket connection to Grok.NAuthorizationzBearer )uriadditional_headerszError connecting to Grok: 	error_msg	exception)	r   websocket_connectrp   rr   create_task_receive_task_handlerr   r   
push_errorr   r?   r?   r@   r     s   zGrokRealtimeLLMService._connectc              
      s   z6d| _ d| _|  I dH  | jr| j I dH  d| _| jr.| j| jddI dH  d| _t | _d| _ W dS  t	yW } z| j
d| |dI dH  W Y d}~dS d}~ww )zClose WebSocket connection.TFNg      ?)timeoutzError disconnecting: r   )r   r   r   r   closer   cancel_taskr[   r   r   r   r   r?   r?   r@   r   +  s"   (z"GrokRealtimeLLMService._disconnectc              
      s   z| j s| jr| jt|I dH  W dS W dS W dS  tyI } z!| j s*| js1W Y d}~dS | jd| |dI dH  W Y d}~dS d}~ww )z-Send a message over the WebSocket connection.NzError sending client event: r   )r   r   sendjsondumpsr   r   )rO   realtime_messager   r?   r?   r@   r   ?  s   (zGrokRealtimeLLMService._ws_sendc                    s|   |  d}|  d}t |I dH }d|v r"|r"|r"| || ddh}| |@ r3|  I dH  | | |  |S )z=Apply a settings delta, sending a session update when needed.r   r   NrD   rG   )r   rM   _update_settingsr   r^   r    _warn_unhandled_updated_settings)rO   rK   
input_rateoutput_raterP   handledrR   r?   r@   r   I  s   

z'GrokRealtimeLLMService._update_settingsc                    s   | j j}|  }| jr$|| j}|d r|d |_|d r$|d |_|jr4t|jtr4|	|j|_| 
tj|dI dH  dS )z&Update session settings on the server.toolsrG   )sessionN)ro   rD   get_llm_adapterr   get_llm_invocation_paramsr   rH   r   r   from_standard_toolsr   r1   SessionUpdateEvent)rO   rE   adapterllm_invocation_paramsr?   r?   r@   r   \  s   

z+GrokRealtimeLLMService._send_session_updatec                    sV  | j 2 z"3 dH W }zt|}W n ty, } ztd|  W Y d}~qd}~ww |jdkr3q|jdkrA| |I dH  q|jdkrO| |I dH  q|jdkr]| 	|I dH  q|jdkrk| 
|I dH  q|jdkry| |I dH  q|jd	krq|jd
krq|jdkr| |I dH  q|jdkrq|jdkr| |I dH  q|jdkr| |I dH  q|jdkr| |I dH  q|jdkr| |I dH  q|jdkr| |I dH  q|jdkr| |I dH  q|jdkrq|jdkr| |I dH  q|jdkr'|jjdv rt|  d|jj  q| |I dH   dS q6 dS )z#Handle incoming WebSocket messages.NzFailed to parse server event: pingzconversation.createdzsession.updatedzresponse.createdzresponse.output_audio.deltazresponse.output_audio.donezresponse.content_part.addedzresponse.content_part.donezresponse.output_item.addedzresponse.output_item.donezconversation.item.addedz5conversation.item.input_audio_transcription.completedzresponse.donez!input_audio_buffer.speech_startedz!input_audio_buffer.speech_stoppedz&response.output_audio_transcript.deltaz&response.function_call_arguments.deltaz%response.function_call_arguments.doneerror)response_cancel_not_active(conversation_already_has_active_response )r   r1   parse_server_eventr   r
   r   r    _handle_evt_conversation_created_handle_evt_session_updated_handle_evt_response_created_handle_evt_audio_delta_handle_evt_audio_done#_handle_evt_conversation_item_added/_handle_evt_input_audio_transcription_completed_handle_evt_response_done_handle_evt_speech_started_handle_evt_speech_stopped"_handle_evt_audio_transcript_delta(_handle_evt_function_call_arguments_doner   codedebugmessage_handle_evt_error)rO   r  evtr   r?   r?   r@   r   t  sh   
















z,GrokRealtimeLLMService._receive_task_handlerc                    s   |   I dH  dS )zAHandle conversation.created event - first event after connecting.N)r   rO   r  r?   r?   r@   r    s   z7GrokRealtimeLLMService._handle_evt_conversation_createdc                    r   )z<Handle response.created event - response generation started.Nr?   r  r?   r?   r@   r    r   z3GrokRealtimeLLMService._handle_evt_response_createdc                    s*   d| _ | jrd| _|  I dH  dS dS )zHandle session.updated event.TFN)r   r   r   r  r?   r?   r@   r    s   z2GrokRealtimeLLMService._handle_evt_session_updatedc                    s   |   I dH  | js$t|j|jtt d d| _| t I dH  t	
|j}| j jt|7  _t||  dd}| |I dH  dS )z:Handle audio delta event - streaming audio from assistant.Nr   )r4   r5   r6   r0   )r   r   num_channels)stop_ttfb_metricsr   r3   r4   r5   r>   timer   r   base64	b64decoderK   r7   lenr   r   )rO   r  r   r   r?   r?   r@   r    s"   z.GrokRealtimeLLMService._handle_evt_audio_deltac                    s"   | j r| t I dH  dS dS )zHandle audio done event.N)r   r   r   r  r?   r?   r@   r	       z-GrokRealtimeLLMService._handle_evt_audio_donec                    s   |j jdkr"|j j| jvr|j | j|j j< ntd|j j d | d|j j|j I dH  | j	|j jr?| j|j j= dS |j j
dkrT|j | _| t I dH  dS dS )z%Handle conversation.item.added event.function_callzFunction call z already tracked, skippingr}   N	assistant)itemr   call_idr   r
   r  _call_event_handleridr   ra   roler   r   r   r  r?   r?   r@   r
    s   z:GrokRealtimeLLMService._handle_evt_conversation_item_addedc                    sZ   |  d|jdI dH  |jr|j nd}|r+| t|dt |dtjI dH  dS dS )z1Handle input audio transcription completed event.r~   N )result)	r"  r4   
transcriptstripr   r   r/   r(   UPSTREAM)rO   r  r'  r?   r?   r@   r    s   zFGrokRealtimeLLMService._handle_evt_input_audio_transcription_completedc                    s   |j p|jj }|r%|jr%t|jpd|jpd|jpdd}| |I dH  |  I dH  | t	 I dH  d| _
|jjdkrUd}|jjrJt|jj}| j|dI dH  dS |jjD ]}| d|j|I dH  qYdS )zHandle response.done event.r   )prompt_tokenscompletion_tokenstotal_tokensNfailedzResponse failedr   r~   )usageresponser,  r"   input_tokensoutput_tokensstart_llm_usage_metricsstop_processing_metricsr   r   r   statusstatus_detailsr<   r   r   r"  r#  )rO   r  r/  tokensr   r   r?   r?   r@   r    s*   
z0GrokRealtimeLLMService._handle_evt_response_donec                    s"   |j r| |j I dH  dS dS )z$Handle audio transcript delta event.N)rK   #_push_output_transcript_text_framesr  r?   r?   r@   r    r  z9GrokRealtimeLLMService._handle_evt_audio_transcript_deltatextc                    sH   t |}d|_| |I d H  t|tjd}d|_| |I d H  d S )NF)aggregated_byT)r   append_to_contextr   r   r   SENTENCEincludes_inter_frame_spaces)rO   r9  llm_text_frametts_text_framer?   r?   r@   r8    s   
z:GrokRealtimeLLMService._push_output_transcript_text_framesc              
      s   z@t |j}| j|j}|r6| j|j= t| j|j|j|dg}| 	|I dH  t
d|j  W dS t
d|j  W dS  ty\ } zt
d|  W Y d}~dS d}~ww )z*Handle function call arguments done event.)r   tool_call_idfunction_name	argumentsNzProcessed function call: z,No tracked function call found for call_id: z+Failed to process function call arguments: )r   loadsrB  r   ra   r!  r)   r   rU   run_function_callsr
   r  r   r   r   )rO   r  argsfunction_call_itemfunction_callsr   r?   r?   r@   r  /  s(   
	z?GrokRealtimeLLMService._handle_evt_function_call_arguments_donec                    s2   |   I dH  | tI dH  |  I dH  dS )z%Handle speech started event from VAD.N)r   broadcast_framer    broadcast_interruptionr  r?   r?   r@   r  I  s   z1GrokRealtimeLLMService._handle_evt_speech_startedc                    s2   |   I dH  |  I dH  | tI dH  dS )z%Handle speech stopped event from VAD.N)start_ttfb_metricsstart_processing_metricsrH  r!   r  r?   r?   r@   r  O  s   z1GrokRealtimeLLMService._handle_evt_speech_stoppedc                    s"   | j d|jj dI dH  dS )zHandle error event.zGrok Realtime Error: r.  N)r   r   r  r  r?   r?   r@   r  U  s    z(GrokRealtimeLLMService._handle_evt_errorc                    sD   t d |  I dH  d| _| jddI dH  |  I dH  dS )z9Reset the conversation by disconnecting and reconnecting.zResetting Grok conversationNTFr   )r
   r  r   r   r   r   r   r?   r?   r@   reset_conversation]  s   
z)GrokRealtimeLLMService.reset_conversationc                    s   | j s	d| _dS |  }| jrJtd|| j  || j}|d }|D ]}t	j
|d}d| j|jj< | |I dH  q(|  I dH  d| _td | t I dH  |  I dH  |  I dH  | t	jt	jdd	gd
dI dH  dS )zCreate an assistant response.TNz4Setting up Grok conversation with initial messages: messagesr   FzCreating Grok responser9  r   )
modalities)r0  )r   r   r   r   r
   r  get_messages_for_loggingr   r   r1   ConversationItemCreateEventr   r   r#  r   r   r   r   rK  rJ  r   ResponseProperties)rO   r   r   rM  r   r  r?   r?   r@   r   g  s8   

z'GrokRealtimeLLMService._create_responser   c                    s   d}| j  D ]0}|dr8|ddkr8|d}|r8|| jvr8|r2d}| ||dI dH  | j| q|rD|  I dH  dS dS )zAProcess completed function calls and send results to the service.Fr$  contentIN_PROGRESSr@  TN)r   get_messagesra   r   _send_tool_resultaddr   )rO   r   sent_new_resultr  r@  r?   r?   r@   r     s   
z8GrokRealtimeLLMService._process_completed_function_callsc                    s:   | j rdS t|jd}| tj|dI dH  dS )zSend user audio to Grok.Nzutf-8)r   )r   r  	b64encoder   decoder   r1   InputAudioBufferAppendEvent)rO   r   payloadr?   r?   r@   r     s
   z'GrokRealtimeLLMService._send_user_audior@  r&  c                    s8   t jd|tj|ddd}| t j|dI dH  dS )z Send a tool call result to Grok.function_call_outputF)ensure_ascii)r   r!  r   rN  N)r1   ConversationItemr   r   r   rQ  )rO   r@  r&  r   r?   r?   r@   rV    s   z(GrokRealtimeLLMService._send_tool_resultuser_paramsassistant_paramsra  rb  c                C   s   t |}d|_t|||dS )aN  Create context aggregators for the Grok Realtime service.

        Args:
            context: The LLM context.
            user_params: User aggregator parameters.
            assistant_params: Assistant aggregator parameters.

        Returns:
            LLMContextAggregatorPair for user and assistant context aggregation.
        Fr`  )r#   from_openai_contextexpect_stripped_wordsr&   )rO   r   ra  rb  r?   r?   r@   create_context_aggregator  s
   
z0GrokRealtimeLLMService.create_context_aggregator)Nr   )Ir8   r9   r:   r;   rA   r   r=   r   adapter_classr<   r   r1   r\   boolr   r   r   r>   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r(   r   r#   r   r   ClientEventr   r   r   r   r   r   r   r  r  r  r  r	  r
  r  r  r  r8  r  r  r  r  rL  r   r   r   rV  r%   r$   r'   r&   re  rm   r?   r?   rR   r@   rn      s   
 a


	

<
%rn   )Nr;   r  r   r  dataclassesr   r   r   rZ   typingr   r   r   r   r	   logurur
   %pipecat.adapters.schemas.tools_schemar   /pipecat.adapters.services.grok_realtime_adapterr   pipecat.frames.framesr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r    r!   pipecat.metrics.metricsr"   *pipecat.processors.aggregators.llm_contextr#   +pipecat.processors.aggregators.llm_responser$   r%   5pipecat.processors.aggregators.llm_response_universalr&   1pipecat.processors.aggregators.openai_llm_contextr'   "pipecat.processors.frame_processorr(   pipecat.services.llm_servicer)   r*   pipecat.services.settingsr+   r,   r-   r.   pipecat.utils.timer/   r%  r1   websockets.asyncio.clientr2   r   ModuleNotFoundErrorr   r   r   r3   rA   rn   r?   r?   r?   r@   <module>   sD   \
]