o
    `۷io                     @   sn  d dl Z d dlZd dlZd dlmZmZmZmZmZm	Z	m
Z
mZ d dlZd dlZd dlZd dlmZ d dlmZmZ d dlmZ d dlmZ d dlmZmZ d dlmZ d d	lm Z  d d
l!m"Z"m#Z# d dl$m%Z% erpd dl&m'Z' ej(ej(ej)ej)ej*ej*ej+ej+ej,ej,ej-ej-ej.ej.ej/ej/ej0ej0ej1ej1ej2ej2ddddddiZ3e4e5Z6e%ddG dd dZ7dS )    N)TYPE_CHECKINGAnyDictListOptionalSetTupleUnion)Columns)MultiRLModuleMultiRLModuleSpec)SingleAgentEpisode)flatten_dict)OverrideToImplementCustomLogic5OverrideToImplementCustomLogic_CallToSuperRecommended)unpack_if_needed)ReplayBuffer)EpisodeTypeModuleID)	PublicAPI)AlgorithmConfigagent_indexdones	unroll_idalpha)	stabilityc                   @   s  e Zd ZdZedddddddeeejejf  dee	 dee
eef  d	e
eef f
d
dZede
eejf de
eejf fddZedefddZede
eef fddZdee dee fddZdee dee fddZdee dee fddZd)defddZeee dddddfdede
ee!e"ejf f de
eef d ed!eee  d"ee d#ejd$ejd	e
eef de
eee# f fd%d&Z$eee ddfdede
ee!e"ejf f de
eef d ed!eee  de
eee# f fd'd(Z%dS )*OfflinePreLearnera?  Class that coordinates data transformation from dataset to learner.

    This class is an essential part of the new `Offline RL API` of `RLlib`.
    It is a callable class that is run in `ray.data.Dataset.map_batches`
    when iterating over batches for training. It's basic function is to
    convert data in batch from rows to episodes (`SingleAGentEpisode`s
    for now) and to then run the learner connector pipeline to convert
    further to trainable batches. These batches are used directly in the
    `Learner`'s `update` method.

    The main reason to run these transformations inside of `map_batches`
    is for better performance. Batches can be pre-fetched in `ray.data`
    and therefore batch trransformation can be run highly parallelized to
    the `Learner''s `update`.

    This class can be overridden to implement custom logic for transforming
    batches and make them 'Learner'-ready. When deriving from this class
    the `__call__` method and `_map_to_episodes` can be overridden to induce
    custom logic for the complete transformation pipeline (`__call__`) or
    for converting to episodes only ('_map_to_episodes`).

    Custom `OfflinePreLearner` classes can be passed into
    `AlgorithmConfig.offline`'s `prelearner_class`. The `OfflineData` class
    will then use the custom class in its data pipeline.
    N)spacesmodule_specmodule_stateconfigr   r   r   r   kwargsc                K   s   || _ | j j| _| j j| _| | _| j| |pd\| _| _| j j| j| jd| _	| j j
| _|j| _d| _| jsB| j sB| jrZ| j jpH| j}| j| j jB }|di || _d S d S )N)NN)input_observation_spaceinput_action_spacer    )r    input_read_episodesinput_read_sample_batchesbuild_module	set_stateobservation_spaceaction_spacebuild_learner_connector_learner_connectorpolicies_to_train_policies_to_trainis_multi_agent_is_multi_agentiter_since_last_module_updateis_statefulprelearner_buffer_classdefault_prelearner_buffer_class default_prelearner_buffer_kwargsprelearner_buffer_kwargsepisode_buffer)selfr    r   r   r   r!   r4   r7   r$   r$   Z/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/rllib/offline/offline_prelearner.py__init__Q   s<   



zOfflinePreLearner.__init__batchreturnc              	      s   | j rddlddl  fdd|d D }| |}n3| jr9tj| j|dt| j	j
B | j	jdd }| |}n| j| j|t| j	j
B d	| j	j| j| jd
d }| j| ji |i dd}|D ]}| ||sh||= q]t|S )aq  Prepares plain data batches for training with `Learner`'s.

        Args:
            batch: A dictionary of numpy arrays containing either column data
                with `self.config.input_read_schema`, `EpisodeType` data, or
                `BatchType` data.

        Returns:
            A `MultiAgentBatch` that can be passed to `Learner.update` methods.
        r   Nc                    s"   g | ]}t j| jd qS ))object_hook)r   
