o
    i:                  
   @   s  d Z ddlZddlZddlmZmZ ddlmZmZm	Z	m
Z
mZ ddlZddlmZ ddlmZ ddlmZmZmZmZmZmZmZmZmZ ddlmZ dd	lmZmZm Z  dd
l!m"Z"m#Z#m$Z$m%Z% ddl&m'Z'm(Z( ddl)m*Z* ddl+m,Z, ddl-m.Z. zddl/m0Z1 ddl2m3Z3 W n  e4y Z5 ze6de5  e6d e7de5 dZ5[5ww de'de8fddZ9eG dd deZ:eG dd deZ;G dd de%Z<G dd de$Z=G d d! d!e"Z>dS )"zRime text-to-speech service implementations.

This module provides both WebSocket and HTTP-based text-to-speech services
using Rime's API for streaming and batch audio synthesis.
    N)	dataclassfield)AnyAsyncGeneratorClassVarDictOptional)logger)	BaseModel)	CancelFrameEndFrame
ErrorFrameFrameInterruptionFrame
StartFrameTTSAudioRawFrameTTSStartedFrameTTSStoppedFrame)FrameDirection)	NOT_GIVENTTSSettings	_NotGiven)InterruptibleTTSServiceTextAggregationMode
TTSServiceWebsocketTTSService)Languageresolve_language)BaseTextAggregator)SkipTagsAggregator)
traced_tts)connect)StatezException: zAIn order to use Rime, you need to `pip install pipecat-ai[rime]`.zMissing module: languagereturnc              
   C   s0   t jdt jdt jdt jdt jdi}t| |ddS )zConvert pipecat Language to Rime language code.

    Args:
        language: The pipecat Language enum value.

    Returns:
        Three-letter language code used by Rime (e.g., 'eng' for English).
    gerfraengspahinF)use_base_code)r   DEFRENESHIr   )r#   LANGUAGE_MAP r1   M/home/ubuntu/.local/lib/python3.10/site-packages/pipecat/services/rime/tts.pylanguage_to_rime_language7   s   
r3   c                   @   sx  e Zd ZU dZedd dZedB eB ed< edd dZ	e
dB eB ed< ed	d dZedB eB ed
< edd dZedB eB ed< edd dZedB eB ed< edd dZedB eB ed< edd dZedB eB ed< edd dZedB eB ed< edd dZe
dB eB ed< edd dZe
dB eB ed< edd dZe
dB eB ed< ddiZeeeef  ed< dS )RimeTTSSettingsa  Settings for RimeTTSService and RimeHttpTTSService.

    Parameters:
        segment: Text segmentation mode ("immediate", "bySentence", "never").
        speedAlpha: Speech speed multiplier (mistv2 only).
        reduceLatency: Whether to reduce latency at potential quality cost (mistv2 only).
        pauseBetweenBrackets: Whether to add pauses between bracketed content (mistv2 only).
        phonemizeBetweenBrackets: Whether to phonemize bracketed content (mistv2 only).
        noTextNormalization: Whether to disable text normalization (mistv2 only).
        saveOovs: Whether to save out-of-vocabulary words (mistv2 only).
        inlineSpeedAlpha: Inline speed control markup.
        repetition_penalty: Token repetition penalty (arcana only, 1.0-2.0).
        temperature: Sampling temperature (arcana only, 0.0-1.0).
        top_p: Cumulative probability threshold (arcana only, 0.0-1.0).
    c                   C      t S Nr   r1   r1   r1   r2   <lambda>\       zRimeTTSSettings.<lambda>default_factoryNsegmentc                   C   r5   r6   r7   r1   r1   r1   r2   r8   ]   r9   
