o
    ci                     @   s   d dl Z d dlZd dlZd dlmZ d dlmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZmZmZmZ d dlmZ d d	lmZ d d
lmZ e \ZZZe eZe	ddefddZe	defddZ dS )    N)Dict)DEFAULT_POLICY_ID)OldAPIStack)try_import_tf)deprecation_warning)NUM_ENV_STEPS_TRAINEDNUM_AGENT_STEPS_TRAINEDLEARN_ON_BATCH_TIMERLOAD_BATCH_TIMER)LearnerInfoBuilder)do_minibatch_sgd)log_oncereturnc                    s.  | j }| j}|j |d|dd}|d}|du r"|dd}| jt }|+ |dks2|dkrHt| fdd	|p? |D  ||g }n |}W d   n1 sWw   Y  |	|j
 | jt  |j
7  < | jt  | 7  < | jri |t d
< | j D ]\}	}
|
||t d
 |	< q|S )aV  Function that improves the all policies in `train_batch` on the local worker.

    .. testcode::
        :skipif: True

        from ray.rllib.execution.rollout_ops import synchronous_parallel_sample
        algo = [...]
        train_batch = synchronous_parallel_sample(algo.env_runner_group)
        # This trains the policy on one batch.
        print(train_one_step(algo, train_batch)))

    .. testoutput::

        {"default_policy": ...}

    Updates the NUM_ENV_STEPS_TRAINED and NUM_AGENT_STEPS_TRAINED counters as well as
    the LEARN_ON_BATCH_TIMER timer of the `algorithm` object.
    
num_epochsnum_sgd_iter   minibatch_sizeNsgd_minibatch_sizer   c                    s   i | ]}|  |qS  )
get_policy).0pidlocal_workerr   Q/home/ubuntu/.local/lib/python3.10/site-packages/ray/rllib/execution/train_ops.py
<dictcomp><   s    
z"train_one_step.<locals>.<dictcomp>off_policy_estimation)configenv_runner_grouplocal_env_runnerget_timersr	   r   get_policies_to_trainlearn_on_batchpush_units_processedcount	_countersr   r   agent_stepsreward_estimatorsr   itemstrain)	algorithmtrain_batchpolicies_to_trainr   workersr   r   learn_timerinfoname	estimatorr   r   r   train_one_step   sD   



r3   c                 C   sp  t dr	tdd | j}| j}|j}|d|dd}|d}|du r)|d	 }tt|d
 p1d}|| }|| }	|	| dksDJ |	|ksLJ d|	 }| j
t }
|
0 i }|j D ] \}}|jduro|||soq_|  |j| j|dd||< q_W d   n1 sw   Y  | j
t }|W t|d}| D ]A\}}|j| }tdt|t| }td| t|D ] }tj|}t|D ]}|j|| | dd}||| qqq| }W d   n1 sw   Y  |
|j  ||j  | j!t"  |j 7  < | j!t#  |$ 7  < | j%r6i |t& d< | j% D ]\}}|'||t& d |< q%|S )a  Multi-GPU version of train_one_step.

    Uses the policies' `load_batch_into_buffer` and `learn_on_loaded_batch` methods
    to be more efficient wrt CPU/GPU data transfers. For example, when doing multiple
    passes through a train batch (e.g. for PPO) using `config.num_sgd_iter`, the
    actual train batch is only split once and loaded once into the GPU(s).

    .. testcode::
        :skipif: True

        from ray.rllib.execution.rollout_ops import synchronous_parallel_sample
        algo = [...]
        train_batch = synchronous_parallel_sample(algo.env_runner_group)
        # This trains the policy on one batch.
        print(multi_gpu_train_one_step(algo, train_batch)))

    .. testoutput::

        {"default_policy": ...}

    Updates the NUM_ENV_STEPS_TRAINED and NUM_AGENT_STEPS_TRAINED counters as well as
    the LOAD_BATCH_TIMER and LEARN_ON_BATCH_TIMER timers of the Algorithm instance.
    ,mulit_gpu_train_one_step_deprecation_warningz6ray.rllib.execution.train_ops.multi_gpu_train_one_step)oldr   r   r   r   Ntrain_batch_sizenum_gpusr   zBatch size too small!)buffer_index)num_devicesz== sgd epochs for {} ==r   )(r   r   r   r   r   r    intmathceilas_multi_agentr!   r
   policy_batchesr)   is_policy_to_traindecompress_if_needed
policy_mapload_batch_into_bufferr	   r   maxloggerdebugformatrangenprandompermutationlearn_on_loaded_batchadd_learn_on_batch_resultsfinalizer$   r%   r&   r   r   r'   r(   r   r*   )r+   r,   r   r.   r   r   r   r9   per_device_batch_size
batch_size
load_timernum_loaded_samples	policy_idbatchr/   learner_info_buildersamples_per_devicepolicynum_batches_rJ   batch_indexresultslearner_infor1   r2   r   r   r   multi_gpu_train_one_stepW   s~   








r\   )N)!loggingnumpyrH   r;   typingr   ray.rllib.policy.sample_batchr   ray.rllib.utils.annotationsr   ray.rllib.utils.frameworkr   ray.rllib.utils.deprecationr   ray.rllib.utils.metricsr   r   r	   r
   $ray.rllib.utils.metrics.learner_infor   ray.rllib.utils.sgdr   ray.utilr   tf1tftfv	getLogger__name__rD   r3   r\   r   r   r   r   <module>   s$    
=