o
    ciN                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlmZm	Z	m
Z
mZmZmZmZmZ d dlZd dlZd dlmZ d dlZd dlZd dlmZ d dlmZ d dlmZ d dlmZ d dl m!Z! d d	l"m#Z# d d
l$m%Z% d dl&m'Z' d dl(m)Z)m*Z* d dl+m,Z,m-Z-m.Z.m/Z/m0Z0 d dl1m2Z2 d dl3m4Z4 d dl5m6Z6m7Z7m8Z8 d dl9m:Z: d dl;m<Z< d dl=m>Z> d dl?m@Z@ d dlAmBZBmCZC d dlDmEZEmFZFmGZGmHZHmIZImJZJmKZK e4 \ZLZMeNeOZPe,G dd de!ZQdS )    N)AnyDictListOptionalSetTupleTypeUnion)version)ModelCatalog)ModelV2)TorchDistributionWrapper)TorchModelV2)Policy)#pad_batch_to_sequences_of_same_size)SampleBatch)_directStepOptimizerSingleton)NullContextManager
force_list)OldAPIStackOverrideToImplementCustomLogic5OverrideToImplementCustomLogic_CallToSuperRecommendedis_overriddenoverride)&ERR_MSG_TORCH_POLICY_CANNOT_SAVE_MODEL)try_import_torch)'DIFF_NUM_GRAD_UPDATES_VS_SAMPLER_POLICYNUM_AGENT_STEPS_TRAINEDNUM_GRAD_UPDATES_LIFETIMELEARNER_STATS_KEY)convert_to_numpy)normalize_action)	with_lock)convert_to_torch_tensorTORCH_COMPILE_REQUIRED_VERSION)AlgorithmConfigDictGradInfoDictModelGradientsModelWeightsPolicyStateTensorStructType
TensorTypec                       sl  e Zd ZdZdddejjdejjdedef fdd	Z	d
d Z
eeededee dedeeee f fddZededededeeeeee f fddZededededeeeee f fddZedefddZedeeee f fddZedefddZededeeef fddZe d d!d"edeeef fd#d$Z!e deee"f fd%d&Z#e d'eeef dee de$d(edeeef f
d)d*Z%eee 	+	+dvd,ed-e&ee"ef  defd.d/Z'edeed! d!f fd0d1Z(d2d3 Z)ee	+	+dvd'eeef d4e*d5e&e deeee eeef f fd6d7Z+ee	+	+	+	+	+	+	+dwdeee, e,f de&ee  d8eee, e,f d9eee, e,f d:e&eee-f  d4e&e* d5e&e dee,ee eeef f fd;d<Z.e/ee	+	+	+	=	=dxd>eee, e,f deee, e,f de&ee  d8e&eee, e,f  d9e&eee, e,f  d?e*d@e*defdAdBZ0e/eedCedeeef fdDdEZ1ee	FdydGedHedefdIdJZ2eedydHedefdKdLZ3eedzdMedHefdNdOZ4e/eedCede5fdPdQZ6eedRe5dd+fdSdTZ7dUedee, fdVdWZ8eede9fdXdYZ:eedZe9dd+fd[d\Z;eede*fd]d^Z<eedefd_d`Z=eedee fdadbZ>eee de?f fdcddZ@eee dee?dd+f fdfdgZAeed{dhedie&e dd+fdjdkZBeedledd+fdmdnZCe/dodp ZDd{dCefdqdrZEdsee deeee eFf  fdtduZG  ZHS )|TorchPolicyV2z0PyTorch specific Policy class to use with RLlib.   )max_seq_lenobservation_spaceaction_spaceconfigr/   c                   s&  d _ |d< d_t |||  \ } tttj	
 }tdt| d |d s:dks:|sztd_fd	d
