o
    æS™iŒZ  ã                   @   sN  d Z ddlZddlZddlZddlmZ ddlmZmZm	Z	m
Z
mZmZmZmZ ddlZddlmZmZ ddlmZ ddlmZmZmZ 								d)d
ededee dededededededefdd„ZG dd„ dƒZG dd„ dƒZ					d*deeee f dedededed efd!d"„Zdefd#d$„Z d+d%d&„Z!G d'd(„ d(ƒZ"dS ),u$  
High-level architecture of the Lhotse+WebDataset solution.
Read the documentation of the items below to understand each component better.

â”Œâ”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”
â”‚â”Œâ”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”â”‚
â”‚â”‚                            Training loop                             â”‚â”‚
â”‚â””â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”˜â”‚
â”‚                                    â”‚                                   â”‚
â”‚                                    â–¼                                   â”‚
â”‚                     â”Œâ”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”                    â”‚
â”‚                     â”‚ torch.utils.data.DataLoader â”‚                    â”‚
â”‚                     â””â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”˜        Main processâ”‚
â””â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”¬â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”˜
       â”Œâ”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”¼â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”
       â–¼       â”Œâ”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â–¼â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”       â–¼
  â”Œâ”€â”€â”€â”€â”€â”€â”€â”€â”€â”  â”‚                â”Œâ”€â”€â”€â”€â”€â”€â”€â”€â”€â”   Sub-process #i â”‚  â”Œâ”€â”€â”€â”€â”€â”€â”€â”€â”€â”
  â”‚Worker #1â”‚  â”‚                â”‚Worker #iâ”‚                  â”‚  â”‚Worker #Nâ”‚
  â””â”€â”€â”€â”€â”€â”€â”€â”€â”€â”˜  â”‚                â””â”€â”€â”€â”€â”€â”€â”€â”€â”€â”˜                  â”‚  â””â”€â”€â”€â”€â”€â”€â”€â”€â”€â”˜
               â”‚                     â”‚                       â”‚
               â”‚                     â–¼                       â”‚
               â”‚        â”Œâ”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”           â”‚
               â”‚        â”‚ IterableDatasetWrapper â”‚           â”‚
               â”‚        â””â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”˜           â”‚
               â”‚                     â”‚                       â”‚
               â”‚           â”Œâ”€â”€â”€â”€â”€â”€â”€â”€â”€â”´â”€â”€â”€â”€â”€â”€â”                â”‚
               â”‚           â–¼                â–¼                â”‚
               â”‚  â”Œâ”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â” â”Œâ”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”          â”‚
               â”‚  â”‚Map-style Datasetâ”‚ â”‚  Sampler  â”‚          â”‚
               â”‚  â”‚ (task-specific) â”‚ â”‚           â”‚          â”‚
               â”‚  â””â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”˜ â””â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”˜          â”‚
               â”‚                            â”‚                â”‚
               â”‚                            â–¼                â”‚
               â”‚                      â”Œâ”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”          â”‚
               â”‚                      â”‚  CutSet   â”‚          â”‚
               â”‚                      â””â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”˜          â”‚
               â”‚                            â”‚                â”‚
               â”‚                            â–¼                â”‚
               â”‚               â”Œâ”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”    â”‚
               â”‚               â”‚Lazy WebDataset Iteratorâ”‚    â”‚
               â”‚               â”‚(discards shard_idx % N)â”‚    â”‚
               â”‚               â””â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”˜    â”‚
               â”‚                            â”‚                â”‚
               â”‚                â”Œâ”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”¼â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”    â”‚
               â”‚                â–¼           â–¼           â–¼    â”‚
               â”‚           â”Œâ”€â”€â”€â”€â”€â”€â”€â”€â”  â”Œâ”€â”€â”€â”€â”€â”€â”€â”€â”  â”Œâ”€â”€â”€â”€â”€â”€â”€â”€â”â”‚
               â”‚           â”‚Shard #1â”‚  â”‚Shard #jâ”‚  â”‚Shard #Mâ”‚â”‚
               â”‚           â””â”€â”€â”€â”€â”€â”€â”€â”€â”˜  â””â”€â”€â”€â”€â”€â”€â”€â”€â”˜  â””â”€â”€â”€â”€â”€â”€â”€â”€â”˜â”‚
               â””â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”˜
