o
    Si!                     @   s  d dl Z d dlZd dlZd dlZd dlmZmZ d dlmZ d dl	m
Z
 d dlmZmZmZmZmZmZmZmZmZmZmZmZ d dlZd dlZd dlmZmZ d dlmZ d dl m!Z! d d	l"m#Z#m$Z$m%Z%m&Z&m'Z' d d
l(m)Z)m*Z*m+Z+ d dl,m-Z- G dd de#Z.eG dd de%Z/	d$dee de0dee% dee1 fddZ2G dd dZ3G dd dZ4de
de j5de6deeeeedf f ddf fddZ7G d d! d!e8Z9d"d# Z:dS )%    N)asdict	dataclass)isliceQueue)AnyCallableDequeDict	GeneratorIterableListLiteralOptionalSequenceTupleUnion)CutSetSeconds)Cut)resolve_seed)
CutSamplerEpochDiagnosticsSamplingConstraintSamplingDiagnosticsTimeConstraint)DurationBatcherFiltercheck_constraint)ifnonec                %       sd  e Zd ZdZddddddddddddddddddd	d
edee dee dee dee de	de	de	de
e dededee dee dee deeed f de	de	ddf$ fddZdeeef f fdd Zd!eeef ddf fd"d#Zd$d% Zd0d&d'Zdeeee f fd(d)Zedee fd*d+Zedee fd,d-Zedee fd.d/Z  ZS )1DynamicBucketingSampleraB  
    A dynamic (streaming) variant of :class:`~lhotse.dataset.sampling.bucketing.BucketingSampler`,
    that doesn't require reading the whole cut set into memory.

    The basic idea is to sample N (e.g. ~10k) cuts and estimate the boundary durations for buckets.
    Then, we maintain a buffer of M cuts (stored separately in K buckets) and every time we sample a batch,
    we consume the input cut iterable for the same amount of cuts.
    The memory consumption is limited by M at all times.

    For scenarios such as ASR, VAD, Speaker ID, or TTS training, this class supports single CutSet
    iteration. Example::

        >>> cuts = CutSet(...)
        >>> sampler = DynamicBucketingSampler(cuts, max_duration=100)
        >>> for batch in sampler:
        ...     assert isinstance(batch, CutSet)

    For other scenarios that require pairs (or triplets, etc.) of utterances, this class supports
    zipping multiple CutSets together. Such scenarios could be voice conversion, speech translation,
    contrastive self-supervised training, etc. Example::

        >>> source_cuts = CutSet(...)
        >>> target_cuts = CutSet(...)
        >>> sampler = DynamicBucketingSampler(source_cuts, target_cuts, max_duration=100)
        >>> for batch in sampler:
        ...     assert isinstance(batch, tuple)
        ...     assert len(batch) == 2
        ...     assert isinstance(batch[0], CutSet)
        ...     assert isinstance(batch[1], CutSet)

    .. note:: for cut pairs, triplets, etc. the user is responsible for ensuring that the CutSets
        are all sorted so that when iterated over sequentially, the items are matched.
        We take care of preserving the right ordering internally, e.g., when shuffling.
        By default, we check that the cut IDs are matching, but that can be disabled.

    .. caution:: when using :meth:`DynamicBucketingSampler.filter` to filter some cuts with more than
        one CutSet to sample from, we sample one cut from every CutSet, and expect that all of the cuts
        satisfy the predicate -- otherwise, they are all discarded from being sampled.
    N
   FT'  i N  r   )max_durationmax_cuts
