o
    in                  	   @   s  d Z ddlZddlZddlZddlZddlZddlZddlmZ ddl	m
Z
 ddlmZmZmZmZ ddlZddlZzddlmZ W nA ey   e
e ZdZejgeej D ]Zed d Ze rle eZ nqZer{eej!vr{ej!"de ddlmZ Y nw dd	l#m$Z$m%Z% e&e'Z(d
e de)de)fddZ*dd
e de+de+de+fddZ,dd
e de-de-de-fddZ.eG dd dZ/G dd dZ0dS )z
Spark TTS BiCodec Decoder

Decodes BiCodec audio tokens (semantic + global) to audio waveforms.
Replaces SNAC decoder for Spark TTS model.
    N)	dataclass)Path)DictListOptionalTuple)BiCodecTokenizerexternalsparktts)BICODEC_TOKENIZER_PATHAUDIO_SAMPLE_RATEnamedefaultreturnc                 C   s&   t j| d}|s|S |  dv S )N >   1onyestrue)osenvirongetstriplower)r   r   raw r   ;/home/ubuntu/veenaModal/veena3modal/core/bicodec_decoder.py	_env_flag-   s   r      minimumc                 C   D   t j| d}|s|S zt|}W n ty   | Y S w t||S Nr   )r   r   r   int
ValueErrormaxr   r   r   r   valuer   r   r   _env_int4      
r'           c                 C   r    r!   )r   r   r   floatr#   r$   r%   r   r   r   
_env_float?   r(   r+   c                   @   sH   e Zd ZU ee ed< ee ed< eed< eed< ejed< e	ed< dS )_BatchDecodeRequestsemantic_ids
global_idsuse_sliding_windowtrim_warmupfuturesubmitted_atN)
__name__
__module____qualname__r   r"   __annotations__boolasyncioFuturer*   r   r   r   r   r,   J   s   
 
r,   c                   @   s  e Zd ZdZdedfdededefddZd	ee	 d
ee	 de
ej fddZd	ee	 d
ee	 de
e fddZd	ee	 d
ee	 de
e fddZed
ee	 dee	 fddZdeeee	 ee	 eef  dee
e  fddZdejdejfddZd/de
ej de	fddZdejdejde	ddfdd Zde	fd!d"Zdeeef fd#d$Zd	ee	 d
ee	 defd%d&Zd	ee	 defd'd(Z 		d0d	ee	 d
ee	 d)ed*ede
e f
d+d,Z!		d0d	ee	 d
ee	 d*ed)ede
e f
d-d.Z"dS )1BiCodecDecodera  
    BiCodec Decoder for Spark TTS.
    
    Decodes semantic and global tokens to audio waveforms using BiCodec.
    This replaces the SNAC decoder used in the old Indic Orpheus model.
    
    Supports streaming with sliding window approach (like SNAC).
    cudaFdevice
