o
    SiF                     @   s   d Z ddlZddlZddlZddlmZmZ ddlmZ ddl	m
Z
 ddlmZ i ZG dd	 d	Zd'd
dZd(ddZd(ddZd(ddZd(ddZd(ddZd(ddZd(ddZdd Z	 eeeeeeeeeeeedZdejv rejd d D ]Zeee< qzd!d" Zd(d#d$Zd%d& ZdS ))z!Open URLs by calling subcommands.    N)PIPEPopen)urlparse)url2pathname   )utilsc                   @   st   e Zd ZdZdddg dddZdd	 Zd
d Zdd Zdd Zdd Z	dd Z
dd Zdd Zdd Zdd ZdS )PipeaA  Wrapper class for subprocess.Pipe.

    This class looks like a stream from the outside, but it checks
    subprocess status and handles timeouts with exceptions.
    This way, clients of the class do not need to know that they are
    dealing with subprocesses.

    Args:
        *args: Passed to `subprocess.Pipe`
        mode: The mode for opening the pipe.
        timeout: Timeout for closing/waiting.
        ignore_errors: Don't raise exceptions on subprocess errors.
        ignore_status: List of status codes to ignore.
        **kw: Passed to `subprocess.Pipe`
    Ng      @F)modetimeoutignore_errorsignore_statusc                O   s   || _ dg| | _|| _||f| _|d dkr3t|dti|| _| jj| _| jdu r2t	| dn!|d dkrTt|dti|| _| jj
| _| jdu rTt	| dd| _dS )zCreate an IO Pipe.r   rstdoutNz: couldn't openwstdin)r   r   r
   argsr   r   procr   stream
ValueErrorr   status)selfr	   r
   r   r   r   kw r   D/home/ubuntu/.local/lib/python3.10/site-packages/webdataset/gopen.py__init__+   s    






zPipe.__init__c                 C   s   d| j  dS )z2Return a string representation of the Pipe object.z<Pipe >)r   r   r   r   r   __str__E   s   zPipe.__str__c                 C   s"   | j  }|dur|   dS dS )z'Poll the process and handle any errors.N)r   pollwait_for_child)r   r   r   r   r   check_statusI   s   
zPipe.check_statusc                 C   s   t tjdd}| jdur|rdS | j | _|r6td| j dt  d| jj	 d| j
 dt 
tjd | j| jvrM| jsOt| j
 d	| j d
t dS dS )z>Check the status variable and raise an exception if necessary.GOPEN_VERBOSEr   Nzpipe exit [ :z] filez: exit z (read) )intosenvirongetr   r   waitprintgetpidpidr   infosysstderrr   r   IOError)r   verboser   r   r   r   O   s   ,zPipe.wait_for_childc                 O      | j j|i |}|   |S )zWrap stream.read and checks status.

        Args:
            *args: Arguments to pass to stream.read
            **kw: Keyword arguments to pass to stream.read

        Returns:
            The result of stream.read
        )r   readr    r   r   r   resultr   r   r   r4   ^      
z	Pipe.readc                 O   r3   )zWrap stream.write and checks status.

        Args:
            *args: Arguments to pass to stream.write
            **kw: Keyword arguments to pass to stream.write

        Returns:
            The result of stream.write
        )r   writer    r5   r   r   r   r8   l   r7   z
Pipe.writec                 O   s*   | j j|i |}| j | _|   |S )zWrap stream.readLine and checks status.

        Args:
            *args: Arguments to pass to stream.readLine
            **kw: Keyword arguments to pass to stream.readLine

        Returns:
            The result of stream.readLine
        )r   readLiner   r   r   r    r5   r   r   r   r9   z   s   
zPipe.readLinec                 C   s2   | j js| j   | j| j| _|   dS dS )z>Wrap stream.close, wait for the subprocess, and handle errors.N)r   closedcloser   r*   r
   r   r   r   r   r   r   r;      s
   
z
Pipe.closec                 C   s   | S )Context handler.r   r   r   r   r   	__enter__   s   zPipe.__enter__c                 C   s   |    dS )r<   N)r;   )r   etypevalue	tracebackr   r   r   __exit__   s   zPipe.__exit__c                 C   s$   z|    W dS  ty   Y dS w )zClose the stream upon delete.

        This is a fallback for when users can't use context managers.
        We catch all exceptions since __del__ should never raise exceptions
        during garbage collection.
        N)r;   	Exceptionr   r   r   r   __del__   s
   zPipe.__del__)__name__
__module____qualname____doc__r   r   r    r   r4   r8   r9   r;   r=   rA   rC   r   r   r   r   r      s"    r   c                 C   sJ   t | tsdS |dur|| _|dur|| _|dur|| _|dur#|| _dS )a  Set options for Pipes.

    This function can be called on any stream. It will set pipe options only
    when its argument is a pipe.

    Args:
        obj: Any kind of stream
        timeout: Desired timeout
        ignore_errors: Desired ignore_errors setting
        ignore_status: Desired ignore_status setting
        handler: Desired error handler

    Returns:
        True if options were set, False otherwise
    FNT)
