o
    ciXo                     @   sz  d dl Z d dlZd dlZd dlZd dlZd dlZd dlm	Z	m
Z
mZmZmZmZmZmZ d dlmZ d dlmZmZ d dlmZ d dlmZ d dlmZmZ d dlmZ d d	lm Z  d d
l!m"Z" d dl#m$Z$m%Z% d dl&m'Z' ervd dl(m)Z) ej*ej*ej+ej+ej,ej,ej-ej-ej.ej.ej/ej/ej0ej0ej1ej1ej2ej2ej3ej3ej4ej4ddddddiZ5e6e7Z8e'ddG dd dZ9dS )    N)AnyDictListOptionalUnionSetTupleTYPE_CHECKING)Columns)MultiRLModuleSpecMultiRLModule)SingleAgentEpisode)flatten_dict)OverrideToImplementCustomLogic5OverrideToImplementCustomLogic_CallToSuperRecommended)unpack_if_needed)ReplayBuffer)from_jsonable_if_needed)EpisodeTypeModuleID)	PublicAPI)AlgorithmConfigagent_indexdones	unroll_idalpha)	stabilityc                   @   s  e Zd ZdZedddddddeeejejf  dee	 dee
eef  d	e
eef f
d
dZede
eejf de
eejf fddZedefddZede
eef fddZdee dee fddZd%defddZeeedddddfdede
eee ejf f de
eef dedeee  dee dejd ejd	e
eef de
eee! f fd!d"Z"eeeddfdede
eee ejf f de
eef dedeee  de
eee! f fd#d$Z#dS )&OfflinePreLearnera?  Class that coordinates data transformation from dataset to learner.

    This class is an essential part of the new `Offline RL API` of `RLlib`.
    It is a callable class that is run in `ray.data.Dataset.map_batches`
    when iterating over batches for training. It's basic function is to
    convert data in batch from rows to episodes (`SingleAGentEpisode`s
    for now) and to then run the learner connector pipeline to convert
    further to trainable batches. These batches are used directly in the
    `Learner`'s `update` method.

    The main reason to run these transformations inside of `map_batches`
    is for better performance. Batches can be pre-fetched in `ray.data`
    and therefore batch trransformation can be run highly parallelized to
    the `Learner''s `update`.

    This class can be overridden to implement custom logic for transforming
    batches and make them 'Learner'-ready. When deriving from this class
    the `__call__` method and `_map_to_episodes` can be overridden to induce
    custom logic for the complete transformation pipeline (`__call__`) or
    for converting to episodes only ('_map_to_episodes`).

    Custom `OfflinePreLearner` classes can be passed into
    `AlgorithmConfig.offline`'s `prelearner_class`. The `OfflineData` class
    will then use the custom class in its data pipeline.
    N)spacesmodule_specmodule_stateconfigr   r   r   r    kwargsc                K   s   || _ | j j| _| j j| _| | _| j| |pd\| _| _| j j| j| jd| _	| j j
| _|j| _d| _| jsB| j sB| jrZ| j jpH| j}| j| j jB }|di || _d S d S )N)NN)input_observation_spaceinput_action_spacer    )r!   input_read_episodesinput_read_sample_batchesbuild_module	set_stateobservation_spaceaction_spacebuild_learner_connector_learner_connectorpolicies_to_train_policies_to_trainis_multi_agent_is_multi_agentiter_since_last_module_updateis_statefulprelearner_buffer_classdefault_prelearner_buffer_class default_prelearner_buffer_kwargsprelearner_buffer_kwargsepisode_buffer)selfr!   r   r   r    r"   r5   r8   r%   r%   X/home/ubuntu/.local/lib/python3.10/site-packages/ray/rllib/offline/offline_prelearner.py__init__R   s<   



