o
    i8                     @   sz   d dl Z d dlZd dlZd dlmZ dd ZdddZdddd	d
dZddddddZdd Z	dddddddZ
dS )    N)"wiener_filter_predict_single_inputc                 C   s*   | d u rd S | dksJ | d|  d  S )N   
    )soft_max_SDRr   r   A/home/ubuntu/.local/lib/python3.10/site-packages/ci_sdr/pt/sdr.pysoft_max_SDR_to_eps   s   r   c                 C   s0   |du rdt | |  S dt ||  |  S )a}  
    >>> linear_to_db(torch.tensor(100.), torch.tensor(1.))
    tensor(20.)
    >>> linear_to_db(torch.tensor(100.), torch.tensor(1.), soft_max_SDR_to_eps(10))
    tensor(9.5861)
    >>> linear_to_db(torch.tensor(100.), torch.tensor(1.), soft_max_SDR_to_eps(20))
    tensor(16.9897)
    >>> linear_to_db(torch.tensor(100.), torch.tensor(1.), soft_max_SDR_to_eps(100))
    tensor(20.)

    >>> import numpy as np
    >>> linear_to_db(torch.tensor(1.), torch.tensor(np.finfo(np.float64).eps))
    tensor(156.5356, dtype=torch.float64)
    >>> linear_to_db(torch.tensor(1.), torch.tensor(np.finfo(np.float32).eps))
    tensor(69.2369)

    Nr   i)torchlog10)	numeratordenominatorepsr   r   r   linear_to_db   s   r   Ti   )compute_permutationfilter_lengthr   c                C   s   t di t ddiS )aP  Convolutive transfer function Invariant Signal-to-Distortion Ratio loss

    Note:
        To follow the pytorch convention, this function has as first argument
        the estimation, while `ci_sdr` follows the convention for many metrics,
        that use as first argument the reference.

    The difference to ci_sdr are:
     - Change the sign, so this function can be minimized by an NN to reach the
       optimum.

    Args:
        estimation: ... x source x samples
        reference: ... x source x samples
        compute_permutation: If true, assume estimation source index is
            permuted. Note mir_eval.separation.bss_eval_sources computes
            the permutation based on the SIR, while this function computes the
            permutation based on the SDR.
        filter_length:
        soft_max_SDR:

    Returns:

    change_signTNr   )ci_sdrlocals)
estimation	referencer   r   r   r   r   r   ci_sdr_loss)   s
    r   optimal)r   r   	algorithmc                   s   ddl m | ddddddf } |ddddddf }t| |||dd}t|jdkr5|d dS t|d	}t fd
d|D }|j	|jdd  S )au  Convolutive transfer function Invariant Signal-to-Distortion Ratio loss

    The `ci_sdr_loss` function is more efficient for low number of speakers,
    while this function has advantages for larger number of speakers.
    The optimization of this function is, that instead of triing each
    permutation, it uses the idea of the Hungarian algorithm to decompose the
    permutation problem in the calculation of a loss/score matrix and a
    "linear sum assignment" problem (scipy.optimize.linear_sum_assignment).

    Args:
        estimation: ... x source x samples
        reference: ... x source x samples
        filter_length:
        soft_max_SDR:
        algorithm: Either 'optimal' or 'greedy'
         - 'optimal': Use scipy.optimize.linear_sum_assignment to find the
                      optimal assignment
         - 'greedy' : Use a gready approach to find a good assignment.
                      ToDo: Check thesis: greedy is better for large number of
                                          speakers.

    Returns:

    Example:
        >>> reference = torch.tensor([[1., 2, 1, 2], [4, 3, 2, 1], [1, 2, 3, 4]])
        >>> estimation = torch.tensor([[1., 2, 3, 4], [1, 2, 1, 2], [4, 3, 2, 1]])
        >>> ci_sdr_loss_hungarian(estimation, reference, filter_length=2)
        tensor([-144.0805, -145.4635, -143.0331])
        >>> ci_sdr_loss(estimation, reference, filter_length=2)
        tensor([-144.0805, -145.4635, -143.0331])
        >>> estimation = torch.tensor([[1., 2, 2, 4], [1, 1, 1, 2], [4, 2, 2, 1]])
        >>> ci_sdr_loss_hungarian(estimation, reference, filter_length=2)
        tensor([-10.3300, -17.2712, -15.4051])
        >>> ci_sdr_loss(estimation, reference, filter_length=2)
        tensor([-10.3300, -17.2712, -15.4051])
        >>> ci_sdr_loss_hungarian(estimation[None], reference[None], filter_length=2)
        tensor([[-10.3300, -17.2712, -15.4051]])

    r   )pit_loss_from_loss_matrix.NF)r   r   r   r   r      	reductionr   z... k1 k2 -> (...) k1 k2c                    s   g | ]	}|d  dqS )Nr   r   ).0lr   r   r   r   
