o
    biM                     @   s   d dl Z d dlmZmZmZmZ d dlZd dlZd dl	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZmZ e eZeG dd	 d	eZdae Zee
d
efddZdS )    N)AnyDictListOptional)client_mode_hook)(parse_pg_formatted_resources_to_original)TaskID)
RuntimeEnv)
Deprecated	PublicAPIc                   @   s  e Zd ZdZdd Zeddddeeef fdd	Z	e
ed
dddd ZdefddZe
eddddd ZdefddZdefddZe
eddddd Zdee fddZdefddZdee fddZdee fdd Ze
ed!ddd"d# Zdee fd$d%Zdee fd&d'Ze
d(d) Ze
d*d+ Ze
ed,ddd-d. Zdee fd/d0Ze
d1d2 Zd3d4 Zd5d6 Z e
d7d8 Z!e
d9d: Z"e
d;d< Z#ed=dddeee$e f fd>d?Z%deee$e f fd@dAZ&deee$e f fdBdCZ'dDS )ERuntimeContextz)A class used for getting runtime context.c                 C   s   |d usJ || _ d S N)workerselfr    r   G/home/ubuntu/.local/lib/python3.10/site-packages/ray/runtime_context.py__init__   s   
zRuntimeContext.__init__z4Use get_xxx_id() methods to get relevant ids insteadT)messagewarningreturnc                 C   sP   | j | j| jd}| jjtjjjkr&| jdur| j|d< | j	dur&| j	|d< |S )zxGet a dictionary of the current context.

        Returns:
            dict: Dictionary of the current context.
        )job_idnode_id	namespaceNtask_idactor_id)
r   r   r   r   moderay_privateWORKER_MODEr   r   )r   contextr   r   r   get   s   




zRuntimeContext.getzUse get_job_id() insteadc                 C      | j j}| r
J |S )a  Get current job ID for this worker or driver.

        Job ID is the id of your Ray drivers that create tasks or actors.

        Returns:
            If called by a driver, this returns the job ID. If called in
            a task, return the job ID of the associated driver.

        )r   current_job_idis_nilr   r   r   r   r   r   -   s   zRuntimeContext.job_idc                 C       t  sJ d| jj}| S )a  Get current job ID for this worker or driver.

        Job ID is the id of your Ray drivers that create tasks or actors.

        Returns:
            If called by a driver, this returns the job ID. If called in
            a task, return the job ID of the associated driver. The
            job ID will be hex format.

        Raises:
            AssertionError: If not called in a driver or worker. Generally,
                this means that ray.init() was not called.
        z=Job ID is not available because Ray has not been initialized.)r   is_initializedr   r#   hexr%   r   r   r   
get_job_id=   s   zRuntimeContext.get_job_idzUse get_node_id() insteadc                 C   r"   )zGet current node ID for this worker or driver.

        Node ID is the id of a node that your driver, task, or actor runs.

        Returns:
            A node id for this worker or driver.
        )r   current_node_idr$   r   r   r   r   r   r   Q   s   