model_pathenable_batchingc                 C   s  t t j r	|nd| _|| _t|}tddrd}tddr"d}|o)| jjdk| _t	ddd	d
| _
tdddd
d | _t	dd	d	d
| _t	dddd
| _tjdd  | _| jdvrad| _t | _i | _i | _i | _d| _d| _d| _d| _d| _d| _ d| _!d| _"d| _#d| _$d| _%d| _&d	| _'t(d| d t(d| j  t)|t*| jd| _+| j| j+_| j+j,-| j t(dt. d | jrt(d| j d| j d| j d | j
 d!| jd" d#d$ dS dS )%a  
        Initialize BiCodec decoder.
        
        Args:
            device: Device for BiCodec model (cuda/cpu)
            model_path: Path to BiCodec model checkpoint
            enable_batching: Enable async batching for streaming (not used yet)
        cpu VEENA3_BICODEC_BATCHING_FORCE_ONFT!VEENA3_BICODEC_BATCHING_FORCE_OFFr;   VEENA3_BICODEC_BATCH_MAX   r   )r   VEENA3_BICODEC_BATCH_TIMEOUT_MSg      @g?     @@VEENA3_BICODEC_BATCH_WORKERS"VEENA3_BICODEC_BATCH_SCALE_PENDINGr   VEENA3_BICODEC_BATCH_SCALE_MODEsticky>   rI   dynamicr)   Nu*   🎵 Loading BiCodec audio tokenizer from z...z   Device: )r<   u.   ✅ BiCodec decoder initialized (sample rate: zHz)u+      ℹ️  Micro-batching enabled (workers=z, scale_mode=z, scale_pending=z, max_batch=z, timeout_ms=i  z.1f))/torchr<   r;   is_availabler=   r7   r   typer>   r'   batch_max_sizer+   batch_timeout_sbatch_worker_countbatch_worker_scale_pendingr   r   r   r   r   batch_worker_scale_mode	threadingLock_batch_map_lock_batch_queues_batch_workers_batch_scaled_loops_batch_total_batches_batch_total_requests_batch_max_seen_batch_queue_wait_ms_total_batch_queue_wait_ms_min_batch_queue_wait_ms_max_batch_compute_ms_total_batch_compute_ms_min_batch_compute_ms_max_batch_queue_depth_total_batch_queue_depth_max_batch_inflight_requests_batch_workers_target_lastprintr   straudio_tokenizermodeltor   )selfr<   r=   r>   requested_batchingr   r   r   __init__^   sj   





zBiCodecDecoder.__init__r-   r.   r   c           	   
   C   sZ  |r|st dt| dt|  dS d}t||krNt d| dt|  t||kr<|d| }t d|  n|dg|t|   }t d	|  z<t| d| j}t| d| j}t  | j	
||}W d   |W S 1 sw   Y  |W S  ty } zt d
|  ddl}|  W Y d}~dS d}~ww )uT  
        Decode BiCodec tokens to audio waveform.
        
        CRITICAL: BiCodec expects EXACTLY 32 global tokens!
        The decoder pools them via speaker_encoder → d_vector, then broadcasts
        via d_vector.unsqueeze(-1) across the time dimension.
        
        Args:
            semantic_ids: List of semantic token IDs (variable length)
            global_ids: List of global token IDs (MUST be exactly 32!)
        
        Returns:
            Audio waveform as numpy array (float32, 16kHz mono)
            Shape: (samples,)
            Returns None if decode fails
        u$   ⚠️  Empty token lists: semantic=z	, global=N    u   ❌ BiCodec requires EXACTLY z global tokens, got       └─ Truncated to r         └─ Padded to u   ❌ BiCodec decode error: )rg   lenrL   tensorlong	unsqueezerk   r<   inference_moderi   
detokenize	Exception	traceback	print_exc)	rl   r-   r.   EXPECTED_GLOBAL_TOKENSpred_semanticpred_globalwav_npery   r   r   r   decode   s<   

zBiCodecDecoder.decodec                 C   s>   |  ||}|du rdS t|dd}|d tj}| S )a6  
        Decode BiCodec tokens to audio bytes (int16 PCM).
        
        Args:
            semantic_ids: List of semantic token IDs
            global_ids: List of global token IDs
        
        Returns:
            Audio as bytes (int16 PCM, 16kHz mono)
            Returns None if decode fails
        N            ?  )r   npclipastypeint16tobytes)rl   r-   r.   audioaudio_int16r   r   r   decode_to_bytes   s   zBiCodecDecoder.decode_to_bytesc                    s"   t  }|d| j||I dH S )z
        Async wrapper for decode_to_bytes.

        Runs decode in the default executor so the event loop stays responsive
        under high request concurrency.
        N)r8   get_running_looprun_in_executorr   )rl   r-   r.   loopr   r   r   decode_to_bytes_async  s   
z$BiCodecDecoder.decode_to_bytes_asyncc                 C   s:   d}t | |krt| d | S t| dg|t |    S )Nro   r   )rr   list)r.   expectedr   r   r   _normalize_global_ids  s   z$BiCodecDecoder._normalize_global_idsbatch_itemsc                    sX  |sg S t |dkr|d \}}}} j||||dgS tdd |D r. fdd|D S tdd |D r@ fd	d|D S zt |}td
d |D }tj||ftj jd}tj|dftj jd}	g }
t|D ]0\}\}}}}t |}|
	| tj
