o
    ciu4                     @   s   d dl Z d dlZd dlmZ d dlmZ d dlmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZ d d	lmZmZmZ d d
lmZ d dlmZ d dlmZ d dlmZ d dlmZ e eZ eddG dd deZ!dS )    N)Path)List)AlgorithmConfig)Columns)	EnvRunner)SingleAgentEnvRunner)SingleAgentEpisode)override5OverrideToImplementCustomLogic_CallToSuperRecommendedOverrideToImplementCustomLogic)pack_if_needed)to_jsonable_if_needed)EpisodeType)log_once)	PublicAPIalpha)	stabilityc                       s   e Zd ZdZeeedef fddZeee	ddddddde
d	e
d
edededee f fddZeee	dddZe	dee ddfddZ  ZS )OfflineSingleAgentEnvRunnerz7The environment runner to record the single agent case.configc                   s~  t  jdd|i| tjj }|j|jj_	| j
j| _| j
j| _| j
j| _| j
j| _d | _| j
j| _| j
j | _d| j d | _| jdkrXdd l}|jdi | j| _n8| jdkrnddlm} |jdi | j| _n"| jdkrdd l}|j di | j| _n| jd urt!d	| j d
| j"d| ji | j
j#| _#| j
j$| _$| j
j%| _%| j%rd| _&nd| _&| j
j'| _(d| _)g | _*d S )Nr   zrun-   gcsr   s3)fsabszUnknown filesystem: z9. Filesystems can be 'gcs' for GCS, 's3' for S3, or 'abs'
filesystemFT )+super__init__raydataDataContextget_currentnum_cpus_per_env_runnerexecution_optionsresource_limitscpur   output_write_methodoutput_write_method_kwargsoutput_filesystemr   output_filesystem_kwargsfilesystem_kwargsfilesystem_objectoutputoutput_pathenvlowersubdir_pathworker_indexzfillworker_pathgcsfsGCSFileSystempyarrowr   S3FileSystemadlfsAzureBlobFileSystem
ValueErrorupdateoutput_write_episodesoutput_compress_columnsoutput_max_rows_per_filewrite_data_this_iteroutput_write_remaining_datawrite_remaining_data_sample_counter_samples)selfr   kwargsdata_contextr4   r   r8   	__class__r   X/home/ubuntu/.local/lib/python3.10/site-packages/ray/rllib/offline/offline_env_runner.pyr   !   sL   













z$OfflineSingleAgentEnvRunner.__init__NFnum_timestepsnum_episodesexplorerandom_actionsforce_resetrK   rL   rM   rN   rO   returnc             
      s  t  j|||||d}|  jd7  _| jr5ddlddl tdr&td | j	
 fdd|D  n| | | jrHt| j	| jkrHd	| _| jr| jryd
| _t| j	| jkrx| j	d| j }| j	| jd | _	tj|}t| j	| jksYntj| j	}z0t| j| j| jd| j d }	t|| j|	 fi | j td|	 d W n ty }
 zt|
 W Y d}
~
nd}
~
ww | jjdt| j	d |S )z2Samples from environments and writes data to disk.rJ      r   NmsgpackzPacking episodes with `msgpack` and encode array with `msgpack_numpy` for serialization. This is needed for recording episodes.c                    s    g | ]}j |  jd qS ))default)packb	get_stateencode).0epsmnprR   r   rI   
<listcomp>   s     z6OfflineSingleAgentEnvRunner.sample.<locals>.<listcomp>TF-r   zWrote samples to storage at .recording_buffer_size)keyvalue) r   samplerB   r<   rR   msgpack_numpyr   loggerinforC   extend_map_episodes_to_datar>   lenr?   r   r   
from_itemsr   r-   joinpathr0   r3   r2   getattrr&   as_posixr'   	Exceptionerrormetrics	log_value)rD   rK   rL   rM   rN   rO   samplessamples_to_write
samples_dspatherG   rY   rI   ra   x   sh   

z"OfflineSingleAgentEnvRunner.samplec              
   C   s   | j r\| jr\tj| j }|  jd7  _z0t| j| j	| j
d| j d }t|| j| fi | j td| d W n ty[ } zt| W Y d}~nd}~ww tdt| j   dS )zWrites the reamining samples to disk

        Note, if the user defined `max_rows_per_file` the
        number of rows for the remaining samples could be
        less than the defined maximum row number by the user.
        rQ   r\   r   z"Wrote final samples to storage at zY. Note Note, final samples could be smaller in size than `max_rows_per_file`, if defined.NzExperience buffer length: )rC   rA   r   r   rh   rB   r   r-   ri   r0   r3   r2   rj   r&   rk   r'   rc   rd   rl   rm   debugrg   )rD   rr   rs   rt   r   r   rI   stop   s*   

z OfflineSingleAgentEnvRunner.stoprp   c                    sV  j j}j j}|D ]ttD ] tjjtjj	tj
jtjtjjv r2tt |nt |tjtjjv rKtt |nt |tj tjtjjv rltt d |n	t d |tj td k rdnjtj td k rdnji	 fddj D }j| qq
dS )zConverts list of episodes to list of single dict experiences.

        Note, this method also appends all sampled experiences to the
        buffer.

        Args:
            samples: List of episodes to be converted.
        rQ   Fc                    s4   i | ]}||j v rt| n| qS r   )r=   r   get_extra_model_outputs)rW   kira   rD   r   rI   
<dictcomp>.  s    

zEOfflineSingleAgentEnvRunner._map_episodes_to_data.<locals>.<dictcomp>N)r.   observation_spaceaction_spacerangerg   r   EPS_IDid_AGENT_IDagent_id	MODULE_ID	module_idOBSr=   r   r   get_observationsACTIONSget_actionsREWARDSget_rewardsNEXT_OBSTERMINATEDSis_terminated
TRUNCATEDSis_truncatedextra_model_outputskeysrC   append)rD   rp   	obs_spacer}   sample_datar   ry   rI   rf      sX   (z1OfflineSingleAgentEnvRunner._map_episodes_to_data)rP   N)__name__
__module____qualname____doc__r	   r   r
   r   r   r   intboolr   r   ra   r   rv   r   rf   __classcell__r   r   rG   rI   r      s<    U^% r   )"loggingr   pathlibr   typingr   %ray.rllib.algorithms.algorithm_configr   ray.rllib.core.columnsr   ray.rllib.env.env_runnerr   %ray.rllib.env.single_agent_env_runnerr   "ray.rllib.env.single_agent_episoder   ray.rllib.utils.annotationsr	   r
   r   ray.rllib.utils.compressionr   "ray.rllib.utils.spaces.space_utilsr   ray.rllib.utils.typingr   ray.util.debugr   ray.util.annotationsr   Logger__file__rc   r   r   r   r   rI   <module>   s$    
