o
    3NPi2                  
   @   s>  d Z ddlZddlZddlmZmZmZmZmZ ddl	m
Z
mZ ddlZddlZedZdadadd Zdeejeeef d	ee fd
dZdejded	eeeef  fddZd#ded	eeeef  fddZd$dee ded	ee fddZ	d%dee deded	ee fddZ	d&dee ded ed	efd!d"ZdS )'an  
Voice Activity Detection with TRUE parallelism + Compute Monitoring.

=== OPTIMIZATION (v6.2) ===
Key improvements:
- Persistent workers: Load Silero model ONCE per worker via initializer
- Memory-efficient: Accept numpy array directly (no file re-read)
- Adaptive worker count based on detected CPU cores
- Optimized for 0.4s event detection (min_speech_ms = 200)
    N)ListDictTupleOptionalUnion)ProcessPoolExecutoras_completedzFastPipelineV6.VADc            	      C   s
  d} t | D ]|}ztjjdddddd\aaW  dS  ty } z]|| d k rHd	d
|  }ddl}ddl}|	dd	}|
||  W Y d}~qddl}|jd}|j|rsztjj|ddddd\aaW W Y d}~ dS    Y td|  d| dd}~ww dS )a  
    Worker initializer - loads Silero VAD model ONCE per worker process.
    
    === OPTIMIZATION (v6.2) ===
    Previously: Each worker loaded model for EVERY chunk (~200MB x 32 workers = 6.4GB!)
    Now: Model loaded ONCE per worker at startup, reused for all chunks
    
    === EDGE CASE HANDLING (v6.7) ===
    - Retry logic for network errors when multiple workers start
    - Graceful fallback to cached model
    - Exponential backoff on failures
    
    Benefit: ~50% VAD speedup, 80% less memory
       snakers4/silero-vad
silero_vadFTrepo_or_dirmodelforce_reloadonnx
trust_repoN         ?   r   z-~/.cache/torch/hub/snakers4_silero-vad_masterlocal)r   r   sourcer   r   zFailed to load VAD model after z attempts. Last error: z:. Try running the main script first to download the model.)rangetorchhubload
_vad_model
_vad_utils	Exceptiontimerandomuniformsleepospath
expanduserexistsRuntimeError)	max_retriesattempte	wait_timer   r   jitterr"   	cache_dir r-   M/home/ubuntu/.cursor/worktrees/maya3data__SSH__216.81.248.184_/zxg/src/vad.py_init_vad_worker   sR   

r/   argsreturnc                 C   s   | \}}}}t }t\}}}}}t| }||||dd|dd|dd|dd|dd	d
|d	}	g }
|	D ]}||d  }||d  }|
t|t|t|| d q;|
S )z
    Worker function for parallel VAD processing.
    
    Uses globally initialized model (loaded via _init_vad_worker).
    Accepts tuple for map() compatibility.
    vad_thresholdr   vad_min_speech_ms   vad_min_silence_msvad_window_size_samplesi   vad_speech_pad_ms   T)	thresholdmin_speech_duration_msmin_silence_duration_mswindow_size_samplesspeech_pad_msreturn_secondssampling_ratestartendr@   rA   duration)r   r   r   
from_numpyfloatgetappend)r0   audio_chunksample_rateconfig_dictchunk_startr   get_speech_timestamps_
wav_tensorspeech_timestampssegmentstsglobal_start
global_endr-   r-   r.   _vad_workerZ   s2   






