o
    $i                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlmZm	Z	m
Z
mZmZmZmZmZmZ d dlZd dlZd dlZd dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlm Z m!Z! d dl"m#Z# d d	l$m%Z% d d
l&m'Z'm(Z( d dl)m*Z*m+Z+ d dl,m-Z- d dl.m/Z/ d dl0m1Z1m2Z2m3Z3 d dl4m5Z5 d dl6m7Z7 d dl8m9Z9 d dl:m;Z; d dl<m=Z= d dl>m?Z?m@Z@mAZAmBZBmCZCmDZD e/ \ZEZFeGeHZIe*G dd de ZJe*G dd dZKeK ZLdS )    N)	AnyCallableDictListOptionalSetTupleTypeUnion)ModelCatalog)ModelV2)TorchDistributionWrapper)TorchModelV2)PolicyPolicyState)#pad_batch_to_sequences_of_same_size)SampleBatch)NullContextManager
force_list)OldAPIStackoverride)&ERR_MSG_TORCH_POLICY_CANNOT_SAVE_MODEL)try_import_torch)'DIFF_NUM_GRAD_UPDATES_VS_SAMPLER_POLICYNUM_AGENT_STEPS_TRAINEDNUM_GRAD_UPDATES_LIFETIMELEARNER_STATS_KEY)convert_to_numpy)normalize_action)	with_lock)convert_to_torch_tensor)AlgorithmConfigDictGradInfoDictModelGradientsModelWeightsTensorStructType
TensorTypec                       s  e Zd ZdZdddddddddejjdejjdedee	 d	ee
eeee egeeee f f  d
eee  dee
eee geeeeee f eeeeee f f f  dee
eeeeegeeee ee f f  dedee
egef  f fddZee		dddeeef dedee deeee eeef f fddZee							dedeee ef deee  deee ef deee ef deeeef  dee dee deeee eeef f fddZeee				dfdeee ef deee ef deee  deeee ef  deeee ef  d edefd!d"Zeeed#edeeef fd$d%Z ee	&dgd'ed(edefd)d*Z!eedgd(edefd+d,Z"eedhd-ed(efd.d/Z#eeed#ede$fd0d1Z%eed2e$ddfd3d4Z&d5edee fd6d7Z'eede(fd8d9Z)eed:e(ddfd;d<Z*eedefd=d>Z+eedefd?d@Z,eedee fdAdBZ-eede.f fdCdDZ/eedEe.ddf fdFdGZ0dHdId	edeeef fdJdKZ1deee2f fdLdMZ3deeef dee de	dNedeeef f
dOdPZ4dQedeeef fdRdSZ5deedI dIf fdTdUZ6eedidVedWee ddfdXdYZ7eedZeddfd[d\Z8ed]d^ Z9did#efd_d`Z:daee deeee e;f  fdbdcZ<  Z=S )jTorchPolicyz0PyTorch specific Policy class to use with RLlib.N   )modellossaction_distribution_classaction_sampler_fnaction_distribution_fnmax_seq_lenget_batch_divisibility_reqobservation_spaceaction_spaceconfigr*   r+   r,   r-   r.   r/   r0   c                   s  d _ |d< d_t |||  du r:tj|jd j d\}}tjjj	|jd j d |du r:|}
 tttj }tdt| d	 |d
 s\dks\|std_fddtttppdD _ fddtttpdD _tdrfddjD _ _nstjj tjjj krt! }t|k rt"d| d dfddt#|D _jd _fddt#|D }g _t#|D ]\}}t$% }j&|'j|  qtdr	fddt#jD _jd _t() _*j+ _,tj,dk_-.  j/0jj/ 1 _2 _3|dur?|_4nj5j6j7dkrMj5j6_4nd_4t89 _:g _;dd t#j< D }j:D ]'}g }t#|j=D ]\}}|d D ]
}|&||  qzqrj;&t>| qij?dd}dd t|D _@|_A|_B|_Cd_D|	_EtF|
r|
_GdS |
pd_GdS )a8  Initializes a TorchPolicy instance.

        Args:
            observation_space: Observation space of the policy.
            action_space: Action space of the policy.
            config: The Policy's config dict.
            model: PyTorch policy module. Given observations as
                input, this module must return a list of outputs where the
                first item is action logits, and the rest can be any value.
            loss: Callable that returns one or more (a list of) scalar loss
                terms.
            action_distribution_class: Class for a torch action distribution.
            action_sampler_fn: A callable returning either a sampled action,
                its log-likelihood and updated state or a sampled action, its
                log-likelihood, updated state and action distribution inputs
                given Policy, ModelV2, input_dict, state batches (optional),
                explore, and timestep. Provide `action_sampler_fn` if you would
                like to have full control over the action computation step,
                including the model forward pass, possible sampling from a
                distribution, and exploration logic.
                Note: If `action_sampler_fn` is given, `action_distribution_fn`
                must be None. If both `action_sampler_fn` and
                `action_distribution_fn` are None, RLlib will simply pass
                inputs through `self.model` to get distribution inputs, create
                the distribution object, sample from it, and apply some
                exploration logic to the results.
                The callable takes as inputs: Policy, ModelV2, input_dict
                (SampleBatch), state_batches (optional), explore, and timestep.
            action_distribution_fn: A callable returning distribution inputs
                (parameters), a dist-class to generate an action distribution
                object from, and internal-state outputs (or an empty list if
                not applicable).
                Provide `action_distribution_fn` if you would like to only
                customize the model forward pass call. The resulting
                distribution parameters are then used by RLlib to create a
                distribution object, sample from it, and execute any
                exploration logic.
                Note: If `action_distribution_fn` is given, `action_sampler_fn`
                must be None. If both `action_sampler_fn` and
                `action_distribution_fn` are None, RLlib will simply pass
                inputs through `self.model` to get distribution inputs, create
                the distribution object, sample from it, and apply some
                exploration logic to the results.
                The callable takes as inputs: Policy, ModelV2, ModelInputDict,
                explore, timestep, is_training.
            max_seq_len: Max sequence length for LSTM training.
            get_batch_divisibility_req: Optional callable that returns the
                divisibility requirement for sample batches given the Policy.
        torch	frameworkFNr*   )r5   )	obs_spacer2   num_outputsmodel_configr5   zFound z visible cuda devices.
