o
    i                     @   s  d Z ddlmZmZmZ ddlZddlmZ ddl	m
Z ddlmZ ddlmZmZmZmZmZmZmZmZmZmZ eejedkZeejjZ				
		deddZ	dfde fddZ!			
	dgdee"ej#f de"de$fddZ%				dhdej#de$de$de de f
dd Z&	
						did!eej#ef d"eej#ef d#eej#ef de"dee"ej#df d$ee" de$de$de de d%eej#ef fd&d'Z'd(eej#ef d)eej#ef d%eej#ef fd*d+Z(				dhdeej#e"f de$de$de de f
d,d-Z)	.	/	
				djdeej#e"f d0e d1e$de"de$de$de de fd2d3Z*	.	/	
				djdeej#e"f d0e d1e$de"de$de$de de fd4d5Z+			
			dkde$d6e"d7e"de$de de fd8d9Z,					dld!eej#ef d:eej#ef dee"ej#df de$de$de de d%eej#ef fd;d<Z-dmd=ej#d>ej#fd?d@Z.dAdB Z/dndCdDZ0			
				dod#eej#ef d"eej#ef dee"ej#f de"de$de$de de d%eej#ef fdEdFZ1	/		dpdGeej#ef dHe"dIe"dJe"dKe$dLe"dMed%eej#ef fdNdOZ2	/dqdPeej#ef dQej#dJe"dRe"dSe$d%eej#ef fdTdUZ3				dhdVeej#ef dWeej#ef dej#de$de$de de d%eej#ef fdXdYZ4			drdVeej#ef dWeej#ef dej#de$de de d%eej#ef fdZd[Z5	
						dsd\eej#ef d"eej#ef d#eej#ef de"dee"ej#df d$ee" de$de$de de d%eej#ef fd]d^Z6d_eej#ef dPeej#ef dJe"dRe"d%eej#ef f
