o
    ci\                     @   s  d dl Zd dlZd dlZd dlZd dlmZ d dlmZm	Z	m
Z
mZmZmZmZ d dlmZ d dlmZ d dlmZmZmZmZmZ d dlmZ d dlmZ d d	lmZ d d
lm Z m!Z! d dl"m#Z# d dl$m%Z% d dl&m'Z' d dl(m)Z)m*Z* d dl+m,Z,m-Z-m.Z.m/Z/m0Z0m1Z1m2Z2m3Z3m4Z4m5Z5m6Z6m7Z7m8Z8m9Z9m:Z: d dl;m<Z< d dl=m>Z> d dl?m@Z@ d dlAmBZBmCZCmDZDmEZE erd dlFmGZG e* \ZHZIdZJG dd deKeZLG dd de ZMG dd de>e'ZNdS )    N)Enum)
CollectionDictIterableListOptionalTYPE_CHECKINGUnion)DataIterator)EnvToModulePipeline)ALL_MODULESDEFAULT_AGENT_IDDEFAULT_MODULE_ID!COMPONENT_ENV_TO_MODULE_CONNECTORCOMPONENT_RL_MODULE)Columns)MultiRLModuleSpec)SingleAgentEpisode)OfflinePreLearnerSCHEMA)MultiAgentBatch)override)Checkpointable)
get_devicetry_import_torch)DATASET_NUM_ITERS_EVALUATED$DATASET_NUM_ITERS_EVALUATED_LIFETIMEEPISODE_LEN_MAXEPISODE_LEN_MEANEPISODE_LEN_MINEPISODE_RETURN_MAXEPISODE_RETURN_MEANEPISODE_RETURN_MINMODULE_SAMPLE_BATCH_SIZE_MEANNUM_ENV_STEPS_SAMPLEDNUM_ENV_STEPS_SAMPLED_LIFETIMENUM_MODULE_STEPS_SAMPLED!NUM_MODULE_STEPS_SAMPLED_LIFETIMEOFFLINE_SAMPLING_TIMERWEIGHTS_SEQ_NO)MiniBatchRayDataIterator)Runnerconvert_to_torch_tensor)
DeviceType	EpisodeID	StateDict
TensorType)AlgorithmConfigtotal_eval_lossc                   @   s   e Zd ZdZdZdZdS )OfflinePolicyEvaluationTypesa  Defines the offline policy evaluation types.

    IS: Importance Sampling.
    PDIS: Per-Decision Importance Sampling. In contrast to IS this method
        weighs each reward and not the return as a whole. As a result it
        usually exhibits lower variance.
    ispdisN)__name__
__module____qualname____doc__ISPDIS r=   r=   f/home/ubuntu/.local/lib/python3.10/site-packages/ray/rllib/offline/offline_policy_evaluation_runner.pyr4   F   s    r4   c                   @   s2   e Zd Zdeeejf deeejf fddZdS )OfflinePolicyPreEvaluatorbatchreturnc              	      s  | j rDdd ldd l  fdd|d D }| |}| j| | jj| jj| j	
 r5| jjddnd | jddp>dddd	}nZ| jrtj| j|dt| jjB | jjd
d }| |}| j| | jj| jj| j	
 rx| jjddnd | jddpdddd	}n| j| j|t| jjB d| jj| j| jdd }g }|D ].}i }|tdt||tj< | |tj< | |tj< |j tj!d|tj!< |"| qd|iS )Nr   c                    s"   g | ]}t j| jd qS ))object_hook)r   
from_stateunpackbdecode).0statemnpmsgpackr=   r>   
<listcomp>\   s    z6OfflinePolicyPreEvaluator.__call__.<locals>.<listcomp>itemmax_seq_lenn_step   T)	num_itemsbatch_length_TrN   sample_episodesto_numpy)rS   schemainput_compress_columnsepisodesF)rT   rS   rU   observation_spaceaction_space)key)#input_read_episodesrJ   msgpack_numpy_validate_episodesepisode_bufferaddsampleconfigtrain_batch_size_per_learner_moduleis_statefulmodel_configgetinput_read_sample_batchesr   _map_sample_batch_to_episode_is_multi_agentr   input_read_schemarU   _map_to_episodesrW   rX   get_observationsslicelenr   OBSget_actionsACTIONSget_rewardsREWARDSget_extra_model_outputsACTION_LOGPappend)selfr@   rV   episode_dictsepisodeepisode_dictr=   rH   r>   __call__T   sz   