speedAlphac                   C   r5   r6   r7   r1   r1   r1   r2   r8   ^   r9   reduceLatencyc                   C   r5   r6   r7   r1   r1   r1   r2   r8   _   r9   pauseBetweenBracketsc                   C   r5   r6   r7   r1   r1   r1   r2   r8   `   r9   phonemizeBetweenBracketsc                   C   r5   r6   r7   r1   r1   r1   r2   r8   a   r9   noTextNormalizationc                   C   r5   r6   r7   r1   r1   r1   r2   r8   b   r9   saveOovsc                   C   r5   r6   r7   r1   r1   r1   r2   r8   c   r9   inlineSpeedAlphac                   C   r5   r6   r7   r1   r1   r1   r2   r8   d   r9   repetition_penaltyc                   C   r5   r6   r7   r1   r1   r1   r2   r8   e   r9   temperaturec                   C   r5   r6   r7   r1   r1   r1   r2   r8   f   r9   top_pspeakervoice_aliases)__name__
__module____qualname____doc__r   r<   strr   __annotations__r=   floatr>   boolr?   r@   rA   rB   rC   rD   rE   rF   rI   r   r   r1   r1   r1   r2   r4   J   s   
  r4   c                   @   s   e Zd ZU dZedd dZedB eB ed< edd dZ	e
dB eB ed< ed	d dZe
dB eB ed
< edd dZe
dB eB ed< ddiZeeeef  ed< dS )RimeNonJsonTTSSettingsa7  Settings for RimeNonJsonTTSService.

    Parameters:
        segment: Text segmentation mode ("immediate", "bySentence", "never").
        repetition_penalty: Token repetition penalty (1.0-2.0).
        temperature: Sampling temperature (0.0-1.0).
        top_p: Cumulative probability threshold (0.0-1.0).
    c                   C   r5   r6   r7   r1   r1   r1   r2   r8   v   r9   zRimeNonJsonTTSSettings.<lambda>r:   Nr<   c                   C   r5   r6   r7   r1   r1   r1   r2   r8   w   r9   rD   c                   C   r5   r6   r7   r1   r1   r1   r2   r8   x   r9   rE   c                   C   r5   r6   r7   r1   r1   r1   r2   r8   y   r9   rF   rG   rH   rI   )rJ   rK   rL   rM   r   r<   rN   r   rO   rD   rP   rE   rF   rI   r   r   r1   r1   r1   r2   rR   k   s   
 	 rR   c                       s  e Zd ZU dZeZeed< G dd deZdddddddddd	de	d	e
e	 d
e	de
e	 de
e de
e de
e de
e de
e de
e f fddZdefddZdede	dB fddZdee	ef fddZde	de	fddZdede	fd d!Zde	d"e	d#e	de	fd$d%Zde	d&ede	fd'd(Zd)edee	ef f fd*d+ZdYde	d-e	defd.d/Zdefd0d1Zdefd2d3Zd4e f fd5d6Z!d4e"f fd7d8Z#d4e$f fd9d:Z% fd;d<Z& fd=d>Z'd?d@ Z(dAdB Z)dCdD Z*d-e	fdEdFZ+d-e	fdGdHZ,d-e	fdIdJZ-dKe.dLe.dMe.de.fdNdOZ/dZd-e
e	 fdPdQZ0dRdS Z1e2j3fd4e4dTe2f fdUdVZ5e6de	d-e	de7e4df fdWdXZ8  Z9S )[RimeTTSServicezText-to-Speech service using Rime's websocket API.

    Uses Rime's websocket JSON API to convert text to speech with word-level timing
    information. Supports interruptions and maintains context across multiple messages
    within a turn.
    	_settingsc                   @   s   e Zd ZU dZejZee ed< dZ	ee
 ed< dZee ed< dZee ed< dZee ed< dZee ed< dZee ed	< dZee ed
< dZee ed< dZee ed< dZee ed< dS )zRimeTTSService.InputParamsa  Configuration parameters for Rime TTS service.

        .. deprecated:: 0.0.105
            Use ``settings=RimeTTSService.Settings(...)`` instead.

        Parameters:
            language: Language for synthesis. Defaults to English.
            segment: Text segmentation mode ("immediate", "bySentence", "never").
            speed_alpha: Speech speed multiplier.
            repetition_penalty: Token repetition penalty (arcana only).
            temperature: Sampling temperature (arcana only).
            top_p: Cumulative probability threshold (arcana only).
            reduce_latency: Whether to reduce latency at potential quality cost (mistv2 only).
            pause_between_brackets: Whether to add pauses between bracketed content (mistv2 only).
            phonemize_between_brackets: Whether to phonemize bracketed content (mistv2 only).
            no_text_normalization: Whether to disable text normalization (mistv2 only).
            save_oovs: Whether to save out-of-vocabulary words (mistv2 only).
        r#   Nr<   speed_alpharD   rE   rF   reduce_latencypause_between_bracketsphonemize_between_bracketsno_text_normalization	save_oovs)rJ   rK   rL   rM   r   r-   r#   r   rO   r<   rN   rU   rP   rD   rE   rF   rV   rQ   rW   rX   rY   rZ   r1   r1   r1   r2   InputParams   s   
 r[   Nzwss://users-ws.rime.ai/ws3)	voice_idurlmodelsample_rateparamssettingstext_aggregatortext_aggregation_modeaggregate_sentencesapi_keyr\   r]   r^   r_   r`   ra   rb   rc   rd   c       
            sB  | j ddddddddddddddd}|dur | dd ||_|dur-| dd ||_|durd| d |sd|j|_|j|_|j|_|j|_|j	|_	|j
