o
    ci5                     @   s  d Z ddlZddlZddlZddlmZmZ ddlmZ	 ddl
mZ ddlmZ ddlmZ ddlmZ ddlmZ dd	lmZmZmZmZ dd
lmZ eeZzddlZW n e yg   dZe!d Y nw eG dd dZ"G dd dej#Z$dd Z%dd Z&dS )zREST client to interact with a policy server.

This client supports both local and remote policy inference modes. Local
inference is faster but causes more compute to be done on the client.
    N)UnionOptional)ExternalEnv)ExternalMultiAgentEnv)MultiAgentEnv)MultiAgentBatch)OldAPIStack)MultiAgentDictEnvInfoDict
EnvObsTypeEnvActionType)RLlinkzMCouldn't import `requests` library. Be sure to install it on the client side.c                   @   s  e Zd ZdZ			d'dedededeej fd	d
Z		d(dee de
defddZdedeeef deeef fddZdedeeef deeef ddfddZ		d)dededeeef dee ddf
ddZdedeeef ddfddZd*ddZd d! Zd"d# Zd+d%d&ZdS ),PolicyClientz4REST client to interact with an RLlib policy server.local      $@Naddressinference_modeupdate_intervalsessionc                 C   sH   || _ || _d| _|dkrd| _| | dS |dkr d| _dS td)a  Create a PolicyClient instance.

        Args:
            address: Server to connect to (e.g., "localhost:9090").
            inference_mode: Whether to use 'local' or 'remote' policy
                inference for computing actions.
            update_interval (float or None): If using 'local' inference mode,
                the policy is refreshed after this many seconds have passed,
                or None for manual control via client.
            session (requests.Session or None): If available the session object
                is used to communicate with the policy server. Using a session
                can lead to speedups as connections are reused. It is the
                responsibility of the creator of the session to close it.
        Nr   TremoteFz1inference_mode must be either 'local' or 'remote')r   r   envr   _setup_local_rollout_worker
ValueError)selfr   r   r   r    r   O/home/ubuntu/.local/lib/python3.10/site-packages/ray/rllib/env/policy_client.py__init__,   s   
zPolicyClient.__init__T
episode_idtraining_enabledreturnc                 C   s4   | j r|   | j||S | |tj|dd S )a}  Record the start of one or more episode(s).

        Args:
            episode_id (Optional[str]): Unique string id for the episode or
                None for it to be auto-assigned.
            training_enabled: Whether to use experiences for this
                episode to improve the policy.

        Returns:
            episode_id: Unique string id for the episode.
        )r   commandr   r   )r   _update_local_policyr   start_episode_sendCommandsSTART_EPISODE)r   r   r   r   r   r   r"   L   s   zPolicyClient.start_episodeobservationc                    sZ   j r!  t|ttfr fdd|D }|S j| S tj	 |dd S )a  Record an observation and get the on-policy action.

        Args:
            episode_id: Episode id returned from start_episode().
            observation: Current environment observation.

        Returns:
            action: Action from the env action space.
        c                    s    i | ]}|j | | qS r   )r   
get_action).0eidr&   r   r   r   
<dictcomp>w   s    z+PolicyClient.get_action.<locals>.<dictcomp>r    r&   r   action)
r   r!   
isinstancelisttupler   r'   r#   r$   
GET_ACTION)r   r   r&   actionsr   r*   r   r'   g   s    zPolicyClient.get_actionr-   c                 C   s8   | j r|   | j|||S | tj|||d dS )zRecord an observation and (off-policy) action taken.

        Args:
            episode_id: Episode id returned from start_episode().
            observation: Current environment observation.
            action: Action for the observation.
        )r    r&   r-   r   N)r   r!   r   
log_actionr#   r$   
LOG_ACTION)r   r   r&   r-   r   r   r   r3      s   zPolicyClient.log_actionrewardinfomultiagent_done_dictc                 C   sb   | j r#|   |durt|tsJ | j||||S | j|||S | tj||||d dS )a  Record returns from the environment.

        The reward will be attributed to the previous action taken by the
        episode. Rewards accumulate until the next action. If no reward is
        logged before the next action, a reward of 0.0 is assumed.

        Args:
            episode_id: Episode id returned from start_episode().
            reward: Reward from the environment.
            info: Extra info dict.
            multiagent_done_dict: Multi-agent done information.
        N)r    r5   r6   r   done)	r   r!   r.   dictr   log_returnsr#   r$   LOG_RETURNS)r   r   r5   r6   r7   r   r   r   r:      s    zPolicyClient.log_returnsc                 C   s4   | j r|   | j||S | tj||d dS )zRecord the end of an episode.

        Args:
            episode_id: Episode id returned from start_episode().
            observation: Current environment observation.
        r,   N)r   r!   r   end_episoder#   r$   END_EPISODE)r   r   r&   r   r   r   r<      s   
zPolicyClient.end_episodec                 C   s   | j dd dS )zGQuery the server for new policy weights, if local inference is enabled.T)forceN)r!   r   r   r   r   update_policy_weights   s   z"PolicyClient.update_policy_weightsc                 C   sn   t |}| jd u rtj| j|d}n	| jj| j|d}|jdkr+td	|j
| |  t |j}|S )N)data   zRequest failed {}: {})pickledumpsr   requestspostr   status_codeloggererrorformattextraise_for_statusloadscontent)r   rA   payloadresponseparsedr   r   r   r#      s   


