o
    Si,                     @   s2  d Z ddlZddlZddlZddlZddlZddlZddlZddl	m
Z
mZmZ ddlmZ ddlmZ ddlmZ ddlmZ ejdd	Zeejd
dZeejddZdd ZdefddZdefddZdefddZedd Z d%ddZ!G dd dZ"d&dd Z#G d!d" d"Z$G d#d$ d$Z%dS )'z`Code related to caching files downloaded from storage
servers, object servers, and web servers.
    N)CallableIterableOptional)urlparse   )reraise_exception)obsolete	WDS_CACHEz./_cacheWDS_CACHE_SIZE1e18WDS_VERBOSE_CACHE0c                 C   s   t | }|jdv S )z$Check whether a URL is a local file. file)r   scheme)urlparsed r   D/home/ubuntu/.local/lib/python3.10/site-packages/webdataset/cache.pyislocal   s   
r   fnamec              	   C   sj   t j| s
J | ztjddgtjdd W n tjtfy%   tdw tjd| gtj	ddd}|j
S )zGet the file type of a file.r   .T)stdoutcheckz%UNIX/Linux file command not available)r   textr   )ospathexists
subprocessrunDEVNULLSubprocessErrorFileNotFoundErrorAssertionErrorPIPEr   )r   resultr   r   r   get_filetype    s   r'   c                 C   s   t | dF}|d}t|dkr%|dd dkr%|  dW  d   S t|dkr?|d	d
 dkr?|  dW  d   S |  dW  d   S 1 sNw   Y  dS )zDetermine file type by checking magic numbers.

    It checks common formats used with WebDataset: tar archives and gzip files.

    Args:
        fname (str): Path to the file to check

    Returns:
        str: Description of the file type
    rbi      r   s   z: gzip compressed dataNi  i  i  s   ustarz: POSIX tar archivez: data)openreadlen)r   fheaderr   r   r   magic_filetype-   s   
	$r/   c                 C   s4   t j| s
J | t| }d| v pd| v S )z&Check whether a file is a tar archive.ztar archivezgzip compressed)r   r   r   r/   lower)r   ftyper   r   r   check_tar_formatI   s   r2   c                 C   sB   |  dr| dd } | d}|D ]}td|r|  S q| S )z2Guess the actual URL from a "pipe:" specification.zpipe:   N z^(https?|hdfs|gs|ais|s3):)
startswithsplitrematch)specwordswordr   r   r   pipe_cleanerR   s   

r<   c                 C   sp   t | tsJ t| }|jdv r(|j}|d}|d}d|d| d S tj	j
| dd}|dd }|S )z Guess the cache name from a URL.)
Nr   r   httphttpsftpftpsgss3ais/Nz_+{}*,-)safei)
isinstancestrr   r   r   lstripr6   joinurllibparsequote)r   ndirr   r   list_of_directoriesquotedr   r   r   url_to_cache_name^   s   


rQ   c                   @   s<   e Zd ZdZdedejjddfddZdd	 Z	d
d Z
dS )
LRUCleanupz)Perform LRU cleanup on a cache directory.Ng   mBF   c                 C   s(   || _ || _|| _|| _|| _d| _dS )z"Initialize the LRU cleanup object.r   N)	cache_dir
cache_sizekeyfnverboseintervallast_run)selfrT   rU   rV   rW   rX   r   r   r   __init__}   s   	
zLRUCleanup.__init__c                 C   s
   || _ dS )zSet the cache directory.N)rT   )rZ   rT   r   r   r   set_cache_dir   s   
zLRUCleanup.set_cache_dirc              	   C   s^  t j| js	dS | jdurt | j | jk rdS zd}t | jD ]\}}}|D ]}|t jt j	||7 }q*q#|| j
krCW dS g }t | jD ]\}}}|D ]}|t j	|| qRqK|j| jdd t|dkr|| j
kr| }|t j|8 }| jrtd| tjd t | t|dkr|| j
kssW n ttfy   Y nw t | _dS )a  Performs cleanup of the file cache in cache_dir using an LRU strategy,
        keeping the total size of all remaining files below cache_size.

        This is a simple implementation that scans the directory twice - once to compute
        the total size and once to build a list of files for potential deletion. While
        not theoretically optimal for extremely large caches, it is efficient enough
        for practical purposes with typical cache sizes and file counts.
        Nr   T)keyreversez# deleting %sr   )r   r   r   rT   rX   timerY   walkgetsizerJ   rU   appendsortrV   r,   poprW   printsysstderrremoveOSErrorr#   )rZ   
total_sizedirpathdirnames	filenamesfilenamefilesr   r   r   r   cleanup   s<   	