_fake_gpusr   cpuc                    s   g | ]} j qS  device.0_selfr;   Z/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/rllib/policy/torch_policy.py
<listcomp>   s    z(TorchPolicy.__init__.<locals>.<listcomp>   c                    s"   g | ]}|d kr
 nt  qS r   )copydeepcopy)r?   i)r*   r;   rC   rD      s    target_modelc                    s   i | ]}| j qS r;   )rJ   )r?   mrA   r;   rC   
<dictcomp>   s    z(TorchPolicy.__init__.<locals>.<dictcomp>z7TorchPolicy was not able to find enough GPU IDs! Found z, but num_gpus=.c                    s(   g | ]\}}| k rt d |qS )zcuda:{})r4   r=   formatr?   rI   id_num_gpusr;   rC   rD      s
    c                    s   g | ]
\}}| k r|qS r;   r;   rO   rQ   r;   rC   rD          c                    s*   i | ]\}}|t  j j| qS r;   )rG   rH   rJ   todevices)r?   rI   rK   rA   r;   rC   rL      s    zPolicy.lossc                 S   s   i | ]\}}||qS r;   r;   )r?   rI   pr;   r;   rC   rL   	  s    paramsnum_multi_gpu_tower_stacksc                 S   s   g | ]}g qS r;   r;   r>   r;   r;   rC   rD         )Hr5   _loss_initializedsuper__init__r   get_action_distr3   get_model_v2r1   r2   _get_num_gpus_for_policylistranger4   cudadevice_countloggerinfolenr=   intmathceilrU   model_gpu_towershasattrtarget_modelsr*   ray_privateworker_modeWORKER_MODEget_gpu_ids
ValueError	enumeraterG   rH   appendrT   	threadingRLock_lockget_initial_state_state_inputs_is_recurrent/_update_model_view_requirements_from_init_stateview_requirementsupdate_create_explorationexplorationunwrapped_model_lossr+   __func____qualname__r   	optimizer_optimizersmulti_gpu_param_groups
parametersparam_groupssetget_loaded_batches
dist_classr-   r.   distributed_world_sizer/   callablebatch_divisibility_req)rB   r1   r2   r3   r*   r+   r,   r-   r.   r/   r0   r   	logit_dimgpu_idsidsrI   r@   
model_copymain_paramsoparam_indicespg_idxpgrV   num_buffers	__class__)r*   rR   rB   rC   r\   @   s   Q
&


	







