o
    }oikN                     @   s  d dl Z d dlZd dlZd dlmZmZ d dlmZ d dlm	Z	m
Z
mZmZmZmZ d dlZd dlZd dlmZ d dlmZ d dlmZmZ d dlmZmZ d d	lmZ d d
lmZ eee	 eee	  ee	 eee	  f ZeG dd dZ eG dd dZ!dd Z"G dd deZ#dS )    N)ABCabstractmethod)	dataclass)AnyDictListOptionalTupleUnion)
DataLoader)tqdm)audio_rttm_mapget_uniqname_from_filepath)PostProcessingParamsload_postprocessing_from_yaml)move_data_to_device)loggingc                   @   s   e Zd ZU dZdZeej ed< dZeej ed< dZ	e
ed< dZee ed< dZeed	< d
Zeed< dZee ed< dZee ed< dS )InternalDiarizeConfigzHInternal diarization configuration parameters for diarization inference.NdevicedtypeFtraining_modelogging_level        dither_valuer   pad_to_valuetemp_dirmanifest_filepath)__name__
__module____qualname____doc__r   r   torch__annotations__r   r   boolr   r   r   floatr   intr   strr    r'   r'   a/home/ubuntu/.local/lib/python3.10/site-packages/nemo/collections/asr/parts/mixins/diarization.pyr   "   s   
 r   c                   @   sz   e Zd ZU dZdZeed< dZeed< dZ	eed< dZ
ee ed< d	Zeed
< dZeed< dZeed< dZee ed< dS )DiarizeConfigz3Configuration parameters for diarization inference.session_len_sec   
batch_sizenum_workersNpostprocessing_yamlTverboseFinclude_tensor_outputspostprocessing_params	_internal)r   r   r   r    r+   r$   r"   r-   r%   r.   r/   r   r&   r0   r#   r1   r2   r   r3   r   r'   r'   r'   r(   r)   5   s   
 r)   c              	   C   s6   t | |r
