o
    cihL                     @   s  d dl Zd dlmZmZ d dlmZmZ d dlZ	d dl
mZ d dlZd dlmZmZmZmZ eG dd de	jZedejd	ejfd
dZedejd	efddZedeeef dejd	eeeef fddZedeeef dejd	eeeef fddZedejd	eej fddZedd Ze	d5ddddddejdedeeeef dee deded	e	jfd d!Zed"d# Z edd$d%ee d&eeef fd'd(Z!ed)d* Z"ed+d, Z#ed-d. Z$ed/d0 Z%ed1ed2ed	efd3d4Z&dS )6    N)TupleDict)ActTypeObsType)DeveloperAPI)AnyListOptionalUnionc                   @   s   e Zd ZdZdd ZdS )BatchedNdArraya  A ndarray-wrapper the usage of which indicates that there a batch dim exists.

    This is such that our `batch()` utility can distinguish between having to
    stack n individual batch items (each one w/o any batch dim) vs having to
    concatenate n already batched items (each one possibly with a different batch
    dim, but definitely with some batch dim).

    TODO (sven): Maybe replace this by a list-override instead.
    c                 C   s   t || }|S N)npasarrayview)clsinput_arrayobj r   V/home/ubuntu/.local/lib/python3.10/site-packages/ray/rllib/utils/spaces/space_utils.py__new__   s   zBatchedNdArray.__new__N)__name__
__module____qualname____doc__r   r   r   r   r   r   
   s    
r   spacereturnc                 C   s   t | dr
t| jS | S )aR  Returns the original space of a space, if any.

    This function recursively traverses the given space and returns the original space
    at the very end of the chain.

    Args:
        space: The space to get the original space for.

    Returns:
        The original space or the given space itself if no original space is found.
    original_space)hasattrget_original_spacer   r   r   r   r   r      s   

r   c                 C   s,   t | tjjtjjtjjtjjfv rdS dS )aa  Returns true, if the space is composite.

    Note, we follow here the glossary of `gymnasium` by which any spoace
    that holds other spaces is defined as being 'composite'.

    Args:
        space: The space to be checked for being composed of other spaces.

    Returns:
        True, if the space is composed of other spaces, otherwise False.
    TF)typegymspacesr   GraphSequencer   r   r   r   r   is_composite_space/   s   r%   samplec                 C   s   t |r
|| gS | S )a  Returns a jsonabled space sample, if the space is composite.

    Checks, if the space is composite and converts the sample to a jsonable
    struct in this case. Otherwise return the sample as is.

    Args:
        sample: Any action or observation type possible in `gymnasium`.
        space: Any space defined in `gymnasium.spaces`.

    Returns:
        The `sample` as-is, if the `space` is composite, otherwise converts the
        composite sample to a JSONable data type.
    )r%   to_jsonabler&   r   r   r   r   to_jsonable_if_neededG   s   r)   c                 C   s   t |r|| d S | S )a  Returns a jsonabled space sample, if the space is composite.

    Checks, if the space is composite and converts the sample to a JSONable
    struct in this case. Otherwise return the sample as is.

    Args:
        sample: Any action or observation type possible in `gymnasium`, or a
            JSONable data type.
        space: Any space defined in `gymnasium.spaces`.

    Returns:
        The `sample` as-is, if the `space` is not composite, otherwise converts the
        composite sample jsonable to an actual `space` sample..
    r   )r%   from_jsonabler(   r   r   r   from_jsonable_if_needed_   s   r+   c                    s    fdd g } | | |S )a  Flattens a gym.Space into its primitive components.

    Primitive components are any non Tuple/Dict spaces.

    Args:
        space: The gym.Space to flatten. This may be any
            supported type (including nested Tuples and Dicts).

    Returns:
        List[gym.Space]: The flattened list of primitive Spaces. This list
            does not contain Tuples or Dicts anymore.
    c                    sl   ddl m} t| tr| D ]} || qd S t| t|fr/t| jD ]	} | | | q#d S ||  d S )Nr   )FlexDict)ray.rllib.utils.spaces.flexdictr,   