é    N)Úpartial)ÚCallableÚDictÚ	GeneratorÚIterableÚListÚOptionalÚSequenceÚUnion)ÚCutSetÚMonoCut©ÚLazyIteratorChain)ÚPathlikeÚis_module_availableÚsuppress_and_warnTÚflacÚcutsÚoutput_pathÚ
shard_sizeÚverboseÚaudio_formatÚ
load_audioÚload_featuresÚload_customÚfault_tolerantÚreturnc	                 C   sÖ   t |||||||d}	d}
d}|	9 tjjd| d }| D ]}|
d7 }
|	 |¡}|t|ƒ7 }| ¡  qW d  ƒ n1 s>w   Y  W d  ƒ n1 sMw   Y  |	j}t d|› d|
› d	|› d
|
| › d	¡ |S )að
  
    Saves the CutSet metadata along with audio/features data into a WebDataset archive.
    The audio and feature data is read, decoded, and encoded into ``audio_format`` for audio,
    lilcom for features and arrays with floating point type, and pickle for all other dtypes.
    The intended use of this function is to speed up the I/O in training data pipelines by
    converting random access reads to sequential access reads.

    Supported values for ``audio_format`` are the same as for the ``format`` argument in
    ``torchaudio.save`` function with ``sox_io`` backend.

    If ``shard_size`` is specified, we will leverage WebDataset's ``ShardWriter`` to
    create multiple tarballs with ``shard_size`` items per shard. In that mode, we expect
    that ``output_path`` contains a pattern like "/path/to/shard-%06d.tar", which will
    be internally expanded with the shard index.

    Returns number of written shards if sharding is enabled, otherwise 0.

    .. note: By default, we'll skip cuts which failed to load for any reason and proceed
        with exporting. To raise an exception and stop, set ``fault_tolerant=False``.

    **Examples**

    Export cuts with audio, features, and all custom data to a single tarball,
    converting audio to FLACs::

        >>> cuts = CutSet.from_jsonl_lazy("data/cuts-train.jsonl")
        >>> n_shards = export_to_webdataset(
        ...     cuts=cuts,
        ...     output_path="data/cuts-train.tar",
        ...     audio_format="flac",
        ... )

    Export cuts with audio, features, and all custom data to a directory with shards
    counting 10000 cuts each, converting audio to SPHERE (sph)::

        >>> cuts = CutSet.from_jsonl_lazy("data/cuts-train.jsonl")
        >>> n_shards = export_to_webdataset(
        ...     cuts=cuts,
        ...     output_path="data/cuts-train-wds/shard-%06d.tar",
        ...     shard_size=10000,
        ...     audio_format="sph",
        ... )

    The same, but export cuts with only the features being read into memory
    (recording and custom data still refers to external storage)::

        >>> cuts = CutSet.from_jsonl_lazy("data/cuts-train.jsonl")
        >>> n_shards = export_to_webdataset(
        ...     cuts=cuts,
        ...     output_path="data/cuts-train-wds/shard-%06d.tar",
        ...     shard_size=10000,
        ...     load_audio=False,
        ...     load_custom=False,
        ... )

    Export cuts to sharded tarballs stored in the cloud
    (in this example AWS S3, using AWS CLI)::

        >>> cuts = CutSet.from_jsonl_lazy("data/cuts-train.jsonl")
        >>> n_shards = export_to_webdataset(
        ...     cuts=cuts,
        ...     output_path="pipe:aws s3 cp - s3://my-bucket/data/shard-%06d.tar",
        ...     shard_size=10000,
        ... )
    )Úpath_or_urlr   r   r   r   r   r   r   zCreating WebDataset tarball(s))ÚdescÚdisableé   Nz	Exported z cuts out of z total into z shards (there were z cuts with errors).)	ÚWebdatasetWriterÚtqdmÚautoÚwriteÚintÚupdateÚnum_shards_writtenÚloggingÚinfo)r   r   r   r   r   r   r   r   r   ÚwriterÚtotalÚokÚpbarÚcutÚsuccessr'   © r0   úM/home/ubuntu/.local/lib/python3.10/site-packages/lhotse/dataset/webdataset.pyÚexport_to_webdataset@   s>   Mù
