o
    ߗi0e                     @   s  U d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
mZmZ d dlmZ d dlmZ d dlmZ d dlmZmZmZmZmZmZmZmZmZmZmZmZ d dlZd dlm Z  d dl!m"Z"m#Z# d d	l$m%Z% d d
l&m'Z'm(Z(m)Z)m*Z* d dl+m,Z,m-Z-m.Z.m/Z/m0Z0m1Z1m2Z2m3Z3 d dl4m5Z5 d dl6m7Z7m8Z8m9Z9 d dl:m;Z; d dl<m=Z= g dZ>dZ?e@eAd< eG dd dZBeG dd dZCdZDde@fddZEG dd deZFG dd deFZGG dd  d eFZHd!e2deIfd"d#ZJd$eId%ee2 deee2  fd&d'ZKd(ejLd)eejMej f d*e2d+e@de9f
d,d-ZNd.ed/ejOd0ejOd1e1d2eId3ePd4eIddfd5d6ZQG d7d8 d8eZRG d9d: d:eRZSG d;d< d<e8ZTG d=d> d>e7ZUG d?d@ d@eTe5ZVdS )A    N)ABCabstractmethod)contextmanager)	dataclass)Path)AnyCallablecastDict	GeneratorIOIterableIteratorListOptionalTupleUnion)Tensor)_get_available_device_type_get_device_module)narrow_tensor_by_index)MetadataMetadataIndexSTATE_DICT_TYPEStorageMeta)LoadItemTypeLoadPlanLoadPlannerReadItemSavePlanSavePlanner	WriteItemWriteItemType)BlockingAsyncStager)StorageReaderStorageWriterWriteResult)_create_file_view)Future)FileSystemWriterFileSystemReader
FileSystemFileSystemBase	.metadata_metadata_fnc                   @   s*   e Zd ZU dZeed< eed< eed< dS )_StorageInfoz#This is the per entry storage info.relative_pathoffsetlengthN)__name__
__module____qualname____doc__str__annotations__int r:   r:   e/home/ubuntu/transcripts/venv/lib/python3.10/site-packages/torch/distributed/checkpoint/filesystem.pyr/   B   s
   
 r/   c                   @   s   e Zd ZU eed< dS )_StoragePrefixprefixN)r3   r4   r5   r7   r8   r:   r:   r:   r;   r<   K   s   
 r<   z.distcpreturnc                   C   s   t t S N)r7   uuiduuid4r:   r:   r:   r;   _generate_uuidS   s   rB   c                   @   sT   e Zd ZedededdfddZedddZedee	e
