o
    SiD                     @   s  d Z ddlZddlZddlZddlZddlZddlZddlZddlm	Z	m
Z
 ddlmZ ddlmZ ddlZddlZddlmZ ddlmZ dd	lmZ dd
lmZmZ dd Zdd Zd2ddZd2ddZdd Zdd ZedfddZ G dd deZ!ej"fddZ#ee#Z$dd Z%e	G d d! d!Z&e' Z(d"d# Z)G d$d% d%eZ*e*Z+d&d' Z,d(d) Z-d*d+ Z.G d,d- d-eZ/G d.d/ d/eZ0d0d1 Z1dS )3zdTrain PyTorch models directly from POSIX tar archive.

Code works locally or over HTTP connections.
    N)	dataclassfield)islice)List   )utils)pipelinefilter)IterableDataset)is_iterableobsoletec                 C   s4   |  d}d| }|tjv sJ d| tj| S )zLook up match in the environment with prefix WDS_.

    Args:
        m: A match object.

    Returns:
        str: The value of the environment variable WDS_<m.group(1)>.

    Raises:
        AssertionError: If the environment variable is not found.
    r   WDS_z!missing environment variable wds_)grouposenviron)mkey r   I/home/ubuntu/.local/lib/python3.10/site-packages/webdataset/shardlists.py	envlookup!   s   

r   c                 C   s   t dt| S )zSubstitute ${var} with the value of the environment variable WDS_var.

    Args:
        s (str): String to be substituted.

    Returns:
        str: The substituted string.
    z\$\{(\w+)\})resubr   sr   r   r   envsubst3      	r   c                 c   sD    t j|d\}}}}|dkrt| |d|E dH  dS | E dH  dS )zSplit the input sequence by PyTorch distributed rank.

    Args:
        src: The input sequence to be split.
        group: The process group for distributed training.

    Yields:
        Elements from the input sequence based on the node's rank.
    r   r   Nr   pytorch_worker_infor   srcr   rank
world_sizeworkernum_workersr   r   r   split_by_node?   s
   
r$   c                 c   s4    t j|d\}}}}|dkrtd| E dH  dS )a'  Ensure the input sequence is not split for multi-node training.

    Args:
        src: The input sequence.
        group: The process group for distributed training.

    Yields:
        Elements from the input sequence.

    Raises:
        ValueError: If multi-node training is detected.
    r   r   zWyou need to add an explicit nodesplitter to your input pipeline for multi-node trainingN)r   r   
ValueErrorr   r   r   r   single_node_onlyP   s
   r&   c                 c   s@    t  \}}}}|dkrt| |d|E dH  dS | E dH  dS )zSplit the input sequence by PyTorch DataLoader worker.

    Args:
        src: The input sequence to be split.

    Yields:
        Elements from the input sequence based on the worker's ID.
    r   Nr   )r   r    r!   r"   r#   r   r   r   split_by_workerc   s
   	r'   c                 C   sR   |  d}g }|D ]}tdD ]}|}t|}||kr nq|t| q	|S )aQ  Expand the urls if they are a string.

    If input is a string:
    - split on '::'
    - expand environment variables (using WDS_ prefix)
    - expand braces

    Otherwise:
    - return the input as a list

    Args:
        urls (str or List[str]): URL list or URL string.

    Returns:
        List[str]: List of expanded URLs.
    z::
   )splitranger   extendbraceexpand)urlsurllistresulturlilastr   r   r   expand_urlss   s   
r3   g    eAc                 C   sH   t | tr	t| S t | tr| S t| rtt| |S tdt|  )a>  Expand the given source into a list of URLs.

    Args:
        source (str or List[str] or Iterable): The source to be expanded.
        max_urls (int): Maximum number of URLs to return.

    Returns:
        List[str]: List of expanded URLs.

    Raises:
        ValueError: If the source type is not supported.
    zcannot handle )
isinstancestrr3   listr
   r   r%   type)sourcemax_urlsr   r   r   expand_source   s   