ÿ

üý€ 	ÿÿr2   c                   @   sŽ   e Zd ZdZ						ddedee deded	ed
ededdfdd„Z	ddd„Z
ddd„Zddd„Zdedefdd„Zdee fdd„ZdS )r!   a=  
    Saves the CutSet metadata along with audio/features data into a WebDataset archive.
    The audio and feature data is read, decoded, and encoded into ``audio_format`` for audio,
    lilcom for features and arrays with floating point type, and pickle for all other dtypes.
    The intended use of this function is to speed up the I/O in training data pipelines by
    converting random access reads to sequential access reads.

    Supported values for ``audio_format`` are the same as for the ``format`` argument in
    ``torchaudio.save`` function with ``sox_io`` backend.

    If ``shard_size`` is specified, we will leverage WebDataset's ``ShardWriter`` to
    create multiple tarballs with ``shard_size`` items per shard. In that mode, we expect
    that ``output_path`` contains a pattern like "/path/to/shard-%06d.tar", which will
    be internally expanded with the shard index.

    Returns number of written shards if sharding is enabled, otherwise 0.

    .. note: By default, we'll skip cuts which failed to load for any reason and proceed
        with exporting. To raise an exception and stop, set ``fault_tolerant=False``.

    **Example**

    Export cuts with audio, features, and all custom data to a tarball shards with 500
    cuts each::

        >>> cuts = CutSet.from_jsonl_lazy("data/cuts-train.jsonl")
        >>> with WebdatasetWriter("data/tars/shard-%06d.tar", shard_size=500) as writer:
        ...     for cut in cuts:
        ...         writer.write(cut)
        >>> output_paths = writer.output_manifest_paths()

    See also: :func`.export_to_webdataset`
    Nr   Tr   r   r   r   r   r   r   r   c           	      C   s˜   t dƒstdƒ‚ddlm} || _|| _|| _|| _|| _|| _	|| _
| jd ur:| jdks/J ‚tt| j| jd| _nt|| jƒ| _d | _d | _d | _d S )NÚ
webdatasetú&Please 'pip install webdataset' first.r   ©Ú	TarWriter)Úmaxcount)r   ÚImportErrorr3   r6   r   r   r   r   r   r   r   r   ÚShardWriterÚwriter_init_fnr*   r'   Úfinished)	Úselfr   r   r   r   r   r   r   r6   r0   r0   r1   Ú__init__Ï   s&   



ÿ
zWebdatasetWriter.__init__c                 C   s   |   ¡ | _d| _| S )NF)r:   r*   r;   ©r<   r0   r0   r1   Ú	__enter__ó   s   
zWebdatasetWriter.__enter__c                 O   s   |   ¡  d S ©N©Úclose)r<   ÚargsÚkwargsr0   r0   r1   Ú__exit__ø   s   zWebdatasetWriter.__exit__c                 C   s*   t | jtƒr| jj| _| j ¡  d| _d S )NT)Ú
isinstancer*   r9   Úshardr'   rB   r;   r>   r0   r0   r1   rB   û   s   


zWebdatasetWriter.closeÚmanifestc                 C   sr   t t| jd( |j| j| j| j| jd}t 	| 
¡ ¡}| j |j|dœ¡ 	 W d  ƒ dS 1 s2w   Y  dS )zÒ
        Converts a Cut to a dict, pickles it, and then stores into a tarfile.

        :param manifest: the manifest to be written.
        :return: bool indicating whether the writing was successful.
        )Úenabled)r   r   r   r   )Ú__key__ÚdataNTF)r   Ú	Exceptionr   Úmove_to_memoryr   r   r   r   ÚpickleÚdumpsÚto_dictr*   r$   Úid)r<   rH   r.   rK   r0   r0   r1   r$     s   ü ÷zWebdatasetWriter.writec                    sJ   ˆ j du r	tdƒ‚ˆ j stdƒ‚ˆ jdu rˆ jgS ‡ fdd„tˆ jƒD ƒS )a  
        Return the a list of paths/urls where the data was written.
        The list can be used directly to initialize :class:`.LazyWebdatasetIterator`
        or :meth:`lhotse.cut.CutSet.from_webdataset`.
        Useful when writing into shards with a specified pattern.
        Nz(The writer has not written anything yet.zWThe writer was not closed -- call writer.close() first, or use it as a context manager.c                    s   g | ]}ˆ j | ‘qS r0   )r   )Ú.0Úir>   r0   r1   Ú