d`daZ7dtdbe de fdcddZ8dS )uzBeamformer module.    )ListOptionalUnionN)parse)
functional)ComplexTensor)
catcomplex_normeinsuminverse
is_complexis_torch_complex_tensormatmulreversesolve	to_doublez1.9.0mvdr      ư>c                    s  ddl m} ||v sJ dt|ttfrdd |D }nt|g}t|}	|ds8|ds8|dks8|d	krg|d
u rPjd j	d  fdd|D }nt||	ks^J t||	ffdd|D }
fdd|D }|dks|dks|dks|ds|
ds|d
urtt|}|	dkr|d
usJ |}n6g }t|	D ]/|drfddt|D }ntfddt|D }|d
ur|| n|}|| q|dv r|}n:|dkr|}dd |D }n,|dv rtd }n|dv rfdd|
D }n|dv r fdd|
D }|	dkr*|d }t|ttfr*|d }|d v r5|||d!S |
dsU|d"sU|d#ksU|d	ksU|dksU|dkrZ||d$S d
S )%a  Prepare necessary statistics for constructing the specified beamformer.

    Args:
        signal (torch.complex64/ComplexTensor): (..., F, C, T)
        masks_speech (List[torch.Tensor]): (..., F, C, T) masks for all speech sources
        mask_noise (torch.Tensor): (..., F, C, T) noise mask
        powers (List[torch.Tensor]): powers for all speech sources (..., F, T)
                                     used for wMPDR or WPD beamformers
        beamformer_type (str): one of the pre-defined beamformer types
        bdelay (int): delay factor, used for WPD beamformser
        btaps (int): number of filter taps, used for WPD beamformser
        eps (torch.Tensor): tiny constant
    Returns:
        beamformer_stats (dict): a dictionary containing all necessary statistics
            e.g. "psd_n", "psd_speech", "psd_distortion"
            Note:
            * When `masks_speech` is a tensor or a single-element list, all returned
              statistics are tensors;
            * When `masks_speech` is a multi-element list, some returned statistics
              can be a list, e.g., "psd_n" for MVDR, "psd_speech" and "psd_distortion".

    r   )BEAMFORMER_TYPESz%s is not supported yetc                 S      g | ]}t |qS  )r   .0mr   r   Q/home/ubuntu/.local/lib/python3.10/site-packages/espnet2/enh/layers/beamformer.py
<listcomp>?       z,prepare_beamformer_stats.<locals>.<listcomp>wmpdrwpdwlcmpwmwfN   c                    s   g | ]
} | j d dqS )dim)meanr   )power_inputr   r   r   M       c                    s   g | ]}d t j| d qS )   )min)torchclamp)r   p)epsr   r   r   P   s    c                    s   g | ]}t  |qS r   )!get_power_spectral_density_matrixr   signalr   r   r   R   s    mvdr_soudensdw_mwfr1mwfmvdr_tfs_soudenr*   c                    s   g | ]
\}}| kr|qS r   r   r   jpsdir   r   r   e   r)   c                 3   s     | ]\}}| kr|V  qd S )Nr   r8   r;   r   r   	<genexpr>g   s    z+prepare_beamformer_stats.<locals>.<genexpr>)r   r3   mvdr_tfs_soudenr4   r5   lcmvgevgev_banc                 S   r   r   )sum)r   psd_noise_ir   r   r   r   z   r   )mpdrmpdr_soudenlcmpmwf...ct,...et->...ce)r   wmpdr_soudenr!   r"   c              
      s.   g | ]}t d  |ddddf    qS )rH   .Nr
   conjr   inv_pr1   r   r   r   ~   s    )r    
wpd_soudenc              	      s   g | ]}t | d dqS )F)
get_vector)get_covariancesrL   )bdelaybtapsr2   r   r   r      s    )r   rD   r   r    rF   r!   r?   r6   )psd_n
psd_speechpsd_distortionr@   rG   )rS   rT   )!espnet2.enh.layers.dnn_beamformerr   
isinstancelisttupler   len
startswithrealimagendswithr0   range	enumeraterB   appendr
   rK   )r2   masks_speech
mask_noisepowersbeamformer_typerQ   rR   r/   r   num_spkinverse_powerspsd_speechespsd_bg	psd_noiserC   psd_sumrS   r   )rQ   rR   r/   r<   r(   r2   r   prepare_beamformer_stats   s    













rl   Tr'   V瞯<r/   c                 C   sn   |dkr|j ddd}n|dkr|jddd}ntd| |r+||jddd|  }td| | |  }|S )	aW  Return cross-channel power spectral density (PSD) matrix

    Args:
        xs (torch.complex64/ComplexTensor): (..., F, C, T)
        reduction (str): "mean" or "median"
        mask (torch.Tensor): (..., F, C, T)
        normalization (bool):
        eps (float):
    Returns
        psd (torch.complex64/ComplexTensor): (..., F, C, C)

    r'   r$   Tr&   keepdimmedianzUnknown reduction mode: %srH   )r'   rp   
ValueErrorrB   r
   rK   )xsmasknormalization	reductionr/   r:   r   r   r   r0      s   r0   powerreference_vector
iterationsuse_torch_solverc           
      C   s   |dkrA|rt | |}ntt|| }t|tr|d|df nt||dddddf }t|d D ]}t||}q2t| |}|S |dkratrOt| rOt|sQJ t| |d }	t||	d }|S t	d| )	a   Calculate the relative transfer function (RTF)

    Algorithm of power method:
        1) rtf = reference_vector
        2) for i in range(iterations):
             rtf = (psd_noise^-1 @ psd_speech) @ rtf
             rtf = rtf / ||rtf||_2  # this normalization can be skipped
        3) rtf = psd_noise @ rtf
        4) rtf = rtf / rtf[..., ref_channel, :]
    Note: 4) Normalization at the reference channel is not performed here.

    Args:
        psd_speech (torch.complex64/ComplexTensor):
            speech covariance matrix (..., F, C, C)
        psd_noise (torch.complex64/ComplexTensor):
            noise covariance matrix (..., F, C, C)
        mode (str): one of ("power", "evd")
            "power": power method
            "evd": eigenvalue decomposition
        reference_vector (torch.Tensor or int): (..., C) or scalar
        iterations (int): number of iterations in power method
        use_torch_solver (bool): Whether to use `solve` instead of `inverse`
    Returns:
        rtf (torch.complex64/ComplexTensor): (..., F, C, 1)
    rw   .Nr#   evdr*   ).rq   NUnknown mode: %s)
r   r   r   rW   intr_   is_torch_1_9_plusr   $generalized_eigenvalue_decompositionrr   )
rT   rj   moderx   ry   rz   phirtf_e_vecr   r   r   get_rtf   s.   !
r   Hz>:0yE>diagonal_loadingdiag_epsc           
      C   V   |r	t |||d}|rt| |}ntt|| }|t|d |  }td||}	|	S )am  Return the MVDR (Minimum Variance Distortionless Response) vector:

        h = (Npsd^-1 @ Spsd) / (Tr(Npsd^-1 @ Spsd)) @ u

    Reference:
        On optimal frequency-domain multichannel linear filtering
        for noise reduction; M. Souden et al., 2010;
        https://ieeexplore.ieee.org/document/5089420

    Args:
        psd_s (torch.complex64/ComplexTensor):
            speech covariance matrix (..., F, C, C)
        psd_n (torch.complex64/ComplexTensor):
            observation/noise covariance matrix (..., F, C, C)
        reference_vector (torch.Tensor): (..., C)
        use_torch_solver (bool): Whether to use `solve` instead of `inverse`
        diagonal_loading (bool): Whether to add a tiny term to the diagonal of psd_n
        diag_eps (float):
        eps (float):
    Returns:
        beamform_vector (torch.complex64/ComplexTensor): (..., F, C)
    regr/   .NN...fec,...c->...fetik_regr   r   r   FCtracer
   )
psd_srS   rx   rz   r   r   r/   	numeratorwsbeamform_vectorr   r   r   get_mvdr_vector  s   r   rS   rT   rj   normalize_ref_channelreturnc
                 C   s   |r	t |||	d}t|||||d}
|rt|
| d}n
tt| |
d}td|
d |}|durP|
dd|df  }|| |j	d|	  }|S ||j	d|	  }|S )a  Return the MVDR (Minimum Variance Distortionless Response) vector
        calculated with RTF:

        h = (Npsd^-1 @ rtf) / (rtf^H @ Npsd^-1 @ rtf)

    Reference:
        On optimal frequency-domain multichannel linear filtering
        for noise reduction; M. Souden et al., 2010;
        https://ieeexplore.ieee.org/document/5089420

    Args:
        psd_n (torch.complex64/ComplexTensor):
            observation/noise covariance matrix (..., F, C, C)
        psd_speech (torch.complex64/ComplexTensor):
            speech covariance matrix (..., F, C, C)
        psd_noise (torch.complex64/ComplexTensor):
            noise covariance matrix (..., F, C, C)
        iterations (int): number of iterations in power method
        reference_vector (torch.Tensor or int): (..., C) or scalar
        normalize_ref_channel (int): reference channel for normalizing the RTF
        use_torch_solver (bool): Whether to use `solve` instead of `inverse`
        diagonal_loading (bool): Whether to add a tiny term to the diagonal of psd_n
        diag_eps (float):
        eps (float):
    Returns:
        beamform_vector (torch.complex64/ComplexTensor): (..., F, C)
    r   rx   ry   rz   rq   ...d,...d->...N.)
r   r   r   squeezer   r   r
   rK   r\   	unsqueeze)rS   rT   rj   ry   rx   r   rz   r   r   r/   r   r   denominatorscalebeamforming_vectorr   r   r   get_mvdr_vector_with_rtf5  s&   '	r   r   mixc                 C   s   t d|  |}|S )Nz...c,...ct->...trJ   )r   r   esr   r   r   apply_beamforming_vectorv  s   r   c           	      C   sZ   |r	t |||d}|rt| |}ntt|| }t|tr%|d|f }|S td||}|S )a  Return the MWF (Minimum Multi-channel Wiener Filter) vector:

        h = (Npsd^-1 @ Spsd) @ u

    Args:
        psd_s (torch.complex64/ComplexTensor):
            speech covariance matrix (..., F, C, C)
        psd_n (torch.complex64/ComplexTensor):
            power-normalized observation covariance matrix (..., F, C, C)
        reference_vector (torch.Tensor or int): (..., C) or scalar
        use_torch_solver (bool): Whether to use `solve` instead of `inverse`
        diagonal_loading (bool): Whether to add a tiny term to the diagonal of psd_n
        diag_eps (float):
        eps (float):
    Returns:
        beamform_vector (torch.complex64/ComplexTensor): (..., F, C)
    r   .r   )r   r   r   r   rW   r}   r
   )	r   rS   rx   rz   r   r   r/   r   r   r   r   r   get_mwf_vector  s   
r         ?Fdenoising_weightapprox_low_rank_psd_speechc
                 C   s   |r4|rt |||	d}t| |d|||d}
t|
|
 dd}t| t||	  }||d  }|} | ||  }|rCt |||	d}|rKt| |}ntt|| }t	|t
r_|d|f }|S td||}|S )	as  Return the SDW-MWF (Speech Distortion Weighted Multi-channel Wiener Filter) vector

        h = (Spsd + mu * Npsd)^-1 @ Spsd @ u

    Reference:
        [1] Spatially pre-processed speech distortion weighted multi-channel Wiener
        filtering for noise reduction; A. Spriet et al, 2004
        https://dl.acm.org/doi/abs/10.1016/j.sigpro.2004.07.028
        [2] Rank-1 constrained multichannel Wiener filter for speech recognition in
        noisy environments; Z. Wang et al, 2018
        https://hal.inria.fr/hal-01634449/document
        [3] Low-rank approximation based multichannel Wiener filter algorithms for
        noise reduction with application in cochlear implants; R. Serizel, 2014
        https://ieeexplore.ieee.org/document/6730918

    Args:
        psd_speech (torch.complex64/ComplexTensor):
            speech covariance matrix (..., F, C, C)
        psd_noise (torch.complex64/ComplexTensor):
            noise covariance matrix (..., F, C, C)
        reference_vector (torch.Tensor or int): (..., C) or scalar
        denoising_weight (float): a trade-off parameter between noise reduction and
            speech distortion.
            A larger value leads to more noise reduction at the expense of more speech
            distortion.
            The plain MWF is obtained with `denoising_weight = 1` (by default).
        approx_low_rank_psd_speech (bool): whether to replace original input psd_speech
            with its low-rank approximation as in [2]
        iterations (int): number of iterations in power method, only used when
            `approx_low_rank_psd_speech = True`
        use_torch_solver (bool): Whether to use `solve` instead of `inverse`
        diagonal_loading (bool): Whether to add a tiny term to the diagonal of psd_n
        diag_eps (float):
        eps (float):
    Returns:
        beamform_vector (torch.complex64/ComplexTensor): (..., F, C)
    r   rw   r   ry   rx   rz   rq   r$   r   .r   r   r   r   rK   	transposer   r   r   r   rW   r}   r
   )rT   rj   rx   r   r   ry   rz   r   r   r/   	recon_vecpsd_speech_r1sigma_speechrS   r   r   r   r   r   get_sdw_mwf_vector  s4   1	
r   c
                 C   s   |r5|rt |||	d}t| |d|||d}
t|
|
 dd}t| t||	  }||d  }|} n	|r>t |||	d}|rFt| |}ntt|| }||t|d  |	  }t	|t
rg|d|f }|S td||}|S )	a  Return the R1-MWF (Rank-1 Multi-channel Wiener Filter) vector

        h = (Npsd^-1 @ Spsd) / (mu + Tr(Npsd^-1 @ Spsd)) @ u

    Reference:
        [1] Rank-1 constrained multichannel Wiener filter for speech recognition in
        noisy environments; Z. Wang et al, 2018
        https://hal.inria.fr/hal-01634449/document
        [2] Low-rank approximation based multichannel Wiener filter algorithms for
        noise reduction with application in cochlear implants; R. Serizel, 2014
        https://ieeexplore.ieee.org/document/6730918

    Args:
        psd_speech (torch.complex64/ComplexTensor):
            speech covariance matrix (..., F, C, C)
        psd_noise (torch.complex64/ComplexTensor):
            noise covariance matrix (..., F, C, C)
        reference_vector (torch.Tensor or int): (..., C) or scalar
        denoising_weight (float): a trade-off parameter between noise reduction and
            speech distortion.
            A larger value leads to more noise reduction at the expense of more speech
            distortion.
            When `denoising_weight = 0`, it corresponds to MVDR beamformer.
        approx_low_rank_psd_speech (bool): whether to replace original input psd_speech
            with its low-rank approximation as in [1]
        iterations (int): number of iterations in power method, only used when
            `approx_low_rank_psd_speech = True`
        use_torch_solver (bool): Whether to use `solve` instead of `inverse`
        diagonal_loading (bool): Whether to add a tiny term to the diagonal of psd_n
        diag_eps (float):
        eps (float):
    Returns:
        beamform_vector (torch.complex64/ComplexTensor): (..., F, C)
    r   rw   r   rq   r$   r   .r   r   )rT   rj   rx   r   r   ry   rz   r   r   r/   r   r   r   r   r   r   r   r   r   get_rank1_mwf_vector  s4   .	
r   ref_channelrtf_iterationsc           	         sZ   t tr
t |tsJ t fddt|D dd}||ddddf  S )znCalculate the RTF matrix with each column the relative transfer function
    of the corresponding source.
    c              	      s8   g | ]\}}t | rt| d n|dqS )r   r   )r   r   )r   spkrS   r   r   r/   rh   r   r   rz   r   r   r   b  s    z"get_rtf_matrix.<locals>.<listcomp>rq   r%   .N)rW   rX   r   r`   )	rh   
