o
    `۷ip                    @   s  d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZm	Z	m
Z
mZmZmZ d dlZd dlZd dlmZmZ d dlmZ d dlmZmZmZ d dlmZmZmZ d d	lmZm Z  d d
l!m"Z" d dl#m$Z$m%Z%m&Z&m'Z'm(Z( d dl)m*Z* e \Z+Z,Z-e  \Z.Z/dZ0ede1fddZ2eG dd de1Z3eG dd dZ4ede
e& de&fddZ5ede
e& ddfddZ6ddde'fddZ7ede&de3fdd Z8dS )!    N)partial)Number)DictIteratorListOptionalSetUnion)
Deprecateddeprecation_warning)Columns)DeveloperAPIExperimentalAPI	PublicAPI)is_compressedpackunpack)try_import_tftry_import_torch)convert_to_torch_tensor)ModuleIDPolicyIDSampleBatchType
TensorTypeViewRequirementsDict)log_oncedefault_policytensor_dictc              	   C   s  |  tj}|dur1trt|rt|dr1t|dkr1tr+t|r+t|	 
 S tt	|S |  D ]Q\}}|tjkr?q5t|tsHJ | |tjksW|dsW|drXq5t|ttfrdt|n|g}dd |D }zt|d }|r||W   S W q5 ty   Y q5w dS )a  Attempt to count timesteps based on dimensions of individual elements.

    Returns the first successfully counted number of timesteps.
    We do not attempt to count on INFOS or any state_in_* and state_out_* keys. The
    number of timesteps we count in cases where we are unable to count is zero.

    Args:
        tensor_dict: A SampleBatch or another dict.

    Returns:
        count: The inferred number of timesteps >= 0.
    Nnumpyr   	state_in_
state_out_c                 S   s(   g | ]}t |ttfrt|n|qS  )
isinstancer   listnparray).0_vr!   r!   S/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/rllib/policy/sample_batch.py
<listcomp>T   s    z+attempt_count_timesteps.<locals>.<listcomp>)getSampleBatchSEQ_LENStf	is_tensorhasattrlentorchintsumitemitemsr"   strINFOS
startswithdicttupletreeflatten	Exception)r   seq_lenskvv_list_lenr!   r!   r(   attempt_count_timesteps!   sF   


rC   c                   @   s  e Zd ZdZejZejZejZejZej	Z	ej
Z
ejZejZejZejZejZejZejZejZejZdZdZdZdZdZdZdZd	Zd
ZdZdZedd Z ede!fddZ"ede!fddZ#ede!fddZ$e%dd Z&e%dd Z'e(de)fddZ*e(de)fddZ+e,ee-ddd d!d" Z.edxd$d%Z/edyd'e)dd fd(d)Z0ede1e2e3e4f  fd*d+Z5ed,e6e3 de6e7 fd-d.Z8edzd/d0Z9ed{d2e:e3 de6d  fd3d4Z;	1d|d5e!d6e!dd fd7d8Z<d9e<dd fd:d;Z=e	1	1	1d}d<e:e! d=e:e! d>e:e! de6d  fd?d@Z>e-dAdd d~dBdCZ?d~dDe!dEe)fdFdAZ@e(	G	&	&	1ddHe3dIe)dJe)dKe:eAdL  fdMdNZBede!fdOdPZCd{dQdRZDed{dSe:eE ddTfdUdVZFed2eAe3e<f de4fdWdXZGeddYdZZHeId[d\ ZJd~d]eAe)d^f fd_d`ZKedadb ZLe%d&eMddcgfdde)deeNe3 dd fdfdgZOe%eMddcgfdeeNe3 dd fdhdiZPe%djdk ZQdldm ZRd9e<dd fdndoZSe-d&dpdqdr ZTe(	sddteUdueAe3e!f dd fdvdwZVd1S )r+   zWrapper around a dictionary with string keys and array-like values.

    For example, {"obs": [1, 2, 3], "reward": [0, -1, 1]} is a batch of three
    samples, each with an "obs" and "reward" attribute.
    action_distprev_actionsprev_rewardsenv_idagent_index	unroll_id
obs_embedsreturns_to_goattention_masksdonesobsc                 O   s  t j|v r	td|dd| _|dd| _|dd| _|dd| _|dd| _t	j
| g|R i | d| _t | _t | _t | _i | _d| _| t j}|du sbt|trjt|d	krj| t jd n&t|tr}tj|tjd
 | t j< }ntrt|strt|r|| t j< | jdu r|durtrt|st|d	krtrt|r|  | _nt|| _| jdu r| dd| _|  D ]\}}t|t tfr|t j!kst|| |< qt"| | _#g | _$dS )aP  Constructs a sample batch (same params as dict constructor).

        Note: All args and those kwargs not listed below will be passed
        as-is to the parent dict constructor.

        Args:
            _time_major: Whether data in this sample batch
                is time-major. This is False by default and only relevant
                if the data contains sequences.
            _max_seq_len: The max sequence chunk length
                if the data contains sequences.
            _zero_padded: Whether the data in this batch
                contains sequences AND these sequences are right-zero-padded
                according to the `_max_seq_len` setting.
            _is_training: Whether this batch is used for
                training. If False, batch may be used for e.g. action
                computations (inference).
        zSampleBatch cannot be constructed anymore with a `DONES` key! Instead, set the new TERMINATEDS and TRUNCATEDS keys. The values under DONES will then be automatically computed using terminated|truncated._time_majorN_max_seq_len_zero_paddedF_is_training_num_grad_updatesr   dtypeis_training)%r+   DONESKeyErrorpop