<listcomp>$  s    z:WebdatasetWriter.output_manifest_paths.<locals>.<listcomp>)r;   Ú
ValueErrorr'   r   Úranger>   r0   r>   r1   Úoutput_manifest_paths  s   
ÿ
z&WebdatasetWriter.output_manifest_paths)Nr   TTTT)r   r!   ©r   N)Ú__name__Ú
__module__Ú__qualname__Ú__doc__r   r   r%   ÚstrÚboolr=   r?   rE   rB   r   r$   r   rW   r0   r0   r0   r1   r!   ¬   s<    %øþýüûúùø	
÷
$

r!   c                   @   s¨   e Zd ZdZdeeee f ddfdd„Zdeddfdd	„Z	dd
d„Z
defdd„Zdeddfdd„Zddd„Zdd„ Zdd„ Zdefdd„Zdd„ Zdefdd„ZdS )ÚLazyWebdatasetIteratoraF  
    LazyWebdatasetIterator provides the ability to read Lhotse objects from a
    WebDataset tarball on-the-fly, without reading its full contents into memory.

    This class is designed to be a partial "drop-in" replacement for ordinary dicts
    to support lazy loading of RecordingSet, SupervisionSet and CutSet.
    Since it does not support random access reads, some methods of these classes
    might not work properly.

    The behaviour of the underlying ``WebDataset`` instance can be customized by
    providing its kwargs directly to the constructor of this class.
    Úsourcer   Nc                 K   s    t dƒstdƒ‚|| _|| _d S ©Nr3   r4   )r   r8   r`   Ú
wds_kwargs)r<   r`   rb   r0   r0   r1   r=   5  s   
zLazyWebdatasetIterator.__init__Úepochc                 C   s   || j d< d S )Nrc   )rb   )r<   rc   r0   r0   r1   Ú	set_epoch>  ó   z LazyWebdatasetIterator.set_epochc                 C   s6   t dƒstdƒ‚t| jfi | j¤Ž| _t| jƒ| _d S ra   )r   r8   Úmini_webdatasetr`   rb   Ú_dsÚiterÚ_ds_iterr>   r0   r0   r1   Ú_resetA  s   zLazyWebdatasetIterator._resetc                 C   s   | j | jdœ}|S )zò
        Store the state for pickling -- we'll only store the path + kwargs, and re-initialize
        this iterator when unpickled. This is necessary to transfer this object across processes
        for PyTorch's DataLoader workers.
        ©r`   rb   rk   ©r<   Ústater0   r0   r1   Ú__getstate__H  s   z#LazyWebdatasetIterator.__getstate__rm   c                 C   s   | j  |¡ dS )z!Restore the state when unpickled.N)Ú__dict__r&   rl   r0   r0   r1   Ú__setstate__Q  s   z#LazyWebdatasetIterator.__setstate__c                 C   s   |   ¡  | S r@   )rj   r>   r0   r0   r1   Ú__iter__U  s   zLazyWebdatasetIterator.__iter__c                 C   s:   ddl m} t| jƒ}t |d ¡}||ƒ}|d |_|S )Nr   )Údeserialize_itemrK   Ú__url__)Úlhotse.serializationrr   Únextri   rN   ÚloadsÚshard_origin)r<   rr   Ú	data_dictrK   Úitemr0   r0   r1   Ú__next__Y  s   

zLazyWebdatasetIterator.__next__c                 c   s    | E d H  d S r@   r0   r>   r0   r0   r1   Úvaluesb  s   €zLazyWebdatasetIterator.valuesc                 C   ó   dd„ | D ƒS )Nc                 s   s    | ]}|j V  qd S r@   ©rQ   ©rR   ry   r0   r0   r1   Ú	<genexpr>f  s   € z.LazyWebdatasetIterator.keys.<locals>.<genexpr>r0   r>   r0   r0   r1   Úkeyse  re   zLazyWebdatasetIterator.keysc                 C   r|   )Nc                 s   s    | ]}|j |fV  qd S r@   r}   r~   r0   r0   r1   r   i  s   € z/LazyWebdatasetIterator.items.<locals>.<genexpr>r0   r>   r0   r0   r1   Úitemsh  re   zLazyWebdatasetIterator.itemsc                 C   s
   t | |ƒS r@   r   )r<   Úotherr0   r0   r1   Ú__add__k  s   
