o
    }oi~/                  	   @   s  d dl Z d dlZd dlZd dlZd dlmZ d dlmZmZm	Z	m
Z
mZ d dlmZ zd dlmZ W n ey=   dZY nw d dlmZ d dlmZ d d	lmZ z
d d
lmZ dZW n eye   dZY nw dejfddZdefddZdefddZdefddZ de!fddZ"de!fddZ#de!dee!e!f fddZ$de!de!fddZ%ed d!de!fd"d#Z&d$e!de!fd%d&Z'd=d(e!d)e(fd*d+Z)d>d(e!d-e!fd.d/Zd?d(e!d0ed)e(de!fd1d2Z*G d3d4 d4Z+d5e+defd6d7Z,d8e
e	e!ef  d9ee-gef d:e	e!ef fd;d<Z.dS )@    N)	lru_cache)AnyCallableDictIterableTuple)urlparse)__version__git)	constants)logging)LogMode)	open_bestTFreturnc                  C   sH   t jtjd} | dkrtjtj dt	 }|S t| 
 }|S )a  
    Utility method to resolve a cache directory for NeMo that can be overriden by an environment variable.

    Example:
        NEMO_CACHE_DIR="~/nemo_cache_dir/" python nemo_example_script.py

    Returns:
        A Path object, resolved to the absolute path of the cache directory. If no override is provided,
        uses an inbuilt default which adapts to nemo versions strings.
     z.cache/torch/NeMo/NeMo_)osenvirongetr   NEMO_ENV_CACHE_DIRpathlibPathjoinpathhomeNEMO_VERSIONresolve)override_dirpath r   I/home/ubuntu/.local/lib/python3.10/site-packages/nemo/utils/data_utils.pyresolve_cache_dir(   s   r   c                 C   s4   zt | }t|jot|jW S  ty   Y dS w )z,Check if a path is from a data object store.F)r   boolschemenetlocAttributeError)r   resultr   r   r   is_datastore_path;   s   r%   c                 C   s
   |  dS )z%Check if a path is for a tarred file.z.tar)endswith)r   r   r   r   is_tarred_pathD      
r'   c                  C   s<   t tjtjd} | dkrdS | dkrdS tdtj )zCheck if store cache is shared.   r   FTzUnexpected value of env )intr   r   r   r    NEMO_ENV_DATA_STORE_CACHE_SHARED
ValueError)cache_sharedr   r   r   is_datastore_cache_sharedI   s   r.   c                  C   sZ   t jtjd} | dkrt  }n	t| 	  }|
tr&t j|}t j|dS )z#Return path to local cache for AIS.r   ais)r   r   r   r   NEMO_ENV_DATA_STORE_CACHE_DIRr   as_posixr   r   r   r&   r   r   dirnamejoin)r   	cache_dirr   r   r   ais_cache_baseV   s   
r5   c                   C   s
   t dS )zGet configured AIS endpoint.AIS_ENDPOINT)r   getenvr   r   r   r   ais_endpointd   r(   r8   uric                 C   sL   t | std|  t| j}|d }tj|dd  }t|t|fS )zParse a path to determine bucket and object path.

    Args:
        uri: Full path to an object on an object store

    Returns:
        Tuple of strings (bucket_name, object_path)
    z(Provided URI is not a valid store path: r)      N)r%   r,   r   PurePathpartsstr)r9   	uri_partsbucketobject_pathr   r   r   bucket_and_object_from_urii   s   	rA   endpointc                 C   s8   t | }|jr
|jstd|  tj|jt|jS )zConvert AIS endpoint to a valid dir name.
    Used to build cache location.

    Args:
        endpoint: AIStore endpoint in format https://host:port

    Returns:
        Directory formed as `host/port`.
    z$Unexpected format for ais endpoint: )r   hostnameportr,   r   r   r3   r=   )rB   r$   r   r   r   ais_endpoint_to_dir{   s   
rE   r)   )maxsizec                  C   sd   t d} | durtd|  | S d}tj|r$tjd|tj	d |S tj
d| dtj	d dS )	z-Return location of `ais` binary if available.r/   NzFound AIS binary at %sz/usr/local/bin/aisz%ais available at the default path: %smodez>AIS binary not found with `which ais` and at the default path .)shutilwhichr   debugr   r   isfileinfor   ONCEwarning)r   default_pathr   r   r   
ais_binary   s   
rR   
store_pathc                 C   sb   t | r*t }|std|  tjt t|}t| \}}tj|||}|S t	d|  )zConvert a data store path to a path in a local cache.

    Args:
        store_path: a path to an object on an object store

    Returns:
        Path to the same object in local cache.
    %AIS endpoint not set, cannot resolve zUnexpected store path format: )
r%   r8   RuntimeErrorr   r   r3   r5   rE   rA   r,   )rS   rB   local_ais_cachestore_bucketstore_object
local_pathr   r   r   datastore_path_to_local_path   s   	rZ      r   num_retriesc           
      C   s   t | rdt }|du rtd|  t }|std|  d| d|  d}d}t|D ]}tj|dtjtjdd	}|j}|	d
