o
    2wi2                  	   @   s   d dl Z d dlmZ d dlmZmZmZmZmZm	Z	m
Z
mZmZ d dlmZ d dlmZ d dlmZmZmZmZmZ d dlmZ d dlmZ d d	lmZmZmZ G d
d deZdede deee	e! ef ddf fddZ"dS )    NPath)	CallableDict	GeneratorListLiteralOptionalSequenceTupleUnion)Cut)resolve_seed)DillableLazyIteratorChainLazyJsonlIteratorLazyManifestIteratorcount_newlines_fast)extension_contains)TarIterator)Pathlikeexactly_one_not_nullifnonec                   @   s   e Zd ZdZ							d"deeeee f  dee de	d	e	d
e	de
eed ed f deeeegef   ddfddZd#deeeee f  fddZdefddZdedefddZdedefddZdd ZdefddZd$d d!ZdS )%LazySharIteratora  
    LazySharIterator reads cuts and their corresponding data from multiple shards,
    also recognized as the Lhotse Shar format.
    Each shard is numbered and represented as a collection of one text manifest and
    one or more binary tarfiles.
    Each tarfile contains a single type of data, e.g., recordings, features, or custom fields.

    Given an example directory named ``some_dir``, its expected layout is
    ``some_dir/cuts.000000.jsonl.gz``, ``some_dir/recording.000000.tar``,
    ``some_dir/features.000000.tar``, and then the same names but numbered with ``000001``, etc.
    There may also be other files if the cuts have custom data attached to them.

    The main idea behind Lhotse Shar format is to optimize dataloading with sequential reads,
    while keeping the data composition more flexible than e.g. WebDataset tar archives do.
    To achieve this, Lhotse Shar keeps each data type in a separate archive, along a single
    CutSet JSONL manifest.
    This way, the metadata can be investigated without iterating through the binary data.
    The format also allows iteration over a subset of fields, or extension of existing data
    with new fields.

    As you iterate over cuts from ``LazySharIterator``, it keeps a file handle open for the
    JSONL manifest and all of the tar files that correspond to the current shard.
    The tar files are read item by item together, and their binary data is attached to
    the cuts.
    It can be normally accessed using methods such as ``cut.load_audio()``.

    We can simply load a directory created by :class:`~lhotse.shar.writers.shar.SharWriter`.
    Example::

        >>> cuts = LazySharIterator(in_dir="some_dir")
        ... for cut in cuts:
        ...     print("Cut", cut.id, "has duration of", cut.duration)
        ...     audio = cut.load_audio()
        ...     fbank = cut.load_features()

    :class:`.LazySharIterator` can also be initialized from a dict, where the keys
    indicate fields to be read, and the values point to actual shard locations.
    This is useful when only a subset of data is needed, or it is stored in different
    directories. Example::

        >>> cuts = LazySharIterator({
        ...     "cuts": ["some_dir/cuts.000000.jsonl.gz"],
        ...     "recording": ["another_dir/recording.000000.tar"],
        ...     "features": ["yet_another_dir/features.000000.tar"],
        ... })
        ... for cut in cuts:
        ...     print("Cut", cut.id, "has duration of", cut.duration)
        ...     audio = cut.load_audio()
        ...     fbank = cut.load_features()

    We also support providing shell commands as shard sources, inspired by WebDataset.
    Example::

        >>> cuts = LazySharIterator({
        ...     "cuts": ["pipe:curl https://my.page/cuts.000000.jsonl.gz"],
        ...     "recording": ["pipe:curl https://my.page/recording.000000.tar"],
        ... })
        ... for cut in cuts:
        ...     print("Cut", cut.id, "has duration of", cut.duration)
        ...     audio = cut.load_audio()

    Finally, we allow specifying URLs or cloud storage URIs for the shard sources.
    We defer to ``smart_open`` library to handle those.
    Example::

        >>> cuts = LazySharIterator({
        ...     "cuts": ["s3://my-bucket/cuts.000000.jsonl.gz"],
        ...     "recording": ["s3://my-bucket/recording.000000.tar"],
        ... })
        ... for cut in cuts:
        ...     print("Cut", cut.id, "has duration of", cut.duration)
        ...     audio = cut.load_audio()

    :param fields: a dict whose keys specify which fields to load,
        and values are lists of shards (either paths or shell commands).
        The field "cuts" pointing to CutSet shards always has to be present.
    :param in_dir: path to a directory created with ``SharWriter`` with
        all the shards in a single place. Can be used instead of ``fields``.
    :param split_for_dataloading: bool, by default ``False`` which does nothing.
        Setting it to ``True`` is intended for PyTorch training with multiple
        dataloader workers and possibly multiple DDP nodes.
        It results in each node+worker combination receiving a unique subset
        of shards from which to read data to avoid data duplication.
        This is mutually exclusive with ``seed='randomized'``.
    :param shuffle_shards: bool, by default ``False``. When ``True``, the shards
        are shuffled (in case of multi-node training, the shuffling is the same
        on each node given the same seed).
    :param seed: When ``shuffle_shards`` is ``True``, we use this number to
        seed the RNG.
        Seed can be set to ``'randomized'`` in which case we expect that the user provided
        :func:`lhotse.dataset.dataloading.worker_init_fn` as DataLoader's ``worker_init_fn``
        argument. It will cause the iterator to shuffle shards differently on each node
        and dataloading worker in PyTorch training. This is mutually exclusive with
        ``split_for_dataloading=True``.
        Seed can be set to ``'trng'`` which, like ``'randomized'``, shuffles the shards
        differently on each iteration, but is not possible to control (and is not reproducible).
        ``trng`` mode is mostly useful when the user has limited control over the training loop
        and may not be able to guarantee internal Shar epoch is being incremented, but needs
        randomness on each iteration (e.g. useful with PyTorch Lightning).
    :param stateful_shuffle: bool, by default ``False``. When ``True``, every
        time this object is fully iterated, it increments an internal epoch counter
        and triggers shard reshuffling with RNG seeded by ``seed`` + ``epoch``.
        Doesn't have any effect when ``shuffle_shards`` is ``False``.
    :param cut_map_fns: optional sequence of callables that accept cuts and return cuts.
        It's expected to have the same length as the number of shards, so each function
        corresponds to a specific shard.
        It can be used to attach shard-specific custom attributes to cuts.

    See also: :class:`~lhotse.shar.writers.shar.SharWriter`
    NFT*   fieldsin_dirsplit_for_dataloadingshuffle_shardsstateful_shuffleseed