zOfflinePreLearner.__init__batchreturnc              	      sv  | j rDddlddl  fdd|d D }| |}| j| | jj| jj| j	
 r5| jjddnd| jddp>dd	d	d
}nZ| jrtj| j|d	t| jjB | jjdd }| |}| j| | jj| jj| j	
 rx| jjddnd| jddpdd	d	d
}n| j| j|t| jjB d| jj| j| jdd }| j| j	i |i dd}|D ]}| ||s||= qt|S )aq  Prepares plain data batches for training with `Learner`'s.

        Args:
            batch: A dictionary of numpy arrays containing either column data
                with `self.config.input_read_schema`, `EpisodeType` data, or
                `BatchType` data.

        Returns:
            A `MultiAgentBatch` that can be passed to `Learner.update` methods.
        r   Nc                    s"   g | ]}t j| jd qS ))object_hook)r   
from_stateunpackbdecode).0statemnpmsgpackr%   r;   
<listcomp>   s    z.OfflinePreLearner.__call__.<locals>.<listcomp>itemmax_seq_lenn_step   T)	num_itemsbatch_length_TrK   sample_episodesto_numpy)rP   schemainput_compress_columnsepisodesF)rQ   rP   rR   r+   r,   )	rl_moduler=   rS   shared_datametrics)r&   rG   msgpack_numpy_validate_episodesr9   addsampler!   train_batch_size_per_learnerr)   r4   model_configgetr'   r   _map_sample_batch_to_episoder2   SCHEMAinput_read_schemarR   _map_to_episodesr+   r,   r.   _should_module_be_updatedr   )r:   r=   rS   	module_idr%   rE   r;   __call__   s|   



&zOfflinePreLearner.__call__c                 C   s   ddl m} |S )zSets the default replay buffer.r   )EpisodeReplayBuffer)4ray.rllib.utils.replay_buffers.episode_replay_bufferre   )r:   re   r%   r%   r;   r6     s   z1OfflinePreLearner.default_prelearner_buffer_classc                 C   s   | j jd | j jdS )zSets the default arguments for the replay buffer.

        Note, the `capacity` might vary with the size of the episodes or
        sample batches in the offline dataset.
        
   )capacitybatch_size_B)r!   r[   )r:   r%   r%   r;   r7     s   
z2OfflinePreLearner.default_prelearner_buffer_kwargsrS   c                    s8   t dd |D stdt  fdd|D }|S )a  Validate episodes sampled from the dataset.

        Note, our episode buffers cannot handle either duplicates nor
        non-ordered fragmentations, i.e. fragments from episodes that do
        not arrive in timestep order.

        Args:
            episodes: A list of `SingleAgentEpisode` instances sampled
                from a dataset.

        Returns:
            A set of `SingleAgentEpisode` instances.

        Raises:
            ValueError: If not all episodes are `done`.
        c                 s   s    | ]}|j V  qd S N)is_donerC   epsr%   r%   r;   	<genexpr>7  s    z7OfflinePreLearner._validate_episodes.<locals>.<genexpr>zWhen sampling from episodes (`input_read_episodes=True`) all recorded episodes must be done (i.e. either `terminated=True`) or `truncated=True`).c                    s8   h | ]}|j vr|j s|j  jj vr|qS r%   )id_rY   r9   episode_id_to_indexkeysrl   r:   unique_episode_idsr%   r;   	<setcomp>@  s    

z7OfflinePreLearner._validate_episodes.<locals>.<setcomp>)all
ValueErrorset)r:   rS   r%   rr   r;   rX   #  s   z$OfflinePreLearner._validate_episodesc                 C   s.   | j sdS t| j s|t| j v S |  ||S )z:Checks which modules in a MultiRLModule should be updated.T)r0   callablerw   )r:   rc   multi_agent_batchr%   r%   r;   rb   I  s
   
z+OfflinePreLearner._should_module_be_updatedFr1   rQ   rP   rR   ignore_final_observationr+   r,   c                    s6  pg |r|rt }	ndd }	g }
t|tj  D ]\ }| r?tj|v r.|tj    nd |v r<|d    nd}nd}| rDqtjv rP|	t||n|	||}|rctdd t	|}ntjv rv|	t|tj
    |n|	|tj
    |}ttj |v rt|tj    nt j|||gi tj |v r|tj    ni gtjv r|	t|tj    |n|	|tj    |g|tj    g|tj |v rtj nd   tj |v r|tj    nd fd	d
