o
    $i                  
   @   s*  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	 d dl
mZmZmZmZmZmZmZ d dlZd dlmZ d dlZd dlmZ d dlmZmZmZ d dlmZ d dl m!Z! d dl"m#Z#m$Z$ d d	l%m&Z&m'Z' d d
l(m)Z) d dl*m+Z, d dl-m+Z. d dl/m0Z0 d dl1m2Z2 d dl3m4Z4 e5e6Z7e8dZ9e8dZ:e4ddG dd de j;Z<dej=j>de?de@fddZAdej=jBde@fddZCe#	d,dee?e,e.f ded dee?ef fd d!ZDe#dee?e,e.f d"e?de?fd#d$ZEe#d%ee?e,e.f d"e?de?fd&d'ZFe4d-d)e@fd*d+ZGdS ).    N)MappingProxyType)Any
CollectionDictListOptionalTupleUnion)version)COMPONENT_LEARNERCOMPONENT_LEARNER_GROUPCOMPONENT_RL_MODULE)
force_list)FaultTolerantActorManager)OldAPIStack5OverrideToImplementCustomLogic_CallToSuperRecommended)NOT_SERIALIZABLEserialize_type)	StateDict)
Checkpoint)sync_dir_between_nodes)log_once)	PublicAPIz1.1z2.1alpha)	stabilityc                   @   s  e Zd ZdZdZdZdZ	d(dddddeee	e
jf  dee d	ed
 dede	f
ddZddddee	e
jf dee	 d	ed
 ddfddZe	d(dee	e
jf d	ed
 dd fddZej	d(dddeee	ee	 f  deee	ee	 f  defddZejdeddfddZejdeeee	ef f fddZedefddZdeee	d f  fd d!Zdefd"d#Zd$d% Z 	d)d&d'Z!dS )*CheckpointableaM  Abstract base class for a component of RLlib that can be checkpointed to disk.

    Subclasses must implement the following APIs:
    - save_to_path()
    - restore_from_path()
    - from_checkpoint()
    - get_state()
    - set_state()
    - get_ctor_args_and_kwargs()
    - get_metadata()
    - get_checkpointable_components()
    statezclass_and_ctor_args.pklzmetadata.jsonNF)r   
filesystemuse_msgpackpathr   pyarrow.fs.FileSystemr   returnc                C   sL  |du rddl }tt }t| }|p|| }t|tr"|n| }|r3|s3t	j
j|\}}|j|dd t|}|  }d|vrMtt|d< ||| j  }	|	t|d W d   n1 smw   Y  ||| j  }	tt| |  d|	 W d   n1 sw   Y  |du}
|p| jdd	 |  D d
}|  D ]\}}|
r||vrq|| }t|trA| d }ddd}tt |j!||gd}|j"s|# |# }| }d}|
rt$%|&|}||kr
|j!|||fdd|gd q||fdd}tt |j!||gd}|j"s%|# |# }t'|||t| |fdd}|j!||gd q|
rJ|&|}n| j|d| }|j(||||d q|| j)|rddnd  }|| *}	|rt*dd}|||	 nt||	 W d   t|S W d   t|S 1 sw   Y  t|S )a  Saves the state of the implementing class (or `state`) to `path`.

        The state of the implementing class is always saved in the following format:

        .. testcode::
            :skipif: True

            path/
                [component1]/
                    [component1 subcomponentA]/
                        ...
                    [component1 subcomponentB]/
                        ...
                [component2]/
                        ...
                [cls.METADATA_FILE_NAME] (json)
                [cls.STATE_FILE_NAME] (pkl|msgpack)

        The main logic is to loop through all subcomponents of this Checkpointable
        and call their respective `save_to_path` methods. Then save the remaining
        (non subcomponent) state to this Checkpointable's STATE_FILE_NAME.
        In the exception that a component is a FaultTolerantActorManager instance,
        instead of calling `save_to_path` directly on that manager, the first healthy
        actor is interpreted as the component and its `save_to_path` method is called.
        Even if that actor is located on another node, the created file is automatically
        synced to the local node.

        Args:
            path: The path to the directory to save the state of the implementing class
                to. If `path` doesn't exist or is None, then a new directory will be
                created (and returned).
            state: An optional state dict to be used instead of getting a new state of
                the implementing class through `self.get_state()`.
            filesystem: PyArrow FileSystem to use to access data at the `path`.
                If not specified, this is inferred from the URI scheme of `path`.
            use_msgpack: Whether the state file should be written using msgpack and
                msgpack_numpy (file extension is `.msgpack`), rather than pickle (file
                extension is `.pkl`).

        Returns:
            The path (str) where the state has been saved.
        Nr   T	recursivecheckpoint_versionzutf-8)classctor_args_and_kwargsc                 S   s   g | ]}|d  qS r    .0cr(   r(   X/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/rllib/utils/checkpoints.py
<listcomp>       z/Checkpointable.save_to_path.<locals>.<listcomp>not_componentsc                 S   s   dd l }|j S Nr   )rayutilget_node_ip_address)_r2   r(   r(   r,   _get_ip   s   
z,Checkpointable.save_to_path.<locals>._get_ipremote_actor_idsc                 S   s,   | j ||d urt||dS |  |dS )Nr   r   )save_to_pathr2   get	get_state)w_path_state_use_msgpackr(   r(   r,   <lambda>   s   z-Checkpointable.save_to_path.<locals>.<lambda>c                 S   s:   dd l }| }| j||d urt|n|  |d |S )Nr   r9   )tempfilemkdtempr:   r2   r;   r<   )r=   r?   r@   rB   tmpdirr(   r(   r,   _save  s   z*Checkpointable.save_to_path.<locals>._savec                 S   s   dd l }|| d S r1   )shutilrmtree)r5   _dirrF   r(   r(   r,   _rmdir"  s   z+Checkpointable.save_to_path.<locals>._rmdir)
components)r   r   r   .msgpack.pklerrorN)+uuidpathlibPathrB   
gettempdirstruuid4
isinstanceas_posixpyarrowfs
FileSystemfrom_uri
create_dirget_metadata)CHECKPOINT_VERSION_LEARNER_AND_ENV_RUNNERopen_output_streamMETADATA_FILE_NAMEwritejsondumpsencodeCLASS_AND_CTOR_ARGS_FILE_NAMEpickledumptypeget_ctor_args_and_kwargsr<   get_checkpointable_componentsr   healthy_actor_idsnextiterforeach_actorokr;   r2   putpopr   r:   STATE_FILE_NAMEtry_import_msgpack)selfr   r   r   r   rP   tmp_dirrandom_dir_namemetadataf_state_provided	comp_namecomp	comp_pathactor_to_user6   _resultworker_ip_addrself_ip_addrcomp_state_refrE   worker_temp_dirrI   
comp_statefilenamemsgpackr(   r(   r,   r:   ^   s   4





zCheckpointable.save_to_path)	componentr   r   c                K   sz  t |tr|n| }|r|stjj|\}}t|}t	|| s,t
d| ddd |  D }| j||fd|i| |du r|| j }|d rxtdd	}||d }|j|d
d}	W d   n1 srw   Y  n||d }t|}	W d   n1 sw   Y  | |	 dd |  D }
|
| }|r| j||fd|i| dS dS dS )a.  Restores the state of the implementing class from the given path.

        If the `component` arg is provided, `path` refers to a checkpoint of a
        subcomponent of `self`, thus allowing the user to load only the subcomponent's
        state into `self` without affecting any of the other state information (for
        example, loading only the NN state into a Checkpointable, which contains such
        an NN, but also has other state information that should NOT be changed by
        calling this method).

        The given `path` should have the following structure and contain the following
        files:

        .. testcode::
            :skipif: True

            path/
                [component1]/
                    [component1 subcomponentA]/
                        ...
                    [component1 subcomponentB]/
                        ...
                [component2]/
                        ...
                [cls.METADATA_FILE_NAME] (json)
                [cls.STATE_FILE_NAME] (pkl|msgpack)

        Note that the self.METADATA_FILE_NAME file is not required to restore the state.

        Args:
            path: The path to load the implementing class' state from or to load the
                state of only one subcomponent's state of the implementing class (if
                `component` is provided).
            component: If provided, `path` is interpreted as the checkpoint path of only
                the subcomponent and thus, only that subcomponent's state is
                restored/loaded. All other state of `self` remains unchanged in this
                case.
            filesystem: PyArrow FileSystem to use to access data at the `path`. If not
                specified, this is inferred from the URI scheme of `path`.
            **kwargs: Forward compatibility kwargs.
        z`path` (z) not found!c                 S      h | ]}|d  qS r'   r(   r)   r(   r(   r,   	<setcomp>  r.   z3Checkpointable.restore_from_path.<locals>.<setcomp>r   NrK   TrM   F)strict_map_keyrL   c                 S   r   r'   r(   r)   r(   r(   r,   r     r.   only_comp_names)rV   rT   rW   rX   rY   rZ   r[   rQ   rR   _exists_at_fs_pathFileNotFoundErrorrj   $_restore_all_subcomponents_from_pathrr   with_suffixis_filers   open_input_streamloadrf   	set_state)rt   r   r   r   kwargsorig_comp_namesr   r   rx   r   new_comp_namesdiff_comp_namesr(   r(   r,   restore_from_pathF  sX   0




z Checkpointable.restore_from_pathc                 K   s  t |tr|n| }|r|stjj|\}}t|}zc|	|| j
  }t|}W d   n1 s8w   Y  |d }t|d d }|d d }tt|j D ]%\}	\}
}|
|v r~||
}|jtjjkrzt||	krz|||	< qY|||
< qYW n ty   | }g }|}Y nw t|| std| d| d|  d	t|tstd| d| d
||i |}|j|fd|i| |S )a  Creates a new Checkpointable instance from the given location and returns it.

        Args:
            path: The checkpoint path to load (a) the information on how to construct
                a new instance of the implementing class and (b) the state to restore
                the created instance to.
            filesystem: PyArrow FileSystem to use to access data at the `path`. If not
                specified, this is inferred from the URI scheme of `path`.
            kwargs: Forward compatibility kwargs. Note that these kwargs are sent to
                each subcomponent's `from_checkpoint()` call.

        Returns:
             A new instance of the implementing class, already set to the state stored
             under `path`.
        Nr%   r&   r      zThe class (z) stored in checkpoint (z+) does not seem to be a subclass of `cls` (z)!zA) does not seem to be an implementer of the `Checkpointable` API!r   )rV   rT   rW   rX   rY   rZ   r[   rQ   rR   r   re   rf   r   r   	enumerateinspect	signature
parametersitemsrq   kind_ParameterKindPOSITIONAL_OR_KEYWORDlen	Exception
issubclass
ValueErrorr   r   )clsr   r   r   rx   	ctor_infoctor	ctor_argsctor_kwargsi
param_nameparamvalobjr(   r(   r,   from_checkpoint  sV   




zCheckpointable.from_checkpointr/   rJ   r0   c                K      dS )a  Returns the implementing class's current state as a dict.

        The returned dict must only contain msgpack-serializable data if you want to
        use the `AlgorithmConfig._msgpack_checkpoints` option. Consider returning your
        non msgpack-serializable data from the `Checkpointable.get_ctor_args_and_kwargs`
        method, instead.

        Args:
            components: An optional collection of string keys to be included in the
                returned state. This might be useful, if getting certain components
                of the state is expensive (e.g. reading/compiling the weights of a large
                NN) and at the same time, these components are not required by the
                caller.
            not_components: An optional list of string keys to be excluded in the
                returned state, even if the same string is part of `components`.
                This is useful to get the complete state of the class, except
                one or a few components.
            kwargs: Forward-compatibility kwargs.

        Returns:
            The current state of the implementing class (or only the `components`
            specified, w/o those in `not_components`).
        Nr(   )rt   rJ   r0   r   r(   r(   r,   r<         zCheckpointable.get_statec                 C   r   )ab  Sets the implementing class' state to the given state dict.

        If component keys are missing in `state`, these components of the implementing
        class will not be updated/set.

        Args:
            state: The state dict to restore the state from. Maps component keys
                to the corresponding subcomponent's own state.
        Nr(   )rt   r   r(   r(   r,   r     r   zCheckpointable.set_statec                 C   r   )zReturns the args/kwargs used to create `self` from its constructor.

        Returns:
            A tuple of the args (as a tuple) and kwargs (as a Dict[str, Any]) used to
            construct `self` from its class constructor.
        Nr(   rt   r(   r(   r,   ri   $  r   z'Checkpointable.get_ctor_args_and_kwargsc                 C   s   | j | jtjtjdS )a  Returns JSON writable metadata further describing the implementing class.

        Note that this metadata is NOT part of any state and is thus NOT needed to
        restore the state of a Checkpointable instance from a directory. Rather, the
        metadata will be written into `self.METADATA_FILE_NAME` when calling
        `self.save_to_path()` for the user's convenience.

        Returns:
            A JSON-encodable dict of metadata information.
        )class_and_ctor_args_file
state_fileray_version
ray_commit)re   rr   r2   __version__
__commit__r   r(   r(   r,   r]   -  s
   zCheckpointable.get_metadatac                 C   s   g S )a  Returns the implementing class's own Checkpointable subcomponents.

        Returns:
            A list of 2-tuples (name, subcomponent) describing the implementing class'
            subcomponents, all of which have to be `Checkpointable` themselves and
            whose state is therefore written into subdirectories (rather than the main
            state file (self.STATE_FILE_NAME) when calling `self.save_to_path()`).
        r(   r   r(   r(   r,   rj   @  s   	z,Checkpointable.get_checkpointable_componentsc                    sN   t |}t |}|du st fdd|D s |v r%|du s# |vr%dS dS )ao  Returns True if a component should be checkpointed.

        Args:
            name: The checkpoint name.
            components: A list of components that should be checkpointed.
            non_components: A list of components that should not be checkpointed.

        Returns:
            True, if the component should be checkpointed and otherwise False.
        Nc                 3   s    | ]
}|  d  V  qdS )/N)
startswithr)   namer(   r,   	<genexpr>Z  s    z2Checkpointable._check_component.<locals>.<genexpr>TF)r   any)rt   r   rJ   r0   	comp_listnot_comp_listr(   r   r,   _check_componentK  s   zCheckpointable._check_componentc                 C   sV   |d u rd S t |}g }|D ]}||d r$||t|d d   q|s)d S |S )Nr   r   )r   r   appendr   )rt   r   rJ   subcomponentsr{   r(   r(   r,   _get_subcomponents`  s   z!Checkpointable._get_subcomponentsc                 K   s   |   D ]f\}}|d ur||vrqd }|d u r$|| }	t||	 s#qn|}	||d r8|t|d d  }n||kr=qt|tr^tj	 }
|
 }t||	|
|fdd}|j||d q|j|	f||d| qd S )Nr   r   c           	      S   s   dd l }dd l}|j }||kr| j|fd|i| d S | }t|||| | j|fd|i| W d    d S 1 s@w   Y  d S )Nr   r   )rB   r2   r3   r4   r   TemporaryDirectoryr   )	r=   _kwargsr>   _head_ip	_comp_argrB   r2   worker_node_iptemp_dirr(   r(   r,   _restore  s"   

"zECheckpointable._restore_all_subcomponents_from_path.<locals>._restorer7   )r   r   )rj   r   rW   r   r   rV   r   r2   r3   r4   rk   r   rn   r   )rt   r   r   r   r   r   rz   r{   comp_argcomp_dirhead_node_ipall_healthy_actorsr   r(   r(   r,   r   l  s>   


z3Checkpointable._restore_all_subcomponents_from_pathrO   )NN)"__name__
__module____qualname____doc__rr   re   r`   r   r	   rT   rQ   rR   r   boolr:   r   classmethodr   abcabstractmethodr   r<   r   r   r   r   ri   r   r]   r   rj   r   r   r   r(   r(   r(   r,   r   ?   s    
 m
ZW r   rY   r   r!   c                 C   s   |  |}|jtjjjkS )z:Returns `True` if the path can be found in the filesystem.)get_file_inforh   rX   rY   FileTypeNotFound)rY   r   validr(   r(   r,   r     s   
r   	file_infoc                 C   s   | j tjjjkS )z5Returns `True`, if the file info is from a directory.)rh   rX   rY   r   	Directory)r   r(   r(   r,   _is_dir  s   r   
checkpointr   r    c              
   C   s  ddt ddddd}t| ttfr|  } | r"|s"tjj| \}} t	
