o
    SiG                  	   @   s   d dl Z d dlmZ d dlmZ d dlmZ d dlmZm	Z	m
Z
mZmZmZmZmZ d dlZd dlmZ d dlmZ d dlmZmZ d d	lmZ G d
d deZdededeeedf  fddZdededee fddZdS )    N)deepcopy)reduce)add)AnyCallableDictListOptionalTupleTypeUnion)CutSet)Cut)
CutSamplerSamplingDiagnostics)SimpleCutSamplerc                       s  e Zd ZdZedddddededed	ed
ede	ddf fddZ
edee fddZedee fddZedee fddZdeddf fddZdeegef ddfddZ fddZdeee	f f fddZd eee	f ddf fd!d"Zd5d#d$Zdeeef fd%d&Zd'd( Zedefd)d*Zed+d, Z d-e!eeed.f f ddfd/d0Z"ede#fd1d2Z$defd3d4Z%  Z&S )6BucketingSamplera  
    Sorts the cuts in a :class:`CutSet` by their duration and puts them into similar duration buckets.
    For each bucket, it instantiates a simpler sampler instance, e.g. :class:`SimpleCutSampler`.

    It behaves like an iterable that yields lists of strings (cut IDs).
    During iteration, it randomly selects one of the buckets to yield the batch from,
    until all the underlying samplers are depleted (which means it's the end of an epoch).

    Examples:

    Bucketing sampler with 20 buckets, sampling single cuts::

        >>> sampler = BucketingSampler(
        ...    cuts,
        ...    # BucketingSampler specific args
        ...    sampler_type=SimpleCutSampler, num_buckets=20,
        ...    # Args passed into SimpleCutSampler
        ...    max_duration=200
        ... )

    Bucketing sampler with 20 buckets, sampling pairs of source-target cuts::

        >>> sampler = BucketingSampler(
        ...    cuts, target_cuts,
        ...    # BucketingSampler specific args
        ...    sampler_type=CutPairsSampler, num_buckets=20,
        ...    # Args passed into CutPairsSampler
        ...    max_source_duration=200, max_target_duration=150
        ... )
    
   Fr   )sampler_typenum_buckets	drop_lastseedcutsr   r   r   r   kwargsreturnNc                   s   t  j dd|d |_|_|_|_tdd jD r$tdtjd|i_	 fdd	j	D _
tjj _d
g| _dS )a  
        BucketingSampler's constructor.

        :param cuts: one or more ``CutSet`` objects.
            The first one will be used to determine the buckets for all of them.
            Then, all of them will be used to instantiate the per-bucket samplers.
        :param sampler_type: a sampler type that will be created for each underlying bucket.
        :param num_buckets: how many buckets to create.
        :param drop_last: When ``True``, we will drop all incomplete batches.
            A batch is considered incomplete if it depleted a bucket before
            hitting the constraint such as max_duration, max_cuts, etc.
        :param seed: random seed for bucket selection
        :param kwargs: Arguments used to create the underlying sampler for each bucket.
           r   )r   
world_sizerankr   c                 s       | ]}|j V  qd S N)is_lazy).0cs r#   U/home/ubuntu/.local/lib/python3.10/site-packages/lhotse/dataset/sampling/bucketing.py	<genexpr>Q       z,BucketingSampler.__init__.<locals>.<genexpr>zBucketingSampler does not support working with lazy CutSet (e.g., those opened with 'load_manifest_lazy', 'CutSet.from_jsonl_lazy', or 'CutSet.from_webdataset'). Please use lhotse.dataset.DynamicBucketingSampler instead.r   c                    s"   g | ]}j |d  ijqS )r   )r   sampler_kwargs)r!   bucket_cut_setsr   selfr#   r$   
<listcomp>_   s    z-BucketingSampler.__init__.<locals>.<listcomp>FN)super__init__r   r   r'   cut_setsany
ValueErrorcreate_buckets_equal_durationbucketsbucket_samplersrandomRandomr   epoch
bucket_rngdepleted)r*   r   r   r   r   r   r   	__class__r)   r$   r-   /   s0   
zBucketingSampler.__init__c                 C   ,   zt dd | jD W S  ty   Y dS w )a  
        Remaining duration of data left in the sampler (may be inexact due to float arithmetic).
        Not available when the CutSet is read in lazy mode (returns None).

        .. note: For BucketingSampler, it's the sum of remaining duration in all buckets.
        c                 s       | ]\}}|j V  qd S r   )remaining_durationr!   _sr#   r#   r$   r%   u       