| D dd
}|r|  |
| qd|
iS )z!Maps a batch of data to episodes.c                 S   s   | S rj   r%   )rZ   spacer%   r%   r;   convertl  s   z3OfflinePreLearner._map_to_episodes.<locals>.convertr   Nc                 S   s   d|  S )Nr   r%   )xr%   r%   r;   <lambda>  s    z4OfflinePreLearner._map_to_episodes.<locals>.<lambda>r   Fc                    sL   i | ]"\}}|vr$|  vr$|d vr||v rt|  n|  gqS )r   r   typevaluesr   rC   kvirR   rQ   r%   r;   
<dictcomp>  s    	z6OfflinePreLearner._map_to_episodes.<locals>.<dictcomp>r   
ro   agent_idobservationsinfosactionsrewards
terminated	truncatedextra_model_outputslen_lookback_bufferrS   )r   	enumerater
   OBSAGENT_IDr   treemap_structurecopydeepcopyNEXT_OBSr   EPS_IDstruuiduuid4hexINFOSACTIONSREWARDSTERMINATEDS
TRUNCATEDSitemsrP   append)r1   r=   rQ   rP   rR   rz   r+   r,   r"   r|   rS   obsr   unpacked_obsunpacked_next_obsepisoder%   r   r;   ra   S  s   


	

	;z"OfflinePreLearner._map_to_episodesc           
         s  pg g }t |tj  D ]l\ | r)d |v r&|d    d nd}nd}| r.qttrFtfddtjd D n"ttj	r^tj v rYt
 n
 n
tdt dtj |v rtj v rt|tj    d n
|tj    d  nd  tj |v rtj |v r|tj    d }|tj    d }nLtj |v rڈtj |vr|tj    d }d	}n0tj |vrtj |v r|tj    d }d	}nd
|v r|d
   d }d	}nd}d	}ttj |v r t|tj    d nt j|tj |v r7|tj    ni gt tjv rNt|tj    n|tj    |tj    || fdd| D dd
}	|rw|	  ||	 qd|iS )z6Maps an old stack `SampleBatch` to new stack episodes.r   r   Nc                    s   g | ]} |d f qS ).r%   )rC   r   )r   r%   r;   rH     s    zBOfflinePreLearner._map_sample_batch_to_episode.<locals>.<listcomp>zUnknown observation type: z. When mapping from old recorded `SampleBatches` batched observations should be either of type `np.array` or - if the column is compressed - of `str` type.FdoneTc                    sJ   i | ]!\}}|vr#|  vr#|d vr||v rt|  n|  qS r   r   r   r   r%   r;   r   m  s    zBOfflinePreLearner._map_sample_batch_to_episode.<locals>.<dictcomp>r   rS   )r   r
   r   
isinstancer   r   rangeshapenpndarraytolist	TypeErrorr   r   r   r   r   r   r   r   r   r   r   lenr   r   r   rP   )
r1   r=   rQ   rP   rR   rS   r   r   r   r   r%   )r   rR   r   rQ   r;   r^     s   	

	.z.OfflinePreLearner._map_sample_batch_to_episoderj   )$__name__
__module____qualname____doc__r   r   r   gymSpacer   r   r   r   r   r<   r   r   r   rd   propertyr   r6   r7   r   r   r   rX   boolrb   staticmethodr_   r   listr   ra   r^   r%   r%   r%   r;   r   6   s    
:(	
&



	
 

r   ):r   	gymnasiumr   loggingnumpyr   r   r   typingr   r   r   r   r   r   r   r	   ray.rllib.core.columnsr
   (ray.rllib.core.rl_module.multi_rl_moduler   r   "ray.rllib.env.single_agent_episoder   ray.rllib.utilsr   ray.rllib.utils.annotationsr   r   ray.rllib.utils.compressionr   ,ray.rllib.utils.replay_buffers.replay_bufferr   "ray.rllib.utils.spaces.space_utilsr   ray.rllib.utils.typingr   r   ray.util.annotationsr   %ray.rllib.algorithms.algorithm_configr   r   r   	MODULE_IDr   r   r   r   r   r   r   Tr_   	getLoggerr   loggerr   r%   r%   r%   r;   <module>   sJ    (	
