o
    }oi                  	   @   s   d dl mZ d dlZd dlmZ zd dlmZ W n eefy.   d dl	mZ e
d Y nw G dd dZG dd	 d	eZdS )
    )ValueN)logging)IterableDatasetz@Webdataset import failed! We recommend use `webdataset==0.2.48`.c                   @   s,   e Zd Zd
defddZdd Zdd Zd	S )SharedEpochr   epochc                 C   s   t d|| _d S )Ni)r   shared_epochselfr    r   i/home/ubuntu/.local/lib/python3.10/site-packages/nemo/collections/multimodal/data/common/data_samplers.py__init__   s   zSharedEpoch.__init__c                 C   s   || j _d S Nr   valuer	   r   r   r   	set_value!   s   zSharedEpoch.set_valuec                 C   s   | j jS r   r   r
   r   r   r   	get_value$   s   zSharedEpoch.get_valueN)r   )__name__
__module____qualname__intr   r   r   r   r   r   r   r      s    r   c                       sN   e Zd Zdedededededededef fd	d
Zdd Zdd Z  ZS )WDSUrlsRandomSampler
total_urls
chunk_sizeconsumed_samplesdata_parallel_rankdata_parallel_sizenum_workers	drop_lastdata_shardingc
           
         s   t    || _|| _|| _|| dkrtd |||  | ||  | _| j| | _|| _	|| _
|| _|	| _t | _| j| j
 | _dS )a  Sampler for WebDataset Urls with data parallelism.
        Args:
            urls : The urls of the tar files from which to sample.
            total_urls (int): Total number of urls in the dataset.
            chunk_size (int): Number of objects per tar file.
            consumed_samples (int): Number of samples consumed so far by the training process.
                **Note samples here is not urls.**
            data_parallel_rank (int): Rank of the current data parallel process.
            data_parallel_size (int): Number of data parallel processes.
            drop_last (bool): If True, drop the remaining urls if the number is smaller than `data_parallel_size`.
                If False, pad the urls until its size is divisible by `data_parallel_size`.
            data_sharding (bool): If True, use data sharding before data shuffling, i.e. only shuffle within the data parallel group.
        r   z-Multimodal data resuming will be approximate!N)superr   urlsr   r   r   warningconsumed_urlsr   r   r   r   r    r   r   remaining_urls)
r
   r"   r   r   r   r   r   r   r   r    	__class__r   r   r   )   s   

zWDSUrlsRandomSampler.__init__c                 C   s(   | j r	| j| j S | j| j d | j S )N   )r   r   r   r   r   r   r   __len__V   s   zWDSUrlsRandomSampler.__len__c                 #   s   d\}}t jj }|d ur|j|j}}| j| j|  | j | j|  | _	| j
s/| jdkr6| j| j }n	| j| j | j }| j| j	|  | j	| }| jr|| j }|| j }| j|  t  }|| j  t j||d }	 fdd|	|d  D }
n(|}|}t  }|| j  t j||d }||d  }|| jd | j }
t j| j|d }t|
D ]7\}}|  j| j| j 7  _|d ur|| |krq|| jk rt| j| dV  qt| j||| j   dV  qd S )N)r   r(   r   )	generatorc                    s   g | ]} | qS r   r   ).0x	start_idxr   r   
<listcomp>z   s    z1WDSUrlsRandomSampler.__iter__.<locals>.<listcomp>)url)torchutilsdataget_worker_infoidr   r   r   r   r$   r   r%   r   r   r   r    r   	Generatormanual_seedr   randpermtolist	enumeratedictr"   )r
   	worker_idr   worker_infoactive_total_urlscurrent_epoch_urlsbucket_sizebucket_offsetg
random_idx	idx_rangefull_bucket_sizefull_bucket_offsetidx_range_totalidx_range_activeadditional_random_idxnidxr   r-   r   __iter__\   sT   




zWDSUrlsRandomSampler.__iter__)	r   r   r   r   boolr   r)   rL   __classcell__r   r   r&   r   r   (   s(    	
-r   )multiprocessingr   r1   
nemo.utilsr   webdataset.pytorchr   ImportErrorModuleNotFoundErrornemo.core.classesr#   r   r   r   r   r   r   <module>   s   