zTorchPolicy.__init__
input_dictexploretimestepreturnc                    s   t  = |    d  fdd  D }|r/t jdgt|d  t j|d jdnd }| 	 ||||W  d    S 1 sDw   Y  d S )NTc                    s$   g | ]}d |dd v r | qS )state_inN   r;   r?   kr   r;   rC   rD   1  s    z?TorchPolicy.compute_actions_from_input_dict.<locals>.<listcomp>rE   r   )dtyper=   )
r4   no_grad_lazy_tensor_dictset_trainingkeystensorrf   longr=   _compute_action_helper)rB   r   r   r   kwargsstate_batchesseq_lensr;   r   rC   compute_actions_from_input_dict$  s$   






$z+TorchPolicy.compute_actions_from_input_dict	obs_batchr   prev_action_batchprev_reward_batch
info_batchc	                    s   t  H t jt|t jd}
 tj|ddi}|d ur%t	||tj
< |d ur1t	||tj<  fdd|p9g D } |||
||W  d    S 1 sOw   Y  d S )Nr   is_trainingFc                       g | ]}t | jqS r;   r!   r=   r?   srA   r;   rC   rD   \      z/TorchPolicy.compute_actions.<locals>.<listcomp>)r4   r   onesrf   int32r   r   CUR_OBSnpasarrayPREV_ACTIONSPREV_REWARDSr   )rB   r   r   r   r   r   episodesr   r   r   r   r   r;   rA   rC   compute_actionsC  s$   


$zTorchPolicy.compute_actionsTactionsactions_normalizedc                    s   j r jd u rtdt   tj|tj|i}|d ur%||tj	< |d ur.||tj
< tjt|tjd}	 fdd|p@g D } jjdd  jrz j  j|||	ddd\}
}}W n@ ty } z(d|jd	 v sud
|jd	 v r j  j|tj ddd\}
}}n|W Y d }~nd }~ww  j} |||	\}
}||
 j}|tj }|s jd rt| j}||}|W  d    S 1 sw   Y  d S )NzfCannot compute log-prob/likelihood w/o an `action_distribution_fn` and a provided `action_sampler_fn`!r   c                    r   r;   r   r   rA   r;   rC   rD     r   z7TorchPolicy.compute_log_likelihoods.<locals>.<listcomp>F)r   )r   r   r   r   r   positional argumentr   unexpected keyword argument)policyr*   r   r   r   normalize_actions)r-   r.   rs   r4   r   r   r   r   ACTIONSr   r   r   rf   r   r   before_compute_actionsr*   	TypeErrorargsr   r3   r   action_space_structlogp)rB   r   r   r   r   r   r   r   r   r   dist_inputsr   	state_outer@   action_distlog_likelihoodsr;   rA   rC   compute_log_likelihoodsc  sf   





$z#TorchPolicy.compute_log_likelihoodspostprocessed_batchc                 C   s   | j r| j   i }| jj| ||d | |\}}| t |  jd7  _| j r0| j  |d< |	d|t
|jt| jt| jd |jpCd i |S )Nr   train_batchresultrE   r*   custom_metricsr   )r*   train	callbackson_learn_on_batchcompute_gradientsapply_gradients_directStepOptimizerSingletonnum_grad_updatesmetricsr~   r   countr   r   )rB   r   learn_statsgradsfetchesr;   r;   rC   learn_on_batch  s0   

zTorchPolicy.learn_on_batchr   batchbuffer_indexc                    s   | d t jdkr5 jd jdkr5|dksJ t| jd j jd  | |g j	d< t|S |j
t jd}|D ]}t| jd j jd q@ fdd	t|D }| j	|< t|d S )
NTrE   r   r:   Fr   r/   shuffler   r}   )
num_slicesc                    s    g | ]\}}|  j| qS r;   )	to_devicerU   )r?   rI   slicerA   r;   rC   rD          z6TorchPolicy.load_batch_into_buffer.<locals>.<listcomp>)r   rf   rU   typer   r/   r   r}   r   r   
timeslicesrt   )rB   r   r   slicesr   r;   rA   rC   load_batch_into_buffer  s2   

	
z"TorchPolicy.load_batch_into_bufferc                 C   s@   t | jdkr| jd dkr|dksJ tdd | j| D S )NrE   r   z/cpu:0c                 s       | ]}t |V  qd S Nrf   r?   br;   r;   rC   	<genexpr>      zATorchPolicy.get_num_samples_loaded_into_buffer.<locals>.<genexpr>)rf   rU   sumr   )rB   r   r;   r;   rC   "get_num_samples_loaded_into_buffer  s   z.TorchPolicy.get_num_samples_loaded_into_bufferoffsetc                    s  j | s	tdjd  d u rjdjd   tj  jr1jD ]}|  q*tjdkrkjd jdkrk|dksFJ  tj d d krYj d d }nj d d    }	|S tjdkrj
 }jd j