| } t||  rt||  r|dt| i |tjj|  dd}|D ]}|jrotd|jro|td	t|jd
 |  S qQt|| d  r|| d  }tj|d}W d   n1 sw   Y  d|v rt|d |d< || ntdrtd|  d dD ](}t|| d|   r|d|dkrdndt t| d|  d |  S qd}dD ]%}| d|  }	t||	 r||	 jr|dkrdnd} nq|du rtd||t|	d | d }
t||
 rYt||
 rYt }|tjj|
 dd}|D ]	}| |j qH|d|i | t! t" t# }t||  rt|| rt }|tjj| dd}|D ]}||j }t|| r| |j q|d|i |S t||  r||  jr|td	t| j$t| d |S tdt|  d )!a  Returns a dict with information about an Algorithm/Policy checkpoint.

    If the given checkpoint is a >=v1.0 checkpoint directory, try reading all
    information from the contained `rllib_checkpoint.json` file.

    Args:
        checkpoint: The checkpoint directory (str) or a Checkpoint object.
        filesystem: PyArrow FileSystem to use to access data at the `checkpoint`. If not
            specified, this is inferred from the URI scheme provided by `checkpoint`.

    Returns:
        A dict containing the keys:
        "type": One of "Policy" or "Algorithm".
        "checkpoint_version": A version tuple, e.g. v1.0, indicating the checkpoint
        version. This will help RLlib to remain backward compatible wrt. future
        Ray and checkpoint versions.
        "checkpoint_dir": The directory with all the checkpoint files in it. This might
        be the same as the incoming `checkpoint` arg.
        "state_file": The main file with the Algorithm/Policy's state information in it.
        This is usually a pickle-encoded file.
        "policy_ids": An optional set of PolicyIDs in case we are dealing with an
        Algorithm checkpoint. None if `checkpoint` is a Policy checkpoint.
    	AlgorithmcloudpickleN)rh   formatr$   checkpoint_dirr   
