o
    `۷i)                     @   s  d dl Z d dlZd dlZd dlmZmZ d dlmZ d dl	m
Z d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZmZmZmZ e eZzd dlZW n eye   dZe d
 Y nw eG dd dZ!eG dd dej"Z#edd Z$edd Z%dS )    N)OptionalUnion)RLlink)ExternalEnv)ExternalMultiAgentEnv)MultiAgentEnv)MultiAgentBatch)OldAPIStack)EnvActionTypeEnvInfoDict
EnvObsTypeMultiAgentDictzMCouldn't import `requests` library. Be sure to install it on the client side.c                   @   s  e Zd ZdZ			d'dedededeej fd	d
Z		d(dee de
defddZdedeeef deeef fddZdedeeef deeef ddfddZ		d)dededeeef dee ddf
ddZdedeeef ddfddZd*ddZd d! Zd"d# Zd+d%d&ZdS ),PolicyClientz4REST client to interact with an RLlib policy server.local      $@Naddressinference_modeupdate_intervalsessionc                 C   sH   || _ || _d | _|dkrd| _| | d S |dkr d| _d S td)Nr   TremoteFz1inference_mode must be either 'local' or 'remote')r   r   envr   _setup_local_rollout_worker
ValueError)selfr   r   r   r    r   Q/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/rllib/env/policy_client.py__init__&   s   
zPolicyClient.__init__T
episode_idtraining_enabledreturnc                 C   s4   | j r|   | j||S | |tj|dd S )N)r   commandr   r   )r   _update_local_policyr   start_episode_sendCommandsSTART_EPISODE)r   r   r   r   r   r   r"   8   s   zPolicyClient.start_episodeobservationc                    sZ   j r!  t|ttfr fdd|D }|S j| S tj	 |dd S )Nc                    s    i | ]}|j | | qS r   )r   
get_action).0eidr&   r   r   r   
<dictcomp>M   s    z+PolicyClient.get_action.<locals>.<dictcomp>r    r&   r   action)
r   r!   
isinstancelisttupler   r'   r#   r$   
GET_ACTION)r   r   r&   actionsr   r*   r   r'   G   s    zPolicyClient.get_actionr-   c                 C   s8   | j r|   | j|||S | tj|||d d S )N)r    r&   r-   r   )r   r!   r   
log_actionr#   r$   
LOG_ACTION)r   r   r&   r-   r   r   r   r3   ]   s   zPolicyClient.log_actionrewardinfomultiagent_done_dictc                 C   sb   | j r#|   |d urt|tsJ | j||||S | j|||S | tj||||d d S )N)r    r5   r6   r   done)	r   r!   r.   dictr   log_returnsr#   r$   LOG_RETURNS)r   r   r5   r6   r7   r   r   r   r:   p   s    zPolicyClient.log_returnsc                 C   s4   | j r|   | j||S | tj||d d S )Nr,   )r   r!   r   end_episoder#   r$   END_EPISODE)r   r   r&   r   r   r   r<      s   zPolicyClient.end_episodec                 C   s   | j dd dS )zGQuery the server for new policy weights, if local inference is enabled.T)forceN)r!   r   r   r   r   update_policy_weights   s   z"PolicyClient.update_policy_weightsc                 C   sn   t |}| jd u rtj| j|d}n	| jj| j|d}|jdkr+td	|j
| |  t |j}|S )N)data   zRequest failed {}: {})pickledumpsr   requestspostr   status_codeloggererrorformattextraise_for_statusloadscontent)r   rA   payloadresponseparsedr   r   r   r#      s   


zPolicyClient._sendc                 C   sL   || _ d| _td | dtjid }t|| j\| _| _	| jj
| _
d S )Nr   z,Querying server for rollout worker settings.r    worker_args)r   last_updatedrH   r6   r#   r$   GET_WORKER_ARGS_create_embedded_rollout_workerrollout_workerinference_threadr   )r   r   kwargsr   r   r   r      s   
z(PolicyClient._setup_local_rollout_workerFc                 C   s   | j  sJ | jrt | j | jks|rAtd | dtj	i}|d }|d }td
| | j|| t | _d S d S )Nz'Querying server for new policy weights.r    weightsglobal_varsz3Updating rollout worker weights and global vars {}.)rW   is_aliver   timerS   rH   r6   r#   r$   GET_WEIGHTSrJ   rV   set_weights)r   r>   resprY   rZ   r   r   r   r!      s*   
z!PolicyClient._update_local_policy)r   r   NNT)NN)r   N)F)__name__
__module____qualname____doc__strfloatr   rE   Sessionr   boolr"   r   r   r   r
   r'   r3   r   r:   r<   r@   r#   r   r!   r   r   r   r   r   "   s|    












r   c                       $   e Zd Z fddZdd Z  ZS )_LocalInferenceThreadc                    s    t    d| _|| _|| _d S r`   )superr   daemonrV   send_fn)r   rV   rm   	__class__r   r   r      s   

