o
    iR                     @   s  d Z ddlZddlZddlm  mZ ddlm	Z	 ddl
Z
ddlmZ ddlmZmZ ddlmZmZmZ ddlZG dd deZd	d
 ZG dd dejjZG dd dejjZG dd dejjZG dd dejjZG dd dejjZG dd dejjZG dd dejjZdS )zSubsampling layer definition.    N)PositionalEncoding)sequence_mask)sub_factor_to_params
pad_to_len)OptionalTupleUnionc                       s    e Zd ZdZ fddZ  ZS )TooShortUttErrorzRaised when the utt is too short for subsampling.

    Args:
        message (str): Message for error catch
        actual_size (int): the short size that cannot pass the subsampling
        limit (int): the limit size for subsampling

    c                    s   t  | || _|| _dS )z/Construct a TooShortUttError for error handler.N)super__init__actual_sizelimit)selfmessager   r   	__class__ _/home/ubuntu/.local/lib/python3.10/site-packages/funasr/models/transformer/utils/subsampling.pyr      s   
zTooShortUttError.__init__)__name__
__module____qualname____doc__r   __classcell__r   r   r   r   r	      s    	r	   c                 C   s\   t | tr|dk rdS t | tr|dk rdS t | tr!|dk r!dS t | tr,|dk r,dS d	S )
z4Check if the utterance is too short for subsampling.   )Tr      )Tr      )Tr      )Tr   )F)
isinstanceConv2dSubsampling2Conv2dSubsamplingConv2dSubsampling6Conv2dSubsampling8)inssizer   r   r   check_short_utt$   s   r%   c                       2   e Zd ZdZd	 fdd	Zdd Zdd Z  ZS )
r    Convolutional 2D subsampling (to 1/4 length).

    Args:
        idim (int): Input dimension.
        odim (int): Output dimension.
        dropout_rate (float): Dropout rate.
        pos_enc (torch.nn.Module): Custom position encoding layer.

    Nc              
      s   t t|   tjtjd|ddtj tj||ddtj | _tjtj	||d d d d  ||dur>|nt
||| _dS )&Construct an Conv2dSubsampling object.   r      N)r
   r    r   torchnn