|tj jd||d|f< tj
 |tj jd|	|< qlt   j|	|}W d   n1 sw   Y  g }t|
D ]R\}}t|dddkr|| n|}t|tjr|   }t|d}|d }|dkrt ||kr|d| }t|dd}|d tj}|	|  q|W S  ty+   td  fdd|D  Y S w )z
        Run a single batched BiCodec forward pass for multiple stream decode requests.

        Falls back to sequential decode_streaming if shapes or settings are not batch-safe.
        r   r   r/   r0   c                 s   s    | ]	\}}}}|V  qd S Nr   ).0_use_swr   r   r   	<genexpr>,  s    z>BiCodecDecoder._decode_streaming_batch_sync.<locals>.<genexpr>c                    &   g | ]\}}}} j ||||d qS r   decode_streamingr   semglobr   trimrl   r   r   
<listcomp>-      
z?BiCodecDecoder._decode_streaming_batch_sync.<locals>.<listcomp>c                 s   s$    | ]\}}}}t |d k V  qdS )   Nrr   r   r   r   r   r   r   r   3  s   " c                    r   r   r   r   r   r   r   r   4  r   c                 s   s     | ]\}}}}t |V  qd S r   r   r   r   r   r   r   ;  s    )dtyper<   ro   Nndim@  r   r   r   z@BiCodec batched decode failed; falling back to sequential decodec                    r   r   r   r   r   r   r   r   i  r   )rr   r   anyr$   rL   zerosrt   r<   	enumerateappendrs   r   rv   ri   rw   getattr
isinstanceTensordetachr?   numpyr   asarrayreshaper   r   r   r   rx   logger	exception)rl   r   r   r   r   r   
batch_sizemax_sem_lensemantic_batchglobal_batchsemantic_lengthsidxr-   r.   r   sem_len	wav_batchresultsr   audio_npexpected_samplesr   r   r   r   _decode_streaming_batch_sync  s|   	






z+BiCodecDecoder._decode_streaming_batch_syncr   c                 C   sZ  t |}| j | j|}| j|g }dd |D }|d u r1tjt| jd dd}|| j|< | j	}| j	dkri| j
dkri| | j }| jdkrb| j|d	}|pW|| j
k}	|	| j|< |	sad}n|| j
k rid}t||k rt|}
|j| |||
d
| d|
 d}|| t||k sot|| _|| j|< |W  d    S 1 sw   Y  d S )Nc                 S   s   g | ]}|  s|qS r   done)r   workerr   r   r   r   s  s    z7BiCodecDecoder._ensure_batch_worker.<locals>.<listcomp>r      )maxsizer   r   rI   Fzbicodec-batch-worker--)r   )idrV   rW   r   rX   r8   Queuer$   rO   rQ   rR   qsizere   rS   rY   rr   create_task_batch_workerr   rf   )rl   r   keyqueueworkerslive_workerstarget_workerspending_nowalready_scaledshould_scale
worker_idxr   r   r   r   _ensure_batch_workern  s@   






$z#BiCodecDecoder._ensure_batch_workerNr   c                 C   sr   | j dkrdS | jdkr| j S | j | j}W d    n1 s w   Y  |d ur/|| 7 }|| jkr7| j S dS )Nr   r   )rQ   rR   rV   re   r   )rl   r   r   r   r   r   _desired_worker_count  s   