time_majormax_seq_lenzero_paddedrR   num_grad_updatesr9   __init___slice_seq_lens_in_Bsetaccessed_keys
added_keysdeleted_keysintercepted_valuesget_interceptorr*   r,   r"   r#   r0   r$   r%   int32r1   r.   r-   maxr4   r5   r   r7   rC   count
_slice_map)selfargskwargs	seq_lens_r?   r@   r!   r!   r(   r^      sR   








zSampleBatch.__init__returnc                 C      | j S )z2Returns the amount of samples in the sample batch.rh   rj   r!   r!   r(   __len__     zSampleBatch.__len__c                 C      t | S )zReturns the same as len(self) (number of steps in this batch).

        To make this compatible with `MultiAgentBatch.agent_steps()`.
        r0   rq   r!   r!   r(   agent_steps     zSampleBatch.agent_stepsc                 C   rt   )zReturns the same as len(self) (number of steps in this batch).

        To make this compatible with `MultiAgentBatch.env_steps()`.
        ru   rq   r!   r!   r(   	env_steps  rw   zSampleBatch.env_stepsc                 C   
   d| _ d S )NTr_   rq   r!   r!   r(   enable_slicing_by_batch_id      
z&SampleBatch.enable_slicing_by_batch_idc                 C   ry   )NFrz   rq   r!   r!   r(   disable_slicing_by_batch_id$  r|   z'SampleBatch.disable_slicing_by_batch_idc                 C   s&   | t j d pt j| v o| t j d S )zCReturns True if `self` is either terminated or truncated at idx -1.)r+   TERMINATEDS
TRUNCATEDSrq   r!   r!   r(   is_terminated_or_truncated(  s   z&SampleBatch.is_terminated_or_truncatedc                 C   s:   t | tj dd  otj| vpt | tj dd  S )zReturns True if this SampleBatch only contains one trajectory.

        This is determined by checking all timesteps (except for the last) for being
        not terminated AND (if applicable) not truncated.
        Nr~   )anyr+   r   r   rq   r!   r!   r(   is_single_trajectory/  s   
z SampleBatch.is_single_trajectory/concat_samples() from rllib.policy.sample_batchTnewerrorc                 C      d S Nr!   samplesr!   r!   r(   concat_samples;  s   zSampleBatch.concat_samplesotherc                 C   s   t | |gS )ag  Concatenates `other` to this one and returns a new SampleBatch.

        Args:
            other: The other SampleBatch object to concat to this one.

        Returns:
            The new SampleBatch, resulting from concating `other` to `self`.

        .. testcode::
            :skipif: True

            import numpy as np
            from ray.rllib.policy.sample_batch import SampleBatch
            b1 = SampleBatch({"a": np.array([1, 2])})
            b2 = SampleBatch({"a": np.array([3, 4, 5])})
            print(b1.concat(b2))

        .. testoutput::

            {"a": np.array([1, 2, 3, 4, 5])}
        )r   )rj   r   r!   r!   r(   concatA  s   zSampleBatch.concatFshallowc                    s^   t | }t fdd|}t|| j| j| j| jd}|| j	 | j
|_
| j|_| j|_|S )zCreates a deep or shallow copy of this SampleBatch and returns it.

        Args:
            shallow: Whether the copying should be done shallowly.

        Returns:
            A deep or shallow copy of this SampleBatch object.
        c                    s    t | tjrtj|   dS | S )Ncopy)r"   r$   ndarrayr%   r@   r   r!   r(   <lambda>f  s    z"SampleBatch.copy.<locals>.<lambda>)rO   rQ   rP   rS   )r9   r;   map_structurer+   rZ   r\   r[   r]   set_get_interceptorre   rb   rc   ra   )rj   r   copy_datar!   r   r(   r   Z  s"   

zSampleBatch.copyc                 #   sT      tjddu rdndt }t jD ]}t|f fdd	|V  qdS )a  Returns an iterator over data rows, i.e. dicts with column values.

        Note that if `seq_lens` is set in self, we set it to 1 in the rows.

        Yields:
            The column values of the row in this iteration.

        .. testcode::
            :skipif: True

            from ray.rllib.policy.sample_batch import SampleBatch
            batch = SampleBatch({
               "a": [1, 2, 3],
               "b": [4, 5, 6],
               "seq_lens": [1, 2]
            })
            for row in batch.rows():
                print(row)

        .. testoutput::

            {"a": 1, "b": 4, "seq_lens": 1}
            {"a": 2, "b": 5, "seq_lens": 1}
            {"a": 3, "b": 6, "seq_lens": 1}
           Nc                    s   | d  j kr|| S S Nr   )r,   )pr@   irj   r>   r!   r(   r         z"SampleBatch.rows.<locals>.<lambda>)r*   r+   r,   r9   rangerh   r;   map_structure_with_path)rj   self_as_dictr   r!   r   r(   rowsx  s   zSampleBatch.rowskeysc                 C   s    g }|D ]	}| | |  q|S )a  Returns a list of the batch-data in the specified columns.

        Args:
            keys: List of column names fo which to return the data.

        Returns:
            The list of data items ordered by the order of column
            names in `keys`.

        .. testcode::
            :skipif: True

            from ray.rllib.policy.sample_batch import SampleBatch
            batch = SampleBatch({"a": [1], "b": [2], "c": [3]})
            print(batch.columns(["a", "b"]))

        .. testoutput::

            [[1], [2]]
        )append)rj   r   outr?   r!   r!   r(   columns  s   zSampleBatch.columnsc                    s   |  tjdu}|r| jstd|stj| jntjt	| tj t
| }|tjd tfdd|} durK fddD |tj< | | i | _| S )a  Shuffles the rows of this batch in-place.

        Returns:
            This very (now shuffled) SampleBatch.

        Raises:
            ValueError: If self[SampleBatch.SEQ_LENS] is defined.

        .. testcode::
            :skipif: True

            from ray.rllib.policy.sample_batch import SampleBatch
            batch = SampleBatch({"a": [1, 2, 3, 4]})
            print(batch.shuffle())

        .. testoutput::

            {"a": [4, 1, 3, 2]}
        NzbSampleBatch.shuffle not possible when your data has `seq_lens` defined AND is not zero-padded yet!c                    s   |   S r   r!   r   )permutationr!   r(   r     s    z%SampleBatch.shuffle.<locals>.<lambda>c                    s   g | ]} | qS r!   r!   )r&   r   )infosr!   r(   r)         z'SampleBatch.shuffle.<locals>.<listcomp>)r*   r+   r,   r\   