SequentialConv2dReLUconvLinearr   outr   idimodimdropout_ratepos_encr   r   r   r   <       
zConv2dSubsampling.__init__c                 C   s   | d}| |}| \}}}}| |dd |||| }|du r,|dfS ||dddddddf dddddddf fS )  Subsample x.

        Args:
            x (torch.Tensor): Input tensor (#batch, time, idim).
            x_mask (torch.Tensor): Input mask (#batch, 1, time).

        Returns:
            torch.Tensor: Subsampled tensor (#batch, time', odim),
                where time' = time // 4.
            torch.Tensor: Subsampled mask (#batch, 1, time'),
                where time' = time // 4.

        r)   r*   N	unsqueezer0   r$   r2   	transpose
contiguousviewr   xx_maskbctfr   r   r   forwardJ      

$8zConv2dSubsampling.forwardc                 C      |dkrt d| j| S zGet item.

        When reset_parameters() is called, if use_scaled_pos_enc is used,
            return the positioning encoding.

        r   z+Support only `-1` (for `reset_parameters`).NotImplementedErrorr2   r   keyr   r   r   __getitem__`      
zConv2dSubsampling.__getitem__Nr   r   r   r   r   rG   rO   r   r   r   r   r   r    1   
    
r    c                       r&   )
Conv2dSubsamplingPadr'   Nc                    s   t t|   tjtjjd|ddddtj tjj||ddddtj | _tjtj	||d d d d  ||durB|nt
||| _tjdd| _dS )	r(   r)   r   r*   )r   r   )paddingN)r              )r
   rT   r   r+   r,   r-   r.   r/   r0   r1   r   r2   ConstantPad1dpad_fnr3   r   r   r   r   w   s    zConv2dSubsamplingPad.__init__c           	      C   s   | dd}| |}| dd}|d}| |}| \}}}}| | dd |||| }|du r=|dfS tj	|dddddf dd}|d d d }|d d d }t
|d|j|d j}||dddddf fS )r9   r)   r*   Nr   r   dim)r=   rY   r<   r0   r$   r2   r>   r?   r+   sumr   dtypedevice)	r   rA   rB   rC   rD   rE   rF   x_lenmaskr   r   r   rG      s   


$ zConv2dSubsamplingPad.forwardc                 C   rI   rJ   rK   rM   r   r   r   rO      rP   z Conv2dSubsamplingPad.__getitem__rQ   rR   r   r   r   r   rT   l   s
    
rT   c                       r&   )
r   zConvolutional 2D subsampling (to 1/2 length).

    Args:
        idim (int): Input dimension.
        odim (int): Output dimension.
        dropout_rate (float): Dropout rate.
        pos_enc (torch.nn.Module): Custom position encoding layer.

    Nc              
      s   t t|   tjtjd|ddtj tj||ddtj | _tjtj	||d d d  ||dur<|nt
||| _dS )z'Construct an Conv2dSubsampling2 object.r)   r   r*   N)r
   r   r   r+   r,   r-   r.   r/   r0   r1   r   r2   r3   r   r   r   r      s   
zConv2dSubsampling2.__init__c                 C   s   | d}| |}| \}}}}| |dd |||| }|du r,|dfS ||dddddddf dddddddf fS )a  Subsample x.

        Args:
            x (torch.Tensor): Input tensor (#batch, time, idim).
            x_mask (torch.Tensor): Input mask (#batch, 1, time).

        Returns:
            torch.Tensor: Subsampled tensor (#batch, time', odim),
                where time' = time // 2.
            torch.Tensor: Subsampled mask (#batch, 1, time'),
                where time' = time // 2.

        r)   r*   Nr:   r;   r@   r   r   r   rG      rH   zConv2dSubsampling2.forwardc                 C   rI   rJ   rK   rM   r   r   r   rO      rP   zConv2dSubsampling2.__getitem__rQ   rR   r   r   r   r   r      rS   r   c                       *   e Zd ZdZd fdd	Zdd Z  ZS )r!   zConvolutional 2D subsampling (to 1/6 length).

    Args:
        idim (int): Input dimension.
        odim (int): Output dimension.
        dropout_rate (float): Dropout rate.
        pos_enc (torch.nn.Module): Custom position encoding layer.

    Nc              
      s   t t|   tjtjd|ddtj tj||ddtj | _tjtj	||d d d d  ||dur>|nt
||| _dS )z'Construct an Conv2dSubsampling6 object.r)   r   r*      N)r
   r!   r   r+   r,   r-   r.   r/   r0   r1   r   r2   r3   r   r   r   r      r8   zConv2dSubsampling6.__init__c                 C   s   | d}| |}| \}}}}| |dd |||| }|du r,|dfS ||dddddddf dddddddf fS )a  Subsample x.

        Args:
            x (torch.Tensor): Input tensor (#batch, time, idim).
            x_mask (torch.Tensor): Input mask (#batch, 1, time).

        Returns:
            torch.Tensor: Subsampled tensor (#batch, time', odim),
                where time' = time // 6.
            torch.Tensor: Subsampled mask (#batch, 1, time'),
                where time' = time // 6.

        r)   r*   Nr:   r   r;   r@   r   r   r   rG     rH   zConv2dSubsampling6.forwardrQ   r   r   r   r   r   rG   r   r   r   r   r   r!      s    
r!   c                       ra   )r"   zConvolutional 2D subsampling (to 1/8 length).

    Args:
        idim (int): Input dimension.
        odim (int): Output dimension.
        dropout_rate (float): Dropout rate.
        pos_enc (torch.nn.Module): Custom position encoding layer.

    Nc                    s   t t|   tjtjd|ddtj tj||ddtj tj||ddtj | _tjtj	||d d d d d d  ||durN|nt
||| _dS )z'Construct an Conv2dSubsampling8 object.r)   r   r*   N)r
   r"   r   r+   r,   r-   r.   r/   r0   r1   r   r2   r3   r   r   r   r   %  s   (
zConv2dSubsampling8.__init__c                 C   s   | d}| |}| \}}}}| |dd |||| }|du r,|dfS ||dddddddf dddddddf dddddddf fS )a  Subsample x.

        Args:
            x (torch.Tensor): Input tensor (#batch, time, idim).
            x_mask (torch.Tensor): Input mask (#batch, 1, time).

        Returns:
            torch.Tensor: Subsampled tensor (#batch, time', odim),
                where time' = time // 8.
            torch.Tensor: Subsampled mask (#batch, 1, time'),
                where time' = time // 8.

        r)   r*   Nr:   r;   r@   r   r   r   rG   5  s   

$PzConv2dSubsampling8.forwardrQ   rd   r   r   r   r   r"     s    
r"   c                       sF   e Zd ZdZ		ddedef fddZdefd	d
Zdd Z  Z	S )Conv1dSubsamplingzConvolutional 1D subsampling (to 1/2 length).

    Args:
        idim (int): Input dimension.
        odim (int): Output dimension.
        dropout_rate (float): Dropout rate.
        pos_enc (torch.nn.Module): Custom position encoding layer.

    stride_conv!seq2seq/proj_encoder/downsampling!tf2torch_tensor_name_prefix_torchtf2torch_tensor_name_prefix_tfc                    sN   t t|   tj||||| _tj|d| _|| _	|| _
|| _|| _d S )NrW   )r
   re   r   r+   r,   Conv1dr0   rX   rY   strider5   rh   ri   )r   r4   r5   kernel_sizerk   padrh   ri   r   r   r   r   W  s   

zConv1dSubsampling.__init__returnc                 C   s   | j S rQ   )r5   )r   r   r   r   output_sizei  s   zConv1dSubsampling.output_sizec                 C   s`   | dd}| |}tj| |dd}| dd}|du r#|dfS |d | j d }||fS )zSubsample x.r)   r*   rW   )negative_slopeN)r=   rY   F
leaky_relur0   rk   )r   rA   r_   r   r   r   rG   l  s   
zConv1dSubsampling.forward)rf   rg   )
r   r   r   r   strr   intro   rG   r   r   r   r   r   re   L  s    re   c                       s   e Zd ZdZ				ddedeeef ded	ed
edee ddf fddZ	de
jdee
j dee
j dee
je
jf fddZde
jde
jfddZde
jde
jfddZdedefddZ  ZS )StreamingConvInputa  Streaming ConvInput module definition.
    Args:
        input_size: Input size.
        conv_size: Convolution size.
        subsampling_factor: Subsampling factor.
        vgg_like: Whether to use a VGG-like network.
        output_size: Block output dimension.
    rV   Tr   N
