o
    SiR                  	   @   s6  d dl Z d dlmZmZ d dlmZ d dlmZmZm	Z	m
Z
mZmZmZmZ d dlZd dlm  mZ d dlmZmZ d dlmZ d dlmZmZmZmZmZ d dlm Z m!Z!m"Z"m#Z#m$Z$ ed	ed
Z%G dd dZ&G dd de&Z'G dd de&Z(G dd de&Z)eddd efde*dee% de
e fddZ+dS )    N)ExecutorThreadPoolExecutor)	lru_cache)CallableDictListOptionalTupleTypeTypeVarUnion)CutSetFeatureExtractor)compute_supervisions_frame_mask)collate_audiocollate_featurescollate_matricescollate_vectorsread_audio_from_cuts)LOG_EPSILONcompute_num_framesifnonesupervision_to_framessupervision_to_samplesExecutorType)boundc                   @   s~   e Zd ZdZdefdedee ddfddZd	e	de
ejejf fd
dZd	e	deeejf fddZd	e	dejfddZdS )BatchIOa  
    Converts a :class:`CutSet` into a collated batch of audio representations.
    These representations can be e.g. audio samples or features.
    They might also be single or multi channel.

    All InputStrategies support the ``executor`` parameter in the constructor.
    It allows to pass a ``ThreadPoolExecutor`` or a ``ProcessPoolExecutor``
    to parallelize reading audio/features from wherever they are stored.
    Note that this approach is incompatible with specifying the ``num_workers``
    to ``torch.utils.data.DataLoader``, but in some instances may be faster.

    .. note:: This is a base class that only defines the interface.

    .. automethod:: __call__
    r   num_workersexecutor_typereturnNc                 C   s   || _ || _d S N)r   _executor_type)selfr   r    r#   S/home/ubuntu/.local/lib/python3.10/site-packages/lhotse/dataset/input_strategies.py__init__.   s   
zBatchIO.__init__cutsc                 C      t  )zcReturns a tensor with collated input signals, and a tensor of length of each signal before padding.NotImplementedErrorr"   r&   r#   r#   r$   __call__6   s   zBatchIO.__call__c                 C   r'   )ap  
        Returns a dict that specifies the start and end bounds for each supervision,
        as a 1-D int tensor.

        Depending on the strategy, the dict should look like:

        .. code-block::

            {
                "sequence_idx": tensor(shape=(S,)),
                "start_frame": tensor(shape=(S,)),
                "num_frames": tensor(shape=(S,)),
            }

        or

        .. code-block::

            {
                "sequence_idx": tensor(shape=(S,)),
                "start_sample": tensor(shape=(S,)),
                "num_samples": tensor(shape=(S,))
            }

        Where ``S`` is the total number of supervisions encountered in the :class:`CutSet`.
        Note that ``S`` might be different than the number of cuts (``B``).
        ``sequence_idx`` means the index of the corresponding feature matrix (or cut) in a batch.
        r(   r*   r#   r#   r$   supervision_intervals:   s   zBatchIO.supervision_intervalsc                 C   r'   )a  
        Returns a collated batch of masks, marking the supervised regions in cuts.
        They are zero-padded to the longest cut.

        Depending on the strategy implementation, it is expected to be a
        tensor of shape ``(B, NF)`` or ``(B, NS)``, where ``B`` denotes the number of cuts,
        ``NF`` the number of frames and ``NS`` the total number of samples.
        ``NF`` and ``NS`` are determined by the longest cut in a batch.
        r(   r*   r#   r#   r$   supervision_masksY   s   
zBatchIO.supervision_masks)__name__
__module____qualname____doc__r   intr
   r   r%   r   r	   torchTensor	IntTensorr+   r   strr,   r-   r#   r#   r#   r$   r      s    
r   c                	   @   s   e Zd ZdZ	ddedee deej	ej	f fddZ
	ddedee deeej	f fdd	Z	
	ddedee dee dej	fddZd
S )PrecomputedFeaturesa  
    :class:`InputStrategy` that reads pre-computed features, whose manifests
    are attached to cuts, from disk.

    It automatically pads the feature matrices so that every example has the same number
    of frames as the longest cut in a mini-batch.
    This is needed to put all examples into a single tensor.
    The padding value is a low log-energy, around log(1e-10).

    .. automethod:: __call__
    rightr&   pad_directionr   c                 C   s   t ||t| j| jddS )a  
        Reads the pre-computed features from disk/other storage.
        The returned shape is ``(B, T, F) => (batch_size, num_frames, num_features)``.

        :return: a tensor with collated features, and a tensor of ``num_frames`` of each cut before padding.r   )r9   executor)r   _get_executorr   r!   )r"   r&   r9   r#   r#   r$   r+   s   s
   
