o
    `۷iW                     @   s   d dl Z d dlZd dlmZmZmZmZ d dlZd dl	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZ e eZeG d	d
 d
eZdae Zee
defddZdS )    N)AnyDictListOptional)client_mode_hook)actors)(parse_pg_formatted_resources_to_original)TaskID)
RuntimeEnv)
Deprecated	PublicAPIc                   @   s$  e Zd ZdZdd Zeddddeeef fdd	Z	e
ed
dddd ZdefddZe
eddddd ZdefddZdefddZdefddZe
eddddd Zdee fddZdefddZdee fdd Zdee fd!d"Ze
ed#ddd$d% Zdee fd&d'Zdee fd(d)Ze
d*d+ Ze
d,d- Ze
ed.ddd/d0 Zdee fd1d2Ze
d3d4 Zd5d6 Z d7d8 Z!e
d9d: Z"e
d;d< Z#e
d=d> Z$ed?dddeee%e f fd@dAZ&deee%e f fdBdCZ'deee%e f fdDdEZ(de)fdFdGZ*dHS )IRuntimeContextz)A class used for getting runtime context.c                 C   s   |d usJ || _ d S N)workerselfr    r   I/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/runtime_context.py__init__   s   
zRuntimeContext.__init__z4Use get_xxx_id() methods to get relevant ids insteadT)messagewarningreturnc                 C   sP   | j | j| jd}| jjtjjjkr&| jdur| j|d< | j	dur&| j	|d< |S )zxGet a dictionary of the current context.

        Returns:
            dict: Dictionary of the current context.
        )job_idnode_id	namespaceNtask_idactor_id)
r   r   r   r   moderay_privateWORKER_MODEr   r   )r   contextr   r   r   get   s   




zRuntimeContext.getzUse get_job_id() insteadc                 C      | j j}| r
J |S )a  Get current job ID for this worker or driver.

        Job ID is the id of your Ray drivers that create tasks or actors.

        Returns:
            If called by a driver, this returns the job ID. If called in
            a task, return the job ID of the associated driver.

        )r   current_job_idis_nilr   r   r   r   r   r   .      zRuntimeContext.job_idc                 C       t  std| jj}| S )a  Get current job ID for this worker or driver.

        Job ID is the id of your Ray drivers that create tasks or actors.

        Returns:
            If called by a driver, this returns the job ID. If called in
            a task, return the job ID of the associated driver. The
            job ID will be hex format.

        Raises:
            RuntimeError: If Ray has not been initialized.
        z=Job ID is not available because Ray has not been initialized.)r   is_initializedRuntimeErrorr   r$   hexr&   r   r   r   
get_job_id>      zRuntimeContext.get_job_idzUse get_node_id() insteadc                 C   r#   )aR  Get the ID for the node that this process is running on.

        This can be called from within a driver, task, or actor.
        When called from a driver that is connected to a remote Ray cluster using
        Ray Client, this returns the ID of the head node.

        Returns:
            A node id for this worker or driver.
        )r   current_node_idr%   r   r   r   r   r   r   R   r'   zRuntimeContext.node_idc                 C   r(   )a  Get the ID for the node that this process is running on.

        This can be called from within a driver, task, or actor.
        When called from a driver that is connected to a remote Ray cluster using
        Ray Client, this returns the ID of the head node.

        Returns:
            A node id in hex format for this worker or driver.

        Raises:
            RuntimeError: If Ray has not been initialized.
        z>Node ID is not available because Ray has not been initialized.)r   r)   r*   r   r.   r+   r/   r   r   r   get_node_idb   r-   zRuntimeContext.get_node_idc                 C   s   t  std| jjjS )a%  Get the session name for the Ray cluster this process is connected to.

        The session name uniquely identifies a Ray cluster instance. This is the
        same value that appears as the ``SessionName`` label in Ray metrics,
        making it useful for filtering metrics when multiple clusters run the same
        application name.

        This can be called from within a driver, task, or actor.

        Example:

            .. testcode::

                import ray

                ray.init()
                session_name = ray.get_runtime_context().get_session_name()
                print(f"Session Name: {session_name}")

                @ray.remote
                def get_session_name():
                    return ray.get_runtime_context().get_session_name()

                # Session name is the same across all processes in the cluster
                assert ray.get(get_session_name.remote()) == session_name

                # Use SessionName label to filter metrics by cluster, e.g.:
                # ray_serve_http_request_latency_ms_bucket{SessionName="<session_name>"}

        Returns:
            A session name string for the Ray cluster (e.g.,
            "session_2025-01-01_00-00-00_000000_1234").

        Raises:
            RuntimeError: If Ray has not been initialized.
        zCSession name is not available because Ray has not been initialized.)r   r)   r*   r   nodesession_namer   r   r   r   get_session_namev   s
   %
zRuntimeContext.get_session_namec                 C   s   t  std| jj S )zGet current worker ID for this worker or driver process.

        Returns:
            A worker id in hex format for this worker or driver process.

        Raises:
            RuntimeError: If Ray has not been initialized.
        z@Worker ID is not available because Ray has not been initialized.)r   r)   r*   r   	worker_idr+   r3   r   r   r   get_worker_id   s
   	zRuntimeContext.get_worker_idzUse get_task_id() insteadc                 C   s<   | j jtjj jksJ d| j j |  }| s|S dS )aZ  Get current task ID for this worker.

        Task ID is the id of a Ray task.
        This shouldn't be used in a driver process.

        Example:

            .. testcode::

                import ray

                @ray.remote
                class Actor:
                    def ready(self):
                        return True

                @ray.remote
                def f():
                    return True

                # All the below code generates different task ids.
                # Task ids are available for actor creation.
                a = Actor.remote()
                # Task ids are available for actor tasks.
                a.ready.remote()
                # Task ids are available for normal tasks.
                f.remote()

        Returns:
            The current worker's task id. None if there's no task id.
        ZThis method is only available when the process is a                 worker. Current mode: N)r   r   r   r   r    _get_current_task_idr%   r   r   r   r   r   r      s   $zRuntimeContext.task_idc                 C   sF   | j jtjj jkrtd| j j  dS |  }| s!|	 S dS )a  Get current task ID for this worker.

        Task ID is the id of a Ray task. The ID will be in hex format.
        This shouldn't be used in a driver process.

        Example:

            .. testcode::

                import ray

                @ray.remote
                class Actor:
                    def get_task_id(self):
                        return ray.get_runtime_context().get_task_id()

                @ray.remote
                def get_task_id():
                    return ray.get_runtime_context().get_task_id()

                # All the below code generates different task ids.
                a = Actor.remote()
                # Task ids are available for actor tasks.
                print(ray.get(a.get_task_id.remote()))
                # Task ids are available for normal tasks.
                print(ray.get(get_task_id.remote()))

            .. testoutput::
                :options: +MOCK

                16310a0f0a45af5c2746a0e6efb235c0962896a201000000
                c2668a65bda616c1ffffffffffffffffffffffff01000000

        Returns:
            The current worker's task id in hex. None if there's no task id.
        JThis method is only available when the process is a worker. Current mode: N)
r   r   r   r   r    loggerr   r8   r%   r+   r9   r   r   r   get_task_id   s   &zRuntimeContext.get_task_idc                 C      | j jS r   )r   current_task_idr3   r   r   r   r8   
  s   z#RuntimeContext._get_current_task_idc                 C   2   | j jtjj jkrtd| j j  dS | j jS )aC  Get current task name for this worker.

        Task name by default is the task's function call string. It can also be
        specified in options when triggering a task.

        Example:

            .. testcode::

                import ray

                @ray.remote
                class Actor:
                    def get_task_name(self):
                        return ray.get_runtime_context().get_task_name()

                @ray.remote
                class AsyncActor:
                    async def get_task_name(self):
                        return ray.get_runtime_context().get_task_name()

                @ray.remote
                def get_task_name():
                    return ray.get_runtime_context().get_task_name()

                a = Actor.remote()
                b = AsyncActor.remote()
                # Task names are available for actor tasks.
                print(ray.get(a.get_task_name.remote()))
                # Task names are available for async actor tasks.
                print(ray.get(b.get_task_name.remote()))
                # Task names are available for normal tasks.
                # Get default task name
                print(ray.get(get_task_name.remote()))
                # Get specified task name
                print(ray.get(get_task_name.options(name="task_name").remote()))

            .. testoutput::
                :options: +MOCK

                Actor.get_task_name
                AsyncActor.get_task_name
                get_task_name
                task_name

        Returns:
            The current worker's task name
        r:   N)r   r   r   r   r    r;   r   current_task_namer3   r   r   r   get_task_name  s   2zRuntimeContext.get_task_namec                 C   r?   )a  Get current task function name string for this worker.

        Example:

            .. testcode::

                import ray

                @ray.remote
                class Actor:
                    def get_task_function_name(self):
                        return ray.get_runtime_context().get_task_function_name()

                @ray.remote
                class AsyncActor:
                    async def get_task_function_name(self):
                        return ray.get_runtime_context().get_task_function_name()

                @ray.remote
                def get_task_function_name():
                    return ray.get_runtime_context().get_task_function_name()

                a = Actor.remote()
                b = AsyncActor.remote()
                # Task functions are available for actor tasks.
                print(ray.get(a.get_task_function_name.remote()))
                # Task functions are available for async actor tasks.
                print(ray.get(b.get_task_function_name.remote()))
                # Task functions are available for normal tasks.
                print(ray.get(get_task_function_name.remote()))

            .. testoutput::
                :options: +MOCK

                [python module name].Actor.get_task_function_name
                [python module name].AsyncActor.get_task_function_name
                [python module name].get_task_function_name

        Returns:
            The current worker's task function call string
        r:   N)r   r   r   r   r    r;   r   current_task_function_namer3   r   r   r   get_task_function_nameG  s   +z%RuntimeContext.get_task_function_namezUse get_actor_id() insteadc                 C   s<   | j jtjj jksJ d| j j | j j}| s|S dS )zGet the current actor ID in this worker.

        ID of the actor of the current process.
        This shouldn't be used in a driver process.

        Returns:
            The current actor id in this worker. None if there's no actor id.
        r7   N)r   r   r   r   r    r   r%   r   r   r   r   r   r   z  s   zRuntimeContext.actor_idc                 C   sF   | j jtjj jkrtd| j j  dS | j j}| s!|	 S dS )a6  Get the current actor ID in this worker.

        ID of the actor of the current process.
        This shouldn't be used in a driver process.
        The ID will be in hex format.

        Returns:
            The current actor id in hex format in this worker. None if there's no
            actor id.
        r:   N)
r   r   r   r   r    r;   debugr   r%   r+   rD   r   r   r   get_actor_id  s   zRuntimeContext.get_actor_idc                 C   sF   | j jtjj jkrtd| j j  dS | j j}| s!| j j	S dS )a  Get the current actor name of this worker.

        This shouldn't be used in a driver process.
        The name is in string format.

        Returns:
            The current actor name of this worker.
            If a current worker is an actor, and
            if actor name doesn't exist, it returns an empty string.
            If a current worker is not an actor, it returns None.
        r:   N)
r   r   r   r   r    r;   r   r   r%   
actor_namerD   r   r   r   get_actor_name  s   zRuntimeContext.get_actor_namec                 C   r=   )zvGet the current namespace of this worker.

        Returns:
            The current namespace of this worker.
        )r   r   r3   r   r   r   r     s   zRuntimeContext.namespacec                 C   s2   | j  r	J dt| j  d}|o|d dkS )zCheck whether this actor has been restarted.

        Returns:
            Whether this actor has been ever restarted.
        z0This method should't be called inside Ray tasks.)r   NumRestartsr   )r   r%   r   r+   )r   
actor_infor   r   r   was_current_actor_reconstructed  s   z.RuntimeContext.was_current_actor_reconstructedz$Use get_placement_group_id() insteadc                 C   r=   )zGet the current Placement group ID of this worker.

        Returns:
            The current placement group id of this worker.
        )r   placement_group_idr3   r   r   r   current_placement_group_id  s   z)RuntimeContext.current_placement_group_idc                 C   s   | j j}| s| S dS )zGet the current Placement group ID of this worker.

        Returns:
            The current placement group id in hex format of this worker.
        N)r   rL   r%   r+   )r   pg_idr   r   r   get_placement_group_id  s   z%RuntimeContext.get_placement_group_idc                 C   r=   )a	  Get if the current task should capture parent's placement group.

        This returns True if it is called inside a driver.

        Returns:
            Return True if the current task should implicitly
                capture the parent placement group.
        )r   -should_capture_child_tasks_in_placement_groupr3   r   r   r   rP     s   
z<RuntimeContext.should_capture_child_tasks_in_placement_groupc                 C   sX   | j jtjj jksJ d| j j | j   | j j }dd | D }t	|}|S )a  Get the assigned resources to this worker.

        By default for tasks, this will return {"CPU": 1}.
        By default for actors, this will return {}. This is because
        actors do not have CPUs assigned to them by default.

        Returns:
            A dictionary mapping the name of a resource to a float, where
            the float represents the amount of that resource reserved
            for this worker.
        r7   c                 S   s$   i | ]\}}|t d d |D qS )c                 s   s    | ]\}}|V  qd S r   r   ).0_amtr   r   r   	<genexpr>   s    zCRuntimeContext.get_assigned_resources.<locals>.<dictcomp>.<genexpr>)sum)rQ   resmappingr   r   r   
<dictcomp>  s    z9RuntimeContext.get_assigned_resources.<locals>.<dictcomp>)
r   r   r   r   r    check_connectedcore_workerresource_idsitemsr   )r   resource_id_mapresource_mapresultr   r   r   get_assigned_resources  s   
z%RuntimeContext.get_assigned_resourcesc                 C   r=   )zGet the runtime env string used for the current driver or worker.

        Returns:
            The runtime env string currently using by this worker.
        )r   runtime_envr3   r   r   r   get_runtime_env_string  s   z%RuntimeContext.get_runtime_env_stringc                 C   s   t |  S )zGet the runtime env used for the current driver or worker.

        Returns:
            The runtime env currently using by this worker. The type of
                return value is ray.runtime_env.RuntimeEnv.
        )r
   deserializerb   r3   r   r   r   ra     s   	zRuntimeContext.runtime_envc                 C   s0   | j }|  |j}| rtd|j|S )zvGet the current actor handle of this actor itself.

        Returns:
            The handle of current actor.
        z*This method is only available in an actor.)r   rY   r   r%   r*   rZ   get_actor_handle)r   r   r   r   r   r   current_actor  s   zRuntimeContext.current_actorc                 C   s   | j   | j jjS )znGet the GCS address of the ray cluster.

        Returns:
            The GCS address of the cluster.
        )r   rY   
gcs_clientaddressr3   r   r   r   gcs_address(  s   

zRuntimeContext.gcs_addressz!Use get_accelerator_ids() insteadc                 C   s   |   S r   )get_accelerator_idsr3   r   r   r   get_resource_ids2  s   zRuntimeContext.get_resource_idsc                 C   sN   | j }|  i }tjj D ]}||d| d}dd |D ||< q|S )a  
        Get the current worker's visible accelerator ids.

        Returns:
            A dictionary keyed by the accelerator resource name. The values are a list
            of ids `{'GPU': ['0', '1'], 'neuron_cores': ['0', '1'],
            'TPU': ['0', '1']}`.
        ^z_group_[0-9A-Za-z]+$c                 S   s   g | ]}t |qS r   )str)rQ   idr   r   r   
<listcomp>I  s    z6RuntimeContext.get_accelerator_ids.<locals>.<listcomp>)r   rY   r   r   accelerators"get_all_accelerator_resource_names,get_accelerator_ids_for_accelerator_resource)r   r   ids_dictaccelerator_resource_nameaccelerator_idsr   r   r   ri   6  s   	

z"RuntimeContext.get_accelerator_idsc                 C   s   | j }|  |jS )z
        Get the node labels of the current worker.

        Returns:
            A dictionary of label key-value pairs.
        )r   rY   current_node_labelsr   r   r   r   get_node_labelsL  s   zRuntimeContext.get_node_labelsc                 C   s@   | j jtjj jkrtd| j j | j j rtd| j jS )am  Check if the current task has been canceled.

        This can be used to periodically check if ray.cancel() has been
        called on the current task and perform graceful cleanup.

        Returns:
            True if the task has been canceled, False otherwise.

        Raises:
            RuntimeError: If called from a driver or async actor context.
        r:   z/This method is not supported in an async actor.)	r   r   r   r   r    r*   rZ   current_actor_is_asynciois_canceledr3   r   r   r   rx   X  s   zRuntimeContext.is_canceledN)+__name__
__module____qualname____doc__r   r   r   rl   r   r"   propertyr   r,   r   r0   r4   r6   r   r   r<   r	   r8   rA   rC   r   rF   rH   r   rK   rM   rO   rP   r`   rb   ra   re   rh   r   rj   ri   rv   boolrx   r   r   r   r   r      sh    

+
)/:3



	





	r   r   c                   C   sB   t  tdu rttjjjatW  d   S 1 sw   Y  dS )af  Get the runtime context of the current driver/worker.

    The obtained runtime context can be used to get the metadata
    of the current driver, task, or actor.

    Example:

        .. testcode::

            import ray
            # Get the job id.
            ray.get_runtime_context().get_job_id()
            # Get the session name (used as SessionName label in Ray metrics).
            ray.get_runtime_context().get_session_name()
            # Get the actor id.
            ray.get_runtime_context().get_actor_id()
            # Get the task id.
            ray.get_runtime_context().get_task_id()

    N)_runtime_context_lock_runtime_contextr   r   r   r   global_workerr   r   r   r   get_runtime_contextt  s
   $r   )logging	threadingtypingr   r   r   r   ray._private.workerr   ray._private.client_mode_hookr   ray._private.stater   ray._private.utilsr   ray._rayletr	   ray.runtime_envr
   ray.util.annotationsr   r   	getLoggerry   r;   objectr   r   Lockr   r   r   r   r   r   <module>   s,    
    c