policy_ids
module_idsr   Fr"   zcheckpoint-\d+z0.1)r$   r   rllib_checkpoint.json)fpr$   no_rllib_checkpoint_json_filez>No `rllib_checkpoint.json` file found in checkpoint directory zG! Trying to extract checkpoint info from other files found in that dir.)pklmsgpckzpolicy_state.Policyr   r   )rh   r   r$   r   )r   r   r   zalgorithm_state.zGiven checkpoint does not seem to be valid! No file with the name `algorithm_state.[pkl|msgpack|msgpck]` (or `checkpoint-[0-9]+`) found.)r   r   policiesr   r   )r$   r   r   zGiven checkpoint (zV) not found! Must be a checkpoint directory (or a file for older checkpoint versions).)%CHECKPOINT_VERSIONrV   Checkpoint_trainCheckpoint_tuneto_directoryrX   rY   rZ   r[   rQ   rR   r   rW   r   r   updaterT   FileSelectorr   rematch	base_namer
   Versionr   rb   r   r   loggerwarningr   setaddr   r   r   parent)r   r   infofile_info_listr   rx   rllib_checkpoint_info	extensionr   r   policies_dirr   modules_dirr   
module_dirr(   r(   r,   get_checkpoint_info  s  



r  msgpack_checkpoint_dirc                 C   s  ddl m} ddlm} ddlm} tdd}|| }| }t	|d |d< t
|d ts8|d  |d< n	||d |d< i }d	|v rUd
|d	 v rU|d	 d
i }t|d	 d< t|d	 d< tt|d< tj|d}	t|	d}
|||
 W d   n1 sw   Y  ttj|dd}