zPrecomputedFeatures.__call__c                    s   |dvrt d| tdd |D  tdd |D  \}}|dkr6 fdd|D }d	d t||D }d
d t|D }tj|tjdtj|tjdtj|tjddS )h  
        Returns a dict that specifies the start and end bounds for each supervision,
        as a 1-D int tensor, in terms of frames:

        .. code-block::

            {
                "sequence_idx": tensor(shape=(S,)),
                "start_frame": tensor(shape=(S,)),
                "num_frames": tensor(shape=(S,))
            }

        Where ``S`` is the total number of supervisions encountered in the :class:`CutSet`.
        Note that ``S`` might be different than the number of cuts (``B``).
        ``sequence_idx`` means the index of the corresponding feature matrix (or cut) in a batch.
        leftr8   -pad_direction must be 'left' or 'right', got c                 s   s    | ]}|j V  qd S r    )
num_frames.0cutr#   r#   r$   	<genexpr>   s    z<PrecomputedFeatures.supervision_intervals.<locals>.<genexpr>c                 s   s2    | ]}|j D ]}t||j|j|jd V  qqdS )
max_framesN)supervisionsr   frame_shiftsampling_raterA   rC   rD   supr#   r#   r$   rE      s    r?   c                    s"   g | ]}|j D ]} |j qqS r#   )rH   rA   )rC   rD   _rF   r#   r$   
<listcomp>   s
    
z=PrecomputedFeatures.supervision_intervals.<locals>.<listcomp>c                 S   s   g | ]\}}|| qS r#   r#   )rC   sor#   r#   r$   rN      s    c                 S       g | ]\}}|j D ]}|q	qS r#   rH   )rC   icrM   r#   r#   r$   rN           dtypesequence_idxstart_framerA   )
ValueErrormaxzip	enumerater3   tensorint32)r"   r&   r9   start_framesnums_framesoffsetsrY   r#   rF   r$   r,      s(   

z)PrecomputedFeatures.supervision_intervalsNuse_alignment_if_existsc                    s4   |dvrt d|  fdd|D }t||dS )a0  Returns the mask for supervised frames.

        :param use_alignment_if_exists: optional str, key for alignment type to use for generating the mask. If not
            exists, fall back on supervision time spans.
        :param pad_direction: where to apply the padding (``right`` or ``left``).
        r>   r@   c                       g | ]}|j  d qS rd   )supervisions_feature_maskrB   rg   r#   r$   rN          z9PrecomputedFeatures.supervision_masks.<locals>.<listcomp>)r9   )r[   r   )r"   r&   rd   r9   masksr#   rg   r$   r-      s   
z%PrecomputedFeatures.supervision_masks)r8   )Nr8   )r.   r/   r0   r1   r   r   r6   r	   r3   r4   r+   r   r,   r-   r#   r#   r#   r$   r7   f   s:    

7r7   c                       s   e Zd ZdZddedfdededee dedd	f
 fd
dZ			dde
dee deeejejf eejeje
f f fddZde
deeejf fddZ		dde
dee dejfddZ  ZS )AudioSamplesa  
    :class:`InputStrategy` that reads single-channel recordings, whose manifests
    are attached to cuts, from disk (or other audio source).

    It automatically zero-pads the recordings so that every example has the same number
    of audio samples as the longest cut in a mini-batch.
    This is needed to put all examples into a single tensor.

    .. automethod:: __call__
    r   Fr   fault_tolerantr   use_batch_loaderr   Nc                    sD   t  j||d || _d| _|| _| jr ddlm} | | _dS dS )a.  
        AudioSamples constructor.

        :param num_workers: when larger than 0, we will spawn an executor (of type specified
            by ``executor_type``) to read the audio data in parallel.
            Thread executor can be used with PyTorch's DataLoader, whereas Process executor
            would fail (but could be faster for other applications).
        :param fault_tolerant: when ``True``, the cuts for which audio loading failed
            will be skipped. It will make ``__call__`` return an additional item,
            which is the CutSet for which we successfully read the audio.
            It may be a subset of the input CutSet.
        :param executor_type: the type of executor used for parallel audio reads
            (only relevant when ``num_workers>0``).
        :param use_batch_loader: When ``True``, enables batch loading of audio data from AIStore.
            This allows all audio samples in the batch to be fetched in a single request for increased efficiency.
            Requires the input CutSet to be eager (not lazy).
        r   r   Nr   )AISBatchLoader)superr%   rl   ais_batch_loaderrm   
