o
    ߗi2                  	   @   sP  U d dl Z d dlmZmZmZmZmZ d dlZd dlm	Z
 d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ dd	lmZmZmZmZmZmZmZ dd
lmZmZm Z m!Z!m"Z"m#Z# ddl$m%Z%m&Z& dgZ'ee( e)d< dej*defddZ+dedefddZ,dedede!fddZ-de(dede"fddZ.de(dedede"fddZ/de(dej*de"fddZ0de(defd d!Z1d"d# Z2d$d% Z3de(d&ed'ee dee fd(dZ4d)ede fd*d+Z5de(d,edee" fd-d.Z6dedefd/d0Z7dej*dee fd1d2Z8de(d3ed4edee fd5d6Z9d)ee(ef defd7d8Z:d9ed:ed;ed<efd=d>Z;dS )?    N)AnyCallablecastDictList)_get_device_module)ShardMetadata)ShardedTensor)DTensor)%compute_local_shape_and_global_offset   )BytesStorageMetadataChunkStorageMetadataMetadataIndexSTATE_DICT_TYPESTORAGE_TYPESTensorPropertiesTensorStorageMetadata)LoadItemTypeReadItemSavePlanTensorWriteData	WriteItemWriteItemType)"_check_shard_metadata_pair_overlap+_shards_get_overlap_region_wrt_saved_tensor create_read_items_for_chunk_list__all__tensorreturnc                 C   s$   t tdgt|   |  dS )Nr   offsetssizes)r   torchSizelensize)r    r'   j/home/ubuntu/transcripts/venv/lib/python3.10/site-packages/torch/distributed/checkpoint/planner_helpers.py_create_chunk_from_tensor'   s   r)   shard_mdc                 C   s   t t| jt| jdS Nr    )r   r#   r$   shard_offsetsshard_sizes)r*   r'   r'   r(   _chunk_for_shard-   s   

r.   sharded_tensorc                 C   s>   |   j}t|j|j|j|j|jd}tt	|||   j
dS )N)dtypelayoutrequires_gradmemory_format
pin_memorychunk
propertiesr&   )metadatatensor_propertiesr   r0   r1   r2   r3   r4   r   r.   r&   )r/   r*   shard_propertiesr7   r'   r'   r(   _sharded_tensor_metadata4   s   
r;   fqnc              	   C   sb   t |j|j|j\}}t|t|}}tt| |tj	t
t||dt| | ddS )Nr    r5   indextypetensor_data)r   shapedevice_mesh
placementsr#   r$   r   r   r   SHARDr   r   r   create_from_tensorto_localr&   )r<   r   r"   r!   r'   r'   r(   _create_write_items_for_dtensorH   s    rG   c                 C   s(   t |j}tt| |tjt||dS )Nr=   )r#   r$   r,   r   r   r   rD   r;   )r<   r/   r*   r!   r'   r'   r(   _create_write_item_for_shard\   s   rH   c                 C   sN   t dgt|  }tt| |tjtt	|| dt
|| ddS )Nr   r    r5   r=   )r#   r$   r%   r&   r   r   r   TENSORr   r   r   rE   )r<   r   r!   r'   r'   r(   _create_write_item_for_tensorg   s   rJ   bytesc                 C   s   t t| tjdS )N)r>   r?   )r   r   r   BYTE_IO)r<   rK   r'   r'   r(   _create_write_item_for_bytesiot   s   rM   c              	   C   s.   t tj| t|f|t|ft|fdS N)r?   
dest_indexdest_offsetsstorage_indexstorage_offsetslengths)r   r   rL   r#   r$   rO   dest_offsetrQ   storage_offsetlengthr'   r'   r(   _create_read_item_for_byteio{   s   


rX   c              	   C   s(   t tj| t||t|t|dS rN   )r   r   rI   r#   r$   rO   rP   rQ   rR   rS   r'   r'   r(   _create_read_item_for_tensor   s   rZ   checkpoint_mdlocal_chunksc                 C   s   g }t |D ]L\}}t |jD ]B\}}t||sqg }g }	g }
t||dD ]\}}}}|| |	| |
| q%|tt| |j||	t| |j|||
d qq|S )aW  
    Create a list of ``ReadItem`` based on the checkpoint and local chunks.

    This applies the resharding algorithm and computes the reads needed
    to satisfy ``local_chunks`` with a checkpoint described by ``checkpoint_md``.

    Args:
        fqn (str) : The state_dict FQN to pass to ``ReadItem``.
        checkpoint_md (TensorStorageMetadata): metadata for a given tensor
            from a checkpoint.
        local_chunks (List[ChunkStorageMetadata]): Local chunks that needs to be
            loaded.

    Returns:
        A list of ``ReadItem`` that will satisfy all input chunks.
    )saved_shardcurrent_shardrY   )	enumeratechunksr   r   appendrZ   r   r!   )r<   r[   r\   
read_itemsidxshardstorage_idx
storage_mdrR   rP   rS   _dimoffset_for_saved_tensoroffset_for_current_tensorrW   r'   r'   r(   r      s<   



state_dictc                    s   g }|   D ]?\ ttr|t  qttr.| fdd jD  qtt	j
r=|t  q|t  qt|S )Nc                 3   s    | ]	}t  |V  qd S )N)rH   ).0r*   r<   objr'   r(   	<genexpr>   s
    