randomizedtrngcut_map_fnsreturnc           	         s   t ||s	J d|r|dksJ d| _| _| _| _d _d  _|d ur/ | n | t	 j
d  _ jD ]%}t	 j
|  jksdJ d j d| dt	 j
|  d	 j
|  q? fd
dt jD  _t|d g j  _d S )NzITo read Lhotse Shar format, provide either 'in_dir' or 'fields' argument.r!   zyError: seed='randomized' and split_for_dataloading=True are mutually exclusive options as they would result in data loss.r   cutsz	Expected z shards available for field 'z' but found z: c                    s"   g | ]  fd dj D qS )c                    s   i | ]
}| j |  qS  streams).0field)self	shard_idxr&   U/home/ubuntu/sommelier/.venv/lib/python3.10/site-packages/lhotse/shar/readers/lazy.py
<dictcomp>   s    z8LazySharIterator.__init__.<locals>.<listcomp>.<dictcomp>r'   )r)   r+   )r,   r-   
<listcomp>   s    z-LazySharIterator.__init__.<locals>.<listcomp>)r   r   r   r   r    epoch_len_init_from_dir_init_from_inputslenr(   
num_shardsr   rangeshardsr   r#   )	r+   r   r   r   r   r   r    r#   r*   r&   r/   r-   __init__   s6   



,
zLazySharIterator.__init__c                 C   s4   d|v sJ dt | | _| jd || _d S )Nr%   zOTo initialize Shar reader, please provide the value for key 'cuts' in 'fields'.)setkeysr   remover(   )r+   r   r&   r&   r-   r4      s   

