o
    siI;                     @   sR   d dl mZ d dlZd dlmZ d dlmZ G dd dejZG dd deZdS )	    )permutationsN)nnlinear_sum_assignmentc                       s   e Zd ZdZd fdd	ZdddZed	d
 Zedd ZedddZ	edd Z
edddZedejfddZ  ZS )PITLossWrappera  Permutation invariant loss wrapper.

    Args:
        loss_func: function with signature (est_targets, targets, **kwargs).
        pit_from (str): Determines how PIT is applied.

            * ``'pw_mtx'`` (pairwise matrix): `loss_func` computes pairwise
              losses and returns a torch.Tensor of shape
              :math:`(batch, n\_src, n\_src)`. Each element
              :math:`(batch, i, j)` corresponds to the loss between
              :math:`targets[:, i]` and :math:`est\_targets[:, j]`
            * ``'pw_pt'`` (pairwise point): `loss_func` computes the loss for
              a batch of single source and single estimates (tensors won't
              have the source axis). Output shape : :math:`(batch)`.
              See :meth:`~PITLossWrapper.get_pw_losses`.
            * ``'perm_avg'`` (permutation average): `loss_func` computes the
              average loss for a given permutations of the sources and
              estimates. Output shape : :math:`(batch)`.
              See :meth:`~PITLossWrapper.best_perm_from_perm_avg_loss`.

            In terms of efficiency, ``'perm_avg'`` is the least efficicient.

        perm_reduce (Callable): torch function to reduce permutation losses.
            Defaults to None (equivalent to mean). Signature of the func
            (pwl_set, **kwargs) : :math:`(B, n\_src!, n\_src) --> (B, n\_src!)`.
            `perm_reduce` can receive **kwargs during forward using the
            `reduce_kwargs` argument (dict). If those argument are static,
            consider defining a small function or using `functools.partial`.
            Only used in `'pw_mtx'` and `'pw_pt'` `pit_from` modes.

    For each of these modes, the best permutation and reordering will be
    automatically computed. When either ``'pw_mtx'`` or ``'pw_pt'`` is used,
    and the number of sources is larger than three, the hungarian algorithm is
    used to find the best permutation.

    Examples
        >>> import torch
        >>> from asteroid.losses import pairwise_neg_sisdr
        >>> sources = torch.randn(10, 3, 16000)
        >>> est_sources = torch.randn(10, 3, 16000)
        >>> # Compute PIT loss based on pairwise losses
        >>> loss_func = PITLossWrapper(pairwise_neg_sisdr, pit_from='pw_mtx')
        >>> loss_val = loss_func(est_sources, sources)
        >>>
        >>> # Using reduce
        >>> def reduce(perm_loss, src):
        >>>     weighted = perm_loss * src.norm(dim=-1, keepdim=True)
        >>>     return torch.mean(weighted, dim=-1)
        >>>
        >>> loss_func = PITLossWrapper(pairwise_neg_sisdr, pit_from='pw_mtx',
        >>>                            perm_reduce=reduce)
        >>> reduce_kwargs = {'src': sources}
        >>> loss_val = loss_func(est_sources, sources,
        >>>                      reduce_kwargs=reduce_kwargs)
    pw_mtxNc                    s2   t    || _|| _|| _| jdvrtdd S )N)r   pw_ptperm_avgzVUnsupported loss function type for now. Expectedone of [`pw_mtx`, `pw_pt`, `perm_avg`])super__init__	loss_funcpit_fromperm_reduce
ValueError)selfr   r   r   	__class__ O/home/ubuntu/.local/lib/python3.10/site-packages/asteroid/losses/pit_wrapper.pyr   @   s   

zPITLossWrapper.__init__Fc                 K   s6  |j d }|dk sJ d| | jdkr | j||fi |}n:| jdkr2| j| j||fi |}n(| jdkrX| j| j||fi |\}}	t|}
|sN|
S | ||	}|
|fS dS |jdkscJ d	|j d
 |j d
 ksqJ d|durw|nt	 }| j