lhotse.aisro   )r"   r   rl   r   rm   ro   	__class__r#   r$   r%      s   zAudioSamples.__init__r&   recording_fieldc                 C   s8   | j r| jdur| |}t|t| j| jd| j|dS )a  
        Reads the audio samples from recordings on disk/other storage.
        The returned shape is ``(B, T) => (batch_size, num_samples)``.

        :param recording_field: when specified, we will try to load recordings from a custom field with this name
            (i.e., ``cut.load_<recording_field>()`` instead of default ``cut.load_audio()``).
        :return: a tensor with collated audio samples, and a tensor of ``num_samples`` of each cut before padding.

        .. note::
            When AIStore batch loading is enabled (`use_batch_loader=True`), the audio data
            will be fetched from AIStore using a single batch request before collation.
            The input CutSet must be eager (not lazy).
        Nr:   )r;   rl   ru   )rm   rq   r   r<   r   r!   rl   )r"   r&   ru   r#   r#   r$   r+      s   
zAudioSamples.__call__c                 C   sX   t dd |D  \}}dd t|D }tj|tjdtj|tjdtj|tjddS )al  
        Returns a dict that specifies the start and end bounds for each supervision,
        as a 1-D int tensor, in terms of samples:

        .. code-block::

            {
                "sequence_idx": tensor(shape=(S,)),
                "start_sample": tensor(shape=(S,)),
                "num_samples": tensor(shape=(S,))
            }

        Where ``S`` is the total number of supervisions encountered in the :class:`CutSet`.
        Note that ``S`` might be different than the number of cuts (``B``).
        ``sequence_idx`` means the index of the corresponding feature matrix (or cut) in a batch.

        c                 s   s(    | ]}|j D ]	}t||jV  qqd S r    )rH   r   rJ   rK   r#   r#   r$   rE   -  s    
z5AudioSamples.supervision_intervals.<locals>.<genexpr>c                 S   rQ   r#   rR   rC   rS   rT   rO   r#   r#   r$   rN   3  rU   z6AudioSamples.supervision_intervals.<locals>.<listcomp>rV   )rY   start_samplenum_samplesr]   r^   r3   r_   r`   )r"   r&   start_samplesnums_samplesrY   r#   r#   r$   r,     s   z"AudioSamples.supervision_intervalsrd   c                    s   t  fdd|D S )Returns the mask for supervised samples.

        :param use_alignment_if_exists: optional str, key for alignment type to use for generating the mask. If not
            exists, fall back on supervision time spans.
        c                    re   rf   )supervisions_audio_maskrB   rg   r#   r$   rN   C  ri   z2AudioSamples.supervision_masks.<locals>.<listcomp>r   r"   r&   rd   r#   rg   r$   r-   :  s
   
zAudioSamples.supervision_masksr    )r.   r/   r0   r1   r   r2   boolr
   r   r%   r   r   r6   r   r	   r3   r4   r+   r   r,   r-   __classcell__r#   r#   rs   r$   rk      sH    " 
!rk   c                       s   e Zd ZdZdddddefdedeeej	gej	f  de
d	ed
ededee ddf fddZ	ddedee deeej	ej	f eej	ej	ef f fddZdedeeej	f fddZ	ddedee dej	fddZ  ZS )OnTheFlyFeaturesa  
    :class:`InputStrategy` that reads single-channel recordings, whose manifests
    are attached to cuts, from disk (or other audio source).
    Then, it uses a :class:`FeatureExtractor` to compute their features on-the-fly.

    It automatically pads the feature matrices so that every example has the same number
    of frames as the longest cut in a mini-batch.
    This is needed to put all examples into a single tensor.
    The padding value is a low log-energy, around log(1e-10).

    .. note:
        The batch feature extraction performed here is not as efficient as it could be,
        but it allows to use arbitrary feature extraction method that may work on
        a single recording at a time.

    .. automethod:: __call__
    Nr   TF	extractorwave_transformsr   use_batch_extractrl   return_audior   r   c                    s8   t  j||d || _t|g | _|| _|| _|| _dS )a  
        OnTheFlyFeatures' constructor.

        :param extractor: the feature extractor used on-the-fly (individually on each waveform).
        :param wave_transforms: an optional list of transforms applied on the batch of audio
            waveforms collated into a single tensor, right before the feature extraction.
        :param num_workers: when larger than 0, we will spawn an executor (of type specified
            by ``executor_type``) to read the audio data in parallel.
            Thread executor can be used with PyTorch's DataLoader, whereas Process executor
            would fail (but could be faster for other applications).
        :param use_batch_extract: when ``True``, we will call
            :meth:`~lhotse.features.base.FeatureExtractor.extract_batch` to compute the features
            as it is possibly faster. It has a restriction that all cuts must have the same
            sampling rate. If that is not the case, set this to ``False``.
        :param fault_tolerant: when ``True``, the cuts for which audio loading failed
            will be skipped. It will make ``__call__`` return an additional item,
            which is the CutSet for which we successfully read the audio.
            It may be a subset of the input CutSet.
        :param return_audio: When ``True``, calling this object will additionally return collated
            audio tensor and audio lengths tensor.
        :param executor_type: the type of executor used for parallel audio reads
            (only relevant when ``num_workers>0``).
        rn   N)rp   r%   r   r   r   r   rl   r   )r"   r   r   r   r   rl   r   r   rs   r#   r$   r%   _  s   !
