o
    $iA                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	 d dl
mZmZmZmZ d dlmZ d dlZd dlZzd dlmZ W n eyS   dZY nw d dlmZ d dlmZ d dlmZ d d	lmZmZmZm Z m!Z! d d
l"m#Z#m$Z$m%Z% d dl&m'Z' d dl(m)Z)m*Z* d dl+m,Z,m-Z-m.Z. erd dl/m0Z0 e1e2Z3dd e4e5de5dd D Z6de7dede7fddZ8e#de7de7fddZ9e#de.dede.fddZ:e#de,ded  fd!d"Z;e$G d#d$ d$eZ<dS )%    N)Path)TYPE_CHECKINGListOptionalUnion)urlparse)
smart_open)InputReader)	IOContext)Policy)DEFAULT_POLICY_IDMultiAgentBatchSampleBatchconcat_samples convert_ma_batch_to_sample_batch)DeveloperAPI	PublicAPIoverride)unpack_if_needed)clip_actionnormalize_action)AnyFileTypeSampleBatchType)RolloutWorkerc                 C   s   g | ]}t |qS  )chr.0ir   r   Z/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/rllib/offline/json_reader.py
<listcomp>)   s    r!   cz   	json_datapolicyreturnc                 C   s   |   D ]f\}}||jv r|j| jnd}|jdr@|tjks/|tjks/|tjks/|tjkr@tj	|j
dd | | dd| |< q|jdrj|tjksZ|tjksZ|tjksZ|tjkrjtj	|jdd | | dd| |< q| S )	a@  Handle nested action/observation spaces for policies.

    Translates nested lists/dicts from the json into proper
    np.ndarrays, according to the (nested) observation- and action-
    spaces of the given policy.

    Providing nested lists w/o this preprocessing step would
    confuse a SampleBatch constructor.
     _disable_action_flatteningc                 S   
   t | S Nnparraycompr   r   r    <lambda>E      
 z0_adjust_obs_actions_for_policy.<locals>.<lambda>F)check_types_disable_preprocessor_apic                 S   r*   r+   r,   r/   r   r   r    r1   R   r2   )itemsview_requirementsdata_colconfiggetr   ACTIONSPREV_ACTIONStreemap_structure_up_toaction_space_structOBSNEXT_OBSobservation_space_struct)r%   r&   kvr7   r   r   r    _adjust_obs_actions_for_policy,   s:   










rD   c                 C   s8   i }|   D ]\}}|tjkr||tj< q|||< q|S )zEMake sure DONES in json data is properly translated into TERMINATEDS.)r5   r   DONESTERMINATEDS)r%   new_json_datarB   rC   r   r   r    _adjust_donesY   s   

rH   batchioctxc                 C   s  |j }|dr\|jd u rtdt| trB|jjt}|d u r5t|jjdks+J t	t
|jj }t| tj |j| tj< n| j D ]\}}t|tj |jj| j|tj< qG|ddu r|ddu r|jd u rstdd	}t| tr|jjt}|d u rt|jjdksJ t	t
|jj }t|jttfr|j d
st|t| tj |j| tj< | S | j D ],\}}|jj| }t|jttfr|j d
st|t|tj |jj| j|tj< q| S )Nclip_actionszCclip_actions is True but cannot clip actions since no workers existr$   actions_in_input_normalizedFnormalize_actionsTzWactions_in_input_normalized is False butcannot normalize actions since no workers exista  Normalization of offline actions that are flattened is not supported! Make sure that you record actions into offline file with the `_disable_action_flattening=True` flag OR as already normalized (between -1.0 and 1.0) values. Also, when reading already normalized action values from offline files, make sure to set `actions_in_input_normalized=True` so that RLlib will not perform normalization on top.r)   )r8   r9   worker
ValueError
isinstancer   
policy_mapr   lennextitervaluesr   r:   r>   policy_batchesr5   tupledictr   )rI   rJ   cfgr&   pidb	error_msgr   r   r    postprocess_actionsh   sn   









r]   rN   r   c           
      C   s0  d| v r