td|d d|	t| tjtjd|
 W d   n1 sw   Y  | D ]&\}}||dd tj|d|}tj|dd ||}|j ||dd q|!  |S )a
  Converts an Algorithm checkpoint (pickle based) to a msgpack based one.

    Msgpack has the advantage of being python version independent.

    Args:
        checkpoint: The directory, in which to find the Algorithm checkpoint (pickle
            based).
        msgpack_checkpoint_dir: The directory, in which to create the new msgpack
            based checkpoint.

    Returns:
        The directory in which the msgpack checkpoint has been created. Note that
        this is the same as `msgpack_checkpoint_dir`.
    r   )r   )AlgorithmConfig)validate_module_idTrM   algorithm_classconfigworkerpolicy_statespolicy_mapping_fnis_policy_to_trainr$   zalgorithm_state.msgpckwbNr   r=   r   r   )rh   r$   r   r   r   r   r   r   exist_okpolicy_statecheckpoint_format)"ray.rllib.algorithmsr   %ray.rllib.algorithms.algorithm_configr  ray.rllib.core.rl_moduler  rs   r   __getstate__r   rV   dict	serialize_serialize_dictrq   r   rT   r   osr   joinopenrg   rb   listkeysr2   r   r   r   makedirs
get_policyexport_checkpointstop)r   r  r   r  r  r   algor   r
  r   rx   pidr  