psd_noisesr   r   r   rz   r   r/   rtf_matr   r   r   get_rtf_matrixS  s   
r   r   c                 C   s   |r	t | ||d} |rt|| }ntt| |}t| dd|}t|tr2t|d|df }	nt||}	t||	d}
|
S )u  Return the LCMV (Linearly Constrained Minimum Variance) vector
        calculated with RTF:

        h = (Npsd^-1 @ rtf_mat) @ (rtf_mat^H @ Npsd^-1 @ rtf_mat)^-1 @ p

    Reference:
        H. L. Van Trees, “Optimum array processing: Part IV of detection, estimation,
        and modulation theory,” John Wiley & Sons, 2004. (Chapter 6.7)

    Args:
        psd_n (torch.complex64/ComplexTensor):
            observation/noise covariance matrix (..., F, C, C)
        rtf_mat (torch.complex64/ComplexTensor):
            RTF matrix (..., F, C, num_spk)
        reference_vector (torch.Tensor or int): (..., num_spk) or scalar
        use_torch_solver (bool): Whether to use `solve` instead of `inverse`
        diagonal_loading (bool): Whether to add a tiny term to the diagonal of psd_n
        diag_eps (float):
        eps (float):
    Returns:
        beamform_vector (torch.complex64/ComplexTensor): (..., F, C)
    r   rq   r$   .N)	r   r   r   r   rK   r   rW   r}   r   )rS   r   rx   rz   r   r   r/   r   r   r   r   r   r   r   get_lcmv_vector_with_rtfr  s   