r:   c                       s2   e Zd ZdZd	 fdd	Zdd Zdd Z  ZS )
SimpleShardListz,An iterable dataset yielding a list of URLs.Nc                    sZ   t    t|trt|}nt|}|| _t| jd ts J |du r(t }|| _dS )a-  Initialize the SimpleShardList.

        Args:
            urls (str or List[str]): A list of URLs as a Python list or brace notation string.
            seed (int or bool or None): Random seed for shuffling; if None, no shuffling is done,
                if True, a random seed is generated.
        r   TN)	super__init__r4   r5   r3   r6   r-   timeseed)selfr-   r?   	__class__r   r   r=      s   



zSimpleShardList.__init__c                 C   s
   t | jS )zfReturn the number of URLs in the list.

        Returns:
            int: The number of URLs.
        )lenr-   )r@   r   r   r   __len__   s   
zSimpleShardList.__len__c                 c   sB    | j  }| jdurt| j| |D ]}t|dV  qdS )~Return an iterator over the shards.

        Yields:
            dict: A dictionary containing the URL of each shard.
        Nr0   )r-   copyr?   randomRandomshuffledict)r@   r-   r0   r   r   r   __iter__   s   

zSimpleShardList.__iter__N)__name__
__module____qualname____doc__r=   rD   rL   __classcell__r   r   rA   r   r;      s
    r;   c              
   c   s    ddl }t }z
tddd}W n ty2 } ztt|dd tjd W Y d}~nd}~ww |	|}tdtjd t
| }td	t| d
| tjd t|D ]}||V  qVdS )zResample items from the source with replacement.

    Args:
        src: The source iterable.
        n (int): The number of items to yield. Defaults to sys.maxsize.

    Yields:
        Randomly chosen items from the source.
    r   Nz/dev/randomrb   2   filez# resampled loadingz# resampled got z samples, yielding )rH   r>   openread	ExceptionprintreprsysstderrrI   r6   rC   r*   choice)r   nrH   r?   exnrngitems_r   r   r   
resampled_   s    
&
re   c                 c   s2    d}| D ]	}|V  |d7 }q|dkrt ddS )zEnsure the source yields at least one item.

    Args:
        src: The source iterable.

    Yields:
        Items from the source.

    Raises:
        ValueError: If the source yields no items.
    r   r   zHpipeline stage received no data at all and this was declared as an errorN)r%   )r   countr   r   r   r   	non_empty   s   
rg   c                   @   sL   e Zd ZU dZdZeed< dZeed< dZ	e
ed< eedZee ed	< d
S )MSSourcez!Class representing a data source. nameperepochFresample)default_factoryr-   N)rN   rO   rP   rQ   rj   r5   __annotations__rl   intrm   boolr   r6   r-   r   r   r   r   r   rh     s   
 rh   c                 C   s   t jt j| S )zExpand user and environment variables in a string.

    Args:
        s (str): The string to expand.

    Returns:
        str: The expanded string.
    )r   path
expanduser
expandvarsr   r   r   r   expand  s   	ru   c                       s<   e Zd ZdZejdddeddf fdd	Zd	d
 Z  Z	S )ResampledShardsz<An iterable dataset yielding a list of URLs with resampling.r   NFg    .ATc                    st   t    t||| _|rt| jdkrtdt| jd ts"J || _|du r,t	j
n|| _|| _|| _d| _dS )a  Initialize the ResampledShards.

        Args:
            urls: A list of URLs as a Python list or brace notation string.
            nshards (int): The number of shards to yield. Defaults to sys.maxsize.
            seed (int): The seed for random number generation.
            worker_seed (Callable or None): A function to generate worker-specific seeds.
            deterministic (bool): Whether to use deterministic sampling.
            max_urls (int): Maximum number of URLs to consider.
            empty_check (bool): Whether to check for empty URL list.

        Raises:
            ValueError: If empty_check is True and no shards are found.
        r   z8empty_check=True, but no shards found in ResampledShardsNrk   )r<   r=   r:   r-   rC   r%   r4   r5   nshardsr   pytorch_worker_seedworker_seeddeterministicr?   epoch)r@   r-   rw   r?   ry   rz   r9   empty_checkrA   r   r   r=     s   

zResampledShards.__init__c              
   c   s    |  j d7  _ | jrt|  | j | j}nt|  | j | jt t	 t