constraintnum_bucketsshuffle	drop_lastconsistent_idsduration_binsnum_cuts_for_bins_estimatebuffer_sizequadratic_duration
world_sizerankseedsync_buckets
concurrentstrictshuffle_buffer_sizecutsr#   r$   r%   r&   r'   r(   r)   r*   r+   r,   r-   r.   r/   r0   )
randomizedtrngr1   r2   returnc                   s*  t  j||||d tdd |D std || _|| _|| _|| _|| _	|| _
|	| _|
| _|| _|| _|| _d| _t||| |durMtjdtd |dur[t  |  j|7  _|durwt|t|kskJ d|| _t|d	 | _dS |du rt| j| j| jd
}tt| jd |	||d| _dS )a-  
        :param cuts: one or more CutSets (when more than one, will yield tuples of CutSets as mini-batches)
        :param max_duration: The maximum total recording duration from ``cuts``.
            Note: with multiple CutSets, ``max_duration`` constraint applies only to the first CutSet.
        :param max_cuts: The maximum total number of ``cuts`` per batch.
            When only ``max_duration`` is specified, this sampler yields static batch sizes.
        :param num_buckets: how many buckets to create. Ignored if duration_bins are provided.
        :param shuffle: When ``True``, the cuts will be shuffled dynamically with
            a reservoir-sampling-based algorithm.
            Convenient when mini-batch loop is inside an outer epoch-level loop, e.g.:
            `for epoch in range(10): for batch in dataset: ...` as every epoch will see a
            different cuts order.
        :param drop_last: When ``True``, we will drop all incomplete batches.
            A batch is considered incomplete if it depleted a bucket before
            hitting the constraint such as max_duration, max_cuts, etc.
        :param consistent_ids: Only affects processing of multiple CutSets.
            When ``True``, at each sampling step we check cuts from all CutSets have the same ID
            (i.e., the first cut from every CutSet should have the same ID, same for the second, third, etc.).
        :param duration_bins: A list of floats (seconds); when provided, we'll skip the initial
            estimation of bucket duration bins (useful to speed-up the launching of experiments).
        :param num_cuts_for_bins_estimate: We will draw this many cuts to estimate the duration bins
            for creating similar-duration buckets.
            Larger number means a better estimate to the data distribution, possibly at a longer init cost.
        :param buffer_size: How many cuts (or cut pairs, triplets) we hold at any time across all
            of the buckets.
            Increasing ``max_duration`` (batch_size) or ``num_buckets`` might require increasing this number.
            Larger number here will also improve shuffling capabilities.
            It will result in larger memory usage.
        :param quadratic_duration: When set, it adds an extra penalty that's quadratic in size w.r.t.
            a cuts duration. This helps get a more even GPU utilization across different input lengths
            when models have quadratic input complexity. Set between 15 and 40 for transformers.
        :param sync_buckets: When set, we'll try to make each DDP rank sample from as close
            duration buckets as possible to minimize the tail worker effect.
        :param concurrent: Enabling concurrency eliminates most of the waiting to pre-populate the
            bucketing buffers before the sampler starts yielding examples. For tarred/Lhotse Shar data
            this can speed up the start of the training. Note that enabling concurrency will cause the
            sampling results to be non-deterministic. This feature is experimental.
        :param world_size: Total number of distributed nodes. We will try to infer it by default.
        :param rank: Index of distributed node. We will try to infer it by default.
        :param seed: Random seed used to consistently shuffle the dataset across different processes.
        )r(   r.   r/   r0   c                 s   s     | ]}t |tr|jV  qd S N)
isinstancer   is_lazy.0cs r?   ]/home/ubuntu/.local/lib/python3.10/site-packages/lhotse/dataset/sampling/dynamic_bucketing.py	<genexpr>   s    z3DynamicBucketingSampler.__init__.<locals>.<genexpr>zYou are using DynamicBucketingSampler with an eagerly read CutSet. You won't see any memory/speed benefits with that setup. Either use 'CutSet.from_jsonl_lazy' to read the CutSet lazily, or use a BucketingSampler instead.Nz|In Lhotse v1.4 all samplers act as if 'strict=True'. Sampler's argument 'strict' will be removed in a future Lhotse release.categoryz)Duration bins must be sorted ascendingly.   r#   r$   r-   r   )r&   r%   )super__init__allwarningswarnr5   r#   r$   r%   r'   r)   r+   r,   r-   r1   r2   rngr   DeprecationWarning!_emit_shuffle_buffer_size_warninglistsortedr*   lenr&   r   estimate_duration_bucketsr   )selfr#   r$   r%   r&   r'   r(   r)   r*   r+   r,   r-   r.   r/   r0   r1   r2   r3   r4   r5   	__class__r?   r@   rG   Q   s^   ?z DynamicBucketingSampler.__init__c              	      sD   | j d u s	J dt  }|| j| j| j| j| j| j	d |S )NzIstate_dict() is not supported with samplers that use a custom constraint.)r#   r$   r)   r,   r+   r-   )
r%   rF   
state_dictupdater#   r$   r)   r,   r+   r-   )rR   sdrS   r?   r@   rU      s   