zPolicyClient._sendc                 C   sL   || _ d| _td | dtjid }t|| j\| _| _	| jj
| _
d S )Nr   z,Querying server for rollout worker settings.r    worker_args)r   last_updatedrH   r6   r#   r$   GET_WORKER_ARGS_create_embedded_rollout_workerrollout_workerinference_threadr   )r   r   kwargsr   r   r   r      s   
z(PolicyClient._setup_local_rollout_workerFc                 C   s   | j  sJ | jrt | j | jks|rAtd | dtj	i}|d }|d }td
| | j|| t | _d S d S )Nz'Querying server for new policy weights.r    weightsglobal_varsz3Updating rollout worker weights and global vars {}.)rW   is_aliver   timerS   rH   r6   r#   r$   GET_WEIGHTSrJ   rV   set_weights)r   r>   resprY   rZ   r   r   r   r!      s*   
z!PolicyClient._update_local_policy)r   r   NNT)NN)r   N)F)__name__
__module____qualname____doc__strfloatr   rE   Sessionr   boolr"   r   r   r	   r   r'   r3   r
   r:   r<   r@   r#   r   r!   r   r   r   r   r   (   s|    
!



 




'


r   c                       s(   e Zd ZdZ fddZdd Z  ZS )_LocalInferenceThreadzAThread that handles experience generation (worker.sample() loop).c                    s    t    d| _|| _|| _d S r`   )superr   daemonrV   send_fn)r   rV   rl   	__class__r   r   r     s   

z_LocalInferenceThread.__init__c              
   C   s   z7	 t d | j }| j }t|tr$t d| |	  n	t d|j
 | tj||d q tyP } zt d| W Y d }~d S d }~ww )NTz$Generating new batch of experiences.z9Sending batch of {} env steps ({} agent steps) to server.z)Sending batch of {} steps back to server.)r    samplesmetricsz$Error: inference worker thread died!)rH   r6   rV   sampleget_metricsr.   r   rJ   	env_stepsagent_stepscountrl   r$   REPORT_SAMPLES	ExceptionrI   )r   ro   rp   er   r   r   run   s4   



z_LocalInferenceThread.run)ra   rb   rc   rd   r   ry   __classcell__r   r   rm   r   ri     s    ri   c                    s    fdd}|S )zWrap an environment in the ExternalEnv interface if needed.

    Args:
        real_env_creator: Create an env given the env_config.
    c                    sP    | }t |ttfs&td t |trt}nt}G dd d|}||S |S )NzzThe env you specified is not a supported (sub-)type of ExternalEnv. Attempting to convert it automatically to ExternalEnv.c                       s$   e Zd Z fddZdd Z  ZS )zI_auto_wrap_external.<locals>.wrapped_creator.<locals>._ExternalEnvWrapperc                    s   t  j|j|jd d S )N)observation_spaceaction_space)rj   r   r{   r|   )r   real_envrm   r   r   r   R  s   
zR_auto_wrap_external.<locals>.wrapped_creator.<locals>._ExternalEnvWrapper.__init__c                 S   s   t d d S )Ni?B )r\   sleepr?   r   r   r   ry   X  s   zM_auto_wrap_external.<locals>.wrapped_creator.<locals>._ExternalEnvWrapper.run)ra   rb   rc   r   ry   rz   r   r   rm   r   _ExternalEnvWrapperQ  s    r   )r.   r   r   rH   r6   r   )
env_configr}   external_clsr   real_env_creatorr   r   wrapped_creatorC  s   
z,_auto_wrap_external.<locals>.wrapped_creatorr   )r   r   r   r   r   _auto_wrap_external<  s   r   c                    s   |   } | d j dd| d< | d }d|_d|_i |_|jdu r@ddlm m |j|j	d|j
t fd	d
| d< n
| d }t|| d< td|  ddlm} |di | }t||}|  ||fS )zCreate a local rollout worker and a thread that samples from it.

    Args:
        kwargs: Args for the RolloutWorker constructor.
        send_fn: Function to send a JSON request to the server.
    configF)copy_frozenNsamplerr   )	RandomEnvRandomMultiAgentEnv)r|   r{   c                    s   rS  S )Nr   )_r   r   r   is_mar   r   <lambda>  s    z1_create_embedded_rollout_worker.<locals>.<lambda>env_creatorz&Creating rollout worker with kwargs={})RolloutWorkerr   )copyoutputinput_input_configr   *ray.rllib.examples.envs.classes.random_envr   r   r|   r{   is_multi_agentr   rH   r6   rJ   #ray.rllib.evaluation.rollout_workerr   ri   start)rX   rl   r   r   r   rV   rW   r   r   r   rU   c  s.   



rU   )'rd   logging	threadingr\   typingr   r   ray.cloudpicklecloudpicklerC   ray.rllib.env.external_envr   &ray.rllib.env.external_multi_agent_envr   ray.rllib.env.multi_agent_envr   ray.rllib.policy.sample_batchr   ray.rllib.utils.annotationsr   ray.rllib.utils.typingr	   r
   r   r   )ray.rllib.env.utils.external_env_protocolr   r$   	getLoggerra   rH   rE   ImportErrorwarningr   Threadri   r   rU   r   r   r   r   <module>   s8    
 o%'