o
    bio                     @   sf   d dl Z d dlZd dlZd dlmZmZ d dlmZ d dlm	Z	 e 
eZdd ZG dd de	ZdS )	    N)ListOptional)RuntimeEnvContext)RuntimeEnvPluginc                  C   s   t tdrtjdu sJ dS ddlm}  | j}| }|dkr^ddlm} dd | D }d	d
 |D }|	| t
dt  d| d}|t| W d   n1 sXw   Y  n|	d}tj| dt_dS )zNInitialize the MPI cluster. When using MPI cluster, this must be called first.initedTNr   )MPI)get_all_accelerator_managersc                 S   s   g | ]}|  qS  )#get_visible_accelerator_ids_env_var).0mr	   r	   P/home/ubuntu/.local/lib/python3.10/site-packages/ray/_private/runtime_env/mpi.py
<listcomp>   s    zmpi_init.<locals>.<listcomp>c                 S   s&   i | ]}t j|r|t j|qS r	   )osenvironget)r   nr	   r	   r   
<dictcomp>   s
    zmpi_init.<locals>.<dictcomp>z/tmp/.w)hasattrmpi_initr   mpi4pyr   
COMM_WORLDGet_rankray._private.acceleratorsr   bcastopenr   getpidwritestrr   update)r   commrankr   device_varsvisible_devicesfr	   r	   r   r      s,   



r   c                   @   sD   e Zd ZdZdZdZefdee ddde	de
ej d	d
f
ddZd
S )	MPIPlugina  Plugin for enabling MPI cluster functionality in runtime environments.

    This plugin enables an MPI cluster to run on top of Ray. It handles the setup
    and configuration of MPI processes for distributed computing tasks.

    To use this plugin, add "mpi" to the runtime environment configuration:

    Example:
        @ray.remote(
            runtime_env={
                "mpi": {
                    "args": ["-n", "4"],
                    "worker_entry": worker_entry,
                }
            }
        )
        def calc_pi():
            ...

    Here worker_entry should be function for the MPI worker to run.
    For example, it should be `'py_module.worker_func'`. The module should be able to
    be imported in the runtime.

    In the mpi worker with rank==0, it'll be the normal ray function or actor.
    For the worker with rank > 0, it'll just run `worker_func`.

    ray.runtime_env.mpi_init must be called in the ray actors/tasks before any MPI
    communication.
    Z   mpiurisruntime_env
RuntimeEnvcontextloggerreturnNc           	      C   s   |  }|d u r
d S ztjddgddd}W n tjy$   |d  w |d|j   |d}|d us=J ddg|d	g  |j	d
d|g }d
||_	d S )Nmpirunz	--versionT)capture_outputcheckz>Failed to run mpi run. Please make sure mpi has been installedzRunning MPI plugin
 worker_entryz0`worker_entry` must be setup in the runtime env.argsz-mz#ray._private.runtime_env.mpi_runner )r)   
subprocessrunCalledProcessError	exceptioninfostdoutdecoder   py_executablejoin)	selfr*   r+   r-   r.   
mpi_configprocr3   cmdsr	   r	   r   modify_contextL   s:   




zMPIPlugin.modify_context)__name__
__module____qualname____doc__prioritynamedefault_loggerr   r    r   r   loggingLoggerrC   r	   r	   r	   r   r'   *   s     r'   )rK   r   r6   typingr   r    ray._private.runtime_env.contextr   ray._private.runtime_env.pluginr   	getLoggerrD   rJ   r   r'   r	   r	   r	   r   <module>   s    