z_LocalInferenceThread.__init__c              
   C   s   z7	 t d | j }| j }t|tr$t d| |	  n	t d|j
 | tj||d q tyP } zt d| W Y d }~d S d }~ww )NTz$Generating new batch of experiences.z9Sending batch of {} env steps ({} agent steps) to server.z)Sending batch of {} steps back to server.)r    samplesmetricsz$Error: inference worker thread died!)rH   r6   rV   sampleget_metricsr.   r   rJ   	env_stepsagent_stepscountrm   r$   REPORT_SAMPLES	ExceptionrI   )r   rp   rq   er   r   r   run   s4   



z_LocalInferenceThread.runra   rb   rc   r   rz   __classcell__r   r   rn   r   rj      s    rj   c                    s    fdd}|S )Nc                    sP    | }t |ttfs&td t |trt}nt}G dd d|}||S |S )NzzThe env you specified is not a supported (sub-)type of ExternalEnv. Attempting to convert it automatically to ExternalEnv.c                       ri   )zI_auto_wrap_external.<locals>.wrapped_creator.<locals>._ExternalEnvWrapperc                    s   t  j|j|jd d S )N)observation_spaceaction_space)rk   r   r}   r~   )r   real_envrn   r   r   r     s   
zR_auto_wrap_external.<locals>.wrapped_creator.<locals>._ExternalEnvWrapper.__init__c                 S   s   t d d S )Ni?B )r\   sleepr?   r   r   r   rz     s   zM_auto_wrap_external.<locals>.wrapped_creator.<locals>._ExternalEnvWrapper.runr{   r   r   rn   r   _ExternalEnvWrapper  s    r   )r.   r   r   rH   r6   r   )
env_configr   external_clsr   real_env_creatorr   r   wrapped_creator   s   
z,_auto_wrap_external.<locals>.wrapped_creatorr   )r   r   r   r   r   _auto_wrap_external   s   r   c                    s   |   } | d j dd| d< | d }d |_d|_i |_|jd u r@ddlm m |j|j	d|j
t fdd	| d
< n
| d
 }t|| d
< td|  ddlm} |di | }t||}|  ||fS )NconfigF)copy_frozensamplerr   )	RandomEnvRandomMultiAgentEnv)r~   r}   c                    s   rS  S )Nr   )_r   r   r   is_mar   r   <lambda>1  s    z1_create_embedded_rollout_worker.<locals>.<lambda>env_creatorz&Creating rollout worker with kwargs={})RolloutWorkerr   )copyoutputinput_input_configr   *ray.rllib.examples.envs.classes.random_envr   r   r~   r}   is_multi_agentr   rH   r6   rJ   #ray.rllib.evaluation.rollout_workerr   rj   start)rX   rm   r   r   r   rV   rW   r   r   r   rU     s.   


rU   )&logging	threadingr\   typingr   r   ray.cloudpicklecloudpicklerC   ray.rllib.env.external.rllinkr   r$   ray.rllib.env.external_envr   &ray.rllib.env.external_multi_agent_envr   ray.rllib.env.multi_agent_envr   ray.rllib.policy.sample_batchr   ray.rllib.utils.annotationsr	   ray.rllib.utils.typingr
   r   r   r   	getLoggerra   rH   rE   ImportErrorwarningr   Threadrj   r   rU   r   r   r   r   <module>   s<    
 /#
!