|_
|j|_|j|_|j|_|j|_|j|_|durm|| t jd|	|
dd	d	d	||d
| d| _d| _|stdg| jd| _|| _|| _d| _d| _ i | _!dS )a  Initialize Rime TTS service.

        Args:
            api_key: Rime API key for authentication.
            voice_id: ID of the voice to use.

                .. deprecated:: 0.0.105
                    Use ``settings=RimeTTSService.Settings(voice=...)`` instead.

            url: Rime websocket API endpoint.
            model: Model ID to use for synthesis.

                .. deprecated:: 0.0.105
                    Use ``settings=RimeTTSService.Settings(model=...)`` instead.

            sample_rate: Audio sample rate in Hz.
            params: Additional configuration parameters.

                .. deprecated:: 0.0.105
                    Use ``settings=RimeTTSService.Settings(...)`` instead.

            settings: Runtime-updatable settings. When provided alongside deprecated
                parameters, ``settings`` values take precedence.
            text_aggregator: Custom text aggregator for processing input text.

                .. deprecated:: 0.0.95
                    Use an LLMTextProcessor before the TTSService for custom text aggregation.

            text_aggregation_mode: How to aggregate incoming text before synthesis.
            aggregate_sentences: Deprecated. Use text_aggregation_mode instead.

                .. deprecated:: 0.0.104
                    Use ``text_aggregation_mode`` instead.

            **kwargs: Additional arguments passed to parent class.
        arcanaN)r^   rH   r#   r<   rC   r=   rD   rE   rF   r>   r?   r@   rA   rB   r\   rH   r^   r`   FT)rc   rd   push_text_framespush_stop_framespause_frame_processingappend_trailing_spacer_   ra   pcmr   )spell())aggregation_typer1   )"Settings"_warn_init_param_moved_to_settingsrH   r^   r#   r<   rU   r=   rD   rE   rF   rV   r>   rW   r?   rX   r@   rY   rA   rZ   rB   apply_updatesuper__init___audio_format_sampling_rater   _text_aggregation_mode_text_aggregator_api_key_url_receive_task_cumulative_time_extra_msg_fields)selfre   r\   r]   r^   r_   r`   ra   rb   rc   rd   kwargsdefault_settings	__class__r1   r2   rs      sz   4

	
zRimeTTSService.__init__r$   c                 C      dS )zCheck if this service can generate processing metrics.

        Returns:
            True, as Rime service supports metrics generation.
        Tr1   r}   r1   r1   r2   can_generate_metrics3     z#RimeTTSService.can_generate_metricsr#   c                 C      t |S zConvert pipecat language to Rime language code.

        Args:
            language: The language to convert.

        Returns:
            The Rime-specific language code, or None if not supported.
        r3   r}   r#   r1   r1   r2   language_to_service_language;     	z+RimeTTSService.language_to_service_languagec                 C   sN  | j j| j j| j| jd}| j jdur| j j|d< | j jdur%| j j|d< | j jdur1| j j|d< | j jdkr]| j jdurC| j j|d< | j j	durO| j j	|d< | j j
dur[| j j
|d	< |S | j jduri| j j|d
< | j jdurxt| j j|d< | j jdurt| j j|d< | j jdurt| j j|d< | j jdurt| j j|d< |S )a-  Build query params for the WebSocket URL from current settings.

        Returns:
            Dictionary of query parameters for the WebSocket URL.
            Only explicitly-set values are included. Boolean mistv2 params
            are serialized with ``json.dumps()`` for the wire format.
        rG   modelIdaudioFormatsamplingRateNlangr<   r=   rf   rD   rE   rF   r>   r?   r@   rA   rB   )rT   rH   r^   rt   ru   r#   r<   r=   rD   rE   rF   r>   r?   jsondumpsr@   rA   rB   )r}   r`   r1   r1   r2   _build_ws_paramsF  s@   	zRimeTTSService._build_ws_paramstextc                 C   s   d|  dS )z!Wrap text in Rime spell function.rl   rm   r1   )r   r1   r1   r2   SPELLs  s   zRimeTTSService.SPELLsecondsc                 C   s   d| d  dS )z)Convenience method to create a pause tag.<i  >r1   )r   r1   r1   r2   	PAUSE_TAGw  s   zRimeTTSService.PAUSE_TAGwordphonemec                 C   s   d| j d< ||| S )zConvenience method to support Rime's custom pronunciations feature.

        https://docs.rime.ai/api-reference/custom-pronunciation
        Tr@   )r|   replace)r}   r   r   r   r1   r1   r2   	PRONOUNCE{  s   
zRimeTTSService.PRONOUNCEspeedc                 C   sF   | j si | _ | j ddd}d|t|g | j d< d| dS )z,Convenience method to support inline speeds.rC    ,[])r|   getsplitjoinrN   )r}   r   r   
speed_valsr1   r1   r2   INLINE_SPEED  s
   zRimeTTSService.INLINE_SPEEDdeltac                    s>   t  |I dH }|r| jr|  I dH  |  I dH  |S )Apply a settings delta and reconnect if necessary.

        Since all settings are WebSocket URL query parameters,
        any setting change requires reconnecting to apply the new values.
        N)rr   _update_settings
_websocket_disconnect_connectr}   r   changedr   r1   r2   r     s   
zRimeTTSService._update_settingsr   
context_idc                 C   s$   ||d}| j r|| j O }i | _ |S )z Build JSON message for Rime API.)r   	contextId)r|   )r}   r   r   msgr1   r1   r2   
_build_msg  s
   

