import logging
import os
from dataclasses import dataclass
from typing import Optional

import ray
from ray._private import ray_constants
from ray.train._internal.utils import get_address_and_port
from ray.train._internal.worker_group import WorkerGroup
from ray.train.backend import Backend, BackendConfig
from ray.train.constants import (
    DEFAULT_JAX_DISTRIBUTED_SHUTDOWN_TIMEOUT_S,
    JAX_DISTRIBUTED_SHUTDOWN_TIMEOUT_S,
)
from ray.util import PublicAPI
from ray.util.tpu import get_tpu_coordinator_env_vars, get_tpu_worker_resources

logger = logging.getLogger(__name__)


@PublicAPI(stability="alpha")
@dataclass
class JaxConfig(BackendConfig):
    use_tpu: bool = False
    use_gpu: bool = False

    @property
    def backend_cls(self):
        return _JaxBackend


def _setup_jax_distributed_environment(
    master_addr_with_port: str,
    num_workers: int,
    index: int,
    use_tpu: bool,
    use_gpu: bool,
    resources_per_worker: dict,
    jax_env_vars: Optional[dict] = None,
):
    """Set up distributed Jax training information.

    This function should be called on each worker. It sets JAX environment
    variables and initializes JAX distributed training.

    Args:
        master_addr_with_port: The master address with port for coordination.
        num_workers: Total number of workers.
        index: Index of this worker.
        use_tpu: Whether to configure for TPU. If True and JAX_PLATFORMS is not
            already set, it will be set to "tpu".
        use_gpu: Whether to configure for GPU. If True and JAX_PLATFORMS is not
            already set, it will be set to "cuda".
        resources_per_worker: The resources per worker.
        jax_env_vars: The JAX coordinator env vars to inject for multi-slice. These
            values do not override existing values if specified.
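
    Example:
        An illustrative sketch; the address, worker index, and resulting
        environment values below are assumptions for demonstration only.

        .. code-block:: python

            # Worker 3 of 8, one GPU per worker, JAX_PLATFORMS initially unset.
            _setup_jax_distributed_environment(
                master_addr_with_port="10.0.0.2:1234",
                num_workers=8,
                index=3,
                use_tpu=False,
                use_gpu=True,
                resources_per_worker={"GPU": 1},
            )
            # Afterwards JAX_PLATFORMS == "cuda", CUDA_VISIBLE_DEVICES == "0",
            # and jax.distributed is initialized for 8 processes.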
    """
    jax_platforms = os.environ.get("JAX_PLATFORMS", "")

    # Default the JAX platform only when the user has not set it explicitly.
    if not jax_platforms and use_tpu:
        os.environ["JAX_PLATFORMS"] = "tpu"
        jax_platforms = "tpu"

    # Inject multi-slice coordinator env vars without overriding values that
    # are already present in the worker's environment.
    if jax_env_vars:
        for k, v in jax_env_vars.items():
            if k not in os.environ:
                os.environ[k] = v

    if not jax_platforms and use_gpu:
        os.environ["JAX_PLATFORMS"] = "cuda"
        jax_platforms = "cuda"

    num_gpus_per_worker = 0
    if "cuda" in jax_platforms.split(","):
        # Expose this worker's GPU allotment as local device indices before
        # importing jax.
        num_gpus_per_worker = int(resources_per_worker.get("GPU", 0))
        os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(
            str(i) for i in range(num_gpus_per_worker)
        )

    import jax

    if "tpu" in jax_platforms.split(","):
        jax.distributed.initialize(master_addr_with_port, num_workers, index)
        logger.info("Initialized JAX distributed on TPU.")
    elif "cuda" in jax_platforms.split(","):
        local_device_ids = (
            list(range(num_gpus_per_worker)) if num_gpus_per_worker > 0 else None
        )
        jax.distributed.initialize(
            master_addr_with_port, num_workers, index, local_device_ids
        )
        logger.info("Initialized JAX distributed on CUDA.")


def _shutdown_jax_distributed():
    """Shutdown JAX distributed environment.

    This function should be called on each worker during cleanup.
    If JAX distributed was not initialized, this is a no-op.
    """
    try:
        import jax

        jax.distributed.shutdown()
    except Exception as e:
        logger.warning(f"Error during JAX distributed shutdown: {e}")


class _JaxBackend(Backend):
    def on_start(self, worker_group: WorkerGroup, backend_config: JaxConfig):
        if not backend_config.use_tpu and not backend_config.use_gpu:
            return

        # Use the rank-0 worker as the JAX distributed coordinator.
        master_addr, master_port = worker_group.execute_single(
            0, get_address_and_port
        )
        master_addr_with_port = f"{master_addr}:{master_port}"

        # Determine how many TPU slices this worker group spans (single-slice
        # and GPU worker groups behave as one slice).
        if backend_config.use_tpu and hasattr(
            worker_group, "get_worker_group_context"
        ):
            num_slices = worker_group.get_worker_group_context().num_slices
        else:
            num_slices = 1

        if backend_config.use_tpu and num_slices > 1:
            scaling_config = self._train_run_context.scaling_config
            workers_per_slice, _ = get_tpu_worker_resources(
                topology=scaling_config.topology,
                accelerator_type=scaling_config.accelerator_type,
                resources_per_unit=scaling_config.resources_per_worker,
                num_slices=num_slices,
            )
        else:
            workers_per_slice = max(1, len(worker_group) // num_slices)

        num_workers_total = len(worker_group)

        setup_futures = []
        for i in range(num_workers_total):
            env_vars = {}
            if num_slices > 1:
                # Map the worker rank to its slice and inject the multi-slice
                # coordinator env vars for that slice.
                slice_id = min(i // workers_per_slice, num_slices - 1)
                env_vars = get_tpu_coordinator_env_vars(
                    coordinator_address=master_addr_with_port,
                    num_slices=num_slices,
                    slice_id=slice_id,
                )
            setup_futures.append(
                worker_group.execute_single_async(
                    i,
                    _setup_jax_distributed_environment,
                    master_addr_with_port=master_addr_with_port,
                    num_workers=num_workers_total,
                    index=i,
                    use_tpu=backend_config.use_tpu,
                    use_gpu=backend_config.use_gpu,
                    resources_per_worker=worker_group.get_resources_per_worker(),
                    jax_env_vars=env_vars,
                )
            )
        ray.get(setup_futures)

    def on_shutdown(self, worker_group: WorkerGroup, backend_config: JaxConfig):
        """Cleanup JAX distributed resources when shutting down worker group."""
        if not backend_config.use_tpu and not backend_config.use_gpu:
            return

        shutdown_futures = worker_group.execute_async(_shutdown_jax_distributed)
        timeout_s = ray_constants.env_integer(
            JAX_DISTRIBUTED_SHUTDOWN_TIMEOUT_S,
            DEFAULT_JAX_DISTRIBUTED_SHUTDOWN_TIMEOUT_S,
        )
        try:
            ray.get(shutdown_futures, timeout=timeout_s)
            logger.debug("JAX distributed shutdown completed")
        except ray.exceptions.GetTimeoutError:
            logger.warning(
                f"JAX distributed shutdown timed out after {timeout_s}"
                " seconds. This may indicate workers are hung or unresponsive."
            )
        except Exception as e:
            logger.warning(f"Error during JAX distributed shutdown: {e}")
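

# Usage sketch (illustrative comment, not executed): the config is typically
# constructed by the user and handed to a Ray Train Jax trainer, which drives
# the backend hooks above. The trainer entry point itself lives outside this
# module and is assumed here.
#
#     config = JaxConfig(use_tpu=True)
#     assert config.backend_cls is _JaxBackend
#
# During training startup, _JaxBackend.on_start() runs
# _setup_jax_distributed_environment() on every worker; during teardown,
# on_shutdown() runs _shutdown_jax_distributed() with a configurable timeout.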