o
    eii5                     @   s   d Z ddlZddlZddlZddlmZ ddlm  mZ	 ddl
mZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ dd	lmZ G d
d dejZG dd dejZG dd dejZdd Zdd Zdd ZdS )zComponents necessary to build a wav2vec 2.0 architecture following the
original paper: https://arxiv.org/abs/2006.11477.

Authors
* Rudolf A Braun 2022
* Guillermo Cambara 2022
* Titouan Parcollet 2022
    N)length_to_mask)ConvolutionFrontEnd)PositionalEncoding)Conv1d)	LayerNorm)GumbelVectorQuantizer)batch_pad_rightc                       sR   e Zd ZdZg dg dg dddf fdd	Zdd
dZdejfddZ  Z	S )W2VLatentExtractora  Convolution based feature extractor from raw audio.
    Channel numbers increasing is based on https://arxiv.org/abs/2109.06870

    Arguments
    ---------
    out_channels : list of ints
        Out channels of convolutional layers.
    kernel_sizes : list of ints
        Kernels of convolutional layers.
    strides : list of ints
        Strides of convolutional layers.
    dropout : float
        Dropout of CNN.
    conv_init : str
        Type of initialization to use, default "kaiming"

    Example
    -------
    >>> extractor = W2VLatentExtractor()
    >>> inputs = torch.rand(10, 5000)
    >>> outputs = extractor(inputs)
    >>> outputs.shape
    torch.Size([10, 14, 512])
    )   r
   r
   r
   r
   r
   r
   )      r   r   r   r   r   )      r   r   r   r   r   g        kaimingc                    s   t    t|t|  krt|ksJ  J t|}|| _|| _|d | _td|d|||dg| dg| ttj	t
|dd|d| _t
|d | _d S )N)Ni>     r   Fvalid)
num_blocksnum_layers_per_blockout_channelskernel_sizesstrides	dilations	residualsconv_module
activationnormdropout	conv_biaspadding	conv_init)super__init__lenr   r   out_dimr   r   nnGELUr   	extractorr   )selfr   r   r   r   r    r   	__class__ ^/home/ubuntu/transcripts/venv/lib/python3.10/site-packages/speechbrain/lobes/models/wav2vec.pyr"   4   s0   
(
zW2VLatentExtractor.__init__Tc                 C   s8   |rt ||jdd }|d}| |}| |S )z$Calculates latents from audio input.r   Nr   )F
layer_normshape	unsqueezer'   r   )r(   xnormalize_signallatentsr+   r+   r,   forwardY   s
   


zW2VLatentExtractor.forwardinput_lengthsc                 C   s8   dd }t | j| jD ]
\}}||||}q|tjS )z2Calculates output lengths for given input lengths.c                 S   s   t | | | d S )Nr   )torchfloor)input_lengthkernel_sizestrider+   r+   r,   _conv_out_lengthd   s   z?W2VLatentExtractor.get_output_lengths.<locals>._conv_out_length)zipr   r   tor6   long)r(   r5   r;   r9   r:   r+   r+   r,   get_output_lengthsa   s   z%W2VLatentExtractor.get_output_lengths)T)
__name__
__module____qualname____doc__r"   r4   r6   
LongTensorr?   __classcell__r+   r+   r)   r,   r	      s    
%r	   c                       s4   e Zd ZdZddeddf fdd	Zdd	 Z  ZS )
W2VTargetQuantisera  Wraps ``nnet.quantiser.GumbelVectorQuantizer``, see for documentation on
    arguments.

    Arguments
    ---------
    in_dim : int
        Input dimension (channels).
    out_dim : int
        Output dimension
    quantiser : class
        Default GumbelVectorQuantizer
    num_vars : int
        Number of quantized vectors per group.
    temperature_decay : tuple
        Temperature for training. this should be a tuple of 3 elements: (start, stop, decay factor).

    Example
    -------
    >>> quantiser = W2VTargetQuantiser()
    >>> inputs = torch.rand(10, 12, 512)
    >>> output, meta = quantiser(inputs)
    >>> output.shape
    torch.Size([10, 12, 256])
    r
      i@  )g       @g      ?g;?c                    s.   t    ||||d|| _t||| _d S )Nr   )r!   r"   	quantiserr%   Linearproj)r(   in_dimr$   rH   num_varstemperature_decayr)   r+   r,   r"      s
   

