o
    ߗiX3                     @   s   d dl Z d dlmZmZmZmZmZmZmZ d dl	Z	d dl
mZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZmZmZm Z m!Z!mZm"Z" d dl#m$Z$m%Z% d dl&m'Z'm(Z( d dl)m*Z* d dl+m,Z, d dl-m.Z.m/Z/m0Z0 d dl1m2Z2 d dl3m4Z4 d dl5m6Z6 d dl7m8Z8 ee9eeee:  ee: f f Z;dgZ<d/de:de9de9fddZ=	d0deej> defddZ?de	j@deAfdd ZB	d/d!ed"ee: de9de	j@fd#d$ZCd%e!dee;eej> f fd&d'ZDG d(d) d)eZE	d0d*e!d+e9d,e,d-ee% de!f
d.dZFdS )1    N)castDictListOptionalSequenceTupleUnion)_get_device_module)ShardedTensor)TensorProperties)Shard)ChunkShardingSpec)unflatten_state_dict)DefaultLoadPlanner)BytesStorageMetadataChunkStorageMetadataMetadataMetadataIndexSTATE_DICT_TYPEr   TensorStorageMetadata)LoadPlanLoadPlanner)_create_read_items create_read_items_for_chunk_list)load_state_dict)StorageReader)_element_wise_add_element_wise_sub_normalize_device_info)_get_default_group)_create_chunk_sharded_tensor)_remote_device)DTensor!load_sharded_optimizer_state_dictcudaglobal_rankdevice_typereturnc                 C   s2   |dkrdS t |}| rt|| |  S dS )Ncpu)r	   is_availabler   device_count)r%   r&   device_module r,   d/home/ubuntu/transcripts/venv/lib/python3.10/site-packages/torch/distributed/checkpoint/optimizer.py_gen_rank_device5   s   r.   pgc                    sl   t j j d u rfddtt  D }n fddt  D }tdtt	t
ttf  |dS )Nc                    s"   g | ]}d | dt |  qS rank:/)r.   .0idx)pg_device_typer,   r-   
<listcomp>E   s    z(_create_colwise_spec.<locals>.<listcomp>c              
      s*   g | ]}d | dt t | qS r0   )r.   distget_global_rankr3   r/   r6   r,   r-   r7   J   s    r   dim
placements)r8   distributed_c10d_get_pg_default_devicetyperangeget_world_sizesizer   r   r   r   r!   str)r/   r=   r,   r:   r-   _create_colwise_spec@   s   


rE   valc                 C   s   t | tu r.t|  dkrdS t |  d jtu rdS t |  d jtu r,tddS t | tu rFt | jtu sBt | jtu rFtddS )Nr   FTz2Cannot handle DTensor nested insided ShardedTensorzCannot handle nested DTensor)r@   r
   lenlocal_shardstensorr"   
ValueError_local_tensor)rF   r,   r,   r-   _is_nested_tensorT   s   rL   propsrC   c                 C   sP   |dkrt tjt| }n
t|t| }tj|| j| j| j| j	|dS )Nr(   )rC   dtypelayoutrequires_grad
pin_memorydevice)
r   torchrR   r	   current_deviceemptyrN   rO   rP   rQ   )rM   rC   r&   rR   r,   r,   r-   _alloc_tensorc   s   rV   
state_dictc                 C   s   i }d}|   D ]9\}}d| f||< t|rAt| dks$J dt|ts-J d| d }|jj|jj	f||< |j
j}q||fS )a+  
    Load the right TP slice of the optimizer state.

    This is not easy since the per-tensor slicing can't be inferred from checkpoint metadata.
    We take advantage of the model state_dict producing a sliced ST to figure out what we need to load.
    This is pretty fragile and it might be easier for FSDP to compute this info for us.
    Returns a dictionary where keys are the same of the state_dict and the value is a tuple of
    (offset, size) for the current rank TP slice.
    N.B. The state_dict *MUST* come from FSDP.sharded_state_dict.
    N   z%Cannot handle ST with multiple shardsz$Can only handle nested ShardedTensorr   )itemsrC   rL   rG   rH   
isinstancer
   metadatashard_offsetsshard_sizesrI   _process_group)rW   specsdp_pgkeyvalueshardr,   r,   r-   _get_state_dict_2d_layoutw   s,   rd   c                       sz   e Zd ZU eeef ed< eed< eed< deee	e
 f ddf fddZdefd	d
Zdedejf fddZ  ZS )_ReaderWithOffsettranslationrW   r[   fqn_to_offsetr'   Nc                    s*   t    || _ti | _i | _i | _d S N)super__init__rg   r   r[   rW   rf   )selfrg   	__class__r,   r-   rj      s
   


z_ReaderWithOffset.__init__c                 C   s   g }i | _ | j D ]\}}| jj| }t|ts"|t|||7 }q
|| jvr0|t|||7 }q
| j| }t	|
 dks?J |
 d }ttt|jj|t|jjdg}t|tt||}|D ]"}	|	jjd usnJ t|	jj|}
tj|	jt|
d}|| j |	j< qd||7 }q
t|S )NrX   r   )offsetssizes)offset)rf   rW   rY   r[   state_dict_metadatarZ   r
   r   rg   rG   rH   r   rS   Sizer   r\   r]   r   r   r   