z"OfflinePolicyPreEvaluator.__call__N)r7   r8   r9   r   strnumpyndarrayrz   r=   r=   r=   r>   r?   S   s    *r?   c                   @   s  e Zd Z	dHdddee fddZ		dId	ed
eddfddZdefddZ	d	ed
eddfddZ
eedd Zee	dHdddeeeee f  deeeee f  defddZdefddZdJddZdJddZeedd  Zeed!d" Z			dKd#ed$ed%ed&edef
d'd(Zeed)eddfd*d+Zd,ed-eddfd.d/Zd0ed1efd2d3Z eed4d5 Z!eed6d7 Z"e#de$fd8d9Z%d:d; Z&e#de'fd<d=Z(e#de)fd>d?Z*e#defd@dAZ+e#de,ee-j.j/f fdBdCZ0e#de1fdDdEZ2e#de3fdFdGZ4dS )LOfflinePolicyEvaluationRunnerNr`   r2   module_specc                 K   sh   || _ d | _d | _tj| fd|i| t|  |d| _| jj	| j
| jd| _t| jd | _d S )Nr`   spaces)r   deviceoffline_evaluation_type)+_OfflinePolicyEvaluationRunner__module_spec0_OfflinePolicyEvaluationRunner__dataset_iterator._OfflinePolicyEvaluationRunner__batch_iteratorr+   __init__r   re   &_OfflinePolicyEvaluationRunner__spacesr`   build_env_to_module_connector_spaces_device-_OfflinePolicyEvaluationRunner__env_to_moduler4   7_OfflinePolicyEvaluationRunner__offline_evaluation_type)rv   r`   r   kwargsr=   r=   r>   r      s   


z&OfflinePolicyEvaluationRunner.__init__FTexploretrainrA   c                 K   s   | j d u rt|  d| js| jdi | jj| _| jjt	| j
dd | jt |d u r3| jj}| j||dW  d    S 1 sDw   Y  d S )NzS doesn't have a data iterator. Can't call `run` on `OfflinePolicyEvaluationRunner`.rO   )rY   valuewindow)r   r   r=   )r   
ValueError_batch_iterator_create_batch_iteratorr`   iter_batches_kwargsr   metrics	log_valuer)   _weights_seq_nolog_timer(   r   	_evaluate)rv   r   r   r   r=   r=   r>   run   s*   
$z!OfflinePolicyEvaluationRunner.runc                    s   ddl m  dtttjf dtttttjf f fdd}dtttttjf f dtttttf f f fdd}td
j	||j
jj
jd	|S )Nr   )+convert_ndarray_batch_to_torch_tensor_batch_batchrA   c                 S   s   | d S )NrV   r=   r   r=   r=   r>   _collate_fn   s   zIOfflinePolicyEvaluationRunner._create_batch_iterator.<locals>._collate_fnc                    s    fdd| D S )Nc                    s   g | ]} |j tjd qS ))r   dtypes)r   torchfloat32)rF   rx   r   rv   r=   r>   rK      s    
z^OfflinePolicyEvaluationRunner._create_batch_iterator.<locals>._finalize_fn.<locals>.<listcomp>r=   r   r   r=   r>   _finalize_fn   s   zJOfflinePolicyEvaluationRunner._create_batch_iterator.<locals>._finalize_fn)iterator
collate_fnfinalize_fnminibatch_size	num_itersr=   )ray.air._internal.torch_utilsr   r   r{   r|   r}   r/   r1   r*   _dataset_iteratorr`   "offline_eval_batch_size_per_runner!dataset_num_iters_per_eval_runner)rv   r   r   r   r=   r   r>   r      s(   
z4OfflinePolicyEvaluationRunner._create_batch_iteratorc                 C   s  | j   d}t| jD ]\}}|D ]}| jt  }| jt |tj	 }|
|}	|	 }
|	|
}tj|v r?|tj }n|	|tj }| jtjkrhtt|t| }|tj  }||  }n| jtjkrt|t| }t||tj  }|tj jd d }||7 }| || q| t|| q| j jtt f|d ddd | j jtt!f|d dd | j "  | j # S )Nr   rO   sumT)reduceclear_on_reduce)r   )$r   activate_tensor_mode	enumerater   moduler   get_inference_action_dist_clsforward_inferencer   ACTION_DIST_INPUTSfrom_logitsr_   logprt   rp   r   r4   r;   r   prodexprr   r   rL   r<   dotshape_log_episode_metrics_log_batch_metricsrm   r   r   r   r   deactivate_tensor_moder   )rv   r   r   num_env_steps	iterationtensor_minibatchrx   action_dist_clsaction_logitsaction_distactionsaction_logpbehavior_action_logpweightepisode_returnoffline_returnweightsepisode_lenr=   r=   r>   r     s\   






z'OfflinePolicyEvaluationRunner._evaluatec                 C   s   dd| j ifS )Nr=   r`   )r`   rv   r=   r=   r>   get_ctor_args_and_kwargsT  s   z6OfflinePolicyEvaluationRunner.get_ctor_args_and_kwargs)not_components
componentsr   c                K   sx   t | jjt ddi}| t||r,| jjd| t|| t|d||t< | j|t	< | t
||r:| j |t
< |S )Nr   )default)r   r   r=   )r%   r   peek_check_componentr   r   	get_state_get_subcomponentsr   r)   r   _env_to_module)rv   r   r   r   rG   r=   r=   r>   r   [  s$   	


z'OfflinePolicyEvaluationRunner.get_statec                 C   s   t |S )z0Converts structs to a framework-specific tensor.r,   )rv   structr=   r=   r>   _convert_to_tensory  s   z0OfflinePolicyEvaluationRunner._convert_to_tensorc                 C      dS )zReleases all resources used by this EnvRunner.

        For example, when using a gym.Env in this EnvRunner, you should make sure
        that its `close()` method is called.
        Nr=   r   r=   r=   r>   stop}  s   z"OfflinePolicyEvaluationRunner.stopc                 C   r   )z:If this Actor is deleted, clears all resources used by it.Nr=   r   r=   r=   r>   __del__  s   z%OfflinePolicyEvaluationRunner.__del__c                 C   s   | j rt| ds
J dS )a  Checks that self.__init__() has been completed properly.

        Ensures that the instances has a `MultiRLModule` and an
        environment defined.

        Raises:
            AssertionError: If the EnvRunner Actor has NOT been properly initialized.
        r   N)r   hasattrr   r=   r=   r>   assert_healthy  s   z,OfflinePolicyEvaluationRunner.assert_healthyc                 C   s
   | j  S N)r   r   r   r=   r=   r>   get_metrics     
z)OfflinePolicyEvaluationRunner.get_metricsr@   	to_device
pin_memory
use_streamc                 C   sB   t |j|r| jnd ||d}tdd | D }t||d}|S )N)r   r   r   c                 s   s    | ]}t |V  qd S r   )rm   )rF   br=   r=   r>   	<genexpr>  s    zDOfflinePolicyEvaluationRunner._convert_batch_type.<locals>.<genexpr>)	env_steps)r-   policy_batchesr   maxvaluesr   )rv   r@   r   r   r   lengthr=   r=   r>   _convert_batch_type  s   z1OfflinePolicyEvaluationRunner._convert_batch_typerG   c                 C   s   t |v r| j|t   t|v r;|td}|dks| j|k r4|t }t|tj	r.t|}| j
| |dkr;|| _t|v rM| jjt|t ddd d S d S )Nr   r   TrY   r   r   with_throughput)r   r   	set_stater   re   r)   r   
isinstanceray	ObjectRefr   r%   r   	set_value)rv   rG   weights_seq_norl_module_stater=   r=   r>   r     s&   

z'OfflinePolicyEvaluationRunner.set_stater   r   c                 C   s   t dtt| jj| jjpd }| jjt	||d | jjt
||d | jjdtf||d | jjdtf||d | jjt|d|d | jjt|d|d | jjt|d|d | jjt|d|d dS )	z&Logs episode metrics for each episode.rO   r   agent_episode_return_meanmodule_episode_return_meanmin)r   r   r   N)r   intmathceilr`   "metrics_num_episodes_for_smoothingnum_offline_eval_runnersr   r   r   r!   r   r   r   r"   r   r    )rv   r   r   winr=   r=   r>   r     s8   
	