d}tjdddkr<td|  t|| _t| jD ]}| jdt| jd }t| j| dV  qGd	S )
rE   r      WDS_SHOW_SEED01z# ResampledShards seed r   rF   N)r{   rz   r   	make_seedry   r?   r   getpidr>   time_nsurandomr   getr[   rH   rI   rb   r*   rw   randintrC   r-   rK   )r@   r?   rd   indexr   r   r   rL   B  s&   zResampledShards.__iter__)
rN   rO   rP   rQ   r]   maxsizerp   r=   rL   rR   r   r   rA   r   rv     s    $rv   c                 C   s(   z	t | d W dS  ty   Y dS w )zCheck for the existence of a Unix PID.

    Args:
        pid (int): The process ID to check.

    Returns:
        bool: True if the process is running, False otherwise.
    r   FT)r   killOSError)pidr   r   r   check_pid_is_running_  s   	r   c                 C   s   t dd| S )zRemove the last extension from a filename.

    Args:
        fname (str): The filename to process.

    Returns:
        str: The filename without the last extension.
    z\.[^.]*$ri   )r   r   )fnamer   r   r   without_last_extensionp  r   r   c                 C   s"   t d| }|s
dS t|dS )zGet the PID from a filename.

    Args:
        fname (str): The filename to process.

    Returns:
        int or None: The PID if found in the filename, None otherwise.
    z^(.*)\._(\d+)_$N   )r   matchrp   r   )r   r   r   r   r   get_pid_from_filename|  s   	r   c                   @   s>   e Zd ZdZ						ddd	Zd
d Zdd Zdd ZdS )DirectoryShardListz8An iterable dataset that yields shards from a directory.*.{tar,tgz,tar.tgz}r      mBrm   rH   Nc                 C   sL   | dsJ tj|sJ || _|| _|| _|| _|| _|| _|| _	dS )a  Initialize the DirectoryShardList.

        Args:
            path (str): The directory path to monitor for shards.
            pattern (str): The glob pattern to match shard files.
            poll (int): The polling interval in seconds.
            timeout (float): The maximum time to wait for new shards.
            mode (str): The mode for handling processed shards.
            select (str): The strategy for selecting shards.
            fate (Any): Currently unused parameter.
        /N)
endswithr   rr   isdirpollpatternmodeselectfatetimeout)r@   rr   r   r   r   r   r   r   r   r   r   r=     s   
zDirectoryShardList.__init__c                 C   s\   | j dkrt| dS | j dkrt|t|d  dS | j dkr,t|t| dS dS )zRecycle a processed shard based on the current mode.

        Args:
            activename (str): The name of the active shard file.
        unlinkkeepz._done_rm   N)r   r   r   renamer   )r@   
activenamer   r   r   recycle  s   


zDirectoryShardList.recyclec                 C   sD   t  tj| jdD ]}t|}|du rqt|s| | qdS )z<Clean up shard files associated with non-existent processes.z*._*_N)globr   rr   joinr   r   r   )r@   r   r   r   r   r   cleanup_files_without_processes  s   
z2DirectoryShardList.cleanup_files_without_processesc                 c   s   t   }t   | | jk rtt| j| j }t|dkr-| jdu r&dS t | j q| j	dkr;t
|dd d}n| j	dkrFt|}ntd| j	 |d	t  d
 }zt|| W n tyo   t | j Y qw t|dV  | | |   t   | | jk sdS dS )zIterate over the shards in the directory.

        Yields:
            dict: A dictionary containing the URL of each shard.
        r   Noldestc                 S   s   t | jS rM   )r   statst_mtime)fnr   r   r   <lambda>  s    z-DirectoryShardList.__iter__.<locals>.<lambda>)r   rH   zunknown selection strategy z._rd   rF   )r>   r   sortedr   rr   r   rC   r   sleepr   minrH   r_   r%   r   r   r   FileNotFoundErrorrK   r   r   )r@   r2   
candidates	candidater   r   r   r   rL     s2   



