o
    SijA                     @   s   d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZ d dl	m
Z
 d dlmZmZmZmZmZmZmZ d dlZd dlmZ d dlmZmZ d d	lmZ d d
lmZ d dlmZm Z m!Z!m"Z"m#Z#m$Z$ G dd deeZ%dS )    N)ProcessPoolExecutor)partial)islice)Path)CallableDictIterableListMappingOptionalUnion)tqdm)Channels	Recording)AlgorithmMixin)Serializable)PathlikeSecondsexactly_one_not_nullifnonesplit_manifest_lazysplit_sequencec                   @   s  e Zd ZdZd^deee  ddfddZdd defdd	Z	e
deeeef ee f fd
dZe
dee fddZedee dd fddZeZe				d_dedededee deeegef  dee fddZedee dd fddZdee fddZ	d`ded ed!eded  fd"d#Z	$dad%ed&ed'eded  fd(d)Z	dbd*ee d+ee dd fd,d-Z		.	dcded/ee d0e d1ee  de!j"f
d2d3Z#dedd fd4d5Z$dedefd6d7Z%dedefd8d9Z&dedefd:d;Z'dede(fd<d=Z)ddd?e d@edd fdAdBZ*ddd?e d@edd fdCdDZ+ddd?e d@edd fdEdFZ,dd>dd>dGgddfdHed  dIedJed@edKee dLee dMee dd fdNdOZ-dPedd fdQdRZ.defdSdTZ/dUeeef defdVdWZ0deeef defdXdYZ1dee fdZd[Z2defd\d]Z3dS )eRecordingSeta
  
    :class:`~lhotse.audio.RecordingSet` represents a collection of recordings.
    It does not contain any annotation such as the transcript or the speaker identity --
    just the information needed to retrieve a recording such as its path, URL, number of channels,
    and some recording metadata (duration, number of samples).

    It also supports (de)serialization to/from YAML/JSON/etc. and takes care of mapping between
    rich Python classes and YAML/JSON/etc. primitives during conversion.

    When coming from Kaldi, think of it as ``wav.scp`` on steroids: :class:`~lhotse.audio.RecordingSet`
    also has the information from *reco2dur* and *reco2num_samples*,
    is able to represent multi-channel recordings and read a specified subset of channels,
    and support reading audio files directly, via a unix pipe, or downloading them on-the-fly from a URL
    (HTTPS/S3/Azure/GCP/etc.).

    Examples:

        :class:`~lhotse.audio.RecordingSet` can be created from an iterable of :class:`~lhotse.audio.Recording` objects::

            >>> from lhotse import RecordingSet
            >>> audio_paths = ['123-5678.wav', ...]
            >>> recs = RecordingSet.from_recordings(Recording.from_file(p) for p in audio_paths)

        As well as from a directory, which will be scanned recursively for files with parallel processing::

            >>> recs2 = RecordingSet.from_dir('/data/audio', pattern='*.flac', num_jobs=4)

        It behaves similarly to a ``dict``::

            >>> '123-5678' in recs
            True
            >>> recording = recs['123-5678']
            >>> for recording in recs:
            >>>    pass
            >>> len(recs)
            127

        It also provides some utilities for I/O::

            >>> recs.to_file('recordings.jsonl')
            >>> recs.to_file('recordings.json.gz')  # auto-compression
            >>> recs2 = RecordingSet.from_file('recordings.jsonl')

        Manipulation::

            >>> longer_than_5s = recs.filter(lambda r: r.duration > 5)
            >>> first_100 = recs.subset(first=100)
            >>> split_into_4 = recs.split(num_splits=4)
            >>> shuffled = recs.shuffle()

        And lazy data augmentation/transformation, that requires to adjust some information
        in the manifest (e.g., ``num_samples`` or ``duration``).
        Note that in the following examples, the audio is untouched -- the operations are stored in the manifest,
        and executed upon reading the audio::

            >>> recs_sp = recs.perturb_speed(factor=1.1)
            >>> recs_vp = recs.perturb_volume(factor=2.)
            >>> recs_rvb = recs.reverb_rir(rir_recs)
            >>> recs_24k = recs.resample(24000)
    N
recordingsreturnc                 C   s   t |i | _d S N)r   r   )selfr    r   N/home/ubuntu/.local/lib/python3.10/site-packages/lhotse/audio/recording_set.py__init__X      zRecordingSet.__init__otherc                 C   s   | j |j kS r   r   r   r!   r   r   r   __eq__[   s   zRecordingSet.__eq__c                 C   s   | j S )z&Alias property for ``self.recordings``r"   r   r   r   r   data^   s   zRecordingSet.datac                 C      dd | D S )Nc                 s   s    | ]}|j V  qd S r   id.0rr   r   r   	<genexpr>e   s    z#RecordingSet.ids.<locals>.<genexpr>r   r%   r   r   r   idsc   s   zRecordingSet.idsc                 C   s   t t| S r   )r   listr"   r   r   r   from_recordingsg   s   zRecordingSet.from_recordings   pathpatternnum_jobsforce_opus_sampling_raterecording_idexclude_patternc           
         s   d| d}t tj||d}t| |} dur't  t fdd|}|dkr7t	t
t|||dS t|}	t	t
|	|||dW  d   S 1 sSw   Y  dS )	a  
        Recursively scan a directory ``path`` for audio files that match the given ``pattern`` and create
        a :class:`.RecordingSet` manifest for them.
        Suitable to use when each physical file represents a separate recording session.

        .. caution::
            If a recording session consists of multiple files (e.g. one per channel),
            it is advisable to create each :class:`.Recording` object manually, with each
            file represented as a separate :class:`.AudioSource` object, and then
            a :class:`RecordingSet` that contains all the recordings.

        :param path: Path to a directory of audio of files (possibly with sub-directories).
        :param pattern: A bash-like pattern specifying allowed filenames, e.g. ``*.wav`` or ``session1-*.flac``.
        :param num_jobs: The number of parallel workers for reading audio files to get their metadata.
        :param force_opus_sampling_rate: when specified, this value will be used as the sampling rate
            instead of the one we read from the manifest. This is useful for OPUS files that always
            have 48kHz rate and need to be resampled to the real one -- we will perform that operation
            "under-the-hood". For non-OPUS files this input does nothing.
        :param recording_id: A function which takes the audio file path and returns the recording ID. If not
            specified, the filename will be used as the recording ID.
        :param exclude_pattern: optional regex string for identifying file name patterns to exclude.
            There has to be a full regex match to trigger exclusion.
        :return: a new ``Recording`` instance pointing to the audio file.
        zScanning audio files ())r5   r6   Nc                    s     | jd u S r   )matchname)pr7   r   r   <lambda>   s    z'RecordingSet.from_dir.<locals>.<lambda>r1   )desc)r   r   	from_filer   rglobrecompilefilterr   r0   r   mapr   )
r2   r3   r4   r5   r6   r7   msgfile_read_workeritexr   r<   r   from_dirm   s,   !


$zRecordingSet.from_dirr&   c                 C   s   t dd | D S )Nc                 s   s    | ]}t |V  qd S r   )r   	from_dict)r+   raw_recr   r   r   r-      s    

z*RecordingSet.from_dicts.<locals>.<genexpr>r   r0   )r&   r   r   r   
from_dicts   s   
zRecordingSet.from_dictsc                 C   r'   )Nc                 s   s    | ]}|  V  qd S r   )to_dictr*   r   r   r   r-      s    z(RecordingSet.to_dicts.<locals>.<genexpr>r   r%   r   r   r   to_dicts   s   zRecordingSet.to_dictsF
num_splitsshuffle	drop_lastc                 C   s   dd t | |||dD S )aZ  
        Split the :class:`~lhotse.RecordingSet` into ``num_splits`` pieces of equal size.

        :param num_splits: Requested number of splits.
        :param shuffle: Optionally shuffle the recordings order first.
        :param drop_last: determines how to handle splitting when ``len(seq)`` is not divisible
            by ``num_splits``. When ``False`` (default), the splits might have unequal lengths.
            When ``True``, it may discard the last element in some splits to ensure they are
            equally long.
        :return: A list of :class:`~lhotse.RecordingSet` pieces.
        c                 S   s   g | ]}t |qS r   rL   )r+   subsetr   r   r   
<listcomp>   s    z&RecordingSet.split.<locals>.<listcomp>)rP   rQ   rR   )r   )r   rP   rQ   rR   r   r   r   split   s
   zRecordingSet.split 
output_dir
chunk_sizeprefixc                 C   s   t | |||dS )a  
        Splits a manifest (either lazily or eagerly opened) into chunks, each
        with ``chunk_size`` items (except for the last one, typically).

        In order to be memory efficient, this implementation saves each chunk
        to disk in a ``.jsonl.gz`` format as the input manifest is sampled.

        .. note:: For lowest memory usage, use ``load_manifest_lazy`` to open the
            input manifest for this method.

        :param output_dir: directory where the split manifests are saved.
            Each manifest is saved at: ``{output_dir}/{prefix}.{split_idx}.jsonl.gz``
        :param chunk_size: the number of items in each chunk.
        :param prefix: the prefix of each manifest.
        :return: a list of lazily opened chunk manifests.
        )rW   rX   rY   )r   )r   rW   rX   rY   r   r   r   
split_lazy   s   zRecordingSet.split_lazyfirstlastc                 C   s   t ||s	J d|dur|dksJ tt| |}|S |dur>|dks'J |t| kr/| S tt| t| | t| S dS )ai  
        Return a new ``RecordingSet`` according to the selected subset criterion.
        Only a single argument to ``subset`` is supported at this time.

        :param first: int, the number of first recordings to keep.
        :param last: int, the number of last recordings to keep.
        :return: a new ``RecordingSet`` with the subset results.
        z*subset() can handle only one non-None arg.Nr   )r   r   
from_itemsr   lenr0   )r   r[   r\   outr   r   r   rS      s"   zRecordingSet.subset        channelsoffset_secondsduration_secondsc                 C   s   | | j |||dS )N)ra   offsetduration)
load_audio)r   r6   ra   rb   rc   r   r   r   rf      s   zRecordingSet.load_audioc                       t  fdd| D S )Nc                 3       | ]}|  V  qd S r   )with_path_prefixr*   r2   r   r   r-         z0RecordingSet.with_path_prefix.<locals>.<genexpr>rL   )r   r2   r   rj   r   ri     s   zRecordingSet.with_path_prefixc                 C   
   | | j S r   )num_channelsr   r6   r   r   r   rm        
zRecordingSet.num_channelsc                 C   rl   r   sampling_ratern   r   r   r   rq   
  ro   zRecordingSet.sampling_ratec                 C   rl   r   )num_samplesrn   r   r   r   rr     ro   zRecordingSet.num_samplesc                 C   rl   r   )re   rn   r   r   r   re     ro   zRecordingSet.durationTfactoraffix_idc                       t  fdd| D S )a  
        Return a new ``RecordingSet`` that will lazily perturb the speed while loading audio.
        The ``num_samples`` and ``duration`` fields are updated to reflect the
        shrinking/extending effect of speed.

        :param factor: The speed will be adjusted this many times (e.g. factor=1.1 means 1.1x faster).
        :param affix_id: When true, we will modify the ``Recording.id`` field
            by affixing it with "_sp{factor}".
        :return: a ``RecordingSet`` containing the perturbed ``Recording`` objects.
        c                 3       | ]
}|j  d V  qdS )rs   rt   N)perturb_speedr*   rt   rs   r   r   r-         
z-RecordingSet.perturb_speed.<locals>.<genexpr>rL   r   rs   rt   r   ry   r   rx        zRecordingSet.perturb_speedc                    ru   )a  
        Return a new ``RecordingSet`` that will lazily perturb the tempo while loading audio.
        The ``num_samples`` and ``duration`` fields are updated to reflect the
        shrinking/extending effect of tempo.

        :param factor: The speed will be adjusted this many times (e.g. factor=1.1 means 1.1x faster).
        :param affix_id: When true, we will modify the ``Recording.id`` field
            by affixing it with "_sp{factor}".
        :return: a ``RecordingSet`` containing the perturbed ``Recording`` objects.
        c                 3   rv   rw   )perturb_tempor*   ry   r   r   r-   -  rz   z-RecordingSet.perturb_tempo.<locals>.<genexpr>rL   r{   r   ry   r   r}   "  r|   zRecordingSet.perturb_tempoc                    ru   )a  
        Return a new ``RecordingSet`` that will lazily perturb the volume while loading audio.

        :param factor: The volume scale to be applied (e.g. factor=1.1 means 1.1x louder).
        :param affix_id: When true, we will modify the ``Recording.id`` field
            by affixing it with "_sp{factor}".
        :return: a ``RecordingSet`` containing the perturbed ``Recording`` objects.
        c                 3   rv   rw   )perturb_volumer*   ry   r   r   r-   :  rz   z.RecordingSet.perturb_volume.<locals>.<genexpr>rL   r{   r   ry   r   r~   1  s   	zRecordingSet.perturb_volumer   rir_recordingsnormalize_output
early_onlyrir_channelsroom_rng_seedsource_rng_seedc              	      s,   t t fdd| D S )a  
        Return a new ``RecordingSet`` that will lazily apply reverberation based on provided
        impulse responses while loading audio. If no ``rir_recordings`` are provided, we will
        generate a set of impulse responses using a fast random generator (https://arxiv.org/abs/2208.04101).

        :param rir_recordings: The impulse responses to be used.
        :param normalize_output: When true, output will be normalized to have energy as input.
        :param early_only: When true, only the early reflections (first 50 ms) will be used.
        :param affix_id: When true, we will modify the ``Recording.id`` field
            by affixing it with "_rvb".
        :param rir_channels: The channels to be used for the RIRs (if multi-channel). Uses first
            channel by default. If no RIR is provided, we will generate one with as many channels
            as this argument specifies.
        :param room_rng_seed: The seed to be used for the room configuration.
        :param source_rng_seed: The seed to be used for the source positions.
        :return: a ``RecordingSet`` containing the perturbed ``Recording`` objects.
        c              
   3   s6    | ]}|j rtnd  dV  qd S )N)rir_recordingr   r   rt   r   r   r   )
reverb_rirrandomchoicer*   rt   r   r   r   r   r   r   r   r   r-   Z  s    

z*RecordingSet.reverb_rir.<locals>.<genexpr>)r/   r   r0   )r   r   r   r   rt   r   r   r   r   r   r   r   >  s   
zRecordingSet.reverb_rirrq   c                    rg   )z
        Apply resampling to all recordings in the ``RecordingSet`` and return a new ``RecordingSet``.
        :param sampling_rate: The new sampling rate.
        :return: a new ``RecordingSet`` with lazily resampled ``Recording`` objects.
        c                 3   rh   r   )resampler*   rp   r   r   r-   m  rk   z(RecordingSet.resample.<locals>.<genexpr>rL   )r   rq   r   rp   r   r   g  s   zRecordingSet.resamplec                 C   s   dt |  dS )NzRecordingSet(len=r8   )r^   r%   r   r   r   __repr__o  r    zRecordingSet.__repr__index_or_idc                    sZ   z| j   W S  ty,   | jrt fddt| D  Y S t fdd| D  Y S w )Nc                 3   s     | ]\}}| kr|V  qd S r   r   )r+   idxitemr   r   r   r-   x  s    z+RecordingSet.__getitem__.<locals>.<genexpr>c                 3   s    | ]
}|j  kr|V  qd S r   r(   r+   r   r   r   r   r-   {  s    )r   	TypeErroris_lazynext	enumerate)r   r   r   r   r   __getitem__r  s   zRecordingSet.__getitem__c                    s6   t  trt fdd| D S t fdd| D S )Nc                 3   s    | ]} |j kV  qd S r   r(   r   r!   r   r   r-     rk   z,RecordingSet.__contains__.<locals>.<genexpr>c                 3   s    | ]	} j |j kV  qd S r   r(   r   r   r   r   r-     s    )
isinstancestranyr#   r   r   r   __contains__}  s   
zRecordingSet.__contains__c                 c   s    | j E d H  d S r   r"   r%   r   r   r   __iter__  s   zRecordingSet.__iter__c                 C   s
   t | jS r   )r^   r   r%   r   r   r   __len__  ro   zRecordingSet.__len__r   )r1   NNN)FF)rV   )NN)Nr`   N)T)4__name__
__module____qualname____doc__r   r   r   r   boolr$   propertyr   r   r   r&   r.   staticmethodr0   r]   r   intr   r   rI   dictrM   rO   r	   rU   rZ   rS   r   floatnpndarrayrf   ri   rm   rq   rr   r   re   rx   r}   r~   r   r   r   r   r   r   r   r   r   r   r   r      s    =$:



	
)r   )&loggingr   rA   concurrent.futuresr   	functoolsr   	itertoolsr   pathlibr   typingr   r   r   r	   r
   r   r   numpyr   tqdm.asyncior   lhotse.audio.recordingr   r   lhotse.lazyr   lhotse.serializationr   lhotse.utilsr   r   r   r   r   r   r   r   r   r   r   <module>   s    $ 
