o
    `۷iuv                     @   s   d dl Z d dlZd dlmZmZmZmZmZmZm	Z	m
Z
mZ d dlZd dlmZ d dlmZ d dlmZmZmZmZ d dlmZ d dlmZ d dlmZmZ d d	lmZ d d
lm Z  d dl!m"Z" erhd dl#m$Z$ e%e&Z'e
dZ(e"G dd de j)dZ*dS )    N)	TYPE_CHECKINGAnyCallableDictListOptionalTupleTypeVarUnion)ActorHandle)RayActorError)!COMPONENT_ENV_TO_MODULE_CONNECTORCOMPONENT_LEARNER!COMPONENT_MODULE_TO_ENV_CONNECTORCOMPONENT_RL_MODULE)LearnerGroup)FaultTolerantActorManager)NUM_ENV_STEPS_SAMPLED_LIFETIMEWEIGHTS_SEQ_NO)Runner)PolicyID)DeveloperAPI)AlgorithmConfigTc                   @   s  e Zd Z					dbdddee dee d	ee d
ededeeef ddfddZ	dddddded dedee dee deeef ddfddZ
dcdededdfddZdefddZdddedededddef
ddZdddddddddddee d ee d!eeeeef   d"eeeef  d#eee  fd$d%Z				&	ddd'eee  d(eeed)f  d*eee  d+ee d,ee ddfd-d.Zd/ee ddfd0d1Zded2d3Zdddddddd4d5eeegef eeegef  eee f ded6ed7ee d+ee d8ed9edee fd:d;Zddd<d5eeegef eeegef  eee f d6ed7ee defd=d>Zd&ddd?d+ee d8ed9edeeeef  fd@dAZdee fdBdCZe e!j"dDdE Z#e e!j"defdFdGZ$e dfdHdIZ%e defdJdKZ&e dee fdLdMZ'e e!j"defdNdOZ(e defdPdQZ)e defdRdSZ*e defdTdUZ+e defdVdWZ,e defdXdYZ-e e!j"dZd[ Z.e e!j"d\d] Z/e e!j"d^d_ Z0e e!j"d`da Z1dS )gRunnerGroupFNr   Tconfigr   local_runnerlogdirtune_trial_id	pg_offset_setupkwargsreturnc           	   
   K   s   t |tr
t|n|pt | _|| _t| j| _|| _	|| _
|| _t| jdd| _|rWz| jd|| j|d| W d S  tyV } z|jrP|jd jd |d }~ww d S )N   )'max_remote_requests_in_flight_per_actorinit_id)r   num_runnersr   r       )
isinstancedictr   	from_dictr   _remote_configrayput_remote_config_obj_ref_tune_trial_id
_pg_offset_logdirr   "_max_requests_in_flight_per_runner_worker_managerr    r&   r   actor_init_failedargs)	selfr   r   r   r   r   r    r!   er(   r(   Z/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/rllib/utils/runners/runner_group.py__init__*   s:   	zRunnerGroup.__init__)r   r&   r   validater&   r;   c                K   sd   d | _ |dkr	d}|| _| j|fd|d ur|n| ji| |r0| jdd|| jd|| _ d S d S )Nr   Tr;   runner_indexr&   r   r(   )_local_runner_RunnerGroup__local_configadd_runners$_validate_runners_after_construction_make_runner_local_config)r7   r   r&   r   r;   r!   r(   r(   r9   r    j   s,   zRunnerGroup._setupc                    sF   j   fddtD }j | |r!  dS dS )z?Creates and adds a number of remote runners to this runner set.c                    s2   g | ]}j d| d   jd qS )r#   r<   r(   )rB   r/   ).0ir!   r&   old_num_runnersr7   r(   r9   
<listcomp>   s    