ValueErrorr$   randomr   rh   r0   r9   rY   r   r7   r;   r   updaterd   )rj   has_time_rankr   shuffledr!   )r   r   r(   shuffle  s    
	
zSampleBatch.shuffleNkeyc                    s  |du s|t jt jfv sJ d| d fdd} fdd}t j|t j|i}t jt jg}d}|durM|t jkrG| vrGt  d| d	||  }n#|D ]}|t jksZ| v ra||  } nqO|du rpt  d
| dtdd |D  jksJ d  d| d j d |S )a  Splits by `eps_id` column and returns list of new batches.
        If `eps_id` is not present, splits by `dones` instead.

        Args:
            key: If specified, overwrite default and use key to split.

        Returns:
            List of batches, one per distinct episode.

        Raises:
            KeyError: If the `eps_id` AND `dones` columns are not present.

        .. testcode::
            :skipif: True

            from ray.rllib.policy.sample_batch import SampleBatch
            # "eps_id" is present
            batch = SampleBatch(
                {"a": [1, 2, 3], "eps_id": [0, 0, 1]})
            print(batch.split_by_episode())

            # "eps_id" not present, split by "dones" instead
            batch = SampleBatch(
                {"a": [1, 2, 3, 4, 5], "dones": [0, 0, 1, 0, 1]})
            print(batch.split_by_episode())

            # The last episode is appended even if it does not end with done
            batch = SampleBatch(
                {"a": [1, 2, 3, 4, 5], "dones": [0, 0, 1, 0, 0]})
            print(batch.split_by_episode())

            batch = SampleBatch(
                {"a": [1, 2, 3, 4, 5], "dones": [0, 0, 0, 0, 0]})
            print(batch.split_by_episode())


        .. testoutput::

            [{"a": [1, 2], "eps_id": [0, 0]}, {"a": [3], "eps_id": [1]}]
            [{"a": [1, 2, 3], "dones": [0, 0, 1]}, {"a": [4, 5], "dones": [0, 1]}]
            [{"a": [1, 2, 3], "dones": [0, 0, 1]}, {"a": [4, 5], "dones": [0, 0]}]
            [{"a": [1, 2, 3, 4, 5], "dones": [0, 0, 0, 0, 0]}]


        Nz"`SampleBatch.split_by_episode(key=z,)` invalid! Must be [None|'dones'|'eps_id'].c                     sn   g }  t j d }d}t jD ]} t j | }||kr*|  ||  |}|}q|  | j  | S r   )r+   EPS_IDr   rh   r   )slices
cur_eps_idoffsetr   next_eps_idrq   r!   r(   slice_by_eps_id&  s   z5SampleBatch.split_by_episode.<locals>.slice_by_eps_idc                     s|   g } d}t  jD ]$} tj | stj v r- tj | r-|  ||d   |d }q	| jkr<|  |d   | S )Nr   r   )r   rh   r+   r   r   r   )r   r   r   rq   r!   r(   "slice_by_terminateds_or_truncateds5  s   
zHSampleBatch.split_by_episode.<locals>.slice_by_terminateds_or_truncatedsz does not have key `z`!z does not have keys !c                 s   s    | ]}|j V  qd S r   rp   r&   sr!   r!   r(   	<genexpr>^  s    z/SampleBatch.split_by_episode.<locals>.<genexpr>zCalling split_by_episode on z	 returns zwhich should in total have z timesteps!)r+   r   rW   rX   r3   rh   )rj   r   r   r   key_to_methodkey_resolve_orderr   r!   rq   r(   split_by_episode  s4   0

zSampleBatch.split_by_episodestartendc                    s|  |  tjdur*t| tj dkr*dk r$ fdd|  D }n fdd|  D }|dur|dus:J d}d|}|| v r\| | || ||< |d7 }d|}|| v sEt| tj || }t|tt| }	t	||	krt	||	ksJ |	t	|dd  |d< nd}
d}d}t
| tj D ]\}}|
|7 }
|
 krd}d|}|du r|}|| v r| | ||d  ||< |d7 }d|}|| v st| tj || ||
   g }dk r|d   7  < t	|   }|dkr|d  |8  < t	|  ksJ  n|du r|