<listcomp>   s    
z)ci_sdr_loss_hungarian.<locals>.<listcomp>)
'padertorch.ops.losses.source_separationr   r   lenshapeeinops	rearranger	   stackreshape)r   r   r   r   r   loss_matrixloss_matrix_flatlossr   r   r   ci_sdr_loss_hungarianO   s*   0r,   c                 C   sJ   t | d d d |d d d D ]\}}|dks|dks||kr q dS dS )Nr!      FT)zip)shape1shape2abr   r   r   _is_broadcastable   s
   &r3   F)r   r   r   r   c                C   sv  t | jdkr)t |jdksJ | j|jfd}| dddf } |dddf }nd}| j^ }}}	|dkr|r| j|jksFJ | j|jfd}
g }tdg|j }ttt|}|D ]}|||
< |t	| |t
| d||dd q]t | jdkrt|}tjtj|dd	d
d\}}|| }n3t|}t|d}tjtj|dd	d
d\}}|jd }||t|ddf }|j|jdd  }|r| S |S t| j|jsJ | j|jf| jd |jd ksJ | j|jf|}|d
kr
t| ||d}tjj|d
|d g}n| }tj|d dd}tj|| d dd}t||t|d}|r/| }|r9tj|dd}|S )a  Convolutive transfer function Invariant Signal-to-Distortion Ratio

    With the default arguments, this functions returns the same value as the
    SDR from `mir_eval.separation.bss_eval_sources`.

    Args:
        reference: ... x source x samples
        estimation: ... x source x samples
        compute_permutation: If true, assume estimation source index is
            permuted. Note mir_eval.separation.bss_eval_sources computes
            the permutation based on the SIR, while this function computes the
            permutation based on the SDR.
        change_sign:
            When True, assume this function is used as loss and return `-SDR`
            instead of `SDR`.
        filter_length:
        soft_max_SDR: ToDo: Was it first proposed in mixture of mixture?

    Returns:
        SDR values for each source

    >>> import numpy as np
    >>> import paderbox as pb

    >>> from paderbox.testing.testfile_fetcher import fetch_file_from_url

    >>> prefix = 'https://github.com/fgnt/pb_test_data/raw/master/bss_data/reverberation/'
    >>> audio_data = {
    ...     file.split('.')[0]: pb.io.load(fetch_file_from_url(prefix + file))
    ...     for file in [
    ...         'speech_source_0.wav',  # speaker 0
    ...         'speech_source_1.wav',  # speaker 1
    ...         'speech_reverberation_early_0.wav',  # reverberated signal, speaker 0
    ...         'speech_reverberation_early_1.wav',  # reverberated signal, speaker 1
    ...         'speech_image_0.wav',  # reverberated signal, speaker 0
    ...         'speech_image_1.wav',  # reverberated signal, speaker 1
    ...         'observation.wav',
    ...     ]
    ... }
    >>> ref_channel = 0
    >>> reference = np.array([audio_data['speech_source_0'], audio_data['speech_source_1']])
    >>> estimation = np.array([audio_data['speech_image_0'][ref_channel, :], audio_data['speech_image_1'][ref_channel, :]])

    >>> reference.shape, estimation.shape
    ((2, 38520), (2, 38520))
    >>> reference_pt = torch.as_tensor(reference)
    >>> estimation_pt = torch.as_tensor(estimation)

    >>> import pb_bss
    >>> pb_bss.evaluation.mir_eval_sources(reference, estimation)[0]
    array([12.60576235, 12.45027328])
    >>> ci_sdr(reference_pt, estimation_pt)
    tensor([12.6058, 12.4503], dtype=torch.float64)
    >>> ci_sdr(reference_pt, estimation_pt, compute_permutation=True)
    tensor([12.6058, 12.4503], dtype=torch.float64)
    >>> ci_sdr(reference_pt, estimation_pt[[0, 1]], compute_permutation=False)
    tensor([12.6058, 12.4503], dtype=torch.float64)
    >>> ci_sdr(reference_pt, estimation_pt[[1, 0]], compute_permutation=False)
    tensor([-23.5670, -25.1648], dtype=torch.float64)
    >>> ci_sdr(reference_pt, estimation_pt[[0, 1]], compute_permutation=True)
    tensor([12.6058, 12.4503], dtype=torch.float64)
    >>> ci_sdr(reference_pt, estimation_pt[[1, 0]], compute_permutation=True)
    tensor([12.6058, 12.4503], dtype=torch.float64)
    >>> ci_sdr(reference_pt, estimation_pt[[0, 1]], compute_permutation=True, change_sign=True)
    tensor([-12.6058, -12.4503], dtype=torch.float64)

    >>> ci_sdr(reference_pt, estimation_pt, soft_max_SDR=20)
    tensor([11.8788, 11.7469], dtype=torch.float64)
    >>> ci_sdr(reference_pt, reference_pt, soft_max_SDR=20)
    tensor([20., 20.], dtype=torch.float64)
    >>> sdrs = ci_sdr(reference_pt, reference_pt, soft_max_SDR=None)
    ... # tensor([245.8194, 282.1901], dtype=torch.float64)  # old pytorch
    ... # tensor([253.4707, 279.4020], dtype=torch.float64)  # new pytorch
    >>> sdrs > 200, sdrs < 300
    (tensor([True, True]), tensor([True, True]))


    >>> estimation = audio_data['observation'][:2]
    >>> pb_bss.evaluation.mir_eval_sources(reference, estimation)[0]
    array([ 1.83304215, -2.79861495])
    >>> e = torch.tensor(estimation, requires_grad=True)
    >>> sdr = ci_sdr(reference_pt, e)
    >>> sdr  # doctest: +ELLIPSIS
    tensor([ 1.8330, -2.7986], dtype=torch.float64, grad_fn=<SelectBackward...>)
    >>> sdr.sum().backward()
    >>> e.grad
    tensor([[-2.7294e-06, -5.2814e-06, -3.2224e-05,  ..., -8.6633e-05,
             -1.3574e-04, -7.0333e-06],
            [-3.0444e-06,  3.5137e-06,  4.8881e-06,  ...,  3.4590e-06,
              4.3444e-05,  6.0922e-06]], dtype=torch.float64)

    Comparison with si_sdr and sdr. Note, we must change reference to a
    reverberated signal (speech_image), otherwise we get very bad values for
    both objectives.

    >>> reference = np.array([audio_data['speech_reverberation_early_0'][ref_channel, :], audio_data['speech_reverberation_early_1'][ref_channel, :]])
    >>> estimation = np.array([audio_data['speech_image_0'][ref_channel, :], audio_data['speech_image_1'][ref_channel, :]])

    # >>> reference = audio_data['speech_image'][(0, 1), (0, 1), :]
    # >>> estimation = audio_data['observation'][:2, :] + reference
    >>> reference.shape, estimation.shape
    ((2, 38520), (2, 38520))
    >>> reference_pt = torch.as_tensor(reference)
    >>> estimation_pt = torch.as_tensor(estimation)

    >>> from padertorch.ops.losses.regression import sdr_loss, si_sdr_loss
    >>> si_sdr_loss(estimation_pt, reference_pt, reduction=None)
    tensor([-9.9188, -9.5530], dtype=torch.float64)
    >>> ci_sdr(reference_pt, estimation_pt, soft_max_SDR=None, filter_length=1, change_sign=True)
    tensor([-9.9188, -9.5530], dtype=torch.float64)
    >>> sdr_loss(estimation_pt, reference_pt, reduction=None)
    tensor([-9.9814, -9.4490], dtype=torch.float64)
    >>> ci_sdr(reference_pt, estimation_pt, soft_max_SDR=None, filter_length=0, change_sign=True)
    tensor([-9.9814, -9.4490], dtype=torch.float64)
    r-   TNF)r   r   r   r   r   r!   )axisr   )dimz*permutations ... k -> permutations (...) k)r   )r   )r#   r$   slicendimlist	itertoolspermutationsrangeappendr   tupler	   r'   maxsumr%   r&   r(   r3   r   nn
functionalpadr   r   squeeze)r   r   r   r   r   r   single_source_Knum_samplesr5   
candidatesindexerr;   permutationidxsdrcandidates_flat
batch_sizeestreverberatednumdenscoresr   r   r   r      sn   |





$
r   )N)r:   r	   r%   ci_sdr.pt.wiener_filterr   r   r   r   r,   r3   r   r   r   r   r   <module>   s(    
+M