z+RunnerGroup.add_runners.<locals>.<listcomp>N)r4   
num_actorsrange
add_actorsr;   )r7   r&   r;   r!   new_runnersr(   rF   r9   r@      s   

zRunnerGroup.add_runnersc                 C   sR   | j dd D ]}|js&| }| jr$td| jj dt	|  q|qd S )Nc                 S      |   S N)assert_healthywr(   r(   r9   <lambda>       z&RunnerGroup.validate.<locals>.<lambda>zValidation of z failed! Error=)
r4   foreach_actorokget_ignore_ray_errors_on_runnersloggererror
runner_cls__name__str)r7   resultr8   r(   r(   r9   r;      s   zRunnerGroup.validate)recreated_runnerr=   r^   c             	   K   s   t d||||| j| jd|}|dkr| jdi |S tj d u r%dn| j| }tjdi | j	| jj
|djdi |S )N)r   worker_indexnum_workersrecreated_workerlog_dirr   r   )placement_group_bundle_indexr(   )r*   r2   r0   rZ   r-   utilget_current_placement_groupr1   remote_remote_argsoptions)r7   r=   r&   r^   r   r!   pg_bundle_idxr(   r(   r9   rB      s0   zRunnerGroup._make_runner)from_runnerenv_steps_sampledconnector_statesrl_module_staterunner_indices_to_updateenv_to_modulemodule_to_envrk   rl   rm   rn   ro   c                K   s(  |p| j }|jp|jdko|j}
|j}| jdkr/| j r/| j i |dur't|ini |p,i  |
s5|s5dS |
r|g kr>i }nt|du rM| jdd d|jd}dd	 |D }d
d	 |D }| j durt	| j drt	| j dr|du srJ | j j
}|du s|J | j j}i }|r|t||i |r|t||i n|du rt| t| i}n|jttgd}|dur||jpd |t< |s| j dur| j | n||ti  ||ti  |td |td |r| j dur|r| j | |r|| | jdt|d|ddd dS dS )z>Synchronizes the connectors of this `RunnerGroup`'s `Runner`s.training_onlyr   Nc                 S   s   | j ttgdS )N
components)	get_stater   r   rP   r(   r(   r9   rR     s
    z0RunnerGroup.sync_runner_states.<locals>.<lambda>F)r   timeout_secondsc                 S      g | ]
}t |v r|t  qS r(   )r   rD   sr(   r(   r9   rH     
    z2RunnerGroup.sync_runner_states.<locals>.<listcomp>c                 S   rw   r(   )r   rx   r(   r(   r9   rH     rz   _env_to_module_module_to_envrs   r#   	set_statestate        )r!   remote_worker_idsr   rv   )r   merge_runner_statesin_evaluationbroadcast_runner_statesnum_healthy_remote_runnersr}   r   foreach_runner)sync_filters_on_rollout_workers_timeout_shasattrr{   r|   updater   merge_statesr   ru   r&   rV   popr*   )r7   r   rk   rl   rm   rn   ro   rp   rq   r!   merge	broadcastrunner_statesenv_to_module_statesmodule_to_env_statesr(   r(   r9   sync_runner_states   s   









zRunnerGroup.sync_runner_statesr   policiesfrom_worker_or_learner_groupr   to_worker_indicesrv   inference_onlyc                 K   s"  | j du r|du rtdd}| js|dur||dur|n| j }|du r'td|dur2dd |D ntg}	t|trI|jdd |	D |dt }nt|t	rZt
|jj|	|d}n|j|	|d}dd	 | D }t
|}
| jd
t|
dd||d | j dur|dur| j | dS dS dS )a  Syncs model weights from the given weight source to all remote workers.

        Weight source can be either a (local) rollout worker or a learner_group. It
        should just implement a `get_weights` method.

        Args:
            policies: Optional list of PolicyIDs to sync weights for.
                If None (default), sync weights to/from all policies.
            from_worker_or_learner_group: Optional (local) `Runner` instance or
                LearnerGroup instance to sync from. If None (default),
                sync from this `Runner`Group's local worker.
            to_worker_indices: Optional list of worker indices to sync the
                weights to. If None (default), sync to all remote workers.
            global_vars: An optional global vars dict to set this
                worker to. If None, do not update the global_vars.
            timeout_seconds: Timeout in seconds to wait for the sync weights
                calls to complete. Default is 0.0 (fire-and-forget, do not wait
                for any sync calls to finish). Setting this to 0.0 might significantly
                improve algorithm performance, depending on the algo's `training_step`
                logic.
            inference_only: Sync weights with workers that keep inference-only
                modules. This is needed for algorithms in the new stack that
                use inference-only modules. In this case only a part of the
                parameters are synced to the workers. Default is False.
        NzhNo `local_runner` in `RunnerGroup`! Must provide `from_worker_or_learner_group` arg in `sync_weights()`!z}`from_worker_or_trainer` is None. In this case, `RunnerGroup`^ should have `local_runner`. But `local_runner` is also `None`.c                 S      g | ]}t d  | qS /)r   )rD   pr(   r(   r9   rH         z,RunnerGroup.sync_weights.<locals>.<listcomp>c                 S   r   r   )r   )rD   mr(   r(   r9   rH     r   )rt   r   c                 S   s"   i | ]\}}|t tfv r||qS r(   )r   r   )rD   kvr(   r(   r9   
<dictcomp>  s
    z,RunnerGroup.sync_weights.<locals>.<dictcomp>r}   r~   F)funcr!   r   r   rv   )r   	TypeErrornum_remote_runners