z2OfflinePolicyEvaluationRunner._log_episode_metrics
batch_sizer   c                 C   s   | j jttf| jdd | j jttf|d | j jttf|ddd | j jttf|dd | j jttf|ddd | j jttf|dd | j jtt	f|ddd | j jtt
f|ddd d	S )
z'Logs batch metrics for each mini batch.rO   r   )rY   r   r   T)rY   r   r   r   )rY   r   r   r   N)r   r   r   r)   r   r#   r&   r'   r   r$   r%   )rv   r   r   r=   r=   r>   r     sV   
z0OfflinePolicyEvaluationRunner._log_batch_metricsc                 C   s>   zt | j| js	dn| jj| _W d S  ty   d | _Y d S w )Nr   )r   r`   worker_index num_gpus_per_offline_eval_runner&_OfflinePolicyEvaluationRunner__deviceNotImplementedErrorr   r=   r=   r>   
set_device*  s   z(OfflinePolicyEvaluationRunner.set_devicec                    s   z2ddl m}  js  jj jj| jj jjfi jjd _	 j
  _ j fdd W d S  ty?   d  _Y d S w )Nr   )INPUT_ENV_SPACES)envr   inference_onlyc                    s   t |tjjr| jS |S r   )r   r   nnModuletor   )midmodr   r=   r>   <lambda>P  s   z;OfflinePolicyEvaluationRunner.make_module.<locals>.<lambda>)ray.rllib.envr  _module_specr`   get_multi_rl_module_specr  rW   rX   %offline_eval_rl_module_inference_onlyr   buildr   foreach_moduler  )rv   r  r=   r   r>   make_module6  s&   