input_size	conv_sizesubsampling_factorvgg_likeconv_kernel_sizero   rn   c                    s2  t    |r|dkrz|\}}tjtjjd||d|d d dtj tjj|||d|d d dtj tjdtjj|||d|d d dtj tjj|||d|d d dtj tjd
| _||d d  }	d| _	d| _
| j| _n|\}}t|d }
tjtjjd||d|d d dtj tjj|||d|d d dtj tj|
dftjj|||d|d d dtj tjj|||d|d d dtj tjd
| _||d d  }	|| _	| j| _|
| _
n|dkr8tjtjd|dddgddgtj tj|||ddgddgtj | _||d d d d  }	|| _	|| _d| _| j| _nAt||\}}}tjtjd|ddddgtj tj|||||d d dgtj | _|| }	|| _	|| _|| _| j| _|| _d| _|d	urtj|	|| _|| _d	S d	| _|	| _d	S )
zConstruct a ConvInput object.r)   r*   )rk   rU   )r)   r*   )r*   r*   r   r   r   N)r
   r   r+   r,   r-   r.   r/   	MaxPool2dr0   rx   stride_1create_new_vgg_maskcreate_new_maskrt   kernel_2stride_2create_new_conv2d_maskr   ry   min_frame_lengthr1   outputro   )r   rv   rw   rx   ry   rz   ro   
conv_size1
conv_size2output_projkernel_1r   r   conv_2_output_sizer   r   r   r     s   







%





%


	


zStreamingConvInput.__init__rA   r`   
chunk_sizec                    st  |dur|  |}t|dd}| \}}}|d}|dur^t|| j t	t
||| j    t fdd|}t|}tj|dd} || j  }||| d|| j |}| |}| \}	}
}	}|dur|dd |d|
| ddd|ddf }n|dd |d|
| }| jdur| |}||ddd|f ddd|df fS )	a'  Encode input sequences.
        Args:
            x: ConvInput input sequences. (B, T, D_feats)
            mask: Mask of input sequences. (B, 1, T)
        Returns:
            x: ConvInput output sequences. (B, sub(T), D_out)
            mask: Mask of output sequences. (B, 1, sub(T))
        Nr   r)   c                    s   t |  dS )Nr)   )r   )inputsmax_input_lengthr   r   <lambda>=  s    z,StreamingConvInput.forward.<locals>.<lambda>rZ   r*   r   )r~   maxeqr\   r$   r<   rt   rx   mathceilfloatmaplistr+   stackr?   r0   r=   r>   r   )r   rA   r`   r   olensrC   rE   rF   N_chunks_rD   r   r   r   rG   %  s4   


6

.zStreamingConvInput.forwardc                 C   s   | j dkrK|d|d| j d   }|ddd|f dddd| j d f }|d|dd  }|ddd|f dddddf }|S |}|S )zCreate a new mask for VGG output sequences.
        Args:
            mask: Mask of input sequences. (B, T)
        Returns:
            mask: Mask of output sequences. (B, sub(T))
        r)   r*   N)rx   r$   )r   r`   
vgg1_t_len
vgg2_t_lenr   r   r   r}   P  s   
,&z&StreamingConvInput.create_new_vgg_maskc                 C   s8   | j dkr|dddddf dddd| jf S |S )zCreate new conformer mask for Conv2d output sequences.
        Args:
            mask: Mask of input sequences. (B, T)
        Returns:
            mask: Mask of output sequences. (B, sub(T))
        r)   Nr*   )rx   r   )r   r`   r   r   r   r   b  s   
*z)StreamingConvInput.create_new_conv2d_maskr$   c                 C   s
   || j  S )zReturn the original size before subsampling for a given size.
        Args:
            size: Number of frames after subsampling.
        Returns:
            : Number of frames before subsampling.
        )rx   )r   r$   r   r   r   get_size_before_subsamplingn  s   
z.StreamingConvInput.get_size_before_subsampling)rV   Tr   N)r   r   r   r   rt   r   r   boolr   r   r+   TensorrG   r}   r   r   r   r   r   r   r   ru   {  sD    
 !
+ru   ) r   numpynpr+   torch.nn.functionalr,   
functionalrq   #funasr.models.transformer.embeddingr   loggingfunasr.models.scama.utilsr   *funasr.models.transformer.utils.nets_utilsr   r   typingr   r   r   r   	Exceptionr	   r%   Moduler    rT   r   r!   r"   re   ru   r   r   r   r   <module>   s&   ;C;02/