o
    }oi6                  
   @   s:  d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZm	Z	m
Z
 d dlZd dlZd dlmZ d dlmZ d dlmZmZmZmZmZ d dlmZ d d	lmZmZ zd dlZd dlZd
ZW n e yr Z! zdZW Y dZ![!ndZ![!ww dZ"dZ#dZ$dZ%dZ&dZ'G dd dZ(dd Z)dd Z*eeddddedee*eeej+d	d*de,de,ded efd!d"Z-eeddddedee*eeej+dd*de,de,d#e,d efd$d%Z.eeddddedee*eeej+dd*dede,de,d efd&d'Z/eeddddedee*eeej+dd*d#e,de,de,d efd(d)Z0dS )+    N)BytesIO)Path)ListOptionalTuple)TransferConfig)ClientError)before_sleep_logretryretry_if_exceptionstop_after_delaywait_exponential)logging)build_s3_url	is_s3_urlTFi   i   @z/dev/shm@      
   c                   @   s  e Zd ZdZ	 ed,dededefddZededdfd	d
Zee	e
fdedededefddZee	e
fdededededdf
ddZee	efdededededdf
ddZee	edfdedefddZe				d-dedededee dejjdee fddZeddddi fdedejjdefd d!Zed"edeeeef  fd#d$Zedefd%d&Zed'ee defd(d)Zed'edefd*d+ZdS ).S3Utilszy
    Utility class for interacting with S3. Handles downloading and uploading to S3, and parsing/formatting S3 urls.
    Fs3_pathmatch_directoryreturnc                 C   s   t | \}}|sdS t  }|jj}z|j|d|ddg }W n |jjy-   Y dS w |dkr4dS t	|dkoD|pD|d d 
|S )	z
        :s3_path: the path
        :match_directory: if the content is known to be a directory then set it to `True`. Since s3 isn't a file system, paths are funky and the concept of folders doesn't really exist.
        F   )BucketMaxKeysPrefixContents Tr   Key)r   parse_s3_url_get_s3_resourcemetaclientlist_objects_v2get
exceptionsNoSuchBucketlen
startswith)r   r   bucket_nameprefixs3	s3_clientobjs r.   G/home/ubuntu/.local/lib/python3.10/site-packages/nemo/utils/s3_utils.pys3_path_exists;   s   "zS3Utils.s3_path_existsNc                 C   s,   t jdd}t | \}}|j||d d S )NT
get_client)r   r   )r   r    r   delete_object)r   r,   bucketkeyr.   r.   r/   remove_objectS   s   zS3Utils.remove_objectchunk_size_MBmax_concurrencyc           
      C   s   t  }tjdd}t| \}}|t }t||d}t }	t||||| t	
d|  d| d| dt |	 dd		 |d
 |S )NTr1   multipart_chunksizer8   Time elapsed downloading z  to file stream with chunk_size=MB and max_concurrency=: .2f secondsr   )r   r   r    r   MBr   timeperf_counter_download_fileobj_with_retryr   infoseek)
r   r7   r8   bytes_bufferr,   r4   r5   
chunk_sizeconfig
start_timer.   r.   r/   download_s3_file_to_streamY   s    
z"S3Utils.download_s3_file_to_stream	file_pathc           
      C   s   t jdd}t | \}}|t }t||d}td|  d| d| d|  t }	t	||||| td|  d| d| d	| d
t |	 dd d S )NTr1   r9   zDownloading z to  with chunk_size=zMB and max_threads=r;   r<   r=   r>   r?   )
r   r    r   r@   r   r   rD   rA   rB   _download_file_with_retry)
r   rK   r7   r8   r,   r4   r5   rG   rH   rI   r.   r.   r/   download_s3_file_to_pathn   s    z S3Utils.download_s3_file_to_pathrF   c           
      C   s   t jdd}t |\}}|t }t||d}| d t }	t|| ||| t	
d| d| d| dt |	 d	d
	 d S )NTr1   r9   r   z'Time elapsed uploading bytes buffer to rL   r<   r=   r>   r?   )r   r    r   r@   r   rE   rA   rB   _upload_fileobj_with_retryr   rD   )
rF   r   r7   r8   r,   r4   r5   rG   rH   rI   r.   r.   r/   upload_file_stream_to_s3   s   
z S3Utils.upload_file_stream_to_s3c                 C   s   t j| }|dksJ d|  tjdd}t|\}}|t }	t|	|	|d}
t	 }t
|| |||
 |rBt j| rBt |  td|  d|t dd	| d
| d| dt	 | dd d S )Nr   zfile size is zero, Tr1   )multipart_thresholdr:   r8   zTime elapsed uploading file z	 of size z.1fzGB to rL   r<   r=   r>   r?   )ospathgetsizer   r    r   r@   r   rA   rB   _upload_file_with_retryexistsremover   rD   GB)rK   r   r7   r8   remove_file
total_sizer,   r4   r5   rG   rH   rI   r.   r.   r/   upload_file   s&   
 zS3Utils.upload_fileT	base_pathsuffixreturn_key_onlyprofilecredsc                    s   t ||}t | \}}t }||}	t|	|d}