z"DynamicBucketingSampler.state_dictrW   c                    s   | d| _| d| _| d| _| d| _| d| _d|v r1t  | d}|  j|7  _| dd | _| dd  t 	| | 
  d S )	Nr#   r$   r)   r+   r,   r4   r-   r3   )popr#   r$   r)   r+   r,   rM   r-   rF   load_state_dict_fast_forward)rR   rW   r4   rS   r?   r@   rY      s   
z'DynamicBucketingSampler.load_state_dictc                 C   s\   | j j}| j jj}| | t|d| j j|< d| _t|  t	|D ]}t
|  q"d| _d S )N)epochFT)diagnosticscurrent_epochcurrent_epoch_statstotal_batches	set_epochr   stats_per_epoch_just_restored_stateiterrangenext)rR   r]   num_batches_to_iter_r?   r?   r@   rZ      s   



z%DynamicBucketingSampler._fast_forwardc                    s    j r S t j}t| j  _ jr-d}tj	j
 }|d ur'||j7 }t|}nd } j  dd  jD }tt|  fdd jd}t| j j j j j j j j j j| j jd}t| _ S )Ni  c                 S   s   g | ]}t |qS r?   )rc   r<   r?   r?   r@   
<listcomp>      z4DynamicBucketingSampler.__iter__.<locals>.<listcomp>c                    s   t  fdd| D S )Nc                 3   s    | ]}  |V  qd S r9   )
_filter_fnr=   crR   r?   r@   rA         zEDynamicBucketingSampler.__iter__.<locals>.<lambda>.<locals>.<genexpr>)rH   )tplrm   r?   r@   <lambda>      z2DynamicBucketingSampler.__iter__.<locals>.<lambda>)iterator	predicater\   )r*   r.   r#   r$   r%   r(   r,   r-   r'   rK   
bucket_rngr2   r\   )rb   r   r0   randomRandomr[   rK   r1   torchutilsdataget_worker_infoidr\   reset_current_epochr5   r   zipDynamicBucketerr*   r.   r#   r$   r%   r(   r,   r-   r'   r2   rc   	cuts_iter)rR   r0   bucket_rng_seedworker_infort   r   r?   rm   r@   __iter__   sH   




z DynamicBucketingSampler.__iter__c                    st   t | j}| jr8t|tr8t| D ]&}|d j t fdd|dd  D s7J dddd |D  dq|S )	Nr   c                 3   s    | ]}|j  kV  qd S r9   r{   rk   expected_idr?   r@   rA   /  rn   z6DynamicBucketingSampler._next_batch.<locals>.<genexpr>rD   zhThe input CutSet are not sorted by cut ID in the same way. We sampled the following mismatched cut IDs: z, c                 s   s    | ]}|j V  qd S r9   r   rk   r?   r?   r@   rA   1  s    z[. If this is expected, pass the argument 'consistent_ids=False' to DynamicBucketingSampler.)	re   r   r)   r:   tupler}   r{   rH   join)rR   batchr5   r?   r   r@   _next_batch*  s   

 z#DynamicBucketingSampler._next_batchc                 C      d S r9   r?   rm   r?   r?   r@   remaining_duration6     z*DynamicBucketingSampler.remaining_durationc                 C   r   r9   r?   rm   r?   r?   r@   remaining_cuts:  r   z&DynamicBucketingSampler.remaining_cutsc                 C   r   r9   r?   rm   r?   r?   r@   num_cuts>  r   z DynamicBucketingSampler.num_cuts)r8   r    )__name__