tttpNdD _ fdd
tttpadD _tdrvfddjD _ _nrtjj tjjjkrt }t|k rtd| d dfdd
t|D _jd _fdd
t|D }g _t|D ]\}}	t  }
j!|
"j|  qtdrfddtjD _jd _|_# _$t%& _'j( _)tt*+j)dk_,-  j./jj. 0 _1t23 _4d_5g _6dd tj7 D }j4D ]'}g }t|j8D ]\}}|d D ]
}|!||  qFq>j6!t9| q5j:;dd}dd
 t|D _<d_=> _?|_@i _AtjdsjD ]
 i jA < qdS dS )a  Initializes a TorchPolicy instance.

        Args:
            observation_space: Observation space of the policy.
            action_space: Action space of the policy.
            config: The Policy's config dict.
            max_seq_len: Max sequence length for LSTM training.
        torch	frameworkFzFound z visible cuda devices.
_fake_gpusr   cpuc                    s   g | ]} j qS  device.0_selfr7   T/home/ubuntu/.local/lib/python3.10/site-packages/ray/rllib/policy/torch_policy_v2.py
<listcomp>q   s    z*TorchPolicyV2.__init__.<locals>.<listcomp>   c                    s"   g | ]}|d kr
 nt  qS r   )copydeepcopy)r;   i)modelr7   r?   r@   r   s    target_modelc                    s   i | ]}| j qS r7   )rG   )r;   mr=   r7   r?   
<dictcomp>w   s    z*TorchPolicyV2.__init__.<locals>.<dictcomp>z7TorchPolicy was not able to find enough GPU IDs! Found z, but num_gpus=.c                    s(   g | ]\}}| k rt d |qS )zcuda:{})r3   r9   formatr;   rE   id_num_gpusr7   r?   r@      s
    c                    s   g | ]
\}}| k r|qS r7   r7   rL   rN   r7   r?   r@          c                    s*   i | ]\}}|t  j j| qS r7   )rC   rD   rG   todevices)r;   rE   rH   r=   r7   r?   rI      s    Nc                 S   s   i | ]\}}||qS r7   r7   )r;   rE   pr7   r7   r?   rI      s    paramsnum_multi_gpu_tower_stacksc                 S   s   g | ]}g qS r7   r7   r:   r7   r7   r?   r@          tower_stats)Br4   _loss_initializedsuper__init___init_model_and_dist_class_get_num_gpus_for_policylistranger3   cudadevice_countloggerinfolenr9   intmathceilrR   model_gpu_towershasattrtarget_modelsrF   ray_privateworker_modeWORKER_MODEget_gpu_ids
ValueError	enumeraterC   rD   appendrQ   
dist_classunwrapped_model	threadingRLock_lockget_initial_state_state_inputstreeflatten_is_recurrent/_update_model_view_requirements_from_init_stateview_requirementsupdate_create_explorationexplorationr   	optimizer_optimizers_lossmulti_gpu_param_groups
parametersparam_groupssetr2   get_loaded_batchesdistributed_world_sizeget_batch_divisibility_reqbatch_divisibility_reqr/   rW   )r>   r0   r1   r2   r/   rs   gpu_idsidsrE   r<   
model_copymain_paramsoparam_indicespg_idxpgrS   num_buffers	__class__)rF   rO   r>   r?   rZ   B   s   &


	








zTorchPolicyV2.__init__c                 C      | j S N)rX   r=   r7   r7   r?   loss_initialized   s   zTorchPolicyV2.loss_initializedrF   rs   train_batchreturnc                 C   s   t )a  Constructs the loss function.

        Args:
            model: The Model to calculate the loss for.
            dist_class: The action distr. class.
            train_batch: The training data.

        Returns:
            Loss tensor given the input batch.
        )NotImplementedError)r>   rF   rs   r   r7   r7   r?   loss   s   zTorchPolicyV2.loss	obs_batchstate_batchesc                K      dS )ae  Custom function for sampling new actions given policy.

        Args:
            model: Underlying model.
            obs_batch: Observation tensor batch.
            state_batches: Action sampling state batch.

        Returns:
            Sampled action
            Log-likelihood
            Action distribution inputs
            Updated state
        )NNNNr7   r>   rF   r   r   kwargsr7   r7   r?   action_sampler_fn   s   zTorchPolicyV2.action_sampler_fnc                K   r   )aC  Action distribution function for this Policy.

        Args:
            model: Underlying model.
            obs_batch: Observation tensor batch.
            state_batches: Action sampling state batch.

        Returns:
            Distribution input.
            ActionDistribution class.
            State outs.
        )NNNr7   r   r7   r7   r?   action_distribution_fn     z$TorchPolicyV2.action_distribution_fnc                 C   r   )zCreate model.

        Note: only one of make_model or make_model_and_action_dist
        can be overridden.

        Returns:
            ModelV2 model.
        Nr7   r=   r7   r7   r?   
