o
    $izl                     @   sn  d dl Z d dlZd dlZd dlmZmZmZmZmZm	Z	m
Z
mZ d dlZd dlZd dlZd dlmZ d dlmZmZ d dlmZ d dlmZ d dlmZmZ d dlmZ d d	lm Z  d d
l!m"Z"m#Z# d dl$m%Z% erpd dl&m'Z' ej(ej(ej)ej)ej*ej*ej+ej+ej,ej,ej-ej-ej.ej.ej/ej/ej0ej0ej1ej1ej2ej2ddddddiZ3e4e5Z6e%ddG dd dZ7dS )    N)TYPE_CHECKINGAnyDictListOptionalSetTupleUnion)Columns)MultiRLModuleMultiRLModuleSpec)SingleAgentEpisode)flatten_dict)OverrideToImplementCustomLogic5OverrideToImplementCustomLogic_CallToSuperRecommended)unpack_if_needed)ReplayBuffer)EpisodeTypeModuleID)	PublicAPI)AlgorithmConfigagent_indexdones	unroll_idalpha)	stabilityc                   @   s  e Zd ZdZedddddddeeejejf  dee	 dee
eef  d	e
eef f
d
dZede
eejf de
eejf fddZedefddZede
eef fddZdee dee fddZd%defddZeeedddddfdede
eee ejf f de
eef dedeee  dee dejd ejd	e
eef de
eee! f fd!d"Z"eeeddfdede
eee ejf f de
eef dedeee  de
eee! f fd#d$Z#dS )&OfflinePreLearnera?  Class that coordinates data transformation from dataset to learner.

    This class is an essential part of the new `Offline RL API` of `RLlib`.
    It is a callable class that is run in `ray.data.Dataset.map_batches`
    when iterating over batches for training. It's basic function is to
    convert data in batch from rows to episodes (`SingleAGentEpisode`s
    for now) and to then run the learner connector pipeline to convert
    further to trainable batches. These batches are used directly in the
    `Learner`'s `update` method.

    The main reason to run these transformations inside of `map_batches`
    is for better performance. Batches can be pre-fetched in `ray.data`
    and therefore batch trransformation can be run highly parallelized to
    the `Learner''s `update`.

    This class can be overridden to implement custom logic for transforming
    batches and make them 'Learner'-ready. When deriving from this class
    the `__call__` method and `_map_to_episodes` can be overridden to induce
    custom logic for the complete transformation pipeline (`__call__`) or
    for converting to episodes only ('_map_to_episodes`).

    Custom `OfflinePreLearner` classes can be passed into
    `AlgorithmConfig.offline`'s `prelearner_class`. The `OfflineData` class
    will then use the custom class in its data pipeline.
    N)spacesmodule_specmodule_stateconfigr   r   r   r   kwargsc                K   s   || _ | j j| _| j j| _| | _| j| |pd\| _| _| j j| j| jd| _	| j j
| _|j| _d| _| jsB| j sB| jrZ| j jpH| j}| j| j jB }|di || _d S d S )N)NN)input_observation_spaceinput_action_spacer    )r    input_read_episodesinput_read_sample_batchesbuild_module	set_stateobservation_spaceaction_spacebuild_learner_connector_learner_connectorpolicies_to_train_policies_to_trainis_multi_agent_is_multi_agentiter_since_last_module_updateis_statefulprelearner_buffer_classdefault_prelearner_buffer_class default_prelearner_buffer_kwargsprelearner_buffer_kwargsepisode_buffer)selfr    r   r   r   r!   r4   r7   r$   r$   a/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/rllib/offline/offline_prelearner.py__init__Q   s<   