zRimeTTSService._build_msgc                 C      ddiS )zBuild clear operation message.	operationclearr1   r   r1   r1   r2   _build_clear_msg     zRimeTTSService._build_clear_msgc                 C   r   )z&Build end-of-stream operation message.r   eosr1   r   r1   r1   r2   _build_eos_msg  r   zRimeTTSService._build_eos_msgframec                    .   t  |I dH  | j| _|  I dH  dS )zStart the service and establish websocket connection.

        Args:
            frame: The start frame containing initialization parameters.
        Nrr   startr_   ru   r   r}   r   r   r1   r2   r        zRimeTTSService.startc                    &   t  |I dH  |  I dH  dS )z`Stop the service and close connection.

        Args:
            frame: The end frame.
        Nrr   stopr   r   r   r1   r2   r        zRimeTTSService.stopc                    r   )zcCancel current operation and clean up.

        Args:
            frame: The cancel frame.
        Nrr   cancelr   r   r   r1   r2   r     r   zRimeTTSService.cancelc                    L   t   I dH  |  I dH  | jr"| js$| | | j| _dS dS dS )z6Establish websocket connection and start receive task.Nrr   r   _connect_websocketr   rz   create_task_receive_task_handler_report_errorr   r   r1   r2   r     s   zRimeTTSService._connectc                    B   t   I dH  | jr| | jI dH  d| _|  I dH  dS )z.Close websocket connection and clean up tasks.Nrr   r   rz   cancel_task_disconnect_websocketr   r   r1   r2   r     s   zRimeTTSService._disconnectc              
      s   zB| j r| j jtju rW dS |  }ddd | D }| j d| }dd| j i}t	||dI dH | _ | 
d	I dH  W dS  typ } z!| jd
| |dI dH  d| _ | 
d| I dH  W Y d}~dS d}~ww )z7Connect to Rime websocket API with configured settings.N&c                 s   *    | ]\}}|d ur| d| V  qd S N=r1   .0kvr1   r1   r2   	<genexpr>     ( z4RimeTTSService._connect_websocket.<locals>.<genexpr>?AuthorizationBearer )additional_headerson_connectedzError connecting: 	error_msg	exceptionon_connection_error)r   stater"   OPENr   r   itemsry   rx   websocket_connect_call_event_handler	Exception
push_error)r}   	ws_paramsr`   r]   headerser1   r1   r2   r     s    "z!RimeTTSService._connect_websocketc              
      s  zlz"|   I dH  | jr#| jt|  I dH  | j I dH  W n tyC } z| jd| |dI dH  W Y d}~nd}~ww W | 	 I dH  d| _| 
dI dH  dS W | 	 I dH  d| _| 
dI dH  dS | 	 I dH  d| _| 
dI dH  w )z+Close websocket connection and reset state.NzError disconnecting: r   on_disconnected)stop_all_metricsr   sendr   r   r   closer   r   remove_active_audio_contextr   r}   r   r1   r1   r2   r     s,   &z$RimeTTSService._disconnect_websocketc                 C      | j r| j S td)z3Get active websocket connection or raise exception.Websocket not connectedr   r   r   r1   r1   r2   _get_websocket     zRimeTTSService._get_websocketc                    s:   |   I dH  |r|  t|  I dH  dS dS )z-Clear the Rime speech queue and stop metrics.N)r   r   r   r   r   r   r}   r   r1   r1   r2   _close_context  s
   "zRimeTTSService._close_contextc                       |  |I dH  dS )zIClear the Rime speech queue and stop metrics when the bot is interrupted.Nr   r   r1   r1   r2   on_audio_context_interrupted  s   z+RimeTTSService.on_audio_context_interruptedc                    r   )aS  Clear server-side state and stop metrics after the Rime context finishes playing.

        Rime does not send a server-side completion signal (e.g. ``done`` / ``end_of_stream`` /
        ``audio_end``), so we explicitly send a ``clear`` message to clean up
        any residual server-side state once all audio has been delivered.
        Nr   r   r1   r1   r2   on_audio_context_completed  s   z)RimeTTSService.on_audio_context_completedwordsstartsendsc                 C   s   g }t t|||D ]4\}\}}}| sq
|| j }	t|ddk}
|
r7|r7|d \}}|| |f|d< q
|||	f q
|S )a@  Calculate word timing pairs with proper spacing and punctuation.

        Args:
            words: List of words from Rime.
            starts: List of start times for each word.
            ends: List of end times for each word.

        Returns:
            List of (word, timestamp) pairs with proper timing.
        z,.!?r   )	enumeratezipstripr{   rQ   append)r}   r   r   r  