zW2VTargetQuantiser.__init__c           	      C   s\   |  |}| |d }|d }|d }|d }|d }|| | }|||||d}||fS )z0Returns quantised targets plus meta information.r1   code_perplexityprob_perplexrL   temp)diversity_losscode_perplexrO   rL   rP   )rH   rJ   )	r(   r1   targetsrR   rO   rL   rP   rQ   metar+   r+   r,   r4      s   
zW2VTargetQuantiser.forward)r@   rA   rB   rC   r   r"   r4   rE   r+   r+   r)   r,   rF   l   s    rF   c                       s0   e Zd ZdZedf fdd	ZdddZ  ZS )	EncoderWrappera  A wrapper that adds positional information,
    masks the input and then runs the latent encoder.

    Arguments
    ---------
    in_dim : int
        Last dimension of input tensor.
    embedding_dim : int
        Dimension to project input to and that the latent encoder will use.
    latent_encoder : torch.nn.module
        Initialized latent encoder object.
    positional_encoding : torch.nn.module
        Uninitialized nn.module for adding positional information, will use ``embedding_dim``.
    dropout_encoder_input : float
        Dropout on encoder input.

    Example
    -------
    >>> from speechbrain.lobes.models.transformer.Transformer import TransformerEncoder
    >>> encoder = TransformerEncoder(d_model=768, num_layers=4, nhead=4, d_ffn=1024)
    >>> wrapper = EncoderWrapper(1024, 768, encoder)
    >>> inputs = torch.rand(10, 12, 1024)
    >>> outputs = wrapper(inputs)
    >>> outputs["embeddings"].shape
    torch.Size([10, 12, 768])
    g?c                    sR   t    t||| _|| _||| _t|| _tj	t
| dd| _d S )NT)requires_grad)r!   r"   r%   rI   input_projectorlatent_encoderpositional_encodingDropoutdropout_encoder_input	Parameterr6   FloatTensoruniform_mask_emb)r(   rK   embedding_dimrX   rY   r[   r)   r+   r,   r"      s   

zEncoderWrapper.__init__Nc           
      C   s   i }| d}| |}| |}|dur.| j|j||< | }||d< ||  |d< |dur@t	|| }t
|td }|| | }| j||d\}}	||d< |S )a  
        Arguments
        ---------
        latents : torch.Tensor, shape (B, T, C)
            Batch of latent representations (AKA frames) output from latent extractor.
        wav_lens : torch.Tensor, shape (B,)
            The actual (unpadded) relative lengths for each sample of the batch (0<wav_lens<1).
        padding_mask : torch.Tensor, shape (B, T,)
            Can be provided instead of wav_lens.
        mask : torch.Tensor, shape (B, T)
            Boolean mask which decides which latent frames will be masked.

        Returns
        -------
        results : dict
            Has the following terms:
                "num_masked" : number of masked terms
                "ratio_masked" : ratio of masked terms
                "embeddings" : features
        r   N
num_maskedratio_maskeddtype)src_key_padding_mask
embeddings)sizerW   r[   r_   r=   rd   sumnumelr6   roundr   boolrY   rX   )
r(   r3   wav_lenspadding_maskmaskresultsTra   feats_r+   r+   r,   r4      s$   



zEncoderWrapper.forward)NNN)r@   rA   rB   rC   r   r"   r4   rE   r+   r+   r)   r,   rU      s     rU   c                    s$  | \}}t |}t|| t t  d }g }t|D ].}	||	 }
tjj|
 |dd t fddtt D  |	t
  |