zRuntimeContext.node_idc                 C   r&   )a  Get current node ID for this worker or driver.

        Node ID is the id of a node that your driver, task, or actor runs.
        The ID will be in hex format.

        Returns:
            A node id in hex format for this worker or driver.

        Raises:
            AssertionError: If not called in a driver or worker. Generally,
                this means that ray.init() was not called.
        z>Node ID is not available because Ray has not been initialized.)r   r'   r   r*   r(   r+   r   r   r   get_node_id_   s   zRuntimeContext.get_node_idc                 C   s   t  sJ d| jj S )zGet current worker ID for this worker or driver process.

        Returns:
            A worker id in hex format for this worker or driver process.
        z@Worker ID is not available because Ray has not been initialized.)r   r'   r   	worker_idr(   r   r   r   r   get_worker_idr   s
   zRuntimeContext.get_worker_idzUse get_task_id() insteadc                 C   s<   | j jtjj jksJ d| j j |  }| s|S dS )aZ  Get current task ID for this worker.

        Task ID is the id of a Ray task.
        This shouldn't be used in a driver process.

        Example:

            .. testcode::

                import ray

                @ray.remote
                class Actor:
                    def ready(self):
                        return True

                @ray.remote
                def f():
                    return True

                # All the below code generates different task ids.
                # Task ids are available for actor creation.
                a = Actor.remote()
                # Task ids are available for actor tasks.
                a.ready.remote()
                # Task ids are available for normal tasks.
                f.remote()

        Returns:
            The current worker's task id. None if there's no task id.
        ZThis method is only available when the process is a                 worker. Current mode: N)r   r   r   r   r   _get_current_task_idr$   r   r   r   r   r   r   }   s   $zRuntimeContext.task_idc                 C   sF   | j jtjj jkrtd| j j  dS |  }| s!|	 S dS )a  Get current task ID for this worker.

        Task ID is the id of a Ray task. The ID will be in hex format.
        This shouldn't be used in a driver process.

        Example:

            .. testcode::

                import ray

                @ray.remote
                class Actor:
                    def get_task_id(self):
                        return ray.get_runtime_context().get_task_id()

                @ray.remote
                def get_task_id():
                    return ray.get_runtime_context().get_task_id()

                # All the below code generates different task ids.
                a = Actor.remote()
                # Task ids are available for actor tasks.
                print(ray.get(a.get_task_id.remote()))
                # Task ids are available for normal tasks.
                print(ray.get(get_task_id.remote()))

            .. testoutput::
                :options: +MOCK

                16310a0f0a45af5c2746a0e6efb235c0962896a201000000
                c2668a65bda616c1ffffffffffffffffffffffff01000000

        Returns:
            The current worker's task id in hex. None if there's no task id.
        JThis method is only available when the process is a worker. Current mode: N)
r   r   r   r   r   loggerr   r1   r$   r(   r2   r   r   r   get_task_id   s   &zRuntimeContext.get_task_idc                 C      | j jS r   )r   current_task_idr.   r   r   r   r1      s   z#RuntimeContext._get_current_task_idc                 C   2   | j jtjj jkrtd| j j  dS | j jS )aB  Get current task name for this worker.

        Task name by default is the task's funciton call string. It can also be
        specified in options when triggering a task.

        Example:

            .. testcode::

                import ray

                @ray.remote
                class Actor:
                    def get_task_name(self):
                        return ray.get_runtime_context().get_task_name()

                @ray.remote
                class AsyncActor:
                    async def get_task_name(self):
                        return ray.get_runtime_context().get_task_name()

                @ray.remote
                def get_task_name():
                    return ray.get_runtime_context().get_task_name()

                a = Actor.remote()
                b = AsyncActor.remote()
                # Task names are available for actor tasks.
                print(ray.get(a.get_task_name.remote()))
                # Task names are avaiable for async actor tasks.
                print(ray.get(b.get_task_name.remote()))
                # Task names are available for normal tasks.
                # Get default task name
                print(ray.get(get_task_name.remote()))
                # Get specified task name
                print(ray.get(get_task_name.options(name="task_name").remote()))

            .. testoutput::
                :options: +MOCK

                Actor.get_task_name
                AsyncActor.get_task_name
                get_task_name
                task_nams

        Returns:
            The current worker's task name
        r3   N)r   r   r   r   r   r4   r   current_task_namer.   r   r   r   get_task_name   s   2zRuntimeContext.get_task_namec                 C   r8   )a  Get current task function name string for this worker.

        Example:

            .. testcode::

                import ray

                @ray.remote
                class Actor:
                    def get_task_function_name(self):
                        return ray.get_runtime_context().get_task_function_name()

                @ray.remote
                class AsyncActor:
                    async def get_task_function_name(self):
                        return ray.get_runtime_context().get_task_function_name()

                @ray.remote
                def get_task_function_name():
                    return ray.get_runtime_context().get_task_function_name()

                a = Actor.remote()
                b = AsyncActor.remote()
                # Task functions are available for actor tasks.
                print(ray.get(a.get_task_function_name.remote()))
                # Task functions are available for async actor tasks.
                print(ray.get(b.get_task_function_name.remote()))
                # Task functions are available for normal tasks.
                print(ray.get(get_task_function_name.remote()))

            .. testoutput::
                :options: +MOCK

                [python modual name].Actor.get_task_function_name
                [python modual name].AsyncActor.get_task_function_name
                [python modual name].get_task_function_name

        Returns:
            The current worker's task function call string
        r3   N)r   r   r   r   r   r4   r   current_task_function_namer.   r   r   r   get_task_function_name  s   +z%RuntimeContext.get_task_function_namezUse get_actor_id() insteadc                 C   s<   | j jtjj jksJ d| j j | j j}| s|S dS )zGet the current actor ID in this worker.

        ID of the actor of the current process.
        This shouldn't be used in a driver process.

        Returns:
            The current actor id in this worker. None if there's no actor id.
        r0   N)r   r   r   r   r   r   r$   r   r   r   r   r   r   G  s   zRuntimeContext.actor_idc                 C   sF   | j jtjj jkrtd| j j  dS | j j}| s!|	 S dS )a6  Get the current actor ID in this worker.

        ID of the actor of the current process.
        This shouldn't be used in a driver process.
        The ID will be in hex format.

        Returns:
            The current actor id in hex format in this worker. None if there's no
            actor id.
        r3   N)
