o
    wix1                  
   @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	m
Z
mZ d dlZd dlmZ d dlmZ d dlmZ d dlmZmZ d dlmZ d dlmZ d dlmZ d d	lm Z  zd dl!Z"d d
l!mZm#Z# d dl$m%Z% d dl&m'Z' dZ(W n e)e*e+fy   dZ(e ,d Y nw z
d dl-m.Z. dZ/W n e)e+fy   dZ/Y nw de_0d1 Z2dd Z3dd Z4G dd deZ5e(rG dd de"j6Z7dS G dd deZ7dS )    N)CallableListUnion)Config)Image)SharedEpochWDSUrlsRandomSampler)
WebDataset)ApexGuardDefaults)IterableDataset)logging)r	   warn_and_continue)_shuffle)pytorch_worker_infoTF@Webdataset import failed! We recommend use `webdataset==0.2.48`.)parallel_statei H7zjpg jpeg png ppm pgm pbm pnmc                 C   sj   t dd| }| tvrdS t|}t|}|  |	d}W d   |S 1 s.w   Y  |S )z
    Function to load an image.
    If the image is corrupt, it returns a black image.
    Args:
        key: Image key.
        data: Image data stream.
    z.*[.] NRGB)
resublower_IMG_EXTENSIONSioBytesIOr   openloadconvert)keydata	extensionstreamimg r"   o/home/ubuntu/sommelier/.venv/lib/python3.10/site-packages/nemo/collections/multimodal/data/common/webdataset.py
pil_loader>   s   

r$   c                  C   s    d} t  rt  rt  } | S )z8Get world size. How many GPUs are available in this job.   )distis_availableis_initializedget_world_size)
world_sizer"   r"   r#   r)   R   s
   r)   c                       sd   e Zd ZdZ				ddedeeee f dededef
 fd	d
Zdd Z	dd Z
dd Z  ZS )WebDatasetCommonzK
    A common dataset object shared by most of NeMo multimodal models.
    NTmap_fn
compose_fnconsumed_samples	filter_fn	decode_fnc	              	      s  t    || _|j| _t | _|j| _| jdd| _	|| _
|| _| jj| _|r>|jj}	|jdd | _|jdd | _n|jj}	|jdd | _|jdd | _t|	trk|	}
t|	}	t|	dkskJ d|
 d|v rtd|jj  d	| _|jjd usJ t|jj}t|| _W d    n1 sw   Y  td
dddid}tj d%i | jd|i| _!|jj"| _"d| _ntd| j  d| _d | _!d | _"t# | _$|	d %dr3|	D ]N}t|d>}t&|}d| j$vr	|d | j$d< |d | j$d< |d | j$d< n| j$d '|d  | j$d  |d 7  < W d    n	1 s)w   Y  q| j$}n-| j$}t(t)j*j+|	|d< t,t-j./|d |d< | jdd|d< |d t|d  |d< t01 | _2|d }|dppd}|| j2|  | | j2|  | _3| j3| | _|| j | _4|d u rt5n|}|d }t|}|dksJ d|d }| jd urtd| jj6 d t7|d | jj6 |d< | 8 \}}|9t:||d}|j;|t<d}| jd ur| jj=d ur|>|}|j(|t<d}t|t,s|g}|D ]}|9|}q|d |_?|d |d t|d  kr#t@d | j	rGtA \}}}}|j?| | j }td | d!|  |jB|d"}td#| td$|j? || _Cd S )&Ninfinite_samplerFaugmentations
filteringsr   zNo files found for boto3z%Init boto3 using credentials file at T   s3max_attemptsi?B )connect_timeoutsignature_versionretriesconfigr   z(Read Webdataset locally. Data stores at z.pklrb	tar_filestotal_key_count
chunk_size  num_workersr%   zDid not find any training data.z
Estimated z" will be remaining after filtering)bufsizeepoch)handlerzCTotal image count is not equal to chunk_size * number of tar files.zSetting nbatches=z" for infinite sampler. world_size=)nbatchesz#Total number of training shards: %dzTotal training key count: %d)r6   )Dsuper__init__dataset_cfgrA   r)   r*   
webdatasetwebdata_cfggetr1   gen_cfgr.   local_root_pathtraindataset_pathr2   r3   
validation
isinstancestrgloblenr   infor4   credentials_file	use_boto3r   jsonr   credentialsr   clientr6   bucketdictwdinfoendswithpickleextendmapwds
shardlistsexpand_urlslist	itertoolschainfrom_iterabler   get_data_parallel_world_sizedata_parallel_sizeconsumed_urls
skip_aheadr$   estimated_portionint_get_webdataset_and_epochcomposedetshuffle2decoder   
resolutionselecttotal_imageswarningr   
with_epoch_dataset)selfrH   r,   r-   r.   r/   rL   r0   is_trainrO   	glob_pathfinr;   dset_info_pathfp	dset_info
train_infor?   rA   shards_train_list
num_shardsshuffle_buffer_sizetrain_datasetrC   fnrankr*   	worker_idrE   	__class__r"   r#   rG   `   s   





	




