o
    }oi                  	   @   s  d dl Z d dlZd dlZd dlmZ d dlmZ d dlm	Z	m
Z
mZ d dlZd dlZd dlZd dlmZmZ d dlmZmZ d dlmZmZ d dlmZ ed	Zd
eeef deeef fddZG dd deZ	d#deeef dede
ee	f fddZ deeef de
eef fddZ!deeef defddZ"	d#deeef dede
ee	f fddZ#deeef deeef fddZ$d#d eeef dede
ee	f fd!d"Z%dS )$    N)BytesIO)Path)AnyDictUnion)FileSystemReaderload)BytesStorageMetadataTensorStorageMetadata)TarPathZarrPathStore)_mock_importNeMonemo_checkpointreturnc                 C   s$   t | }tj|rt|S t|S )a	  
    Creates Path / TarPath object suitable for navigating inside the nemo checkpoint.

    Args:
        nemo_checkpoint (Path, str): Path to the NeMo checkpoint.
    Returns:
        Path | TarPath: Suitable Path object for navigating through the checkpoint.
    )strospathisdirr   r   )r   string_path r   R/home/ubuntu/.local/lib/python3.10/site-packages/nemo/export/utils/model_loader.pynemo_to_path$   s   	r   c                       s2   e Zd ZdZdeeef ddf fddZ  ZS )TarFileSystemReaderzReader that accepts both Path and TarPath checkpoint directory.

    The FileSystemReader works with TarPath, but expects a pure Path.
    It's enough to skip the Path check in __init__.
    r   r   Nc                    s:   t |tr	t|n|}t | t |tr|| _dS dS )z>Makes sure that super().__init__ gets a pure path as expected.N)
isinstancer   r   super__init__r   )selfr   
super_path	__class__r   r   r   ;   s
   

zTarFileSystemReader.__init__)	__name__
__module____qualname____doc__r   r   r   r   __classcell__r   r   r   r   r   4   s    &r   Fcheckpoint_dirload_extra_statesc                 C   sR   t | }| }dd |j D }|r!|dd |j D  t||d |S )aa  
    Loads model state dictionary from torch_dist checkpoint.

    Args:
        checkpoint_dir (Path | TarPath): Path to the model weights directory.
        load_extra_states (bool): If set to true, loads BytesIO objects, related to the extra states.
    Returns:
        dict: Loaded model state dictionary (weights are stored in torch tensors).
    c                 S   s0   i | ]\}}t |tr|tj|j|jjd qS ))dtype)r   r
   torchemptysize
propertiesr(   .0ktpr   r   r   
<dictcomp>R   s    z4load_sharded_metadata_torch_dist.<locals>.<dictcomp>c                 S   s    i | ]\}}t |tr|g qS r   )r   r	   r-   r   r   r   r1   Z   s     )storage_reader)r   read_metadatastate_dict_metadataitemsupdater   )r&   r'   	fs_readermetadata
state_dictr   r   r    load_sharded_metadata_torch_distC   s   r:   dirc              	   C   sv   t | d}i }|D ]-}|jdd }|d}tj|dd|| jd | < W d   n1 s3w   Y  q|S )	z
    Loads model extra states from the .pt shards.

    Args:
        dir (Path | TarPath): Path to the directory with sharded extra states.
    Returns:
        dict: State dictionary corresponding to the loaded extra states.
    zshard_*_*.pt.r   rbT)weights_only/N)listglobnamesplitopenr)   r   )r;   pt_filesextra_statesfile
shard_nameopened_filer   r   r   %load_sharded_pickle_extra_state_scalea   s   	rJ   subdirc                 C   s   t | dg kS )z
    Checks if zarr directory contains extra states.

    Args:
        subdir (Path | TarPath): Directory inside the zarr checkpoint.
    Returns:
        bool: Is a directory with extra states
    zshard_0_*.pt)r@   rA   )rK   r   r   r   contains_extra_statest   s   	rL   c                 C   s   |r	t jtg i }|  D ]Q}| sq|r$t|r$|t| q|d 	 r`|j
}t|}ddl}||d}|jj
dkrUt |dd tjt j||< qt |dd ||< q|S )a'  
    Loads model dictionary from the zarr format.

    Args:
        checkpoint_dir (Path | TarPath): Path to the NeMo checkpoint.
        load_extra_states (bool): If set to True, the function will load BufferIO objects with extra states.
    Returns:
        dict: Model state dictionary.
    z.zarrayr   Nrbfloat16)r)   serializationadd_safe_globalsr   iterdiris_dirrL   r6   rJ   existsrB   r   zarrrD   r(   
from_numpyviewnumpyint16rN   )r&   r'   sharded_state_dictrK   keyzstorerT   arrr   r   r   load_sharded_metadata_zarr   s$   (r]   	nemo_pathc                 C   s,   | d   r
| d S | d   r| d S | S )a  
    Returns a Path pointing to the weights directory inside the NeMo checkpoint.

    Args:
        nemo_path (Path | TarPath): Path to the nemo checkpoint.
    Returns:
        Path | TarPath: Path to the weights directory inside the model checkpoint.
    model_weightsweights)rS   )r^   r   r   r   nemo_weights_directory   s
   	ra   checkpoint_pathc                 C   s   t | }t|}|d jdd}t|}W d   n1 s w   Y  |d dkr1t||dS |d dkrQtd	 t||dW  d   S 1 sLw   Y  td
|d  d)a/  
    Loads NeMo state dictionary. Weights are stored in torch.Tensor

    Args:
        checkpoint_path (str | Path): Path to the NeMo checkpoint.
        load_extra_states (bool): If True, loads BytesIO objects, corresponding to the extra states.
    Returns:
        dict: Model state dictionary.
    zmetadata.jsonrM   )modeNsharded_backendrT   )r'   
torch_distz1megatron.core.dist_checkpointing.strategies.torchzDistributed checkpoint backend z not supported)	r   ra   rD   jsonr   r]   r   r:   NotImplementedError)rb   r'   r^   nemo_weightsfconfig_dictr   r   r   load_model_weights   s   

 rk   )F)&rf   loggingos.pathr   ior   pathlibr   typingr   r   r   rW   tensorstorer)   torch.distributed.checkpointr   r   %torch.distributed.checkpoint.metadatar	   r
   nemo.export.tarutilsr   r   nemo.export.utils._mock_importr   	getLoggerLOGGERr   r   r   boolr:   rJ   rL   r]   ra   rk   r   r   r   r   <module>   sH   
"


"


"',