policy_dirpolicyr(   r(   r,   convert_to_msgpack_checkpoint  sZ   


	
r'  policy_checkpointc                 C   s>   ddl m} || }tj|dd |j|| dd ~|S )a
  Converts a Policy checkpoint (pickle based) to a msgpack based one.

    Msgpack has the advantage of being python version independent.

    Args:
        policy_checkpoint: The directory, in which to find the Policy checkpoint (pickle
            based).
        msgpack_checkpoint_dir: The directory, in which to create the new msgpack
            based checkpoint.

    Returns:
        The directory in which the msgpack checkpoint has been created. Note that
        this is the same as `msgpack_checkpoint_dir`.
    r   )r   Tr  r   r  )ray.rllib.policy.policyr   r   r  r  r!  r<   )r(  r  r   r&  r(   r(   r,   $convert_to_msgpack_policy_checkpoint  s   
r*  FrN   c                 C   s@   zddl }ddl}|  |W S  ty   | rtdY dS w )al  Tries importing msgpack and msgpack_numpy and returns the patched msgpack module.

    Returns None if error is False and msgpack or msgpack_numpy is not installed.
    Raises an error, if error is True and the modules could not be imported.

    Args:
        error: Whether to raise an error if msgpack/msgpack_numpy cannot be imported.

    Returns:
        The `msgpack` module, with the msgpack_numpy module already patched in. This
        means you can already encde and decode numpy arrays with the returned module.

    Raises:
        ImportError: If error=True and msgpack/msgpack_numpy is not installed.
    r   NzkCould not import or setup msgpack and msgpack_numpy! Try running `pip install msgpack msgpack_numpy` first.)r   msgpack_numpypatchr   ImportError)rN   r   r+  r(   r(   r,   rs     s   rs   rO   )F)Hr   r   rb   loggingr  rQ   r   rB   typesr   typingr   r   r   r   r   r   r	   
pyarrow.fsrX   	packagingr
   r2   ray.cloudpickler   rf   ray.rllib.corer   r   r   ray.rllib.utilsr   ray.rllib.utils.actor_managerr   ray.rllib.utils.annotationsr   r   ray.rllib.utils.serializationr   r   ray.rllib.utils.typingr   	ray.trainr   r   ray.tuner   ray.tune.utils.file_transferr   ray.utilr   ray.util.annotationsr   	getLoggerr   r   r   r   r^   ABCr   rY   rZ   rT   r   r   FileInfor   r  r'  r*  rs   r(   r(   r(   r,   <module>   s    $


    w
 G[#