o
    SiZA                     @   s   d dl Z d dlZd dlmZ d dlmZmZmZmZm	Z	m
Z
mZmZmZmZ d dlmZmZ d dlmZ d dlmZ d dlmZmZmZmZmZ d dlmZmZ G d	d
 d
eZG dd dZ G dd de	Z!dededefddZ"dS )    N)deque)
AnyCallableDict	GeneratorIterableListLiteralOptionalTupleUnion)CutSetSeconds)Cut)resolve_seed)
CutSamplerEpochDiagnosticsSamplingConstraintSamplingDiagnosticsTimeConstraint)ifnonestreaming_shufflec                       s<  e Zd ZdZdddddddddddddded	ee d
ee dee de	de	de	dedee dee dee de
eed f ddf fddZdeeef f fddZdeeef ddf fddZdd Zd)dd Zde
eee f fd!d"Zedee fd#d$Zedee fd%d&Zedee fd'd(Z  ZS )*DynamicCutSamplera  
    A dynamic (streaming) variant of sampler that doesn't stratify the sampled cuts in any way.
    It is a generalization of :class:`~lhotse.dataset.sampling.SimpleCutSampler` and
    :class:`~lhotse.dataset.sampling.CutPairsSampler` in that it allows to jointly iterate
    an arbitrary number of CutSets.

    When input CutSets are opened in lazy mode, this sampler doesn't require reading
    the whole cut set into memory.

    For scenarios such as ASR, VAD, Speaker ID, or TTS training, this class supports single CutSet
    iteration. Example::

        >>> cuts = CutSet(...)
        >>> sampler = DynamicCutSampler(cuts, max_duration=100)
        >>> for batch in sampler:
        ...     assert isinstance(batch, CutSet)

    For other scenarios that require pairs (or triplets, etc.) of utterances, this class supports
    zipping multiple CutSets together. Such scenarios could be voice conversion, speech translation,
    contrastive self-supervised training, etc. Example::

        >>> source_cuts = CutSet(...)
        >>> target_cuts = CutSet(...)
        >>> sampler = DynamicCutSampler(source_cuts, target_cuts, max_duration=100)
        >>> for batch in sampler:
        ...     assert isinstance(batch, tuple)
        ...     assert len(batch) == 2
        ...     assert isinstance(batch[0], CutSet)
        ...     assert isinstance(batch[1], CutSet)

    .. note:: for cut pairs, triplets, etc. the user is responsible for ensuring that the CutSets
        are all sorted so that when iterated over sequentially, the items are matched.
        We take care of preserving the right ordering internally, e.g., when shuffling.
        By default, we check that the cut IDs are matching, but that can be disabled.

    .. caution:: when using :meth:`DynamicCutSampler.filter` to filter some cuts with more than
        one CutSet to sample from, we sample one cut from every CutSet, and expect that all of the cuts
        satisfy the predicate -- otherwise, they are all discarded from being sampled.
    NFTi N  r   )max_durationmax_cuts
constraintshuffle	drop_lastconsistent_idsshuffle_buffer_sizequadratic_duration
world_sizerankseedstrictcutsr   r   r   r   r   r   r   r    r!   r"   r#   )trng
randomizedreturnc                   s~   t  j||	|
|d tdd |D std || _|| _|| _|| _|| _	|| _
|| _|| _|dur=tjdtd dS dS )aU
  
        :param cuts: one or more CutSets (when more than one, will yield tuples of CutSets as mini-batches)
        :param max_duration: The maximum total recording duration from ``cuts``.
            Note: with multiple CutSets, ``max_duration`` constraint applies only to the first CutSet.
        :param max_cuts: The maximum total number of ``cuts`` per batch.
            When only ``max_duration`` is specified, this sampler yields static batch sizes.
        :param constraint: Provide a :class:`~lhotse.dataset.sampling.base.SamplingConstraint` object
            defining how the sampler decides when a mini-batch is complete. It also affects which
            attribute of the input examples decides the "size" of the example (by default it's ``.duration``).
            Before this parameter was introduced, Lhotse samplers used
            :class:`~lhotse.dataset.sampling.base.TimeConstraint` implicitly.
            Introduced in Lhotse v1.22.0.
        :param shuffle: When ``True``, the cuts will be shuffled dynamically with
            a reservoir-sampling-based algorithm.
            Convenient when mini-batch loop is inside an outer epoch-level loop, e.g.:
            `for epoch in range(10): for batch in dataset: ...` as every epoch will see a
            different cuts order.
        :param drop_last: When ``True``, we will drop all incomplete batches.
            A batch is considered incomplete if it depleted a bucket before
            hitting the constraint such as max_duration, max_cuts, etc.
        :param consistent_ids: Only affects processing of multiple CutSets.
            When ``True``, at each sampling step we check cuts from all CutSets have the same ID
            (i.e., the first cut from every CutSet should have the same ID, same for the second, third, etc.).
        :param shuffle_buffer_size: How many cuts (or cut pairs, triplets) are being held in memory
            a buffer used for streaming shuffling. Larger number means better randomness at the cost
            of higher memory usage.
        :param quadratic_duration: When set, it adds an extra penalty that's quadratic in size w.r.t.
            a cuts duration. This helps get a more even GPU utilization across different input lengths
            when models have quadratic input complexity. Set between 15 and 40 for transformers.
        :param world_size: Total number of distributed nodes. We will try to infer it by default.
        :param rank: Index of distributed node. We will try to infer it by default.
        :param seed: Random seed used to consistently shuffle the dataset across different processes.
        )r   r!   r"   r#   c                 s   s     | ]}t |tr|jV  qd S N)