zLazyWebdatasetIterator.__add__rX   )r   r_   )rY   rZ   r[   r\   r
   r   r	   r=   r%   rd   rj   Údictrn   r   rp   rq   rz   r{   r]   r€   r   r   rƒ   r0   r0   r0   r1   r_   '  s"    ÿ
þ	
	
	r_   FÚurlsrc   Úshuffle_shardsÚsplit_by_workerÚsplit_by_nodeÚignore_error_shardsc                 C   s¦   t dƒstdƒ‚ddlm}m}m} ddlm}	 ddlm}
 ddlm}m	} ||| dƒ}|r4| 
|	¡ |r;| 
|
¡ |rE| 
t|d	¡ | 
||rL|n|d
¡ |S )a#  
    Return a pipeline for WebDataset-style data files.

    This is a convenience function for constructing a partial pipeline
    that reads from a set of sharded tar files, extracts the individual
    files, and groups them together into samples (dictionaries).

    You can use all the methods from `Composable` (`then`, `compose`) and
    from `Shorthands` (`batched`, `unbatched`, `decode`, `shuffle`, etc.)
    on the result.

    .. note: This is a reduced version of ``webdataset.WebDataset`` function,
        that only uses the functionalities relevant to Lhotse, and makes it
        possible to disable the node/worker splitting.

    :param urls: the source URLs: a string or a list.
    :param epoch: epoch number (used only when ``shuffle_shards`` is enabled).
    :param shuffle_shards: shuffle the shards if True.
        Only takes effect when ``urls`` is a list of shard paths/urls.
    :param split_by_worker: DEPRECATED: always acts as if True.
        If True, shards are split per DataLoader worker subprocesses,
        otherwise each dataloader worker will yield the same data.
        Only takes effect when ``urls`` is a list of shard paths/urls.
    :param split_by_node: if True, shards are split per node in DDP training,
        otherwise on each node we'll yield the same data.
        Only takes effect when ``urls`` is a list of shard paths/urls.
    :param ignore_error_shards: when ``True``, we tell WebDataset to ignore shards that
        failed during loading and emit a warning. When ``False``, we won't catch the exceptions.
    r3   r4   r   )ÚDataPipelineÚSimpleShardListÚreraise_exception)rˆ   )r‡   )Útarfile_to_samplesÚwarn_and_continue)r…   )rc   )Úhandler)r   r8   r3   rŠ   r‹   rŒ   rˆ   r‡   r   rŽ   ÚappendÚcreate_shard_shuffler)r…   rc   r†   r‡   rˆ   r‰   rŠ   r‹   rŒ   Úsplit_by_node_Úsplit_by_worker_r   rŽ   Úwdsr0   r0   r1   rf   o  s&   %


ÿÿrf   c                 C   s&   ddl m} G dd„ d|ƒ}|| dS )Nr   )ÚPipelineStagec                   @   s   e Zd Zddd„Zdd„ ZdS )	z-create_shard_shuffler.<locals>.detshuffle_allr   éÿÿÿÿc                 S   s   || _ || _d S r@   )Úseed_Úepoch_)r<   r—   r˜   r0   r0   r1   r=   ¯  s   
z6create_shard_shuffler.<locals>.detshuffle_all.__init__c                 S   sB   |  j d7  _ t ¡ }| t| j| j fƒ¡ t|ƒ}| |¡ |S )Nr    )r˜   ÚrandomÚRandomÚseedÚhashr—   ÚlistÚshuffle)r<   ÚsrcÚrngr   r0   r0   r1   Úrun³  s   
z1create_shard_shuffler.<locals>.detshuffle_all.runN)r   r–   )rY   rZ   r[   r=   r¡   r0   r0   r0   r1   Údetshuffle_all®  s    
r¢   )r˜   )r3   r•   )rc   r•   r¢   r0   r0   r1   r‘   «  s   
r‘   c                 c   s    | E dH  dS )zn
    Helper fn that works normally with single-node training, but duplicates data in multi-node training.
    Nr0   )rŸ   Úgroupr0   r0   r1   Ú/_single_node_or_multi_node_with_duplicated_data¾  s   €r¤   c                   @   sl   e Zd ZdZ				ddededed	ee d
