o
    ci4                     @   s   d dl mZ d dlmZmZ d dlZd dlZd dlmZ d dl	Z	d dl
Z
d dlZd dlmZ d dlmZ d dlmZmZ d dlmZ d dlmZ d d	lmZ d d
lmZmZ d dlmZ d dlm Z  d dl!m"Z" e#e$Z%eG dd deeeZ&dd Z'dS )    )deque)
HTTPServerSimpleHTTPRequestHandlerN)ThreadingMixIn)List)_create_embedded_rollout_workerCommands)InputReader)	IOContext)SampleBatch)override	PublicAPI)RolloutMetrics)SamplerInput)SampleBatchTypec                   @   sP   e Zd ZdZe		ddededededef
d	d
Z	e
edd Zdd ZdS )PolicyServerInputat  REST policy server that acts as an offline data source.

    This launches a multi-threaded server that listens on the specified host
    and port to serve policy requests and forward experiences to RLlib. For
    high performance experience collection, it implements InputReader.

    For an example, run `examples/envs/external_envs/cartpole_server.py` along
    with `examples/envs/external_envs/cartpole_client.py --inference-mode=local|remote`.

    WARNING: This class is not meant to be publicly exposed. Anyone that can
    communicate with this server can execute arbitary code on the machine. Use
    this with caution, in isolated environments, and at your own risk.

    .. testcode::
        :skipif: True

        import gymnasium as gym
        from ray.rllib.algorithms.ppo import PPOConfig
        from ray.rllib.env.policy_client import PolicyClient
        from ray.rllib.env.policy_server_input import PolicyServerInput
        addr, port = ...
        config = (
            PPOConfig()
            .api_stack(
                enable_rl_module_and_learner=False,
                enable_env_runner_and_connector_v2=False,
            )
            .environment("CartPole-v1")
            .offline_data(
                input_=lambda ioctx: PolicyServerInput(ioctx, addr, port)
            )
            # Run just 1 server (in the Algorithm's EnvRunnerGroup).
            .env_runners(num_env_runners=0)
        )
        algo = config.build()
        while True:
            algo.train()
        client = PolicyClient(
            "localhost:9900", inference_mode="local")
        eps_id = client.start_episode()
        env = gym.make("CartPole-v1")
        obs, info = env.reset()
        action = client.get_action(eps_id, obs)
        _, reward, _, _, _ = env.step(action)
        client.log_returns(eps_id, reward)
        client.log_returns(eps_id, reward)
        algo.stop()
          @   ioctxaddressportidle_timeoutmax_sample_queue_sizec              	      s4  |j  _t|d _t  _| _ jjdur$ fdd}| jj_	nG dd dt
}| j j_t j j j}zddl}	|	d t ||f| W n tyl   td	| d
| d ddl}	|	d  w td j d
 j  tjd jd}
d|
_|
  tjd jd}d|_|  dS )a  Create a PolicyServerInput.

        This class implements rllib.offline.InputReader, and can be used with
        any Algorithm by configuring

        [AlgorithmConfig object]
        .env_runners(num_env_runners=0)
        .offline_data(input_=lambda ioctx: PolicyServerInput(ioctx, addr, port))

        Note that by setting num_env_runners: 0, the algorithm will only create one
        rollout worker / PolicyServerInput. Clients can connect to the launched
        server using rllib.env.PolicyClient. You can increase the number of available
        connections (ports) by setting num_env_runners to a larger number. The ports
        used will then be `port` + the worker's index.

        Args:
            ioctx: IOContext provided by RLlib.
            address: Server addr (e.g., "localhost").
            port: Server port (e.g., 9900).
            max_queue_size: The maximum size for the sample queue. Once full, will
                purge (throw away) 50% of all samples, oldest first, and continue.
        )maxlenNc                     s4   g } 	 z
|   j  W n tjy   Y | S w qNappendmetrics_queue
get_nowaitqueueEmpty)	completedself U/home/ubuntu/.local/lib/python3.10/site-packages/ray/rllib/env/policy_server_input.pyget_metrics|   s   z/PolicyServerInput.__init__.<locals>.get_metricsc                   @   sJ   e Zd ZdZdd ZdefddZdee fddZdee	 fd	d
Z
dS )z7PolicyServerInput.__init__.<locals>.MetricsDummySamplerz8This sampler only maintains a queue to get metrics from.c                 S   s
   || _ dS )zInitializes a MetricsDummySampler instance.

                    Args:
                        metrics_queue: A queue of metrics
                    N)r   )r#   r   r$   r$   r%   __init__   s   