isinstancer   is_lazy.0cs r/   S/home/ubuntu/.local/lib/python3.10/site-packages/lhotse/dataset/sampling/dynamic.py	<genexpr>{   s    z-DynamicCutSampler.__init__.<locals>.<genexpr>zYou are using DynamicCutSampler with an eagerly read CutSet. You won't see any memory/speed benefits with that setup. Use e.g. 'CutSet.from_jsonl_lazy' to read the CutSet lazily.Nz|In Lhotse v1.4 all samplers act as if 'strict=True'. Sampler's argument 'strict' will be removed in a future Lhotse release.)category)super__init__allwarningswarnr%   r   r   r   r   r   r   r    DeprecationWarning)selfr   r   r   r   r   r   r   r    r!   r"   r#   r$   r%   	__class__r/   r0   r4   G   s*   1
zDynamicCutSampler.__init__c                    s@   | j d u s	J dt  }|| j| j| j| j| jd |S )NzIstate_dict() is not supported with samplers that use a custom constraint.)r   r   r   r   r    )	r   r3   
state_dictupdater   r   r   r   r    r9   sdr:   r/   r0   r<      s   
	zDynamicCutSampler.state_dictr?   c                    s`   | d| _| d| _| d| _| d| _| d| _| dd  t | |   d S )Nr   r   r   r   r    r$   )	popr   r   r   r   r    r3   load_state_dict_fast_forwardr>   r:   r/   r0   rA      s   z!DynamicCutSampler.load_state_dictc                 C   s\   | j j}| j jj}| | t|d| j j|< d| _t|  t	|D ]}t
|  q"d| _d S )N)epochFT)diagnosticscurrent_epochcurrent_epoch_statstotal_batches	set_epochr   stats_per_epoch_just_restored_stateiterrangenext)r9   rE   num_batches_to_iter_r/   r/   r0   rB      s   



zDynamicCutSampler._fast_forwardc              	      s   j rS j  tj dd jD _jr' fddjD _tt	j fddjd_t
jjjjjjjd_tj_S )Nc                 S   s   g | ]}t |qS r/   )rK   r,   r/   r/   r0   
<listcomp>   s    z.DynamicCutSampler.__iter__.<locals>.<listcomp>c                    s(   g | ]}t |t j jd qS ))rngbufsize)r   randomRandomrC   r   r,   r#   r9   r/   r0   rP      s    c                    s   t  fdd| D S )Nc                 3   s    | ]}  |V  qd S r)   )
_filter_fnr-   cr9   r/   r0   r1          z?DynamicCutSampler.__iter__.<locals>.<lambda>.<locals>.<genexpr>)r5   )tplrY   r/   r0   <lambda>       z,DynamicCutSampler.__iter__.<locals>.<lambda>)iterator	predicaterD   )r   r   r   r   r    rD   )rJ   rD   reset_current_epochr   r#   r%   	cuts_iterr   FilterzipDurationBatcherr   r   r   r   r    rK   rY   r/   rU   r0   __iter__   s2   


	zDynamicCutSampler.__iter__c                    st   t | j}| jr8t|tr8t| D ]&}|d j t fdd|dd  D s7J dddd |D  dq|S )	Nr   c                 3   s    | ]}|j  kV  qd S r)   idrW   expected_idr/   r0   r1      rZ   z0DynamicCutSampler._next_batch.<locals>.<genexpr>   zhThe input CutSet are not sorted by cut ID in the same way. We sampled the following mismatched cut IDs: z, c                 s   s    | ]}|j V  qd S r)   rf   rW   r/   r/   r0   r1      s    z[. If this is expected, pass the argument 'consistent_ids=False' to DynamicBucketingSampler.)	rM   ra   r   r*   tuplerc   rg   r5   join)r9   batchr%   r/   rh   r0   _next_batch   s   

 zDynamicCutSampler._next_batchc                 C      d S r)   r/   rY   r/   r/   r0   remaining_duration      z$DynamicCutSampler.remaining_durationc                 C   ro   r)   r/   rY   r/   r/   r0   remaining_cuts   rq   z DynamicCutSampler.remaining_cutsc                 C   ro   r)   r/   rY   r/   r/   r0   num_cuts   rq   zDynamicCutSampler.num_cuts)r(   r   )__name__
__module____qualname____doc__r   r
   r   intr   boolr   r	   r4   r   strr   r<   rA   rB   re   r   r   rn   propertyfloatrp   rr   rs   __classcell__r/   r/   r:   r0   r      sh    +	
J

*r   c                   @   s   e Zd Z						ddeeeee f  dedee	 dee
 dedee d	ee d
dfddZd
eeeee f ddf fddZd
eeee f fddZdS )rd   NFdatapiper   r   r   r   r    rD   r(   c                 C   sT   || _ t | _|| _t|t | _t||| |d ur || _d S t	|||d| _d S )N)r   r   r    )
r~   r   reuse_cuts_bufferr   r   r   rD   check_constraintr   r   )r9   r~   r   r   r   r   r    rD   r/   r/   r0   r4     s   

zDurationBatcher.__init__c                 c   s:    t | j| _z	 |  V  q	 ty   Y nw d | _d S r)   )rK   r~   ra   _collect_batchStopIterationrY   r/   r/   r0   re     s   

zDurationBatcher.__iter__c                 C   s  dt tttt f  dtttt f fdd}| j  g }	 zt| j}W n5 t	y[   |r=| j
r7| j r=|| Y S z
| j| W t	  tyZ   | j|d  Y t	 w w || | jt|trm|d n| | j r| j rt|dkrtd 	 ||S q)	Nr%   r(   c                 S   s^   t | d tr*t| d dkrtdd | D } | S tt|  }tdd |D S t| S )zJHelper to do the right thing whether we sampled single cuts or cut tuples.r   rj   c                 s   s    | ]}|d  V  qdS )r   Nr/   r,   r/   r/   r0   r1   )  s    zDDurationBatcher._collect_batch.<locals>.detuplify.<locals>.<genexpr>c                 S   s   g | ]}t |qS r/   )r   	from_cutsr,   r/   r/   r0   rP   -  r]   zEDurationBatcher._collect_batch.<locals>.detuplify.<locals>.<listcomp>)r*   rk   lenr   r   listrc   )r%   tuple_of_cut_listsr/   r/   r0   	detuplify#  s   
z1DurationBatcher._collect_batch.<locals>.detuplifyTr   rj   zWe have exceeded the max_duration constraint during sampling but have only 1 cut. This is likely because max_duration was set to a very low value ~10s, or you're using a CutSet with very long cuts (e.g. 100s of seconds long).)r   r   r   r   r   r   resetrM   ra   r   r   close_to_exceedingrD   discardAttributeErrorappendaddr*   rk   exceededr   r6   r7   )r9   r   r%   next_cut_or_tplr/   r/   r0   r   "  sN   




zDurationBatcher._collect_batch)NNNFNN)rt   ru   rv   r   r   r   r   r   r
   rx   r   ry   r   r4   r   r   re   r   r/   r/   r/   r0   rd      s4    	
$	rd   c                	   @   sJ   e Zd ZdZ	ddedeegef dee	 ddfddZ
defd	d
ZdS )rb   aH  
    A wrapper over an iterable that enables lazy filtering.
    It works like Python's `filter` built-in by applying the filter predicate
    to each element and yielding it further if predicate returned ``True``.

    This variant additionally tracks the number of discarded items and updates
    the sampling statistics.
    Nr^   r_   rD   r(   c                 C   s8   || _ || _t|t | _t| jsJ d| dd S )Nz2LazyFilter: 'predicate' arg must be callable (got z).)r^   r_   r   r   rD   callable)r9   r^   r_   rD   r/   r/   r0   r4   j  s   
zFilter.__init__c                 c   s0    | j D ]}| |r|V  q| j| qd S r)   )r^   r_   rD   r   )r9   itemr/   r/   r0   re   x  s   

zFilter.__iter__r)   )rt   ru   rv   rw   r   r   r   ry   r
   r   r4   re   r/   r/   r/   r0   rb   `  s    
rb   r   r   r   c                 C   sD   | d ur|d u r|d u sJ dd S |d us|d us J dd S d S )Nz;Cannot specify both constraint= and max_duration=/max_cuts=zVAt least one of max_duration= or max_cuts= has to be defined (or provide constraint=).r/   )r   r   r   r/   r/   r0   r     s   r   )#rS   r6   collectionsr   typingr   r   r   r   r   r   r	   r
   r   r   lhotser   r   
lhotse.cutr   lhotse.dataset.dataloadingr   lhotse.dataset.sampling.baser   r   r   r   r   lhotse.utilsr   r   r   rd   rb   r   r/   r/   r/   r0   <module>   s    0 c` 