zDirectoryShardList.__iter__)r   r   r   rm   rH   N)rN   rO   rP   rQ   r=   r   r   rL   r   r   r   r   r     s    
	r   c                   @   sB   e Zd ZdZedddd Zdd Zdd	 Zd
d Zdd Z	dS )MultiShardSamplez=An iterable dataset that samples from multiple shard sources.z6this is going to be replaced with the WIDS JSON format)reasonc                 C   s   d| _ | | dS )zInitialize the MultiShardSample.

        Args:
            fname (str or dict): The filename of the YAML spec or a dictionary containing the spec.
        rk   N)r{   
parse_spec)r@   r   r   r   r   r=     s   zMultiShardSample.__init__c                    s  t | _t|tr|}d}nt|}t|}W d   n1 s!w   Y  t| 	td
 s;J t| t|ddg | _|d D ]}t| 	td
 saJ t| |d|dg }t|trs|g}d	d
 |D }|g krdg}t|dksJ | d|d  |dd  }|d }t|tr|g} fdd
|D }|dd}|dd}	|	t|krtd|	 d|	dkr|dkrtdt|||	|d}
| j|
 td| dt| d|	 tjd qJdS )zParse the specification for multiple shard sources.

        Args:
            fname (str or dict): The filename of the YAML spec or a dictionary containing the spec.
        z{dict}Nzprefix datasets bucketsprefixri   datasetsz#buckets name shards resample choosebucketsc                 S   s   g | ]}t |qS r   )ru   ).0r   r   r   r   
<listcomp>  s    z/MultiShardSample.parse_spec.<locals>.<listcomp>r   z2: FIXME support for multiple buckets unimplementedr   rj   @shardsc                    s2   g | ]}t  t|D ]}tj | qqS r   )r,   ru   r   rr   r   )r   r0   ubucketr   r   r   r     s   2 rm   rk   choosez	perepoch z- must be no greater than the number of shardsz&specify only one of perepoch or choose)rj   r-   rl   rm   z#  rV   )default_rngrb   r4   rK   rX   yaml	safe_loadsetkeysissubsetr)   r6   ru   r   sourcesr5   rC   r%   rh   appendr[   r]   r^   )r@   r   specstreamdsr   rj   r-   rm   nsampleentryr   r   r   r     sF   

**

&zMultiShardSample.parse_specc                 C   s   t || _dS )zSet the current epoch for consistent shard selection among nodes.

        Args:
            seed (int): The seed for the random number generator.
        N)rH   rI   rb   )r@   r?   r   r   r   	set_epoch  s   zMultiShardSample.set_epochc                 C   s   g }| j D ]3}|jdkr| jj|j|jd}n|jdkr/t|j}| j| |d|j }nt|j}||7 }q| j| |S )zGet the list of shards for the current epoch.

        Returns:
            list: A list of shard URLs for the current epoch.
        r   )kN)r   rm   rb   choicesr-   rl   r6   rJ   )r@   r/   r8   lr   r   r   get_shards_for_epoch%  s   





z%MultiShardSample.get_shards_for_epochc                 c   s$    |   }|D ]}t|dV  qdS )zIterate over the shards for the current epoch.

        Yields:
            dict: A dictionary containing the URL of each shard.
        rF   N)r   rK   )r@   r   shardr   r   r   rL   ;  s
   zMultiShardSample.__iter__N)
rN   rO   rP   rQ   r   r=   r   r   r   rL   r   r   r   r   r     s    
	*r   c                 C   s   |  dr	t| S t| S )zCreate a shard list based on the given specification.

    Args:
        spec (str): The specification for creating the shard list.

    Returns:
        IterableDataset: Either a MultiShardSample or a SimpleShardList based on the spec.
    z.yaml)r   r   r;   )r   r   r   r   	shardspecF  s   
	r   rM   )2rQ   r   r   os.pathrH   r   r]   r>   dataclassesr   r   	itertoolsr   typingr   r,   r   ri   r   filtersr   pytorchr	   r
   r   r   r   r$   r&   r'   r3   rp   r:   r;   r   re   	resampledrg   rh   rI   r   ru   rv   ResampledShardListr   r   r   r   r   r   r   r   r   r   <module>   sP   

+	A[`