make_model     
zTorchPolicyV2.make_modelc                 C   r   )zCreate model and action distribution function.

        Returns:
            ModelV2 model.
            ActionDistribution class.
        NNr7   r=   r7   r7   r?   make_model_and_action_dist(  r   z(TorchPolicyV2.make_model_and_action_distc                 C   r   )zrGet batch divisibility request.

        Returns:
            Size N. A sample batch must be of size K*N.
        rA   r7   r=   r7   r7   r?   r   4  s   z(TorchPolicyV2.get_batch_divisibility_reqc                 C      i S )zStats function. Returns a dict of statistics.

        Args:
            train_batch: The SampleBatch (already) used for training.

        Returns:
            The stats dict.
        r7   )r>   r   r7   r7   r?   stats_fn>  r   zTorchPolicyV2.stats_fnr   ztorch.optim.Optimizerr   c                 C   r   )a  Called after each optimizer.zero_grad() + loss.backward() call.

        Called for each self._optimizers/loss-value pair.
        Allows for gradient processing before optimizer.step() is called.
        E.g. for gradient clipping.

        Args:
            optimizer: A torch optimizer object.
            loss: The loss tensor associated with the optimizer.

        Returns:
            An dict with information on the gradient processing step.
        r7   )r>   r   r   r7   r7   r?   extra_grad_processJ  s   z TorchPolicyV2.extra_grad_processc                 C   s   t i iS )zExtra values to fetch and return from compute_gradients().

        Returns:
            Extra fetch dict to be added to the fetch dict of the
            `compute_gradients` call.
        r   r=   r7   r7   r?   extra_compute_grad_fetches]  s   z(TorchPolicyV2.extra_compute_grad_fetches
input_dictaction_distc                 C   r   )a  Returns dict of extra info to include in experience batch.

        Args:
            input_dict: Dict of model input tensors.
            state_batches: List of state tensors.
            model: Reference to the model object.
            action_dist: Torch action dist object
                to get log-probs (e.g. for already sampled actions).

        Returns:
            Extra outputs to return in a `compute_actions_from_input_dict()`
            call (3rd return value).
        r7   )r>   r   r   rF   r   r7   r7   r?   extra_action_outg  r   zTorchPolicyV2.extra_action_outNsample_batchother_agent_batchesc                 C   s   |S )aW  Postprocesses a trajectory and returns the processed trajectory.

        The trajectory contains only data from one episode and from one agent.
        - If  `config.batch_mode=truncate_episodes` (default), sample_batch may
        contain a truncated (at-the-end) episode, in case the
        `config.rollout_fragment_length` was reached by the sampler.
        - If `config.batch_mode=complete_episodes`, sample_batch will contain
        exactly one episode (no matter how long).
        New columns can be added to sample_batch and existing ones may be altered.

        Args:
            sample_batch: The SampleBatch to postprocess.
            other_agent_batches (Optional[Dict[PolicyID, SampleBatch]]): Optional
                dict of AgentIDs mapping to other agents' trajectory data (from the
                same episode). NOTE: The other agents use the same policy.
            episode (Optional[Episode]): Optional multi-agent episode
                object in which the agents operated.

        Returns:
            SampleBatch: The postprocessed, modified SampleBatch (or a new one).
        r7   )r>   r   r   episoder7   r7   r?   postprocess_trajectory~  s   z$TorchPolicyV2.postprocess_trajectoryc                 C   sT   t | drtjj| j | jd dg}n
