o
    }oi2                     @   s   d dl Z d dlZd dlmZ d dlmZ d dlmZ d dlm	Z	 d dl
mZ d dlmZmZmZmZmZ d dlZd dlmZ d d	lmZ d d
lmZmZmZmZmZ G dd deZdeddfddZdd Z dd Z!dd Z"dS )    N)ProcessPoolExecutor)BytesIO)get_start_method)Path)NamedTemporaryFile)AnyCallableDictOptionalUnion)CheckpointIO)logging)DEFAULT_CHUNK_SIZE_MBDEFAULT_MAX_READ_CONCURRENCYDEFAULT_MAX_WRITE_CONCURRENCYSHARED_MEM_DIRS3Utilsc                	       s   e Zd ZdZeeedfdef fddZe	dd Z
ded	ed
efddZded	ed
efddZdd Z	ddeeef d	eeef dee d
dfddZdd fd	eeef dee d
eeef fddZd	eeef d
df fddZdddZ  ZS ) S3CheckpointIOzsA custom S3CheckpointIO module that supports checkpoint reading/writing with s3 when filepath
    is a s3 url.
    Fdirpathc              
      s   t |std| d|| _|| _|| _|| _	 g | _| jrk| jr(t	ddnd| _
t dkr;tj du r;tdtd	 | j
t}z|  td
 W n tyg } z
td|  |d}~ww g | _t   dS )a  
        Initialize the transfer configuration with custom values.

        This method overrides the default TransferConfig values in boto3.
        See https://boto3.amazonaws.com/v1/documentation/api/latest/_modules/boto3/s3/transfer.html#TransferConfig

        Args:
            chunk_size_MB (int, optional): The size of chunks to use when transferring files.
                Default is 64 (MB).
            max_read_concurrency (int, optional): The maximum number of threads that will be making
                requests to perform a download. Default is 15.
            max_write_concurrency (int, optional): The maximum number of threads that will be making
                requests to perform an upload. Default is 10.
            async_checkpointing (bool, optional): Uses a ProcessPoolExecutor to do the main saving logic.
                This feature should be used with save_top_k as it's possible a previous checkpoint is removed while
                the current checkpoint write fails.
        z6Error attempting to initialize an S3CheckpointIO when zL is not an S3 url. Please use TorchCheckpointIO when using a non-S3 dirpath.   )max_workersNforkTz\torch.cuda should not be initialized when checkpointing subprocess is created by fork methodz.Creating asynchronous checkpointing subprocessz9Asynchronous heckpointing subprocess created successfullyzCFailed to create asynchronous checkpointing subprocess, exception: )r   	is_s3_urlAssertionErrorchunk_size_MBmax_read_concurrencymax_write_concurrency_async_checkpointing_temp_filesasync_checkpointingr   	_executorr   torchcudais_initialized	Exceptionr   infosubmit
dummy_funcresulterror_futuressuper__init__)selfr   r   r   r   r   futuree	__class__ Y/home/ubuntu/.local/lib/python3.10/site-packages/nemo/utils/callbacks/s3_checkpoint_io.pyr,   *   s8   


zS3CheckpointIO.__init__c                 C   s   | j S N)r   )r-   r2   r2   r3   r   g   s   z"S3CheckpointIO.async_checkpointing
checkpointpathreturnc                 C   s\   t  }ttdd}t|| td|j d| dt  | ddtj	
   ~|jS )zW
        Returns:
            filename of the temporary file in shared memory.
        Fdirdeletez'Time elapsed saving checkpoint dict to z for : .2f seconds, rank )timeperf_counterr   r   r!   saver   r%   namedistributedget_rank)r-   r5   r6   
start_timetempfiler2   r2   r3   _serialize_checkpoint_to_shmk   s   ,z+S3CheckpointIO._serialize_checkpoint_to_shmc              	   C   sP   t  }t }t|| t  | }td| d|ddtj   ~|S )zH
        Returns:
            The bytestring of the checkpoint.
        z1Time elapsed saving checkpoint dict to bytes for r;   r<   r=   )	r>   r?   r   r!   r@   r   r%   rB   rC   )r-   r5   r6   ssbytesttr2   r2   r3   _serialize_checkpoint_to_bytesy   s   z-S3CheckpointIO._serialize_checkpoint_to_bytesc                 C   s   | j sdS t }g }g }| j D ]}|d  r|| q|| q|D ]:}z|d   W n  tyP } ztd|d  d|d  d|  |d}~ww z
| j	
|d  W q&   Y q&|| _ tdt | d	d
 dS )aV  
        self._future is a list of tuples of form (future, destination path, source path)
        This function checks the result of all the futures, and updates the self._futures list appropriately.
        It also updates the list of self._temp_files, which is used to clean up leaked temporary files in SHARED_MEM during teardown.
        Nr   zFailed to upload    z to r   z, exception: z0Time elapsed checking uploading future results: r<   z seconds)r*   r>   r?   doneappendr(   r$   r   r)   r   removedebug)r-   rD   done_futuresin_progress_futuresitemr/   r2   r2   r3   _check_uploading_results_so_far   s2   
