o
    }oi}M                     @   s  d dl Z d dlmZ d dlZd dlZd dlmZ d dlZd dl	Z	d dl
mZmZ d dlmZmZ dZdedefdd	Zd
defdejdededededejfddZdSdejdedejfddZd
ddefdejdejdededededefddZ		 	dTdejdejded ed!edejfd"d#Zdejdefd$d%ZdSd&edee defd'd(Zd)edefd*d+ZdSd,edee defd-d.Zdejd/ejdefd0d1Z	2	2		3		4dUd5ejd6ejd7e d8e d9ee d:e d;ee dedefd<d=Z!de	j"de	j"fd>d?Z#dVdejd@edAedBee dejf
dCdDZ$dVdejd@edAedBee dejf
dEdFZ%dWd5ejd6ejdedejfdGdHZ&	4dXd5ejd6ejdJededejf
dKdLZ'de	j"de	j"fdMdNZ(	4dYde	j"dOee	j" dPe dede	j"f
dQdRZ)dS )Z    N)Optional)	rearrangereduce)pdist
squareformg     pu@xreturnc                 C   s   t | t j S )zcUnnormalized sinc.

    Args:
        x: input value

    Returns:
        Calculates sin(x)/x
    )npsincpir    r   \/home/ubuntu/.local/lib/python3.10/site-packages/nemo/collections/audio/parts/utils/audio.pysinc_unnormalized   s   	r   	sphericali   mic_positionssample_ratefield
fft_lengthsound_velocityc                 C   s  | j d dksJ d| j d }|dk rtd| |d d }dtj | td| | }t|||f}tt| }	t|D ]G}
d|dd|
|
f< t|
d |D ]4}|	|
|f }|d	krpt	|| | |dd|
|f< ntd
| d|dd|
|f |dd||
f< qTqB|S )a  Calculate a theoretical coherence matrix for given mic positions and field type.

    Args:
        mic_positions: 3D Cartesian coordinates of microphone positions, shape (num_mics, 3)
        field: string denoting the type of the soundfield
        sample_rate: sampling rate of the input signal in Hz
        fft_length: length of the fft in samples
        sound_velocity: speed of sound in m/s

    Returns:
        Calculated coherence with shape (num_subbands, num_mics, num_mics)
          z!Expecting 3D microphone positionsr      z+Expecting at least 2 microphones, received       ?Nr   zUnknown noise field .)
shape
ValueErrorr	   r   arangezerosr   r   ranger   )r   r   r   r   r   num_micsnum_subbandsangular_freqdesired_coherencemic_distancepqdist_pqr   r   r   theoretical_coherence)   s$   
 "	r(   缉ؗҜ<Sepsc           
   
   C   s0  | j dkr	td| j\}}}|dk rtdtjt| d dd}tj|||ftd}t	|D ]d}d|d	d	||f< t	|d |D ]Q}tj| d	d	d	d	|f t
| d	d	d	d	|f  dd}	|	t|d	d	|f |d	d	|f  |  |d	d	||f< t
|d	d	||f |d	d	||f< qCq1|S )
a0  Estimate complex-valued coherence for the input STFT-domain signal.

    Args:
        S: STFT of the signal with shape (num_subbands, num_frames, num_channels)
        eps: small regularization constant

    Returns:
        Estimated coherence with shape (num_subbands, num_channels, num_channels)
    r   z)Expecting the input STFT to be a 3D arrayr    Expecting at least 2 microphonesr   axis)dtyper   N)ndimRuntimeErrorr   r   r	   meanabsr   complexr   	conjugatesqrt)
r*   r+   r!   
num_framesnum_channelspsdestimated_coherencer%   r&   	cross_psdr   r   r   r:   V   s   