tj| j g}| jr(| j|}|S )zCustom the local PyTorch optimizer(s) to use.

        Returns:
            The local PyTorch optimizer(s) to use for this Policy.
        r2   lr)r   )	rh   r3   optimAdamrF   r   r2   r   get_exploration_optimizer)r>   
optimizersr7   r7   r?   r     s   
	zTorchPolicyV2.optimizerc              	   C   s   t | jrt | jrtdt | jr'|  }tj| j| jd | jd\}}n+t | jr3|  \}}ntj| j| jd | jd\}}tj	| j
| j|| jd | jd}| jdrtd urhttjtk rhtd| jdrpdnd	}tj|| jd
| ddd| jd
| dd}||fS )NzGOnly one of make_model or make_model_and_action_dist can be overridden.rF   )r4   )	obs_spacer1   num_outputsmodel_configr4   torch_compile_learnerz3`torch.compile` is not supported for torch < 2.0.0!worker_indexlearnerrl   torch_compile__dynamo_backendinductorF_dynamo_mode)backenddynamicmode)r   r   r   rp   r   get_action_distr1   r2   r4   get_model_v2r0   r   r3   r
   parse__version__r%   compile)r>   rF   rs   r<   	logit_dimlwr7   r7   r?   r[     sJ   


	z(TorchPolicyV2._init_model_and_dist_classexploretimestepc                    s   d }t  ; |    d  fdd  D }|r1t jdgt|d  t j|d jd}| 	 ||||W  d    S 1 sDw   Y  d S )NTc                    s$   g | ]}d |dd v r | qS )state_inN   r7   r;   kr   r7   r?   r@     s    zATorchPolicyV2.compute_actions_from_input_dict.<locals>.<listcomp>rA   r   )dtyper9   )
r3   no_grad_lazy_tensor_dictset_trainingkeystensorrc   longr9   _compute_action_helper)r>   r   r   r   r   seq_lensr   r7   r   r?   compute_actions_from_input_dict  s"   	




$z-TorchPolicyV2.compute_actions_from_input_dictprev_action_batchprev_reward_batch
info_batchc	                    s   t  H t jt|t jd}
 tj|ddi}|d ur%t	||tj
< |d ur1t	||tj<  fdd|p9g D } |||
||W  d    S 1 sOw   Y  d S )Nr   is_trainingFc                       g | ]}t | jqS r7   r$   r9   r;   sr=   r7   r?   r@         z1TorchPolicyV2.compute_actions.<locals>.<listcomp>)r3   r   onesrc   int32r   r   CUR_OBSnpasarrayPREV_ACTIONSPREV_REWARDSr   )r>   r   r   r   r   r   episodesr   r   r   r   r   r7   r=   r?   compute_actions  s$   


$zTorchPolicyV2.compute_actionsTactionsactions_normalizedin_trainingc              	      sL  t  jrt  jstdt   tj|tj	|i}|d ur'||tj
< |d ur0||tj< tjt|tjd}	 fdd|pBg D } jrO jjdd t  jrj j j|||	ddd\}
}}||
 j}n j} |||	\}
}||
 j}|tj	 }|s jd rt| j}||}|W  d    S 1 sw   Y  d S )	NzfCannot compute log-prob/likelihood w/o an `action_distribution_fn` and a provided `action_sampler_fn`!r   c                    r   r7   r   r   r=   r7   r?   r@   A  r   z9TorchPolicyV2.compute_log_likelihoods.<locals>.<listcomp>F)r   )r   r   r   r   r   normalize_actions)r   r   r   rp   r3   r   r   r   r   ACTIONSr   r   r   rc   r   r   before_compute_actionsrF   rs   r2   r"   action_space_structlogp)r>   r   r   r   r   r   r   r   r   r   dist_inputsrs   	state_outr   r<   log_likelihoodsr7   r=   r?   compute_log_likelihoods  sN   