__module____qualname____doc__r   r   r   intr   boolr   r   r   rG   r
   strr   rU   rY   rZ   r   r   r   r   propertyfloatr   r   r   __classcell__r?   r?   rS   r@   r    (   s    +	
t
4r    c                   @   s   e Zd ZU dZee ed< ee ed< dZe	edf ed< dZ
eed< dd	 Zd
efddZded
dfddZd
efddZd
efddZd#ddZded
efddZd
eeef fddZdeeef d
dfddZ		 	
	 d$ddZdd d
efd!d"ZdS )%FixedBucketBatchSizeConstrainta  
    Sampling constraint that accepts a pre-defined batch size for each bucket.
    It uses the example's sequence length to determine which bucket we're sampling for,
    and otherwise the batch size is locally static for each bucket.

    This constraint doesn't support samples longer than the upper bound of the last bucket;
    if such sample is provided, we will raise an exception.
    max_seq_len_bucketsbatch_sizesNcurrent_bucketr   r   c                 C   s   t | jt| jksJ d S r9   )rO   r   rN   rm   r?   r?   r@   __post_init__S  s   z,FixedBucketBatchSizeConstraint.__post_init__r8   c                 C   s   dS NTr?   rm   r?   r?   r@   	is_activeV  s   z(FixedBucketBatchSizeConstraint.is_activeexamplec                 C   s   |  |}| j| j|d}|t| jk s"J d| d| jd  d| jdu r+|| _n| j|ks?J d| d| d	| j d|  jd
7  _dS )z
        Increment the internal counter for the time constraint,
        selecting the right property from the input ``cut`` object.
        )bucketsexample_lenz&Received example with sequence length z) that exceeds the highest allowed length .NzUser error: FixedBucketBatchSizeConstraint is supposed to be used only on one bucket. The example we received has sequence length z9 which is outside of the allowed bounds for bucket index z in buckets rD   )measure_lengthselect_bucketr   rP   r   r   )rR   r   seqlen
bucket_idxr?   r?   r@   addY  s*   

z"FixedBucketBatchSizeConstraint.addc                 C   s   | j | j| j kS )z"Is the constraint exceeded or not.r   r   r   rm   r?   r?   r@   exceededp  s   z'FixedBucketBatchSizeConstraint.exceededc                 C   s   | j | j| j kS )a(  
        Check if the batch is close to satisfying the constraints.
        We define "closeness" as: if we added one more cut that has
        duration/num_frames/num_samples equal to the longest seen cut
        in the current batch, then the batch would have exceeded the constraints.
        r   rm   r?   r?   r@   close_to_exceedingt  s   z1FixedBucketBatchSizeConstraint.close_to_exceedingc                 C   s   d| _ d| _dS )z{
        Reset the internal counter (to be used after a batch was created,
        to start collecting a new one).
        Nr   )r   r   rm   r?   r?   r@   reset}  s   
z$FixedBucketBatchSizeConstraint.resetc                 C   s   |j S r9   )duration)rR   r   r?   r?   r@   r     s   z-FixedBucketBatchSizeConstraint.measure_lengthc                 C   s   t | S r9   )r   rm   r?   r?   r@   rU     s   z)FixedBucketBatchSizeConstraint.state_dictrU   c                 C   sV   | d| _| d| _| d| _| d| _t|dks)J dd|  d S )Nr   r   r   r   r   zNError in FixedBucketBatchSizeConstraint.load_state_dict(): Unexpected keys:
- z
- )rX   r   r   r   r   rP   r   keys)rR   rU   r?   r?   r@   rY     s   z.FixedBucketBatchSizeConstraint.load_state_dictotherc                 C   sz   dD ]+}t | |}t ||}|d u o|d u }|s-||ks-J d| d| d| d| d	qt| j| j| j| j|j dS )N)r   r   r   zXTo add two TimeConstraint objects, they need to represent the same constraint (got self.=z
 != other.).)r   r   r   r   )getattrr   r   r   r   r   )rR   r   key	self_attr
other_attris_noner?   r?   r@   __add__  s,   


