o
    iB                  	   @   s  d dl Z d dlZd dlZd dlZd dlZd dlmZ d dl	m
Z
 d dl	mZmZ d dlmZ d dlmZ eddeddeddedd	edd
edddd ZG dd deZG dd deZG dd de
ZG dd de
ZG dd deZdS )    N)DistributedSampler)BatchSamplerSampler)tablesbatch_sampler_classesr   CustomDistributedBatchSampler$CustomDistributedDynamicBatchSamplerDynamicBatchLocalShuffleSampler RankFullLocalShuffleBatchSampler'RankFullLocalShuffleDynamicBatchSamplerc                 K   s   i }| dd}|dkrt| fi |}n| dddkr&t| fi |}nt| fi |}||d< | dd|d< | d	d
|d	< |S )N
batch_typeexample	sort_sizer   batch_samplernum_workers   
pin_memoryT)getr   *CustomDistributedBufferDynamicBatchSamplerr   )datasetkwargsdataloader_argsr   r    r   [/home/ubuntu/.local/lib/python3.10/site-packages/funasr/datasets/audio_datasets/samplers.py CustomDistributedBatchSampler_fn   s   r   c                   @   >   e Zd Z					ddefddZdd Zd	d
 Zdd ZdS )r   NTFis_trainingc           	      K   s   z
t  }t  }W n   d}d}Y || _|| _|| _|| _|| _|o%|| _|| _	| j	r<t
| j||  ||  | _ntt
| j||  ||  | _t| j| j | _d| _|dd | _|dd| _d S Nr      max_token_lengthlength_scale_source      ?)distget_rankget_world_sizeranknum_replicasr   
batch_sizer   shuffle	drop_lastlen
total_sizemathceilintnum_samplesepochr   r    r!   	selfr   r(   r'   r&   r)   r*   r   r   r   r   r   __init__)   s0   
z&CustomDistributedBatchSampler.__init__c                    sb  j rt }|j tjtj|d  n	t	t
tj jt  }|t kr9  d | 7  n  |t    d |t    7  t jksVJ  jjj  t jksiJ jd urg } D ]}j|j }|jkr|| qr|  fddt
dt jD }jrt|d jkr|d d }t|S )N	generatorc                       g | ]} ||j   qS r   r(   .0iindicesr3   r   r   
<listcomp>r       z:CustomDistributedBatchSampler.__iter__.<locals>.<listcomp>r   r   )r)   torch	Generatormanual_seedr1   randpermr+   r   tolistlistranger,   r&   r'   r0   r    get_source_lenr!   appendr(   r*   iter)r3   gpadding_sizefiltered_indicesidx
source_lenbatchesr   r<   r   __iter__P   s8   "


z&CustomDistributedBatchSampler.__iter__c                 C      | j | j S Nr0   r(   r3   r   r   r   __len__|      z%CustomDistributedBatchSampler.__len__c                 C   
   || _ d S rR   r1   r3   r1   r   r   r   	set_epoch      
z'CustomDistributedBatchSampler.set_epochNNTFT__name__
__module____qualname__boolr4   rP   rU   rZ   r   r   r   r   r   (   s    
',c                   @   sL   e Zd Z						ddedefddZd	d
 Zdd Zdd Zdd Z	dS )#CustomDistributedBufferBatchSamplerNTF   r   r   c	           
      K   s   z
t  }t  }W n   d}d}Y || _|| _|| _|| _|| _|o%|| _|| _	| j	r<t
| j||  ||  | _ntt
| j||  ||  | _t| j| j | _d| _|	dd | _|	dd| _|| _d S r   )r#   r$   r%   r&   r'   r   r(   r   r)   r*   r+   r,   r-   r.   r/   r0   r1   r   r    r!   r   )
r3   r   r(   r'   r&   r)   r*   r   r   r   r   r   r   r4      s2   

z,CustomDistributedBufferBatchSampler.__init__c           	         s   j rt }| j tjt j|d }n	t	t
t j} jt| }|t|kr9||d | 7 }n|||t|  |d |t|   7 }t| jksVJ | j j j }t| jksiJ  jd urg }|D ]} j| j }| jkr|| qr|}g }g }|D ]"}|| t| jkr|j fddd | | g }q|r|j fddd | | t|S )Nr5   c                        j | S rR   r   rG   xrT   r   r   <lambda>       z>CustomDistributedBufferBatchSampler.__iter__.<locals>.<lambda>keyc                    rd   rR   re   rf   rT   r   r   rh      ri   )r)   r@   rA   rB   r1   rC   r+   r   rD   rE   rF   r,   r&   r'   r0   r    rG   r!   rH   r   sortextend_create_batches_from_bufferrI   )	r3   rJ   r=   rK   rL   rM   rN   sorted_batchesbufferr   rT   r   rP      sF   "