$z%TorchPolicyV2.compute_log_likelihoodspostprocessed_batchc                 C   s   | j r| j   i }| jj| ||d | |\}}| t |  jd7  _| j r7t| j dr7| j 	 |d< ni |d< |
d|t|jt| jt| jd |jpNd i |S )Npolicyr   resultrA   metricsrF   custom_metricsr   )rF   train	callbackson_learn_on_batchcompute_gradientsapply_gradientsr   num_grad_updatesrh   r  r   r   countr   r   )r>   r   learn_statsgradsfetchesr7   r7   r?   learn_on_batchd  s2   

zTorchPolicyV2.learn_on_batchr   batchbuffer_indexc              
      s   | d t jdkr7 jd jdkr7|dksJ t| jd j jddd  | |g j	d< t|S |j
t jd}|D ]}t| jd j jddd qB fd	d
t|D }| j	|< t|d S )NTrA   r   r6   Fzeror  r/   shuffler   r~   _enable_new_api_stackpadding)
num_slicesc                    s    g | ]\}}|  j| qS r7   )	to_devicerR   )r;   rE   slicer=   r7   r?   r@          z8TorchPolicyV2.load_batch_into_buffer.<locals>.<listcomp>)r   rc   rR   typer   r/   r   r~   r   r   
timeslicesrq   )r>   r  r  slicesr  r7   r=   r?   load_batch_into_buffer  s:   

	
z$TorchPolicyV2.load_batch_into_bufferc                 C   s@   t | jdkr| jd dkr|dksJ tdd | j| D S )NrA   r   z/cpu:0c                 s       | ]}t |V  qd S r   rc   r;   br7   r7   r?   	<genexpr>      zCTorchPolicyV2.get_num_samples_loaded_into_buffer.<locals>.<genexpr>)rc   rR   sumr   )r>   r  r7   r7   r?   "get_num_samples_loaded_into_buffer  s   z0TorchPolicyV2.get_num_samples_loaded_into_bufferoffsetc                    s  j | s	tdjd  d u rjdjd   tj  jr1jD ]}|  q*tjdkrkjd jdkrk|dksFJ  tj d d krYj d d }nj d d    }	|S tjdkrj
 }jd j
u sJ jdd  D ]}|| q tdd	 j | D krj | }n fd
dj | D }i }t|D ]\}i }	jj||	d d|	i|d < q|}
g }tt|
d d D ](|
d d  d ur|tjtfdd|
D dd q|d  qtj
 D ]
\}| |_qt  jd7  _ttj|D ](\\}}|d  t|d|  t!jt"jd |jpSd i q0|#  |S )NzPMust call Policy.load_batch_into_buffer() before Policy.learn_on_loaded_batch()!minibatch_sizesgd_minibatch_sizetrain_batch_sizerA   r   r6   c                 s   r   r   r!  r   r7   r7   r?   r$    r%  z6TorchPolicyV2.learn_on_loaded_batch.<locals>.<genexpr>c                    s   g | ]
}|   qS r7   r7   r"  )device_batch_sizer(  r7   r?   r@     s    z7TorchPolicyV2.learn_on_loaded_batch.<locals>.<listcomp>r  r  tower_c                    s    g | ]}|d     jqS rB   rQ   r9   )r;   t)rE   r>   r7   r?   r@     r  )dimrF   )$r   rp   r2   r   rc   rR   rg   r  r  r  rF   
state_dictload_state_dictr&  rq   r  r  _multi_gpu_parallel_grad_calcr^   rr   r3   meanstackr   gradr
  r   r  zipr   r    r   r  r   r   r   )r>   r(  r  r/  r  r1  towerdevice_batchesbatch_fetchesr  tower_outputs	all_gradsrS   rF   r7   )r,  rE   r(  r>   r?   learn_on_loaded_batch  s~   







z#TorchPolicyV2.learn_on_loaded_batchc              	   C   s   t | jdks	J |jst|| jd| j| jddd |d | j|| jd d | 	|g}|d \}}|d  t | j
  < || | |  }|t|fi t|ifS )	NrA   Fr  r  Tr   r8   allreduce_latency)rc   rR   zero_paddedr   r/   r   r~   r   r   r3  r   r   r   r   dictr    )r>   r   r;  r<  	grad_infor  r7   r7   r?   r	  -  s&   

