o
    SiO5                     @   s   d dl Z d dlZd dlmZ d dlmZmZmZmZm	Z	m
Z
mZmZmZ d dlZd dlmZ d dlmZmZ d dlmZ d dlmZmZ d dlmZ d d	lmZ d d
lmZ d dlm Z  ee e!f Z"G dd dej#j$j%eZ&G dd dZ'dS )    N)Path)	CallableDict	GeneratorIterableListOptionalSequenceTupleUnion)compose_left)CutSetSeconds)deserialize_cut)get_rankget_world_size)SamplingDiagnostics)Dillable)decode_json_line)Pathlikec                       s   e Zd ZdZ					ddeeee ee f dedede	e
 de	e de	e d	ee
 d
e	e
 ddf fddZdeegef dd fddZdefddZdeddfddZdeeddf fddZdefddZ  ZS )StatelessSamplera]  
    An infinite and stateless cut sampler that selects data at random from one or more cut manifests.
    The main idea is to make training resumption easy while guaranteeing the data seen each
    time by the model is shuffled differently.
    It discards the notion of an "epoch" and it never finishes iteration.
    It makes no strong guarantees about avoiding data duplication, but in practice you would
    rarely see duplicated data.

    The recommended way to use this sampler is by placing it into a dataloader worker
    subprocess with Lhotse's :class:``~lhotse.dataset.iterable_dataset.IterableDatasetWrapper``,
    so that each worker has its own sampler replica that uses a slightly different random seed::

        >>> import torch
        >>> import lhotse
        >>> dloader = torch.utils.data.DataLoader(
        ...     lhotse.dataset.iterable_dataset.IterableDatasetWrapper(
        ...         dataset=lhotse.dataset.K2SpeechRecognitionDataset(...),
        ...         sampler=StatelessSampler(...),
        ...     ),
        ...     batch_size=None,
        ...     num_workers=4,
        ... )

    This sampler's design was originally proposed by Dan Povey. For details see:
    https://github.com/lhotse-speech/lhotse/issues/1096

    Example 1: Get a non-bucketing :class:``.StatelessSampler``::

        >>> sampler = StatelessSampler(
        ...     cuts_paths=["data/cuts_a.jsonl", "data/cuts_b.jsonl"],
        ...     index_path="data/files.idx",
        ...     max_duration=600.0,
        ... )

    Example 2: Get a bucketing :class:``.StatelessSampler``::

        >>> sampler = StatelessSampler(
        ...     cuts_paths=["data/cuts_a.jsonl", "data/cuts_b.jsonl"],
        ...     index_path="data/files.idx",
        ...     max_duration=600.0,
        ...     num_buckets=50,
        ...     quadratic_duration=30.0,
        ... )

    Example 3: Get a bucketing :class:``.StatelessSampler`` with scaled weights for each cutset::

        >>> sampler = StatelessSampler(
        ...     cuts_paths=[
        ...         ("data/cuts_a.jsonl", 2.0),
        ...         ("data/cuts_b.jsonl", 1.0),
        ...     ],
        ...     index_path="data/files.idx",
        ...     max_duration=600.0,
        ...     num_buckets=50,
        ...     quadratic_duration=30.0,
        ... )

    .. note:: This sampler works only with uncompressed jsonl manifests, as it creates extra index files
     with line byte offsets to quickly find and sample JSON lines.
     This means this sampler will not work with Webdataset and Lhotse Shar data format.

    :param cuts_paths: Path, or list of paths, or list of tuples of (path, scale) to cutset files.
    :param index_path: Path to a file that contains the index of all cutsets and their line count
        (will be auto-created the first time this object is initialized).
    :param base_seed: Int, user-provided part of the seed used to initialize the RNG
        for sampling (each node and worker are still going to produce different results).
        When continuing the training it should be a function of the number of training steps
        to ensure the model doesn't see identical mini-batches again.
    :param max_duration: Maximum total number of audio seconds in a mini-batch (dynamic batch size).
    :param max_cuts: Maximum number of examples in a mini-batch (static batch size).
    :param num_buckets: If set, enables bucketing (each mini-batch has examples of a similar duration).
    :param duration_bins: A list of floats (seconds); when provided, we'll skip the initial
        estimation of bucket duration bins (useful to speed-up the launching of experiments).
    :param quadratic_duration: If set, adds a penalty term for longer duration cuts.
        Works well with models that have quadratic time complexity to keep GPU utilization similar
        when using bucketing. Suggested values are between 30 and 45.
    N
