o
    oi	D                     @   sB  d Z ddlZddlZddlmZmZmZmZmZ ddl	m
Z
 ddlZddlZddlZddlm  mZ ddlZddlmZmZmZ ddlmZ ddlmZ ddlmZmZ e d	 e
G d
d dZ!G dd dZ"G dd dZ#de$de!de%fddZ&G dd dZ'				dde%de%dee% dee! dee% defddZ(dS )a6  
WavLM Audio Embedding Generator for HuggingFace Datasets
=========================================================

Clean OOP implementation for generating speaker embeddings using the WavLM model
and adding them as new columns to HuggingFace datasets with multi-GPU support.

Model: Orange/Speaker-wavLM-tbr
    N)ListDictAnyOptionalTuple)	dataclass)DatasetDatasetDictload_dataset)tqdm   )EmbeddingsModelcompute_embeddingignorec                   @   sZ   e Zd ZU dZdZeed< dZeed< dZ	e
ed< dZeed	< d
Zeed< dZeed< dS )EmbedderConfigz+Configuration for the WavLM audio embedder.zOrange/Speaker-wavLM-tbr
model_name>  	target_srg      >@max_audio_sec    
batch_sizewavlm_embeddingembedding_column_nameTuse_multiprocessingN)__name__
__module____qualname____doc__r   str__annotations__r   intr   floatr   r   r   bool r#   r#   E/home/ubuntu/kanitts-2-dataset-pipeline/utils/speaker_emb/embedder.pyr       s   
 r   c                   @   s^   e Zd ZdZddedefddZdeee	f d	e
ej fd
dZdejd	e
ej fddZdS )AudioProcessorzA
    Handles audio loading, preprocessing and normalization.
    r         9@r   r   c                 C   s   || _ || _t|| | _dS )z
        Initialize audio processor.

        Args:
            target_sr: Target sample rate in Hz
            max_audio_sec: Maximum audio duration in seconds
        N)r   r   r    target_samples)selfr   r   r#   r#   r$   __init__1   s   zAudioProcessor.__init__
audio_dictreturnc              
   C   s   z,|d }t |tjrt| }ntj|tjd}| dkr'|j	dd}| 
|W S  tyF } ztd|  W Y d}~dS d}~ww )aw  
        Process audio from HuggingFace dataset format.

        Note: Audio is expected to be already resampled to target_sr via
        dataset.cast_column('audio', Audio(sampling_rate=target_sr))

        Args:
            audio_dict: Dictionary with 'array' and 'sampling_rate' keys

        Returns:
            Processed audio array or None if processing fails
        arraydtyper   r   dimzAudio processing error: N)
isinstancenpndarraytorch
from_numpyr!   tensorfloat32r0   mean_finalize_waveform	Exceptionprint)r(   r*   audio_arraywaver#   r#   r$   process_audio=   s   zAudioProcessor.process_audior=   c                 C   sf   |  dkrdS |jd }|| jkr|d| j }| S || jk r/t|d| j| fdd}| S )z
        Truncate or pad waveform to target length.

        Args:
            wav: Audio waveform tensor

        Returns:
            Processed numpy array or None if empty
        r   Nconstant        )numelshaper'   Fpadnumpy)r(   r=   current_lenr#   r#   r$   r9   ^   s   



z!AudioProcessor._finalize_waveformN)r   r&   )r   r   r   r   r    r!   r)   r   r   r   r   r2   r3   r?   r4   Tensorr9   r#   r#   r#   r$   r%   ,   s
     !r%   c                   @   sF   e Zd ZdZdededefddZdd Zd	e	e
 d
ejfddZdS )WavLMWorkerzX
    Worker class for processing dataset chunks with WavLM model on a specific GPU.
    rankconfigaudio_processorc                 C   s"   || _ || _|| _d| _d| _dS )z
        Initialize WavLM GPU worker.

        Args:
            rank: GPU device index
            config: Embedder configuration
            audio_processor: Audio preprocessing instance
        N)rJ   rK   rL   devicemodel)r(   rJ   rK   rL   r#   r#   r$   r)   z   s
   
zWavLMWorker.__init__c              
   C   s   zt j| j t d| j | _W n# ty6 } ztd| j d|  t d| _W Y d}~nd}~ww td| j d t| j	j
| _| j| j  td| j d dS )z0Initialize WavLM model and move to assigned GPU.zcuda:[GPU z] Failed to set device: cpuNz] Loading WavLM model...z] WavLM model ready)r4   cuda
set_devicerJ   rM   r:   r;   WavLMEmbeddingsModelfrom_pretrainedrK   r   rN   toeval)r(   r>   r#   r#   r$   
initialize   s   zWavLMWorker.initializeaudio_dictsr+   c                 C   s  t | jj| jj }g }g }t|D ]k\}}z:|d }t|tjr)t	|
 }ntj|tjd}| dkr=|jdd}|jd |krJ|d| }|| W q ty} }	 z!td| j d| d	|	  |td || W Y d}	~	qd}	~	ww td