z5_create_default_metadata_only_plan.<locals>.<genexpr>)items
isinstancer
   ra   rG   r	   extendr8   shards_metadatar#   TensorrJ   rM   r   )rj   requestsr'   rl   r(   "_create_default_metadata_only_plan   s   


ru   objectc                    s\   t dr S ttr fdd D S ttjr(t gS t gS )N__create_write_items__c                    s   g | ]	}t  |jqS r'   )rH   r8   rk   rd   r<   rv   r'   r(   
<listcomp>       z'_create_write_items.<locals>.<listcomp>)	hasattrrw   rp   r	   local_shardsr#   rs   rJ   rM   ry   r'   ry   r(   _create_write_items   s   

r~   c                 C   s8   t | j| j| j\}}t|t|}}t||dS r+   )r   rA   rB   rC   r#   r$   r   )r   r"   r!   r'   r'   r(   _create_chunk_from_dtensor   s   r   c                 C   sb   t | dr|  }|S t| trdd |  D }|S t| tjr(t| g}|S tdt	|  )N__create_chunk_list__c                 S   s   g | ]}t |jqS r'   )r.   r8   rx   r'   r'   r(   rz      s    
z&_create_chunk_list.<locals>.<listcomp>zMUnsupported Type, expecting one of [Tensor, DTensor, ShardedTensor] ,but got )
r|   r   rp   r	   r}   r#   rs   r)   
ValueErrorr?   )r   r\   r'   r'   r(   _create_chunk_list   s    


r   mdrm   c              
   C   sx   t |ts.zt|}W n ty' } ztd|  ddt|  |d }~ww t| ||S tt| dt| dddgS )Nz Invalid checkpoint metadata for z, z(expected BytesStorageMetadata but found r   rT   )rp   r   r   r   r?   r   rX   r   )r<   r   rm   r\   exr'   r'   r(   _create_read_items
  s,   

r   c                 C   s>   dt fdd}dtfdd}dtjfdd}t| ||| dS )	zP
    Initializes meta tensor if the meta tensor is DTensor or torch.Tensor.
    valuec                 S   st   t | dd }|tdkr8tj j}ttjt|	 }tj
|  |d}tj|| j| j|  |  d}|S | S )Ndevicemetar   )rB   rC   rA   stride)getattrr#   r   distdistributed_c10d_get_pg_default_devicer?   r   r   current_device
empty_likerF   r
   
from_localrB   rC   r&   r   )r   r   device_typenew_local_tensordtensorr'   r'   r(   dtensor_func&  s    z&_init_state_dict.<locals>.dtensor_funcc                 S   s2   t | dd }|tdkrtdt|  d| S )Nr   r   zFound unsupported type z for meta device loading.)r   r#   r   RuntimeErrorr?   )r   r   r'   r'   r(   sharded_tensor_func;  s   z-_init_state_dict.<locals>.sharded_tensor_funcc                 S   sP   t | dd }|tdkr&tj j}ttjt|	 }tj
| |d}|S | S )Nr   r   r   )r   r#   r   r   r   r   r?   r   r   r   r   )r   r   r   r   r'   r'   r(   tensor_funcD  s   z%_init_state_dict.<locals>.tensor_funcN)r
   r   r#   rs   _iterate_state_dict)rj   r   r   r   r'   r'   r(   _init_state_dict!  s   	r   iter_objectr   r   r   c                    s   t | tr	 | S t | tr| S t | tjr| S t | ttttt	j
fs+| du r-| S t | trF|  D ]\}}t| | |< q6| S t | ttfrc fdd| D }t | trat|}|S dS )a$  
    Iterate through the state dict, applying the given functions to each tensor type
    and update the state dict in place.

    Args:
        iter_object (Any): the target state_dict.
        sharded_tensor_func (Callable): the function to apply to ShardedTensor
        dtensor_func (Callable): the function to apply to DTensor
        tensor_func (Callable): the function to apply to Tensor

    # TODO: let state_dict_util._iterate_state_dict() to support in place option
    so we don't need to have two versions of _iterate_state_dict.
    Nc                    s   g | ]	}t | qS r'   )r   )rk   vr   r   r   r'   r(   rz   ~  r{   z'_iterate_state_dict.<locals>.<listcomp>)rp   r
   r	   r#   rs   intfloatstrrK   ioBytesIOdictro   r   listtuple)r   r   r   r   keyr   retr'   r   r(   r   X  s0   




r   )<r   typingr   r   r   r   r   r#   torch.distributeddistributedr   torch._utilsr   !torch.distributed._shard.metadatar   'torch.distributed._shard.sharded_tensorr	   torch.distributed.tensorr
   torch.distributed.tensor._utilsr   r8   r   r   r   r   r   r   r   plannerr   r   r   r   r   r   
reshardingr   r   r   r   __annotations__rs   r)   r.   r;   rG   rH   rJ   rM   rX   rZ   r   ru   r~   r   r   r   r   r   r'   r'   r'   r(   <module>   sx   
$ 	


77