cuts_paths
index_path	base_seedmax_durationmax_cutsnum_bucketsduration_binsquadratic_durationreturnc	                    s  t  jd d g | _g | _t|ttfr#| jt| | jd npt|}g | _t|d ttfrR|D ]}	t|	ttfsBJ d| jt|	 | jd q5nA|D ]>}
t	|
dksdJ d|
 d|
\}	}t|	ttfsvJ d|	 t|t
tfsJ d	| | jt|	 | j| qT|| _|| _|| _|| _|| _|| _|| _td
d | j| jfD sJ dt | _t| j| j| _dd t| jj | jD | _g | _t | _t | _ d S )N)data_sourceg      ?r   z4Mixing paths with and without scales is not allowed.   z Expected (path, scale) but got: z< [note: mixing paths with and without scales is not allowed]z$Path must be a string or Path, got: z$Scale must be an int or float, got: c                 s   s    | ]}|d uV  qd S N ).0vr#   r#   U/home/ubuntu/.local/lib/python3.10/site-packages/lhotse/dataset/sampling/stateless.py	<genexpr>   s    
z,StatelessSampler.__init__.<locals>.<genexpr>z7At least one of max_duration or max_cuts has to be set.c                 S   s   g | ]\}}|| qS r#   r#   )r$   lcscaler#   r#   r&   
<listcomp>   s    z-StatelessSampler.__init__.<locals>.<listcomp>)!super__init__pathsscales
isinstancer   strappendlistlenintfloatr   r   r   r   r   r   r   anyr   diagnosticsManifestIndexindexzipline_countsvaluesscaled_line_counts_transformsr   rankr   
world_size)selfr   r   r   r   r   r   r   r   ptplr)   	__class__r#   r&   r,   m   sn   


zStatelessSampler.__init__fnc                 C   s   | j | | S )zAApply ``fn`` to each mini-batch of ``CutSet`` before yielding it.)r>   r1   )rA   rF   r#   r#   r&   map   s   zStatelessSampler.mapc                 C   s   i S )zHStub state_dict method that returns nothing - this sampler is stateless.r#   rA   r#   r#   r&   
state_dict      zStatelessSampler.state_dictrI   c                 C   s   dS )zJStub load_state_dict method that does nothing - this sampler is stateless.Nr#   )rA   rI   r#   r#   r&   load_state_dict   rJ   z StatelessSampler.load_state_dictc           	      #   s   ddl m}m} tjj }|d u rdn|j}|dj  }j	| }t
| tdtj d| dj	 d| dj d	| d
  fdd}jd usWjd url|| jjjjddjddd
}n|| jjddddd}|tj  |j_|E d H  d S )Nr   )DynamicBucketingSamplerDynamicCutSampleri  [z$] Initialized sampler RNG with seed z (== base_seed=z	 + my_id=z) [ddp_rank=z worker_id=]c            
      3   s    d} 	   jjd }jj| } t|d }|| ||d  }}| }|| |	|| }W d   n1 sCw   Y  t
|}t|}	|	j d|  |	_|	V  | d7 } q)a	  
            Infinite generator of cuts.
            Each cut is samples in two steps:
            - first we select a cutset file, weighted by line count (num cuts)
            - then we randomly select a line from that file using uniform distribution
            r   T   N_it)choicesr-   r=   r9   line_offsets	randranger3   openseekreadr   r   id)