from_stateunpackbdecode).0statemnpmsgpackr$   r:   
<listcomp>   s    z.OfflinePreLearner.__call__.<locals>.<listcomp>itemT)to_numpyschemainput_compress_columnsepisodesF)rJ   rI   rK   r*   r+   )	rl_moduler<   rL   shared_datametrics)r%   rF   msgpack_numpy_postprocess_and_sampler&   r   _map_sample_batch_to_episoder1   SCHEMAr    input_read_schemarK   _map_to_episodesr*   r+   r-   r(   _should_module_be_updatedr   )r9   r<   rL   	module_idr$   rD   r:   __call__   sT   

&zOfflinePreLearner.__call__c                 C   s   ddl m} |S )zSets the default replay buffer.r   )EpisodeReplayBuffer)4ray.rllib.utils.replay_buffers.episode_replay_bufferrY   )r9   rY   r$   r$   r:   r5      s   z1OfflinePreLearner.default_prelearner_buffer_classc                 C   s   | j jd | j jdS )zSets the default arguments for the replay buffer.

        Note, the `capacity` might vary with the size of the episodes or
        sample batches in the offline dataset.
        
   )capacitybatch_size_B)r    train_batch_size_per_learner)r9   r$   r$   r:   r6      s   
z2OfflinePreLearner.default_prelearner_buffer_kwargsrL   c                 C   sb   t dd |D stdt }t }|D ]}|j|vr.|j| jjvr.||j || q|S )a  Validate episodes sampled from the dataset.

        Note, our episode buffers cannot handle either duplicates nor
        non-ordered fragmentations, i.e. fragments from episodes that do
        not arrive in timestep order.

        Args:
            episodes: A list of `SingleAgentEpisode` instances sampled
                from a dataset.

        Returns:
            A set of `SingleAgentEpisode` instances.

        Raises:
            ValueError: If not all episodes are `done`.
        c                 s   s    | ]}|j V  qd S N)is_done)rB   epsr$   r$   r:   	<genexpr>  s    z7OfflinePreLearner._validate_episodes.<locals>.<genexpr>zWhen sampling from episodes (`input_read_episodes=True`) all recorded episodes must be done (i.e. either `terminated=True`) or `truncated=True`).)all
ValueErrorsetid_r8   episode_id_to_indexadd)r9   rL   unique_episode_idscleaned_episodesra   r$   r$   r:   _validate_episodes  s   

z$OfflinePreLearner._validate_episodesc                 C   s:   |D ]}t j|jv r|jt j= t j|jv r|jt j= q|S )a]  Removes states from episodes.

        This is necessary, if the module is stateful and we want to
        enable the offline RLModule to learn its own state representations.

        Args:
            episodes: A list of `SingleAgentEpisode` instances.

        Returns:
            A list of `SingleAgentEpisode` instances without states.
        )r
   	STATE_OUTextra_model_outputsSTATE_IN)r9   rL   ra   r$   r$   r:   _remove_states_from_episodes/  s   

z.OfflinePreLearner._remove_states_from_episodesc                 C   s   |  |}| j r| jjs| |}| j| | j r&| jj	ddnd}| jj
| jj|| j	dddd| jjt| jdddS )	zPostprocesses episodes and samples from the buffer.

        Args:
            episodes: A list of `SingleAgentEpisode` instances.

        Returns:
            A list of `SingleAgentEpisode` instances sampled from the buffer.
        max_seq_lenr   Nn_step   T
burnin_len)	num_itemsbatch_length_Trq   sample_episodesrI   lookbackmin_batch_length_T)rk   r(   r3   r    %prelearner_use_recorded_module_statesro   r8   rh   model_configgetsampler^   episode_lookback_horizongetattr)r9   rL   ru   r$   r$   r:   rQ   E  s(   

z)OfflinePreLearner._postprocess_and_samplec                 C   s.   | j sdS t| j s|t| j v S |  ||S )z:Checks which modules in a MultiRLModule should be updated.T)r/   callablere   )r9   rW   multi_agent_batchr$   r$   r:   rV   o  s
   