u sJ jdd  D ]}|| q tdd	 j | D krj | }n fd
dj | D }i }t|D ]\}i }	jj||	d d|	i|d < q|}
g }tt|
d d D ](|
d d  d ur|tjtfdd|
D dd q|d  qtj
 D ]
\}| |_qt  jd7  _ttj|D ](\\}}|d  t|d|  t!jt"jd |jpSd i q0|#  |S )NzPMust call Policy.load_batch_into_buffer() before Policy.learn_on_loaded_batch()!minibatch_sizesgd_minibatch_sizetrain_batch_sizerE   r   r:   c                 s   r   r   r   r   r;   r;   rC   r   B  r   z4TorchPolicy.learn_on_loaded_batch.<locals>.<genexpr>c                    s   g | ]
}|   qS r;   r;   r   )device_batch_sizer   r;   rC   rD   E  s    z5TorchPolicy.learn_on_loaded_batch.<locals>.<listcomp>r   r   tower_c                    s    g | ]}|d     jqS rF   rT   r=   )r?   t)rI   rB   r;   rC   rD   \  r   )dimr*   )$r   rs   r3   r   rf   rU   rj   r   r   r   r*   
state_dictload_state_dictr   rt   r   r   _multi_gpu_parallel_grad_calcra   ru   r4   meanstackr   gradr   r   r   zipr~   r   extra_grad_infor   r   r   extra_compute_grad_fetches)rB   r   r   r  r   r  towerdevice_batchesbatch_fetchesr   tower_outputs	all_gradsrV   r*   r;   )r  rI   r   rB   rC   learn_on_loaded_batch  s~   







z!TorchPolicy.learn_on_loaded_batchc                 C   s   t | jdks	J |jst|| jd| j| jd |d | j|| jd d | 	|g}|d \}}|d  t | j
  < || | |  }|t|fi t|ifS )NrE   Fr   Tr   r<   allreduce_latency)rf   rU   zero_paddedr   r/   r   r}   r   r   r
  r   r~   r  r  dictr   )rB   r   r  r  	grad_infor   r;   r;   rC   r   {  s"   
zTorchPolicy.compute_gradients	gradientsc                 C   s   |t krt| jD ]\}}|  q	d S t| jdksJ t|| j D ]\}}|d urDt	|r:|
| j|_q%t|
| j|_q%| jd   d S )NrE   r   )r   rt   r   steprf   r  r*   r   r4   	is_tensorrT   r=   r  
from_numpy)rB   r  rI   optgrV   r;   r;   rC   r     s   

zTorchPolicy.apply_gradients
stats_namec                    sf   g } j D ]}||jv r|t fdd|j|  qt|dks1J d| dt j  d|S )a  Returns list of per-tower stats, copied to this Policy's device.

        Args:
            stats_name: The name of the stats to average over (this str
                must exist as a key inside each tower's `tower_stats` dict).

        Returns:
            The list of stats tensor (structs) of all towers, copied to this
            Policy's device.

        Raises:
            AssertionError: If the `stats_name` cannot be found in any one
            of the tower's `tower_stats` dicts.
        c                    s   |   jS r   r  )r   rA   r;   rC   <lambda>  s    z-TorchPolicy.get_tower_stats.<locals>.<lambda>r   zStats `z+` not found in any of the towers (you have zV towers in total)! Make sure you call the loss function on at least one of the towers.)rj   tower_statsru   treemap_structurerf   )rB   r!  datar  r;   rA   rC   get_tower_stats  s   