jef  fd	d
ZdS )_TensorLoadersizeobjr>   Nc                 C      d S r?   r:   selfrD   rE   r:   r:   r;   addX      z_TensorLoader.addc                 C   rF   r?   r:   rH   r:   r:   r;   start_loading\   rJ   z_TensorLoader.start_loadingc                 C   rF   r?   r:   rK   r:   r:   r;   values`   rJ   z_TensorLoader.valuesr>   N)r3   r4   r5   r   r9   objectrI   rL   r   r   torchr   rM   r:   r:   r:   r;   rC   W   s    "rC   c                   @   sZ   e Zd ZdeddfddZdededdfdd	Zdd
dZde	e
ejef  fddZdS )_SerialCpuLoaderresolve_funr>   Nc                 C   s   || _ g | _d S r?   )rR   items)rH   rR   r:   r:   r;   __init__f   s   
z_SerialCpuLoader.__init__rD   rE   c                 C   s   | j ||f d S r?   )rS   appendrG   r:   r:   r;   rI   j   s   z_SerialCpuLoader.addc                 C   rF   r?   r:   rK   r:   r:   r;   rL   m      z_SerialCpuLoader.start_loadingc                 c   sR    | j D ]"\}}| | }| }|  | kr!| }||fV  qd S r?   )rS   rR   detachcpustoragerD   numelclonerH   _rE   tensorr:   r:   r;   rM   p   s   z_SerialCpuLoader.valuesrN   )r3   r4   r5   r   rT   r9   rO   rI   rL   r   r   rP   r   rM   r:   r:   r:   r;   rQ   e   s
    
 rQ   c                	   @   s   e Zd Z		ddedeej deddfddZe	de
fd	d
Zdeeejef  fddZdddZdeeejef  fddZdededdfddZdddZdeeejef  fddZdS )_OverlappingCpuLoaderN@B rR   streaminflight_threshholdr>   c                 C   s   || _ g | _|| _d| _t | _d| _d| _|r|j	nt
 | _	t| j	| _ttjj|p0| j | _| j| j krF| j| j  d S d S )Nr   F)rR   rS   rb   in_flight_datacollectionsdequecurrent_itemsidxstarteddevice_typer   r   device_moduler	   rP   cudaStreamcurrent_streamra   wait_stream)rH   rR   ra   rb   r:   r:   r;   rT   }   s    
z_OverlappingCpuLoader.__init__c                 C   s   | j t| jkS r?   )rg   lenrS   rK   r:   r:   r;   _done   s   z_OverlappingCpuLoader._donec                 C   sl   g }| j | jkr| j  | j | jkr4| j }|  j |d  |d   8  _ || | j | jks|S Nr   )	rc   rb   ra   synchronizerf   popleftrZ   element_sizerU   )rH   drainedvalr:   r:   r;   _drain   s   

"
z_OverlappingCpuLoader._drainc                 C   s.  | j | j | jsu| j| jk r}| j| j \}}|  jd7  _| | }|j	j
| jkr6|jddd}n|j	t	dkrO|  | |j krO| }| j||f |  j| |  7  _| js| j| jk sW d    d S W d    d S W d    d S W d    d S 1 sw   Y  d S )N   rX   T)devicenon_blocking)rj   ra   rp   rc   rb   rS   rg   rR   rW   ry   typeri   torP   untyped_storagerD   rZ   itemsizer[   rf   rU   rt   r\   r:   r:   r;   _refill   s4   
"z_OverlappingCpuLoader._refillc                 C   s(   | j sJ t| jdkr| j  | jS rq   )rp   ro   rf   ra   rr   rK   r:   r:   r;   _finish   s   

z_OverlappingCpuLoader._finishrD   rE   c                 C   s"   | j rtd| j||f d S )Nz&cannot add items after loading started)rh   RuntimeErrorrS   rU   rG   r:   r:   r;   rI      s   z_OverlappingCpuLoader.addc                 C   s0   | j rd S d| _ | jjtdd |   d S )NTr   key)rh   rS   sortoperator
itemgetterr   rK   r:   r:   r;   rL      s
   z#_OverlappingCpuLoader.start_loadingc                 c   sB    |    | js|  }|   |E d H  | jr|  E d H  d S r?   )rL   rp   rw   r   r   )rH   ru   r:   r:   r;   rM      s   
z_OverlappingCpuLoader.values)Nr`   rN   )r3   r4   r5   r   r   rP   rl   r9   rT   propertyboolrp   r   r   r   rO   rw   r   r   r   rI   rL   r   rM   r:   r:   r:   r;   r_   |   s(    



 r_   itemc                 C   sB   d}| j d us	J | j jD ]}||9 }q| j jj}|tj| S Nrx   )tensor_datarD   
propertiesdtyperP   _utils_element_size)r   rD   sr   r:   r:   r;   
_item_size   s   