<<(r:   choleskynoise_signalmethodc           	      C   sJ   |d dksJ | j d }|dk rtdt| ||||d}t|||dS )ay  
    Args:
        mic_positions: 3D microphone positions, shape (num_mics, 3)
        noise_signal: signal used to generate the approximate noise field, shape (num_samples, num_mics).
                      Different channels need to be independent.
        sample_rate: sampling rate of the input signal
        field: string denoting the type of the soundfield
        fft_length: length of the fft in samples
        method: coherence decomposition method
        sound_velocity: speed of sound in m/s

    Returns:
        Signal with coherence approximately matching the desired coherence, shape (num_samples, num_channels)

    References:
        E.A.P. Habets, I. Cohen and S. Gannot, 'Generating nonstationary multisensor
        signals under a spatial coherence constraint', Journal of the Acoustical Society
        of America, Vol. 124, Issue 5, pp. 2911-2917, Nov. 2008.
    r   r   r,   )r   r   r   r   r   )signalr#   r>   )r   r   r(   transform_to_match_coherence)	r   r=   r   r   r   r>   r   r    r#   r   r   r    generate_approximate_noise_fieldv   s   
rA   皙?r?   r#   ref_channelcorrcoef_thresholdc                 C   s  | j d }|j d }|j d |ksJ |j d |ksJ d|d  }| tj| dd } tjt| d dd}| t||  t| } t|  }	t|	d tt|	|krlt	d| dt|	
  dtj|  |d	}
|
ddd}
t|
}|d
krtj|dd }|dd}n.|dkrtj|dd \}}t|dddddf | }|dd}ntd| t|
dddf ||dddf< tj|dddt| d}| }|S )a  Transform the input multichannel signal to match the desired coherence.

    Note: It's assumed that channels are independent.

    Args:
        signal: independent noise signals with shape (num_samples, num_channels)
        desired_coherence: desired coherence with shape (num_subbands, num_channels, num_channels)
        method: decomposition method used to construct the transformation matrix
        ref_channel: reference channel for power normalization of the input signal
        corrcoef_threshold: used to detect input signals with high correlation between channels

    Returns:
        Signal with coherence approximately matching the desired coherence, shape (num_samples, num_channels)

    References:
        E.A.P. Habets, I. Cohen and S. Gannot, 'Generating nonstationary multisensor
        signals under a spatial coherence constraint', Journal of the Acoustical Society
        of America, Vol. 124, Issue 5, pp. 2911-2917, Nov. 2008.
    r   r   r   r-   g        z2Input channels are correlated above the threshold z:. Max abs off-diagonal element of the coefficient matrix: r   )n_fftr<   NevdzUnknown method .)length)r   r	   r2   r3   r6   corrcoef	transposefill_diagonalanyr1   maxlibrosastft
zeros_likelinalgr<   swapaxeseigr   matmulistftlen)r?   r#   r>   rC   rD   r8   r!   r   signal_powercorrcoef_matrixr*   XLAwVr   r   r   r   r@      s:   


 $r@   c                 C   s   t t t | d S )zCalculate RMS value for the input signal.

    Args:
        x: input signal

    Returns:
        RMS of the input signal.
    r   )r	   r6   r2   r3   r   r   r   r   rms   s   	r]   magc                 C      dt | |  S )zConvert magnitude ratio from linear scale to dB.

    Args:
        mag: linear magnitude value
        eps: small regularization constant

    Returns:
        Value in dB.
       r	   log10)r^   r+   r   r   r   mag2db     