td|  dt | dd  r:t	t
 fdd|
}
|rCdd	 |
D S d
d	 |
D S )z
        Returns a list of keys that have the specified suffix
        :param base_path: the root of search
        :param suffix: the suffix to match, case sensitive
        :return: list of keys matching the suffix, relative to the base_path
        )	s3_bucket	s3_prefixz,Time elapsed reading all objects under path r=   r>   r?   c                    s   | j  S N)r5   endswith)or]   r.   r/   <lambda>   s    z0S3Utils.find_files_with_suffix.<locals>.<lambda>c                 S   s   g | ]}|j qS r.   )r5   .0re   r.   r.   r/   
<listcomp>   s    z2S3Utils.find_files_with_suffix.<locals>.<listcomp>c                 S   s   g | ]
}t |j|jqS r.   )r   r   r)   r5   rh   r.   r.   r/   rj      s    )r   r    r   rA   rB   r   _scan_objects_with_retryr   rD   listfilter)r\   r]   r^   r_   r`   r+   r)   r*   rI   r4   objects_listr.   rf   r/   find_files_with_suffix   s   
zS3Utils.find_files_with_suffixr2   c                 C   s   t jjdddi|}| d ur|d urtd| d ur'tj| djd|d}n)|d ur>t jd|d |d |d	 |d
}n|sIt jd|dn|jd|d}|rV|jjS |S )Nmax_pool_connections   z5Please provide profile or creds or neither, not both.)profile_namer+   )rH   AccessKeyIdSecretAccessKeySessionToken)aws_access_key_idaws_secret_access_keyaws_session_tokenrH   r.   )	botocorerH   Config
ValueErrorboto3Sessionresourcer!   r"   )r_   r`   r2   sessionrH   r+   r.   r.   r/   r       s$   "	zS3Utils._get_s3_resources3_urlc                 C   s6   t jd| t jd}|d u rdS | d | d fS )Nzs3://([^/]+)/(.*))flags)NNr   r   )rematchUNICODEgroups)r   r   r.   r.   r/   r      s   zS3Utils.parse_s3_urlc                 C   s
   t | |S rc   )r   )r4   r5   r.   r.   r/   r      s   
zS3Utils.build_s3_urlrS   c                 C   s   t | S rc   )r   )rS   r.   r.   r/   r     s   zS3Utils.is_s3_urlc                 C   s   t d| }|r|dS | S )a'  
        Use regex to find the pattern up to "-step=900-"
        s3://path/to/checkpoints/tp_rank_00_pp_rank_000/megatron_gpt--step=900-validation_loss=6.47-consumed_samples=35960.0-last.ckpt
        should return s3://path/to/checkpoints/tp_rank_00_pp_rank_000/megatron_gpt--step=900-
        z(.*step=\d+-)r   )r   searchgroup)rS   r   r.   r.   r/   parse_prefix_with_step  s   
zS3Utils.parse_prefix_with_step)F)NTNN)__name__
__module____qualname____doc__staticmethodstrboolr0   r6   DEFAULT_CHUNK_SIZE_MBDEFAULT_MAX_READ_CONCURRENCYintr   rJ   rN   DEFAULT_MAX_WRITE_CONCURRENCYrP   r[   r   ry   credentialsCredentialsr   ro   r    r   r   r   r   r   r.   r.   r.   r/   r   0   s      r   c                 C   s   | j j|d}t|S )N)r   )objectsrm   rl   )ra   rb   r   r.   r.   r/   rk     s   rk   c                 C   s   | j j}| j j}| d| }td| d|   t| }d|v s*d|v s*d|v r1td dS trFt| t	j
jrFtd	|    dS t| trdtd
| j  | jr^| jd d nd}|dv S td dS )z
    This function checks if the error is due to slowdown or is throttling related.
    If so, returns true to allow tenacity to retry the upload/download to S3.
    .zCaught exception of type r=   z<Code>SlowDown</Code>z<Code>RequestTimeout</Code>z<Code>InternalError</Code>z/Identified the Retriable Error retrying the jobTz&Caught awscrt.exceptions.AwsCrtError: z!Caught ClientError, response is: ErrorCodeN)SlowDownRequestTimeoutInternalErrorz)Non Retriable Error - Terminating the jobF)	__class__r   r   r   errorr   rD   crt_available
isinstanceawscrtr%   AwsCrtError__repr__r   response)	exception
class_namemodule_namefull_class_namemessage
error_coder.   r.   r/   is_slow_down_error  s&   


r   r      )
multiplierminmaxx   )waitstopr
   before_sleepr4   r5   rF   rH   c                 C      | j ||||d d S N)rz   )download_fileobj)r,   r4   r5   rF   rH   r.   r.   r/   rC   =  s   	rC   rK   c                 C   r   r   )download_file)r,   r4   r5   rK   rH   r.   r.   r/   rM   I     rM   c                 C   r   r   )upload_fileobj)r,   rF   r4   r5   rH   r.   r.   r/   rO   S  r   rO   c                 C   r   r   )r[   )r,   rK   r4   r5   rH   r.   r.   r/   rU   ]  r   rU   rc   )1rR   r   rA   ior   pathlibr   typingr   r   r   r|   ry   boto3.s3.transferr   botocore.exceptionsr   tenacityr	   r
   r   r   r   
nemo.utilsr   nemo.utils.s3_dirpath_utilsr   r   r   s3transfer.crt
s3transferr   ImportErrorer@   rX   SHARED_MEM_DIRr   r   r   r   rk   r   ERRORr   rC   rM   rO   rU   r.   r.   r.   r/   <module>   s    f!



"