o
    }oi                     @   s   d dl mZ d dlmZ d dlmZ d dlZd dlm	  m
Z d dlmZ d dlmZ d dlmZmZ d dlmZ d dlmZmZ 				
ddejdedee dededefddZdedeeeejf ef fddZdd Z 		dddZ!dS )    )Path)OptionalN)dist_checkpointing)FileSystemReader)ConnectorMixinModelConnector)ckpt_to_weights_subdir)loggingmodel_utilsFTmodelsourceoutput_path	overwriteon_import_ckptreturnc                 C   s:   t | ts	td| |}|||d}|r||  |S )a  
    Overriding nemo/lightning/io/api.py:import_ckpt to add on_import_ckpt flag, which is used to

    Imports a checkpoint into a model using the model's associated importer, typically for
    the purpose of fine-tuning a community model trained in an external framework, such as
    Hugging Face. This function leverages the ConnectorMixin interface to integrate external
    checkpoint data seamlessly into the specified model instance.

    The importer component of the model reads the checkpoint data from the specified source
    and transforms it into the right format. This is particularly useful for adapting
    models that have been pre-trained in different environments or frameworks to be fine-tuned
    or further developed within the current system. The function allows for specifying an output
    path for the imported checkpoint; if not provided, the importer's default path will be used.
    The 'overwrite' parameter enables the replacement of existing data at the output path, which
    is useful when updating models with new data and discarding old checkpoint files.

    For instance, using `import_ckpt(Mistral7BModel(), "hf")` initiates the import process
    by searching for a registered model importer tagged with "hf". In NeMo, `HFMistral7BImporter`
    is registered under this tag via:
    `@io.model_importer(Mistral7BModel, "hf", default_path="mistralai/Mistral-7B-v0.1")`.
    This links `Mistral7BModel` to `HFMistral7BImporter`, designed for HuggingFace checkpoints.
    The importer then processes and integrates these checkpoints into `Mistral7BModel` for further
    fine-tuning.

    Args:
        model (pl.LightningModule): The model into which the checkpoint will be imported.
            This model must implement the ConnectorMixin, which includes the necessary
            importer method for checkpoint integration.
        source (str): The source from which the checkpoint will be imported. This can be
            a file path, URL, or any other string identifier that the model's importer
            can recognize.
        output_path (Optional[Path]): The path where the imported checkpoint will be stored.
            If not specified, the importer's default path is used.
        overwrite (bool): If set to True, existing files at the output path will be overwritten.
            This is useful for model updates where retaining old checkpoint files is not required.
        on_import_ckpt (bool): If set to True, the importer will also call importer.on_import_ckpt(model),
            which imports the tokenizer associated with the model. This is set to False for multi-modal LLMs
            like SpeechLM, where the tokenizer is associated with the SpeechLM model instead of the internal LLM.

    Returns
    -------
        Path: The path where the checkpoint has been saved after import. This path is determined
            by the importer, based on the provided output_path and its internal logic.

    Raises
    ------
        ValueError: If the model does not implement ConnectorMixin, indicating a lack of
            necessary importer functionality.

    Example:
        model = Mistral7BModel()
        imported_path = import_ckpt(model, "hf://mistralai/Mistral-7B-v0.1")
    z+Model must be an instance of ConnectorMixin)r   r   )
isinstancer   
ValueErrorimporterr   )r   r   r   r   r   r   	ckpt_path r   V/home/ubuntu/.local/lib/python3.10/site-packages/nemo/collections/speechlm/utils/io.pyimport_ckpt   s   
<

r   ckpt_dirc                 C   sX   t | ts	t| } t| dd} t| }| }dd |j D }tj||d ||fS )zW
    Load a distributed checkpoint from a directory, return as pytorch state dict.
    F)	is_savingc                 S   s4   i | ]\}}t |jd kr|tj|j|jjdqS )TensorStorageMetadata)dtype)type__name__torchemptysize
propertiesr   ).0ktpr   r   r   
<dictcomp>n   s
    z)load_distributed_ckpt.<locals>.<dictcomp>)storage_reader)	r   r   r   r   read_metadatastate_dict_metadataitemsdcpload)r   	fs_readermetadata
state_dictr   r   r   load_distributed_ckptc   s   
r/   c                 C   sH   | }| dD ]}t| |std|j d| d| t| |} q| S )z,
    Get nested attribute of an object.
    .zObject z does not have attribute z, failed at )splithasattrAttributeError	__class__getattr)objattroriginal_objkeyr   r   r   get_nested_attr{   s   
r:   model_configOnemo.collections.speechlm.models.speech_to_text_llm_model.SpeechToTextLLMConfigc                 C   sZ   | j }t|r
|S td| j d|  t| j}t|| j	| j
 | dd}|S )z5
    Prepare distribute checkpoint for base LLM.
    z%Preparing distributed checkpoint for z from F)r   )language_model_from_pretrained	mcore_dcpcheck_is_distributed_checkpointr	   infolanguage_model_classr
   import_class_by_pathr   language_model_configlanguage_model_hub)r;   checkpoint_pathllm_model_clsr   r   r   r    prepare_pretrained_llm_dist_ckpt   s   
rG   )NFT)r;   r<   )"pathlibr   typingr   lightning.pytorchpytorchplr   torch.distributed.checkpointdistributed
checkpointr*   megatron.corer   r>   r   nemo.lightning.io.mixinr   r   nemo.lightning.io.plr   
nemo.utilsr	   r
   LightningModulestrboolr   tupledictTensorr/   r:   rG   r   r   r   r   <module>   s>   
$F