isinstancer   r
   r   r   handler)objr
   r   r   rI   r   r   r   set_options   s   
rK   rb    c                 C   s"   |  drtdd| } t| |S )zOpen a file.

    This works for local files; path names only.

    Args:
        url: URL to be opened
        mode: Mode to open it with
        bufsize: Requested buffer size

    Returns:
        An opened file object
    zfile:z	^file://? )
startswithresubopen)urlr	   bufsizer   r   r   
gopen_file   s   

rU   c                 C   sV   |  dsJ tjrtd| dd }|d dv r$t||d|dgd	S t| d
)a  Use gopen to open a pipe.

    This function deliberately uses shell=True with the pipe URL to enable shell command
    execution directly from URLs. This is an intentional design feature that allows users
    to construct processing pipelines using shell commands via the pipe: URL scheme.
    The purpose is to enable flexible data processing directly within data loading pipelines.

    Note: This feature requires careful use with trusted input sources only, as it will
    execute arbitrary shell commands specified in the URL.

    Args:
        url: A pipe: URL
        mode: Desired mode
        bufsize: Desired buffer size

    Returns:
        A Pipe object

    Raises:
        ValueError: If the mode is unknown
    zpipe:z8gopen_pipe: unsafe_gopen is False, cannot open pipe URLs   Nr   )r   r   T   )r	   shellrT   r   : unknown mode)rO   r   enforce_securityr   r   )rS   r	   rT   cmdr   r   r   
gopen_pipe   s   r\   c                 C   s~   |d dkrdddddddd	d
d| g}t |||ddgdS |d dkr8dd	d
ddddd| g	}t |||ddgdS t| d)zOpen a URL with `curl`.

    Args:
        url: URL (usually, http:// etc.)
        mode: File mode
        bufsize: Buffer size

    Returns:
        A Pipe object

    Raises:
        ValueError: If the mode is unknown
    r   r   curl--connect-timeout30--retry--retry-delay2-f-s-LrW      r	   rT   r   r   z-XPUTz-T-   rY   r   r   rS   r	   rT   cmd_argsr   r   r   
gopen_curl   s"   rn   c                 C   sb   |d dkrt dd| } ddd| g}t|||dd	gd
S |d dkr*t| dt| d)zOpen a URL with `curl`.

    Args:
        url: URL (usually, http:// etc.)
        mode: File mode
        bufsize: Buffer size

    Returns:
        A Pipe object

    Raises:
        ValueError: If the mode is write or unknown
    r   r   z(?i)^htgs://zgs://r]   rd   re   rW   rf   rg   r   : cannot writerY   )rP   rQ   r   r   rl   r   r   r   
gopen_htgs  s   rp   c           
      C   s   ddl m}m}m} |d dkrA| | }||j|j|j|jd}| }dddddd	d
ddddd| |g}	t	|	||ddgdS |d dkrNt
| dt
| d)zOpen a URL with `curl`.

    Args:
        url: URL (usually, hf:// etc.)
        mode: File mode
        bufsize: Buffer size

    Returns:
        A Pipe object

    Raises:
        ValueError: If the mode is write or unknown
    r   )HfFileSystem	get_token