t| |S td| d| d|  d |S )a  
    Utility function to get a value from the diarization config.
    If the value is not present in the diarization config, the default value is returned.

    Args:
        diarcfg: A dataclass that represents the diarization config.
        key: The name of the arg to retrieve.
        default: The default value to return if the key is not present in the diarization config.

    Returns:
        The value of the key in the diarization config or the default value.
    zUsing default value of z for zE because it is not present                 in the diarization config .)hasattrgetattrr   debug)diarcfgkeydefaultr'   r'   r(   !get_value_from_diarization_configE   s   

r;   c                   @   sL  e Zd ZdZdd Ze 						d(deee	e e
jef ded	ed
ee dededee defddZdee fddZde	e de	eeeeef f  fddZdeee	e f defddZdefddZde	e dededeeef fddZededefdd Zed!efd"d#Zededefd$d%Zdefd&d'ZdS ))SpkDiarizationMixina  
    An abstract class for diarize-able models.

    Creates a template function `diarize()` that provides an interface to perform transcription of audio tensors or
    filepaths.

    The following abstract classes must be implemented by the subclass:

        - `_setup_diarize_dataloader()`:
            Setup the dataloader for diarization. Receives the output from
            `_diarize_input_manifest_processing()`.

        - `_diarize_forward()`:
            Implements the model's custom forward pass to return outputs that are processed by
            `_diarize_output_processing()`.

        - `_diarize_output_processing()`:
            Implements the post processing of the model's outputs to return the results to
            the user. The result can be a list of objects, list of list of objects, tuple of objects, tuple of list of
            objects, or a dict of list of objects.

    c                 C   s
   i | _ d S )N)_diarize_audio_rttm_map)selfr'   r'   r(   __init__t   s   
zSpkDiarizationMixin.__init__r,   FNaudior-   r1   r/   r.   r0   override_configreturnc              	   K   sz  |du rt |}	td||||||	d|}
nt|ds td|jdu r)t |_|}
|
jdu r5t |
_n
t|
jts?tdd}zq| j||
d}|D ]d}t|tr^|du rXg }|	| qKt|t
r|du rpt
dd |D }t|d	 trt|D ]\}}|| 	| q{qKt|t|krtd
t| dt| dt|D ]\}}|| | qqKW |S  ty   Y |S w )zG
        Takes paths to audio files and returns speaker labels
        N)r-   r.   r0   r1   r/   r2   r3   y`diarize_cfg must have an `_internal` argument, which must be of an object of type InternalDiarizeConfig or its subclass.Z`diarize_cfg._internal` must be of an object of type InternalDiarizeConfig or its subclass)rA   c                 S   s   g | ]}g qS r'   r'   ).0_r'   r'   r(   
<listcomp>   s    z/SpkDiarizationMixin.diarize.<locals>.<listcomp>r   z&The number of elements in the result (z3) does not match the results of the current batch (z).r'   )r   r)   r5   
ValueErrorr3   r   
isinstancediarize_generatorlistextendtuple	enumeratelenRuntimeErrorappendStopIteration)r>   r@   r-   r1   r/   r.   r0   rA   config_kwargsr2   diarize_cfgresults	generatorprocessed_outputsiprocessed_outputr'   r'   r(   diarizew   sn   






 zSpkDiarizationMixin.diarizec              	   c   sj   |du rt  }t|dstd|jdu rt |_n
t|jts%td|}z| || t e}||j_	t|t
sC| ||}n|}t|drN|j}nd}tt|d| dD ]5\}}t||jj}t| j ||j |d	 |j  }	| |}
| |
|	|}|V  ~~
~tj  qZW d   n1 sw   Y  W | | dS W | | dS | | w )
z<
        A generator version of `diarize` function.
        Nr3   rC   rD   r0   T	Diarizing)descdisabler,   )r)   r5   rH   r3   r   rI   _diarize_on_begintempfileTemporaryDirectoryr   r   _diarize_input_processingr0   rN   r   r   r   rK   r=   keysr-   _diarize_forward_diarize_output_processingr!   cudaempty_cache_diarize_on_end)r>   r@   rA   rT   tmpdir
dataloaderr0   	batch_idx
test_batchuniq_idspred_outputsrW   r'   r'   r(   rJ      sN   






""z%SpkDiarizationMixin.diarize_generatoraudio_filesc                 C   s4   i }|D ]}t |}||ddddd}|||< q|S )z
        Generate manifest style dict if `audio` is a list of paths to audio files.

        Args:
            audio_files: A list of paths to audio files.

        Returns:
            audio_rttm_map_dict A list of manifest style dicts.
        r   N-infer)uniq_idaudio_filepathoffsetdurationtextlabel)r   )r>   rn   audio_rttm_map_dict
audio_filerq   entryr'   r'   r(   _input_audio_to_rttm_processing  s   

z3SpkDiarizationMixin._input_audio_to_rttm_processingr8   c                 C   s  |du ri S t |tr|g}t |trt|dkri S t|ddd}|du r6t|ddd}t|t d }t|dr>||_	| j
|j_t| drzt| jdrat| jjd	ra| jjj|j_d
| jj_t| jdrzt| jjdrz| jjj|j_d| jj_|   t |j_ttj dS )a  
        Internal function to setup the model for diarization. Perform all setup and pre-checks here.

        Args:
            audio (Union[str, List[str]]): Of type `GenericDiarizationType`
            diarcfg (DiarizeConfig): An instance of `DiarizeConfig`.
        Nr   r.   r,   )r:   r-   preprocessor
featurizerditherr   pad_to)rI   r&   rK   rO   r;   minos	cpu_countr5   r.   trainingr3   r   r{   r|   r}   r   r~   r   evalr   get_verbosityr   set_verbosityWARNING)r>   r@   r8   r.   _batch_sizer'   r'   r(   r^   *  s.   





z%SpkDiarizationMixin._diarize_on_beginc           	      C   s   t |ttfrt|dkrtdn|g}t |d trnt|dkr)|d ds0|d drQ|d |j_t	|d | _
g }| j
 D ]\}}||d  qDnt|}| j|d| _
|jj}| |||}| |}|S tdt|d  d	)
a  
        Internal function to process the input audio data and return a DataLoader. This function is called by
        `diarize()` and `diarize_generator()` to setup the input data for diarization.

        Args:
            audio: Of type `GenericDiarizationType`
            diarcfg: The diarization config dataclass. Subclasses can change this to a different dataclass if needed.

        Returns:
            A DataLoader object that is used to iterate over the input audio data.
        r   zInput `audio` is emptyr,   z.jsonz.jsonlrr   )rn   Input `audio` is of type z8. Only `str` (path to audio file) is supported as input.)rI   rK   rM   rO   rH   r&   endswithr3   r   r   r=   itemsrQ   rz   r   "_diarize_input_manifest_processing_setup_diarize_dataloadertype)	r>   r@   r8   rn   rq   	meta_dicttmp_dir	ds_configtemp_dataloaderr'   r'   r(   ra   Z  s,   (
z-SpkDiarizationMixin._diarize_input_processingr   c                 C   s   t tj|dddd<}|D ]1}t|tr'|ddd}|t|d  qt|t	r7|t|d  qt
d	t| d
W d   n1 sKw   Y  |t|dd|t|d|jt|ddd}|S )a  
        Internal function to process the input audio filepaths and return a config dict for the dataloader.

        Args:
            audio_files: A list of string filepaths for audio files.
            temp_dir: A temporary directory to store intermediate files.
            diarcfg: The diarization config dataclass. Subclasses can change this to a different dataclass if needed.

        Returns:
            A config dict that is used to setup the dataloader for diarization.
        zmanifest.jsonwzutf-8)encodingi  )rr   rt   ru   
r   zC. Only `str` (path to audio file) or `dict` are supported as input.Nr-   r,   r+   r.   )paths2audio_filesr-   r   r+   r.   )openr   pathjoinrI   r&   writejsondumpsdictrH   r   r;   r+   )r>   rn   r   r8   fprx   ry   r   r'   r'   r(   r     s&   



z6SpkDiarizationMixin._diarize_input_manifest_processingconfigc                 C      dS )a  
        Internal function to setup the dataloader for diarization. This function is called by
        `diarize()` and `diarize_generator()` to setup the input data for diarization.

        Args:
            config: A config dict that is used to setup the dataloader for diarization.
                It can be generated by `_diarize_input_manifest_processing()`.

        Returns:
            A DataLoader object that is used to iterate over the input audio data.
        Nr'   )r>   r   r'   r'   r(   r        z-SpkDiarizationMixin._setup_diarize_dataloaderbatchc                 C   r   )a  
        Internal function to perform the model's custom forward pass to return outputs that are processed by
        `_diarize_output_processing()`.
        This function is called by `diarize()` and `diarize_generator()` to perform the model's forward pass.

        Args:
            batch: A batch of input data from the data loader that is used to perform the model's forward pass.

        Returns:
            The model's outputs that are processed by `_diarize_output_processing()`.
        Nr'   )r>   r   r'   r'   r(   rc     r   z$SpkDiarizationMixin._diarize_forwardc                 C   r   )a  
        Internal function to process the model's outputs to return the results to the user. This function is called by
        `diarize()` and `diarize_generator()` to process the model's outputs.

        Args:
            outputs: The model's outputs that are processed by `_diarize_forward()`.
            uniq_ids: List of unique recording identificators in batch
            diarcfg: The diarization config dataclass. Subclasses can change this to a different dataclass if needed.

        Returns:
            The output can be a list of
            objects, list of list of objects, tuple of objects, tuple of list of objects.
            Its type is defined in `GenericDiarizationType`.
        Nr'   )r>   outputsrl   r8   r'   r'   r(   rd     s   z.SpkDiarizationMixin._diarize_output_processingc                 C   s   | j |jjd t| dr5t| jdr!t| jjdr!|jj| jj_t| jdr5t| jjdr5|jj| jj_	|jj
durDt|jj
 dS dS )z
        Internal function to teardown the model after transcription. Perform all teardown and post-checks here.

        Args:
            diarcfg: The diarization config dataclass. Subclasses can change this to a different dataclass if needed.
        )moder{   r|   r}   r~   N)trainr3   r   r5   r{   r|   r   r}   r   r~   r   r   r   )r>   r8   r'   r'   r(   rg     s   
z#SpkDiarizationMixin._diarize_on_end)r,   FNr,   FN) r   r   r   r    r?   r!   inference_moder
   r&   r   npndarrayr   r%   r#   r   r)   GenericDiarizationTyperZ   rJ   r   r$   rz   r^   ra   r   r   r   r   rc   rd   rg   r'   r'   r'   r(   r<   \   s^    
X*B0,

%r<   )$r   r   r_   abcr   r   dataclassesr   typingr   r   r   r   r	   r
   numpyr   r!   torch.utils.datar   r   .nemo.collections.asr.parts.utils.speaker_utilsr   r   *nemo.collections.asr.parts.utils.vad_utilsr   r   "nemo.collections.common.data.utilsr   
nemo.utilsr   r   r   r)   r;   r<   r'   r'   r'   r(   <module>   s*    (