o
    }oi3                     @   s   d dl Z d dlZd dlmZmZmZ d dlmZmZm	Z	m
Z
 d dlmZ d dlmZmZ d dlmZ d dlmZ d dlmZ e jdkrFeZneZe
d	Ze
d
ZG dd deeeef ZG dd deeeef ZdS )    N)Path	PosixPathWindowsPath)GenericOptionalTupleTypeVar)FileLockTimeout)	TrainerFn)ckpt_to_context_subdir)loggingntSourceTTargetTc                       s   e Zd ZdZdZdZdefddZdedefdd	Z	 fd
dZ
ddee dedefddZddee defddZddee defddZ  ZS )	Connectora  
    A generic connector class that provides a framework for transforming a source type (SourceT)
    to a target type (TargetT) while handling file paths based on the operating system.

    Attributes
    ----------
        default_path (Optional[Path]): A default path used when no path is explicitly provided.

    Methods
    -------
        init() -> TargetT:
            Should be implemented to initialize the target type from the source type.

        apply(output_path: Path) -> Path:
            Should be implemented to apply the transformation and save the result at the output path.

        __new__(cls, *args, **kwargs) -> 'Connector':
            Creates a new instance of the connector, using default_path if no path is provided.

        __call__(output_path: Optional[Path] = None, overwrite: bool = False) -> Path:
            Processes the transformation and handles file operations like overwriting.

        local_path(base_path: Optional[Path] = None) -> Path:
            Computes the local path for storage based on a base path or a default cache home.

        is_in_cache(base_path: Optional[Path] = None) -> bool:
            Checks if the transformed data is already cached at the specified base path.
    Ni  returnc                 C      t  )zIShould be implemented to initialize the target type from the source type.NotImplementedError)self r   O/home/ubuntu/.local/lib/python3.10/site-packages/nemo/lightning/io/connector.pyinitG      zConnector.initoutput_pathc                 K   r   )zYShould be implemented to apply the transformation and save the result at the output path.r   )r   r   kwargsr   r   r   applyK   r   zConnector.applyc                    s@   | j d ur|sd|vrt | | j S t j| g|R i |S )Npath)default_pathsuper__new__)clsargsr   	__class__r   r   r!   O   s   zConnector.__new__F	overwritec           	      K   s  |p|   }||jd }t|}| rd}zz5|j| jd$ |r-| r-t| | s>| j	|fi |}|p=|}W d    n1 sHw   Y  W n$ t
y^   td|    tyr } z	td|   d }~ww W | rzt| W |S  ty } ztd| d|  W Y d }~|S d }~ww |S | rzt| W w  ty } ztd| d|  W Y d }~w d }~ww w )Nz.lockF)timeoutz6Timeout occurred while trying to acquire the lock for zAn error occurred: zFailed to remove lock file z: )
local_pathwith_suffixsuffixr	   existsacquireLOCK_TIMEOUTshutilrmtreer   r
   r   error	ExceptionosremoveOSErrorwarning)	r   r   r&   r   _output_path	lock_pathlock	to_returner   r   r   __call__V   sT   
 "zConnector.__call__	base_pathc                 C   s2   |r|}n
ddl m} t|}|t| dd S )zQComputes the local path for storage based on a base path or a default cache home.r   )NEMO_CACHE_HOME:///)nemo.lightning.baser=   r   strreplace)r   r<   _baser=   r   r   r   r(   z   s
   zConnector.local_pathc                 C   s   | j |d S )zLChecks if the transformed data is already cached at the specified base path.)r<   )r(   r+   )r   r<   r   r   r   is_in_cache   s   zConnector.is_in_cache)NFN)__name__
__module____qualname____doc__r   r-   r   r   r   r   r!   r   boolr;   r(   rD   __classcell__r   r   r$   r   r   &   s    $ r   c                   @   s   e Zd ZdZ	ddejdeej dejfddZdd	e	dejd
e
ddfddZ	dde	deej de
deejejf fddZddee	 de	fddZdejfddZdddZdS )ModelConnectora,  
    A specialized connector that extends the generic Connector to handle model-specific operations
    such as setup, save, and load using the Lightning framework.

    Methods
    -------
        nemo_setup(model: pl.LightningModule, trainer: Optional[pl.Trainer] = None) -> pl.Trainer:
            Sets up the model and trainer using a specified strategy, preparing it for training or inference.

        nemo_save(output_path: Path, trainer: pl.Trainer):
            Saves the model's state to the specified path using the trainer's current strategy.

        nemo_load(path: Path, trainer: Optional[pl.Trainer] = None, cpu: bool = True) -> Tuple[Any, pl.Trainer]:
            Loads a model from the specified path, optionally using a CPU-focused strategy, and returns the model and
            trainer.
    Nmodeltrainerr   c           	   	   O   s   ddl m}m}m} |p|dd||ddd|d}tj|j_|j	| |j
  | sjd|j_| + ||j |  W d	   n1 sMw   Y  W d	   |S W d	   |S 1 sew   Y  |S )