hf_hub_urlr   )repo_idfilename	repo_typerevisionr]   r^   r_   r`   ra   rb   rc   rd   re   z-HzAuthorization:Bearer rW   rf   rg   r   ro   rY   )huggingface_hubrq   rr   rs   resolve_pathrt   path_in_reporv   rw   r   r   )
rS   r	   rT   rq   rr   rs   resolved_pathhttp_urltokenrm   r   r   r   gopen_hf;  sB   r~   c                 C   sd   |d dkrdd| g}t |||ddgdS |d dkr+dd	d
| g}t |||ddgdS t| d)zOpen a URL with `gsutil`.

    Args:
        url: URL (usually, gs:// etc.)
        mode: File mode
        bufsize: Buffer size

    Returns:
        A Pipe object

    Raises:
        ValueError: If the mode is unknown
    r   r   gsutilcatrW   rf   rg   r   cpri   rj   rY   rk   rl   r   r   r   gopen_gsutilo  s"   
r   c                 C   sf   |d dkrdd| dg}t |||ddgdS |d d	kr,dd
d| g}t |||ddgdS t| d)zOpen a URL with `ais`.

    Args:
        url: URL (usually, ais:// etc.)
        mode: File mode
        bufsize: Buffer size

    Returns:
        A Pipe object

    Raises:
        ValueError: If the mode is unknown
    r   r   aisr)   ri   rW   rf   rg   r   putrj   rY   rk   rl   r   r   r   	gopen_ais  s"   r   c                 O   s   t |  d)zRaise a value error.

    Args:
        url: URL
        *args: Other arguments
        **kw: Other keywords

    Raises:
        ValueError: Always raised with the URL and a message
    z: no gopen handler defined)r   )rS   r   r   r   r   r   gopen_error  s   r   )__default__pipehttphttpsr   sftpftpsscpgshtgshfUSE_AIS_FORr#   c                 C   s   d}t tjdd}|tjvr| S tjrtdtj| dD ]'}|dd\}}t	d| || }|| krH|rDt
d	|  d
|  |  S q!| S )aY  Rewrite the URL based on environment variables.

    This function checks for URL rewrite rules defined in the GOPEN_REWRITE
    environment variable and applies them to the given URL. The rewrite rules
    allow for flexible modification of URLs before they are processed by the
    gopen system.

    The GOPEN_REWRITE environment variable should contain one or more rewrite
    rules separated by semicolons. Each rule consists of two parts separated
    by an equals sign: a pattern to match at the start of the URL, and a
    replacement string.

    Format of GOPEN_REWRITE:
    GOPEN_REWRITE="pattern1=replacement1;pattern2=replacement2;..."

    The function applies these rules in order, stopping at the first match.
    If a match is found, the pattern is replaced with the corresponding
    replacement at the start of the URL.

    The GOPEN_VERBOSE environment variable can be set to control logging.
    If GOPEN_VERBOSE is set to a non-zero value, the function will print
    information about any URL rewrites that occur.

    Note: This function performs basic URL rewriting without validation.
    Validation of the resulting URL for security concerns (such as path traversal)
    is the responsibility of the caller.

    Args:
        url (str): The original URL to potentially rewrite.

    Returns:
        str: The rewritten URL if a rewrite rule matches, otherwise the original URL.

    Example:
        If GOPEN_REWRITE is set to "http://old.com/=http://new.com/;ftp://=http://"
        and the input URL is "http://old.com/file.txt", the function will return
        "http://new.com/file.txt".
    GOPEN_REWRITEr!   r   zSrewrite_url: unsafe_gopen is False, cannot rewrite URLs using environment variables;=r   ^zGOPEN REWRITE z -> )r&   r'   r(   r)   r   rZ   r   splitrP   rQ   r+   )rS   namer2   r   kvnurlr   r   r   rewrite_url  s   '
r   c                 K   s  t tjdd}|rtd| ttjd |dv sJ || dkr7|dkr(tjj	S |dkr0tj
j	S td	| t| } t| }|jd
kr[tjrKtdt tjdd}t| ||dS |jdkrztjrgtdt tjdd}tt|j||dS td }t|j|}|| ||fi |S )a  Open the URL using various schemes and protocols.

    This function provides a unified interface for opening resources specified by URLs,
    supporting multiple schemes and protocols. It uses the `gopen_schemes` dispatch table
    to handle different URL schemes.

    Built-in support is provided for the following schemes:
    - pipe: for opening named pipes
    - file: for local file system access
    - http, https: for web resources
    - sftp, ftps: for secure file transfer
    - scp: for secure copy protocol

    When no scheme is specified in the URL, it is treated as a local file path.

    Environment Variables:
    - GOPEN_VERBOSE: Set to a non-zero value to enable verbose logging of file operations.
      Format: GOPEN_VERBOSE=1
    - USE_AIS_FOR: Specifies which cloud storage services should use AIS (and its cache) for access.
      Format: USE_AIS_FOR=aws:gs:s3
    - GOPEN_BUFFER: Sets the buffer size for file operations (in bytes).
      Format: GOPEN_BUFFER=8192

    Args:
        url (str): The source URL or file path to open.
        mode (str): The mode for opening the resource. Only "rb" (read binary) and "wb" (write binary) are supported.
        bufsize (int): The buffer size for file operations. Default is 8192 bytes.
        **kw: Additional keyword arguments to pass to the underlying open function.

    Returns:
        file-like object: An opened file-like object for the specified resource.

    Raises:
        ValueError: If an unsupported mode is specified.
        Other exceptions may be raised depending on the specific handler used for the URL scheme.

    Note:
    - For stdin/stdout operations, use "-" as the URL.
    - The function applies URL rewriting based on the GOPEN_REWRITE environment variable before processing.
    r!   r   GOPENr$   )rL   wbri   rL   r   zunknown mode rN   z5gopen: unsafe_gopen is False, cannot open local filesGOPEN_BUFFER)	bufferingr%   r   )r&   r'   r(   r)   r+   r.   r/   r0   r   bufferr   r   r   r   schemer   rZ   rR   r   pathgopen_schemes)rS   r	   rT   r   r2   prrI   r   r   r   gopen  s2   *

r   c                 K   s   t | dfi |S )zOpen url with gopen and mode "rb".

    Args:
        url: Source URL
        **kw: Other keywords forwarded to gopen

    Returns:
        An opened file-like object in read mode
    rL   )r   )rS   r   r   r   r   readerR  s   
r   )NNNN)rL   rM   ) rG   r'   rP   r/   
subprocessr   r   urllib.parser   urllib.requestr   rN   r   r.   r   rK   rU   r\   rn   rp   r~   r   r   r   dictr   r(   r   protor   r   r   r   r   r   r   <module>   sP    



&
"

4
""


7F