o
    cin                     @   s   d dl Z d dlZd dlZd dlmZ d dlmZmZmZm	Z	 d dl
Z
d dlmZmZ d dlmZ d dlmZ er>d dlmZ dZed	d
G dd deZdS )    N)Path)TYPE_CHECKINGAnyDictOptional)4consume_prefix_in_state_dict_if_present_not_in_placeload_torch_model)FrameworkCheckpoint)	PublicAPI)Preprocessortorch_encoded_databeta)	stabilityc                   @   s   e Zd ZdZdZedddeeef de	d dd fd	d
Z
edddejjde	d dd fddZdde	ejj dejjfddZdS )TorchCheckpointzCA :class:`~ray.train.Checkpoint` with Torch-specific functionality.zmodel.ptN)preprocessor
state_dictr   r   returnc                C   sJ   t  }t|| j }t|d}t|| | |}|r#|	| |S )ah  Create a :class:`~ray.train.Checkpoint` that stores a model state dictionary.

        .. tip::

            This is the recommended method for creating
            :class:`TorchCheckpoints<TorchCheckpoint>`.

        Args:
            state_dict: The model state dictionary to store in the checkpoint.
            preprocessor: A fitted preprocessor to be applied before inference.

        Returns:
            A :class:`TorchCheckpoint` containing the specified state dictionary.

        Examples:

            .. testcode::

                import torch
                import torch.nn as nn
                from ray.train.torch import TorchCheckpoint

                # Set manual seed
                torch.manual_seed(42)

                # Function to create a NN model
                def create_model() -> nn.Module:
                    model = nn.Sequential(nn.Linear(1, 10),
                            nn.ReLU(),
                            nn.Linear(10,1))
                    return model

                # Create a TorchCheckpoint from our model's state_dict
                model = create_model()
                checkpoint = TorchCheckpoint.from_state_dict(model.state_dict())

                # Now load the model from the TorchCheckpoint by providing the
                # model architecture
                model_from_chkpt = checkpoint.get_model(create_model())

                # Assert they have the same state dict
                assert str(model.state_dict()) == str(model_from_chkpt.state_dict())
                print("worked")

            .. testoutput::
                :hide:

                ...
        zmodule.)
tempfilemkdtempr   MODEL_FILENAMEas_posixr   torchsavefrom_directoryset_preprocessor)clsr   r   tempdir
model_pathstripped_state_dict
checkpoint r    T/home/ubuntu/.local/lib/python3.10/site-packages/ray/train/torch/torch_checkpoint.pyfrom_state_dict   s   8

zTorchCheckpoint.from_state_dictmodelc                C   s@   t  }t|| j }t|| | |}|r|| |S )a  Create a :class:`~ray.train.Checkpoint` that stores a Torch model.

        .. note::

            PyTorch recommends storing state dictionaries. To create a
            :class:`TorchCheckpoint` from a state dictionary, call
            :meth:`~ray.train.torch.TorchCheckpoint.from_state_dict`. To learn more
            about state dictionaries, read
            `Saving and Loading Models <https://pytorch.org/tutorials/beginner/saving_loading_models.html#what-is-a-state-dict>`_. # noqa: E501

        Args:
            model: The Torch model to store in the checkpoint.
            preprocessor: A fitted preprocessor to be applied before inference.

        Returns:
            A :class:`TorchCheckpoint` containing the specified model.

        Examples:

            .. testcode::

                from ray.train.torch import TorchCheckpoint
                import torch

                # Create model identity and send a random tensor to it
                model = torch.nn.Identity()
                input = torch.randn(2, 2)
                output = model(input)

                # Create a checkpoint
                checkpoint = TorchCheckpoint.from_model(model)
                print(checkpoint)

            .. testoutput::
                :hide:

                ...
        )	r   r   r   r   r   r   r   r   r   )r   r#   r   r   r   r   r    r    r!   
from_modela   s   -

zTorchCheckpoint.from_modelc                 C   s   |   !}t|| j }tj|stdtj	|dd}W d   n1 s(w   Y  t
|tjjr;|r;td t||d}|S )a  Retrieve the model stored in this checkpoint.

        Args:
            model: If the checkpoint contains a model state dict, and not
                the model itself, then the state dict will be loaded to this
                ``model``. Otherwise, the model will be discarded.
        z`model.pt` not found within this checkpoint. Make sure you created this `TorchCheckpoint` from one of its public constructors (`from_state_dict` or `from_model`).cpu)map_locationNzTorchCheckpoint already contains all information needed. Discarding provided `model` argument. If you are using TorchPredictor directly, you should do `TorchPredictor.from_checkpoint(checkpoint)` by removing kwargs `model=`.)saved_modelmodel_definition)as_directoryr   r   r   ospathexistsRuntimeErrorr   load
isinstancennModulewarningswarnr   )selfr#   r   r   model_or_state_dictr    r    r!   	get_model   s"   

zTorchCheckpoint.get_model)N)__name__
__module____qualname____doc__r   classmethodr   strr   r   r"   r   r0   r1   r$   r6   r    r    r    r!   r      s,    
D$6r   )r*   r   r2   pathlibr   typingr   r   r   r   r   ray.air._internal.torch_utilsr   r   (ray.train._internal.framework_checkpointr	   ray.util.annotationsr
   ray.data.preprocessorr   ENCODED_DATA_KEYr   r    r    r    r!   <module>   s    