ValueErrorr   r)   r   ru   r   r   r-   rV   rg   itemsr.   r   r*   r}   )r7   r   r   r   rv   r   r!   rn   weights_srcmodulesrl_module_state_refr(   r(   r9   sync_weightsz  sh   "




zRunnerGroup.sync_weightsnew_remote_runnersc                 C   s   | j   | j | dS )zHard overrides the remote `Runner`s in this set with the provided ones.

        Args:
            new_remote_workers: A list of new `Runner`s (as `ActorHandles`) to use as
                new remote workers.
        N)r4   clearrK   )r7   r   r(   r(   r9   reset  s   
zRunnerGroup.resetc                 C   sd   z+z| j dd ddd W n ty   td Y n	w W | j  dS W | j  dS | j  w )z8Calls `stop` on all `Runner`s (including the local one).c                 S   rM   rN   )stoprP   r(   r(   r9   rR     rS   z"RunnerGroup.stop.<locals>.<lambda>FT)healthy_onlyr   zFailed to stop workers!N)r   	ExceptionrX   	exceptionr4   r   r7   r(   r(   r9   r     s   

zRunnerGroup.stop)r!   r   r   r   rv   return_obj_refsmark_healthyr   r   r   r   r   c             	   C   s   |r|rJ dg }	|r>| j dur>|r|d }
|dd }ni }
|}t|tr4t| j |d	i |
g}	n
|| j fi |
g}	| j sE|	S | jj|||||||d}tj|| j	d dd |
 D }|	| S )
a  Calls the given function with each `Runner` as its argument.

        Args:
            func: The function to call for each `Runner`s. The only call argument is
                the respective `Runner` instance.
            local_env_runner: Whether to apply `func` to local `Runner`, too.
                Default is True.
            healthy_only: Apply `func` on known-to-be healthy `Runner`s only.
            remote_worker_ids: Apply `func` on a selected set of remote `Runner`s.
                Use None (default) for all remote `Runner`s.
            timeout_seconds: Time to wait (in seconds) for results. Set this to 0.0 for
                fire-and-forget. Set this to None (default) to wait infinitely (i.e. for
                synchronous execution).
            return_obj_refs: Whether to return `ObjectRef` instead of actual results.
                Note, for fault tolerance reasons, these returned ObjectRefs should
                never be resolved with ray.get() outside of this `RunnerGroup`.
            mark_healthy: Whether to mark all those `Runner`s healthy again that are
                currently marked unhealthy AND that returned results from the remote
                call (within the given `timeout_seconds`).
                Note that `Runner`s are NOT set unhealthy, if they simply time out
                (only if they return a `RayActorError`).
                Also note that this setting is ignored if `healthy_only=True` (b/c
                `mark_healthy` only affects `Runner`s that are currently tagged as
                unhealthy).

        Returns:
             The list of return values of all calls to `func([worker])`.
        z-Can not return `ObjectRef` from local worker.Nr   r#   )r!   r   remote_actor_idsrv   r   r   ignore_ray_errorsc                 S   s   g | ]}|  qS r(   )rV   rD   rr(   r(   r9   rH   Q  s    z.RunnerGroup.foreach_runner.<locals>.<listcomp>r(   )r   r)   r\   getattrr4   	actor_idsrT   r    handle_remote_call_result_errorsrW   ignore_errors)r7   r   r!   r   r   r   rv   r   r   local_resultlocal_kwargsremote_resultsr(   r(   r9   r     s@   )


