o
    }oi                      @   sp   d dl mZmZ d dlZd dlZd dlmZ d dlm	Z	 d dl
mZmZmZ G dd de	ZG dd de	ZdS )	    )OptionalUnionN)NeuralModule)AcousticEncodedRepresentationLengthsType
NeuralTypec                       s   e Zd ZdZ						dded	ed
edee dededef fddZe	dd Z
e	dd ZdejdejfddZdejdejfddZdejdejfddZ  ZS )RandomBlockMaskinga  
    Performs random block masking on sequence of features.
    Args:
        mask_prob (float): percentage of sequence to mask
        block_size (int): size of each block to mask
        mask_value (Optional[float]): value to use for masking, if None, use random values
        feat_in (Optional[int]): size of input features, required if mask_value is None
        freeze (bool): if True, mask embedding is not trainable
        allow_overlap (bool): if True, masked blocks can overlap
          ?0   NTF皙?feat_in	mask_prob
block_size
mask_valuefreezeallow_overlapmax_mask_ratioc                    s   t    || _|| _|| _|| _|d u r)tt	|| _
tjj| j
ddd ntjt|| dd| _
|r>|   d S d S )Ng        g?)meanstdF)requires_grad)super__init__r   r   r   r   nn	ParametertorchFloatTensormask_embeddinginitnormal_onesr   )selfr   r   r   r   r   r   r   	__class__ d/home/ubuntu/.local/lib/python3.10/site-packages/nemo/collections/asr/modules/ssl_modules/masking.pyr   &   s   

zRandomBlockMasking.__init__c                 C   s   t dt t tdt dS )z)Returns definitions of module input typesBDTr&   input_featsinput_lengths)r   r   tupler   r    r#   r#   r$   input_types>   s   
zRandomBlockMasking.input_typesc                 C   s   t dt t dt dS )z*Returns definitions of module output typesr%   )maksed_featsmasks)r   r   r-   r#   r#   r$   output_typesF   s   

zRandomBlockMasking.output_typesr*   r+   c                 C   s   | j r	| ||S | ||S )y  
        Args:
            input_feats (Tensor): input sequence features, shape=(batch, features, time)
            input_length (Tensor): length of each sequence in the batch, shape=(batch)
        Returns:
            masked_feats (Tensor): masked features, shape=(batch, features, time)
            masks (Tensor): the generated masks, shape=(batch, features, time)
        )r   forward_with_overlapforward_without_overlap)r    r*   r+   r#   r#   r$   forwardN   s   	zRandomBlockMasking.forwardc                 C   sP  | d}| jd}t|}| }t|D ]}| j|| | j kr,d}d}	dg}
nNt	|| | j
 | j  }	tjd| jd|jdd }| j}|	d | j || krbtj|| |	d dd}tj|| |dd}tj|d |jdd	|	 }
t|	D ]$}|
| | | }|| }d
||d	d	||f< |||d	d	||f< q~q||fS )r2   r         )r8   devicetrunc)rounding_modeN      ?)sizer   	unsqueezer   
zeros_likecloneranger   r   ceilr   intrandintr:   divrandperm)r    r*   r+   
batch_sizer   r0   r/   ir   num_patches	patch_idxoffsetmax_num_patchesjstartendr#   r#   r$   r4   \   s.   
	
z*RandomBlockMasking.forward_without_overlapc                 C   s  | d}| jd}t|}| }t|D ]m}| j|| | j kr,d}d}	dg}
n/| j}|| 	 
  }tjtd|| j | j}	tjtd|| j |jd}
|
d|	 }
t|	D ]%}|
| }t|| || }d||dd||f< |||dd||f< q_q||fS )r2   r   r6   r7   r8   r9   Nr=   )r>   r   r?   r   r@   rA   rB   r   r   detachcpunumpynprandombinomialmaxr   rG   r:   min)r    r*   r+   rH   r   r0   r/   rI   curr_block_sizerJ   patch_idicescurr_lenrN   rO   rP   r#   r#   r$   r3   ~   s*   
	
z'RandomBlockMasking.forward_with_overlap)r	   r
   NTFr   )__name__
__module____qualname____doc__rD   floatr   boolr   propertyr.   r1   r   Tensorr5   r4   r3   __classcell__r#   r#   r!   r$   r      s<    

"r   c                       s`   e Zd ZdZdejdeejef ddf fddZdd	 Z	d
e
fddZdd Zdd Z  ZS )ConvFeatureMaksingWrapperzl
    A wrapper module that applies masking to the features after subsampling layer of ConformerEncoder.
    pre_encode_modulemasking_modulereturnNc                    s,   t    || _|| _d| _d| _d| _dS )z
        Args:
            pre_encode_module: the pre_encode module of the ConformerEncoder instance
            masking_module: the module that performs masking on the extracted features
        NF)r   r   
pre_encodemasking	curr_mask	curr_feat
apply_mask)r    rf   rg   r!   r#   r$   r      s   

z"ConvFeatureMaksingWrapper.__init__c                 C   sr   | j ||d\}}| | _| jr-|dd}| j||d\}| _|dd }||fS |}t|| _||fS )z?
        Same interface as ConformerEncoder.pre_encode
        )xlengthsr8      r)   )	ri   rQ   rl   rm   	transposerj   rk   r   r@   )r    rn   ro   featsmasked_featsr#   r#   r$   r5      s   
z!ConvFeatureMaksingWrapper.forwardrm   c                 C   s
   || _ d S N)rm   )r    rm   r#   r#   r$   set_masking_enabled   s   
z-ConvFeatureMaksingWrapper.set_masking_enabledc                 C      | j S rt   )rk   r-   r#   r#   r$   get_current_mask      z*ConvFeatureMaksingWrapper.get_current_maskc                 C   rv   rt   )rl   r-   r#   r#   r$   get_current_feat   rx   z*ConvFeatureMaksingWrapper.get_current_feat)r\   r]   r^   r_   r   Moduler   r   r   r5   ra   ru   rw   ry   rd   r#   r#   r!   r$   re      s    &re   )typingr   r   rS   rT   r   torch.nnr   nemo.core.classesr   nemo.core.neural_typesr   r   r   r   re   r#   r#   r#   r$   <module>   s    