r   r   r   r   r   r4   debugr   r$   r(   r=   r   r   r   get_actor_idZ  s   zRuntimeContext.get_actor_idc                 C   sF   | j jtjj jkrtd| j j  dS | j j}| s!| j j	S dS )a  Get the current actor name of this worker.

        This shouldn't be used in a driver process.
        The name is in string format.

        Returns:
            The current actor name of this worker.
            If a current worker is an actor, and
            if actor name doesn't exist, it returns an empty string.
            If a current worker is not an actor, it returns None.
        r3   N)
r   r   r   r   r   r4   r   r   r$   
actor_namer=   r   r   r   get_actor_nameo  s   zRuntimeContext.get_actor_namec                 C   r6   )zvGet the current namespace of this worker.

        Returns:
            The current namespace of this worker.
        )r   r   r.   r   r   r   r     s   zRuntimeContext.namespacec                 C   s6   | j  r	J dtjj| j  }|o|d dkS )zCheck whether this actor has been restarted.

        Returns:
            Whether this actor has been ever restarted.
        z0This method should't be called inside Ray tasks.NumRestartsr   )r   r$   r   r   stateactorsr(   )r   
actor_infor   r   r   was_current_actor_reconstructed  s   z.RuntimeContext.was_current_actor_reconstructedz$Use get_placement_group_id() insteadc                 C   r6   )zGet the current Placement group ID of this worker.

        Returns:
            The current placement group id of this worker.
        )r   placement_group_idr.   r   r   r   current_placement_group_id  s   z)RuntimeContext.current_placement_group_idc                 C   s   | j j}| s| S dS )zGet the current Placement group ID of this worker.

        Returns:
            The current placement group id in hex format of this worker.
        N)r   rG   r$   r(   )r   pg_idr   r   r   get_placement_group_id  s   z%RuntimeContext.get_placement_group_idc                 C   r6   )a	  Get if the current task should capture parent's placement group.

        This returns True if it is called inside a driver.

        Returns:
            Return True if the current task should implicitly
                capture the parent placement group.
        )r   -should_capture_child_tasks_in_placement_groupr.   r   r   r   rK     s   
z<RuntimeContext.should_capture_child_tasks_in_placement_groupc                 C   sX   | j jtjj jksJ d| j j | j   | j j }dd | D }t	|}|S )a  Get the assigned resources to this worker.

        By default for tasks, this will return {"CPU": 1}.
        By default for actors, this will return {}. This is because
        actors do not have CPUs assigned to them by default.

        Returns:
            A dictionary mapping the name of a resource to a float, where
            the float represents the amount of that resource reserved
            for this worker.
        r0   c                 S   s$   i | ]\}}|t d d |D qS )c                 s   s    | ]\}}|V  qd S r   r   ).0_amtr   r   r   	<genexpr>  s    zCRuntimeContext.get_assigned_resources.<locals>.<dictcomp>.<genexpr>)sum)rL   resmappingr   r   r   