zRunnerGroup.foreach_runner)r   r   c                C   s   | j j|||dS )a  Calls the given function asynchronously with each `Runner` as the argument.

        Does not return results directly. Instead, `fetch_ready_async_reqs()` can be
        used to pull results in an async manner whenever they are available.

        Args:
            func: The function to call for each `Runner`s. The only call argument is
                the respective `Runner` instance.
            healthy_only: Apply `func` on known-to-be healthy `Runner`s only.
            remote_worker_ids: Apply `func` on a selected set of remote `Runner`s.

        Returns:
             The number of async requests that have actually been made. This is the
             length of `remote_worker_ids` (or self.num_remote_workers()` if
             `remote_worker_ids` is None) minus the number of requests that were NOT
             made b/c a remote `Runner` already had its
             `max_remote_requests_in_flight_per_actor` counter reached.
        )r   r   )r4   foreach_actor_async)r7   r   r   r   r(   r(   r9   foreach_runner_asyncU  s
   z RunnerGroup.foreach_runner_asyncrv   r   r   c                C   s4   | j j|||d}tj|| jd dd | D S )a  Get esults from outstanding asynchronous requests that are ready.

        Args:
            timeout_seconds: Time to wait for results. Default is 0, meaning
                those requests that are already ready.
            return_obj_refs: Whether to return ObjectRef instead of actual results.
            mark_healthy: Whether to mark all those workers healthy again that are
                currently marked unhealthy AND that returned results from the remote
                call (within the given `timeout_seconds`).
                Note that workers are NOT set unhealthy, if they simply time out
                (only if they return a RayActorError).
                Also note that this setting is ignored if `healthy_only=True` (b/c
                `mark_healthy` only affects workers that are currently tagged as
                unhealthy).

        Returns:
            A list of results successfully returned from outstanding remote calls,
            paired with the indices of the callee workers.
        r   r   c                 S   s   g | ]	}|j | fqS r(   )actor_idrV   r   r(   r(   r9   rH     s    z6RunnerGroup.fetch_ready_async_reqs.<locals>.<listcomp>)r4   fetch_ready_async_reqsr   r   rW   r   )r7   rv   r   r   r   r(   r(   r9   r   t  s   z"RunnerGroup.fetch_ready_async_reqsc                 C   s   | j j| jddS )zChecks for unhealthy workers and tries restoring their states.

        Returns:
            List of IDs of the workers that were restored.
        T)rv   r   )r4   probe_unhealthy_actorsrunner_health_probe_timeout_sr   r(   r(   r9   probe_unhealthy_runners  s   z#RunnerGroup.probe_unhealthy_runnersc                 C      dS )z>Number of seconds to wait for health probe calls to `Runner`s.Nr(   r   r(   r(   r9   r         z)RunnerGroup.runner_health_probe_timeout_sc                 C   r   )zClass for each runner.Nr(   r   r(   r(   r9   rZ     r   zRunnerGroup.runner_clsc                 C      | j S )z(Returns the config for a local `Runner`.)r?   r   r(   r(   r9   rC        zRunnerGroup._local_configc                 C   r   )zReturns the local `Runner`.)r>   r   r(   r(   r9   r     r   zRunnerGroup.local_runnerc                 C   
   | j  S )z(Returns the list of remote `Runner` IDs.)r4   healthy_actor_idsr   r(   r(   r9   healthy_runner_ids     
zRunnerGroup.healthy_runner_idsc                 C   r   )z)Number of runners to schedule and manage.Nr(   r   r(   r(   r9   r&     r   zRunnerGroup.num_runnersc                 C   r   )zNumber of remote `Runner`s.)r4   rI   r   r(   r(   r9   r     r   zRunnerGroup.num_remote_runnersc                 C   r   )z/Returns the number of healthy remote `Runner`s.)r4   num_healthy_actorsr   r(   r(   r9   r     r   z&RunnerGroup.num_healthy_remote_runnersc                 C   s   t t| j|   S )z(Returns the number of healthy `Runner`s.)intboolr>   r   r   r(   r(   r9   num_healthy_runners  s   zRunnerGroup.num_healthy_runnersc                 C   r   )z/Returns the number of in-flight async requests.)r4   num_outstanding_async_reqsr   r(   r(   r9   num_in_flight_async_reqs  r   z$RunnerGroup.num_in_flight_async_reqsc                 C   r   )zIReturns the number of times managed remote `Runner`s have been restarted.)r4   total_num_restartsr   r(   r(   r9   num_remote_runner_restarts  r   z&RunnerGroup.num_remote_runner_restartsc                 C   r   )z!Remote arguments for each runner.Nr(   r   r(   r(   r9   rh     r   zRunnerGroup._remote_argsc                 C   r   )z'If errors in runners should be ignored.Nr(   r   r(   r(   r9   rW     r   z)RunnerGroup._ignore_ray_errors_on_runnersc                 C   r   )z&Maximum requests in flight per runner.Nr(   r   r(   r(   r9   r3     r   z.RunnerGroup._max_requests_in_flight_per_runnerc                 C   r   )z.If runners should validated after constructed.Nr(   r   r(   r(   r9   rA     r   z0RunnerGroup._validate_runners_after_construction)FNNr   T)F)NNNr   F)r"   N)r"   r   )2r[   
__module____qualname__r   r   r\   r   r   r   r:   r    r@   r   r;   r   rB   r   r   r   r   r
   floatr   r   r   r   r   r   r   r   r   r   propertyabcabstractmethodr   rZ   rC   r   r   r&   r   r   r   r   r   rh   rW   r3   rA   r(   r(   r(   r9   r   (   sh   	


C

"
)

 '


t

*	

S*
"
'r   )	metaclass)+r   loggingtypingr   r   r   r   r   r   r   r	   r
   r-   	ray.actorr   ray.exceptionsr   ray.rllib.corer   r   r   r   $ray.rllib.core.learner.learner_groupr   ray.rllib.utils.actor_managerr   ray.rllib.utils.metricsr   r   ray.rllib.utils.runners.runnerr   ray.rllib.utils.typingr   ray.util.annotationsr   %ray.rllib.algorithms.algorithm_configr   	getLoggerr[   rX   r   ABCMetar   r(   r(   r(   r9   <module>   s&    ,