|  d}ntd|dkrJ|d ur!t|jdkr!td|  D ]
\}}t|| |< q%|d urBtt|j }t	| |} t
| } t| S |dkri }| d  D ]5\}}i }	| D ]\}}|tjkrltj}t||	|< q`|d ur|j| }t	|	|}	t
|	}	t|	||< qVt|| d S td	|)
Ntypez JSON record missing 'type' fieldr   r$   z\Found single-agent SampleBatch in input file, but our PolicyMap contains more than 1 policy!r   rV   countz<Type field must be one of ['SampleBatch', 'MultiAgentBatch'])poprO   rR   rQ   r5   r   rS   rT   rU   rD   rH   r   rE   rF   r   )
r%   rN   	data_typerB   rC   r&   rV   	policy_idpolicy_batchinnerr   r   r    from_json_data   s@   



re   c                   @   s   e Zd ZdZe	ddeeee f dee	 fddZ
eedefdd	Zdefd
dZdedefddZdd Zdedee fddZdefddZdefddZdedefddZdS )
JsonReaderzxReader object that loads experiences from JSON file chunks.

    The input files will be read from in random order.
    NinputsrJ   c                 C   s  t d |p	t | _d | _| _d| _| jr6| jjdd| _| jjdd}|r6t	t
| j| d| _| jjdura| jjj| _| jt| _| jdu rat| jdksWJ tt| j | _t|trtjtj|}tj|rtj|dtj|dg}t d	|  n|g}td
d |D rtd|d g | _|D ]}| jt  | qnt|t!t"frt!|| _ntd|| jrt dt| j ntd|d| _#dS )aB  Initializes a JsonReader instance.

        Args:
            inputs: Either a glob expression for files, e.g. `/tmp/**/*.json`,
                or a list of single file paths or URIs, e.g.,
                ["s3://bucket/file.json", "s3://bucket/file2.json"].
            ioctx: Current IO context object or None.
        zeYou are using JSONReader. It is recommended to use DatasetReader instead for better sharding support.Nr$   train_batch_sizenum_env_runnersr   z*.jsonz*.zipz+Treating input directory as glob patterns: c                 s   s$    | ]}t |jd gt vV  qdS )r(   N)r   schemeWINDOWS_DRIVESr   r   r   r    	<genexpr>  s   " z&JsonReader.__init__.<locals>.<genexpr>z"Don't know how to glob over `{}`, z/please specify a list of files to read instead.z*type of inputs must be list or str, not {}zFound {} input files.zNo files found matching {})$loggerinfor
   rJ   default_policyrQ   
batch_sizer8   r9   maxmathceilrN   r   rR   rS   rT   rU   rP   strospathabspath
expanduserisdirjoinwarninganyrO   formatfilesextendgloblistrW   cur_file)selfrg   rJ   num_workersr   r   r   r    __init__   sT   


zJsonReader.__init__r'   c                 C   s   g }d}|| j k rP| |  }d}|s2|dk r2|d7 }td| j | |  }|s2|dk s|s<td| j| |}||j	7 }|
| || j k s	t|}|S )Nr   d   r$   zSkipping empty line in {}z3Failed to read valid experience batch from file: {})rp   
_try_parse
_next_linerm   debugr}   r   rO   _postprocess_if_neededr_   appendr   )r   retr_   rI   triesr   r   r    rS   '  s,   




zJsonReader.nextc                 c   sH    | j D ]}| |}	 | }|sn| |}|du rn|V  qqdS )a  Reads through all files and yields one SampleBatchType per line.

        When reaching the end of the last file, will start from the beginning
        again.

        Yields:
            One SampleBatch or MultiAgentBatch per line in all input files.
        TN)r~   _try_open_filereadliner   )r   rv   filelinerI   r   r   r    read_all_files>  s   
	

zJsonReader.read_all_filesrI   c                 C   sX   | j jds	|S t|}t|tr(g }| D ]}|| j	| qt
|S td)Npostprocess_inputsz7Postprocessing of multi-agent data not implemented yet.)rJ   r8   r9   r   rP   r   split_by_episoder   ro   postprocess_trajectoryr   NotImplementedError)r   rI   out	sub_batchr   r   r    r   R  s   
z!JsonReader._postprocess_if_neededc                 C   s  t |jdgt vrtd u rtd|t}nk|dr-tj	tj
dd|dd  }|}tj|s@tj	ttjj|}tj|sNtd| dtd|rt|d	}|t|j W d    n1 smw   Y  tdd
|}tj|sJ t}||d	}|S )Nr(   zBYou must install the `smart_open` module to read from URIs like {}z~/HOME   zOffline file z not found!z\.zip$rz.json)r   rj   rk   r   rO   r}   
startswithru   rv   rz   environr9   existsr   __file__parentFileNotFoundErrorresearchzipfileZipFile
extractallsubopen)r   rv   ctx	path_origzip_refr   r   r   r    r   d  s.   
 
zJsonReader._try_open_filer   c              	   C   sX   |  }|sd S z| |}W n ty#   td| j| Y d S w t|| j}|S )Nz&Ignoring corrupt json record in {}: {})	strip
_from_json	Exceptionrm   	exceptionr}   r   r]   rJ   )r   r   rI   r   r   r    r     s   zJsonReader._try_parsec                 C   s   | j s|  | _ | j  }d}|s?|dk r?|d7 }t| j dr$| j   |  | _ | j  }|s9td| j  |s?|dk s|sItd| j	|S )Nr   r   r$   closezIgnoring empty file {}z'Failed to read next line from files: {})
r   
_next_filer   hasattrr   rm   r   r}   rO   r~   )r   r   r   r   r   r    r     s$   





zJsonReader._next_linec                 C   sf   | j d u r(| jjd ur(| jjj}| jjjpd}| jtt| jd ||   }nt	| j}| 
|S )Nr$   )r   rJ   rN   worker_indexr   r~   roundrR   randomchoicer   )r   idxtotalrv   r   r   r    r     s   
"
zJsonReader._next_filedatac                 C   s,   t |tr
|d}t|}t|| jjS )Nzutf-8)rP   bytesdecodejsonloadsre   rJ   rN   )r   r   r%   r   r   r    r     s   


zJsonReader._from_jsonr+   )__name__
__module____qualname____doc__r   r   rt   r   r   r
   r   r   r	   r   rS   r   r   r   r   r   r   r   r   r   r   r   r    rf      s$    =rf   )=r   r   loggingrr   ru   r   r   r   pathlibr   typingr   r   r   r   urllib.parser   numpyr-   r<   r   ImportErrorray.rllib.offline.input_readerr	   ray.rllib.offline.io_contextr
   ray.rllib.policy.policyr   ray.rllib.policy.sample_batchr   r   r   r   r   ray.rllib.utils.annotationsr   r   r   ray.rllib.utils.compressionr   "ray.rllib.utils.spaces.space_utilsr   r   ray.rllib.utils.typingr   r   r   ray.rllib.evaluationr   	getLoggerr   rm   rangeordrk   rX   rD   rH   r]   re   rf   r   r   r   r    <module>   sN    
 -M+