z,CustomDistributedBufferBatchSampler.__iter__c                    sJ    fddt dt jD }jr#t|d jkr#|d d }|S )Nc                    r7   r   r8   r9   rp   r3   r   r   r>      r?   zSCustomDistributedBufferBatchSampler._create_batches_from_buffer.<locals>.<listcomp>r   r   )rF   r+   r(   r*   )r3   rp   batched_bufferr   rq   r   rn      s   z?CustomDistributedBufferBatchSampler._create_batches_from_bufferc                 C   rQ   rR   rS   rT   r   r   r   rU      rV   z+CustomDistributedBufferBatchSampler.__len__c                 C   rW   rR   rX   rY   r   r   r   rZ      r[   z-CustomDistributedBufferBatchSampler.set_epoch)NNTFTrc   )
r^   r_   r`   ra   r/   r4   rP   rn   rU   rZ   r   r   r   r   rb      s     	
)4	rb   c                   @   r   )r   NTFr   c           	      K   s   z
t  }t  }W n   d}d}Y || _|| _|| _|| _|| _|o%|| _|| _	t
| j| _d| _|dd| _|dd| _d S )Nr   r   r       r!   r"   )r#   r$   r%   r&   r'   r   r(   r   r)   r*   r+   r,   r1   r   r    r!   r2   r   r   r   r4      s"   
z-CustomDistributedDynamicBatchSampler.__init__c           
      C   s  | j rt }|| j tjt| j|d }n	t	t
t| j}|| j| j| j }g }g }d}d}|D ]7}| j|}|| jkrEq7||k rK|n|t|d  }	|	| jkrd|| ||krc|}q7|| |g}|}q7|r| jr}t|| | jkr|| t|S )Nr5   r   r   )r)   r@   rA   rB   r1   rC   r+   r   rD   rE   rF   r&   r,   r'   rG   r    r(   rH   r*   rI   )
r3   rJ   r=   rO   batchmax_len_in_batchcurrent_batch_lengthrM   sample_lengthpotential_batch_lengthr   r   r   rP     s8   





z-CustomDistributedDynamicBatchSampler.__iter__c                 C   s   dS )Nr   r   rT   r   r   r   rU   <  s   z,CustomDistributedDynamicBatchSampler.__len__c                 C   rW   rR   rX   rY   r   r   r   rZ   @  r[   z.CustomDistributedDynamicBatchSampler.set_epochr\   r]   r   r   r   r   r      s    
 (c                   @   sN   e Zd Z									ddeded	efd
dZdd Zdd Zdd ZdS )r   tokenNFTrc   r   r   r   
start_stepc                 K   s   z
t  }t  }W n   d}d}Y || _|| _|| _|| _|| _|	| _|o(|	| _	|| _
t| j| _tt| j| j | _d| _|
| | _|dd| _|dd| _|dd| _|| _d| _| jdkrrtd	| j  d S d S )
Nr   r   r    rs   r!   r"   batch_size_sample_max   z5Warning, start_step > 0, dataloader start from step: )r#   r$   r%   r&   r'   r   r(   r   r   r)   r*   r+   r,   r/   r-   r.   r0   r1   r   r   r    r!   r{   rz   	batch_numlogginginfo)r3   r   r(   r   r'   r&   
rank_splitr)   r*   r   r   rz   r   r   r   r   r4   E  s4   


z3CustomDistributedBufferDynamicBatchSampler.__init__c              
      s   j r t }| j t j tjt j	|d
 }n	ttt j	}g }tdt| jD ]j}t||| j   fddd}g }d}d}|D ]G}	 j	|	}
|
 jkr]qO jdkrddn|
}t||t|d  }| jkr| jk r||	 t||}|d7 }qO|| |	g}|}d}qO|r|| q4tt| j }| j }|t| }|tj||d7 }d	d
 t jD }t|D ]\}}|| j  | q| j  jd  }t| _td j d j dt| j  d j  t |S )Nr5   r   c                    rd   rR   re   )rM   rT   r   r   rh     ri   zECustomDistributedBufferDynamicBatchSampler.__iter__.<locals>.<lambda>rj   r   r   )kc                 S   s   g | ]}g qS r   r   )r:   _r   r   r   r>     s    zGCustomDistributedBufferDynamicBatchSampler.__iter__.<locals>.<listcomp>zrank: z, dataloader start from step: z, batch_num: z	, after: )!r)   r@   rA   rB   r1   randomseedrC   r+   r   rD   rE   rF   r   sortedrG   r    r   maxr(   r{   rH   r-   r.   r'   choices	enumerater&   rz   r}   r~   r   rI   )r3   rJ   r=   buffer_batchesr;   rp   rt   ru   countrM   original_sample_lengthrw   rx   batches_per_ranktotal_batches_neededextra_batchesrank_batchesfinal_batchesr   rT   r   rP   y  sX   







*z3CustomDistributedBufferDynamicBatchSampler.__iter__c                 C   s   | j S rR   )r}   rT   r   r   r   rU     s   z2CustomDistributedBufferDynamicBatchSampler.__len__c                 C   rW   rR   rX   rY   r   r   r   rZ     r[   z4CustomDistributedBufferDynamicBatchSampler.set_epoch)	ry   NNFTFTrc   r   )	r^   r_   r`   ra   r/   r4   rP   rU   rZ   r   r   r   r   r   D  s(    

4:r   c                       s4   e Zd Z	d
 fdd	Z fddZdd	 Z  ZS )DistributedSamplerWarpNTFc                    s   |d u rt j stdt j }|d u r$t j stdt j }|| _|| _|| _|| _	|| _
|| _t| j| j| j	| j
d| _t | j|| d S )Nz,Requires distributed package to be available)r'   r&   r)   )r@   distributedis_availableRuntimeErrorr%   r$   r   r(   r'   r&   r)   r*   r   samplersuperr4   )r3   r   r(   r'   r&   r)   r*   	__class__r   r   r4     s$   



zDistributedSamplerWarp.__init__c                    s   | j r
| j| j t  S rR   )r)   r   rZ   r1   r   rP   rT   r   r   r   rP     s   
zDistributedSamplerWarp.__iter__c                 C   rW   rR   rX   rY   r   r   r   rZ     r[   z DistributedSamplerWarp.set_epoch)NNTF)r^   r_   r`   r4   rP   rZ   __classcell__r   r   r   r   r     s
    r   )r@   numpynpr~   r-   r   torch.distributedr   r#   torch.utils.datar   r   r   funasr.registerr   registerr   r   rb   r   r   r   r   r   r   r   <module>   s,    





\oQw