o
    i/                     @   s   d Z ddlmZmZ ddlZG dd deZdededeeef fd	d
Z	dededeeeef fddZ
		ddedededejdejf
ddZdejdejfddZ		ddejdejdededeejejejejf f
ddZdS )z(Utility functions for Transducer models.    )ListTupleNc                       s2   e Zd ZdZdedededdf fddZ  ZS )	TooShortUttErrorzRaised when the utt is too short for subsampling.

    Args:
        message: Error message to display.
        actual_size: The size that cannot pass the subsampling.
        limit: The size limit for subsampling.

    messageactual_sizelimitreturnNc                    s   t  | || _|| _dS )z$Construct a TooShortUttError module.N)super__init__r   r   )selfr   r   r   	__class__ P/home/ubuntu/.local/lib/python3.10/site-packages/espnet2/asr_transducer/utils.pyr
      s   
zTooShortUttError.__init__)__name__
__module____qualname____doc__strintr
   __classcell__r   r   r   r   r      s    &	r   
sub_factorsizer   c                 C   s@   | dkr
|dk r
dS | dkr|dk rdS | dkr|dk rdS d	S )
a	  Check if the input is too short for subsampling.

    Args:
        sub_factor: Subsampling factor for Conv2DSubsampling.
        size: Input size.

    Returns:
        : Whether an error should be sent.
        : Size limit for specified subsampling factor.

          )T      r         )Tr   )Fr   )r   r   r   r   r   check_short_utt   s   r    
input_sizec                 C   sj   | dkrdd|d d d fS | dkr dd|d d d d fS | dkr1dd|d d d d fS t d)a-  Get conv2D second layer parameters for given subsampling factor.

    Args:
        sub_factor: Subsampling factor (1/X).
        input_size: Input size.

    Returns:
        : Kernel size for second convolution.
        : Stride for second convolution.
        : Conv2DSubsampling output size.

    r   r      r   r      z?subsampling_factor parameter should be set to either 2, 4 or 6.)
ValueError)r   r!   r   r   r   sub_factor_to_params0   s   r%   
chunk_sizeleft_chunk_sizedevicec                 C   sr   t j| | |t jd}t| D ]'}|dkrd}nt|| | | d}t|| d | | }d||||f< q| S )a  Create chunk mask for the subsequent steps (size, size).

    Reference: https://github.com/k2-fsa/icefall/blob/master/icefall/utils.py

    Args:
        size: Size of the source mask.
        chunk_size: Number of frames in chunk.
        left_chunk_size: Size of the left context in chunks (0 means full context).
        device: Device for the mask tensor.

    Returns:
        mask: Chunk mask. (size, size)

    )r(   dtyper   r"   T)torchzerosboolrangemaxmin)r   r&   r'   r(   maskistartendr   r   r   make_chunk_maskI   s   r4   lengthsc                 C   s8   |   }| d}t|||| }|| dkS )zCreate source mask for given lengths.

    Reference: https://github.com/k2-fsa/icefall/blob/master/icefall/utils.py

    Args:
        lengths: Sequence lengths. (B,)

    Returns:
        : Mask for the sequence lengths. (B, max_len)

    r   r"   )r.   r   r*   arangeexpandto	unsqueeze)r5   max_len
batch_sizeexpanded_lengthsr   r   r   make_source_maskk   s   
r=   r   labelsencoder_out_lens	ignore_idblank_idc                    s   ddt tj dtfdd}| j}fdd| D }| d |g | fdd|D ||}|||tj|}t	t
t|}t||}	td	d |D |}
|||	|
fS )a  Get Transducer loss I/O.

    Args:
        labels: Label ID sequences. (B, L)
        encoder_out_lens: Encoder output lengths. (B,)
        ignore_id: Padding symbol ID.
        blank_id: Blank symbol ID.

    Returns:
        decoder_in: Decoder inputs. (B, U)
        target: Target label ID sequences. (B, U)
        t_len: Time lengths. (B,)
        u_len: Label lengths. (B,)

    r   r>   padding_valuec                 S   st   t | }| d j|tdd | D g| d  dd R  |}t|D ]}| | ||d| | df< q&|S )zCreate padded batch of labels from a list of labels sequences.

        Args:
            labels: Labels sequences. [B x (?)]
            padding_value: Padding value.

        Returns:
            labels: Batch of padded labels sequences. (B,)

        r   c                 s   s    | ]}| d V  qdS r   Nr   ).0xr   r   r   	<genexpr>   s    z;get_transducer_task_io.<locals>.pad_list.<locals>.<genexpr>r"   N)lennewr.   r   fill_r-   )r>   rB   r;   paddedr1   r   r   r   pad_list   s    z(get_transducer_task_io.<locals>.pad_listc                    s   g | ]}|| k qS r   r   rE   y)r@   r   r   
<listcomp>   s    z*get_transducer_task_io.<locals>.<listcomp>c                    s   g | ]}t j |gd dqS )r   )dim)r*   cat)rE   label)blankr   r   rO      s    c                 S   s   g | ]}| d qS r   rD   rM   r   r   r   rO      s    NrT   )r   r*   Tensorr   r(   rI   r8   typeint32listmap	IntTensor)r>   r?   r@   rA   rL   r(   labels_unpad
decoder_intargett_lenu_lenr   )rS   r@   r   get_transducer_task_io   s   r`   rC   )r   r   )r   typingr   r   r*   	Exceptionr   r   r,   r    r%   r(   rU   r4   r=   r`   r   r   r   r   <module>   sB     
"