z&FixedBucketBatchSizeConstraint.__add__r   c                 C   s.   t |to| j|jko| j|jko| j|jkS r9   )r:   r   r   r   r   )rR   r   r?   r?   r@   __eq__  s   



z%FixedBucketBatchSizeConstraint.__eq__)r8   N)r   r   r8   r   )r   r   r   r   r   r   __annotations__r   r   r   r   r   r   r   r   r   r   r   r   r   r
   r   r   rU   rY   r   r   r?   r?   r?   r@   r   C  s*   
 	
	

r   r5   r&   r%   r8   c                    s   |dksJ  du rt   t fdd| D }|  ||jd ks2J d| d|jd  d| | }g }d	}|D ]}||krK|| d	}||7 }q>|S )
a	  
    Given an iterable of cuts and a desired number of buckets, select duration values
    that should start each bucket.

    The returned list, ``bins``, has ``num_buckets - 1`` elements.
    The first bucket should contain cuts with duration ``0 <= d < bins[0]``;
    the last bucket should contain cuts with duration ``bins[-1] <= d < float("inf")``,
    ``i``-th bucket should contain cuts with duration ``bins[i - 1] <= d < bins[i]``.

    :param cuts: an iterable of :class:`lhotse.cut.Cut`.
    :param num_buckets: desired number of buckets.
    :param constraint: object with ``.measure_length()`` method that's used to determine
        the size of each sample. If ``None``, we'll use ``TimeConstraint``.
    :return: a list of boundary duration values (floats).
    rD   Nc                    s   g | ]}  |qS r?   )r   rk   r%   r?   r@   rh     rq   z-estimate_duration_buckets.<locals>.<listcomp>r   zThe number of buckets (z7) must be smaller than or equal to the number of cuts (r   g        )r   nparraysortshapesumappend)r5   r&   r%   sizessize_per_bucketbinstotsizer?   r   r@   rQ     s&   

rQ   c                   @   sj   e Zd ZdZdejdededdfddZdefd	d
Zde	e
ef fddZde	e
ef ddfddZdS )BucketSelectionStatea  
    Helper class used in the context of bucket selection synchronization across DDP ranks.
    It's only necessary when using a map-style dataset (i.e., the sampler lives in the training loop process)
    and world_size is greater than 1. In these cases we have to use the same bucket idx ``world_size`` times
    to ensure each rank uses the same bucket. This is due to how CutSampler distributes mini-batches
    across ranks, ensuring the number of steps is always equal for each rank.
    rt   r&   r.   r8   Nc                 C   s"   || _ || _|| _d| _d | _d S Nr   )_bucket_rng_num_buckets_world_size_usage_count_bucket_idx)rR   rt   r&   r.   r?   r?   r@   rG     s
   
zBucketSelectionState.__init__c                 C   s@   | j d u s| j| jkr| j| j| _ d| _|  jd7  _| j S )Nr   rD   )r   r   r   r   	randranger   rm   r?   r?   r@   select_bucket_idx  s
   z&BucketSelectionState.select_bucket_idxc                 C   s   | j  | j| jdS )N)r   r   r   )r   getstater   r   rm   r?   r?   r@   save  s   zBucketSelectionState.saveckptc                 C   s(   | j |d  |d | _|d | _d S )Nr   r   r   )r   setstater   r   )rR   r   r?   r?   r@   restore  s   
zBucketSelectionState.restore)r   r   r   r   ru   rv   r   rG   r   r
   r   r   r   r   r?   r?   r?   r@   r     s    
	r   c                   @   s   e Zd Z											d&deeeee f  dee de	de
e de
e	 d	e
e d
ede	de
e dedejdejdede
e ddfddZdeeddf fddZdedefddZdedefddZdd Zdd  Zd!e	ddfd"d#Zd$d% ZdS )'r~   NFr"   r5   r*   r.   r#   r$   r%   r(   r,   r-   r'   rK   rt   r2   r\   r8   c                 C   s*  || _ || _|| _|| _|| _|| _|| _|| _|	| _t	|t
 | _|d u r*t }|| _|| _|