zTorchPolicy.get_tower_statsc                 C   s   dd | j   D S )Nc                 S   s"   i | ]\}}||    qS r;   )r:   detachnumpy)r?   r   vr;   r;   rC   rL     s   " z+TorchPolicy.get_weights.<locals>.<dictcomp>)r*   r  itemsrA   r;   r;   rC   get_weights  s   zTorchPolicy.get_weightsweightsc                 C   s   t || jd}| j| d S Nr<   )r!   r=   r*   r	  )rB   r-  r;   r;   rC   set_weights  s   zTorchPolicy.set_weightsc                 C   s   | j S r   )r{   rA   r;   r;   rC   is_recurrent  s   zTorchPolicy.is_recurrentc                 C   s   t | j S r   )rf   r*   ry   rA   r;   r;   rC   num_state_tensors  s   zTorchPolicy.num_state_tensorsc                 C   s   dd | j  D S )Nc                 S   s   g | ]
}|    qS r;   )r(  r:   r)  r   r;   r;   rC   rD     rS   z1TorchPolicy.get_initial_state.<locals>.<listcomp>)r*   ry   rA   r;   r;   rC   ry     s   zTorchPolicy.get_initial_statec                    sX   t   }g |d< t| jD ]\}}t| }|d | q| jr*| j |d< |S )N_optimizer_variables_exploration_state)r[   	get_statert   r   r   r  ru   r   )rB   staterI   r   optim_state_dictr   r;   rC   r4    s   
zTorchPolicy.get_stater5  c                    s   | dd }|r4t|t| jksJ t| j|D ]\}}d|d i}t|d | jd|d< || qt| drFd|v rF| jj	|d d |d | _
t 	| d S )	Nr2  r   r5  r<   r   r3  )r5  global_timestep)r   rf   r   r  r!   r=   r	  rk   r   	set_stater7  r[   )rB   r5  optimizer_varsr   r   r6  r   r;   rC   r8    s   


zTorchPolicy.set_stater   ztorch.optim.Optimizerc                 C      i S )a  Called after each optimizer.zero_grad() + loss.backward() call.

        Called for each self._optimizers/loss-value pair.
        Allows for gradient processing before optimizer.step() is called.
        E.g. for gradient clipping.

        Args:
            optimizer: A torch optimizer object.
            loss: The loss tensor associated with the optimizer.

        Returns:
            An dict with information on the gradient processing step.
        r;   )rB   r   r+   r;   r;   rC   extra_grad_process  s   zTorchPolicy.extra_grad_processc                 C   s   t i iS )zExtra values to fetch and return from compute_gradients().

        Returns:
            Extra fetch dict to be added to the fetch dict of the
            `compute_gradients` call.
        r   rA   r;   r;   rC   r    s   z&TorchPolicy.extra_compute_grad_fetchesr   c                 C   r:  )a  Returns dict of extra info to include in experience batch.

        Args:
            input_dict: Dict of model input tensors.
            state_batches: List of state tensors.
            model: Reference to the model object.
            action_dist: Torch action dist object
                to get log-probs (e.g. for already sampled actions).

        Returns:
            Extra outputs to return in a `compute_actions_from_input_dict()`
            call (3rd return value).
        r;   )rB   r   r   r*   r   r;   r;   rC   extra_action_out   s   zTorchPolicy.extra_action_outr   c                 C   r:  )zReturn dict of extra grad info.

        Args:
            train_batch: The training batch for which to produce
                extra grad info for.

        Returns:
            The info dict carrying grad info per str key.
        r;   )rB   r   r;   r;   rC   r  6  s   
zTorchPolicy.extra_grad_infoc                 C   sT   t | drtjj| j | jd dg}n
tj| j g}| jr(| j|}|S )zCustom the local PyTorch optimizer(s) to use.

        Returns:
            The local PyTorch optimizer(s) to use for this Policy.
        r3   lr)r=  )	rk   r4   optimAdamr*   r   r3   r   get_exploration_optimizer)rB   
optimizersr;   r;   rC   r   B  s   
zTorchPolicy.optimizer
export_dironnxc           	         st  t j|dd |r  j d jvr#tdg  jd<  jtj<  jtj }g }d}d| jv rL|	 jd|  |d7 }d| jv s5 fdd	 j
 D }t j|d
}tjj j|||f|d|dt|
 dtjg ddgdd	 t|
 dtjg D d	 dS t j|d}ztj j|d W dS  ty   t j|rt | tt Y dS w )aP  Exports the Policy's Model to local directory for serving.

        Creates a TorchScript model and saves it.

        Args:
            export_dir: Local writable directory or filename.
            onnx: If given, will export model in ONNX format. The
                value of this parameter set the ONNX OpSet version to use.
        T)exist_ok