<dictcomp>  s    z9RuntimeContext.get_assigned_resources.<locals>.<dictcomp>)
r   r   r   r   r   check_connectedcore_workerresource_idsitemsr   )r   resource_id_mapresource_mapresultr   r   r   get_assigned_resources  s   
z%RuntimeContext.get_assigned_resourcesc                 C   r6   )zGet the runtime env string used for the current driver or worker.

        Returns:
            The runtime env string currently using by this worker.
        )r   runtime_envr.   r   r   r   get_runtime_env_string  s   z%RuntimeContext.get_runtime_env_stringc                 C   s   t |  S )zGet the runtime env used for the current driver or worker.

        Returns:
            The runtime env currently using by this worker. The type of
                return value is ray.runtime_env.RuntimeEnv.
        )r	   deserializer]   r.   r   r   r   r\     s   	zRuntimeContext.runtime_envc                 C   s0   | j }|  |j}| rtd|j|S )zvGet the current actor handle of this actor itself.

        Returns:
            The handle of current actor.
        z*This method is only available in an actor.)r   rT   r   r$   RuntimeErrorrU   get_actor_handle)r   r   r   r   r   r   current_actor  s   zRuntimeContext.current_actorc                 C   s   | j   | j jjS )zmGet the GCS address of the ray cluster.
        Returns:
            The GCS address of the cluster.
        )r   rT   
gcs_clientaddressr.   r   r   r   gcs_address  s   

zRuntimeContext.gcs_addressz!Use get_accelerator_ids() insteadc                 C   s   |   S r   )get_accelerator_idsr.   r   r   r   get_resource_ids  s   zRuntimeContext.get_resource_idsc                 C   sN   | j }|  i }tjj D ]}||d| d}dd |D ||< q|S )a  
        Get the current worker's visible accelerator ids.

        Returns:
            A dictionary keyed by the accelerator resource name. The values are a list
            of ids `{'GPU': ['0', '1'], 'neuron_cores': ['0', '1'],
            'TPU': ['0', '1']}`.
        ^z_group_[0-9A-Za-z]+$c                 S   s   g | ]}t |qS r   )str)rL   idr   r   r   
<listcomp>  s    z6RuntimeContext.get_accelerator_ids.<locals>.<listcomp>)r   rT   r   r   accelerators"get_all_accelerator_resource_names,get_accelerator_ids_for_accelerator_resource)r   r   ids_dictaccelerator_resource_nameaccelerator_idsr   r   r   re     s   	

z"RuntimeContext.get_accelerator_idsc                 C   s   | j }|  |jS )z
        Get the node labels of the current worker.

        Returns:
            A dictionary of label key-value pairs.
        )r   rT   current_node_labelsr   r   r   r   get_node_labels  s   zRuntimeContext.get_node_labelsN)(__name__
__module____qualname____doc__r   r
   r   rh   r   r!   propertyr   r)   r   r,   r/   r   r   r5   r   r1   r:   r<   r   r?   rA   r   rF   rH   rJ   rK   r[   r]   r\   ra   rd   r   rf   re   rr   r   r   r   r   r      sd    


)/:3



	





r   r   c                   C   sB   t  tdu rttjjjatW  d   S 1 sw   Y  dS )a  Get the runtime context of the current driver/worker.

    The obtained runtime context can be used to get the metadata
    of the current task and actor.

    Note: For Ray Client, ray.get_runtime_context().get_node_id() should
    point to the head node. Also, keep in mind that ray._private.worker.global_worker
    will create a new worker object here if global_worker doesn't point to one.

    Example:

        .. testcode::

            import ray
            # Get the job id.
            ray.get_runtime_context().get_job_id()
            # Get the actor id.
            ray.get_runtime_context().get_actor_id()
            # Get the task id.
            ray.get_runtime_context().get_task_id()

    N)_runtime_context_lock_runtime_contextr   r   r   r   global_workerr   r   r   r   get_runtime_context)  s
   $r{   )loggingtypingr   r   r   r   	threadingray._private.workerr   ray._private.client_mode_hookr   ray._private.utilsr   ray._rayletr   ray.runtime_envr	   ray.util.annotationsr
   r   	getLoggerrs   r4   objectr   ry   Lockrx   r{   r   r   r   r   <module>   s*    
    