z6BucketingSampler.remaining_duration.<locals>.<genexpr>Nsum_nondepleted_samplers_with_idxs	TypeErrorr*   r#   r#   r$   r=   l      
z#BucketingSampler.remaining_durationc                 C   r;   )z
        Remaining number of cuts in the sampler.
        Not available when the CutSet is read in lazy mode (returns None).

        .. note: For BucketingSampler, it's the sum of remaining cuts in all buckets.
        c                 s   r<   r   )remaining_cutsr>   r#   r#   r$   r%      rA   z2BucketingSampler.remaining_cuts.<locals>.<genexpr>NrB   rF   r#   r#   r$   rH   {   rG   zBucketingSampler.remaining_cutsc                 C   r;   )z
        Total number of cuts in the sampler.
        Not available when the CutSet is read in lazy mode (returns None).

        .. note: For BucketingSampler, it's the sum of num cuts in all buckets.
        c                 s   r   r   )num_cutsr!   r@   r#   r#   r$   r%      r&   z,BucketingSampler.num_cuts.<locals>.<genexpr>N)rC   r3   rE   rF   r#   r#   r$   rI      s
   zBucketingSampler.num_cutsr6   c                    s&   | j D ]}|| qt | dS )a  
        Sets the epoch for this sampler. When :attr:`shuffle=True`, this ensures all replicas
        use a different random ordering for each epoch. Otherwise, the next iteration of this
        sampler will yield the same ordering.

        :param epoch: Epoch number.
        N)r3   	set_epochr,   )r*   r6   r@   r9   r#   r$   rK      s   
zBucketingSampler.set_epoch	predicatec                 C   s   | j D ]}|| qdS )a
  
        Add a constraint on individual cuts that has to be satisfied to consider them.

        Can be useful when handling large, lazy manifests where it is not feasible to
        pre-filter them before instantiating the sampler.

        Example:
            >>> cuts = CutSet(...)
            ... sampler = SimpleCutSampler(cuts, max_duration=100.0)
            ... # Retain only the cuts that have at least 1s and at most 20s duration.
            ... sampler.filter(lambda cut: 1.0 <= cut.duration <= 20.0)
        N)r3   filter)r*   rL   samplerr#   r#   r$   rM      s   
zBucketingSampler.filterc                    s"   t    | jD ]}|  qdS )a8  
        Enables re-setting to the start of an epoch when iter() is called.
        This is only needed in one specific scenario: when we restored previous
        sampler state via ``sampler.load_state_dict()`` but want to discard
        the progress in the current epoch and start from the beginning.
        N)r,   allow_iter_to_reset_stater3   )r*   r@   r9   r#   r$   rO      s   


z*BucketingSampler.allow_iter_to_reset_statec                    sD   t   }|| jt| jdd | jD t| j| j	 d |S )z
        Return the current state of the sampler in a state_dict.
        Together with ``load_state_dict()``, this can be used to restore the
        training loop's state to the one stored in the state_dict.
        c                 S   s   g | ]}|  qS r#   )
state_dictrJ   r#   r#   r$   r+      s    z/BucketingSampler.state_dict.<locals>.<listcomp>)r   r8   r3   r'   bucket_rng_state)
r,   rP   updater   r   r8   r3   r'   r7   getstate)r*   rP   r9   r#   r$   rP      s   
	zBucketingSampler.state_dictrP   c                    s   | d}| j|ksJ d| j d| d| dd | dd | d| _| d	| _| j| d
 t| jt|d ksTJ dt| j dt|d  dt| j| dD ]	\}}|	| q]t
 	| dS )aX  
        Restore the state of the sampler that is described in a state_dict.
        This will result in the sampler yielding batches from where the previous training left it off.

        .. caution::
            The samplers are expected to be initialized with the same CutSets,
            but this is not explicitly checked anywhere.

        .. caution::
            The input ``state_dict`` is being mutated: we remove each consumed key, and expect
            it to be empty at the end of loading. If you don't want this behavior, pass a copy
            inside of this function (e.g., using ``import deepcopy``).

        .. note::
            For implementers of sub-classes of CutSampler: the flag ``self._just_restored_state`` has to be
            handled in ``__iter__`` to make it avoid resetting the just-restored state (only once).
        r   zaError in BucketingSampler.load_state_dict(): Inconsistent number of buckets: current sampler has z, the state_dict has .proportional_samplingNbucket_methodr'   r8   rQ   r3   zbError in BucketingSampler.load_state_dict(): Inconsistent number of samplers: current sampler has )popr   r'   r8   r7   setstatelenr3   zipload_state_dictr,   )r*   rP   r   rN   
sampler_sdr9   r#   r$   r[      s2   

z BucketingSampler.load_state_dictc                 C   sN   | j r| S | j  | j| j| j  | jD ]}t| qdg| j | _	| S )NF)
_just_restored_statediagnosticsreset_current_epochr7   r   r6   r3   iterr   r8   )r*   br#   r#   r$   __iter__   s   


zBucketingSampler.__iter__c                 C   s   | j d jr| j| jS | j}t|dkr|d S | j|\}}| j|\}}z|j|j|j  }W n tyB   ||f Y S w | j |krN||fS ||fS )Nr   r   )	r.   r    r7   choicerD   rY   r=   ZeroDivisionErrorr4   )r*   idx_sampler_pairsidx1sampler1idx2sampler2prob1r#   r#   r$   _select_bucket_with_idx
  s"   