word_pairsir   
start_time_adjusted_startis_punctuation	prev_word	prev_timer1   r1   r2   _calculate_word_times  s   
z$RimeTTSService._calculate_word_timesc                    sN   |p|   }|r| jsdS t|  d |  tddiI dH  dS )"Flush any pending audio synthesis.N: flushing audior   flush)get_active_audio_context_idr   r	   tracer   r   r   r   )r}   r   flush_idr1   r1   r2   flush_audio.  s   
"zRimeTTSService.flush_audioc           
         s`  |   2 z3 dH W }t|}|r| |dsq|d }|d dkr=tt|d | jd|d}| 	||I dH  q|d dkr|d	i }|d
g }|dg }|dg }|r|r| 
|||}	|	r| j|	|dI dH  |d | j | _td| j  q|d dkr| t I dH  |  I dH  | jd|d  dI dH  |   q6 dS )z$Process incoming websocket messages.Nr   typechunkdata   audior_   num_channelsr   
timestampsword_timestampsr   r   endr   r  zUpdated cumulative time to: errorError: message)r   )r   r   loadsaudio_context_availabler   r   base64	b64decoder_   append_to_audio_contextr  add_word_timestampsr{   r	   debug
push_framer   r   r   reset_active_audio_context)
r}   r$  r   r   r   r  r   r   r  r  r1   r1   r2   _receive_messages7  sB   
z RimeTTSService._receive_messages	directionc                    sL   t  ||I dH  t|ttfr"t|tr$| dgI dH  dS dS dS )zPush frame and handle end-of-turn conditions.

        Args:
            frame: The frame to push.
            direction: The direction to push the frame.
        N)Resetr   )rr   r,  
isinstancer   r   r*  r}   r   r/  r   r1   r2   r,  _  s   
zRimeTTSService.push_framec              
   C  sZ  t |  d| d z| jr| jjtju r|  I dH  z;| |s<| |I dH  | 	 I dH  t
|dV  d| _| j||d}|  t|I dH  | |I dH  W n1 ty } z%td| dV  t|dV  |  I dH  |  I dH  W Y d}~W dS d}~ww dV  W dS  ty } ztd| dV  W Y d}~dS d}~ww )	a  Generate speech from text using Rime's streaming API.

        Args:
            text: The text to convert to speech.
            context_id: Unique identifier for this TTS context.

        Yields:
            Frame: Audio frames containing the synthesized speech.
        : Generating TTS [r   Nr!  r   )r   r   Unknown error occurred: r"  )r	   r+  r   r   r"   CLOSEDr   r&  create_audio_contextstart_ttfb_metricsr   r{   r   r   r   r   r   start_tts_usage_metricsr   r   r   r   )r}   r   r   r   r   r1   r1   r2   run_ttsk  s6   
 zRimeTTSService.run_tts)r   r   r6   ):rJ   rK   rL   rM   r4   ro   rO   r
   r[   rN   r   intr   r   rQ   rs   r   r   r   dictr   r   r   rP   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   listr  r  r.  r   
DOWNSTREAMr   r,  r    r   r:  __classcell__r1   r1   r   r2   rS   ~   s   
 &	
 	-
			
		((rS   c                       s   e Zd ZU dZeZeed< G dd deZddddddde	de
e	 d	ejd
e
e	 de
e de
e de
e f fddZdefddZdede	dB fddZede	de	deedf fddZ  ZS )RimeHttpTTSServicezRime HTTP-based text-to-speech service.

    Provides text-to-speech synthesis using Rime's HTTP API for batch processing.
    Suitable for use cases where streaming is not required.
    rT   c                   @   st   e Zd ZU dZejZee ed< dZ	ee
 ed< dZee
 ed< dZee ed< dZee ed	< dZee
 ed
< dS )zRimeHttpTTSService.InputParamsar  Configuration parameters for Rime HTTP TTS service.

        .. deprecated:: 0.0.105
            Use ``settings=RimeHttpTTSService.Settings(...)`` instead.

        Parameters:
            language: Language for synthesis. Defaults to English.
            pause_between_brackets: Whether to add pauses between bracketed content.
            phonemize_between_brackets: Whether to phonemize bracketed content.
            inline_speed_alpha: Inline speed control markup.
            speed_alpha: Speech speed multiplier. Defaults to 1.0.
            reduce_latency: Whether to reduce latency at potential quality cost.
        r#   FrW   rX   Ninline_speed_alphag      ?rU   rV   )rJ   rK   rL   rM   r   r-   r#   r   rO   rW   rQ   rX   rA  rN   rU   rP   rV   r1   r1   r1   r2   r[     s   
 r[   N)r\   r^   r_   r`   ra   re   r\   aiohttp_sessionr^   r_   r`   ra   c          
         s   | j ddddddddddddddd}	|dur | dd ||	_|dur-| dd ||	_|durU| d |sU|j|	_|j|	_|j|	_|j	|	_
|j|	_|jrR|jnd|	_|dur^|	| t jd|d	d	|	d
| || _|| _d| _d| _dS )a  Initialize Rime HTTP TTS service.

        Args:
            api_key: Rime API key for authentication.
            voice_id: ID of the voice to use.

                .. deprecated:: 0.0.105
                    Use ``settings=RimeHttpTTSService.Settings(voice=...)`` instead.

            aiohttp_session: Shared aiohttp session for HTTP requests.
            model: Model ID to use for synthesis.

                .. deprecated:: 0.0.105
                    Use ``settings=RimeHttpTTSService.Settings(model=...)`` instead.

            sample_rate: Audio sample rate in Hz.
            params: Additional configuration parameters.

                .. deprecated:: 0.0.105
                    Use ``settings=RimeHttpTTSService.Settings(...)`` instead.

            settings: Runtime-updatable settings. When provided alongside deprecated
                parameters, ``settings`` values take precedence.
            **kwargs: Additional arguments passed to parent TTSService.
        mistv2Nr'   )r^   rH   r#   r<   r=   r>   r?   r@   rA   rB   rC   rD   rE   rF   r\   rH   r^   r`   T)r_   rh   push_start_framera   z!https://users.rime.ai/v1/rime-ttsrk   r1   )ro   rp   rH   r^   r#   rU   r=   rV   r>   rW   r?   rX   r@   rA  rC   rq   rr   rs   rx   _session	_base_urlrt   )
