o
    ciC                     @   s\  d dl Z d dlZd dlZd dlmZmZmZmZmZm	Z	m
Z
 d dlmZ d dlmZmZ d dlmZ d dlmZ d dlmZmZ d dlmZ d d	lmZ d d
lmZ d dlmZmZ d dl m!Z!m"Z"m#Z#m$Z$m%Z%m&Z&m'Z'm(Z(m)Z) d dl*m+Z+ d dl,m-Z- d dl.m/Z/ d dl0m1Z1 d dl2m3Z3m4Z4m5Z5m6Z6 e	rd dl7m8Z8 e \Z9Z:dZ;G dd de/eZ<dS )    N)Any
CollectionDictIterableOptionalTYPE_CHECKINGUnion)DataIterator)ALL_MODULESCOMPONENT_RL_MODULE)SelfSupervisedLossAPI)MultiRLModuleSpec)MultiAgentBatchSampleBatch)unflatten_dict)override)Checkpointable)
get_devicetry_import_torch)	DATASET_NUM_ITERS_EVALUATED$DATASET_NUM_ITERS_EVALUATED_LIFETIMEMODULE_SAMPLE_BATCH_SIZE_MEANNUM_ENV_STEPS_SAMPLEDNUM_ENV_STEPS_SAMPLED_LIFETIMENUM_MODULE_STEPS_SAMPLED!NUM_MODULE_STEPS_SAMPLED_LIFETIMEOFFLINE_SAMPLING_TIMERWEIGHTS_SEQ_NO)MiniBatchRayDataIterator)convert_to_numpy)Runnerconvert_to_torch_tensor)
DeviceTypeModuleID	StateDict
TensorType)AlgorithmConfigtotal_eval_lossc                   @   s>  e Zd Z	dDdddee fddZee		dEd	ed
eddfddZ	de
fddZd	ed
eddfddZeedd Zee	dDdddeeeee f  deeeee f  defddZdefddZeedFddZeedFddZeedd  Zeed!d" Z			dGd#ed$ed%ed&edef
d'd(Zd)eeef d#eeef deeef fd*d+Zd,eddd#eeef d)eeef def
d-d.Z eed/eddfd0d1Z!d#eddfd2d3Z"eed4d5 Z#eed6d7 Z$d8d9 Z%e&de'fd:d;Z(d<d= Z)e&de*fd>d?Z+e&de,fd@dAZ-e&defdBdCZ.dS )HOfflineEvaluationRunnerNconfigr'   module_specc                 K   sH   || _ d | _d | _tj| fd|i| t|  t|  | | _	d S )Nr*   )
%_OfflineEvaluationRunner__module_spec*_OfflineEvaluationRunner__dataset_iterator(_OfflineEvaluationRunner__batch_iteratorr    __init__r   types
MethodTypeget_loss_for_module_fn_loss_for_module_fn)selfr*   r+   kwargs r6   _/home/ubuntu/.local/lib/python3.10/site-packages/ray/rllib/offline/offline_evaluation_runner.pyr/   -   s   

z OfflineEvaluationRunner.__init__FTexploretrainreturnc                 K   s   | j d u rt|  d| js| jdi | jj| _| jjt	| j
dd | jt |d u r3| jj}| j||dW  d    S 1 sDw   Y  d S )NzM doesn't have a data iterator. Can't call `run` on `OfflineEvaluationRunner`.   keyvaluewindow)r8   r9   r6   )r-   
ValueError_batch_iterator_create_batch_iteratorr*   iter_batches_kwargsr.   metrics	log_valuer   _weights_seq_nolog_timer   r8   	_evaluate)r4   r8   r9   r5   r6   r6   r7   runA   s*   
$zOfflineEvaluationRunner.runc                    sZ   dt ttjf dtf fdd}dtdtf fdd}td	 j|| jj jj	d|S )
N_batchr:   c                    sF   t | } tdd |  D tdd |  D d}  j| dd} | S )Nc                 S   s   i | ]	\}}|t |qS r6   )r   ).0	module_idmodule_datar6   r6   r7   
<dictcomp>l   s    zWOfflineEvaluationRunner._create_batch_iterator.<locals>._collate_fn.<locals>.<dictcomp>c                 s   s$    | ]}t tt| V  qd S N)lennextitervalues)rK   rM   r6   r6   r7   	<genexpr>p   s
    