rc   dbc                 C   s   d| d  S )zConvert value in dB to linear magnitude ratio.

    Args:
        db: magnitude ratio in dB

    Returns:
        Magnitude ratio in linear scale.
    
   r`   r   )re   r   r   r   db2mag  s   	rg   powerc                 C   r_   )zConvert power ratio from linear scale to dB.

    Args:
        power: power ratio in linear scale
        eps: small regularization constant

    Returns:
        Power in dB.
    rf   ra   )rh   r+   r   r   r   pow2db  rd   ri   segmentc                 C   sH   t | t |krtdt | dt |  tjj| |dd}t|S )a  Get starting point of `segment` in `signal`.
    We assume that `segment` is a sub-segment of `signal`.
    For example, `signal` may be a 10 second audio signal,
    and `segment` could be the signal between 2 seconds and
    5 seconds. This function will then return the index of
    the sample where `segment` starts (at 2 seconds).

    Args:
        signal: numpy array with shape (num_samples,)
        segment: numpy array with shape (num_samples,)

    Returns:
        Index of the start of `segment` in `signal`.
    z4segment must be shorter than signal: len(segment) = z, len(signal) = valid)mode)rU   r   scipyr?   	correlater	   argmax)r?   rj   ccr   r   r   get_segment_start*  s   
rq   FT:0yE>estimatetargetscale_invariantconvolution_invariantconvolution_filter_lengthremove_meansdr_maxc                 C   s   |r|rt d|r| t|  } |t| }|s |r(|dkr(t| ||d}n
|r2t| |||d}tt|d }tt| | d }	|durW|	d| d  |  }	dt||	|  |  }
|
S )aK  Calculate signal-to-distortion ratio.

        SDR = 10 * log10( ||t||_2^2 / (||e-t||_2^2 + alpha * ||t||^2)

    where
        alpha = 10^(-sdr_max/10)

    Optionally, apply scale-invariant scaling to target signal.

    Args:
        estimate: estimated signal
        target: target signal

    Returns:
        SDR in dB.
    zRArguments scale_invariant and convolution_invariant cannot be used simultaneously.r   )rs   rt   r+   )rs   rt   filter_lengthr+   r   Nrf   )r   r	   r2   scale_invariant_target_numpy"convolution_invariant_target_numpyr3   rb   )rs   rt   ru   rv   rw   rx   ry   r+   
target_powdistortion_powsdrr   r   r   calculate_sdr_numpyA  s"   r   c                 C   s*   t jtj| jd}t | | d| | S )zWrap angle in radians to [-pi, pi]

    Args:
        x: angle in radians

    Returns:
        Angle in radians wrapped to [-pi, pi]
    )devicer   )torchtensormathr   r   	remainder)r   r   r   r   r   
wrap_to_pis  s   	r   rz   delayn_stepsc              
   C   s   | j dkrtd| j |du rt| }tt|| g}|t|  }dkr4t|t|g}n|d| }tj	|t|d t|d gS )a  Construct a causal convolutional matrix from x delayed by `delay` samples.

    Args:
        x: input signal, shape (N,)
        filter_length: length of the filter in samples
        delay: delay the signal by a number of samples
        n_steps: total number of time steps (rows) for the output matrix

    Returns:
        Convolutional matrix, shape (n_steps, filter_length)
    r   z=Expecting one-dimensional signal. Received signal with shape Nr   )
r0   r   r   rU   r	   hstackr   rm   rP   toeplitz)r   rz   r   r   x_padpad_lenr   r   r   convmtx_numpy  s   
&r   c              	   C   s^   | j dkrtd| j g }t| jd D ]}|t| dd|f |||d qt|S )a  Construct a causal multi-channel convolutional matrix from `x` delayed by `delay` samples.

    Args:
        x: input signal, shape (N, M)
        filter_length: length of the filter in samples
        delay: delay the signal by a number of samples
        n_steps: total number of time steps (rows) for the output matrix

    Returns:
        Multi-channel convolutional matrix, shape (n_steps, M * filter_length)
    r   z=Expecting two-dimensional signal. Received signal with shape r   N)rz   r   r   )r0   r   r   r   appendr   r	   r   )r   rz   r   r   mc_mtxmr   r   r   convmtx_mc_numpy  s   
$
r   c                 C   s^   |j | j   krdksJ d J dt| | }tt|d }|||  }|| S )a  Calculate convolution-invariant target for a given estimated signal.

    Calculate scaled target obtained by solving

        min_scale || scale * target - estimate ||^2

    Args:
        estimate: one-dimensional estimated signal, shape (T,)
        target: one-dimensional target signal, shape (T,)
        eps: regularization constans

    Returns:
        Scaled target signal, shape (T,)
    r   %Only one-dimensional inputs supportedr   )r0   r	   r2   r3   )rs   rt   r+   estimate_dot_targetr}   scaler   r   r   r{     s
   (r{   ư>diag_regc                 C   s(  |j | j   krdksJ d J ddttt|t|  d  }tjj||d}tjj| |d}tjjt	|d |d}tjj|
 | |d}	|d| }|	d| }	|durm|d  ||d  | 7  < tj|}
tj|
|	}|tjj||d }tjj||d}|dt| S )a  Calculate convolution-invariant target for a given estimated signal.

    Calculate target filtered with a linear f obtained by solving

        min_filter || conv(filter, target) - estimate ||^2

    Args:
        estimate: one-dimensional estimated signal
        target: one-dimensional target signal
        filter_length: length of the (convolutive) filter
        diag_reg: multiplicative factor for relative diagonal loading
        eps: absolute diagonal loading
    r   r   r   )nNr   )r0   r   ceillog2rU   r	   fftrfftirfftr3   conjrm   rP   r   solve)rs   rt   rz   r   r+   rE   TEtt_corrte_corrTTfiltT_filttarget_filtr   r   r   r|     s   ($r|   c                 C   sF   |  d}tj| dddf jdd| gdd} | d|djddS )zCreate Toeplitz matrix for one-dimensional signals along the last dimension.

    Args:
        x: tensor with shape (..., T)

    Returns:
        Tensor with shape (..., T, T)
    .r   N)r   )dimsdim)sizer   catflipunfold)r   rG   r   r   r   r     s   
	&r   masknormalize_maskc                 C   s   | j dk rtd| j t| d} td| |  }|du r'|jdd}|S |j | j d kr;td	| j d
|j |j| jdd krQtd| j d
|j |r^||jddd|  }|d | }t	|dd}|S )aQ  Calculate covariance matrix of the input signal.

    If a mask is provided, the covariance matrix is calculated by weighting by the provided time-frequency mask and summing over the time dimension. The mask is normalized by default. If a mask is not provided, the covariance matrix is calculated by averaging over the time dimension.

    The provided mask can be real-valued or complex-valued, or a binary or boolean mask.

    Args:
        x: input signal with shape `(..., channel, freq, time)`
        mask: Time-frequency mask with shape `(..., freq, time)`. Default is `None`.
        normalize_mask: if `True`, normalize the mask by dividing by the sum of the mask across time. Default is `True`.
        eps: regularization constant. Default is `1e-10`.

    Returns:
        Covariance matrix with shape (..., freq, channel, channel)
    r   zBInput signal must have at least 3 dimensions. Input signal shape: z... c f t -> ... f t cz...tm,...tn->...tmnNr   r   zwMask must have the same number of dimensions as the input signal, excluding the channel dimension. Input signal shape: z, mask shape: r   zhMask must have the same shape as the input signal, excluding the channel dimension. Input signal shape: T)r   keepdim).NNz... f t m n -> ... f m nsum)
r0   r   r   r   r   einsumr   r2   r   r   )r   r   r   r+   p_xxr   r   r   covariance_matrix	  s(   

r   )r)   )r<   r   rB   )FFNTNrr   )r   N)rr   )r   rr   )NTrr   )*r   typingr   rM   numpyr	   numpy.typingnptrm   r   einopsr   r   scipy.spatial.distancer   r   SOUND_VELOCITYfloatr   NDArraystrintr(   r:   rA   r@   ndarrayr]   rc   rg   ri   rq   boolr   Tensorr   r   r   r{   r|   r   r   r   r   r   r   <module>   s   
-$
0
U	
2(("
0