zTorchPolicyV2.compute_gradients	gradientsc                 C   s   |t krt| jD ]\}}|  q	d S t| jdksJ t|| j D ]\}}|d urDt	|r:|
| j|_q%t|
| j|_q%| jd   d S )NrA   r   )r   rq   r   steprc   r7  rF   r   r3   	is_tensorrQ   r9   r6  
from_numpy)r>   rB  rE   optgrS   r7   r7   r?   r
  N  s   

zTorchPolicyV2.apply_gradients
stats_namec                    sz   g } j D ]!} jr j| }n|j}||v r&|t fdd||  qt|dks;J d| dt j  d|S )a  Returns list of per-tower stats, copied to this Policy's device.

        Args:
            stats_name: The name of the stats to average over (this str
                must exist as a key inside each tower's `tower_stats` dict).

        Returns:
            The list of stats tensor (structs) of all towers, copied to this
            Policy's device.

        Raises:
            AssertionError: If the `stats_name` cannot be found in any one
                of the tower's `tower_stats` dicts.
        c                    s   |   jS r   r.  )r   r=   r7   r?   <lambda>x  s    z/TorchPolicyV2.get_tower_stats.<locals>.<lambda>r   zStats `z+` not found in any of the towers (you have zV towers in total)! Make sure you call the loss function on at least one of the towers.)rg   rW   rr   rz   map_structurerc   )r>   rH  datarF   rW   r7   r=   r?   get_tower_stats_  s$   
zTorchPolicyV2.get_tower_statsc                 C   s   dd | j   D S )Nc                 S   s"   i | ]\}}||    qS r7   )r6   detachnumpy)r;   r   vr7   r7   r?   rI     s   " z-TorchPolicyV2.get_weights.<locals>.<dictcomp>)rF   r1  itemsr=   r7   r7   r?   get_weights  s   zTorchPolicyV2.get_weightsweightsc                 C   s   t || jd}| j| d S Nr8   )r$   r9   rF   r2  )r>   rR  r7   r7   r?   set_weights  s   zTorchPolicyV2.set_weightsc                 C   r   r   )r|   r=   r7   r7   r?   is_recurrent  s   zTorchPolicyV2.is_recurrentc                 C   s   t | j S r   )rc   rF   rx   r=   r7   r7   r?   num_state_tensors  s   zTorchPolicyV2.num_state_tensorsc                 C   s   dd | j  D S )Nc                 S   s   g | ]
}|    qS r7   )rM  r6   rN  r   r7   r7   r?   r@     rP   z3TorchPolicyV2.get_initial_state.<locals>.<listcomp>)rF   rx   r=   r7   r7   r?   rx     s   zTorchPolicyV2.get_initial_statec                    sX   t   }g |d< t| jD ]\}}t| }|d | q| jr*| j |d< |S )N_optimizer_variables_exploration_state)rY   	get_staterq   r   r!   r1  rr   r   )r>   staterE   r   optim_state_dictr   r7   r?   rY    s   
zTorchPolicyV2.get_staterZ  c                    s   | dd }|r4t|t| jksJ t| j|D ]\}}d|d i}t|d | jd|d< || qt| drFd|v rF| jj	|d d |d | _
t 	| d S )	NrW  r   rZ  r8   r   rX  )rZ  global_timestep)r   rc   r   r7  r$   r9   r2  rh   r   	set_stater\  rY   )r>   rZ  optimizer_varsr   r   r[  r   r7   r?   r]    s   


zTorchPolicyV2.set_state
export_dironnxc           	         st  t j|dd |r  j d jvr#tdg  jd<  jtj<  jtj }g }d}d| jv rL|	 jd|  |d7 }d| jv s5 fdd	 j
 D }t j|d
}tjj j|||f|d|dt|
 dtjg ddgdd	 t|
 dtjg D d	 dS t j|d}ztj j|d W dS  ty   t j|rt | tt Y dS w )aP  Exports the Policy's Model to local directory for serving.

        Creates a TorchScript model and saves it.

        Args:
            export_dir: Local writable directory or filename.
            onnx: If given, will export model in ONNX format. The
                value of this parameter set the ONNX OpSet version to use.
        T)exist_ok