z@PolicyServerInput.__init__.<locals>.MetricsDummySampler.__init__returnc                 S      t r   NotImplementedErrorr"   r$   r$   r%   get_data      z@PolicyServerInput.__init__.<locals>.MetricsDummySampler.get_datac                 S   r)   r   r*   r"   r$   r$   r%   get_extra_batches   r-   zIPolicyServerInput.__init__.<locals>.MetricsDummySampler.get_extra_batchesc                 S   s4   g }	 z
| | j  W n tjy   Y |S w q)z;Returns metrics computed on a policy client rollout worker.r   )r#   r!   r$   r$   r%   r&      s   zCPolicyServerInput.__init__.<locals>.MetricsDummySampler.get_metricsN)__name__
__module____qualname____doc__r'   r   r,   r   r.   r   r&   r$   r$   r$   r%   MetricsDummySampler   s    r3   r      zCreating a PolicyServer on :z failed!zStarting connector server at server)nametargetTz
heart-beat)workerrollout_workerr   samples_queuer   Queuer   r   samplerr&   r   _make_handlertimesleepr   r'   OSErrorprintloggerinfoserver_nameserver_port	threadingThreadserve_foreverdaemonstart#_put_empty_sample_batch_every_n_sec)r#   r   r   r   r   r   r&   r3   handlerr?   serving_threadheart_beat_threadr$   r"   r%   r'   N   sB    



zPolicyServerInput.__init__c                 C   s0   t | jdkrtd t | jdks| j S )Nr   g?)lenr;   r?   r@   popr"   r$   r$   r%   next   s   

zPolicyServerInput.nextc                 C   s   	 t | j | jt  qr   )r?   r@   r   r;   r   r   r"   r$   r$   r%   rL      s   z5PolicyServerInput._put_empty_sample_batch_every_n_secN)r   r   )r/   r0   r1   r2   r   r
   strintfloatr'   r   r	   rR   rL   r$   r$   r$   r%   r      s&    1}
r   c                    sV   d  d t   fdd fddG  fdddt}|S )Nc                      sb   %  d u rt  \    W d    d S W d    d S 1 s*w   Y  d S r   )r   creation_argsset_weightsget_weightsr$   )child_rollout_workerinference_threadlockreport_datar:   r$   r%   setup_child_rollout_worker   s   "z1_make_handler.<locals>.setup_child_rollout_workerc                    s   | d }|   | tjkr'td tjd D ]}  q | d D ]}| q+ d urC 	
   d S d S )NsampleszFPolicyServerInput queue is full! Purging half of the samples (oldest).   metrics)decompress_if_neededr   rP   r   rC   warningrangepopleftputrW   rX   get_global_vars)databatch_rollout_metric)rY   r   r:   r;   r$   r%   r\      s    

z"_make_handler.<locals>.report_datac                       s8   e Zd Z fddZdd ZfddZ  ZS )z_make_handler.<locals>.Handlerc                    s   t  j|i | d S r   )superr'   )r#   akw	__class__r$   r%   r'     s   z'_make_handler.<locals>.Handler.__init__c                 S   s   t | jdd}| j|}t|}z| |}| d | 	  | j
t| W d S  ty@   | dt  Y d S w )NzContent-Lengthr      i  )rT   headersgetrfilereadpickleloadsexecute_commandsend_responseend_headerswfilewritedumps	Exception
send_error	traceback
format_exc)r#   content_lenraw_bodyparsed_inputresponser$   r$   r%   do_POST  s   


z&_make_handler.<locals>.Handler.do_POSTc                    s  |d }i }|t jkrtd  |d< |S |t jkr0td  |d<  |d< |S |t jkrFtd	|d j
 | |S |t jkrc   sTJ  j|d	 |d
 |d	< |S |t jkr} snJ  j|d	 |d |d< |S |t jkr sJ  j|d	 |d |d  |S |t jkrʈ sJ |d r j|d	 |d |d |d  |S  j|d	 |d |d  |S |t jkr sJ  j|d	 |d  |S td	|)Ncommandz'Sending worker creation args to client.worker_argsz!Sending worker weights to client.weightsglobal_varsz(Got sample batch of size {} from client.r^   
episode_idtraining_enabledobservationactiondonerewardrD   zUnknown command: {})r   GET_WORKER_ARGSrC   rD   rV   GET_WEIGHTSrX   rf   REPORT_SAMPLESformatcountSTART_EPISODEis_aliveenvstart_episode
GET_ACTION
get_action
LOG_ACTION
log_actionLOG_RETURNSlog_returnsEND_EPISODEend_episode
ValueError)r#   argsr   r   rY   rZ   r\   r:   r]   r$   r%   rw     sp   

/

+
$





z._make_handler.<locals>.Handler.execute_command)r/   r0   r1   r'   r   rw   __classcell__r$   r   rn   r%   Handler  s    r   )rG   Lockr   )r:   r;   r   r   r$   )rY   rZ   r[   r   r\   r:   r;   r]   r%   r>      s   Hr>   )(collectionsr   http.serverr   r   loggingr   socketserverr   rG   r?   r   typingr   ray.cloudpicklecloudpickleru   ray.rllib.env.policy_clientr   r   ray.rllib.offline.input_readerr	   ray.rllib.offline.io_contextr
   ray.rllib.policy.sample_batchr   ray.rllib.utils.annotationsr   r   ray.rllib.evaluation.metricsr   ray.rllib.evaluation.samplerr   ray.rllib.utils.typingr   	getLoggerr/   rC   r   r>   r$   r$   r$   r%   <module>   s.    
 E