k   qt||fd}| }t|D ]2\}	}t||k r|t| }t||	 }t||}tjj||dd}d||	|f< d||	|f< q]|S )a>  This creates the boolean mask for a target shape which respects
    the sample lengths and will half roughly ``mask_prob`` entries set to
    ``True``.

    Arguments
    ---------
    shape : list of ints, like (N, M)
        Shape of boolean mask to return.
    sample_lens: list of ints
        Absolute lengths of per sample lengths.
    mask_prob : float
        Percentage to mask.
    mask_length: int
        Length of contiguous subsequence to mask.

    Returns
    -------
    mask : numpy.ndarray
        Boolean mask with shape of input argument ``shape``.
    r   F)replacec                    s&   g | ]}t D ]} | | qqS r+   )range).0joffsetmask_indicesmask_lengthr+   r,   
<listcomp>)  s    
z compute_mask.<locals>.<listcomp>T)minintfloatrandomrt   npchoiceasarrayr#   appenduniquefull	enumeratearangedelete)r/   sample_lens	mask_probrz   bspadded_sample_lenmin_sample_lennum_mask	mask_idcsi
sample_lenrn   num_mask_totalmask_idcnum_mask_missingr   extra_indcsr+   rx   r,   compute_mask  s<   

	r   c           	      C   s   | j \}}}|d }t , t|dd| }tjd|||| fd}|||k  d7  < W d   n1 s=w   Y  |t|d|  }| d|} | |d }|||||	dddd}|S )a  Samples negatives from target tensor y.

    Arguments
    ---------
    y : torch.Tensor
        Tensor of shape (B, T, C)
    num_neg : int
        Number of negatives to sample.

    Returns
    -------
    negs : torch.Tensor
        Negatives in shape (N, B, T, C)
    r   r   r   )lowhighrg   Nr   r   )
r/   r6   no_gradr   r0   expandflattenrandintviewpermute)	ynum_negBrp   Cr   rS   	neg_indcsnegsr+   r+   r,   sample_negativesD  s   
r   c                 C   s   g g }}g }| D ]#}| |d  |d }| | |t|d}	| |	  q	t|}
t|\}}t|}t|
|f|||}t|t|tj|tj	dfS )a  This creates a batch from a list of samples and also creates
    the boolean mask that will be used to mask the inputs of the latent
    encoder. To create the mask we need to know the output shape after the
    latent extractor, therefore the argument `get_out_len_fn`.
    One could also create masks per sample (when loading the audio file) and
    then collate them but at that time one doesn't know the length of the
    shortest sample in the batch (which determines the number of masked frames)
    so it's better this way.

    Arguments
    ---------
    samples_lst : list
        List of samples returned by the audio_pipeline.
    get_out_len_fn : function
        Function that calculates length of sample after it passes through feature extractor.
    mask_prob : float
        Approximate percentage of frames to mask.
    mask_length : int
        Number of contiguous frames that will be masked.

    Returns
    -------
    wavs_padded : torch.Tensor, shape (B, T)
        Audio arrays with right-sided padding.
    wav_lens : torch.Tensor, shape (B,)
        For each sample the percentage of the array that is not padding.
    mask : torch.Tensor, shape (B, T)
        Boolean mask to mask frames.
    idsigr   rc   )
r   r6   	as_tensorrg   itemr#   r   maxr   rk   )samples_lstget_out_len_fnr   rz   wav_lstlatent_length_lstidssampler   latent_lengthr   wavs_paddedrl   batch_time_lenrn   r+   r+   r,   w2v_mask_collate_fnb  s.   


r   )rC   r   numpyr   r6   torch.nnr%   torch.nn.functional
functionalr-   speechbrain.dataio.dataior   $speechbrain.lobes.models.convolutionr   0speechbrain.lobes.models.transformer.Transformerr   speechbrain.nnet.CNNr   speechbrain.nnet.normalizationr   speechbrain.nnet.quantisersr   speechbrain.utils.data_utilsr   Moduler	   rF   rU   r   r   r   r+   r+   r+   r,   <module>   s&    	R;ZC