state_in_0g      ?r   zstate_in_{}rE   c                    s    i | ]}|d kr| j | qS )r   )_dummy_batchr   rA   r;   rC   rL   p  s
    
z,TorchPolicy.export_model.<locals>.<dictcomp>z
model.onnx	state_insoutput
state_outsc                 S   s   i | ]}|d diqS )r   
batch_sizer;   r   r;   r;   rC   rL     s    )export_paramsopset_versiondo_constant_foldinginput_namesoutput_namesdynamic_axeszmodel.pt)fN)osmakedirsr   rF  r   arrayr   SEQ_LENSrN   ru   r   pathjoinr4   rC  exportr*   r`   save	Exceptionexistsremoverd   warningr   )	rB   rB  rC  r   rG  rI   dummy_inputs	file_namefilenamer;   rA   rC   export_modelT  sX   






zTorchPolicy.export_modelimport_filec                 C   s   | j |S )z!Imports weights into torch model.)r*   import_from_h5)rB   rb  r;   r;   rC   import_model_from_h5  s   z TorchPolicy.import_model_from_h5c                 C   s  |dur|n| j d }|dur|n| j}|duo|g k| _| jr%| j  | jrKd }}| j| | j||||d}t|dkrE|\}	}
}}n|\}	}
}n| jj||d | j	rz| j	| | j|||||dd\}}}W nA t
y } z)d|jd v sd	|jd v r| j	| | j|tj ||dd
\}}}n|W Y d}~nd}~ww | j}| |||\}}t|tjst|tstd|j||| j}| jj|||d\}	}
|	|tj< | ||| j|}|dur||tj< |
durt|
 |tj< |
|tj< |  jt|tj 7  _t |	||fS )zShared forward pass logic (w/ and w/o trajectory view API).

        Returns:
            A tuple consisting of a) actions, b) state_out, c) extra_fetches.
        Nr   )r   r      F)r   r   r   r   r   r   r   r   r   )r   r   r   z`dist_class` ({}) not a TorchDistributionWrapper subclass! Make sure your `action_distribution_fn` or `make_model_and_action_dist` return a correct distribution class.)action_distributionr   r   )!r3   r7  r{   r*   evalr-   rf   r   r   r.   r   r   r   r   r   
isinstance	functoolspartial
issubclassr   rs   rN   __name__get_exploration_actionr   r<  ACTION_DIST_INPUTSr4   expfloatACTION_PROBACTION_LOGPr   )rB   r   r   r   r   r   r   r   action_sampler_outputsr   r   r   r   r   extra_fetchesr;   r;   rC   r     s   	







z"TorchPolicy._compute_action_helperc                 C   s0   t |ts	t|}|tjt|p| jd |S r.  )rh  r   set_get_interceptorri  rj  r!   r=   )rB   r   r=   r;   r;   rC   r     s   
zTorchPolicy._lazy_tensor_dictsample_batchesc                    sF  t jt |ksJ t i t fdd t jdks*jd rZtt	j|jD ]$\}\}}} |||| t d  }t
|d trX|d |d q4n# fddtt	j|jD }|D ]}|  qm|D ]}|  qvg }	tt |D ]}| }
t
|
d tr|
d |
d |	|  q|	S )ah  Performs a parallelized loss and gradient calculation over the batch.

        Splits up the given train batch into n shards (n=number of this
        Policy's devices) and passes each data shard (in parallel) through
        the loss function using the individual devices' models
        (self.model_gpu_towers). Then returns each tower's outputs.

        Args:
            sample_batches: A list of SampleBatch shards to
                calculate loss and gradients for.

        Returns:
            A list (one item per device) of 2-tuples, each with 1) gradient
            list and 2) grad info dict.
        c                    s  t   z|jdkrt nt j| t|j|}|	||}t
|t
jks2J ddi}t| }dd tt
|D }tjD ]\}}	j| }
t|D ]\}}||
v rl|jd url|jj  qY|| jdd ||	||  g }t|D ]\}}||
v r|jd ur||j |j||< qjrt }t j r|D ]}t jj|t jjjd qnt jj|t jjjd |	j D ]}|d	 D ]}|jd ur| jj  _qq|d  t | 7  < qLW d    n1 sw   Y   ||f| < W d    W d S 1 sw   Y  W d S  t!yc } z<d
d l"} t#d|  d| d| d|$  d	|f| < W d    n1 sLw   Y  W Y d }~d S W Y d }~d S d }~ww )Nr:   r  g        c                 S   s   g | ]}d qS r   r;   r>   r;   r;   rC   rD   :  rY   zNTorchPolicy._multi_gpu_parallel_grad_calc.<locals>._worker.<locals>.<listcomp>T)retain_graph)oprW   r   zError In tower z on device z2 during multi GPU parallel gradient calculation:: z
Traceback: 

)%r4   set_grad_enabledr   r   rb   r=   r   r   r   custom_lossrf   r   r`   r   ra   rt   r   r  r&  zero_backwardr~   r;  ru   r   timeis_availabledistributed
all_reduceReduceOpSUMall_reduce_coalescedr   rZ  	tracebackrs   
format_exc)	shard_idxr*   sample_batchr=   loss_outr  r   r  opt_idxr  r   	param_idxparamr   startr   param_grouprV   r   r  )grad_enabledlockresultsrB   r;   rC   _worker&  s   