zOfflinePreLearner.__init__batchreturnc              	      sv  | j rDddlddl  fdd|d D }| |}| j| | jj| jj| j	
 r5| jjddnd| jddp>dd	d	d
}nZ| jrtj| j|d	t| jjB | jjdd }| |}| j| | jj| jj| j	
 rx| jjddnd| jddpdd	d	d
}n| j| j|t| jjB d| jj| j| jdd }| j| j	i |i dd}|D ]}| ||s||= qt|S )aq  Prepares plain data batches for training with `Learner`'s.

        Args:
            batch: A dictionary of numpy arrays containing either column data
                with `self.config.input_read_schema`, `EpisodeType` data, or
                `BatchType` data.

        Returns:
            A `MultiAgentBatch` that can be passed to `Learner.update` methods.
        r   Nc                    s"   g | ]}t j| jd qS ))object_hook)r   
from_stateunpackbdecode).0statemnpmsgpackr$   r:   
<listcomp>   s    z.OfflinePreLearner.__call__.<locals>.<listcomp>itemmax_seq_lenn_step   T)	num_itemsbatch_length_TrJ   sample_episodesto_numpy)rO   schemainput_compress_columnsepisodesF)rP   rO   rQ   r*   r+   )	rl_moduler<   rR   shared_datametrics)r%   rF   msgpack_numpy_validate_episodesr8   addsampler    train_batch_size_per_learnerr(   r3   model_configgetr&   r   _map_sample_batch_to_episoder1   SCHEMAinput_read_schemarQ   _map_to_episodesr*   r+   r-   _should_module_be_updatedr   )r9   r<   rR   	module_idr$   rD   r:   __call__   s|   



&zOfflinePreLearner.__call__c                 C   s   ddl m} |S )zSets the default replay buffer.r   )EpisodeReplayBuffer)4ray.rllib.utils.replay_buffers.episode_replay_bufferrd   )r9   rd   r$   r$   r:   r5     s   z1OfflinePreLearner.default_prelearner_buffer_classc                 C   s   | j jd | j jdS )zSets the default arguments for the replay buffer.

        Note, the `capacity` might vary with the size of the episodes or
        sample batches in the offline dataset.
        
   )capacitybatch_size_B)r    rZ   )r9   r$   r$   r:   r6     s   
z2OfflinePreLearner.default_prelearner_buffer_kwargsrR   c                 C   sb   t dd |D stdt }t }|D ]}|j|vr.|j| jjvr.||j || q|S )a  Validate episodes sampled from the dataset.

        Note, our episode buffers cannot handle either duplicates nor
        non-ordered fragmentations, i.e. fragments from episodes that do
        not arrive in timestep order.

        Args:
            episodes: A list of `SingleAgentEpisode` instances sampled
                from a dataset.

        Returns:
            A set of `SingleAgentEpisode` instances.

        Raises:
            ValueError: If not all episodes are `done`.
        c                 s   s    | ]}|j V  qd S N)is_done)rB   epsr$   r$   r:   	<genexpr>6  s    z7OfflinePreLearner._validate_episodes.<locals>.<genexpr>zWhen sampling from episodes (`input_read_episodes=True`) all recorded episodes must be done (i.e. either `terminated=True`) or `truncated=True`).)all
ValueErrorsetid_r8   episode_id_to_indexrX   )r9   rR   unique_episode_idscleaned_episodesrk   r$   r$   r:   rW   "  s   

z$OfflinePreLearner._validate_episodesc                 C   s.   | j sdS t| j s|t| j v S |  ||S )z:Checks which modules in a MultiRLModule should be updated.T)r/   callablero   )r9   rb   multi_agent_batchr$   r$   r:   ra   I  s
   