dest_indexrp   r   dataclassesreplacer   )rk   requestsfqnobjmdrp   original_shardlocal_chunksreqsrioriginal_offsetoriginal_indexr,   r,   r-   create_local_plan   s@   


	
z#_ReaderWithOffset.create_local_planindexc                    s   t  | j||S rh   )ri   lookup_tensorrf   get)rk   r   rl   r,   r-   r      s   z_ReaderWithOffset.lookup_tensor)__name__
__module____qualname__r   r   __annotations__r   r   rD   r   intrj   r   r   rS   Tensorr   __classcell__r,   r,   rl   r-   re      s   
 " *re   model_state_dictoptimizer_keystorage_readerplannerc              	   C   sN  |  }t| \}}tj|j}t|}|du r?g }	tt D ]}
t	||
|
  }|	d|
 d|  q!td|	d}nt|}i }i }|j D ]\}}|j| }|d |kr\qLt|trfd||< qL|j dkrxt|j|j|||< qL|du rtt|j|j|t t |
 t d||< qL|d	 }||d|jfd }t|jj|jj|jj|jj|jj d
}|!t"#||}g }t|}|j$D ]}t%t&|j'( |krq|t)t|j|j*||d qt+j,|||d}||v r
|| d dur
t%t-t. || d ||< |||< qLt/|||durt0|n|d t1||j}|S )a  
    Load a state_dict in conjunction with FSDP sharded optimizer state.

    This is the current recommended way to checkpoint FSDP.
    >>> # xdoctest: +SKIP
    >>> import torch.distributed.checkpoint as dist_cp
    >>> # Save
    >>> model: torch.nn.Model
    >>> optim_params = model.parameters()
    >>> optim = torch.optim.SGD(optim_params, lr=0.01)
    >>> # Save
    >>> with FSDP.state_dict_type(model, StateDictType.SHARDED_STATE_DICT):
    >>>     state_dict = {
    >>>         "optimizer": FSDP.optim_state_dict(model, optim),
    >>>         "model": model.state_dict()
    >>>     }
    >>>     dist_cp.save_state_dict(
    >>>         state_dict=optim_state,
    >>>         storage_writer=dist_cp.FileSystemWriter("checkpoint"),
    >>>         planner=dist_cp.DefaultSavePlanner(),
    >>>     )
    >>>
    >>> # Load
    >>> with FSDP.state_dict_type(model_tp, StateDictType.SHARDED_STATE_DICT):
    >>>     model_state_dict = model_tp.state_dict()
    >>>     checkpoint = {
    >>>         "model": model_state_dict
    >>>     }
    >>>     dist_cp.load_state_dict(
    >>>         state_dict=checkpoint,
    >>>         storage_reader=dist_cp.FileSystemReader(checkpoint_file),
    >>>         planner=dist_cp.DefaultLoadPlanner(),
    >>>     )
    >>>     model.load_state_dict(checkpoint["model_state"])
    >>>
    >>>     optim_state = dist_cp.load_sharded_optimizer_state_dict(
    >>>         model_state_dict,
    >>>         optimizer_key="optimizer",
    >>>         storage_reader=dist_cp.FileSystemReader("checkpoint"),
    >>>     )
    >>>
    >>>     flattened_osd = FSDP.optim_state_dict_to_load(
    >>>        model, optim, optim_state["optimizer"]
    >>>     )
    >>>
    >>>     optim.load_state_dict(flattened_osd)
    Nr1   r2   r   r;   z
<bytes_io>rX   )rank
world_sizenum_devices_per_noder/      )rN   rO   rP   memory_formatrQ   )rI   r[   )process_group)rW   r   r   )2read_metadatard   r8   r>   r?   r@   r	   rA   rB   r   r*   appendr   rE   rq   rY   planner_datarZ   r   rC   numelrV   
propertiesr    get_rankr   r   ShardTensorPropertiesrN   rO   rP   r   rQ   build_metadatarS   rr   shards_metadatar   r!   	placementr   r   r]   r
   +_init_from_local_shards_and_global_metadatar   r   r   re   r   )r   r   r   r   r[   layout_specsr`   dp_pg_device_typer+   r=   idevice_infosharding_specrW   rg   ra   rb   key_pathspec_key
alloc_sizer   st_mdrH   current_rankshard_mdstr,   r,   r-   r#      s   5






	
)r$   rh   )Grt   typingr   r   r   r   r   r   r   rS   torch.distributeddistributedr8   torch._utilsr	   +torch.distributed._shard.sharded_tensor.apir
   0torch.distributed._shard.sharded_tensor.metadatar   r   -torch.distributed._shard.sharded_tensor.shardr   :torch.distributed._shard.sharding_spec.chunk_sharding_specr   )torch.distributed.checkpoint._nested_dictr   ,torch.distributed.checkpoint.default_plannerr   %torch.distributed.checkpoint.metadatar   r   r   r   r   r   $torch.distributed.checkpoint.plannerr   r   ,torch.distributed.checkpoint.planner_helpersr   r   .torch.distributed.checkpoint.state_dict_loaderr   $torch.distributed.checkpoint.storager   "torch.distributed.checkpoint.utilsr   r   r   "torch.distributed.distributed_c10dr   #torch.distributed.fsdp._shard_utilsr    torch.distributed.remote_devicer!   torch.distributed.tensorr"   rD   r   STATE_DICT_2D_LAYOUT__all__r.   ProcessGrouprE   r   boolrL   rV   rd   re   r#   r,   r,   r,   r-   <module>   sx   $$	 


%>