|fd| ji|\}}	t|}
|s|
S | ||	}|
|fS )a  Find the best permutation and return the loss.

        Args:
            est_targets: torch.Tensor. Expected shape $(batch, nsrc, ...)$.
                The batch of target estimates.
            targets: torch.Tensor. Expected shape $(batch, nsrc, ...)$.
                The batch of training targets
            return_est: Boolean. Whether to return the reordered targets
                estimates (To compute metrics or to save example).
            reduce_kwargs (dict or None): kwargs that will be passed to the
                pairwise losses reduce function (`perm_reduce`).
            **kwargs: additional keyword argument that will be passed to the
                loss function.

        Returns:
            - Best permutation loss for each batch sample, average over
              the batch.
            - The reordered targets estimates if ``return_est`` is True.
              :class:`torch.Tensor` of shape $(batch, nsrc, ...)$.
           
   z(Expected source axis along dim 1, found r   r   r	   N   zBSomething went wrong with the loss function, please read the docs.r   z&PIT loss needs same batch dim as inputr   )shaper   r   get_pw_lossesbest_perm_from_perm_avg_losstorchmeanreorder_sourcendimdictfind_best_permr   )r   est_targetstargets
return_estreduce_kwargskwargsn_src	pw_lossesmin_lossbatch_indices	mean_loss	reorderedr   r   r   forwardK   sF   







zPITLossWrapper.forwardc                 K   sr   |j ^}}}||||}t|ddD ]!\}}	t|ddD ]\}
}| |	|fi ||dd||
f< q!q|S )a  Get pair-wise losses between the training targets and its estimate
        for a given loss function.

        Args:
            loss_func: function with signature (est_targets, targets, **kwargs)
                The loss function to get pair-wise losses from.
            est_targets: torch.Tensor. Expected shape $(batch, nsrc, ...)$.
                The batch of target estimates.
            targets: torch.Tensor. Expected shape $(batch, nsrc, ...)$.
                The batch of training targets.
            **kwargs: additional keyword argument that will be passed to the
                loss function.

        Returns:
            torch.Tensor or size $(batch, nsrc, nsrc)$, losses computed for
            all permutations of the targets and est_targets.

        This function can be called on a loss function which returns a tensor
        of size :math:`(batch)`. There are more efficient ways to compute pair-wise
        losses using broadcasting.
        r   r   N)r   	new_empty	enumerate	transpose)r   r!   r"   r%   
batch_sizer&   _pair_wise_lossesest_idxest_src
target_idx
target_srcr   r   r   r      s   "zPITLossWrapper.get_pw_lossesc           	         s~   j d }tjttt|tjdtj fddD dd}tj|dd\}}tjfdd|D dd}||fS )a  Find best permutation from loss function with source axis.

        Args:
            loss_func: function with signature $(est_targets, targets, **kwargs)$
                The loss function batch losses from.
            est_targets: torch.Tensor. Expected shape $(batch, nsrc, *)$.
                The batch of target estimates.
            targets: torch.Tensor. Expected shape $(batch, nsrc, *)$.
                The batch of training targets.
            **kwargs: additional keyword argument that will be passed to the
                loss function.

        Returns:
            - :class:`torch.Tensor`:
                The loss corresponding to the best permutation of size $(batch,)$.

            - :class:`torch.Tensor`:
                The indices of the best permutations.
        r   dtypec                    s*   g | ]} d d |f fi qS Nr   ).0perm)r!   r%   r   r"   r   r   
<listcomp>   s   * z?PITLossWrapper.best_perm_from_perm_avg_loss.<locals>.<listcomp>dimc                       g | ]} | qS r   r   r:   mpermsr   r   r<          r   )	r   r   tensorlistr   rangelongstackmin)	r   r!   r"   r%   r&   loss_setr(   min_loss_idxr)   r   )r!   r%   r   rC   r"   r   r      s   
z+PITLossWrapper.best_perm_from_perm_avg_lossc                 K   sR   | j d }|dus|dkrtj| fd|i|\}}||fS t| \}}||fS )a  Find the best permutation, given the pair-wise losses.

        Dispatch between factorial method if number of sources is small (<3)
        and hungarian method for more sources. If ``perm_reduce`` is not None,
        the factorial method is always used.

        Args:
            pair_wise_losses (:class:`torch.Tensor`):
                Tensor of shape :math:`(batch, n\_src, n\_src)`. Pairwise losses.
            perm_reduce (Callable): torch function to reduce permutation losses.
                Defaults to None (equivalent to mean). Signature of the func
                (pwl_set, **kwargs) : :math:`(B, n\_src!, n\_src) -> (B, n\_src!)`
            **kwargs: additional keyword argument that will be passed to the
                permutation reduce function.

        Returns:
            - :class:`torch.Tensor`:
              The loss corresponding to the best permutation of size $(batch,)$.

            - :class:`torch.Tensor`:
              The indices of the best permutations.
        Nr   r   )r   r   find_best_perm_factorialfind_best_perm_hungarian)r2   r   r%   r&   r(   r)   r   r   r   r       s   