r   binsrS   c           	      C   s   | dkr|gS dd |D }dd |D }dd t | D }dd t | D }|jtdd t|D ]\}}|||   | q2|D ] }tt|tdd	d
 }|| | ||  t|7  < qB|S )Nrx   c                 S      g | ]
}|j tjkr|qS r:   r{   r"   BYTE_IO.0wir:   r:   r;   
<listcomp>       z+_split_by_size_and_type.<locals>.<listcomp>c                 S      g | ]
}|j tjkr|qS r:   r   r   r:   r:   r;   r      r   c                 S   s   g | ]}g qS r:   r:   r   r]   r:   r:   r;   r          c                 S   s   g | ]}d qS )r   r:   r   r:   r:   r;   r      r   T)r   reverser   r   )ranger   r   	enumeraterU   minr   r   )	r   rS   bytes_wtensor_wbucketsbucket_sizesir   rg   r:   r:   r;   _split_by_size_and_type   s   r   ra   data
write_itemstorage_keyc                 C   s   |   }|jtjkrt|tjsJ | |  nt|t	j
s"J |jt	dks,J t	|ttt |  |   | }t|j|t|||dS )NrX   )indexsize_in_bytesstorage_data)tellr{   r"   r   
isinstanceioBytesIOwrite	getbufferrP   r   ry   saver	   r   bytesr&   r   r/   )ra   r   r   r   r1   r2   r:   r:   r;   _write_item   s   
r   create_stream
file_queueresult_queueplannerrb   	use_fsyncthread_countc              	   C   s  z	 |  \}}}	tj }
tt|
d }|dkr/tj s#|r/| r/|dkr/t|j|d}nt	|j}dd |	D }|D ]
}|
t|| q=|  dd |	D }g }| |dM}|D ]}||}|t|||| q]| D ]\}}|js|J |t|||| qs|rz	t|  W n ty   t  Y nw W d    n1 sw   Y  || q tjy   Y d S w )	NTrx   r   )rb   c                 S   r   r:   r   r   r:   r:   r;   r   6  r   z+_write_files_from_queue.<locals>.<listcomp>c                 S   r   r:   r   r   r:   r:   r;   r   ;  r   wb)
get_nowaitrP   _C_get_privateuse1_backend_namegetattrrk   is_availabler_   resolve_datarQ   rI   r   rL   rU   r   rM   is_cpuosfsyncfilenoAttributeErrorsyncputqueueEmpty)r   r   r   r   rb   r   r   	file_namer   write_itemscustom_backend_namecustom_device_modloaderr   r   r   write_resultsra   r   r^   r:   r:   r;   _write_files_from_queue  sd   	