zWebDatasetCommon.__init__c                    s    j }|d }|d } fdd|D }d} jsJtd  jd u s'J dt|t|| jt	 t
  jdp<d	d
 jjdd
d	}|j} jrat|t jpTd j j jd}||fS t|t jphdd}||fS )Nr?   r=   c                    s   g | ]
}t j j|qS r"   )ospathjoinrM   ).0xry   r"   r#   
<listcomp>   s    z>WebDatasetCommon._get_webdataset_and_epoch.<locals>.<listcomp>r   z&Initiating Webdataset Random Sampler..zUWebdataset Random Sampler should not be used with filters. Switch to infinite samplerrA   r%   Tdata_sharding)	urls
total_urlsr?   r.   data_parallel_rankrj   rA   	drop_lastr   F)rD   	resampledload_from_object_store	s3_clients3_bucket_name)rD   r   )r]   r1   r   rU   r3   r   rT   r.   r   get_data_parallel_rankri   rH   rK   rN   rC   rW   WebDatasetS3r   r6   r[   r	   )ry   r   r?   r   rC   r   r"   r   r#   ro      sH   
z*WebDatasetCommon._get_webdataset_and_epochc                 C   sj   | j  }| jdkr3| js3zt|}|  j| j| j 8  _W n ty*   d| _Y nw | jdkr3| jr|S )Nr   )rx   __iter__rl   r1   nextrj   rA   StopIteration)ry   ds_iter_r"   r"   r#   r     s   

zWebDatasetCommon.__iter__c                 C   s   | j jS N)rx   ru   r   r"   r"   r#   __len__  s   zWebDatasetCommon.__len__)NNNT)__name__
__module____qualname____doc__r   r   r   rn   rG   ro   r   r   __classcell__r"   r"   r   r#   r+   [   s*    
 )
r+   c                   @   s    e Zd Z	d
ddZdd Zd	S )rq   r@   d   r   c                 C   s   || _ || _|| _|| _d S r   )rB   initialseedrC   )ry   rB   r   r   rC   r"   r"   r#   rG     s   
detshuffle2.__init__c                 C   sz   t | jtr| j }n
|  jd7  _| j}t }t s$| j| }n| j| dt	   }|| t
|| j| j|S )Nr%   r   )rQ   rC   r   	get_valuerandomRandomr   r(   r   r   r   rB   r   )ry   srcrC   rngr   r"   r"   r#   run'  s   
zdetshuffle2.runN)r@   r   r   r   )r   r   r   rG   r   r"   r"   r"   r#   rq     s    
rq   c                       s   e Zd Z fddZ  ZS )rq   c                    s   t    td d S )Nr   )rF   rG   r   rv   r   r   r"   r#   rG   <  s   
r   )r   r   r   rG   r   r"   r"   r   r#   rq   ;  s    )8rS   r   rf   rX   r   r_   r   r   typingr   r   r   r4   torch.distributeddistributedr&   botocore.configr   PILr   5nemo.collections.multimodal.data.common.data_samplersr   r   5nemo.collections.multimodal.data.common.webdataset_s3r	   r   2nemo.collections.nlp.modules.common.megatron.utilsr
   nemo.core.classesr   NeMoIterableDataset
nemo.utilsr   rI   rb   r   webdataset.filtersr   webdataset.utilsr   HAVE_WEBDATASETImportErrorAttributeErrorModuleNotFoundErrorrv   megatron.corer   HAVE_MEGATRON_COREMAX_IMAGE_PIXELSsplitr   r$   r)   r+   PipelineStagerq   r"   r"   r"   r#   <module>   sV   	 B