rT   waveform_nprI   c                    sj  t | | }td|j d t }ztjjdddddd W n ty; } zt	d|  W Y d	}~nd	}~ww t
|j| }g }|j|j|j|j|jd
}tdt | |D ]}	| |	|	|  }
|	| }||
|||f qZtdt | d|j d g }d}t|jtd`  fddt|D }t|D ]H}z,|jdd}|| |d7 }|tdt |d  dkrtd| dt | d W q ty } ztd|  W Y d	}~qd	}~ww W d	   n1 sw   Y  |jdd d t|dd}tdd  |D }t | }td!|d"d#t | d$|d"d% td&|| d"d'|j  |S )(a*  
    Run Silero VAD on pre-loaded audio buffer with persistent workers.
    
    === OPTIMIZATION (v6.2) ===
    - Accepts numpy array directly (no file re-read)
    - Uses initializer to load model ONCE per worker
    
    === EDGE CASE HANDLING (v6.7) ===
    - Pre-download model before starting workers to avoid race conditions
    
    Args:
        waveform_np: Audio samples as numpy array (mono)
        sample_rate: Sample rate
        config: Pipeline configuration
    
    Returns:
        List of speech segments with start/end/duration
    u#   🎤 Running Silero VAD (parallel: z workers, persistent model)...r
   r   FTr   z8   Pre-download attempt failed (will retry in workers): N)r2   r3   r5   r6   r7   r   z   Processing z	 chunks (z
s each)...)max_workersinitializerc                    s   i | ]\}}  t||qS r-   )submitrT   ).0ir0   executorr-   r.   
<dictcomp>   s    z'run_vad_from_buffer.<locals>.<dictcomp>x   )timeoutr   
   z   VAD progress: /z chunkszVAD chunk failed: c                 S   s   | d S )Nr@   r-   )xr-   r-   r.   <lambda>   s    z%run_vad_from_buffer.<locals>.<lambda>)key皙?)	merge_gapc                 s   s    | ]}|d  V  qdS )rC   Nr-   )rY   sr-   r-   r.   	<genexpr>   s    z&run_vad_from_buffer.<locals>.<genexpr>u	   ✅ VAD: z.1fzs | z segments | zs speechz   Speedup: zx realtime | Workers: )lenloggerinfovad_workersr   r   r   r   r   warningintvad_chunk_sizer2   r3   r5   r6   r7   r   rG   r   r/   	enumerater   resultextendmaxerrorsort_merge_adjacent_vad_segmentssum)rU   rI   configtotal_durationr@   r)   chunk_size_samples
chunk_argsrJ   rZ   
chunk_datachunk_start_timeall_segments	completedfuturesfuturerP   mergedtotal_speechelapsedr-   r[   r.   run_vad_from_buffer   sr   

&r   
audio_pathc                 C   sH   |durt |j|j|S ddl}|| \}}|d }t |||S )a  
    Run Silero VAD with TRUE parallelism and persistent workers.
    
    === OPTIMIZATION (v6.2) ===
    - If audio_buffer provided, uses pre-loaded numpy array (no file re-read)
    - Uses initializer to load Silero model ONCE per worker (not per chunk!)
    
    Args:
        audio_path: Path to audio file (used if audio_buffer is None)
        config: Pipeline configuration
        audio_buffer: Optional AudioBuffer with pre-loaded waveform
    
    Returns:
        List of speech segments with start/end/duration
    Nr   )r   rU   rI   
torchaudior   squeezenumpy)r   rx   audio_bufferr   waveformsrrU   r-   r-   r.   run_vad_parallel   s   r   re   rP   rf   c                 C   s   | sg S g }| d   }| dd D ](}|d |d  }||k r1|d |d< |d |d  |d< q|| |  }q|| |S )z
    Merge VAD segments that were split at chunk boundaries.
    
    Args:
        segments: Sorted list of VAD segments
        merge_gap: Maximum gap (seconds) to merge across
    r   r   Nr@   rA   rC   )copyrG   )rP   rf   r   currentseggapr-   r-   r.   rv      s   


rv   333333?vad_segmentsry   min_silencec                 C   s   g }| r| d d |kr| d| d d | d d d tt| d D ]!}| | d }| |d  d }|| }||krF| |||d q%| rf|| d d  |krf| | d d ||| d d  d |S )a  
    Find silence regions (gaps between VAD segments).
    
    Useful for:
    - Finding optimal chunk boundaries
    - Marking non-speech regions
    
    Args:
        vad_segments: List of speech segments from VAD
        total_duration: Total audio duration
        min_silence: Minimum silence duration to report
    
    Returns:
        List of silence segments with start/end/duration
    r   r@           rB   r   rA   )rG   r   ri   )r   ry   r   silencesrZ   	gap_startgap_endgap_durationr-   r-   r.   find_silence_boundaries  s2   


r         >@target_timewindowc           
      C   sr   |}d}t t| d D ]*}| | d }| |d  d }|| }|| d }	t|	| |kr6||kr6|}|	}q|S )u  
    Find the best silence point near target_time for chunking.
    
    Strategy: Find the largest gap within ±window of target_time.
    This ensures we don't cut in the middle of speech.
    
    Args:
        vad_segments: List of speech segments from VAD
        target_time: Ideal cut point
        window: Search window (seconds) around target
    
    Returns:
        Best cut point (center of largest gap, or target if no gaps)
    r   r   rA   r@   r   )r   ri   abs)
r   r   r   best_cutbest_gap_sizerZ   r   r   gap_size
gap_centerr-   r-   r.   find_optimal_cut_pointQ  s   r   )N)re   )r   )r   )__doc__r   loggingtypingr   r   r   r   r   concurrent.futuresr   r   r   npr   	getLoggerrj   r   r   r/   ndarrayrn   dictrE   rT   strr   r   rv   r   r   r-   r-   r-   r.   <module>   s>   
$?$, _ 
6