isinstancer   r   sortedr"   append)space_return_listr,   sk_helper_flattenr   r   r6      s   
z&flatten_space.<locals>._helper_flattenr   )r   retr   r5   r   flatten_spacex   s   
r8   c                    s    fdd  | S )aa  Returns a Tuple/Dict Space as native (equally structured) py tuple/dict.

    Args:
        space: The Space to get the python struct for.

    Returns:
        Union[dict,tuple,gym.Space]: The struct equivalent to the given Space.
            Note that the returned struct still contains all original
            "primitive" Spaces (e.g. Box, Discrete).

    .. testcode::
        :skipif: True

        get_base_struct_from_space(Dict({
            "a": Box(),
            "b": Tuple([Discrete(2), Discrete(3)])
        }))

    .. testoutput::

        dict(a=Box(), b=tuple(Discrete(2), Discrete(3)))
    c                    sD   t  trtfdd D S t  tr  fdd jD S  S )Nc                 3   s    | ]} |V  qd S r   r   ).0r3   _helper_structr   r   	<genexpr>   s    zEget_base_struct_from_space.<locals>._helper_struct.<locals>.<genexpr>c                    s   i | ]	}| | qS r   r   )r9   r4   )r;   r1   r   r   
<dictcomp>   s    zFget_base_struct_from_space.<locals>._helper_struct.<locals>.<dictcomp>)r.   r   tupler   r"   r1   r:   r?   r   r;      s
   

z2get_base_struct_from_space.<locals>._helper_structr   r   r   r:   r   get_base_struct_from_space   s   r@               F)
fill_value	time_size
time_majorone_hot_discrete
batch_sizerC   rD   rE   rF   c                   s  t tjjtjjttfr,}t tjjtjjfrt}t	 fdd|S rXt tjj
rBtjddjftjnt tjjrXtjddtjftjdkrdur dkrhdksjJ r~tj fdd	tD jd
S tjfdd	t D jd
S tj dkrfdd	t D n jd
S durÈ dkrdksJ r g}n g}n	 dkrʈ gng }tj|tj jdS )a  Returns batched dummy data (using `batch_size`) for the given `space`.

    Note: The returned batch will not pass a `space.contains(batch)` test
    as an additional batch dimension has to be added at axis 0, unless `batch_size` is
    set to 0.

    Args:
        space: The space to get a dummy batch for.
        batch_size: The required batch size (B). Note that this can also
            be 0 (only if `time_size` is None!), which will result in a
            non-batched sample for the given space (no batch dim).
        fill_value: The value to fill the batch with
            or "random" for random values.
        time_size: If not None, add an optional time axis
            of `time_size` size to the returned batch. This time axis might either
            be inserted at axis=1 (default) or axis=0, if `time_major` is True.
        time_major: If True AND `time_size` is not None, return batch
            as shape [T x B x ...], otherwise as [B x T x ...]. If `time_size`
            if None, ignore this setting and return [B x ...].
        one_hot_discrete: If True, will return one-hot vectors (instead of
            int-values) for those sub-components of a (possibly complex) `space`
            that are Discrete or MultiDiscrete. Note that in case `fill_value` is 0.0,
            this will result in zero-hot vectors (where all slots have a value of 0.0).

    Returns:
        The dummy batch of size `bqtch_size` matching the given space.
    c                    s   t |  dS )N)r   rG   rC   rD   rE   rF   )get_dummy_batch_for_spacer3   )rG   rC   rF   rE   rD   r   r   <lambda>   s    z+get_dummy_batch_for_space.<locals>.<lambda>rB         ?randomNr   c                    s"   g | ]}fd dt  D qS )c                       g | ]}   qS r   r&   r9   _r   r   r   
<listcomp>       8get_dummy_batch_for_space.<locals>.<listcomp>.<listcomp>ranger9   t)rG   r   r   r   rQ          z-get_dummy_batch_for_space.<locals>.<listcomp>dtypec                    s"   g | ]} fd dt D qS )c                    rM   r   rN   rV   r   r   r   rQ     rR   rS   rT   rO   )r   rD   r   r   rQ     rX   c                    rM   r   rN   rO   r   r   r   rQ     rR   )rC   rZ   )r.   r!   r"   r   r   dictr>   r@   treemap_structureDiscreteBoxnr   float32MultiDiscretesumnvecarrayrU   rZ   r&   fulllistshape)r   rG   rC   rD   rE   rF   base_structrh   r   )rG   rC   rF   r   rE   rD   r   rH      sX   &