zVOfflineEvaluationRunner._create_batch_iterator.<locals>._collate_fn.<locals>.<genexpr>	env_stepsF)	to_device)r   r   itemssumrS   _convert_batch_type)rJ   r4   r6   r7   _collate_fni   s   
zCOfflineEvaluationRunner._create_batch_iterator.<locals>._collate_fnbatchc                    s    j | dddS )NT)rW   
use_stream)rZ   )r]   r[   r6   r7   _finalize_fny   s   zDOfflineEvaluationRunner._create_batch_iterator.<locals>._finalize_fn)iterator
collate_fnfinalize_fnminibatch_size	num_itersr6   )
r   strnumpyndarrayr   r   _dataset_iteratorr*   "offline_eval_batch_size_per_runner!dataset_num_iters_per_eval_runner)r4   r5   r\   r_   r6   r[   r7   rB   e   s    z.OfflineEvaluationRunner._create_batch_iteratorc           
      C   s  | j   t| jD ]D\}}t|j t| j  }|r&td| d|r0| j	|j}n|r:| j
|j}n| j|j}| j||jd}| | q
| j jttf|d ddd | j jttf|d dd t| D ]\}}	| j j|tf|	dd	 qp| j   | j  S )
Nz&Batch contains one or more ModuleIDs (z) that are not in this Learner!)fwd_outr]   r;   rY   Treduceclear_on_reduce)rm   r<   )rD   activate_tensor_mode	enumeraterA   setpolicy_batcheskeysmoduler@   forward_explorationforward_trainforward_inferencecompute_eval_losses_log_steps_evaluated_metricsrE   r
   r   r   r   rX   TOTAL_EVAL_LOSS_KEYdeactivate_tensor_moderm   )
r4   r8   r9   	iterationtensor_minibatchunknown_module_idsrk   eval_loss_per_modulemidlossr6   r6   r7   rH      sN   



z!OfflineEvaluationRunner._evaluatec                 C   s   dd| j ifS )Nr6   r*   )r*   r[   r6   r6   r7   get_ctor_args_and_kwargs   s   z0OfflineEvaluationRunner.get_ctor_args_and_kwargs)not_components
componentsr   c                K   s\   t | jjt ddi}| t||r,| jjd| t|| t|d||t< | j|t	< |S )Nr   )default)r   r   r6   )
r   rD   peek_check_componentr   rt   	get_state_get_subcomponentsrF   r   )r4   r   r   r5   stater6   r6   r7   r      s   	


z!OfflineEvaluationRunner.get_statec                 C   s   t |S )z0Converts structs to a framework-specific tensor.r!   )r4   structr6   r6   r7   _convert_to_tensor   s   z*OfflineEvaluationRunner._convert_to_tensorc                 C      dS )zReleases all resources used by this EnvRunner.

        For example, when using a gym.Env in this EnvRunner, you should make sure
        that its `close()` method is called.
        Nr6   r[   r6   r6   r7   stop   s   zOfflineEvaluationRunner.stopc                 C   r   )z:If this Actor is deleted, clears all resources used by it.Nr6   r[   r6   r6   r7   __del__   s   zOfflineEvaluationRunner.__del__c                 C   s   | j rt| ds
J dS )a  Checks that self.__init__() has been completed properly.

        Ensures that the instances has a `MultiRLModule` and an
        environment defined.

        Raises:
            AssertionError: If the EnvRunner Actor has NOT been properly initialized.
        rt   N)rh   hasattrr[   r6   r6   r7   assert_healthy   s   z&OfflineEvaluationRunner.assert_healthyc                 C   s
   | j  S rO   )rD   rm   r[   r6   r6   r7   get_metrics     
z#OfflineEvaluationRunner.get_metricsr]   rW   
pin_memoryr^   c                 C   sB   t |j|r| jnd ||d}tdd | D }t||d}|S )N)devicer   r^   c                 s   s    | ]}t |V  qd S rO   )rP   )rK   br6   r6   r7   rT     s    z>OfflineEvaluationRunner._convert_batch_type.<locals>.<genexpr>rU   )r"   rr   _devicemaxrS   r   )r4   r]   rW   r   r^   lengthr6   r6   r7   rZ     s   z+OfflineEvaluationRunner._convert_batch_typerk   c          	      C   sz   i }|D ]6}|| }|| }| j |  }t|tr)|j| || j|||d}n| j|| j|||d}|||< q|S )N)learnerrL   r*   r]   rk   rL   r*   r]   rk   )rt   	unwrapped
isinstancer   compute_self_supervised_lossr*   get_config_for_modulecompute_eval_loss_for_module)	r4   rk   r]   loss_per_modulerL   module_batchmodule_fwd_outrt   r   r6   r6   r7   rx     s*   