z(BucketingSampler._select_bucket_with_idxc                 C   sN   |    | js$|  \}}zt|W S  ty    d| j|< Y nw | jrt )NT)rO   is_depletedrk   nextStopIterationr8   )r*   idxrN   r#   r#   r$   _next_batch-  s   
zBucketingSampler._next_batchc                 C   s
   t | jS r   )allr8   rF   r#   r#   r$   rl   7     
zBucketingSampler.is_depletedc                 C   s   dd t t| j| jD S )Nc                 S   s    g | ]\}\}}|s||fqS r#   r#   )r!   ro   bsr8   r#   r#   r$   r+   =  s    
zDBucketingSampler._nondepleted_samplers_with_idxs.<locals>.<listcomp>)	enumeraterZ   r3   r8   rF   r#   r#   r$   rD   ;  s
   z0BucketingSampler._nondepleted_samplers_with_idxsbatch.c                 C   s   d S r   r#   )r*   ru   r#   r#   r$   _log_diagnosticsE  s   z!BucketingSampler._log_diagnosticsc                 C   s   t tdd | jD S )Nc                 s   r   r   )r^   r!   bucketr#   r#   r$   r%   J  r&   z/BucketingSampler.diagnostics.<locals>.<genexpr>)r   r   r3   rF   r#   r#   r$   r^   H  s   zBucketingSampler.diagnosticsc                 C   s
   | j  S )zJReturns a string describing the statistics of the sampling process so far.)r^   
get_reportrF   r#   r#   r$   ry   L  rr   zBucketingSampler.get_report)r   r   )'__name__
__module____qualname____doc__r   r   r   intboolr   r-   propertyr	   floatr=   rH   rI   rK   r   r   rM   rO   r   strrP   r[   rb   r
   r   rk   rp   rl   rD   r   rv   r   r^   ry   __classcell__r#   r#   r9   r$   r      sV    "=
)#

"	r   r   r   r   .c                    sX   |d j dd}t|| dg}|dd D ] | fdd|d D  qtt| S )	a  
    Creates buckets of cuts with similar durations.
    Each bucket has the same cumulative duration, but a different number of cuts.

    :param cuts: One or more CutSets; the input CutSets are assumed to have the same cut IDs
        (i.e., the cuts correspond to each other and are meant to be sampled together as pairs,
        triples, etc.).
    :param num_buckets: The number of buckets.
    :return: A list of CutSet buckets (or tuples of CutSet buckets, depending on the input).
    r   T)	ascending)r   r   Nc                 3   s    | ]
} j |jd V  qdS ))cut_idsN)subsetidsrw   cut_setr#   r$   r%   c  s
    
z0create_buckets_equal_duration.<locals>.<genexpr>)sort_by_duration%_create_buckets_equal_duration_singleappendlistrZ   )r   r   first_cut_setbuckets_per_cutsetr#   r   r$   r1   Q  s   

r1   c                    s  t dd | D }|| }ttdt| dttt| dt| d   dd }dd t|D }d	d | D }tt||d
d d}d\}}	|d }
dd t|D }dd t|D  d}i }t|dD ]v\}\}}|du rx|	|
krx|	}|d r||	 | |kr|dur|	|krtdt	|d |d }	n	t	|	d |d }	||	  |7  < |	||| < qh||
 | |kr|dur|
|krt|d d}
nt|
d d}
||
  |7  < |
||| < qht| D ]\}} ||  
| q fddt|D }|S )z
    Helper method to partition a single CutSet into buckets that have the same
    cumulative duration.

    See also: :meth:`.create_buckets_from_duration_percentiles`.
    c                 S      g | ]}|j qS r#   durationr!   cr#   r#   r$   r+   v      z9_create_buckets_equal_duration_single.<locals>.<listcomp>r      r   c                 S   s   i | ]\}}||qS r#   r#   )r!   io_idxr#   r#   r$   
<dictcomp>  s    z9_create_buckets_equal_duration_single.<locals>.<dictcomp>c                 S   r   r#   r   r   r#   r#   r$   r+     r   c                 S   s   | d S )Nr   r#   )xr#   r#   r$   <lambda>  s    z7_create_buckets_equal_duration_single.<locals>.<lambda>)key)r   r   c                 S   s   i | ]}|d qS )r   r#   r!   r   r#   r#   r$   r     r   c                 S   s   i | ]}|g qS r#   r#   r   r#   r#   r$   r     r   Nc                    s   g | ]	}t  | qS r#   )r   	from_cutsr   buckets_cut_dictr#   r$   r+     s    )nprC   r   rangerY   rt   sortedrZ   maxminr   )r   r   total_durationbucket_durationorder	order2idx	durationsordered_cut_durations
last_orderfirst_bucketlast_bucketbuckets_dictmiddle_bucketidx_to_bucket_idr   	order_idxr   cut_idxcutr2   r#   r   r$   r   m  sD   	r   )r4   copyr   	functoolsr   operatorr   typingr   r   r   r   r	   r
   r   r   numpyr   lhotser   
lhotse.cutr   lhotse.dataset.sampling.baser   r   lhotse.dataset.sampling.simpler   r   r~   r1   r   r#   r#   r#   r$   <module>   s6    (  D
