o
    ٷi                  	   @   s  d dl Z d dlZd dlmZ d dlmZ d dlZzd dlZd dlm	Z
 d dlmZ dZd dlmZ W n eyE   dZdZ
dZdZeZY nw e eZd	d
 Zdd Zed$ddZdd Zd%dedB fddZd&dededB dedefddZdd Zd%ddZdd  Zd!efd"d#ZdS )'    N)contextmanager)Any)Queue) PlacementGroupSchedulingStrategyT)PlacementGroupFc                   C   s"   t rt rdS dtjv rdS dS )z;Check if Ray is initialized without hard dependency on Ray.TRAY_RAYLET_PIDF)RAY_AVAILABLErayis_initializedosenviron r   r   Y/home/ubuntu/.local/lib/python3.10/site-packages/vllm_omni/distributed/ray_utils/utils.pyis_ray_initialized   s   
r   c                 C   sR   d}| D ]}t |ttfr|D ]}||9 }qq||9 }qtjg |d }|| S )z]
    Calculate total bytes for a tensor allocation, handling nested tuples in size args.
       )dtype)
isinstancetuplelisttorchtensorelement_size)	size_argsr   num_elementssinnerr   r   r   r   calculate_total_bytes)   s   

r      c                 c   sh    d}d}t  }||k}t| dd}|r |r |r d}| j}d| _zdV  W |r,|| _dS dS |r3|| _w )a  
    Context manager to temporarily disable pin_memory if running in Ray and
    the allocation size exceeds the threshold.

    This is a workaround for Ray workers often having low ulimit -l (locked memory),
    causing OS call failed errors when allocating large pinned buffers.
    F
pin_memoryTN)r   getattrr   )obj
size_bytes	thresholdshould_disableold_pinin_rayis_large	is_pinnedr   r   r    maybe_disable_pin_memory_for_ray9   s    	
r(   c                   C   s   t stddd S )Nz(ray is required for worker_backend='ray'c                   S   s
   t ddS )Nr   )maxsize)RayQueuer   r   r   r   <lambda>`   s   
 z%get_ray_queue_class.<locals>.<lambda>)r   ImportErrorr   r   r   r   get_ray_queue_class]   s   r-   addressc                 C   sH   t s	td d S t s"ddtjddii}tj| d|d d S d S )Nz.Ray is not available, skipping initialization.env_vars
PYTHONPATH T)r.   ignore_reinit_errorruntime_env)	r   loggerwarningr	   r
   r   r   getinit)r.   r3   r   r   r   initialize_ray_clusterc   s   
r8   PACKnumber_of_stagesstrategyreturnc                 C   sd   t stdt std t| dd t| D }tjj	||d}t
|  td |S )a  Create a placement group for the given number of stages.
    Args:
        number_of_stages: The number of stages to create the placement group for.
        strategy: The strategy to use for the placement group.
    Returns:
        The placement group.
    z,ray is required for creating placement groupzJ[Orchestrator] Ray is not initialized. Initializing with default settings.c                 S   s   g | ]}d d dqS )g      ?)GPUCPUr   ).0_r   r   r   
<listcomp>~   s    z*create_placement_group.<locals>.<listcomp>)r;   z*[Orchestrator] Ray Placement Group created)r   r,   r	   r
   r4   r5   r8   rangeutilplacement_groupr6   readyinfo)r:   r.   r;   bundlespgr   r   r   create_placement_groupn   s   

rI   c              
   C   sZ   | r)t r+z	tj|  W d S  ty( } ztd|  W Y d }~d S d }~ww d S d S )Nz"Failed to remove placement group: )r   r	   rC   remove_placement_group	Exceptionr4   r5   )rH   er   r   r   rJ      s   rJ   c                 C   s   | rt |  dS dS )zETry to clean up Ray resources including placement group and shutdown.N)rJ   )rH   r   r   r   try_close_ray   s   rM   c              
   C   sX   | r(t r*zt|  W d S  ty' } ztd|  W Y d }~d S d }~ww d S d S )NzFailed to kill actor: )r   r	   killrK   r4   r5   )actorrL   r   r   r   kill_ray_actor   s   rP   placement_group_bundle_indexc                 O   sr   t stdtjddG dd d}|jt||ddtjddid	d
d }|j	j| g|R i | |S )Nz&ray is required for starting ray actorr   )num_gpusc                   @   s   e Zd Zdd ZdS )z+start_ray_actor.<locals>.OmniStageRayWorkerc                 _   s   ||i |S Nr   )selffuncargskwargsr   r   r   run   s   z/start_ray_actor.<locals>.OmniStageRayWorker.runN)__name__
__module____qualname__rX   r   r   r   r   OmniStageRayWorker   s    r\   )rD   rQ   r0   r1   1)r/   CUDA_LAUNCH_BLOCKING)scheduling_strategyr3   )
r   r,   r	   remoteoptionsr   r   r   r6   rX   )worker_entry_fnrD   rQ   rV   rW   r\   worker_actorr   r   r   start_ray_actor   s   
rd   )r   rS   )Nr9   ) loggingr   
contextlibr   typingr   r   r	   ray.util.queuer   r*   ray.util.scheduling_strategiesr   r   ray.util.placement_groupr   r,   	getLoggerrY   r4   r   r   r(   r-   strr8   intrI   rJ   rM   rP   rd   r   r   r   r   <module>   s@   
# 