kr|}qt||| j| j| jd	S tt fd
d| | j| j| jdS )a  Returns a slice of the row data of this batch (w/o copying).

        Args:
            start: Starting index. If < 0, will left-zero-pad.
            end: Ending index.

        Returns:
            A new SampleBatch, which has a slice of this batch's data.
        Nr   c              
      sZ   i | ])\}}|t jkr|d s|ttj f|jdd  |jd|d  gqS )r   r   N)shaperU   r   )r+   r,   r8   r$   concatenatezerosr   rU   r&   r?   r@   r   r   r!   r(   
<dictcomp>t  s     
z%SampleBatch.slice.<locals>.<dictcomp>c                    s<   i | ]\}}|t jkr|d s|t fdd|qS )r   c                       |   S r   r!   r   r   r!   r(   r         z.SampleBatch.slice.<locals>.<dictcomp>.<lambda>)r+   r,   r8   r;   r   r   r   r!   r(   r     s
    zstate_in_{}r   r~   )r>   rR   rO   rS   c                    r   r   r!   valuer   r!   r(   r     r   z#SampleBatch.slice.<locals>.<lambda>rR   rO   rS   )r*   r+   r,   r0   r5   formatr#   nextiterr3   	enumeraterV   rZ   r]   r;   r   )rj   r   r   state_start	state_endr   	state_idx	state_keyr>   data_lenrh   r   seq_lendiffr!   r   r(   slicec  s   






zSampleBatch.sliceslice_c                    s   |j pd |jpt| tj t| krt|  dkr&dkr&|jdv s(J | tjd}t	 fdd| }|durft
t| tj d  }t
t| tj   }||| |tj< || tj< t|| j| j| jdS )2  Helper method to handle SampleBatch slicing using a slice object.

        The returned SampleBatch uses the same underlying data object as
        `self`, so changing the slice will also change `self`.

        Note that only zero or positive bounds are allowed for both start
        and stop values. The slice step must be 1 (or None, which is the
        same).

        Args:
            slice_: The python slice object to slice by.

        Returns:
            A new SampleBatch, however "linking" into the same data
            (sliced) as self.
        r   )r   NNc                       |   S r   r!   r   r   stopr!   r(   r     r   z*SampleBatch._batch_slice.<locals>.<lambda>r   )r   r   r0   r+   r,   steprY   r7   r;   r   r2   r3   r   rV   rZ   r]   )rj   r   r   r   info_slice_startinfo_slice_stopr!   r   r(   _batch_slice  s$   

zSampleBatch._batch_slicesize
num_slicesr?   c           	      C   s   |du r|du rt dd |dusJ |}|du rIt|ts J g }t| }d}|rG||t|  }|| }|| ||  ||8 }|}|s*|S t|tsPJ g }t| }d}|ro|| }|| ||  ||8 }|}|sZ|S )a(  Returns SampleBatches, each one representing a k-slice of this one.

        Will start from timestep 0 and produce slices of size=k.

        Args:
            size: The size (in timesteps) of each returned SampleBatch.
            num_slices: The number of slices to produce.
            k: Deprecated: Use size or num_slices instead. The size
                (in timesteps) of each returned SampleBatch.

        Returns:
            The list of `num_slices` (new) SampleBatches or n (new)
            SampleBatches each one of size `size`.
        Nr?   zsize or num_slicesr   )r   r"   r2   r0   r   )	rj   r   r   r?   r   leftr   len_r   r!   r!   r(   
timeslices  s8   
zSampleBatch.timesliceszSampleBatch.right_zero_padc                 C   r   r   r!   )rj   r[   exclude_statesr!   r!   r(   zero_pad$  s   zSampleBatch.zero_padr[   r   c                    sd    tj}|du rtd t|  fdd}t}t|| d__	S )a*  Right (adding zeros at end) zero-pads this SampleBatch in-place.

        This will set the `self.zero_padded` flag to True and
        `self.max_seq_len` to the given `max_seq_len` value.

        Args:
            max_seq_len: The max (total) length to zero pad to.
            exclude_states: If False, also right-zero-pad all
                `state_in_x` data. If True, leave `state_in_x` keys
                as-is.

        Returns:
            This very (now right-zero-padded) SampleBatch.

        Raises:
            ValueError: If self[SampleBatch.SEQ_LENS] is None (not defined).

        .. testcode::
            :skipif: True

            from ray.rllib.policy.sample_batch import SampleBatch
            batch = SampleBatch(
                {"a": [1, 2, 3], "seq_lens": [1, 2]})
            print(batch.right_zero_pad(max_seq_len=4))

            batch = SampleBatch({"a": [1, 2, 3],
                                 "state_in_0": [1.0, 3.0],
                                 "seq_lens": [1, 2]})
            print(batch.right_zero_pad(max_seq_len=5))

        .. testoutput::

            {"a": [1, 0, 0, 0, 2, 3, 0, 0], "seq_lens": [1, 2]}
            {"a": [1, 0, 0, 0, 0, 2, 3, 0, 0, 0],
             "state_in_0": [1.0, 3.0],  # <- all state-ins remain as-is
             "seq_lens": [1, 2]}

        NzNCannot right-zero-pad SampleBatch if no `seq_lens` field present! SampleBatch=c           	         s   du r| d  ds| d tjkrd S |jtks |jjtju r&d g }ntjft	|dd   |jd}d }}tj D ]}||||  |||| < |7 }||7 }qA|t
|ksdJ |}t| D ]\}}|t
| d krz|||< || }qjd S )NTr   r   r   rT   )r8   r+   r,   rU   objecttyper$   str_r   r   r0   r   )	pathr   f_pad
f_pad_basef_baser   currr   r   r   lengthr[   rj   r!   r(   _zero_pad_in_placeX  s*   $

z6SampleBatch.right_zero_pad.<locals>._zero_pad_in_placeT)
r*   r+   r,   r   r0   r9   r;   r   r\   r[   )rj   r[   r   r>   r   r   r!   r   r(   right_zero_pad(  s   'r1   	framework
pin_memory
use_streamstreamtorch.cuda.Streamr   c                 C   sD   |dkr t dus
J |  D ]\}}t|||||d| |< q| S t)9TODO: transfer batch to given device as framework tensor.r1   N)r   r   r   )r1   r5   r   NotImplementedError)rj   devicer   r   r   r   r?   r@   r!   r!   r(   	to_device|  s   
	zSampleBatch.to_devicec                 C   s   t dd t| D S )a  Returns sum over number of bytes of all data buffers.

        For numpy arrays, we use ``.nbytes``. For all other value types, we use
        sys.getsizeof(...).

        Returns:
            The overall size in bytes of the data buffer (all columns).
        c                 s   s,    | ]}t |tjr|jnt|V  qd S r   )r"   r$   r   nbytessys	getsizeof)r&   r@   r!   r!   r(   r     s
    