r}   re   r\   rB  r^   r_   r`   ra   r~   r   r   r1   r2   rs     s\   &


zRimeHttpTTSService.__init__r$   c                 C   r   )zCheck if this service can generate processing metrics.

        Returns:
            True, as Rime HTTP service supports metrics generation.
        Tr1   r   r1   r1   r2   r     r   z'RimeHttpTTSService.can_generate_metricsr#   c                 C   r   r   r   r   r1   r1   r2   r     r   z/RimeHttpTTSService.language_to_service_languager   r   c              
   C  s  t |  d| d dd| j dd}| jj| jj| jj| jj| jjd}| jj	dur4| jj	|d	< ||d
< | jj
|d< | jj|d< | j|d< |d dkrVd|d< d}nd}zzu| jj| j||d4 I dH Z}|jdkrd|j }t|dV  	 W d  I dH  W W |  I dH  dS | |I dH  | j}| j|j|||d2 z3 dH W }	|  I dH  |	V  q6 W d  I dH  n1 I dH sw   Y  W n ty }
 ztd|
 dV  W Y d}
~
nd}
~
ww W |  I dH  dS W |  I dH  dS |  I dH  w )a  Generate speech from text using Rime's HTTP API.

        Args:
            text: The text to synthesize into speech.
            context_id: The context ID for tracking audio frames.

        Yields:
            Frame: Audio frames containing the synthesized speech.
        r3  r   z	audio/pcmr   zapplication/json)Acceptr   zContent-Type)r   r=   r>   r?   r@   NrC   r   rG   r   r   rf   z	audio/wavrG  TF)r   r      zRime TTS error: HTTP r5  )strip_wav_headerr   r4  )r	   r+  rx   rT   r#   r=   r>   r?   r@   rC   rH   r^   r_   rE  postrF  statusr   stop_ttfb_metricsr9  
chunk_size"_stream_audio_frames_from_iteratorcontentiter_chunkedr   )r}   r   r   r   payloadneed_to_strip_wav_headerresponseerror_message
CHUNK_SIZEr   r   r1   r1   r2   r:  "  sj   



("zRimeHttpTTSService.run_tts)rJ   rK   rL   rM   r4   ro   rO   r
   r[   rN   r   aiohttpClientSessionr;  rs   rQ   r   r   r   r    r   r   r:  r?  r1   r1   r   r2   r@    s:   
 	_(r@  c                       s  e Zd ZU dZeZeed< G dd deZdddddddddd	d	e	d
e
e	 de	de
e	 de	de
e de
e de
e de
e de
e f fddZdefddZdede	fddZdef fddZdef fddZdef fd d!Zejfded"ef fd#d$Z fd%d&Z fd'd(Zd)d* Zd+d, Zd-d. Z d:d/e
e	 fd0d1Z!d2d3 Z"e#d4e	d/e	de$edf fd5d6Z%d7e&de'e	e(f f fd8d9Z)  Z*S );RimeNonJsonTTSServicea  Pipecat TTS service for Rime's non-JSON WebSocket API.

    .. deprecated:: 0.0.102
        Arcana now supports JSON WebSocket with word-level timestamps via the
        ``wss://users-ws.rime.ai/ws3`` endpoint. Use :class:`RimeTTSService`
        with ``model="arcana"`` instead.

    This service enables Text-to-Speech synthesis over WebSocket endpoints
    that require plain text (not JSON) messages and return raw audio bytes.

    Limitations:
        - Does not support word-level timestamps or context IDs.
        - Intended specifically for integrations where the TTS provider only
          accepts and returns non-JSON messages.
    rT   c                   @   sz   e Zd ZU dZdZee ed< dZee	 ed< dZ
ee ed< dZee ed< dZee ed< dZeee	ef  ed< dS )	z!RimeNonJsonTTSService.InputParamsam  Configuration parameters for Rime Non-JSON WebSocket TTS service.

        .. deprecated:: 0.0.105
            Use ``settings=RimeNonJsonTTSService.Settings(...)`` instead.

        Args:
            language: Language for synthesis. Defaults to English.
            segment: Text segmentation mode ("immediate", "bySentence", "never").
            repetition_penalty: Token repetition penalty (1.0-2.0).
            temperature: Sampling temperature (0.0-1.0).
            top_p: Cumulative probability threshold (0.0-1.0).
            extra: Additional parameters to pass to the API (for future compatibility).
        Nr#   r<   rD   rE   rF   extra)rJ   rK   rL   rM   r#   r   r   rO   r<   rN   rD   rP   rE   rF   rY  r<  r   r1   r1   r1   r2   r[   y  s   
 r[   Nzwss://users.rime.ai/wsrk   )	r\   r]   r^   audio_formatr_   r`   ra   rd   rc   re   r\   r]   r^   rZ  r_   r`   ra   rd   rc   c       
            s   | j dddddddd}|dur| dd ||_|dur&| dd ||_|durE| d |sE|j|_|j|_|j|_|j|_|j|_|durN|	| t
 jd
||	|
dddd|d	| || _|| _|| _|| _|ry|jry| jj|j d| _dS )a  Initialize Rime Non-JSON WebSocket TTS service.

        Args:
            api_key: Rime API key for authentication.
            voice_id: ID of the voice to use.

                .. deprecated:: 0.0.105
                    Use ``settings=RimeNonJsonTTSService.Settings(voice=...)`` instead.

            url: Rime websocket API endpoint.
            model: Model ID to use for synthesis.

                .. deprecated:: 0.0.105
                    Use ``settings=RimeNonJsonTTSService.Settings(model=...)`` instead.

            audio_format: Audio format to use.
            sample_rate: Audio sample rate in Hz.
            params: Additional configuration parameters.

                .. deprecated:: 0.0.105
                    Use ``settings=RimeNonJsonTTSService.Settings(...)`` instead.

            settings: Runtime-updatable settings. When provided alongside deprecated
                parameters, ``settings`` values take precedence.
            aggregate_sentences: Deprecated. Use text_aggregation_mode instead.

                .. deprecated:: 0.0.104
                    Use ``text_aggregation_mode`` instead. Set to ``TextAggregationMode.SENTENCE``
                    to aggregate text into sentences before synthesis, or
                    ``TextAggregationMode.TOKEN`` to stream tokens directly for lower latency.

            text_aggregation_mode: How to aggregate text before synthesis.
            **kwargs: Additional arguments passed to parent class.
        Nrf   )rH   r^   r#   r<   rD   rE   rF   r\   rH   r^   r`   T)r_   rd   rc   rh   rD  ri   rj   ra   r1   )ro   rp   rH   r^   r#   r<   rD   rE   rF   rq   rr   rs   rt   ru   rx   ry   rY  rT   updaterz   )r}   re   r\   r]   r^   rZ  r_   r`   ra   rd   rc   r~   r   r   r1   r2   rs     sX   2

	