d |D }
g }|D ]}|jd }||
k rt|d|
| fdd}|| qt|| j}t  | |}W d   n1 sw   Y  |  }|D ]	}td||< q|S )a  
        Process a batch of audio samples with WavLM using true batching with dynamic padding.

        Uses dynamic padding: pads to the maximum length in the current batch (up to max_audio_sec),
        rather than always padding to max_audio_sec. This dramatically reduces computation for
        batches with shorter audio samples.

        Args:
            audio_dicts: List of audio dictionaries from dataset

        Returns:
            Numpy array of embeddings (batch_size, embedding_dim)
        r,   r-   r   r   r/   NrO   z"] Error processing audio at index z: c                 s   s    | ]}|j d  V  qdS )r   N)rC   ).0sigr#   r#   r$   	<genexpr>   s    z,WavLMWorker.process_batch.<locals>.<genexpr>r@   rA      )r    rK   r   r   	enumerater1   r2   r3   r4   r5   r!   r6   r7   r0   r8   rC   appendr:   r;   rJ   zerosmaxrD   rE   stackrU   rM   no_gradrN   rP   rF   )r(   rX   max_allowed_lengthprocessed_audioserror_indicesidxr*   r<   rZ   r>   batch_max_lengthbatch_tensorsrG   batch
embeddingsembeddings_npr#   r#   r$   process_batch   sF   

zWavLMWorker.process_batchN)r   r   r   r   r    r   r%   r)   rW   r   r   r2   r3   rl   r#   r#   r#   r$   rI   u   s    
rI   rJ   rK   audio_columnc                    s(  t jj}t|}|  g }tfddtdjD tj	ddt
  fdd}t
j|dd	}	|	  td
 dd}
ttD ]} }||}|D ]}|| qd|
t| qW|
  |	jdd ||f td dt| d dS )a  
    Worker process function for multi-GPU processing with data prefetching.

    Uses a background thread to load data asynchronously while GPU processes the current batch,
    eliminating data loading bottlenecks.

    Args:
        rank: GPU device index
        config: Embedder configuration
        dataset_shard: Subset of dataset to process
        audio_column: Name of audio column
        result_queue: Queue to store results
    c                    s$   g | ]}|t | j  qS r#   )minr   rY   i)rK   dataset_shardnum_samplesr#   r$   
<listcomp>   s    z&gpu_worker_process.<locals>.<listcomp>r      maxsizec               
      s   z;zD ]} |   } | qW n ty- } ztd d|  W Y d}~nd}~ww W   dS W   dS   w )z4Background thread that loads batches asynchronously.rO   z] Data loader thread error: N)putr:   r;   set)ri   rX   r>   )rm   batchesloading_completeprefetch_queuerJ   r#   r$   data_loader_thread  s    z.gpu_worker_process.<locals>.data_loader_threadTtargetdaemonzGPU )totaldescpositionleave      @timeoutrO   z] Completed processing z samplesN)r%   r   r   rI   rW   lenranger   queueQueue	threadingEventThreadstartr   getrl   r^   updateclosejoinrw   r;   )rJ   rK   rq   rm   result_queuerL   workerresultsr|   loader_threadpbar_rX   rj   embr#   )rm   ry   rK   rq   rz   rr   r{   rJ   r$   gpu_worker_process   s:   
r   c                   @   s   e Zd ZdZdefddZ			dded	ed
ee dedef
ddZ	ded	ede
e
e  fddZded	ede
e
e  fddZdS )WavLMEmbedderz
    Main class for generating WavLM speaker embeddings on HuggingFace datasets.

    Supports multi-GPU parallel processing with automatic batching and progress tracking.
    rK   c                 C   s<   || _ t|j|jd| _tj rtj | _	dS d| _	dS )z
        Initialize the embedder.

        Args:
            config: Configuration object with model and processing parameters
        )r   r   r   N)
rK   r%   r   r   rL   r4   rQ   is_availabledevice_countnum_gpus)r(   rK   r#   r#   r$   r)   :  s   $zWavLMEmbedder.__init__audioNtraindatasetrm   output_path
split_namer+   c                 C   s   t dd  t d| dt| d t d| j d t d d ||jvr4td| d	|j | jjrD| jd
krD| ||}n| ||}|	| jj
|}|S )a  
        Add embedding column to a HuggingFace dataset.

        Args:
            dataset: HuggingFace Dataset object
            audio_column: Name of the column containing audio data
            output_path: Optional path to save the dataset locally
            split_name: Name of the split being processed (for logging)

        Returns:
            Dataset with added embedding column
        