z+OfflineEvaluationRunner.compute_eval_lossesrL   c                C   s   | j ||||dS )Nr   )r3   )r4   rL   r*   r]   rk   r6   r6   r7   r   7  s   	z4OfflineEvaluationRunner.compute_eval_loss_for_moduler   c                 C   s   t |v r/|td}|dks| j|k r(|t  }t|tjr"t|}| j| |dkr/|| _t	|v rA| j
jt	|t	 ddd d S d S )Nr   rY   T)r=   r>   rm   with_throughput)r   getr   rF   r   ray	ObjectRefrt   	set_stater   rD   	set_value)r4   r   weights_seq_norl_module_stater6   r6   r7   r   G  s"   

z!OfflineEvaluationRunner.set_statec                 C   s   |j  D ]L\}}| jj|tf| jdd t|}| jj|tf|d | jj|tf|ddd | jj|t	f|dd | jjt
tf|ddd | jjt
t	f|dd q| jjt
tf| ddd | jjt
tf| ddd	 d S )
Nr;   )r?   )r=   r>   rY   T)r=   r>   rm   rn   )r=   r>   rm   rl   )rm   r   )rr   rX   rD   rE   r   rF   rP   r   r   r   r
   r   rV   r   )r4   r]   r   r   module_batch_sizer6   r6   r7   ry   f  sZ   
z4OfflineEvaluationRunner._log_steps_evaluated_metricsc                 C   s>   zt | j| js	dn| jj| _W d S  ty   d | _Y d S w )Nr   )r   r*   worker_index num_gpus_per_offline_eval_runner _OfflineEvaluationRunner__deviceNotImplementedErrorr[   r6   r6   r7   
set_device  s   z"OfflineEvaluationRunner.set_devicec                    s   z2ddl m}  js  jj jj| jj jjfi jjd _	 j
  _ j fdd W d S  ty?   d  _Y d S w )Nr   )INPUT_ENV_SPACES)envspacesinference_onlyc                    s   t |tjjr| jS |S rO   )r   torchnnModuletor   )r   modr[   r6   r7   <lambda>  s   z5OfflineEvaluationRunner.make_module.<locals>.<lambda>)ray.rllib.envr   _module_specr*   get_multi_rl_module_specr   observation_spaceaction_space%offline_eval_rl_module_inference_onlyr,   buildrt   foreach_moduler   )r4   r   r6   r[   r7   make_module  s&   

z#OfflineEvaluationRunner.make_modulec                 C   s   | j jp| j  jd S )Ncompute_loss_for_module)r*   offline_loss_for_module_fnget_default_learner_class__dict__r[   r6   r6   r7   r2     s
   
z.OfflineEvaluationRunner.get_loss_for_module_fnc                 C      | j S )zReturns the dataset iterator.r-   r[   r6   r6   r7   rh        z)OfflineEvaluationRunner._dataset_iteratorc                 C   s
   || _ dS )zSets the dataset iterator.Nr   )r4   r`   r6   r6   r7   set_dataset_iterator  r   z,OfflineEvaluationRunner.set_dataset_iteratorc                 C   r   rO   )r.   r[   r6   r6   r7   rA        z'OfflineEvaluationRunner._batch_iteratorc                 C   r   rO   )r   r[   r6   r6   r7   r     r   zOfflineEvaluationRunner._devicec                 C   r   )z1Returns the `MultiRLModuleSpec` of this `Runner`.)r,   r[   r6   r6   r7   r     r   z$OfflineEvaluationRunner._module_specrO   )FT)r:   N)TFF)/__name__
__module____qualname__r   r   r/   r   r    boolrI   r   rB   rH   r   r   r   re   r   r%   r   r&   r   r   r   r   r   r   rZ   r   r   rx   r$   r   r   ry   r   r   r2   propertyr	   rh   r   r   rA   r#   r   r   r6   r6   r6   r7   r)   ,   s    
#!
>










5

#
r)   )=rf   r   r0   typingr   r   r   r   r   r   r   ray.data.iteratorr	   ray.rllib.corer
   r   ray.rllib.core.rl_module.apisr   (ray.rllib.core.rl_module.multi_rl_moduler   ray.rllib.policy.sample_batchr   r   ray.rllib.utilsr   ray.rllib.utils.annotationsr   ray.rllib.utils.checkpointsr   ray.rllib.utils.frameworkr   r   ray.rllib.utils.metricsr   r   r   r   r   r   r   r   r   ray.rllib.utils.minibatch_utilsr   ray.rllib.utils.numpyr   ray.rllib.utils.runners.runnerr    ray.rllib.utils.torch_utilsr"   ray.rllib.utils.typingr#   r$   r%   r&   %ray.rllib.algorithms.algorithm_configr'   r   _rz   r)   r6   r6   r6   r7   <module>   s0    $,