| _|| _|t|ksDJ d| dt||| | jd u rZt| j| j| jd| _|d urt|}|| }|t|d  }||k rtd| d| d dd	 tt|d D | _d | _d
| _d S )NzJArgument list for 'duration_bins' is expected to be in sorted order (got: r   rE   rD   zYour 'buffer_size' setting of z1 might be too low to satisfy a 'max_duration' of z (given our best guess).c                 S   s   g | ]}t  qS r?   r   )r=   rg   r?   r?   r@   rh   @  s    z,DynamicBucketer.__init__.<locals>.<listcomp>F)r5   r*   r.   r#   r$   r%   r(   r,   r-   r   r   r\   ru   rv   rK   rt   r'   r2   rO   r   r   r   meanrP   rI   rJ   rd   r   _producer_thread_source_exhausted)rR   r5   r*   r.   r#   r$   r%   r(   r,   r-   r'   rK   rt   r2   r\   mean_durationexpected_buffer_durationexpected_bucket_durationr?   r?   r@   rG     sV   



zDynamicBucketer.__init__c                 c   s   t | j| _| jrd| _|   |   n| | j t	| j
t| j| jd}zz	 | |}|}g }| jr@t|| j|d}n|j t|j}W d    n1 sSw   Y  t|| j | jd}tt |}t|trut|d }nt|}|V  |r|jdd |j |j}|D ]}	||	= qW d    n1 sw   Y  nt|D ]}
|  q| jr|   n| | q+ ty   Y nw W | jr| j ! rd| _| j "  d | _ d | _d S | jr| j ! rd| _| j "  d | _ d | _w )NF)rt   r&   r.   T)rK   out_indexes_used)r%   r\   r   )reverse)#rc   r5   r   r2   r   _start_data_producer_thread_maybe_wait_for_producer_collect_cuts_in_bucketsr,   r   rt   rP   r   r.   _select_bucketr'   pick_at_randomrK   mutexrN   queuer   r%   copyr\   re   r:   r   r   rd   getStopIterationr   is_aliver   )rR   statesampling_bucketmaybe_shuffledindexes_usedbatcherr   
batch_size_qidxrg   r?   r?   r@   r   E  s~   





)


zDynamicBucketer.__iter__r   c                    s    j d u r- fdd jD }|s'dd  jD } js"t|dkr%t |} j|S dttgt	f dt
f fdd} }z| j}W n' typ    jrVt z| |d	d
 }W n
 tym   t w Y nw  j| S )Nc                    s   g | ]	}  |r|qS r?   )	_is_readyr=   brm   r?   r@   rh     s    z2DynamicBucketer._select_bucket.<locals>.<listcomp>c                 S   s   g | ]}|r|qS r?   r?   r   r?   r?   r@   rh     ri   r   rs   r8   c                    s      dtf fdd}d}  }}| r| j  sS|dk r-|tjkr-t |d7 } |d dkr:dnd|   t| }t| }| r| j  r S )Nr8   c                      s   d   kot jk S   S r   )rP   r   r?   )r   rR   r?   r@   	valid_idx  s   zGDynamicBucketer._select_bucket.<locals>.scan_buckets.<locals>.valid_idxr   rD      r   )r   r   r   rP   BucketsDontHaveEnoughDataminmax)rs   r   num_attemptsseen_minseen_maxrR   r   )r   r@   scan_buckets  s   



z4DynamicBucketer._select_bucket.<locals>.scan_bucketsc                 S   s   |   dkS r   qsize)r   r?   r?   r@   rp     s    z0DynamicBucketer._select_bucket.<locals>.<lambda>)rt   r   r(   rP   r   rK   choicer   r   r   r   r   r   r   r   )rR   r   ready_bucketsnon_empty_bucketsr   r   selected_bucket_idxr?   r   r@   r     s0   
"