ef
dd„Z	dd„ Z
dd„ Zdd„ Zdd„ Zdd„ Zdd„ ZdS )r9   aª  
    Like ``webdataset.TarWriter`` but splits into multiple shards.

    Note: this implementation is copied from webdataset and adapted to
    allow shard writing using the "pipe:" notation. E.g., this is possible::

        >>> writer = ShardWriter("pipe:gzip -c > data/shard-%06d.tar.gz")

    Source:
    https://github.com/webdataset/webdataset/blob/ccfe88086cdb21a0dc23a6454ce3e3723b6b8033/webdataset/writer.py#L359
    é † ç   ÀZæANr   Úpatternr7   ÚmaxsizeÚpostÚstart_shardc                 K   sv   t dƒstdƒ‚d| _|| _|| _|| _|| _d| _|| _|| _	| j	dks)J dƒ‚d| _
d| _d| _d| _|  ¡  dS )a  Create a ShardWriter.

        :param pattern: output file pattern
        :param maxcount: maximum number of records per shard (Default value = 100000)
        :param maxsize: maximum size of each shard (Default value = 3e9)
        :param kw: other options passed to TarWriter
        r3   r4   r    Nú-z3Dash '-' is not an allowed pattern for ShardWriter.r   )r   r8   r   Úkwr7   r¨   r©   Ú	tarstreamrG   r§   r+   ÚcountÚsizeÚfnameÚnext_stream)r<   r§   r7   r¨   r©   rª   r¬   r0   r0   r1   r=   Ò  s$   ÿzShardWriter.__init__c                 C   s|   ddl m} |  ¡  | j| j | _| jr$td| j| jd| j	d  | j
ƒ |  jd7  _|| jfi | j¤Ž| _d| _d| _	dS )z.Close the current stream and move to the next.r   r5   z	# writingz%.1f GBg    eÍÍAr    N)Úwebdataset.writerr6   Úfinishr§   rG   r°   r   Úprintr®   r¯   r+   r¬   r­   )r<   r6   r0   r0   r1   r±   ÷  s   û
zShardWriter.next_streamc                 C   sd   | j du s| j| jks| j| jkr|  ¡  | j  |¡}|  jd7  _|  jd7  _|  j|7  _dS )zBWrite a sample.

        :param obj: sample to be written
        Nr    )r­   r®   r7   r¯   r¨   r±   r$   r+   )r<   Úobjr¯   r0   r0   r1   r$   
  s   
zShardWriter.writec                 C   sF   | j dur!| j  ¡  | jdusJ ‚t| jƒr|  | j¡ d| _ dS dS )z'Finish all writing (use close instead).N)r­   rB   r°   Úcallabler©   r>   r0   r0   r1   r³     s   



ûzShardWriter.finishc                 C   s   |   ¡  | `| `| `| `dS )zClose the stream.N)r³   r­   rG   r®   r¯   r>   r0   r0   r1   rB   #  s
   zShardWriter.closec                 C   s   | S )zEnter context.r0   r>   r0   r0   r1   r?   +  s   zShardWriter.__enter__c                 O   s   |   ¡  dS )zExit context.NrA   )r<   rC   r¬   r0   r0   r1   rE   /  s   zShardWriter.__exit__)r¥   r¦   Nr   )rY   rZ   r[   r\   r]   r%   Úfloatr   r   r=   r±   r$   r³   rB   r?   rE   r0   r0   r0   r1   r9   Å  s.    úþýüû
ú%	r9   )NTr   TTTT)r   FTFTr@   )#r\   r(   rN   r™   Ú	functoolsr   Útypingr   r   r   r   r   r   r	   r
   r"   Úlhotser   r   Úlhotse.lazyr   Úlhotse.utilsr   r   r   r%   r^   r]   r2   r!   r_   rf   r‘   r¤   r9   r0   r0   r0   r1   Ú<module>   sz    2(÷ÿþýüûúùø	÷

öl{Júÿþýüû
ú<