a  
        Sets up the model and trainer using a specified strategy, preparing it for training or inference.

        Args:
            model (pl.LightningModule): The model to be set up.
            trainer (Optional[pl.Trainer]): The trainer to be used, if not provided a new one will be created.
        Returns
        -------
            pl.Trainer: The trainer configured with the model and strategy.
        r   MegatronStrategyTrainer_strategy_lib   cpuFT)ckpt_save_optimizeralways_save_context)devicesacceleratorstrategyN)nemo.lightningrP   rQ   rR   r   FITTINGstatefnrY   connectsetup_environment
state_dict	lazy_initinit_modulemegatron_lazy_init_contextconfigconfigure_model)	r   rM   rN   r#   r   rP   rQ   rR   _trainerr   r   r   
nemo_setup   s&   
	

(zModelConnector.nemo_setupTr   dump_ioc                 C   s   d|j _d|j _|j | t|}|jddd || t|j ddr-|j jj	dd ddl
m} ddlm} | rM|rO||jt|d	gd
 dS dS dS )av  
        Saves the model's state to the specified path using the trainer's current strategy.

        Args:
            output_path (Path): The path where the model checkpoint will be saved.
            trainer (pl.Trainer): The trainer with the strategy to save the model.
            dump_io (bool): If True, the IO configuration will be saved to the output path.
        FT)parentsexist_ok
async_save)blockingr   )TrainerContext)is_global_rank_zerorM   )
yaml_attrsN)rY   _setup_optimizers_init_model_parallelsetupr   mkdirsave_checkpointgetattrcheckpoint_iomaybe_finalize_save_checkpointnemo.lightning.io.plrm   nemo.utils.get_rankrn   from_trainerio_dumpr   )r   r   rN   rh   rm   rn   r   r   r   	nemo_save   s   

zModelConnector.nemo_saver   rT   c                 C   s  ddl m} ddlm}m}m} ddlm} ||dd}	d|	j_	d|	j_
d|	j_||	| |	jdu}
g }|
r<||	j |pM|d	|rDd
nd|ddd|d}|j|	 |j  |	 s~|rz||	j |	  W d   n1 stw   Y  n|	  |j| |
rddlm} ||	_|	|	}	dd |jj  D }|jjj||dd|d}|jj|dd |	|fS |j| |	|fS )a  
        Loads a model from the specified path.

        Args:
            path (Path): The path from which the model will be loaded.
            trainer (Optional[pl.Trainer]): The trainer to be used, if not provided a new one will be created.
            cpu (bool): If True, the model will be loaded with a CPU-focused strategy.

        Returns
        -------
            Tuple[pl.LightningModule, pl.Trainer]: The loaded model and the trainer configured with the model.
        r   )#set_modelopt_spec_if_exists_in_ckptrO   )load_contextrM   )subpathNFrS   rT   gpupytorch)ddpsetup_optimizers)rW   rX   rY   	callbacks)ckpt_to_weights_subdirc                 S   s   i | ]\}}d |v r||qS )z	.adapter.r   ).0kvr   r   r   
<dictcomp>  s    z,ModelConnector.nemo_load.<locals>.<dictcomp>)	is_saving)sharded_state_dict)strict)nemo.collections.llm.modeloptr}   rZ   rP   rQ   rR   nemo.lightning.io.apir~   rd   fp8	fp8_paramperform_initializationmodel_transformappendrY   r^   r_   r`   megatron_cpu_init_contextre   rr   rx   r   rN   megatron_parallelr   itemsrv   load_checkpointload_model_state_dict)r   r   rN   rT   r}   rP   rQ   rR   r~   rM   is_peft_ckptr   rf   r   adapter_sharded_state_dictadapter_stater   r   r   	nemo_load   sT   






zModelConnector.nemo_loadr<   c                 C   sd   |rt |}n
ddlm} t |}t| dr(| jdv r#|| jj S || j S |t| dd S )Nr   )NEMO_MODELS_CACHEr?   )z.ptz.pthr>   )	r   r@   r   rA   
startswithr*   parentnamerB   )r   r<   rC   r   r   r   r   r(   "  s   


zModelConnector.local_pathc                 C   sD   t | dr| j|_t |drt | jdr | jj|j_dS dS dS dS )z#Called after checkpoint is imported	tokenizer__io__N)hasattrr   r   )r   rM   r   r   r   on_import_ckpt2  s   
zModelConnector.on_import_ckpt/tmp/nemo_tokenizerc                 C   s(   ddl m} |j|dd}|| |S )z,Save HF tokenizer to the imported NeMo modelr   )AutoTokenizerT)trust_remote_code)transformersr   from_pretrainedsave_pretrained)r   tokenizer_name_or_path	save_pathr   tokr   r   r   save_hf_tokenizer_assets9  s   
z'ModelConnector.save_hf_tokenizer_assetsrE   )T)NT)r   )rF   rG   rH   rI   plLightningModuler   rQ   rg   r   rJ   r|   r   r   r(   r   r   r   r   r   r   rL      s0    
#
IrL   )r2   r.   pathlibr   r   r   typingr   r   r   r   lightning.pytorchr   r   filelockr	   r
    lightning.pytorch.trainer.statesr   nemo.lightning.ckpt_utilsr   
nemo.utilsr   r   BasePathr   r   r   rL   r   r   r   r   <module>   s    
d