6r   c                   @   sJ  e Zd Zeedeeejf dede	e
jddf fddZedeeejf dedeeejf fdd	Zedeeejf d
eeejf ddfddZedeeejf deeejf fddZedeeejf ddfddZeedeeejf defddZedeeejf defddZedeeejf ddfddZdS )r,   pathmoder>   Nc                 C   rF   r?   r:   )rH   r   r   r:   r:   r;   r   V  s   zFileSystemBase.create_streamsuffixc                 C   rF   r?   r:   rH   r   r   r:   r:   r;   concat_path]     zFileSystemBase.concat_pathnew_pathc                 C   rF   r?   r:   rH   r   r   r:   r:   r;   renamec  r   zFileSystemBase.renamec                 C   rF   r?   r:   rH   r   r:   r:   r;   	init_pathi  rJ   zFileSystemBase.init_pathc                 C   rF   r?   r:   r   r:   r:   r;   mkdirm  rJ   zFileSystemBase.mkdircheckpoint_idc                 C   rF   r?   r:   clsr   r:   r:   r;   validate_checkpoint_idq  s   z%FileSystemBase.validate_checkpoint_idc                 C   rF   r?   r:   r   r:   r:   r;   existsv  rJ   zFileSystemBase.existsc                 C   rF   r?   r:   r   r:   r:   r;   rm_filez  rJ   zFileSystemBase.rm_file)r3   r4   r5   r   r   r   r7   r   PathLiker   r   IOBaser   r   r   r   r   classmethodr   r   r   r   r:   r:   r:   r;   r,   U  sJ    ( "r,   c                
   @   s*  e Zd Zedeeejf dedee	j
ddf fddZdeeejf dedeeejf fdd	Zdeeejf deeejf fd
dZdeeejf deeejf ddfddZdeeejf ddfddZedeeejf defddZdeeejf defddZdeeejf ddfddZdS )r+   r   r   r>   Nc                 c   sF    t t||}t tj|V  W d    d S 1 sw   Y  d S r?   )r	   r   openr   r   )rH   r   r   ra   r:   r:   r;   r     s   "zFileSystem.create_streamr   c                 C   s   t t|| S r?   )r	   r   r   r:   r:   r;   r     s   zFileSystem.concat_pathc                 C   s   t |ts	t|}|S r?   )r   r   r   r:   r:   r;   r     s   
zFileSystem.init_pathr   c                 C   s   t t|t t| d S r?   )r	   r   r   r   r:   r:   r;   r     s   zFileSystem.renamec                 C   s   t t|jddd d S )NT)parentsexist_ok)r	   r   r   r   r:   r:   r;   r     s   zFileSystem.mkdirr   c                 C   sR   t |trdS dt|v rdS t|jD ]}| r&tt|tjr& dS qdS )NTz://F)r   r   r7   r   r   r   accessW_OK)r   r   pr:   r:   r;   r     s   
z!FileSystem.validate_checkpoint_idc                 C   s   t t| S r?   )r	   r   r   r   r:   r:   r;   r     s   zFileSystem.existsc                 C   s   t t|  d S r?   )r	   r   unlinkr   r:   r:   r;   r     s   zFileSystem.rm_file)r3   r4   r5   r   r   r7   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r:   r:   r:   r;   r+     s:    
&
 r+   c                       s`  e Zd ZdZ					d,deeejf dedede	d	e	d
ede
de
ddf fddZd-deeejdf ddfddZdeddfddZdedefddZdee dee fddZdededeee  fddZd ed!eee  ddfd"d#Zdee fd$d%Zedeeejf fd&d'Zedeeejf fd(d)Zedeeejf defd*d+Z  Z S )._FileSystemWritera  
    Basic implementation of StorageWriter using file IO.

    This implementation makes the following assumptions and simplifications:

    * The checkpoint path is an empty or non-existing directory.
    * File creation is atomic

    The checkpoint consist of one file per write request plus
    a `.metadata` file with the serialized metadata.

    Trx   逖 r   single_file_per_rank
sync_filesr   per_thread_copy_ahead	overwriteargskwargsr>   Nc           	         sJ   t    t | _| j|| _|| _|| _|| _|| _	t
 | _|| _dS )a  
        Initialize the writer pointing to `path`.

        Args:
            path: directory where the checkpoint will be written to.
            single_file_per_rank: Produce one file per rank instead of one file per tensor/blob. Default to True.
            sync_files : force files to be synced to permanent storage. Default to True.
            thread_count: Number of IO threads to use to write. Default to 1.
            per_thread_copy_ahead: How many bytes to copy from the GPU ahead of saving then. Default 10Mb.
            overwrite: Whether to allow overwriting existing checkpoints. Defaults to True.

        N. B. If sync_files is disabled, there's no guarantee that the checkpoint will be consistent in the case of a failure.
        N)superrT   r+   fsr   r   r   r   r   r   rB   save_idr   )	rH   r   r   r   r   r   r   r   r   	__class__r:   r;   rT     s   

z_FileSystemWriter.__init__r   c                 C   s   |r	| j || _t | _d S r?   )r   r   r   rB   r   rH   r   r:   r:   r;   reset  s   z_FileSystemWriter.resetis_coordinatorc                 C   rF   r?   r:   )rH   r   r:   r:   r;   set_up_storage_writer  rV   z'_FileSystemWriter.set_up_storage_writerplanc                 C   sX   | j | j | j | jr*| jr!td| j d| jd |S td| jd|S )Nz#Detected an existing checkpoint in z#, overwriting since self.overwrite=z. Past version 2.5 of PyTorch, `overwrite` will default to False. Set this variable to True to maintain this functionality or False to raise when an existing checkpoint is found.z-Checkpoint already exists and self.overwrite=.)	r   r   r   r   metadata_pathr   warningswarnr   rH   r  r:   r:   r;   prepare_local_plan  s   z$_FileSystemWriter.prepare_local_planplansc                 C   s   dd t |D }|S )Nc                 S   s*   g | ]\}}t j|td | ddqS )__r]   r   )dataclassesreplacer<   )r   r   r  r:   r:   r;   r     s    z9_FileSystemWriter.prepare_global_plan.<locals>.<listcomp>)r   )rH   r	  	new_plansr:   r:   r;   prepare_global_plan  s   z%_FileSystemWriter.prepare_global_planr   c              
      s\  |j d  fdd}t }| jr1t| j|jD ]}| }| j| j	|}|
|||f qn|jD ]}| }| j| j	|}|
|||gf q4t }	g }
td| jD ]}tjt| jj||	|| j| j| jfd}|  |
| qWt| jj||	|| j| j| jd |
D ]}|  qg }z	 ||	 7 }q tjy   t }|| | Y S w )Nr   c                     s   j    t }  d7  | S r   )r=   DEFAULT_SUFFIX)r   
file_countstorage_planr:   r;   gen_file  s   z._FileSystemWriter.write_data.<locals>.gen_filerx   )targetr   )r   r   r   r   rb   r   r   )r   r   Queuer   r   r   rS   r   r   r   r   r   	threadingThreadr   r   r   r   startrU   joinr   r   r(   
set_result)rH   r  r   r  r   bucketr   r   r   r   threadsr]   tresfutr:   r  r;   
write_data  sf   



z_FileSystemWriter.write_datametadataresultsc              	   C   s   i }|D ]}| dd |D  q||_|  |_tt| j| jt d}| j	|d(}t
|| | jrOz	t|  W n tyN   t  Y nw W d    n1 sYw   Y  | j| jrl| j| j | j|| j d S )Nc                 S   s   i | ]}|j |jqS r:   )r   r   )r   wrr:   r:   r;   
<dictcomp>A  s    z,_FileSystemWriter.finish.<locals>.<dictcomp>z.tmpr   )updater   storage_metar	   r   r   r   r   r.   r   pickledumpr   r   r   r   r   r   r   r  r   r   )rH   r"  r#  
storage_mdwr_listtmp_pathmetadata_filer:   r:   r;   finish>  s&   
	z_FileSystemWriter.finishc                 C   s   t | j| jdS )N)r   r   )r   r   r   rK   r:   r:   r;   r'  U     z_FileSystemWriter.storage_metac                 C   s   t t| j| jtS r?   )r	   r   r   r   r   r.   rK   r:   r:   r;   r  X  s   z_FileSystemWriter.metadata_pathc                 C      | j S )zT
        return the checkpoint_id that will be used to save the checkpoint.
        r   rK   r:   r:   r;   r   \     z_FileSystemWriter.checkpoint_idc                 C   
   t |S r?   r+   r   r   r:   r:   r;   r   c     
z(_FileSystemWriter.validate_checkpoint_id)TTrx   r   Tr?   )!r3   r4   r5   r6   r   r7   r   r   r   r9   r   rT   r   r  r   r  r   r  r    r(   r&   r!  r   r.  r   r   r'  r   r  r   r   r   __classcell__r:   r:   r   r;   r     sZ    	
 "

C&r   c                       s
  e Zd Zdeeejf ddf fddZdede	j
fddZdd	eeejdf ddfd
dZdededed fddZdefddZdededdfddZdedefddZdee dee fddZedeeejf fddZed	eeejf defddZ  ZS ) r*   r   r>   Nc                    s2   t    t | _| j|| _i | _t | _d S r?   )	r   rT   r+   r   r   r   r   rB   load_idr   r   r:   r;   rT   i  s
   
zFileSystemReader.__init__sinfoc                 C   s   t ||j|jS r?   )r'   r1   r2   )rH   filer8  r:   r:   r;   _slice_filep  r/  zFileSystemReader._slice_filer   c                 C   s$   i | _ |r| j|| _t | _d S r?   )r   r   r   r   rB   r7  r   r:   r:   r;   r   s  s   zFileSystemReader.resetr  r   c                 C   sx  i }|j D ]}| j|j }|j}||g | q|  D ]\}}| j| j|}	| j	|	dx}
|D ]m}| j|j }| 
|
|}|jtjkr]t||j}|d ||| q4tttjttt |ddd}t||j|j}|| }| | ksJ d|j d|  d|  | | |!|| q4W d    n1 sw   Y  qt" }|#d  |S )	Nrbr   rX   T)map_locationweights_onlyzreq z mismatch sizes z vs )$rS   r   storage_indexr0   
setdefaultrU   r   r   r   r   r:  r{   r   r   r   r   readr2   seek
load_bytesr	   r   rP   loadr   r   r   storage_offsetslengthsresolve_tensorrW   rD   copy_commit_tensorr(   r  )rH   r  r   per_file	read_itemitem_mdr   r0   reqsr   ra   req
file_slice
read_bytesr^   target_tensorr   r:   r:   r;   	read_datay  sL   




zFileSystemReader.read_datac                 C   sn   | j | jd}| j |d}t|}W d    n1 sw   Y  t|dd d u r0t |_| j	|j_	|S )Nr-   r;  r'  )
r   r   r   r   r(  rC  r   r   r'  r7  )rH   r   r-  r"  r:   r:   r;   read_metadata  s   
zFileSystemReader.read_metadatar"  r   c                 C   s   |j | _ | j d usJ d S r?   r  )rH   r"  r   r:   r:   r;   set_up_storage_reader  s   z&FileSystemReader.set_up_storage_readerc                 C      |S r?   r:   r  r:   r:   r;   r    rV   z#FileSystemReader.prepare_local_planr	  c                 C   rT  r?   r:   )rH   r	  r:   r:   r;   r    rV   z$FileSystemReader.prepare_global_planc                 C   r0  )zT
        return the checkpoint_id that will be used to load the checkpoint.
        r1  rK   r:   r:   r;   r     r2  zFileSystemReader.checkpoint_idc                 C   r3  r?   r4  r   r:   r:   r;   r     r5  z'FileSystemReader.validate_checkpoint_idr?   )r3   r4   r5   r   r7   r   r   rT   r/   r   r   r:  r   r   r   r(   rQ  r   rR  r   rS  r  r   r  r   r   r   r   r6  r:   r:   r   r;   r*   h  s      ,&r*   c                       sl   e Zd ZdZ						ddeeejf deded	e	d
e	dededdfddZ
dedef fddZ  ZS )r)   r   Trx   r   Fr   r   r   r   r   cache_staged_state_dictr   r>   Nc              	   C   s*   t j| ||||||d tj| |d dS )aM  
        Initialize the writer pointing to `path`.

        Args:
            path: directory where the checkpoint will be written to.
            single_file_per_rank: Produce one file per rank instead of one file per tensor/blob. Default to True.
            sync_files : force files to be synced to permanent storage. Default to True.
            thread_count: Number of IO threads to use to write. Default to 1.
            per_thread_copy_ahead: How many bytes to copy from the GPU ahead of saving then. Default 10Mb.
            cache_staged_state_dict: Whether to cache the staged state_dict. This option decreases staging latency
                at the cost of increases memory usage. Additionally, if this parameter is set to True, it's the expectation
                that the stager is maintained and re-used for multiple dcp.async_save calls. Default to False.
            overwrite: Whether to allow overwriting existing checkpoints. Defaults to True.

        N. B. If sync_files is disabled, there's no guarantee that the checkpoint will be consistent in the case of a failure.
        )r   r   r   r   r   r   )rU  N)r   rT   r#   )rH   r   r   r   r   r   rU  r   r:   r:   r;   rT     s   	
zFileSystemWriter.__init__
state_dictc                    s   d| _ t |S )zOverride of AsyncStager.stager   )r   r   stage)rH   rV  r   r:   r;   rW    s   zFileSystemWriter.stage)TTrx   r   FT)r3   r4   r5   r6   r   r7   r   r   r   r9   rT   r   rW  r6  r:   r:   r   r;   r)     s4    	
(r)   )Wrd   r  r   r   r   r(  r   r  r@   r  abcr   r   
contextlibr   r   pathlibr   typingr   r   r	   r
   r   r   r   r   r   r   r   r   rP   r   torch._utilsr   r   torch.distributed._shard._utilsr   %torch.distributed.checkpoint.metadatar   r   r   r   $torch.distributed.checkpoint.plannerr   r   r   r   r   r    r!   r"   $torch.distributed.checkpoint.stagingr#   $torch.distributed.checkpoint.storager$   r%   r&   "torch.distributed.checkpoint.utilsr'   torch.futuresr(   __all__r.   r7   r8   r/   r<   r  rB   rC   rQ   r_   r9   r   r   r   r   r   r  r   r   r,   r+   r   r*   r)   r:   r:   r:   r;   <module>   s   
8(
Z"

D*/ ;^