"""Network related utility tools."""

import logging
from typing import Dict, List, Tuple

import numpy as np
import torch


def to_device(m, x):
    """Send tensor into the device of the module.

    Args:
        m (torch.nn.Module): Torch module.
        x (Tensor): Torch tensor.

    Returns:
        Tensor: Torch tensor located in the same place as torch module.

    """
    if isinstance(m, torch.nn.Module):
        device = next(m.parameters()).device
    elif isinstance(m, torch.Tensor):
        device = m.device
    else:
        raise TypeError("Expected torch.nn.Module or torch.Tensor, but got: " + str(type(m)))
    return x.to(device)

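
# Editor's illustration (not part of the original module): `to_device` keeps an
# input tensor on the same device as a model's parameters.
#
#   >>> model = torch.nn.Linear(3, 4)
#   >>> x = torch.ones(2, 3)
#   >>> to_device(model, x).device == next(model.parameters()).device
#   True
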

def pad_list(xs, pad_value):
    """Perform padding for the list of tensors.

    Args:
        xs (List): List of Tensors [(T_1, `*`), (T_2, `*`), ..., (T_B, `*`)].
        pad_value (float): Value for padding.

    Returns:
        Tensor: Padded tensor (B, Tmax, `*`).

    Examples:
        >>> x = [torch.ones(4), torch.ones(2), torch.ones(1)]
        >>> x
        [tensor([1., 1., 1., 1.]), tensor([1., 1.]), tensor([1.])]
        >>> pad_list(x, 0)
        tensor([[1., 1., 1., 1.],
                [1., 1., 0., 0.],
                [1., 0., 0., 0.]])

    """
    n_batch = len(xs)
    max_len = max(x.size(0) for x in xs)
    pad = xs[0].new(n_batch, max_len, *xs[0].size()[1:]).fill_(pad_value)

    for i in range(n_batch):
        pad[i, : xs[i].size(0)] = xs[i]

    return pad


def pad_list_all_dim(xs, pad_value):
    """Perform padding for the list of tensors along all dimensions.

    Unlike `pad_list`, which pads only the first (time) dimension, this pads
    every dimension of 1-D, 2-D or 3-D tensors up to the batch-wise maximum.

    Args:
        xs (List): List of Tensors with the same number of dimensions.
        pad_value (float): Value for padding.

    Returns:
        Tensor: Padded tensor (B, Tmax, `*`).

    """
    n_batch = len(xs)
    num_dim = len(xs[0].shape)
    max_len_all_dim = []
    for i in range(num_dim):
        max_len_all_dim.append(max(x.size(i) for x in xs))
    pad = xs[0].new(n_batch, *max_len_all_dim).fill_(pad_value)

    for i in range(n_batch):
        if num_dim == 1:
            pad[i, : xs[i].size(0)] = xs[i]
        elif num_dim == 2:
            pad[i, : xs[i].size(0), : xs[i].size(1)] = xs[i]
        elif num_dim == 3:
            pad[i, : xs[i].size(0), : xs[i].size(1), : xs[i].size(2)] = xs[i]
        else:
            raise ValueError(
                "pad_list_all_dim only support 1-D, 2-D and 3-D tensors, not {}-D.".format(num_dim)
            )

    return pad

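
# Editor's illustration (not part of the original module): every dimension is
# padded to the batch-wise maximum, not just the first one.
#
#   >>> xs = [torch.ones(2, 3), torch.ones(1, 2)]
#   >>> pad_list_all_dim(xs, 0)
#   tensor([[[1., 1., 1.],
#            [1., 1., 1.]],
#           [[1., 1., 0.],
#            [0., 0., 0.]]])
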

def make_pad_mask(lengths, xs=None, length_dim=-1, maxlen=None):
    """Make mask tensor containing indices of padded part.

    Args:
        lengths (LongTensor or List): Batch of lengths (B,).
        xs (Tensor, optional): The reference tensor.
            If set, masks will be the same shape as this tensor.
        length_dim (int, optional): Dimension indicator of the above tensor.
            See the example.
        maxlen (int, optional): If set, the mask has exactly this length.
            Cannot be used together with `xs`.

    Returns:
        Tensor: Mask tensor containing indices of padded part.
                dtype=torch.uint8 in PyTorch 1.2-
                dtype=torch.bool in PyTorch 1.2+ (including 1.2)

    Examples:
        With only lengths.

        >>> lengths = [5, 3, 2]
        >>> make_pad_mask(lengths)
        masks = [[0, 0, 0, 0, 0],
                 [0, 0, 0, 1, 1],
                 [0, 0, 1, 1, 1]]

        With the reference tensor.

        >>> xs = torch.zeros((3, 2, 4))
        >>> make_pad_mask(lengths, xs)
        tensor([[[0, 0, 0, 0],
                 [0, 0, 0, 0]],
                [[0, 0, 0, 1],
                 [0, 0, 0, 1]],
                [[0, 0, 1, 1],
                 [0, 0, 1, 1]]], dtype=torch.uint8)
        >>> xs = torch.zeros((3, 2, 6))
        >>> make_pad_mask(lengths, xs)
        tensor([[[0, 0, 0, 0, 0, 1],
                 [0, 0, 0, 0, 0, 1]],
                [[0, 0, 0, 1, 1, 1],
                 [0, 0, 0, 1, 1, 1]],
                [[0, 0, 1, 1, 1, 1],
                 [0, 0, 1, 1, 1, 1]]], dtype=torch.uint8)

        With the reference tensor and dimension indicator.

        >>> xs = torch.zeros((3, 6, 6))
        >>> make_pad_mask(lengths, xs, 1)
        tensor([[[0, 0, 0, 0, 0, 0],
                 [0, 0, 0, 0, 0, 0],
                 [0, 0, 0, 0, 0, 0],
                 [0, 0, 0, 0, 0, 0],
                 [0, 0, 0, 0, 0, 0],
                 [1, 1, 1, 1, 1, 1]],
                [[0, 0, 0, 0, 0, 0],
                 [0, 0, 0, 0, 0, 0],
                 [0, 0, 0, 0, 0, 0],
                 [1, 1, 1, 1, 1, 1],
                 [1, 1, 1, 1, 1, 1],
                 [1, 1, 1, 1, 1, 1]],
                [[0, 0, 0, 0, 0, 0],
                 [0, 0, 0, 0, 0, 0],
                 [1, 1, 1, 1, 1, 1],
                 [1, 1, 1, 1, 1, 1],
                 [1, 1, 1, 1, 1, 1],
                 [1, 1, 1, 1, 1, 1]]], dtype=torch.uint8)
        >>> make_pad_mask(lengths, xs, 2)
        tensor([[[0, 0, 0, 0, 0, 1],
                 [0, 0, 0, 0, 0, 1],
                 [0, 0, 0, 0, 0, 1],
                 [0, 0, 0, 0, 0, 1],
                 [0, 0, 0, 0, 0, 1],
                 [0, 0, 0, 0, 0, 1]],
                [[0, 0, 0, 1, 1, 1],
                 [0, 0, 0, 1, 1, 1],
                 [0, 0, 0, 1, 1, 1],
                 [0, 0, 0, 1, 1, 1],
                 [0, 0, 0, 1, 1, 1],
                 [0, 0, 0, 1, 1, 1]],
                [[0, 0, 1, 1, 1, 1],
                 [0, 0, 1, 1, 1, 1],
                 [0, 0, 1, 1, 1, 1],
                 [0, 0, 1, 1, 1, 1],
                 [0, 0, 1, 1, 1, 1],
                 [0, 0, 1, 1, 1, 1]]], dtype=torch.uint8)

    """
    if length_dim == 0:
        raise ValueError("length_dim cannot be 0: {}".format(length_dim))

    if not isinstance(lengths, list):
        lengths = lengths.tolist()
    bs = int(len(lengths))
    if maxlen is None:
        if xs is None:
            maxlen = int(max(lengths))
        else:
            maxlen = xs.size(length_dim)
    else:
        assert xs is None
        assert maxlen >= int(max(lengths))

    seq_range = torch.arange(0, maxlen, dtype=torch.int64)
    seq_range_expand = seq_range.unsqueeze(0).expand(bs, maxlen)
    seq_length_expand = seq_range_expand.new(lengths).unsqueeze(-1)
    mask = seq_range_expand >= seq_length_expand

    if xs is not None:
        assert xs.size(0) == bs, (xs.size(0), bs)

        if length_dim < 0:
            length_dim = xs.dim() + length_dim
        # ind = (:, None, ..., None, :, None, ..., None)
        ind = tuple(slice(None) if i in (0, length_dim) else None for i in range(xs.dim()))
        mask = mask[ind].expand_as(xs).to(xs.device)
    return mask


def make_non_pad_mask(lengths, xs=None, length_dim=-1):
    """Make mask tensor containing indices of non-padded part.

    Args:
        lengths (LongTensor or List): Batch of lengths (B,).
        xs (Tensor, optional): The reference tensor.
            If set, masks will be the same shape as this tensor.
        length_dim (int, optional): Dimension indicator of the above tensor.
            See the example.

    Returns:
        ByteTensor: Mask tensor containing indices of non-padded part.
                    dtype=torch.uint8 in PyTorch 1.2-
                    dtype=torch.bool in PyTorch 1.2+ (including 1.2)

    Examples:
        With only lengths.

        >>> lengths = [5, 3, 2]
        >>> make_non_pad_mask(lengths)
        masks = [[1, 1, 1, 1, 1],
                 [1, 1, 1, 0, 0],
                 [1, 1, 0, 0, 0]]

        With the reference tensor.

        >>> xs = torch.zeros((3, 2, 4))
        >>> make_non_pad_mask(lengths, xs)
        tensor([[[1, 1, 1, 1],
                 [1, 1, 1, 1]],
                [[1, 1, 1, 0],
                 [1, 1, 1, 0]],
                [[1, 1, 0, 0],
                 [1, 1, 0, 0]]], dtype=torch.uint8)
        >>> xs = torch.zeros((3, 2, 6))
        >>> make_non_pad_mask(lengths, xs)
        tensor([[[1, 1, 1, 1, 1, 0],
                 [1, 1, 1, 1, 1, 0]],
                [[1, 1, 1, 0, 0, 0],
                 [1, 1, 1, 0, 0, 0]],
                [[1, 1, 0, 0, 0, 0],
                 [1, 1, 0, 0, 0, 0]]], dtype=torch.uint8)

        With the reference tensor and dimension indicator.

        >>> xs = torch.zeros((3, 6, 6))
        >>> make_non_pad_mask(lengths, xs, 1)
        tensor([[[1, 1, 1, 1, 1, 1],
                 [1, 1, 1, 1, 1, 1],
                 [1, 1, 1, 1, 1, 1],
                 [1, 1, 1, 1, 1, 1],
                 [1, 1, 1, 1, 1, 1],
                 [0, 0, 0, 0, 0, 0]],
                [[1, 1, 1, 1, 1, 1],
                 [1, 1, 1, 1, 1, 1],
                 [1, 1, 1, 1, 1, 1],
                 [0, 0, 0, 0, 0, 0],
                 [0, 0, 0, 0, 0, 0],
                 [0, 0, 0, 0, 0, 0]],
                [[1, 1, 1, 1, 1, 1],
                 [1, 1, 1, 1, 1, 1],
                 [0, 0, 0, 0, 0, 0],
                 [0, 0, 0, 0, 0, 0],
                 [0, 0, 0, 0, 0, 0],
                 [0, 0, 0, 0, 0, 0]]], dtype=torch.uint8)
        >>> make_non_pad_mask(lengths, xs, 2)
        tensor([[[1, 1, 1, 1, 1, 0],
                 [1, 1, 1, 1, 1, 0],
                 [1, 1, 1, 1, 1, 0],
                 [1, 1, 1, 1, 1, 0],
                 [1, 1, 1, 1, 1, 0],
                 [1, 1, 1, 1, 1, 0]],
                [[1, 1, 1, 0, 0, 0],
                 [1, 1, 1, 0, 0, 0],
                 [1, 1, 1, 0, 0, 0],
                 [1, 1, 1, 0, 0, 0],
                 [1, 1, 1, 0, 0, 0],
                 [1, 1, 1, 0, 0, 0]],
                [[1, 1, 0, 0, 0, 0],
                 [1, 1, 0, 0, 0, 0],
                 [1, 1, 0, 0, 0, 0],
                 [1, 1, 0, 0, 0, 0],
                 [1, 1, 0, 0, 0, 0],
                 [1, 1, 0, 0, 0, 0]]], dtype=torch.uint8)

    """
    return ~make_pad_mask(lengths, xs, length_dim)


def mask_by_length(xs, lengths, fill=0):
    """Mask tensor according to length.

    Args:
        xs (Tensor): Batch of input tensor (B, `*`).
        lengths (LongTensor or List): Batch of lengths (B,).
        fill (int or float): Value to fill masked part.

    Returns:
        Tensor: Batch of masked input tensor (B, `*`).

    Examples:
        >>> x = torch.arange(5).repeat(3, 1) + 1
        >>> x
        tensor([[1, 2, 3, 4, 5],
                [1, 2, 3, 4, 5],
                [1, 2, 3, 4, 5]])
        >>> lengths = [5, 3, 2]
        >>> mask_by_length(x, lengths)
        tensor([[1, 2, 3, 4, 5],
                [1, 2, 3, 0, 0],
                [1, 2, 0, 0, 0]])

    """
    assert xs.size(0) == len(lengths)
    ret = xs.data.new(*xs.size()).fill_(fill)
    for i, l in enumerate(lengths):
        ret[i, :l] = xs[i, :l]
    return ret


def to_torch_tensor(x):
    """Change to torch.Tensor or ComplexTensor from numpy.ndarray.

    Args:
        x: Inputs. It should be one of numpy.ndarray, Tensor, ComplexTensor, and dict.

    Returns:
        Tensor or ComplexTensor: Type converted inputs.

    Examples:
        >>> xs = np.ones(3, dtype=np.float32)
        >>> xs = to_torch_tensor(xs)
        tensor([1., 1., 1.])
        >>> xs = torch.ones(3, 4, 5)
        >>> assert to_torch_tensor(xs) is xs
        >>> xs = {'real': xs, 'imag': xs}
        >>> to_torch_tensor(xs)
        ComplexTensor(
        Real:
        tensor([1., 1., 1.])
        Imag:
        tensor([1., 1., 1.])
        )

    """
    # If numpy, change to torch tensor
    if isinstance(x, np.ndarray):
        if x.dtype.kind == "c":
            # Dynamically importing because torch_complex is an optional dependency
            from torch_complex.tensor import ComplexTensor

            return ComplexTensor(x)
        else:
            return torch.from_numpy(x)

    # If {'real': ..., 'imag': ...} dict, convert to ComplexTensor
    elif isinstance(x, dict):
        from torch_complex.tensor import ComplexTensor

        if "real" not in x or "imag" not in x:
            raise ValueError("has 'real' and 'imag' keys: {}".format(list(x)))
        return ComplexTensor(x["real"], x["imag"])

    # If torch.Tensor, return as is
    elif isinstance(x, torch.Tensor):
        return x

    else:
        error = (
            "x must be numpy.ndarray, torch.Tensor or a dict like "
            "{{'real': torch.Tensor, 'imag': torch.Tensor}}, "
            "but got {}".format(type(x))
        )
        try:
            from torch_complex.tensor import ComplexTensor
        except Exception:
            raise ValueError(error)
        else:
            if isinstance(x, ComplexTensor):
                return x
            else:
                raise ValueError(error)


def get_subsample(train_args, mode, arch):
    """Parse the subsampling factors from the args for the specified `mode` and `arch`.

    Args:
        train_args: argument Namespace containing options.
        mode: one of ('asr', 'mt', 'st')
        arch: one of ('rnn', 'rnn-t', 'rnn_mix', 'rnn_mulenc', 'transformer')

    Returns:
        np.ndarray / List[np.ndarray]: subsampling factors.
    """
    if arch == "transformer":
        return np.array([1])

    elif mode == "mt" and arch == "rnn":
        # +1 means input (+1) and layers outputs (train_args.elayers)
        subsample = np.ones(train_args.elayers + 1, dtype=np.int64)
        logging.warning("Subsampling is not performed for machine translation.")
        logging.info("subsample: " + " ".join([str(x) for x in subsample]))
        return subsample

    elif (
        (mode == "asr" and arch in ("rnn", "rnn-t"))
        or (mode == "mt" and arch == "rnn")
        or (mode == "st" and arch == "rnn")
    ):
        subsample = np.ones(train_args.elayers + 1, dtype=np.int64)
        if train_args.etype.endswith("p") and not train_args.etype.startswith("vgg"):
            ss = train_args.subsample.split("_")
            for j in range(min(train_args.elayers + 1, len(ss))):
                subsample[j] = int(ss[j])
        else:
            logging.warning(
                "Subsampling is not performed for vgg*. "
                "It is performed in max pooling layers at CNN."
            )
        logging.info("subsample: " + " ".join([str(x) for x in subsample]))
        return subsample

    elif mode == "asr" and arch == "rnn_mix":
        subsample = np.ones(train_args.elayers_sd + train_args.elayers + 1, dtype=np.int64)
        if train_args.etype.endswith("p") and not train_args.etype.startswith("vgg"):
            ss = train_args.subsample.split("_")
            for j in range(min(train_args.elayers_sd + train_args.elayers + 1, len(ss))):
                subsample[j] = int(ss[j])
        else:
            logging.warning(
                "Subsampling is not performed for vgg*. "
                "It is performed in max pooling layers at CNN."
            )
        logging.info("subsample: " + " ".join([str(x) for x in subsample]))
        return subsample

    elif mode == "asr" and arch == "rnn_mulenc":
        subsample_list = []
        for idx in range(train_args.num_encs):
            subsample = np.ones(train_args.elayers[idx] + 1, dtype=np.int64)
            if train_args.etype[idx].endswith("p") and not train_args.etype[idx].startswith("vgg"):
                ss = train_args.subsample[idx].split("_")
                for j in range(min(train_args.elayers[idx] + 1, len(ss))):
                    subsample[j] = int(ss[j])
            else:
                logging.warning(
                    "Encoder %d: Subsampling is not performed for vgg*. "
                    "It is performed in max pooling layers at CNN.",
                    idx + 1,
                )
            logging.info("subsample: " + " ".join([str(x) for x in subsample]))
            subsample_list.append(subsample)
        return subsample_list

    else:
        raise ValueError("Invalid options: mode={}, arch={}".format(mode, arch))


def rename_state_dict(old_prefix: str, new_prefix: str, state_dict: Dict[str, torch.Tensor]):
    """Replace keys of old prefix with new prefix in state dict."""
    # Collect the keys first so the dict is not mutated while iterating.
    old_keys = [k for k in state_dict if k.startswith(old_prefix)]
    if len(old_keys) > 0:
        logging.warning(f"Rename: {old_prefix} -> {new_prefix}")

    for k in old_keys:
        v = state_dict.pop(k)
        new_k = k.replace(old_prefix, new_prefix)
        state_dict[new_k] = v

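
# Editor's illustrations (not part of the original module). First, the
# hypothetical Namespace below mimics the training-argument object that
# `get_subsample` above expects:
#
#   >>> from argparse import Namespace
#   >>> args = Namespace(elayers=4, etype="blstmp", subsample="1_2_2_1_1")
#   >>> get_subsample(args, mode="asr", arch="rnn").tolist()
#   [1, 2, 2, 1, 1]
#
# Second, `rename_state_dict` mutates the dict in place and only touches keys
# carrying the old prefix:
#
#   >>> sd = {"encoder.w": torch.zeros(1), "decoder.w": torch.zeros(1)}
#   >>> rename_state_dict("encoder.", "enc.", sd)
#   >>> sorted(sd)
#   ['decoder.w', 'enc.w']
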

class Swish(torch.nn.Module):
    """Swish activation definition.

    Swish(x) = (beta * x) * sigmoid(x)
                 where beta = 1 defines standard Swish activation.

    References:
        https://arxiv.org/abs/2108.12943 / https://arxiv.org/abs/1710.05941v1.
        E-swish variant: https://arxiv.org/abs/1801.07145.

    Args:
        beta: Beta parameter for E-Swish.
                (beta >= 1. If beta < 1, use standard Swish).
        use_builtin: Whether to use PyTorch function if available.

    """

    def __init__(self, beta: float = 1.0, use_builtin: bool = False) -> None:
        super().__init__()

        self.beta = beta

        if beta > 1:
            self.swish = lambda x: (self.beta * x) * torch.sigmoid(x)
        elif use_builtin:
            self.swish = torch.nn.SiLU()
        else:
            self.swish = lambda x: x * torch.sigmoid(x)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Forward computation."""
        return self.swish(x)


def get_activation(act):
    """Return activation function."""

    activation_funcs = {
        "hardtanh": torch.nn.Hardtanh,
        "tanh": torch.nn.Tanh,
        "relu": torch.nn.ReLU,
        "selu": torch.nn.SELU,
        "swish": Swish,
    }

    return activation_funcs[act]()

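
# Editor's illustration (not part of the original module): `get_activation`
# returns an instantiated module; with beta == 1, Swish(x) = x * sigmoid(x).
#
#   >>> act = get_activation("swish")
#   >>> act(torch.tensor([0.0, 1.0]))
#   tensor([0.0000, 0.7311])
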

class TooShortUttError(Exception):
    """Raised when the utt is too short for subsampling.

    Args:
        message: Error message to display.
        actual_size: The size that cannot pass the subsampling.
        limit: The size limit for subsampling.

    """

    def __init__(self, message: str, actual_size: int, limit: int) -> None:
        """Construct a TooShortUttError module."""
        super().__init__(message)

        self.actual_size = actual_size
        self.limit = limit


def check_short_utt(sub_factor: int, size: int) -> Tuple[bool, int]:
    """Check if the input is too short for subsampling.

    Args:
        sub_factor: Subsampling factor for Conv2DSubsampling.
        size: Input size.

    Returns:
        : Whether an error should be sent.
        : Size limit for specified subsampling factor.

    """
    if sub_factor == 2 and size < 3:
        return True, 7
    elif sub_factor == 4 and size < 7:
        return True, 7
    elif sub_factor == 6 and size < 11:
        return True, 11

    return False, -1

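
# Editor's illustration (not part of the original module), assuming the size
# limits reconstructed above: limit 7 for factors 2 and 4, 11 for factor 6.
#
#   >>> check_short_utt(4, 5)
#   (True, 7)
#   >>> check_short_utt(4, 20)
#   (False, -1)
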

def sub_factor_to_params(sub_factor: int, input_size: int) -> Tuple[int, int, int]:
    """Get conv2D second layer parameters for given subsampling factor.

    Args:
        sub_factor: Subsampling factor (1/X).
        input_size: Input size.

    Returns:
        : Kernel size for second convolution.
        : Stride for second convolution.
        : Conv2DSubsampling output size.

    """
    if sub_factor == 2:
        return 3, 1, (input_size - 1) // 2 - 2
    elif sub_factor == 4:
        return 3, 2, ((input_size - 1) // 2 - 1) // 2
    elif sub_factor == 6:
        return 5, 3, ((input_size - 1) // 2 - 2) // 3
    else:
        raise ValueError("subsampling_factor parameter should be set to either 2, 4 or 6.")

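
# Editor's illustration (not part of the original module): for 1/4 subsampling
# of 80-dim features, the second conv uses kernel 3, stride 2, output size 19.
#
#   >>> sub_factor_to_params(4, 80)
#   (3, 2, 19)
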

def make_chunk_mask(
    size: int,
    chunk_size: int,
    left_chunk_size: int = 0,
    device: torch.device = None,
) -> torch.Tensor:
    """Create chunk mask for the subsequent steps (size, size).

    Reference: https://github.com/k2-fsa/icefall/blob/master/icefall/utils.py

    Args:
        size: Size of the source mask.
        chunk_size: Number of frames in chunk.
        left_chunk_size: Size of the left context in chunks
            (a negative value means full left context).
        device: Device for the mask tensor.

    Returns:
        mask: Chunk mask. (size, size)

    """
    mask = torch.zeros(size, size, device=device, dtype=torch.bool)

    for i in range(size):
        if left_chunk_size < 0:
            start = 0
        else:
            start = max((i // chunk_size - left_chunk_size) * chunk_size, 0)

        end = min((i // chunk_size + 1) * chunk_size, size)
        mask[i, start:end] = True

    return ~mask

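
# Editor's illustration (not part of the original module): True marks positions
# a frame may NOT attend to; with left_chunk_size=0 only the current chunk is
# visible, with no left context.
#
#   >>> make_chunk_mask(4, 2)
#   tensor([[False, False,  True,  True],
#           [False, False,  True,  True],
#           [ True,  True, False, False],
#           [ True,  True, False, False]])
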

def make_source_mask(lengths: torch.Tensor) -> torch.Tensor:
    """Create source mask for given lengths.

    Reference: https://github.com/k2-fsa/icefall/blob/master/icefall/utils.py

    Args:
        lengths: Sequence lengths. (B,)

    Returns:
        : Mask for the sequence lengths. (B, max_len)

    """
    max_len = lengths.max()
    batch_size = lengths.size(0)

    expanded_lengths = torch.arange(max_len).expand(batch_size, max_len).to(lengths.device)

    return expanded_lengths >= lengths.unsqueeze(1)

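
# Editor's illustration (not part of the original module): True marks padding.
#
#   >>> make_source_mask(torch.tensor([3, 1]))
#   tensor([[False, False, False],
#           [False,  True,  True]])
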

def get_transducer_task_io(
    labels: torch.Tensor,
    encoder_out_lens: torch.Tensor,
    ignore_id: int = -1,
    blank_id: int = 0,
) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
    """Get Transducer loss I/O.

    Args:
        labels: Label ID sequences. (B, L)
        encoder_out_lens: Encoder output lengths. (B,)
        ignore_id: Padding symbol ID.
        blank_id: Blank symbol ID.

    Returns:
        decoder_in: Decoder inputs. (B, U)
        target: Target label ID sequences. (B, U)
        t_len: Time lengths. (B,)
        u_len: Label lengths. (B,)

    """

    def pad_list(labels: List[torch.Tensor], padding_value: int = 0):
        """Create padded batch of labels from a list of labels sequences.

        Args:
            labels: Labels sequences. [B x (?)]
            padding_value: Padding value.

        Returns:
            labels: Batch of padded labels sequences. (B,)

        """
        batch_size = len(labels)

        padded = (
            labels[0]
            .new(batch_size, max(x.size(0) for x in labels), *labels[0].size()[1:])
            .fill_(padding_value)
        )

        for i in range(batch_size):
            padded[i, : labels[i].size(0)] = labels[i]

        return padded

    device = labels.device

    labels_unpad = [y[y != ignore_id] for y in labels]
    blank = labels[0].new([blank_id])

    decoder_in = pad_list(
        [torch.cat([blank, label], dim=0) for label in labels_unpad], blank_id
    ).to(device)
    target = pad_list(labels_unpad, blank_id).type(torch.int32).to(device)

    t_len = torch.IntTensor(list(map(int, encoder_out_lens))).to(device)
    u_len = torch.IntTensor([y.size(0) for y in labels_unpad]).to(device)

    return decoder_in, target, t_len, u_len


def pad_to_len(t: torch.Tensor, pad_len: int, dim: int) -> torch.Tensor:
    """Pad the tensor `t` at `dim` to the length `pad_len` with right padding zeros."""
    if t.size(dim) >= pad_len:
        return t
    else:
        pad_size = list(t.shape)
        pad_size[dim] = pad_len - t.size(dim)
        return torch.cat([t, torch.zeros(pad_size, dtype=t.dtype, device=t.device)], dim=dim)

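
# Editor's illustration (not part of the original module): building Transducer
# loss inputs from padded labels (ignore_id=-1, blank_id=0), then right-padding
# a tensor with `pad_to_len`.
#
#   >>> labels = torch.tensor([[3, 5, 2], [4, 1, -1]])
#   >>> enc_lens = torch.tensor([10, 6])
#   >>> dec_in, target, t_len, u_len = get_transducer_task_io(labels, enc_lens)
#   >>> dec_in
#   tensor([[0, 3, 5, 2],
#           [0, 4, 1, 0]])
#   >>> u_len
#   tensor([3, 2], dtype=torch.int32)
#
#   >>> pad_to_len(torch.ones(2, 3), 5, dim=1)
#   tensor([[1., 1., 1., 0., 0.],
#           [1., 1., 1., 0., 0.]])
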