rH   c                 C   sN   t | tttfr%g }t| D ]}|t|dg qtj	|dd } | S )a:  Returns a single np.ndarray given a list/tuple of np.ndarrays.

    Args:
        input_ (Union[List[np.ndarray], np.ndarray]): The list of ndarrays or
            a single ndarray.

    Returns:
        np.ndarray: The result after concatenating all single arrays in input_.

    .. testcode::
        :skipif: True

        flatten_to_single_ndarray([
            np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]),
            np.array([7, 8, 9]),
        ])

    .. testoutput::

        np.array([
            1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0
        ])
    r   axis)
r.   rg   r>   r[   r\   flattenr0   r   reshapeconcatenate)input_expandedin_r   r   r   flatten_to_single_ndarray#  s   rs   )'individual_items_already_have_batch_dimlist_of_structsrt   c                   s^   | st d|dkrt| d }t|d t}|rtjntj tj fddg| R  }|S )a  Converts input from a list of (nested) structs to a (nested) struct of batches.

    Input: List of structs (each of these structs representing a single batch item).
        [
            {"a": 1, "b": (4, 7.0)},  <- batch item 1
            {"a": 2, "b": (5, 8.0)},  <- batch item 2
            {"a": 3, "b": (6, 9.0)},  <- batch item 3
        ]

    Output: Struct of different batches (each batch has size=3 b/c there were 3 items
        in the original list):
        {
            "a": np.array([1, 2, 3]),
            "b": (np.array([4, 5, 6]), np.array([7.0, 8.0, 9.0]))
        }

    Args:
        list_of_structs: The list of (possibly nested) structs. Each item
            in this list represents a single batch item.
        individual_items_already_have_batch_dim: True, if the individual items in
            `list_of_structs` already have a batch dim. In this case, we will
            concatenate (instead of stack) at the end. In the example above, this would
            look like this: Input: [{"a": [1], "b": ([4], [7.0])}, ...] -> Output: same
            as in above example.
            If the special value "auto" is used,

    Returns:
        The struct of component batches. Each leaf item in this struct represents the
        batch for a single component (in case struct is tuple/dict). If the input is a
        simple list of primitive items, e.g. a list of floats, a np.array of floats
        will be returned.
    z3Input `list_of_structs` does not contain any items.autor   c                     s   t  | ddS )Nr   rk   )r   ascontiguousarrayrI   np_funcr   r   rJ   v  s    zbatch.<locals>.<lambda>)	