zDynamicBucketer._select_bucketbucketc                 C   sr   | j  }|j t|j}W d    n1 sw   Y  |D ]}|t|tr,|d n| | r6 dS qdS )Nr   TF)	r%   r   r   rN   r   r   r:   r   r   )rR   r  r   contentsrl   r?   r?   r@   r     s   
zDynamicBucketer._is_readyc                    s(    fdd}t j|d _ j  dS )zEStart concurrent filling of the bucket buffer in a background thread.c                     s   z> j s<tdd  jD  jkrtd qt j}  jj	 j
t| tr*| d n| d} j| |   j rW d S W d S  tyK   d _ Y d S w )Nc                 s       | ]}|  V  qd S r9   r   r   r?   r?   r@   rA         zPDynamicBucketer._start_data_producer_thread.<locals>.producer.<locals>.<genexpr>g?r   r   r   T)r   r   r   r,   timesleepre   r   r%   r   r*   r:   r   putr   )r5   r   rm   r?   r@   producer  s   


z=DynamicBucketer._start_data_producer_thread.<locals>.producer)targetN)	threadingThreadr   start)rR   r  r?   rm   r@   r     s   z+DynamicBucketer._start_data_producer_threadc                 C   sb   t dd | jD | jd k r+| js/td t dd | jD | jd k r-| jrdS dS dS dS )zLTriggers wait for producer if the bucket buffers are less than 10% utilized.c                 s   r  r9   r   r   r?   r?   r@   rA     r  z;DynamicBucketer._maybe_wait_for_producer.<locals>.<genexpr>r!   g      ?N)r   r   r,   r   r  r	  rm   r?   r?   r@   r     s   
z(DynamicBucketer._maybe_wait_for_producern_cutsc                 C   sh   z)t |D ]!}t| j}| jj| jt|tr|d n|d}| j| 	| qW dS  t
y3   Y dS w )zIFetches ``n_cuts`` from the input data iterable. Doesn't use concurrency.r   r  N)rd   re   r   r%   r   r*   r:   r   r   r
  r   )rR   r  rg   r5   r   r?   r?   r@   r     s   
z(DynamicBucketer._collect_cuts_in_bucketsc                 C   s:   | j r| jd ur| j rd| _| j  d S d S d S d S r   )r2   r   r   r   r   rm   r?   r?   r@   __del__  s   
zDynamicBucketer.__del__)NNNFr"   NFNNFN)r   r   r   r   r   r   r   r   r   r   r   r   r   ru   rv   r   rG   r   r   r   r   r   r   r   r   r   r   r  r?   r?   r?   r@   r~     sd    	

BJN
r~   r  rK   r   .c                 c   sn    | j  t| j} W d   n1 sw   Y  ttt| }|| |D ]}|| | | V  q(dS )z
    Generator which will yield items in a sequence in a random order.
    It will append the indexes of items yielded during iteration via ``out_used_indexes``.
    N)r   rN   r   rd   rP   r'   r   )r  rK   r   indexesr   r?   r?   r@   r     s   	

r   c                   @   s   e Zd ZdS )r   N)r   r   r   r?   r?   r?   r@   r   -  s    r   c                   C   s   t jdtd d S )Na  Since Lhotse v1.20 'shuffle_buffer_size' is deprecated, because DynamicBucketingSampler does not require a separate shuffling buffer anymore. To improve both shuffling and sampling randomness, increase 'buffer_size' instead. To maintain backward compatibility, we will increase 'buffer_size' by 'shuffling_buffer_size' for you. This argument will be deprecated in a future Lhotse version.rB   )rI   rJ   rL   r?   r?   r?   r@   rM   1  s   
rM   r9   );ru   r  r  rI   dataclassesr   r   	itertoolsr   r   r   typingr   r   r	   r
   r   r   r   r   r   r   r   r   numpyr   rw   lhotser   r   
lhotse.cutr   lhotse.dataset.dataloadingr   lhotse.dataset.sampling.baser   r   r   r   r   lhotse.dataset.sampling.dynamicr   r   r   lhotse.utilsr   r    r   r   r   rQ   r   r~   rv   rN   r   	Exceptionr   rM   r?   r?   r?   r@   <module>   s\    8  o
,&  