zPITLossWrapper.find_best_permc                 C   s   t dd t| |D }|S )ay  Reorder sources according to the best permutation.

        Args:
            source (torch.Tensor): Tensor of shape :math:`(batch, n_src, time)`
            batch_indices (torch.Tensor): Tensor of shape :math:`(batch, n_src)`.
                Contains optimal permutation indices for each batch.

        Returns:
            :class:`torch.Tensor`: Reordered sources.
        c                 S   s   g | ]\}}t |d |qS )r   )r   index_select)r:   sbr   r   r   r<      s    z1PITLossWrapper.reorder_source.<locals>.<listcomp>)r   rI   zip)sourcer)   reordered_sourcesr   r   r   r      s   zPITLossWrapper.reorder_sourcec                    s   | j d }| dd}|jttt|tjd t d}|du rA|	g  
 |R d|d}td||g}|| }n|ddt||df }||fi |}tj|dd\}	}
tj fd	d
|
D dd}|	|fS )a.  Find the best permutation given the pair-wise losses by looping
        through all the permutations.

        Args:
            pair_wise_losses (:class:`torch.Tensor`):
                Tensor of shape :math:`(batch, n_src, n_src)`. Pairwise losses.
            perm_reduce (Callable): torch function to reduce permutation losses.
                Defaults to None (equivalent to mean). Signature of the func
                (pwl_set, **kwargs) : :math:`(B, n\_src!, n\_src) -> (B, n\_src!)`
            **kwargs: additional keyword argument that will be passed to the
                permutation reduce function.

        Returns:
            - :class:`torch.Tensor`:
              The loss corresponding to the best permutation of size $(batch,)$.

            - :class:`torch.Tensor`:
              The indices of the best permutations.

        MIT Copyright (c) 2018 Kaituo XU.
        See `Original code
        <https://github.com/kaituoxu/Conv-TasNet/blob/master>`__ and `License
        <https://github.com/kaituoxu/Conv-TasNet/blob/master/LICENSE>`__.
        rM   r7      Nr   zbij,pij->bpr=   c                    r?   r   r   r@   rB   r   r   r<   &  rD   z;PITLossWrapper.find_best_perm_factorial.<locals>.<listcomp>r   )r   r/   
new_tensorrF   r   rG   r   rH   	unsqueeze	new_zerossizescatter_einsumarangesqueezerJ   rI   )r2   r   r%   r&   pwlidxperms_one_hotrK   pwl_setr(   rL   r)   r   rB   r   rN      s   
"
z'PITLossWrapper.find_best_perm_factorialr2   c                 C   sX   |  dd}|  }tdd |D |j}t|d|d ddg}||fS )aF  
        Find the best permutation given the pair-wise losses, using the Hungarian algorithm.

        Returns:
            - :class:`torch.Tensor`:
              The loss corresponding to the best permutation of size (batch,).

            - :class:`torch.Tensor`:
              The indices of the best permutations.
        rM   rV   c                 S   s   g | ]}t |d  qS )r   r   )r:   r`   r   r   r   r<   :  s    z;PITLossWrapper.find_best_perm_hungarian.<locals>.<listcomp>rW   ).N)	r/   detachcpur   rE   todevicegatherr   )r2   r`   pwl_copyr)   r(   r   r   r   rO   )  s   z'PITLossWrapper.find_best_perm_hungarian)r   N)FNr9   )__name__
__module____qualname____doc__r   r,   staticmethodr   r   r    r   rN   r   TensorrO   __classcell__r   r   r   r   r      s     8
;

 
2r   c                       s"   e Zd ZdZd fdd	Z  ZS )
PITReorderzzPermutation invariant reorderer. Only returns the reordered estimates.
    See `:py:class:asteroid.losses.PITLossWrapper`.Nc                    s$   t  jd||d|d|\}}|S )NT)r!   r"   r#   r$   r   )r
   r,   )r   r!   r"   r$   r%   r1   r+   r   r   r   r,   E  s   
zPITReorder.forwardr9   )rj   rk   rl   rm   r,   rp   r   r   r   r   rq   A  s    rq   )		itertoolsr   r   r   scipy.optimizer   Moduler   rq   r   r   r   r   <module>   s      <