o
    mi)                     @   sl   d Z ddlmZmZ ddlZddlZddlZddlmZ ddl	m
Z
 eG dd dZG d	d
 d
ejZdS )z)Residual vector quantizer implementation.    )	dataclassfieldN)nn   )ResidualVectorQuantizationc                   @   sR   e Zd ZU ejed< ejed< ejed< dZejej ed< e	e
dZe
ed< dS )QuantizedResult	quantizedcodes	bandwidthNpenalty)default_factorymetrics)__name__
__module____qualname__torchTensor__annotations__r   tpOptionalr   dictr    r   r   ?/home/ubuntu/SpeechTokenizer/speechtokenizer/quantization/vq.pyr      s   
 


r   c                       s   e Zd ZdZ							dd	ed
edededededef fddZd dej	d
e
je de
je defddZd dej	d
e
je de
je dej	fddZd!dej	dedej	fddZ  ZS )"ResidualVectorQuantizera  Residual Vector Quantizer.
    Args:
        dimension (int): Dimension of the codebooks.
        n_q (int): Number of residual vector quantizers used.
        bins (int): Codebook size.
        decay (float): Decay for exponential moving average over the codebooks.
        kmeans_init (bool): Whether to use kmeans to initialize the codebooks.
        kmeans_iters (int): Number of iterations used for kmeans initialization.
        threshold_ema_dead_code (int): Threshold for dead code expiration. Replace any codes
            that have an exponential moving average cluster size less than the specified threshold with
            randomly selected vector from the current batch.
             Gz?T2      	dimensionn_qbinsdecaykmeans_initkmeans_itersthreshold_ema_dead_codec              	      s^   t    || _|| _|| _|| _|| _|| _|| _t	| j| j| j| j| j| j| jd| _
d S )N)dimcodebook_sizenum_quantizersr#   r$   r%   r&   )super__init__r!   r    r"   r#   r$   r%   r&   r   vq)selfr    r!   r"   r#   r$   r%   r&   	__class__r   r   r+   )   s"   

z ResidualVectorQuantizer.__init__Nxlayersreturnc                 C   sd   |r|n| j }|rt||krtdt| d| j  d| j|||d\}}}}||t||fS )a  Residual vector quantization on the given input tensor.
        Args:
            x (torch.Tensor): Input tensor.
            n_q (int): Number of quantizer used to quantize. Default: All quantizers.
            layers (list): Layer that need to return quantized. Defalt: None.
        Returns:
            QuantizedResult:
                The quantized (or approximately quantized) representation with
                the associated numbert quantizers and layer quantized required to return.
        zLast layer index in layers: A z!. Number of quantizers in RVQ: B z. A must less than B.)r!   r1   )r!   max
ValueErrorr,   r   mean)r-   r0   r!   r1   r   r	   commit_lossquantized_listr   r   r   forwardE   s
   zResidualVectorQuantizer.forwardstc                 C   s,   |r|n| j }|p
d}| jj|||d}|S )a  Encode a given input tensor with the specified sample rate at the given bandwidth.
        The RVQ encode method sets the appropriate number of quantizer to use
        and returns indices for each quantizer.
        Args:
            x (torch.Tensor): Input tensor.
            n_q (int): Number of quantizer used to quantize. Default: All quantizers.
            st (int): Start to encode input from which layers. Default: 0.
        r   )r!   r9   )r!   r,   encode)r-   r0   r!   r9   r	   r   r   r   r:   W   s   	zResidualVectorQuantizer.encoder   r	   c                 C   s   | j j||d}|S )zDecode the given codes to the quantized representation.
        Args:
            codes (torch.Tensor): Input indices for each quantizer.
            st (int): Start to decode input codes from which layers. Default: 0.
        )r9   )r,   decode)r-   r	   r9   r   r   r   r   r;   e   s   zResidualVectorQuantizer.decode)r   r   r   r   Tr   r   )NN)r   )r   r   r   __doc__intfloatboolr+   r   r   r   r   listr   r8   r:   r;   __classcell__r   r   r.   r   r      s6    *,$r   )r<   dataclassesr   r   mathtypingr   r   r   core_vqr   r   Moduler   r   r   r   r   <module>   s   