zRimeNonJsonTTSService.__init__r$   c                 C   r   )zCheck if this service can generate processing metrics.

        Returns:
            True, as Rime Non-JSON WebSocket service supports metrics generation.
        Tr1   r   r1   r1   r2   r     r   z*RimeNonJsonTTSService.can_generate_metricsr#   c                 C   r   )a>  Convert pipecat Language enum to Rime language code.

        Args:
            language: The Language enum value to convert.

        Returns:
            Three-letter Rime language code (e.g., 'eng' for English).
            Falls back to the language's base code with a warning if not in the verified list.
        r   r   r1   r1   r2   r     s   
z2RimeNonJsonTTSService.language_to_service_languager   c                    r   )zStart the Rime Non-JSON WebSocket TTS service.

        Args:
            frame: The start frame containing initialization parameters.
        Nr   r   r   r1   r2   r     r   zRimeNonJsonTTSService.startc                    r   )z&Stop the service and close connection.Nr   r   r   r1   r2   r        zRimeNonJsonTTSService.stopc                    r   )z&Cancel current operation and clean up.Nr   r   r   r1   r2   r     r\  zRimeNonJsonTTSService.cancelr/  c                    s   t  ||I dH  dS )zPush a frame downstream with special handling for stop conditions.

        Args:
            frame: The frame to push.
            direction: The direction to push the frame.
        N)rr   r,  r2  r   r1   r2   r,  !  s   z RimeNonJsonTTSService.push_framec                    r   )z6Establish WebSocket connection and start receive task.Nr   r   r   r1   r2   r   *  s   zRimeNonJsonTTSService._connectc                    r   )z.Close WebSocket connection and clean up tasks.Nr   r   r   r1   r2   r   2  s   z!RimeNonJsonTTSService._disconnectc              
      s|  z| j r| j jtju rW dS | jj| jj| j| jd}| jj	dur(| jj	|d< | jj
dur4| jj
|d< | jjdur@| jj|d< | jjdurL| jj|d< | jjdurX| jj|d< || jj dd	d
 | D }| j d| }dd| j i}t||ddI dH | _ | dI dH  W dS  ty } z!| jd| |dI dH  d| _ | d| I dH  W Y d}~dS d}~ww )z:Establish WebSocket connection to Rime non-JSON websocket.Nr   r   r<   rD   rE   rF   r   c                 s   r   r   r1   r   r1   r1   r2   r   S  r   z;RimeNonJsonTTSService._connect_websocket.<locals>.<genexpr>r   r   r   i   )r   max_sizer   r4  r   r   )r   r   r"   r   rT   rH   r^   rt   ru   r#   r<   rD   rE   rF   r[  rY  r   r   ry   rx   r   r   r   r   )r}   settings_dictr`   r]   r   r   r1   r1   r2   r   ;  sB   "z(RimeNonJsonTTSService._connect_websocketc              
      s   z^z"|   I dH  | jr#| jdI dH  | j I dH  td W n tyC } z| jd| |dI dH  W Y d}~nd}~ww W d| _| dI dH  dS W d| _| dI dH  dS d| _| dI dH  w )z.Close WebSocket connection and clean up state.Nz<EOS>z)Disconnected from Rime non-JSON websocketr4  r   r   )	r   r   r   r   r	   r+  r   r   r   r   r1   r1   r2   r   _  s(   
&z+RimeNonJsonTTSService._disconnect_websocketc                 C   r   )z3Get active WebSocket connection or raise exception.r   r   r   r1   r1   r2   r   n  r   z$RimeNonJsonTTSService._get_websocketr   c                    s2   | j sdS t|  d | j dI dH  dS )r  Nr  z<FLUSH>)r   r	   r  r   r   r1   r1   r2   r  t  s
   z!RimeNonJsonTTSService.flush_audioc                    s   |   2 zF3 dH W }z!t|tr+|  I dH  t|| jd|  d}| |I dH  W q tyK } z| j	d| |dI dH  W Y d}~qd}~ww 6 dS )z6Process incoming WebSocket messages (raw audio bytes).Nr  r  r#  r   )