z$BiCodecDecoder._desired_worker_countr   c                    s  	 | j dkr | |}|| _||kr tt| jdI d H  q| I d H }|g}| | j }t	|| j
k rh||  }|dkrCn%ztj| |dI d H }	W n
 tjy[   Y nw ||	 t	|| j
k s8dd |D }
t   fdd|D }| }|  j|7  _|| jkr|| _|D ]}|  j|7  _| jd u s|| jk r|| _|| jkr|| _q| j |  jt	|7  _W d    n1 sw   Y  t }zz.|d | j|
I d H }t	|t	|krtd	t||D ]\}	}|	j s|	j| qW n4 tjy	     ty3 } zt !d
| |D ]}	|	j s'|	j"| qW Y d }~nd }~ww W t | d }|  j#|7  _#| j$d u sP|| j$k rS|| _$|| j%kr\|| _%|  j&d7  _&|  j't	|7  _'t	|| j(kryt	|| _(| j t)d| jt	| | _W d    n	1 sw   Y  ndt | d }|  j#|7  _#| j$d u s|| j$k r|| _$|| j%kr|| _%|  j&d7  _&|  j't	|7  _'t	|| j(krt	|| _(| j t)d| jt	| | _W d    w 1 sw   Y  w q)NTrJ   gMb`?r   )timeoutc                 S   s    g | ]}|j |j|j|jfqS r   )r-   r.   r/   r0   r   reqr   r   r   r         z0BiCodecDecoder._batch_worker.<locals>.<listcomp>c                    s    g | ]}t d  |j d qS )r)   rE   )r$   r2   r   batch_startr   r   r     r   z!batch decode result size mismatchzBiCodec batch worker %d failedrE   r   )*rS   r   rf   r8   sleepminrP   r   timerr   rO   wait_forTimeoutErrorr   perf_counterr   rc   rd   r]   r^   r_   rV   re   r   r   RuntimeErrorzipr1   r   
set_resultCancelledErrorrx   r   r   set_exceptionr`   ra   rb   rZ   r[   r\   r$   )rl   r   r   r   r   firstbatchdeadline	remainingr   batch_inputsqueue_wait_msqueue_depth_nowwait_mst0r   resultexc
compute_msr   r   r   r     s   









 zBiCodecDecoder._batch_workerc                 C   s`   | j sdS | j tdd | j D }ttd|| j W  d    S 1 s)w   Y  d S )Nr   c                 s   s    | ]}|  V  qd S r   )r   )r   r   r   r   r   r     s    z6BiCodecDecoder.get_pending_requests.<locals>.<genexpr>)r>   rV   sumrW   valuesr"   r$   re   )rl   pendingr   r   r   get_pending_requests  s   $z#BiCodecDecoder.get_pending_requestsc                 C   sT  t d| j}t d| j}| j tdd | j D }W d    n1 s&w   Y  i d| jr2dnddt| jdt| jd	t| j| d