z"LazySharIterator._init_from_inputsc                    s   t || _t| jd}tdd |D | _d| jv sJ | jd dtdd |D i| _| jD ] t fdd|D | j < q3d S )N*c                 s   s     | ]}|j d d V  qdS .r   N)stemsplitr)   pr&   r&   r-   	<genexpr>   s    z2LazySharIterator._init_from_dir.<locals>.<genexpr>r%   c                 s   s2    | ]}|j d d dkrtd|r|V  qdS )r?   r   r%   z.jsonlN)namerA   r   rB   r&   r&   r-   rD      s    c                 3   s(    | ]}|j d d  kr|V  qdS r>   )rE   rA   rB   r*   r&   r-   rD      s    )	r   r   listglobr:   r   r<   sortedr(   )r+   r   	all_pathsr&   rF   r-   r3      s   


zLazySharIterator._init_from_dirr8   c                 C   s&   ddl m}m} | jr|||S |S )N   )split_by_nodesplit_by_worker)utilsrL   rM   r   )r+   r8   rL   rM   r&   r&   r-   _maybe_split_for_dataloading   s   z-LazySharIterator._maybe_split_for_dataloadingc                 C   s<   | j r| }t| j}| jr|| j7 }t|| |S N)	r   copyr   r    r   r1   randomRandomshuffle)r+   r8   r    r&   r&   r-   _maybe_shuffle_shards   s   

z&LazySharIterator._maybe_shuffle_shardsc              
   c   s@   | j | j}}| |}| |}|d ur | |}| |}t||D ]q\}}t|d }dd | D }dd | D }t|g| R  D ]J^}}	t| |	D ]*\}
\}}|d u raqVt	|j
|j |jkszJ d|j d| d|
 t||
| qV|d |_| j|_|d ur||}|V  qKq%|  jd7  _d S )	Nr%   c                 S   s   i | ]\}}|d kr||qS )r%   r&   r)   r*   pathr&   r&   r-   r.      s    z-LazySharIterator.__iter__.<locals>.<dictcomp>c                 S   s4   i | ]\}}|t d |rt|ntt||dqS )z.tarrF   )r   r   _jsonl_tar_adaptorr   rV   r&   r&   r-   r.      s    
zMismatched IDs: cut ID is 'z' but found data with name 'z' fsor field rK   )r8   r#   rU   rO   zipr   itemsvaluesr;   strparentr@   idsetattrshard_originr1   
shar_epoch)r+   r8   map_fnsshard
cut_map_fnr%   field_pathsfield_iterscut
field_datar*   maybe_manifest	data_pathr&   r&   r-   __iter__   sB   




zLazySharIterator.__iter__c                 C   s*   | j d u rtdd | jd D | _ | j S )Nc                 s   s    | ]}t |V  qd S rP   )r   rB   r&   r&   r-   rD     s    z+LazySharIterator.__len__.<locals>.<genexpr>r%   )r2   sumr(   r/   r&   r&   r-   __len__  s   
zLazySharIterator.__len__r   c                 C   s
   t | |S rP   )r   )r+   otherr&   r&   r-   __add__  s   
zLazySharIterator.__add__)NNFFTr   NrP   )r$   r   )__name__
__module____qualname____doc__r	   r   r\   r
   r   boolr   intr   r   r   r9   r4   r3   r   rO   rU   rk   rm   ro   r&   r&   r&   r-   r      sB    q	
 ,/r   
jsonl_iterr*   r$   c                 c   sB    | D ]}t |d  d}||vrd}n|| }||fV  qdS )z_
    Used to adapt the iteration output of LazyJsonlIterator to mimic that of TarIterator.
    cut_idz.dummyNr   )rv   r*   itempseudo_pathr&   r&   r-   rX   !  s   rX   )#rR   pathlibr   typingr   r   r   r   r   r	   r
   r   r   
lhotse.cutr   lhotse.dataset.dataloadingr   lhotse.lazyr   r   r   r   r   lhotse.serializationr   lhotse.shar.readers.tarr   lhotse.utilsr   r   r   r   r\   dictrX   r&   r&   r&   r-   <module>   s&    ,  