o
    }oiG*                  
   @   s:  d dl Z d dlZd dlZd dlmZ d dlZd dlmZ d dlm	Z	 z4d dl
mZ d dlmZmZmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZmZ dZW n eeefyi   dZe	 d Y nw dZ!erdddZefddZ"eddddfddZ#e$e#Z%G dd deeZ&dS G dd deZ&dS )    N)urlparse)ApexGuardDefaults)logging)cachefilters
shardlists)FluidInterface)reraise_exception)DataPipeline)IterableDataset)group_by_keystar_file_expanderTF@Webdataset import failed! We recommend use `webdataset==0.2.48`.
   rb    c              
   K   s  t tjdd}|rtd| tjtjd |dv sJ || dkr8|dkr)tj	j
S |dkr1tjj
S td	| d
|v r|d
 rd}|tk rz'|d j|d | d}|d  }|d t|k}|rht|W S |d7 }W n! ty }	 z|d7 }t|	 td| W Y d}	~	qBd}	~	ww |tk sFtd| |d|v r|d durtj|d | } t| }
|
jdkrt tjdd}t| ||dS |
jdkrt tjdd}t|
j||dS tjd }tj|
j|}|| ||fi |S )a  Open the URL.
        This uses the `gopen_schemes` dispatch table to dispatch based
        on scheme.
        Support for the following schemes is built-in: pipe, file,
        http, https, sftp, ftps, scp.
        When no scheme is given the url is treated as a file.
        You can use the OPEN_VERBOSE argument to get info about
        files being opened.

        This implementation is based on webdataset's gopen,
        with the modification of supporting reading from s3 object_store:
            https://webdataset.github.io/webdataset/api/webdataset/gopen.html#gopen
        Args:
            url (list[str]): the source URL
            mode (str): the mode ("rb", "r")
            bufsize (int): the buffer size
        GOPEN_VERBOSEr   GOPEN)file)r   wb-r   r   zunknown mode object_store	s3_clients3_bucket_name)BucketKeyBodyContentLength   z&Retrying tar file download, attempt {}Nz/Unable to read {} from PBSS. {} attempts tried.local_root_path GOPEN_BUFFER)	bufferingr   __default__)intosenvirongetprintgopen_webdatainfosysstderrstdinbufferstdout
ValueError_NUM_OBJECT_STORE_READ_ATTEMPTS
get_objectreadlenioBytesIO	ExceptionformatConnectionErrorpathjoinr   schemeopengopen_schemes)urlmodebufsizekwverboseattempts3_response_objectobject_content	full_readeprhandler rL   i/home/ubuntu/.local/lib/python3.10/site-packages/nemo/collections/multimodal/data/common/webdataset_s3.pygopen.   sR   


rN   c                 k   s    | D ]M}t |tsJ |d|v sJ |d }zt|fi |}|j|d |V  W q tyP } z|j|f |_||rDW Y d}~qW Y d}~ dS d}~ww dS )zGiven a stream of url names (packaged in `dict(url=url)`), yield opened streams.

        Args:
            data: Iterator of dictionaires containing url paths.
            handler: Exception handler.
        r@   )streamN)
isinstancedictrN   updater8   args)datarK   rC   sampler@   rO   exnrL   rL   rM   
url_openery   s"   
rW   c           	      C   s0   t | |||||d}t||d}t||d}|S )a  
        Given an iterator of filenames, this function opens the URL streams
        and groups data by keys.

        Args:
            src: Iterator of data dictionaires containing URL names.
            handler: Exception handler.
            load_from_object_store (bool): A boolean flag to specify whether to load from
                object store.
            s3_client: If loading from object store, specify S3 client.
            s3_bucket_name: If loading from object store, specify S3 bucket name.
            local_root_path: If loading from local (or mounted) disk system,
                    specify the root path of the dataset.
        )rK   r   r   r   r   )rK   )rW   r   r   )	srcrK   load_from_object_storer   r   r   streamsfilessamplesrL   rL   rM   tarfile_samples   s   r]   c                       s<   e Zd ZdZedddddejdddddf fdd	Z  ZS )
WebDatasetz?Webdataset class modified to support loading from object store.FNr"   c              	      s  t    t|tr|rJ | | nt|trK|ds#|drKt|}t	|}W d   n1 s7w   Y  d|v sBJ | t
| nOt|tr_d|v sVJ | t
| n;|rj| t
| n0| t
| | | | t
j |du rd}|dur|r| t| n| t| |du s|dkr| t||
|||d dS |d	ks|dksJ | tj||	||d
 dS )a  
            Args:
                urls: An iterator containing a list of url names.
                handler: Exception handler.
                resampled: If true, sample shards from shard list with replacement.
                shardshuffle: If true, shuffles the entire shard list.
                cache_size: Size of cache.
                cache_dir: Path to store cache.
                detshuffle: Whether to use deterministic shuffling when shardshuffle is True.
                nodesplitter: Function for splitting urls among nodes.
                verbose: If True, prints logs.
                load_from_object_store (bool): A boolean flag to specify whether to load from
                    object store.
                s3_client: If loading from object store, specify S3 client.
                s3_bucket_name: If loading from object store, specify S3 bucket name.
                local_root_path: If loading from local (or mounted) disk system,
                    specify the root path of the dataset.
            z.yamlz.ymlNdatasetsTd   r   )rK   rY   r   r   r   r"   )rK   rD   
cache_size	cache_dir)super__init__rP   r   appendstrendswithr>   yaml	safe_loadr   MultiShardSamplerQ   ResampledShardsSimpleShardListsplit_by_workerr   
detshuffleshuffletarfile_to_samplesr   cached_tarfile_to_samples)selfurlsrK   	resampledshardshufflera   rb   rn   nodesplitterrD   rY   r   r   r   rO   spec	__class__rL   rM   rd      sP   
"



WebDataset.__init__)	__name__
__module____qualname____doc__r	   r   single_node_onlyrd   __classcell__rL   rL   rx   rM   r^      s    r^   c                       s   e Zd Z fddZ  ZS )r^   c                    s   t    td d S )Nr   )rc   rd   r   warning)rr   rx   rL   rM   rd   
  s   
rz   )r{   r|   r}   rd   r   rL   rL   rx   rM   r^   	  s    )r   r   )'r6   r&   r,   urllib.parser   rh   2nemo.collections.nlp.modules.common.megatron.utilsr   
nemo.utilsr   webdataset.gopenrN   r*   
webdatasetr   r   r   webdataset.compatr   webdataset.handlersr	   webdataset.pipeliner
   webdataset.pytorchr   webdataset.tariteratorsr   r   HAVE_WEBDATASETImportErrorAttributeErrorModuleNotFoundErrorr   r2   rW   r]   pipelinefilterrp   r^   rL   rL   rL   rM   <module>   sB   
K

"U