z)OfflinePolicyEvaluationRunner.make_modulec                 C      | j S )zReturns the dataset iterator.r   r   r=   r=   r>   r   Z     z/OfflinePolicyEvaluationRunner._dataset_iteratorc                 C   s
   || _ dS )zSets the dataset iterator.Nr  )rv   r   r=   r=   r>   set_dataset_iterator_  r   z2OfflinePolicyEvaluationRunner.set_dataset_iteratorc                 C   r  r   )r   r   r=   r=   r>   r   c     z-OfflinePolicyEvaluationRunner._batch_iteratorc                 C   r  r   )r  r   r=   r=   r>   r   g  r  z%OfflinePolicyEvaluationRunner._devicec                 C   r  )z1Returns the `MultiRLModuleSpec` of this `Runner`.)r   r   r=   r=   r>   r  k  r  z*OfflinePolicyEvaluationRunner._module_specc                 C   r  )z$Returns the spaces of thsi `Runner`.)r   r   r=   r=   r>   r   p  r  z%OfflinePolicyEvaluationRunner._spacesc                 C   r  )z4Returns the env-to-module pipeline of this `Runner`.)r   r   r=   r=   r>   r   u  r  z,OfflinePolicyEvaluationRunner._env_to_modulec                 C   r  )z5Returns the offline evaluation type of this `Runner`.)r   r   r=   r=   r>   _offline_evaluation_typez  r  z6OfflinePolicyEvaluationRunner._offline_evaluation_typer   )FT)rA   N)TFF)5r7   r8   r9   r   r   r   boolr   r   r   r   r   r   r   r	   r{   r   r0   r   r1   r   r   r   r+   r   r   r   r   r   r   floatr   r   r  r  propertyr
   r   r  r*   r   r.   r   r  r   gymr   Spacer   r   r   r   r  r=   r=   r=   r>   r~      s    

#%
E





 (5

#r~   )O	gymnasiumr  r   r|   r   enumr   typingr   r   r   r   r   r   r	   ray.data.iteratorr
   "ray.rllib.connectors.env_to_moduler   ray.rllib.corer   r   r   r   r   ray.rllib.core.columnsr   (ray.rllib.core.rl_module.multi_rl_moduler   "ray.rllib.env.single_agent_episoder   $ray.rllib.offline.offline_prelearnerr   r   ray.rllib.policy.sample_batchr   ray.rllib.utils.annotationsr   ray.rllib.utils.checkpointsr   ray.rllib.utils.frameworkr   r   ray.rllib.utils.metricsr   r   r   r   r   r    r!   r"   r#   r$   r%   r&   r'   r(   r)   ray.rllib.utils.minibatch_utilsr*   ray.rllib.utils.runners.runnerr+   ray.rllib.utils.torch_utilsr-   ray.rllib.utils.typingr.   r/   r0   r1   %ray.rllib.algorithms.algorithm_configr2   r   _TOTAL_EVAL_LOSS_KEYr{   r4   r?   r~   r=   r=   r=   r>   <module>   s:    $
D
Y