zLRUCleanup.cleanup)__name__
__module____qualname____doc__intr   r   getctimer[   r\   rq   r   r   r   r   rR   z   s    
rR      Fc              	   C   s   |dt    }t| ,}t|d}	 ||}|sn|| qW d   n1 s.w   Y  W d   n1 s=w   Y  t || dS )z%Download a file from `url` to `dest`.z.tempwbTN)r   getpidgopenr*   r+   writerename)r   dest
chunk_sizerW   tempstreamr-   datar   r   r   download   s   

r   c                   @   s&   e Zd ZdZdefddZdd ZdS )StreamingOpenOpen a stream from a URL.Fc                 C   s   || _ || _dS )z%Initialize the streaming open object.N)rW   handler)rZ   rW   r   r   r   r   r[      s   
zStreamingOpen.__init__c                 c   s    |D ]Q}t |tr|d }t|}z#|jdv r(t|jd}t|||jdV  nt|}t||dV  W q tyT } z| |rHW Y d}~qW Y d}~ dS d}~ww dS )r   r   r   r(   r   r   
local_path)r   r   N)	rG   dictr   r   r*   r   r{   	Exceptionr   )rZ   urlsr   r   r   exnr   r   r   __call__   s&   



zStreamingOpen.__call__N)rr   rs   rt   ru   r   r[   r   r   r   r   r   r      s    r   c                   @   s   e Zd ZdZ	dedeeddddee de	egef d	e
d
e	ege
f de	ege
f dedefddZdedefddZdee deej fddZdS )	FileCachea	  Cache files from URLs.

    This class provides functionality to download and cache files from URLs,
    with options for validation, error handling, and cache management.

    Args:
        cache_dir (Optional[str]): The directory to use for caching. Defaults to None.
        url_to_name (Callable[[str], str]): Function to convert URLs to cache names.
        verbose (bool): Whether to print verbose output. Defaults to False.
        validator (Callable[[str], bool]): Function to validate downloaded files.
        handler (Callable[[Exception], bool]): Function to handle exceptions.
        cache_size (int): Maximum size of the cache in bytes. Defaults to -1 (unlimited).
        cache_cleanup_interval (int): Interval between cache cleanup operations in seconds.
    NFrE   rS   )url_to_namerW   	validatorr   rU   cache_cleanup_intervalrT   r   rW   r   r   rU   r   c                C   sZ   || _ || _|| _|d u rt| _n|| _|| _|dkr(t| j|| j|d| _d S d | _d S )Nr   )rW   rX   )r   r   r   default_cache_dirrT   rW   rR   cleaner)rZ   rT   r   rW   r   r   rU   r   r   r   r   r[      s   
zFileCache.__init__r   returnc                 C   s:  t |tsJ t|rt|jS | |}d|vs#J d| d| tj| jtj	|}tj
|dd tj| j|}tj|s| jrRtd||f tjd | jdur\| j  t||| jd	 | jr| |st|}t|d
}|d}W d   n1 sw   Y  t| td|||t|f |S )a2  Download a file from a given URL and return the path to the downloaded file.

        Args:
            url (str): The URL of the file to download.

        Returns:
            str: The path to the downloaded file.

        Raises:
            ValueError: If the downloaded file fails validation.
        rD   zbad cache name z for T)exist_okz# downloading %s to %sr_   N)rW   r(      z3%s (%s) is not a tar archive, but a %s, contains %s)rG   rH   r   r   r   r   r   rJ   rT   dirnamemakedirsr   rW   rf   rg   rh   r   rq   r   r   r'   r*   r+   ri   
ValueErrorrepr)rZ   r   
cache_namedestdirr~   r1   r-   r   r   r   r   get_file  s,   




zFileCache.get_filer   c                 c   s    |D ]P}t |tr|d }d}tdD ]>}z| |}t|d}W n' tyI } z| |r>t| |d9 }W Y d}~qW Y d}~ nd}~ww t|||dV   qdS )at  Download files from a list of URLs and yield file streams.

        Args:
            urls (Iterable[str]): An iterable of URLs to download files from.

        Yields:
            dict: A dictionary containing the URL, file stream, and local path of each downloaded file.

        Raises:
            Exception: If there's an error downloading or opening a file.
        r   g      ?
   r(   g      ?Nr   )	rG   r   ranger   r*   r   r   r`   sleep)rZ   r   r   delay_r~   r   er   r   r   r   0  s*   



zFileCache.__call__)N)rr   rs   rt   ru   rQ   r2   r   r   rH   r   boolr   rv   r[   r   r   ioIOBaser   r   r   r   r   r      s6    	
 "r   )r   )rx   F)&ru   r   r   r7   r   rg   r`   urllib.parserK   typingr   r   r   r   webdataset.gopenr{   handlersr   utilsr   environgetr   floatdefault_cache_sizerv   verbose_cacher   rH   r'   r/   r2   r<   rQ   rR   r   r   r   r   r   r   r   <module>   s6    	


>