z+OfflinePreLearner._should_module_be_updatedFr0   rP   rO   rQ   ignore_final_observationr*   r+   c                    s  pg g }	t |tj  D ]\ }
| r4tj|v r#|tj    nd |v r1|d    nd}nd}| r9qtjv rBt|
n|
}|rRtdd t|}ntjv rbt|tj	    n|tj	    }t
tj |v r~t|tj    nt j|||gi tj |v r|tj    ni gtjv rt|tj    n|tj    g|tj    g|tj |v rȈtj nd   tj |v r|tj    nd fdd| D d	d

}|r|  |	| qd|	iS )z!Maps a batch of data to episodes.r   Nc                 S   s   d|  S )Nr   r$   )xr$   r$   r:   <lambda>  s    z4OfflinePreLearner._map_to_episodes.<locals>.<lambda>r   Fc                    sL   i | ]"\}}|vr$|  vr$|d vr||v rt|  n|  gqS )r   r   typevaluesr   rB   kvirQ   rP   r$   r:   
<dictcomp>  s    	z6OfflinePreLearner._map_to_episodes.<locals>.<dictcomp>r   
rp   agent_idobservationsinfosactionsrewards
terminated	truncatedextra_model_outputslen_lookback_bufferrR   )	enumerater
   OBSAGENT_IDr   treemap_structurecopydeepcopyNEXT_OBSr   EPS_IDstruuiduuid4hexINFOSACTIONSREWARDSTERMINATEDS
TRUNCATEDSitemsrO   append)r0   r<   rP   rO   rQ   rv   r*   r+   r!   rR   obsr   unpacked_obsunpacked_next_obsepisoder$   r   r:   r`   S  sx   




	8z"OfflinePreLearner._map_to_episodesc           
         s  pg g }t |tj  D ]l\ | r)d |v r&|d    d nd}nd}| r.qttrFtfddtjd D n"ttj	r^tj v rYt
 n
 n
tdt dtj |v rtj v rt|tj    d n
|tj    d  nd  tj |v rtj |v r|tj    d }|tj    d }nLtj |v rڈtj |vr|tj    d }d	}n0tj |vrtj |v r|tj    d }d	}nd
|v r|d
   d }d	}nd}d	}ttj |v r t|tj    d nt j|tj |v r7|tj    ni gt tjv rNt|tj    n|tj    |tj    || fdd| D dd
}	|rw|	  ||	 qd|iS )z6Maps an old stack `SampleBatch` to new stack episodes.r   r   Nc                    s   g | ]} |d f qS ).r$   )rB   r   )r   r$   r:   rG     s    zBOfflinePreLearner._map_sample_batch_to_episode.<locals>.<listcomp>zUnknown observation type: z. When mapping from old recorded `SampleBatches` batched observations should be either of type `np.array` or - if the column is compressed - of `str` type.FdoneTc                    sJ   i | ]!\}}|vr#|  vr#|d vr||v rt|  n|  qS ry   r{   r}   r   r$   r:   r   Z  s    zBOfflinePreLearner._map_sample_batch_to_episode.<locals>.<dictcomp>r   rR   )r   r
   r   
isinstancer   r   rangeshapenpndarraytolist	TypeErrorrz   r   r   r   r   r   r   r   r   r   r   lenr   r   r   rO   )
r0   r<   rP   rO   rQ   rR   r   r   r   r   r$   )r   rQ   r   rP   r:   r]     s   	

	.z.OfflinePreLearner._map_sample_batch_to_episoderi   )$__name__
__module____qualname____doc__r   r   r   gymSpacer   r   r   r   r   r;   r   r   r   rc   propertyr   r5   r6   r   r   r   rW   boolra   staticmethodr^   r	   listr   r`   r]   r$   r$   r$   r:   r   5   s    
:(	
'



	
x

r   )8r   loggingr   typingr   r   r   r   r   r   r   r	   	gymnasiumr   numpyr   r   ray.rllib.core.columnsr
   (ray.rllib.core.rl_module.multi_rl_moduler   r   "ray.rllib.env.single_agent_episoder   ray.rllib.utilsr   ray.rllib.utils.annotationsr   r   ray.rllib.utils.compressionr   ,ray.rllib.utils.replay_buffers.replay_bufferr   ray.rllib.utils.typingr   r   ray.util.annotationsr   %ray.rllib.algorithms.algorithm_configr   r   r   	MODULE_IDr   r   r   r   r   r   r   Tr^   	getLoggerr   loggerr   r$   r$   r$   r:   <module>   sH    (	