ValueErrorr\   rm   r.   r   r   ro   stackr]   )ru   rt   flatr7   r   rx   r   batchE  s   &
r}   c              
      sR   t | g }ttd D ] |t |  fddttD  q|S )a  Converts input from (nested) struct of batches to batch of structs.

    Input: Struct of different batches (each batch has size=3):
        {
            "a": np.array([1, 2, 3]),
            "b": (np.array([4, 5, 6]), np.array([7.0, 8.0, 9.0]))
        }
    Output: Batch (list) of structs (each of these structs representing a
        single action):
        [
            {"a": 1, "b": (4, 7.0)},  <- action 1
            {"a": 2, "b": (5, 8.0)},  <- action 2
            {"a": 3, "b": (6, 9.0)},  <- action 3
        ]

    Args:
        batches_struct: The struct of component batches. Each leaf item
            in this struct represents the batch for a single component
            (in case struct is tuple/dict).
            Alternatively, `batches_struct` may also simply be a batch of
            primitives (non tuple/dict).

    Returns:
        The list of individual structs. Each item in the returned list represents a
        single (maybe complex) batch item.
    r   c                    s   g | ]}|   qS r   r   )r9   i	batch_posflat_batchesr   r   rQ     s    zunbatch.<locals>.<listcomp>)r\   rm   rU   lenr0   unflatten_as)batches_structoutr   r   r   unbatch{  s   
r   c                 C      dd }t || |S )a  Clips all components in `action` according to the given Space.

    Only applies to Box components within the action space.

    Args:
        action: The action to be clipped. This could be any complex
            action, e.g. a dict or tuple.
        action_space: The action space struct,
            e.g. `{"a": Distrete(2)}` for a space: Dict({"a": Discrete(2)}).

    Returns:
        Any: The input action, but clipped by value according to the space's
            bounds.
    c                 S   s$   t |tjjrt| |j|j} | S r   )r.   r!   r"   r_   r   cliplowhighar3   r   r   r   map_  s   zclip_action.<locals>.map_r\   r]   )actionaction_spacer   r   r   r   clip_action  s   r   c                 C   r   )a  Unsquashes all components in `action` according to the given Space.

    Inverse of `normalize_action()`. Useful for mapping policy action
    outputs (normalized between -1.0 and 1.0) to an env's action space.
    Unsquashing results in cont. action component values between the
    given Space's bounds (`low` and `high`). This only applies to Box
    components within the action space, whose dtype is float32 or float64.

    Args:
        action: The action to be unsquashed. This could be any complex
            action, e.g. a dict or tuple.
        action_space_struct: The action space struct,
            e.g. `{"a": Box()}` for a space: Dict({"a": Box()}).

    Returns:
        Any: The input action, but unsquashed, according to the space's
            bounds. An unsquashed action is ready to be sent to the
            environment (`BaseEnv.send_actions([unsquashed actions])`).
    c                 S   s   t |tjjrFt|jrFt|jrF|jtj	ks|jtj
kr9|j| d |j|j  d  } t| |j|j} | S t|jtjrF|j|  } | S )NrK          @)r.   r!   r"   r_   r   allbounded_belowbounded_aboverZ   ra   float64r   r   r   
issubdtypeintegerr   r   r   r   r     s   


zunsquash_action.<locals>.map_r   r   action_space_structr   r   r   r   unsquash_action  s   r   c                 C   r   )a  Normalizes all (Box) components in `action` to be in [-1.0, 1.0].

    Inverse of `unsquash_action()`. Useful for mapping an env's action
    (arbitrary bounded values) to a [-1.0, 1.0] interval.
    This only applies to Box components within the action space, whose
    dtype is float32 or float64.

    Args:
        action: The action to be normalized. This could be any complex
            action, e.g. a dict or tuple.
        action_space_struct: The action space struct,
            e.g. `{"a": Box()}` for a space: Dict({"a": Box()}).

    Returns:
        Any: The input action, but normalized, according to the space's
            bounds.
    c                 S   sH   t |tjjr"|jtjks|jtjkr"| |j d |j	|j  d } | S )Nr   rK   )
r.   r!   r"   r_   rZ   r   ra   r   r   r   r   r   r   r   r     s   znormalize_action.<locals>.map_r   r   r   r   r   normalize_action  s   r   elementsampled_elementc                 C   s   dd }t j|| |ddS )aK  Convert all the components of the element to match the space dtypes.

    Args:
        element: The element to be converted.
        sampled_element: An element sampled from a space to be matched
            to.

    Returns:
        The input element, but with all its components converted to match
        the space dtypes.
    c                 S   s   t |tjrAt | tjs3t | ttfsJ d|  d|jdkr*tj| |jd} | S td	t
| |j| jkr?| |j} | S t |tsLt |tjrdt | trY|  rYt| } t | tjrdt| } | S )NzERROR: `elem` (z!) must be np.array, float or int!r   rY   zZElement should be of type np.ndarray but is instead of                             type {})r.   r   ndarrayfloatintrh   re   rZ   rz   formatr    astypeint_
is_integerfloat_int64)elemr3   r   r   r   r     s2   



z+convert_element_to_space_type.<locals>.map_F)check_typesr   )r   r   r   r   r   r   convert_element_to_space_type  s   r   )rA   )'	gymnasiumr!   gymnasium.spacesr   r   gymnasium.corer   r   numpyr   ray.rllib.utils.annotationsr   r\   typingr   r   r	   r
   r   r   Spacer   boolr%   r)   r+   r8   r@   r   r   strrH   rs   r}   r   r   r   r   r   r   r   r   r   <module>   s    


#f
!
5
(

+