r   abc                 C   s   zt j|}W n ty   t|||d}t j|}Y nw | }||  | dd }t j|\}}t 	| dd|}||fS )aO  Solves the generalized eigenvalue decomposition through Cholesky decomposition.

    ported from https://github.com/asteroid-team/asteroid/blob/master/asteroid/dsp/beamforming.py#L464

    a @ e_vec = e_val * b @ e_vec
    |
    |   Cholesky decomposition on `b`:
    |       b = L @ L^H, where `L` is a lower triangular matrix
    |
    |   Let C = L^-1 @ a @ L^-H, it is Hermitian.
    |
    => C @ y = lambda * y
    => e_vec = L^-H @ y

    Reference: https://www.netlib.org/lapack/lug/node54.html

    Args:
        a: A complex Hermitian or real symmetric matrix whose eigenvalues and
            eigenvectors will be computed. (..., C, C)
        b: A complex Hermitian or real symmetric definite positive matrix. (..., C, C)
    Returns:
        e_val: generalized eigenvalues (ascending order)
        e_vec: generalized eigenvectors
    r   rq   r$   )
r,   linalgcholeskyRuntimeErrorr   r   rK   r   eighr   )r   r   r/   r   inv_choleskycmate_valr   r   r   r   r     s   r   c              	   C   s   | j \}}}t| j}t|D ]/}t| dd|ddf | dd|d ddf   jddd |dd|ddf< qt	| t
rUt
t|t| }| | S td| }| | S )a  Phase correction to reduce distortions due to phase inconsistencies.

    ported from https://github.com/fgnt/nn-gev/blob/master/fgnt/beamforming.py#L169

    Args:
        vector: Beamforming vector with shape (..., F, C)
    Returns:
        w: Phase corrected beamforming vectors
    Nr*   rq   Trn   y             )shaper,   