npathrS   	begin_idxbeginendflinedatacutrngrA   r#   r&   _inner   s"   

z)StatelessSampler.__iter__.<locals>._innerFrP   )	r   r   r   r   shuffle	drop_lastr   r@   r?   )r   r   re   rf   r@   r?   )lhotse.datasetrL   rM   torchutilsr`   get_worker_inforX   r?   r   randomRandomlogginginfotype__name__r   r   r   r   r   rG   r   r>   r7   )	rA   rL   rM   worker_info	worker_idmy_idseedrd   inner_samplerr#   rb   r&   __iter__   sP   

"	zStatelessSampler.__iter__c                 C   s
   | j  S )zJReturns a string describing the statistics of the sampling process so far.)r7   
get_reportrH   r#   r#   r&   rw     s   
zStatelessSampler.get_report)NNNNN)rp   
__module____qualname____doc__r   r   r   PathlikeAndScaler4   r   r   r   r,   r   r   rG   r   rI   rK   r   rv   r0   rw   __classcell__r#   r#   rD   r&   r      s>    S	
GCr   c                	   @   sb   e Zd ZdZ	ddee dededdfdd	Zd
ede	e
 fddZded
ede	e
 fddZdS )r8   a;  
    An index of line count and line offset for each cutset manifest.
    When created for the first time, it writes a .jsonl.idx file for each .jsonl file that contains byte offsets for each line.
    It also writes a file at ``index_path`` that has the line count and path for each manifest.
    When this object is instantiated again (e.g. when resuming training), it will just load the contents of existing files from disk.

    Objects of this class expose two members: ``line_counts: Dict[Path, int]`` and ``line_offsets: Dict[Path, List[int]]`` to simplify working with manifests.

    :param manifest_paths: A list of paths to cut sets.
    :param index_path: A path where we should write the line count index (if it doesn't exist).
    :param force: When true, we'll ignore existing files and reindex the cutsets.
    Fmanifest_pathsr   forcer   Nc           	      C   s   i | _ i | _tt|D ]1}|jdksJ d| |d}| r*|s*| |}n| ||}t	|| j |< || j|< q| rC|rp|
d}| j  D ]\}}t| d| |d qNW d    d S 1 siw   Y  d S d S )Nz.jsonlzIWe only support uncompressed .jsonl files in this sampler, but received: z
.jsonl.idxw file)r;   rS   rG   r   suffixwith_suffixis_file_load_processr3   rU   itemsprint)	rA   r}   r   r~   rB   offset_pathoffsetsindex_fr(   r#   r#   r&   r,     s&   
"zManifestIndex.__init__
file_indexc                 C   s<   |  }ttt|}W d    |S 1 sw   Y  |S r"   )rU   tuplerG   r4   )rA   r   r^   r   r#   r#   r&   r   7  s   

zManifestIndex._loadmanifestc              	   C   s   dg}|  K}| d)}td|d | }|r/||  t|d |d | }|sW d    n1 s9w   Y  W d    t|S W d    t|S 1 sUw   Y  t|S )Nr   r   r   )rU   r   readliner1   tellr   )rA   r   r   r   cuts_fr   r_   r#   r#   r&   r   <  s    (zManifestIndex._process)F)rp   rx   ry   rz   r	   r   boolr,   r   r
   r4   r   r   r#   r#   r#   r&   r8   	  s    
 r8   )(rm   rk   pathlibr   typingr   r   r   r   r   r   r	   r
   r   rh   cytoolzr   lhotser   r   lhotse.cut.setr   lhotse.dataset.dataloadingr   r   lhotse.dataset.sampling.baser   lhotse.lazyr   lhotse.serializationr   lhotse.utilsr   r5   r{   ri   r`   Samplerr   r8   r#   r#   r#   r&   <module>   s"    , l