r   r1  bytesrL  r   r_   r  r,  r   r   )r}   r$  r   r   r1   r1   r2   r.  |  s$   
&z'RimeNonJsonTTSService._receive_messagesr   c              
   C  s  t |  d| d z]| jr| jjtju r|  I dH  z|  |I dH  | 	|I dH  W n1 t
yc } z%td| dV  t|dV  |  I dH  |  I dH  W Y d}~W dS d}~ww dV  W dS  t
y } ztd| dV  W Y d}~dS d}~ww )a  Generate speech from text using Rime's streaming API.

        Args:
            text: The text to synthesize into speech.
            context_id: The context ID for tracking audio frames.

        Yields:
            Frame: Audio frames containing the synthesized speech.
        r3  r   Nr4  r5  r!  )r	   r+  r   r   r"   r6  r   r   r   r9  r   r   r   r   )r}   r   r   r   r1   r1   r2   r:    s*    zRimeNonJsonTTSService.run_ttsr   c                    sB   t  |I dH }|rtd |  I dH  |  I dH  |S )r   Nz<Settings changed, reconnecting WebSocket with new parameters)rr   r   r	   r+  r   r   r   r   r1   r2   r     s   
z&RimeNonJsonTTSService._update_settingsr6   )+rJ   rK   rL   rM   rR   ro   rO   r
   r[   rN   r   r;  rQ   r   rs   r   r   r   r   r   r   r   r   r   r   r>  r   r,  r   r   r   r   r   r  r.  r    r   r:  r   r<  r   r   r?  r1   r1   r   r2   rX  e  sf   
 	
j
		$ &rX  )?rM   r'  r   dataclassesr   r   typingr   r   r   r   r   rV  logurur	   pydanticr
   pipecat.frames.framesr   r   r   r   r   r   r   r   r   "pipecat.processors.frame_processorr   pipecat.services.settingsr   r   r   pipecat.services.tts_servicer   r   r   r   pipecat.transcriptions.languager   r   'pipecat.utils.text.base_text_aggregatorr   'pipecat.utils.text.skip_tags_aggregatorr   (pipecat.utils.tracing.service_decoratorsr    websockets.asyncio.clientr!   r   websockets.protocolr"   ModuleNotFoundErrorr   r"  r   rN   r3   r4   rR   rS   r@  rX  r1   r1   r1   r2   <module>   sL   ,
      V