t| j	dt| j
dt|dt| jdt| jd| jdt| j| dt| jpwddt| jdt| j| dt| jpddt| jdt| j| t| jt|  dS )Nr   c                 s   s&    | ]}|D ]	}|  sd V  qqdS )r   Nr   )r   r   r   r   r   r   r     s    z4BiCodecDecoder.get_batching_stats.<locals>.<genexpr>enabledr   r)   batch_total_batchesbatch_total_requestsbatch_avg_sizebatch_max_seenbatch_workers_configuredbatch_workers_livebatch_workers_targetrR   rS   batch_queue_wait_ms_avgbatch_queue_wait_ms_minbatch_queue_wait_ms_maxbatch_compute_ms_avgbatch_compute_ms_minbatch_compute_ms_maxbatch_queue_depth_avg)batch_queue_depth_maxbatch_pending_now)r$   rZ   r[   rV   r   rX   r   r>   r*   r\   rQ   rf   rR   rS   r]   r^   r_   r`   ra   rb   rc   rd   r   )rl   batchesreqsworkers_liver   r   r   get_batching_stats  sZ   






	



z!BiCodecDecoder.get_batching_statsc                 C   sh   |st d dS |st d dS t|dk r!t dt|  dS t|dk r2t dt|  dS dS )z
        Validate BiCodec tokens before decoding.
        
        Args:
            semantic_ids: List of semantic token IDs
            global_ids: List of global token IDs
        
        Returns:
            True if valid, False otherwise
        u   ❌ No semantic tokensFu   ❌ No global tokensr   u   ❌ Too few semantic tokens: u   ❌ Too few global tokens: T)rg   rr   )rl   r-   r.   r   r   r   validate_tokens  s   zBiCodecDecoder.validate_tokensc                 C   s   t |d }|t S )z
        Estimate audio duration from semantic tokens.
        
        Args:
            semantic_ids: List of semantic token IDs
        
        Returns:
            Estimated duration in seconds
        r   )rr   r   )rl   r-   estimated_samplesr   r   r   get_audio_duration2  s   z!BiCodecDecoder.get_audio_durationr/   r0   c                 C   s(  d}t ||k rtdt | d|  dS d}t ||krRtdt | d|  t ||kr?|d| }td| d	 n|d
g|t |   }td| d	 | ||}|du r^dS |rt |dkrt |}td|d }	||	 d }
|
|	 }||
| }t|dd}|d tj}| S )a  
        Decode BiCodec tokens with streaming support.
        
        CRITICAL: BiCodec decoder expects EXACTLY 32 global tokens always!
        The decoder internally broadcasts d_vector via d_vector.unsqueeze(-1).
        
        From sparktts/models/bicodec.py line 184-186:
            d_vector = self.speaker_encoder.detokenize(global_tokens)  # Expects 32 tokens
            x = self.prenet(z_q, d_vector)
            x = x + d_vector.unsqueeze(-1)  # Broadcasts across time automatically!
        
        Args:
            semantic_ids: List of semantic token IDs (variable length, 50 TPS)
            global_ids: List of global token IDs (MUST be exactly 32 tokens!)
            use_sliding_window: If True, return only middle samples
            trim_warmup: Legacy parameter (not used for BiCodec)
        
        Returns:
            Audio bytes (int16 PCM) or None if decode fails
        r   u!   ⚠️  Too few semantic tokens (z) for decode, need >= Nro   u   ⚠️  WARNING: Got z global tokens, expected rp   z tokensr   rq   i      r   r   r   )	rr   rg   r   r   r   r   r   r   r   )rl   r-   r.   r/   r0   MIN_SEMANTIC_TOKENSr{   r   total_sampleskeep_samplesstartendr   r   r   r   r   B  s0   zBiCodecDecoder.decode_streamingc           	         s|   t  }| jr0|s0| |}| }tt|t||||t d}|	|I dH  |I dH S |
d| j||||I dH S )a  
        Async wrapper for streaming decode - runs in executor to avoid blocking event loop.
        
        OPTIMIZATION: Previously ran synchronously, blocking all concurrent coroutines
        during GPU decode (~20-50ms). Now uses run_in_executor to unblock the event loop
        so other streams can progress while this decode runs on GPU.
        
        Args:
            semantic_ids: List of semantic token IDs
            global_ids: List of global token IDs  
            trim_warmup: Legacy parameter (not used)
            use_sliding_window: Use sliding window mode
        
        Returns:
            Audio bytes (int16 PCM) or None
        )r-   r.   r/   r0   r1   r2   N)r8   r   r>   r   create_futurer,   r   r   r   putr   r   )	rl   r-   r.   r0   r/   r   r   r1   r   r   r   r   decode_single_async  s.   



z"BiCodecDecoder.decode_single_asyncr   )FF)#r3   r4   r5   __doc__r   rh   r7   rn   r   r"   r   r   ndarrayr   bytesr   r   staticmethodr   r   r   r8   AbstractEventLoopr   r   r   r   r   r   r*   r  r  r  r   r  r   r   r   r   r:   T   s    
J
?



R"
O 
Jr:   )r   )r)   )1r  r8   loggingr   sysrT   r   dataclassesr   pathlibr   typingr   r   r   r   r   r   rL   sparktts.models.audio_tokenizerr   ImportError__file__resolve
_this_file_sparktts_pathparentr   parents_p
_candidateis_dirrh   pathinsertveena3modal.core.constantsr   r   	getLoggerr3   r   r7   r   r"   r'   r*   r+   r,   r:   r   r   r   r   <module>   sF    
	