zOnTheFlyFeatures.__init__r&   ru   c              
      s|  t  t| j| jd| j|d\} | jD ]}tt|D ]
}||| ||< qq| jrDt	 fdd D s8J | j
j| d jd}n6g }t D ]/\}}||  }z| j
| | j}	W n   td|j d|   |t|	 qJt|td	}
tjd
d |D tjd}|
|f}| jrdd |D }tjdd |D tjd}t|dd	}|||f }| jr| f }|S )a  
        Reads the audio samples from recordings on disk/other storage
        and computes their features.
        The returned shape is ``(B, T, F) => (batch_size, num_frames, num_features)``.

        :param recording_field: when specified, we will try to load recordings from a custom field with this name
            (i.e., ``cut.load_<recording_field>()`` instead of default ``cut.load_audio()``).
        :return: a tuple of objcets: ``(feats, feat_lens, [audios, audio_lens], [cuts])``.
            Tensors ``audios`` and ``audio_lens`` are returned when ``return_audio=True``.
            CutSet ``cuts`` is returned when ``fault_tolerant=True``.
        r:   )r;   suppress_errorsru   c                 3   s     | ]}|j  d  j kV  qdS )r   NrJ   )rC   rT   r&   r#   r$   rE     s    z,OnTheFlyFeatures.__call__.<locals>.<genexpr>r   r   z4Error while extracting the features for cut with ID z -- details:
)padding_valuec                 S      g | ]}|j d  qS r   shape)rC   fr#   r#   r$   rN         z-OnTheFlyFeatures.__call__.<locals>.<listcomp>rV   c                 S   s   g | ]}| d qS r   )squeezerC   ar#   r#   r$   rN     r   c                 S   r   r   r   r   r#   r#   r$   rN     r   )r   r<   r   r!   rl   r   rangelenr   allr   extract_batchrJ   r^   numpyextractloggingerroridappendr3   
from_numpyr   r   r_   int64r   r   )r"   r&   ru   audiostfnmidxfeatures_singlerD   samplesfeaturesfeatures_batchfeature_lensout
audio_lensr#   r   r$   r+     sN   



zOnTheFlyFeatures.__call__c                    s\   t  fdd|D  \}}dd t|D }tj|tjdtj|tjdtj|tjddS )r=   c                 3   s.    | ]}|j D ]}t| jj|jV  qqd S r    )rH   r   r   rI   rJ   rK   r"   r#   r$   rE     s    z9OnTheFlyFeatures.supervision_intervals.<locals>.<genexpr>c                 S   rQ   r#   rR   rv   r#   r#   r$   rN     rU   z:OnTheFlyFeatures.supervision_intervals.<locals>.<listcomp>rV   rX   ry   )r"   r&   ra   rb   rY   r#   r   r$   r,     s   
	z&OnTheFlyFeatures.supervision_intervalsrd   c                    s   t  fdd|D S )r|   c                    s   g | ]}t | jjd qS ))rI   rd   )r   r   rI   rB   r"   rd   r#   r$   rN     s    z6OnTheFlyFeatures.supervision_masks.<locals>.<listcomp>r~   r   r#   r   r$   r-     s
   z"OnTheFlyFeatures.supervision_masksr    )r.   r/   r0   r1   r   r   r   r   r3   r4   r2   r   r
   r   r%   r   r   r6   r   r	   r+   r   r,   r-   r   r#   r#   rs   r$   r   L  sX    	) 
D"r      )maxsizemax_workersr   r   c                 C   s   | dkrdS || dS )aA  
    This function caches a thread/process pool in the global state of a given process.
    It's useful for keeping a process pool alive across different invocations within the
    same process for efficiency.
    We intend it to be used for efficient data reads withing a task executed in a
    parent process pool.
    r   N)r   r#   )r   r   r#   r#   r$   r<      s   
r<   ),r   concurrent.futuresr   r   	functoolsr   typingr   r   r   r   r	   r
   r   r   r3   torch.nn.functionalnn
functionalFlhotser   r   
lhotse.cutr   lhotse.dataset.collationr   r   r   r   r   lhotse.utilsr   r   r   r   r   r   r   r7   rk   r   r2   r<   r#   r#   r#   r$   <module>   s2    (Ij| 5