$z.S3CheckpointIO._check_uploading_results_so_farNstorage_optionsc                 C   s   t jtr| ||}| j| d}n| ||}d}| jrc| 	  t
d| dtj   |rL| jt||| j| jd}| j|||f d S | jt||| j| j}| j||df d S t
d| dtj   |rt||| j| jd | j| d S t||| j| j d S )NTFzUploading checkpoint to z in asynchronous mode, rank rH   z in synchronous mode, rank )osr6   existsr   rF   r   rM   rJ   r   rS   r   r%   r!   rB   rC   r    r&   _upload_file_to_s3r   r   r*   _upload_bytes_to_s3rN   )r-   r5   r6   rT   	localfilesaved_as_filerH   r.   r2   r2   r3   save_checkpoint   s.   zS3CheckpointIO.save_checkpointc                 C   s   | S r4   r2   )storagelocr2   r2   r3   <lambda>   s    zS3CheckpointIO.<lambda>map_locationc              
   C   s   t jtrCttdd-}td| d|j dtj	
   tj||j| j| jd t|j}W d    |S 1 s<w   Y  |S tj|| j| jd}t|}|S )NTr8   zLoading checkpoint z# into a temp file in shared memory z, rank )s3_path	file_pathr   max_concurrency)r`   r   rb   )rU   r6   rV   r   r   r   r%   rA   r!   rB   rC   r   download_s3_file_to_pathr   r   loaddownload_s3_file_to_stream)r-   r6   r_   rE   r5   file_streamr2   r2   r3   load_checkpoint   s*   


zS3CheckpointIO.load_checkpointc                    s(   t |rt | d S t | d S r4   )r   r   remove_objectr+   remove_checkpoint)r-   r6   r0   r2   r3   ri      s   
z S3CheckpointIO.remove_checkpointc                 C   s   t j }| jr+td|  t }| jj	dd tdt | dd|  	 | j
r`| j
D ]/}tj|r_zt| W q2 ty^ } ztd| d|  W Y d }~q2d }~ww q2d S d S )	Nz8Entering teardown, waiting for all jobs to finish, rank T)waitzexecutor shut down after r<   r=   z#Error occurred while deleting file r;   )r!   rB   rC   r   r   r%   r>   r?   r    shutdownr   rU   r6   rV   rN   r$   )r-   rankrD   tfiler/   r2   r2   r3   teardown   s&   
 
"zS3CheckpointIO.teardownr4   )r7   N)__name__
__module____qualname____doc__r   r   r   strr,   propertyr   r	   rF   r   rJ   rS   r   r   r   r
   r[   r   rg   ri   rn   __classcell__r2   r2   r0   r3   r   %   sD    =
"


"


r   filepathr7   c                 C   sf   t | r/t | }td|  t j|ddd}|D ]}td| d|   t | qdS dS )a  
    before saving to s3, clean up any existing object with the same prefix megatron_gpt+step_count
    e.g. before we save "megatron_gpt--step=1400-validation_loss=6.32-consumed_samples=55920.0-last.ckpt"
    we need to clean up "megatron_gpt--step=1400-validation_loss=xxx-consumed_samples=yyy-last.ckpt"
    so that in case later we need to resume from step 1400, it has a single checkpoint file at step 1400
    z0Looking for conflicting checkpoint under prefix z	last.ckptF)	base_pathsuffixreturn_key_onlyz"Cleaning up conflicting last ckpt z before saving N)r   r   parse_prefix_with_stepr   r%   find_files_with_suffixrh   )rv   prefix_with_stepconflict_last_ckpts	last_ckptr2   r2   r3    _clean_up_conflicting_checkpoint   s   

r   c              
   C   s>   zt | t| |||| W d S  ty } z|d }~ww r4   )r   r   upload_filer$   )rY   r6   r   r   remove_filer/   r2   r2   r3   rW     s   rW   c              
   C   s<   zt | t| ||| W d S  ty } z|d }~ww r4   )r   r   upload_file_stream_to_s3r$   )rH   r6   r   r   r/   r2   r2   r3   rX     s   rX   c                   C   s   t d d S )Ng{Gz?)r>   sleepr2   r2   r2   r3   r'      s   r'   )#rU   r>   concurrent.futuresr   ior   multiprocessingr   pathlibr   rE   r   typingr   r   r	   r
   r   r!   )lightning.fabric.plugins.io.checkpoint_ior   
nemo.utilsr   nemo.utils.s3_utilsr   r   r   r   r   r   rs   r   rW   rX   r'   r2   r2   r2   r3   <module>   s$   	 X