=(
:z:TorchPolicy._multi_gpu_parallel_grad_calc.<locals>._workerrE   r9   r   c                    s,   g | ]\}\}}}t j ||||fd qS ))targetr   )rv   Thread)r?   r  r*   r  r=   )r  r;   rC   rD     s    z=TorchPolicy._multi_gpu_parallel_grad_calc.<locals>.<listcomp>)rf   rj   rv   Lockr4   is_grad_enabledrU   r3   rt   r  rh  rs   r  rW  ra   rZ  ru   )rB   rv  r  r*   r  r=   last_resultthreadsthreadoutputsrH  r;   )r  r  r  r  rB   rC   r
    s>   T

	

z)TorchPolicy._multi_gpu_parallel_grad_calc)NN)NNNNNNN)NNNTrF   )r   r   r   )>rl  
__module__r   __doc__gymspacesSpacer"   r   r   r   r   r   r	   r   r   r
   r'   r   r   rg   r\   r   r   strboolr   r&   r`   r   r    r   r   r   r   r  r$   r   r   r'  r%   r,  r/  r0  r1  ry   r   r4  r8  r;  r   r  r<  r  r   ra  rd  r   r   r#   r
  __classcell__r;   r;   r   rC   r(   <   st   	

 e

	
T&1a

	



=
n	r(   c                       s4   e Zd ZdZdZ fddZdd Zdd Z  ZS )	DirectStepOptimizerzsTypesafe method for indicating `apply_gradients` can directly step the
    optimizers with in-place gradients.
    Nc                    s   t jd u rt | t _t jS r   )r  	_instancer[   __new__)clsr   r;   rC   r    s   
zDirectStepOptimizer.__new__c                 C   s   t | t |u S r   )r   )rB   otherr;   r;   rC   __eq__  s   zDirectStepOptimizer.__eq__c                 C   s   dS )Nr  r;   rA   r;   r;   rC   __repr__  s   zDirectStepOptimizer.__repr__)	rl  r  r   r  r  r  r  r  r  r;   r;   r   rC   r    s    r  )MrG   ri  loggingrh   rR  rv   r~  typingr   r   r   r   r   r   r   r	   r
   	gymnasiumr  r)  r   r$  rm   ray.rllib.models.catalogr   ray.rllib.models.modelv2r   (ray.rllib.models.torch.torch_action_distr   $ray.rllib.models.torch.torch_modelv2r   ray.rllib.policy.policyr   r   ray.rllib.policy.rnn_sequencingr   ray.rllib.policy.sample_batchr   ray.rllib.utilsr   r   ray.rllib.utils.annotationsr   r   ray.rllib.utils.errorr   ray.rllib.utils.frameworkr   ray.rllib.utils.metricsr   r   r   $ray.rllib.utils.metrics.learner_infor   ray.rllib.utils.numpyr   "ray.rllib.utils.spaces.space_utilsr   ray.rllib.utils.threadingr    ray.rllib.utils.torch_utilsr!   ray.rllib.utils.typingr"   r#   r$   r%   r&   r'   r4   nn	getLoggerrl  rd   r(   r  r   r;   r;   r;   rC   <module>   sZ    , 
	
        h