z)SampleBatch.size_bytes.<locals>.<genexpr>)r3   r;   r<   rq   r!   r!   r(   
size_bytes  s   
zSampleBatch.size_bytesc                 C   s$   z|  |W S  ty   | Y S w )z=Returns one column (by key) from the data or a default value.)__getitem__rX   )rj   r   defaultr!   r!   r(   r*     s
   zSampleBatch.get	module_idMultiAgentBatchc                 C   s   t |pt| i| jS )ak  Returns the respective MultiAgentBatch

        Note, if `module_id` is not provided uses `DEFAULT_POLICY`_ID`.

        Args;
            module_id: An optional module ID. If `None` the `DEFAULT_POLICY_ID`
                is used.

        Returns:
            The MultiAgentBatch (using DEFAULT_POLICY_ID) corresponding
            to this SampleBatch.
        )r  DEFAULT_POLICY_IDrh   )rj   r  r!   r!   r(   as_multi_agent  s   zSampleBatch.as_multi_agentc                 C   s   t |tr
| |S |tjkr| tj S |dkr&tdr#tdddd | jS t	| |s5|| v r5| j
| t| |}| jdurR|| jvrM| || j|< | j| }|S )a'  Returns one column (by key) from the data or a sliced new batch.

        Args:
            key: The key (column name) to return or
                a slice object for slicing this SampleBatch.

        Returns:
            The data under the given key or a sliced version of this batch.
        rV   SampleBatch['is_training']SampleBatch.is_trainingFoldr   r   N)r"   r   _slicer+   rW   r   r   r   rV   r/   ra   addr9   r  re   rd   )rj   r   r   r!   r!   r(   r    s(   






zSampleBatch.__getitem__c                 C   s   |t jkr	tdt| dst| || dS |dkr+tdr&tdddd || _dS || vr5| j	
| t| || || jv rH|| j|< dS dS )	zInserts (overrides) an entire column (by key) in the data buffer.

        Args:
            key: The column name to set a value for.
            item: The data to insert.
        zCannot set `DONES` anymore in a SampleBatch! Instead, set the new TERMINATEDS and TRUNCATEDS keys. The values under DONES will then be automatically computed using terminated|truncated.rb   NrV   r
  r  Fr  )r+   rW   rX   r/   r9   __setitem__r   r   rR   rb   r  rd   )rj   r   r4   r!   r!   r(   r    s,   
	

zSampleBatch.__setitem__c                 C   sB   | j d urt| jtrd| jvr|  | j| jd< | jd S | jS )NrR   )re   r"   rR   boolrd   rq   r!   r!   r(   rV     s   


r  trainingztf1.placeholderc                 C   s   || _ | jdd dS )z1Sets the `is_training` flag for this SampleBatch.rR   N)rR   rd   rY   )rj   r  r!   r!   r(   set_training  s   zSampleBatch.set_trainingc                 C   s   | j | t| | d S r   )rc   r  r9   __delitem__rj   r   r!   r!   r(   r    s   zSampleBatch.__delitem__new_obsbulkr   c                    s     fdd}t | S )a  Compresses the data buffers (by column) in place.

        Args:
            bulk: Whether to compress across the batch dimension (0)
                as well. If False will compress n separate list items, where n
                is the batch size.
            columns: The columns to compress. Default: Only
                compress the obs and new_obs columns.

        Returns:
            This very (now compressed) SampleBatch.
        c                    sl   | d vrd S }t | D ]%\}}|t| d kr/ r#t|||< ntdd |D ||< || }qd S )Nr   r   c                 S      g | ]}t |qS r!   )r   r&   or!   r!   r(   r)   5  r   zDSampleBatch.compress.<locals>._compress_in_place.<locals>.<listcomp>)r   r0   r   r$   r%   )r   r   r   r   r   r  r   rj   r!   r(   _compress_in_place,  s   
z0SampleBatch.compress.<locals>._compress_in_placer;   r   )rj   r  r   r  r!   r  r(   compress  s   zSampleBatch.compressc                    s    fdd}t | S )a  Decompresses data buffers (per column if not compressed) in place.

        Args:
            columns: The columns to decompress. Default: Only
                decompress the obs and new_obs columns.

        Returns:
            This very (now uncompressed) SampleBatch.
        c                    s   | d  vrd S }| d d D ]}|| }qt |r%t||| d < d S t|dkrAt |d rCtdd |D || d < d S d S d S )Nr   r~   c                 S   r  r!   )r   r  r!   r!   r(   r)   U  r   zRSampleBatch.decompress_if_needed.<locals>._decompress_in_place.<locals>.<listcomp>)r   r   r0   r$   r%   )r   r   r   r   r   rj   r!   r(   _decompress_in_placeJ  s   
 z>SampleBatch.decompress_if_needed.<locals>._decompress_in_placer  )rj   r   r   r!   r  r(   decompress_if_needed<  s   z SampleBatch.decompress_if_neededc                 C   s   || j uri | _|| _ dS )z.Sets a function to be called on every getitem.N)re   rd   )rj   fnr!   r!   r(   r   [  s   

zSampleBatch.set_get_interceptorc                 C   s^   t |  }| tjd u rd| j d| dS |tj d| j dt| d  d| dS )NzSampleBatch(z: )z (seqs=r>   z): )r#   r   r*   r+   r,   rh   remover0   )rj   r   r!   r!   r(   __repr__c  s    zSampleBatch.__repr__c           
         s  | j r| |S |jpd |jpt| t| krt| | tjdurt| tj dkr| js_d}t	t
t| tj D ]\}}| j||fg|  || }q>| jt| tj |f | j  \}| j \}||| jr~| j | j fdd}| tjd}t|| }	|durt|ttjfr|| tj< ||| |	tj< t|	| j| j| j| jr| jnd| jdS | tjd}t fdd| }	|durt|ttjfr|| tj< |  |	tj< t|	| j| j| jdS )	r   r   Nc                    s4   | d t jkr| d ds|  S | S )Nr   r   )r+   r,   r8   )r   r   )start_paddedstart_seq_lenstop_paddedstop_seq_lenr!   r(   map_  s
   z SampleBatch._slice.<locals>.map_)rR   rO   rQ   rP   rS   c                    r   r   r!   r   r   r!   r(   r     r   z$SampleBatch._slice.<locals>.<lambda>r   )r_   r   r   r   r0   r*   r+   r,   ri   r   mapr2   extendr   r\   r[   rY   r7   r;   r   r"   r#   r$   r   rV   rZ   r]   r   )
rj   r   sum_r   lstart_unpaddedstop_unpaddedr*  r   r   r!   )r   r&  r'  r   r(  r)  r(   r  m  s\   





	
zSampleBatch._slice)r   c                 C   st  g }g }|  tjd urt| tj dkrt| tj |k s#J dd}d}d}d}d}|t| tj k r| tj | }	||	7 }|| jsG|	n| j7 }||kr|d }
| jsv|||| f ||7 }||kru|| }||	| 8 }|d8 }n	|||f |}|||
f d}|d }|d7 }|t| tj k s6||fS d}|| j	k r|||| f ||7 }|| j	k s||fS )Nr   zFERROR: `slice_size` must be larger than the max. seq-len in the batch!r   )
r*   r+   r,   r0   r$   allr\   r[   r   rh   )rj   
slice_sizedata_slicesdata_slices_states	start_poscurrent_slize_sizeactual_slice_idx	start_idxidxr   end_idxoverheadr   r!   r!   r(   _get_slice_indices  sT    

zSampleBatch._get_slice_indiceslastview_requirementsindexc              	   C   sN  t jt jt jt jt jt ji}i }| D ]\}}|jdu rq|j	p"|}|dkr|
||}|jdurz| | d }t| | }	|	|j }
|t jt jfv rMdnd}|j| }|j| d }|dkrad}tt|| | |
 d g|| g||< qtdd | | ||< q| | ||dkr|d nd ||< qt |tjdgtjd	d
S )a  Creates single ts SampleBatch at given index from `self`.

        For usage as input-dict for model (action or value function) calls.

        Args:
            view_requirements: A view requirements dict from the model for
                which to produce the input_dict.
            index: An integer index value indicating the
                position in the trajectory for which to generate the
                compute_actions input dict. Set to "last" to generate the dict
                at the very end of the trajectory (e.g. for value estimation).
                Note that "last" is different from -1, as "last" will use the
                final NEXT_OBS as observation input.

        Returns:
            The (single-timestep) input dict for ModelV2 calls.
        Fr=  Nr~   r   r   c                 S   s   | dd  S )Nr~   r!   r   r!   r!   r(   r   8  r   z8SampleBatch.get_single_step_input_dict.<locals>.<lambda>rT   )r>   )r+   OBSNEXT_OBSPREV_ACTIONSACTIONSPREV_REWARDSREWARDSr5   used_for_compute_actionsdata_colr*   
shift_fromr0   batch_repeat_valueshift_tor$   r%   r   r;   r   rf   )rj   r>  r?  last_mappings
input_dictview_colview_reqrG  r   traj_lenmissing_at_end	obs_shiftfrom_to_r!   r!   r(   get_single_step_input_dict  sF   





	

z&SampleBatch.get_single_step_input_dict)r   r+   rn   r+   )F)rn   r+   r   )NN)NNN)Tr1   FFN)rn   N)r=  )W__name__
__module____qualname____doc__r   r@  rC  rE  r   r   r7   r,   TACTION_DIST_INPUTSACTION_PROBACTION_LOGPVF_PREDSVALUES_BOOTSTRAPPEDr   rA  ACTION_DISTrB  rD  ENV_IDAGENT_INDEX	UNROLL_ID
OBS_EMBEDSRETURNS_TO_GOATTENTION_MASKSrW   CUR_OBSr   r^   r2   rr   rv   rx   r   r{   r}   r   r  r   r   staticmethodr
   r   r   r   r   r   r6   r   r   r   r   r   r   r   r   r   r   r   r   r   r	   r   r  r*   r   r	  r  r  propertyrV   r  r  	frozensetr   r  r!  r   r%  r  r<  r   rT  r!   r!   r!   r(   r+   c   s    
m


%5r
[/
6T

%&
	
 


W
2
r+   c                
   @   s  e Zd ZdZedeeef defddZ	edefddZ
edefd	d
ZedefddZededed  fddZeedeeef dedeed f fddZeeedddded  dd fddZed6ddZe				d7dededeed   fd!d"Zedefd#d$Zeded%d&gfd'ed(ee ddfd)d*Zeed%d&gfd(ee dd fd+d,Zed6d-d.Z d/edefd0d1Z!d2d3 Z"d4d5 Z#dS )8r  a  A batch of experiences from multiple agents in the environment.

    Attributes:
        policy_batches (Dict[PolicyID, SampleBatch]): Dict mapping policy IDs to
            SampleBatches of experiences.
        count: The number of env steps in this batch.
    policy_batchesrx   c                 C   s,   |  D ]	}t|tsJ q|| _|| _dS )au  Initialize a MultiAgentBatch instance.

        Args:
            policy_batches: Dict mapping policy IDs to SampleBatches of experiences.
            env_steps: The number of environment steps in the environment
                this batch contains. This will be less than the number of
                transitions this batch contains across all policies in total.
        N)valuesr"   r+   rk  rh   )rj   rk  rx   r@   r!   r!   r(   r^   N  s   
zMultiAgentBatch.__init__rn   c                 C   ro   )zThe number of env steps (there are >= 1 agent steps per env step).

        Returns:
            The number of environment steps contained in this batch.
        rp   rq   r!   r!   r(   rx   a  s   zMultiAgentBatch.env_stepsc                 C   ro   )zSame as `self.env_steps()`.rp   rq   r!   r!   r(   rr   j  rs   zMultiAgentBatch.__len__c                 C   s"   d}| j  D ]}||j7 }q|S )zThe number of agent steps (there are >= 1 agent steps per env step).

        Returns:
            The number of agent steps total in this batch.
        r   )rk  rl  rh   )rj   ctbatchr!   r!   r(   rv   o  s   zMultiAgentBatch.agent_stepsr?   c           
   	      s  ddl m} g }| j D ]\}}| D ]}||tj |tj |tj	 ||f qq|
  g t| d fdd}t|dd D ])\}}	|	D ]\}}}}} | jd	i | qOd7 |krr|  dksrJ qIdkrz|  tdksJ S )
a  Returns k-step batches holding data for each agent at those steps.

        For examples, suppose we have agent1 observations [a1t1, a1t2, a1t3],
        for agent2, [a2t1, a2t3], and for agent3, [a3t3] only.

        Calling timeslices(1) would return three MultiAgentBatches containing
        [a1t1, a2t1], [a1t2], and [a1t3, a2t3, a3t3].

        Calling timeslices(2) would return two MultiAgentBatches containing
        [a1t1, a1t2, a2t1], and [a1t3, a2t3, a3t3].

        This method is used to implement "lockstep" replay mode. Note that this
        method does not guarantee each batch contains only data from a single
        unroll. Batches might contain data from multiple different envs.
        r   )SampleBatchBuilderc                     s>   dksJ t dd   D } d   |  d S )Nr   c                 S      i | ]	\}}||  qS r!   )build_and_resetr   r!   r!   r(   r     r   zDMultiAgentBatch.timeslices.<locals>.finish_slice.<locals>.<dictcomp>)r  r5   clearr   )rn  	cur_slicecur_slice_sizefinished_slicesr!   r(   finish_slice  s   z0MultiAgentBatch.timeslices.<locals>.finish_slicec                 S   s   | d d S )N   r!   )xr!   r!   r(   r     r   z,MultiAgentBatch.timeslices.<locals>.<lambda>r   Nr!   ))ray.rllib.evaluation.sample_batch_builderro  rk  r5   r   r   r+   r   rZ  rb  sortcollectionsdefaultdict	itertoolsgroupby
add_valuesr0   )
rj   r?   ro  steps	policy_idrn  rowrw  _groupr!   rs  r(   r   {  s<   

zMultiAgentBatch.timeslicesc                 C   s(   t | dkrt| v r| t S t| |dS )a  Returns SampleBatch or MultiAgentBatch, depending on given policies.
        If policy_batches is empty (i.e. {}) it returns an empty MultiAgentBatch.

        Args:
            policy_batches: Mapping from policy ids to SampleBatch.
            env_steps: Number of env steps in the batch.

        Returns:
            The single default policy's SampleBatch or a MultiAgentBatch
            (more than one policy).
        r   rk  rx   )r0   r  r  r  r!   r!   r(   wrap_as_needed  s   zMultiAgentBatch.wrap_as_neededr   Tr   r   c                 C   rt   r   )concat_samples_into_ma_batchr   r!   r!   r(   r     s   zMultiAgentBatch.concat_samplesc                 C   s   t dd | j D | jS )z{Deep-copies self into a new MultiAgentBatch.

        Returns:
            The copy of self with deep-copied data.
        c                 S   rp  r!   r   r   r!   r!   r(   r     r   z(MultiAgentBatch.copy.<locals>.<dictcomp>)r  rk  r5   rh   rq   r!   r!   r(   r     s   zMultiAgentBatch.copyr1   FNr   r   r   r   c                 C   sJ   |dkr#t dus
J | j D ]\}}|j|||||d| j|< q| S t)r   r1   N)r   r   r   r   )r1   rk  r5   r   r   )rj   r   r   r   r   r   pidpolicy_batchr!   r!   r(   r     s   
	zMultiAgentBatch.to_devicec                 C   s   t dd | j D S )ze
        Returns:
            The overall size in bytes of all policy batches (all columns).
        c                 s   s    | ]}|  V  qd S r   )r  )r&   br!   r!   r(   r     s    z-MultiAgentBatch.size_bytes.<locals>.<genexpr>)r3   rk  rl  rq   r!   r!   r(   r    s   zMultiAgentBatch.size_bytesrN   r  r  r   c                 C   s"   | j  D ]	}|j||d qdS )a8  Compresses each policy batch (per column) in place.

        Args:
            bulk: Whether to compress across the batch dimension (0)
                as well. If False will compress n separate list items, where n
                is the batch size.
            columns: Set of column names to compress.
        )r  r   N)rk  rl  r  )rj   r  r   rn  r!   r!   r(   r    s   zMultiAgentBatch.compressc                 C   s   | j  D ]}|| q| S )zDecompresses each policy batch (per column), if already compressed.

        Args:
            columns: Set of column names to decompress.

        Returns:
            Self.
        )rk  rl  r!  )rj   r   rn  r!   r!   r(   r!    s   z$MultiAgentBatch.decompress_if_neededc                 C   s   | S )zSimply returns `self` (already a MultiAgentBatch).

        Returns:
            This very instance of MultiAgentBatch.
        r!   rq   r!   r!   r(   r	     s   zMultiAgentBatch.as_multi_agentr   c                 C   s
   | j | S )z0Returns the SampleBatch for the given policy id.)rk  r  r!   r!   r(   r  )  r|   zMultiAgentBatch.__getitem__c                 C      d t| j| jS Nz!MultiAgentBatch({}, env_steps={})r   r6   rk  rh   rq   r!   r!   r(   __str__-     zMultiAgentBatch.__str__c                 C   r  r  r  rq   r!   r!   r(   r%  2  r  zMultiAgentBatch.__repr__)rn   r  rU  )$rV  rW  rX  rY  r   r   r   r+   r2   r^   rx   rr   rv   r   r   rh  r	   r  r
   r   r   r   r  r   r   r  r   rj  r   r6   r  r!  r	  r  r  r%  r!   r!   r!   r(   r  D  s|    @





r  r   rn   c              	      s,  t dd | D rt| S g }ddg}g }d } }}| D ]y}|jdkr%q|du r2|j}|j}|j}|j|ks<|j|kr@td|jdu sI|du rR|j|krRtd|r]|j|kr]td|durgt||j}|t	j
durw||t	j
  |jdur|d  |j7  < |d	  |j|j 7  < || qt|dkrt	 S i }|d  D ]2  t	jkrt fd
d|D d|i| < q fdd|D }	tt|d}
tj|
g|	R  | < q|g krtrt|d rt|}n|g krtrt|d rt|}t	||||||d	 |d pd dS )aX  Concatenates a list of  SampleBatches or MultiAgentBatches.

    If all items in the list are or SampleBatch typ4, the output will be
    a SampleBatch type. Otherwise, the output will be a MultiAgentBatch type.
    If input is a mixture of SampleBatch and MultiAgentBatch types, it will treat
    SampleBatch objects as MultiAgentBatch types with 'default_policy' key and
    concatenate it with th rest of MultiAgentBatch objects.
    Empty samples are simply ignored.

    Args:
        samples: List of SampleBatches or MultiAgentBatches to be
            concatenated.

    Returns:
        A new (concatenated) SampleBatch or MultiAgentBatch.

    .. testcode::
        :skipif: True

        import numpy as np
        from ray.rllib.policy.sample_batch import SampleBatch
        b1 = SampleBatch({"a": np.array([1, 2]),
                          "b": np.array([10, 11])})
        b2 = SampleBatch({"a": np.array([3]),
                          "b": np.array([12])})
        print(concat_samples([b1, b2]))


        c1 = MultiAgentBatch({'default_policy': {
                                        "a": np.array([1, 2]),
                                        "b": np.array([10, 11])
                                        }}, env_steps=2)
        c2 = SampleBatch({"a": np.array([3]),
                          "b": np.array([12])})
        print(concat_samples([b1, b2]))

    .. testoutput::

        {"a": np.array([1, 2, 3]), "b": np.array([10, 11, 12])}
        MultiAgentBatch = {'default_policy': {"a": np.array([1, 2, 3]),
                                              "b": np.array([10, 11, 12])}}

    c                 s   s    | ]}t |tV  qd S r   )r"   r  r   r!   r!   r(   r   f  s    z!concat_samples.<locals>.<genexpr>r   g        NzNAll SampleBatches' `zero_padded` and `time_major` settings must be consistent!z?Samples must consistently either provide or omit `max_seq_len`!zPFor `zero_padded` SampleBatches, the values of `max_seq_len` must be consistent!r   c                       g | ]}|  qS r!   r!   r   r?   r!   r(   r)     r   z"concat_samples.<locals>.<listcomp>rZ   c                    r  r!   r!   )r&   cr  r!   r(   r)     r   rZ   g      ?)r>   rO   rQ   rP   rS   )r   r  rh   r\   r[   rZ   r   rg   r*   r+   r,   r,  r]   r   r0   r   r7   _concat_valuesr   r;   r   r1   r.   Tensorr-   convert_to_tensor)r   concatd_seq_lensconcatd_num_grad_updatesconcated_samplesr\   r[   rZ   r   concatd_datavalues_to_concat_concat_values_w_timer!   r  r(   r   8  s~   .



 
r   c                 C   s   t t}d}| D ]9}t|trt|dkrq	| }nt|ts+td	t
|j|j D ]\}}|| | q0|| 7 }q	i }| D ]
\}}t|||< qIt||S )a  Concatenates a list of SampleBatchTypes to a single MultiAgentBatch type.

    This function, as opposed to concat_samples() forces the output to always be
    MultiAgentBatch which is more generic than SampleBatch.

    Args:
        samples: List of SampleBatches or MultiAgentBatches to be
            concatenated.

    Returns:
        A new (concatenated) MultiAgentBatch.

    .. testcode::
        :skipif: True

        import numpy as np
        from ray.rllib.policy.sample_batch import SampleBatch
        b1 = MultiAgentBatch({'default_policy': {
                                        "a": np.array([1, 2]),
                                        "b": np.array([10, 11])
                                        }}, env_steps=2)
        b2 = SampleBatch({"a": np.array([3]),
                          "b": np.array([12])})
        print(concat_samples([b1, b2]))

    .. testoutput::

        {'default_policy': {"a": np.array([1, 2, 3]),
                            "b": np.array([10, 11, 12])}}

    r   z[`concat_samples_into_ma_batch` can only concat SampleBatch|MultiAgentBatch objects, not {}!)r|  r}  r#   r"   r+   r0   r	  r  r   r   r   rV  rk  r5   r   rx   r   )r   rk  rx   r   r   rn  r   batchesr!   r!   r(   r    s&   
"



r  r  c                 G   s   t rt |d rt j|| rddS ddS t|d tjr+tj|| r'ddS ddS trAt|d rAtj|| r=ddS ddS t|d t	rVg }|D ]}|
| qL|S tdt|d  d|d  )zConcatenates a list of values.

    Args:
        values: The values to concatenate.
        time_major: Whether to concatenate along the first axis
            (time_major=False) or the second axis (time_major=True).
    r   r   )dim)axisz$Unsupported type for concatenation: z first element: )r1   r.   catr"   r$   r   r   r-   r   r#   r,  r   r   )rZ   rl  concatenated_listsublistr!   r!   r(   r    s    r  rn  c                 C   sB   t | tr| j }t|dkrt|v r| jt } | S td| S )a9  Converts a MultiAgentBatch to a SampleBatch if necessary.

    Args:
        batch: The SampleBatchType to convert.

    Returns:
        batch: the converted SampleBatch

    Raises:
        ValueError if the MultiAgentBatch has more than one policy_id
        or if the policy_id is not `DEFAULT_POLICY_ID`
    r   a  RLlib tried to convert a multi agent-batch with data from more than one policy to a single-agent batch. This is not supported and may be due to a number of issues. Here are two possible ones:1) Off-Policy Estimation is not implemented for multi-agent batches. You can set `off_policy_estimation_methods: {}` to resolve this.2) Loading multi-agent data for offline training is not implemented.Load single-agent data instead to resolve this.)r"   r  rk  r   r0   r  r   )rn  policy_keysr!   r!   r(    convert_ma_batch_to_sample_batch  s   



r  )9r|  r~  r  	functoolsr   numbersr   typingr   r   r   r   r   r	   r   r$   r;   ray._common.deprecationr
   r   ray.rllib.core.columnsr   ray.rllib.utils.annotationsr   r   r   ray.rllib.utils.compressionr   r   r   ray.rllib.utils.frameworkr   r   ray.rllib.utils.torch_utilsr   ray.rllib.utils.typingr   r   r   r   r   ray.utilr   tf1r-   tfvr1   r  r  r9   rC   r+   r  r   r  r  r  r!   r!   r!   r(   <module>   sV     
A         i t >