o
    pi$                     @   sj  d dl Z d dlmZmZ d dlmZmZmZmZ d dl	Z
d dlZd dlZd dlm  mZ d dlmZ d dlmZ eddeed	 B d
efddZdd Zdd Zej		ddejdejdeed	 B d
edeejeee  f f
ddZej		ddejdejdeed	 B d
edeejeee  f f
ddZdefdede deejejgejf de
j!fddZ"dS )    N)partialsingledispatch)CallableListLiteralTuple)SlidingWindowFeature)linear_sum_assignmentmseF	cost_func)r
   maereturn_costc                 C   s   t  )a  Find cost-minimizing permutation

    Parameters
    ----------
    y1 : np.ndarray or torch.Tensor
        (batch_size, num_samples, num_classes_1)
    y2 : np.ndarray or torch.Tensor
        (num_samples, num_classes_2) or (batch_size, num_samples, num_classes_2)
    cost_func : callable or {"mse", "mae"}, optional
        Can be either "mse" (mean squared error) or "mae" (mean absolute error) or a callable.
        When callable, takes two (num_samples, num_classes) sequences 
        and returns (num_classes, ) pairwise cost.
        Defaults to computing mean squared error ("mse").
    return_cost : bool, optional
        Whether to return cost matrix. Defaults to False.

    Returns
    -------
    permutated_y2 : np.ndarray or torch.Tensor
        (batch_size, num_samples, num_classes_1)
    permutations : list of tuple
        List of permutations so that permutation[i] == j indicates that jth speaker of y2
        should be mapped to ith speaker of y1.  permutation[i] == None when none of y2 speakers
        is mapped to ith speaker of y1.
    cost : np.ndarray or torch.Tensor, optional
        (batch_size, num_classes_1, num_classes_2)
    )	TypeError)y1y2r   r    r   T/home/ubuntu/.local/lib/python3.10/site-packages/pyannote/audio/utils/permutation.py	permutate%   s   r   c                 K   s   t jtj| |ddddS )zCompute class-wise mean-squared error

    Parameters
    ----------
    Y, y : (num_frames, num_classes) torch.tensor

    Returns
    -------
    mse : (num_classes, ) torch.tensor
        Mean-squared error
    none)	reductionr   axis)torchmeanFmse_lossYykwargsr   r   r   mse_cost_funcE   s   r    c                 K   s   t jt | | ddS )zCompute class-wise mean absolute difference error

    Parameters
    ----------
    Y, y: (num_frames, num_classes) torch.tensor

    Returns
    -------
    mae : (num_classes, ) torch.tensor
        Mean absolute difference error
    r   r   )r   r   absr   r   r   r   mae_cost_funcT   s   r"   r   r   returnc              	      sB  | j \}}}t|j dkr||dd}t|j dkr!d}t||j \}}	||ks/||	krBdt| j  dt|j  d}t| d u rHd g }
g }|rPg }tj| j |j|jd	}t	t
| |D ]\}\t J  dkrdd
 }tj|| dd}n* dkrdd
 }tjt|dd}nt fddt|D }W d    n1 sw   Y  |krt|ddd| fdt|d
 }n|}d g| }t
t|  D ]\}}||k r|||< d d |f ||d d |f< q|
t| |r|| qb|r||
t|fS ||
fS )N      zAIncorrect shape: should be (batch_size, num_frames, num_classes).zShape mismatch: z vs. .r
   )devicedtype   r   )dimr   c              	      s2   g | ]} d d ||d f  dqS )Nr*   r%   )expand).0ir   num_classes_2y1_y2_r   r   
<listcomp>   s    $z#permutate_torch.<locals>.<listcomp>constant)shapelenr,   
ValueErrortupler   zerosr(   r)   	enumeratezipno_grad	unsqueezer   r!   stackranger   padmaxr	   cpuappend)r   r   r   r   
batch_sizenum_samplesnum_classes_1msgbatch_size_num_samples_permutationspermutated_y2costsbdiffcostpadded_costpermutationk1k2r   r/   r   permutate_torchc   sj   



rT   c                 C   sP   t t| t|||d}|r|\}}}| || fS |\}}| |fS )Nr   r   )r   r   
from_numpynumpy)r   r   r   r   outputrK   rJ   rL   r   r   r   permutate_numpy   s   
rY   g      ?segmentationsonsetc              	   C   s  t ||d}| j}| jj\}}}t|j|j d }d|f }t	 }	t
| D ]\}
\}}ttd|
|d  t||
|d  d D ]}||
krJqCt|
| | |j |j }|dk rp| }||d }| |d|| f }n|d||  }| ||df }t|tj ||dd\}\}\}t
|D ]D\}}t|dd|f |k}t|dd|f |k}|r|	|
|f |r|	||f |r|r|	j|
|f||f|||f d qqCq(|	S )	aa  Build permutation graph

    Parameters
    ----------
    segmentations : (num_chunks, num_frames, local_num_speakers)-shaped SlidingWindowFeature
        Raw output of segmentation model.
    onset : float, optionan
        Threshold above which a speaker is considered active. Defaults to 0.5
    cost_func : callable
        Cost function used to find the optimal bijective mapping between speaker activations
        of two overlapping chunks. Expects two (num_frames, num_classes) torch.tensor as input
        and returns cost as a (num_classes, ) torch.tensor. Defaults to mae_cost_func.

    Returns
    -------
    permutation_graph : nx.Graph
        Nodes are (chunk_idx, speaker_idx) tuples.
        An edge between two nodes indicate that those are likely to be the same speaker
        (the lower the value of "cost" attribute, the more likely).
    )r[   r*   r$   r   NTrU   )rO   )r   sliding_windowdatar5   mathfloordurationstepnxGraphr:   r?   rA   minroundr   npnewaxisanyadd_nodeadd_edge)rZ   r[   r   chunks
num_chunks
num_frames_max_lookahead	lookaheadpermutation_graphCchunksegmentationcshiftthis_segmentationsthat_segmentationsrQ   rO   thisthatthis_is_activethat_is_activer   r   r   build_permutation_graph   sL   
.(r}   )r
   F)#r^   	functoolsr   r   typingr   r   r   r   networkxrb   rW   rf   r   torch.nn.functionalnn
functionalr   pyannote.corer   scipy.optimizer	   boolr   r    r"   registerTensorintrT   ndarrayrY   floatrc   r}   r   r   r   r   <module>   sh   
L