empty_liker\   r_   exprK   rB   anglerW   r   cossin)vectorBFC
correctionfr   r   r   gev_phase_correction  s   
2
r   c                 C   sP   | dd }td|  || }td|  ||| }||  || |  }|S )aY  Blind analytic normalization (BAN) for post-filtering

    Args:
        ws (torch.complex64/ComplexTensor): beamformer vector (..., F, C)
        psd_noise (torch.complex64/ComplexTensor): noise PSD matrix (..., F, C, C)
        eps (float)
    Returns:
        ws_ban (torch.complex64/ComplexTensor): normalized beamformer vector (..., F)
    rq   r#   z...c,...ce,...e->...z...c,...ce,...eo,...o->...)sizer
   rK   sqrt)r   rj   r/   C2r   r   gainr   r   r   blind_analytic_normalization  s   
r   c	                 C   s  |r	t | ||d} |dkrI|rt|| }	ntt| |}	t|tr(|	d|df nt|	|dddddf }
t|d D ]}t|	|
}
q;|
d}
n|dkrtrWt	|rWt	| sYJ | 
| jdd }
t| jd D ]d}z&t|d|ddddf | d|ddddf d d	 |
d|ddf< W qj ty   td
|dd | d}| |
d|ddf jt| d|ddddf  | |
d|ddf< Y qjw ntd| |
t|
ddd }t|}|S )a   Return the generalized eigenvalue (GEV) beamformer vector:

        psd_speech @ h = lambda * psd_noise @ h

    Reference:
        Blind acoustic beamforming based on generalized eigenvalue decomposition;
        E. Warsitz and R. Haeb-Umbach, 2007.

    Args:
        psd_noise (torch.complex64/ComplexTensor):
            noise covariance matrix (..., F, C, C)
        psd_speech (torch.complex64/ComplexTensor):
            speech covariance matrix (..., F, C, C)
        mode (str): one of ("power", "evd")
            "power": power method
            "evd": eigenvalue decomposition (only for torch builtin complex tensors)
        reference_vector (torch.Tensor or int): (..., C) or scalar
        iterations (int): number of iterations in power method
        use_torch_solver (bool): Whether to use `solve` instead of `inverse`
        diagonal_loading (bool): Whether to add a tiny term to the diagonal of psd_n
        diag_eps (float):
        eps (float):
    Returns:
        beamform_vector (torch.complex64/ComplexTensor): (..., F, C)
    r   rw   .Nr*   rq   r{   ).rq   z-GEV beamformer: LinAlg error for frequency {}T)flushr|   rn   )r   r   r   r   rW   r}   r_   r   r~   r   	new_zerosr   r   r   printformatr   new_onesr   r   rr   r	   r   )rj   rT   r   rx   ry   rz   r   r   r/   r   r   r   r   r   r   r   r   r   get_gev_vector  s`   $,
r   r2   frame_length
frame_steprQ   
do_padding	pad_valueindicesc                    s   t | trt}tj}nt| rtj}tjjj}ntjjj}|d |r2||   d dfd|} d}|du rM fddt	d| j
d    d |D }t| rlt| j|| |||}	t| j|| |||}
||	|
S | d	|f } | S )
aX  Expand `signal` into several frames, with each frame of length `frame_length`.

    Args:
        signal : (..., T)
        frame_length:   length of each segment
        frame_step:     step for selecting frames
        bdelay:         delay for WPD
        do_padding:     whether or not to pad the input signal at the beginning
                          of the time dimension
        pad_value:      value to fill in the padding

    Returns:
        torch.Tensor:
            if do_padding: (..., T, frame_length)
            else:          (..., T - bdelay - frame_length + 2, frame_length)
    r*   r   constantFNc                    s.   g | ]}g t || |   d  qS r*   )r_   )r   r<   rQ   frame_length2r   r   r   }  s     z"signal_framing.<locals>.<listcomp>rq   .)rW   r   r   padr   r,   complexnnr   r_   r   r   signal_framingr\   r]   )r2   r   r   rQ   r   r   r   complex_wrapperpad_funcr\   r]   r   r   r   r   L  sN   

	
	r   Yinverse_powerrR   rO   c                 C   s  |  dksJ |  |d| dks"J |d| df| j\}}}}t| |d d|dddd|| | d ddf }	t|	dd	}	|	|dd|| d ddf  }
td
|	|
 }||||d | |d | }|rtd|
| d|| d df  }||fS |S )ah  Calculates the power normalized spatio-temporal covariance
        matrix of the framed signal.

    Args:
        Y : Complex STFT signal with shape (B, F, C, T)
        inverse_power : Weighting factor with shape (B, F, T)

    Returns:
        Correlation matrix: (B, F, (btaps+1) * C, (btaps+1) * C)
        Correlation vector: (B, F, btaps + 1, C, C)
    r   r   r*   F)r   .Nrq   r%   zbfdtk,bfetl->bfkdlezbfdtk,bfet->bfked)r&   r   r   r   r   r
   rK   view)r   r   rQ   rR   rO   BsFdimr   TPsiPsi_normcovariance_matrixcovariance_vectorr   r   r   rP     s$   , rP   PhiRfc           
      C   r   )a  Return the WPD vector.

        WPD is the Weighted Power minimization Distortionless response
        convolutional beamformer. As follows:

        h = (Rf^-1 @ Phi_{xx}) / tr[(Rf^-1) @ Phi_{xx}] @ u

    Reference:
        T. Nakatani and K. Kinoshita, "A Unified Convolutional Beamformer
        for Simultaneous Denoising and Dereverberation," in IEEE Signal
        Processing Letters, vol. 26, no. 6, pp. 903-907, June 2019, doi:
        10.1109/LSP.2019.2911179.
        https://ieeexplore.ieee.org/document/8691481

    Args:
        Phi (torch.complex64/ComplexTensor): (B, F, (btaps+1) * C, (btaps+1) * C)
            is the PSD of zero-padded speech [x^T(t,f) 0 ... 0]^T.
        Rf (torch.complex64/ComplexTensor): (B, F, (btaps+1) * C, (btaps+1) * C)
            is the power normalized spatio-temporal covariance matrix.
        reference_vector (torch.Tensor): (B, (btaps+1) * C)
            is the reference_vector.
        use_torch_solver (bool): Whether to use `solve` instead of `inverse`
        diagonal_loading (bool): Whether to add a tiny term to the diagonal of psd_n
        diag_eps (float):
        eps (float):

    Returns:
        filter_matrix (torch.complex64/ComplexTensor): (B, F, (btaps + 1) * C)
    r   r   r   r   )
r   r   rx   rz   r   r   r/   r   r   r   r   r   r   get_WPD_filter  s   &r   c                 C   sv   |j d }|rt|||d}t|}|dd|f }t|| }	|	t|	dd|ddf d |  }
td|
|}|S )a  Return the WPD vector (v2).

       This implementation is more efficient than `get_WPD_filter` as
        it skips unnecessary computation with zeros.

    Args:
        Phi (torch.complex64/ComplexTensor): (B, F, C, C)
            is speech PSD.
        Rf (torch.complex64/ComplexTensor): (B, F, (btaps+1) * C, (btaps+1) * C)
            is the power normalized spatio-temporal covariance matrix.
        reference_vector (torch.Tensor): (B, C)
            is the reference_vector.
        diagonal_loading (bool):
            Whether to add a tiny term to the diagonal of psd_n
        diag_eps (float):
        eps (float):

    Returns:
        filter_matrix (torch.complex64/ComplexTensor): (B, F, (btaps+1) * C)
    rq   r   .Nr   r   )r   r   r   r   r   r   r
   )r   r   rx   r   r   r/   r   inv_Rfinv_Rf_prunedr   r   r   r   r   r   get_WPD_filter_v2
  s   

(r   psd_observed_barc
                 C   s  t |tr	tj}
nt|rtjjj}
ntd|	d}|r%t
|||	d}t|||||d}|
|ddd| jd | fdd}|rIt|| d}n
tt| |d}td|d |}|dur||dd	|df  }|| |jd|	  }|S ||jd|	  }|S )
az  Return the WPD vector calculated with RTF.

        WPD is the Weighted Power minimization Distortionless response
        convolutional beamformer. As follows:

        h = (Rf^-1 @ vbar) / (vbar^H @ R^-1 @ vbar)

    Reference:
        T. Nakatani and K. Kinoshita, "A Unified Convolutional Beamformer
        for Simultaneous Denoising and Dereverberation," in IEEE Signal
        Processing Letters, vol. 26, no. 6, pp. 903-907, June 2019, doi:
        10.1109/LSP.2019.2911179.
        https://ieeexplore.ieee.org/document/8691481

    Args:
        psd_observed_bar (torch.complex64/ComplexTensor):
            stacked observation covariance matrix
        psd_speech (torch.complex64/ComplexTensor):
            speech covariance matrix (..., F, C, C)
        psd_noise (torch.complex64/ComplexTensor):
            noise covariance matrix (..., F, C, C)
        iterations (int): number of iterations in power method
        reference_vector (torch.Tensor or int): (..., C) or scalar
        normalize_ref_channel (int):
            reference channel for normalizing the RTF
        use_torch_solver (bool):
            Whether to use `solve` instead of `inverse`
        diagonal_loading (bool):
            Whether to add a tiny term to the diagonal of psd_n
        diag_eps (float):
        eps (float):
    Returns:
        beamform_vector (torch.complex64/ComplexTensor)r: (..., F, C)
    z?Please update your PyTorch version to 1.9+ for complex support.rq   r   r   r   r   r   N.)rW   r   r   r   r   r,   r   r   rr   r   r   r   r   r   r   r   r   r
   rK   r\   r   )r   rT   rj   ry   rx   r   rz   r   r   r/   r   r   r   r   r   r   r   r   r   r   get_WPD_filter_with_rtf8  s8   
.
 	r   filter_matrixc           
      C   sh   t ||d d|ddd}t|dd}|j\}}}}|ddddd	 |||d}td
||  }	|	S )zPerform WPD filtering.

    Args:
        filter_matrix: Filter matrix (B, F, (btaps + 1) * C)
        Y : Complex STFT signal with shape (B, F, C, T)

    Returns:
        enhanced (torch.complex64/ComplexTensor): (B, F, T)
    r*   Tr   )r   r   rq   r%   r      r#   z...tc,...c->...t)r   r   r   permute
contiguousr   r
   rK   )
r   r   rQ   rR   Ytilder   r   r   r   enhancedr   r   r   perform_WPD_filtering  s   "r  r   c                 C   s   |  d}tj|| j| jd}dd t|  d D ||g }|j| jg | j	dd ddR  }t
  t| jd	 | }|| }W d   n1 sQw   Y  | ||  } | S )
a)  Perform Tikhonov regularization (only modifying real part).

    Args:
        mat (torch.complex64/ComplexTensor): input matrix (..., C, C)
        reg (float): regularization factor
        eps (float)
    Returns:
        ret (torch.complex64/ComplexTensor): regularized matrix (..., C, C)
    rq   )dtypedevicec                 S   s   g | ]}d qS r   r   )r   r   r   r   r   r     s    ztik_reg.<locals>.<listcomp>r#   Nr$   r*   r   )r   r,   eyer  r  r_   r&   r   repeatr   no_gradr   r   r\   )matr   r/   r   r  r   epsilonr   r   r   r     s   
"(

r   )Nr   r   r   r   )Tr'   rm   )rw   r   r   T)TTr   r   )r   NNTTr   r   )r   Fr   TTr   r   )Tr   r   Tr   r   )NTTr   r   )r   )r   )rw   r   r   TTr   r   )Fr   N)F)Tr   r   )r   NNTTr   rm   )r   r   )9__doc__typingr   r   r   r,   packaging.versionr   Vtorch_complexr   r   torch_complex.tensorr    espnet2.enh.layers.complex_utilsr   r	   r
   r   r   r   r   r   r   r   __version__r~   finfodoubler/   EPSrl   floatr0   r}   Tensorboolr   r   r   r   r   r   r   r   r   r   r   r   r   r   rP   r   r   r   r  r   r   r   r   r   <module>   s   0
 
&
@
3	

A

-	

Y	

Y
"
0(
	

[
U
:
<
2	

T