z+OfflinePreLearner._should_module_be_updatedFr0   rJ   rI   rK   ignore_final_observationr*   r+   c                    s  pg g }	t |tj  D ]\ }
| r4tj|v r#|tj    nd |v r1|d    nd}nd}| r9qtjv rBt|
n|
}|rRtdd t|}ntjv rbt|tj	    n|tj	    }t
tj |v r~t|tj    nt j|||gi tj |v r|tj    ni gtjv rt|tj    n|tj    g|tj    g|tj |v rȈtj nd   tj |v r|tj    nd fdd| D d	d

}|r|  |	| qd|	iS )z!Maps a batch of data to episodes.r   Nc                 S   s   d|  S )Nr   r$   )xr$   r$   r:   <lambda>  s    z4OfflinePreLearner._map_to_episodes.<locals>.<lambda>r   Fc                    sL   i | ]"\}}|vr$|  vr$|d vr||v rt|  n|  gqS )r   r   typevaluesr   rB   kvirK   rJ   r$   r:   
<dictcomp>  s    	z6OfflinePreLearner._map_to_episodes.<locals>.<dictcomp>r   
rf   agent_idobservationsinfosactionsrewards
terminated	truncatedrm   len_lookback_bufferrL   )	enumerater
   OBSAGENT_IDr   treemap_structurecopydeepcopyNEXT_OBSr   EPS_IDstruuiduuid4hexINFOSACTIONSREWARDSTERMINATEDS
TRUNCATEDSitemsrI   append)r0   r<   rJ   rI   rK   r   r*   r+   r!   rL   obsr   unpacked_obsunpacked_next_obsepisoder$   r   r:   rU   y  sx   




	8z"OfflinePreLearner._map_to_episodesc           
         s  pg g }t |tj  D ]l\ | r)d |v r&|d    d nd}nd}| r.qttrFtfddtjd D n"ttj	r^tj v rYt
 n
 n
tdt dtj |v rtj v rt|tj    d n
|tj    d  nd  tj |v rtj |v r|tj    d }|tj    d }nLtj |v rڈtj |vr|tj    d }d	}n0tj |vrtj |v r|tj    d }d	}nd
|v r|d
   d }d	}nd}d	}ttj |v r t|tj    d nt j|tj |v r7|tj    ni gt tjv rNt|tj    n|tj    |tj    || fdd| D dd
}	|rw|	  ||	 qd|iS )z6Maps an old stack `SampleBatch` to new stack episodes.r   r   Nc                    s   g | ]} |d f qS ).r$   )rB   r   )r   r$   r:   rG   $  s    zBOfflinePreLearner._map_sample_batch_to_episode.<locals>.<listcomp>zUnknown observation type: z. When mapping from old recorded `SampleBatches` batched observations should be either of type `np.array` or - if the column is compressed - of `str` type.FdoneTc                    sJ   i | ]!\}}|vr#|  vr#|d vr||v rt|  n|  qS r   r   r   r   r$   r:   r     s    zBOfflinePreLearner._map_sample_batch_to_episode.<locals>.<dictcomp>r   rL   )r   r
   r   
isinstancer   r   rangeshapenpndarraytolist	TypeErrorr   r   r   r   r   r   r   r   r   r   r   lenr   r   r   rI   )
r0   r<   rJ   rI   rK   rL   r   r   r   r   r$   )r   rK   r   rJ   r:   rR     s   	

	.z.OfflinePreLearner._map_sample_batch_to_episoder_   )&__name__
__module____qualname____doc__r   r   r   gymSpacer   r   r   r   r   r;   r   r   r   rX   propertyr   r5   r6   r   r   r   rk   ro   rQ   boolrV   staticmethodrS   r	   listr   rU   rR   r$   r$   r$   r:   r   5   s    
:(e	
'

*



	
x

r   )8r   loggingr   typingr   r   r   r   r   r   r   r	   	gymnasiumr   numpyr   r   ray.rllib.core.columnsr
   (ray.rllib.core.rl_module.multi_rl_moduler   r   "ray.rllib.env.single_agent_episoder   ray.rllib.utilsr   ray.rllib.utils.annotationsr   r   ray.rllib.utils.compressionr   ,ray.rllib.utils.replay_buffers.replay_bufferr   ray.rllib.utils.typingr   r   ray.util.annotationsr   %ray.rllib.algorithms.algorithm_configr   r   r   	MODULE_IDr   r   r   r   r   r   r   TrS   	getLoggerr   loggerr   r$   r$   r$   r:   <module>   sH    (	