state_in_0g      ?r   zstate_in_{}rA   c                    s    i | ]}|d kr| j | qS )r   )_dummy_batchr   r=   r7   r?   rI     s
    
z.TorchPolicyV2.export_model.<locals>.<dictcomp>z
model.onnx	state_insoutput
state_outsc                 S   s   i | ]}|d diqS )r   
batch_sizer7   r   r7   r7   r?   rI     s    )export_paramsopset_versiondo_constant_foldinginput_namesoutput_namesdynamic_axeszmodel.pt)fN)osmakedirsr   rc  r   arrayr   SEQ_LENSrK   rr   r   pathjoinr3   r`  exportrF   r]   save	Exceptionexistsremovera   warningr   )	r>   r_  r`  r   rd  rE   dummy_inputs	file_namefilenamer7   r=   r?   export_model  sX   






zTorchPolicyV2.export_modelimport_filec                 C   s   | j |S )z!Imports weights into torch model.)rF   import_from_h5)r>   r  r7   r7   r?   import_model_from_h5  s   z"TorchPolicyV2.import_model_from_h5c              	   C   s  |dur|n| j d }|dur|n| j}| jr| j  d } }}t| jr9d}	| j| j||||d\}
}}}nN| jj||d t| jrW| j| j|||||dd\}}}n| j	}| |||\}}t
|tjsvt|tsvtd|j||| j}	| jj|	||d\}
}|du r| ||| j|	}|dur||tj< |durt| |tj< ||tj< |  jt|tj 7  _t|
||fS )	a-  Shared forward pass logic (w/ and w/o trajectory view API).

        Returns:
            A tuple consisting of a) actions, b) state_out, c) extra_fetches.
            The input_dict is modified in-place to include a numpy copy of the computed
            actions under `SampleBatch.ACTIONS`.
        Nr   )r   r   r   r   )r   r   F)r   r   r   r   r   r   z`dist_class` ({}) not a TorchDistributionWrapper subclass! Make sure your `action_distribution_fn` or `make_model_and_action_dist` return a correct distribution class.)action_distributionr   r   )r2   r\  rF   evalr   r   r   r   r   rs   
isinstance	functoolspartial
issubclassr   rp   rK   __name__get_exploration_actionr   r   ACTION_DIST_INPUTSr3   expfloatACTION_PROBACTION_LOGPrc   r   r!   )r>   r   r   r   r   r   extra_fetchesr   r   r   r   r   rs   r7   r7   r?   r     sd   

	






z$TorchPolicyV2._compute_action_helperc                 C   s0   t |ts	t|}|tjt|p| jd |S rS  )r  r   set_get_interceptorr  r  r$   r9   )r>   r   r9   r7   r7   r?   r   Y  s   
zTorchPolicyV2._lazy_tensor_dictsample_batchesc                    sF  t jt |ksJ t i t fdd t jdks*jd rZtt	j|jD ]$\}\}}} |||| t d  }t
|d trX|d |d q4n# fddtt	j|jD }|D ]}|  qm|D ]}|  qvg }	tt |D ]}| }
t
|
d tr|
d |
d |	|  q|	S )ah  Performs a parallelized loss and gradient calculation over the batch.

        Splits up the given train batch into n shards (n=number of this
        Policy's devices) and passes each data shard (in parallel) through
        the loss function using the individual devices' models
        (self.model_gpu_towers). Then returns each tower's outputs.

        Args:
            sample_batches: A list of SampleBatch shards to
                calculate loss and gradients for.

        Returns:
            A list (one item per device) of 2-tuples, each with 1) gradient
            list and 2) grad info dict.
        c                    s  t   z|jdkrt nt j| t|j|}t	|dr+|