rGd} nq-|sb|j
 jddd }	t|  d| d|	 |S dS )a!  Open a datastore object and return a file-like object.

    Args:
        path: path to an object
        num_retries: number of retries if the get command fails with ais binary, as AIS Python SDK has its own retry mechanism

    Returns:
        File-like object that supports read()
    NrT   z(AIS binary is not found, cannot resolve a?  . Please either install it or install Lhotse with `pip install lhotse`.
Lhotse's native open_best supports AIS Python SDK, which is the recommended way to operate with the data from AIStore.
See AIS binary installation instructions at https://github.com/NVIDIA/aistore?tab=readme-ov-file#install-from-release-binaries.
z get z -FT)shellstdoutstderrtextr)   zutf-8ignore)errorsz* couldn't be opened with AIS binary after z. attempts because of the following exception: )r%   r8   rU   rR   range
subprocessPopenPIPEr^   peekr_   readdecodestripr,   )
r   r\   rB   binarycmddone_procstreamerrorr   r   r   !open_datastore_object_with_binary   s6   

rr   rbrH   c                 C   s,   t rt| |dS t| rt| S t| |dS )NrG   )LHOTSE_AVAILABLElhotse_open_bestr%   rr   open)r   rH   r   r   r   r      s
   r   forcec                 C   s   t | rGt| d}tj|r|rEtj|}tj|s$tj|dd t|d}|j	t
|  |d W d   |S 1 s@w   Y  |S | S )a  Download an object from a store path and return the local path.
    If the input `path` is a local path, then nothing will be done, and
    the original path will be returned.

    Args:
        path: path to an object
        force: force download, even if a local file exists
        num_retries: number of retries if the get command fails with ais binary, as AIS Python SDK has its own retry mechanism

    Returns:
        Local path of the object.
    )rS   T)exist_okwb)r\   N)r%   rZ   r   r   rM   r2   isdirmakedirsrv   writer   rh   )r   rw   r\   rY   	local_dirfr   r   r   get_datastore_object   s   

r   c                   @   s|   e Zd ZdZddededefddZed	efd
dZed	efddZ	dded	efddZ
dded	efddZdd ZdS )DataStoreObjecta'  A simple class for handling objects in a data store.
    Currently, this class supports objects on AIStore.

    Args:
        store_path: path to a store object
        local_path: path to a local object, may be used to upload local object to store
        get: get the object from a store
    NFrS   rY   r   c                 C   s0   |d urt d|| _|| _|r|   d S d S )Nz3Specifying a local path is currently not supported.)NotImplementedError_store_path_local_pathr   )selfrS   rY   r   r   r   r   __init__  s   zDataStoreObject.__init__r   c                 C      | j S )z Return store path of the object.)r   r   r   r   r   rS   $     zDataStoreObject.store_pathc                 C   r   )z Return local path of the object.)r   r   r   r   r   rY   )  r   zDataStoreObject.local_pathrw   c                 C   s   | j st| j|d| _| j S )zGet an object from the store to local cache and return the local path.

        Args:
            force: force download, even if a local file exists

        Returns:
            Path to a local object.
        )rw   )rY   r   rS   r   r   rw   r   r   r   r   .  s   	zDataStoreObject.getc                 C   s   t  )zPush to remote and return the store path

        Args:
            force: force download, even if a local file exists

        Returns:
            Path to a (remote) object object on the object store.
        )r   r   r   r   r   put<  s   	zDataStoreObject.putc                 C   s    t |  d| j d| j }|S )z2Return a human-readable description of the object.z: store_path=z, local_path=)typerS   rY   )r   descriptionr   r   r   __str__G  s   zDataStoreObject.__str__)NF)F)__name__
__module____qualname____doc__r=   r    r   propertyrS   rY   r   r   r   r   r   r   r   r     s    	
r   rX   c                 C   s   |   duS )zA convenience wrapper for multiprocessing.imap.

    Args:
        store_object: An instance of DataStoreObject

    Returns:
        True if get() returned a path.
    N)r   )rX   r   r   r   datastore_object_getM  s   	r   datahandlerkwc                 k   s    | D ]D}t |tsJ |d|v sJ |d }zt|dd}|j|d |V  W q tyG } z||r;W Y d}~qW Y d}~ dS d}~ww dS )a  
    Open URLs and yield a stream of url+stream pairs.
    This is a workaround to use lhotse's open_best instead of webdataset's default url_opener.
    webdataset's default url_opener uses gopen, which does not support opening datastore paths.

    Args:
        data: Iterator over dict(url=...).
        handler: Exception handler.
        **kw: Keyword arguments for gopen.gopen.

    Yields:
        A stream of url+stream pairs.
    urlrs   rG   )rp   N)
isinstancedictr   update	Exception)r   r   r   sampler   rp   exnr   r   r   wds_url_openerY  s    
r   )r[   )rs   )Fr[   )/r   r   rJ   rd   	functoolsr   typingr   r   r   r   r   urllib.parser   nemor	   r   ImportErrorr   
nemo.utilsr   nemo.utils.nemo_loggingr   lhotse.serializationr   ru   rt   r   r   r    r%   r'   r.   r=   r5   r8   rA   rE   rR   rZ   r*   rr   r   r   r   r   r   r   r   r   r   <module>   sX   	/"=