z<============================================================zProcessing z split with z	 exampleszUsing z GPU(s)zColumn 'z+' not found in dataset. Available columns: r   )r;   r   r   column_names
ValueErrorrK   r   _multi_gpu_embed_single_gpu_embed
add_columnr   )r(   r   rm   r   r   rj   r#   r#   r$   embed_datasetJ  s    

zWavLMEmbedder.embed_datasetc                    s   t djj}|  g }tfddtdjjD tjdd fdd}t	j
|dd	}|  td
d}ttD ]} }	||	}
||
  |t|	 qH|  |jdd |S )z
        Generate embeddings using single GPU with data prefetching.

        Args:
            dataset: Dataset to process
            audio_column: Name of audio column

        Returns:
            List of embeddings
        r   c                    s"   g | ]}|t |jj  fqS r#   )rn   rK   r   ro   )rr   r(   r#   r$   rs     s    z3WavLMEmbedder._single_gpu_embed.<locals>.<listcomp>rt   ru   c               
      sh   zD ]\} }| | }|  } | qW dS  ty3 } ztd|  W Y d}~dS d}~ww )z2Background thread for loading data asynchronously.zData loader thread error: N)rw   r:   r;   )	start_idxend_idxri   rX   r>   )rm   batch_indicesr   r{   r#   r$   r|     s   z;WavLMEmbedder._single_gpu_embed.<locals>.data_loader_threadTr}   zGenerating embeddings)r   r   r   r   )rI   rK   rL   rW   r   r   r   r   r   r   r   r   r   r   rl   extendtolistr   r   r   )r(   r   rm   r   rj   r|   r   r   r   rX   batch_embeddingsr#   )rm   r   r   rr   r{   r(   r$   r   t  s(   
zWavLMEmbedder._single_gpu_embedc                 C   s*  t |}|| j d | j }g }t| jD ]}|| }t|| |}||k r1||t|| qtd}	|	 }
g }tt |D ]}|	j	t
|| j|| ||
fd}|  || qCi }tt |D ]}|
 \}}|||< qf|D ]}|  qug }tt |D ]	}|||  qdd |D S )z
        Generate embeddings using multiple GPUs in parallel.

        Args:
            dataset: Dataset to process
            audio_column: Name of audio column

        Returns:
            List of embeddings in original order
        r   spawn)r~   argsc                 S   s$   g | ]}t |tjr| n|qS r#   )r1   r2   r3   r   )rY   r   r#   r#   r$   rs     s    z2WavLMEmbedder._multi_gpu_embed.<locals>.<listcomp>)r   r   r   rn   r^   selectmpget_contextr   Processr   rK   r   r   r   r   )r(   r   rm   rr   
shard_sizeshardsrp   r   r   
mp_contextr   	processesrJ   presults_dictr   rj   all_embeddingsr#   r#   r$   r     s>   


zWavLMEmbedder._multi_gpu_embed)r   Nr   )r   r   r   r   r   r)   r   r   r   r   r   r!   r   r   r#   r#   r#   r$   r   3  s&    
*":r   r   dataset_namer   splitr+   c                 C   s   |du rt  }td|  d t| |d}t|}t|trBt }| D ]\}}	|r2| d| nd}
|j|	||
|d||< q%|S |j||||pJddS )a  
    Convenience function to process a HuggingFace dataset.

    Args:
        dataset_name: Name of the dataset on HuggingFace Hub
        audio_column: Name of the column containing audio data
        output_path: Optional path to save processed dataset
        config: Optional custom configuration
        split: Specific split to process (e.g., 'train', 'test') or None for all

    Returns:
        Processed dataset with embeddings
    NzLoading dataset: z...)r   /)rm   r   r   r   )r   r;   r
   r   r1   r	   itemsr   )r   rm   r   rK   r   r   embedder	processedr   split_datasetsplit_outputr#   r#   r$   process_dataset  s,   
r   )r   NNN))r   oswarningstypingr   r   r   r   r   dataclassesr   r   r   r4   torch.nn.functionalnn
functionalrD   rF   r2   datasetsr   r	   r
   r   torch.multiprocessingmultiprocessingr   	spk_wavLMr   rS   r   filterwarningsr   r%   rI   r    r   r   r   r   r#   r#   r#   r$   <module>   sN   

IsK 6