||}t|tjks6J ddi}t| }dd tt|D }tjD ]\}}	j| }
t|D ]\}}||
v rp|jd urp|jj  q]|| jdd ||	||  g }t|D ]\}}||
v r|jd ur||j |j||< qjrt }t j r|D ]}t jj|t jjjd	 qnt jj |t jjjd	 |	j!D ]}|d
 D ]}|jd ur| jj  _qq|d  t | 7  < qPW d    n1 sw   Y   ||f| < W d    W d S 1 sw   Y  W d S  t"yi } z>dd l#}! t$|j%d d |&  d d'| | |f| < W d    n1 sRw   Y  W Y d }~d S W Y d }~d S d }~ww )Nr6   custom_lossr>  g        c                 S   s   g | ]}d qS r   r7   r:   r7   r7   r?   r@     rV   zPTorchPolicyV2._multi_gpu_parallel_grad_calc.<locals>._worker.<locals>.<listcomp>T)retain_graph)oprT   r   z
 traceback
zIn tower {} on device {})(r3   set_grad_enabledr  r   r_   r9   r   r   rs   rh   r  rc   r   r]   r   r^   rq   r   r6  rK  zero_backwardr   r   rr   r   timeis_availabledistributed
all_reduceReduceOpSUMall_reduce_coalescedr   rw  	tracebackrp   args
format_excrK   )	shard_idxrF   r   r9   loss_outrA  r   r<  opt_idxrF  r   	param_idxparamr  startrG  param_grouprS   er  )grad_enabledlockresultsr>   r7   r?   _workerx  s   









>(

:z<TorchPolicyV2._multi_gpu_parallel_grad_calc.<locals>._workerrA   r5   r   c                    s,   g | ]\}\}}}t j ||||fd qS ))targetr  )ru   Thread)r;   r  rF   r   r9   )r  r7   r?   r@     s    z?TorchPolicyV2._multi_gpu_parallel_grad_calc.<locals>.<listcomp>)rc   rg   ru   Lockr3   is_grad_enabledrR   r2   rq   r7  r  rp   r  rt  r^   rw  rr   )r>   r  r  rF   r   r9   last_resultthreadsthreadoutputsre  r7   )r  r  r  r  r>   r?   r3  a  s>   T

	

z+TorchPolicyV2._multi_gpu_parallel_grad_calcr   )NNNNNNN)NNNTTrB   )r   r   r   )Ir  
__module____qualname____doc__gymspacesSpacer&   rd   rZ   r   r   r   r   r   r   r   r   r	   r,   r   r   r   r   r  r   r   r   r   r   strr   r   r   r   r   r   r   r   r   r   r[   boolr   r+   r]   r   r#   r   r  r  r'  r=  r(   r	  r
  rL  r)   rQ  rT  rU  rV  rx   r*   rY  r]  r~  r  r   r   r'   r3  __classcell__r7   r7   r   r?   r-   >   s    	
	

/

	 
D(5b$>
Qr-   )RrC   r  loggingre   ro  ru   r  typingr   r   r   r   r   r   r   r	   	gymnasiumr  rN  r   	packagingr
   rz   rj   ray.rllib.models.catalogr   ray.rllib.models.modelv2r   (ray.rllib.models.torch.torch_action_distr   $ray.rllib.models.torch.torch_modelv2r   ray.rllib.policy.policyr   ray.rllib.policy.rnn_sequencingr   ray.rllib.policy.sample_batchr   ray.rllib.policy.torch_policyr   ray.rllib.utilsr   r   ray.rllib.utils.annotationsr   r   r   r   r   ray.rllib.utils.errorr   ray.rllib.utils.frameworkr   ray.rllib.utils.metricsr   r   r   $ray.rllib.utils.metrics.learner_infor    ray.rllib.utils.numpyr!   "ray.rllib.utils.spaces.space_utilsr"   ray.rllib.utils.threadingr#   ray.rllib.utils.torch_utilsr$   r%   ray.rllib.utils.typingr&   r'   r(   r)   r*   r+   r,   r3   nn	getLoggerr  ra   r-   r7   r7   r7   r?   <module>   sH    ($


