o
    
۾iK                  
   @   s  d dl Z d dlZd dlmZ d dlmZ d dlmZmZ d dl	Z
d dlmZ d dlmZ d dlmZ d dlmZ d d	l	mZ d d
lmZ d dlmZ d dlmZ d dlmZ erfd dlmZmZ d dlmZ ee Z!dZ"z7d dl#Z#d dl$m%Z% d dl&m'Z' zd dl(m)Z) W n e*y   d dl(m+Z, e,j-Z)Y nw G dd deZ.dZ/W n e*y Z0 zdZ#e1e0Z/dZ.W Y dZ0[0ndZ0[0ww G dd deZ2de3fddZ4dd Z5ddd ed!e1fd"d#Z6d0d%d&Z7d0d'd(Z8	d1d ed)e1dB fd*d+Z9de:fd,d-Z;de:fd.d/Z<dS )2    N)defaultdict)Future)TYPE_CHECKINGUnion)ParallelConfig)get_pp_group)KVOutputAggregator)init_loggercurrent_platform)IntermediateTensorsget_ip)AsyncModelRunnerOutput)WorkerWrapperBase)GrammarOutputSchedulerOutput)ModelRunnerOutputi  )placement_group_table)PlacementGroup)available_resources_per_node)statec                       s   e Zd ZdZd fddZdefddZdeeee	 f fdd	Z
d
d Zded ed B deded f fddZdeeef fddZdefddZ  ZS )RayWorkerWrapperzyRay wrapper for vllm.worker.Worker, allowing Worker to be
        lazily initialized after Ray sets CUDA_VISIBLE_DEVICES.returnNc                    s   t  j|i | d| _d S )NF)super__init__compiled_dag_cuda_device_set)selfargskwargs	__class__ N/home/ubuntu/.local/lib/python3.10/site-packages/vllm/v1/executor/ray_utils.pyr   -   s   
zRayWorkerWrapper.__init__c                 C   s   t  S Nr   r   r"   r"   r#   get_node_ip5   s   zRayWorkerWrapper.get_node_ipc                 C   sB   t   }tjjj}|stdtjjjt  	 | }||fS )Nz)current platform %s does not support ray.)
rayget_runtime_contextget_node_idvllm	platformsr   ray_device_keyRuntimeErrordevice_nameget_accelerator_ids)r   node_id
device_keygpu_idsr"   r"   r#   get_node_and_gpu_ids8   s   
z%RayWorkerWrapper.get_node_and_gpu_idsc                 C   sN   | j d us	J d| js%t rn| j jd usJ t| j j d| _d S d S )NWorker is not initializedT)workerr   r   is_tpudevice
set_devicer%   r"   r"   r#   setup_device_if_necessaryC   s   
z*RayWorkerWrapper.setup_device_if_necessaryexecute_model_input)r   r   )r   r   r   r   c                 C   s   |    | jd usJ dt|dkr|\}}}n|\}}d }| jjd us'J | jj||}| |r9|||fS t|trB| }t	 j
sT|rM|jrMJ ||d f}|S |d u rh| jj|}t|trh| }|S )Nr4      )r9   r5   lenmodel_runnerexecute_model_is_intermediate_tensors
isinstancer   
get_outputr   is_last_rankreq_idssample_tokens)r   r:   scheduler_outputgrammar_outputintermediate_tensorsoutputr"   r"   r#   execute_model_rayS   s0   






z"RayWorkerWrapper.execute_model_rayvarsc                 C   s   t j| d S r$   )osenvironupdate)r   rJ   r"   r"   r#   override_env_vars}   s   z"RayWorkerWrapper.override_env_varsc                 C   s
   t |tS r$   )r@   r   )r   rH   r"   r"   r#   r?      s   
z)RayWorkerWrapper._is_intermediate_tensors)r   N)__name__
__module____qualname____doc__r   strr&   tuplelistintr3   r9   r   rI   dictrN   boolr?   __classcell__r"   r"   r    r#   r   )   s&    
*r   c                       s6   e Zd ZdZddedB f fddZdddZ  ZS )	FutureWrapperaR  A wrapper around Ray output reference to meet the interface
    of .execute_model(): The top level (core busy loop) expects .result() api
    to block and return a single output.

    If aggregator is provided, the outputs from all workers are aggregated upon
    the result() call. If not only the first worker's output is returned.
    N
aggregatorc                    s   t    || _|| _d S r$   )r   r   ref_or_refsr[   )r   r\   r[   r    r"   r#   r      s   

zFutureWrapper.__init__c                 C   s.   t j| j|d}| jd u r|S | jj|ddS )Ntimeoutr   )output_rank)r'   getr\   r[   	aggregate)r   r^   outputsr"   r"   r#   result   s   
zFutureWrapper.resultr$   )rO   rP   rQ   rR   r   r   rc   rY   r"   r"   r    r#   rZ      s    rZ   r   c                   C   s   t duS )z!Returns True if Ray is available.N)r'   r"   r"   r"   r#   ray_is_available   s   rd   c                   C   s   t du rtdt ddS )z+Raise an exception if Ray is not available.NzFailed to import Ray: z+.Please install Ray with `pip install ray`.)r'   
ValueErrorray_import_errr"   r"   r"   r#   assert_ray_available   s
   
rg   placement_groupr   parallel_config
device_strc           
   
   C   s   t  sJ dt| }|d }|d }tt}| D ]\}}|| ||  qt   }	|	|vrCt	d|	 d| j
 d| d| D ]\}}t||jk ratd|j|t||||j qGd	S )
zVerify a given placement group has bundles located in the right place.

    There are 2 rules.
    - Warn if all tensor parallel workers cannot fit in a single node.
    - Fail if driver node is not included in a placement group.
    zDRay is not initialized although distributed-executor-backend is ray.bundles_to_node_idbundleszdriver node id z& is not included in a placement group z. Node id -> bundles z. You don't have enough GPUs available in a current node. Check `ray status` and `ray list nodes` to see if you have available GPUs in a node `{driver_node_id}` before starting an vLLM engine.aC  tensor_parallel_size=%d is bigger than a reserved number of %ss (%d %ss) in a node %s. Tensor parallel workers can be spread out to 2+ nodes which can degrade the performance unless you have fast interconnect across nodes, like Infiniband. To resolve this issue, make sure you have more than %d GPUs available at each node.N)r'   is_initializedr   r   rU   itemsappendr(   r)   r-   idr<   tensor_parallel_sizeloggerwarning)
rh   ri   rj   pg_databundle_to_node_idsrl   node_id_to_bundle
bundle_idxr0   driver_node_idr"   r"   r#   _verify_bundles   s>   
		ry   current_placement_groupc                 C   s   | j }t }|  }d}t | tk r?tj|g|d\}}t|dkr&n|d9 }tdt	t | | t | tk sz
tj
|dd W dS  tjjyz   tdd |D }|dkrntd	| d
| dt ddtd|dt ddw )zWait until a placement group is ready.

    It prints the informative log messages if the placement group is
    not created within time.

    
   r]   r      a6  Waiting for creating a placement group of specs for %d seconds. specs=%s. Check `ray status` and `ray list nodes` to see if you have enough resources, and make sure the IP addresses used by ray cluster are the same as VLLM_HOST_IP environment variable specified in each node if you are running on a multi-node.c                 s   s    | ]	}| d dV  qdS )GPUr   N)r`   ).0specr"   r"   r#   	<genexpr>  s    z'_wait_until_pg_ready.<locals>.<genexpr>   z+Cannot provide a placement group requiring z GPUs (placement_group_specs=z	) within z seconds.
Tensor parallel size may exceed available GPUs in your cluster. Check resources with `ray status` and `ray list nodes`.
If running on K8s with limited GPUs, consider reducing --tensor-parallel-size to match available GPU resources.Nz:Cannot provide a placement group of placement_group_specs=z within z^ seconds. See `ray status` and `ray list nodes` to make sure the cluster has enough resources.)bundle_specstimereadyPG_WAIT_TIMEOUTr'   waitr<   rr   inforV   r`   
exceptionsGetTimeoutErrorsumre   )rz   placement_group_specsspg_ready_refwait_intervalr   _total_gpu_requiredr"   r"   r#   _wait_until_pg_ready   sR   

r   c                 C   s   t j|  t }d}t | tk r>t j }|d u rd S |d9 }tdtt |  t	| t | tk sd S d S )Nr{   r|   z?Waiting for removing a placement group of specs for %d seconds.)
r'   utilremove_placement_groupr   r   get_current_placement_grouprr   r   rV   sleep)rz   r   r   pgr"   r"   r#   _wait_until_pg_removed'  s   

r   ray_addressc                    st  t   ddlm} | r+| jdkr+ddlm} | }| j|kr+td| j||| j t	
 r5td n0| s=| r]zt	d W n  ty\   td t	j|| j| jd	 Y n	w t	j|| jd
 |j  sstd|j d| jrz| j}nt	j }|rtd |j}d}|D ]}| d}	|	dkrtd  d|	r|d7 }q| j|krtd  d  d| j d| d	nhtd t	  d}
| j|
krtd    fddt| jD }t }t	  }t | }| ddk rtd  d|d  d  d|d|dd|d d| < t	jj|d d!}t| |d"us/J t ||   || _d"S )#a  Initialize the distributed cluster with Ray.

    it will connect to the Ray cluster and create a placement group
    for the workers, which includes the specification of the resources
    for each distributed worker.

    Args:
        parallel_config: The configurations for parallel execution.
        ray_address: The address of the Ray cluster. If None, uses
            the default Ray cluster address.
    r   r
   r   )cuda_device_count_statelesszTensor parallel size (%d) exceeds available GPUs (%d). This may result in Ray placement group allocation failures. Consider reducing tensor_parallel_size to %d or less, or ensure your Ray cluster has %d GPUs available.z8Ray is already initialized. Skipping Ray initialization.autoz_No existing RAY instance detected. A new instance will be launched with current node resources.)addressnum_gpusruntime_env)r   r   zcurrent platform z does not support ray.z"Using the existing placement groupz/Placement group bundle cannot have more than 1 .zThe number of required z(s exceeds the total number of available z6s in the placement group. Required number of devices: z. Total number of devices: zANo current placement group found. Creating a new placement group.z\The number of required %ss exceeds the total number of available %ss in the placement group.c                    s   g | ]} d iqS )g      ?r"   )r~   r   rj   r"   r#   
<listcomp>  s    z*initialize_ray_cluster.<locals>.<listcomp>zCurrent node has no z" available. current_node_resource=z#. vLLM engine cannot start without z . Make sure you have at least 1 z% available in a node current_node_id=z current_ip=gMbP?znode:PACK)strategyN)!rg   vllm.platformsr   is_cuda
world_sizevllm.utils.torch_utilsr   rr   rs   r'   rm   r   is_rocmis_xpuinitConnectionErrorray_runtime_envr,   re   r.   rh   r   r   r   r`   cluster_resourcesranger   r(   r)   r   r   ry   )ri   r   r   r   available_gpusrz   rl   device_bundlesbundlebundle_devicesnum_devices_in_clusterr   
current_ipcurrent_node_idcurrent_node_resourcer"   r   r#   initialize_ray_cluster9  s   










r   c                  C   s@   ddl m}  t }t|d }|  }|| dksJ || S )Nr   )TPUAcceleratorManagerTPU)ray._private.acceleratorsr   r'   r   rV   !get_current_node_num_accelerators)r   r   
total_tpustpus_per_noder"   r"   r#   get_num_tpu_nodes  s   r   c                  C   sn   t j } t j }d}|r5t }|  D ]\}}||j kr0|d  D ]	\}}|| q&qt	|}|S )Nr   rk   )
r'   r   r   r   setrn   rp   hexaddr<   )pg_table
current_pg	num_nodesnodes_in_pgpg_keyr   r   noder"   r"   r#    get_num_nodes_in_placement_group  s   

r   )rz   r   r$   )=rK   r   collectionsr   concurrent.futuresr   typingr   r   r   r*   vllm.configr   vllm.distributedr   /vllm.distributed.kv_transfer.kv_connector.utilsr   vllm.loggerr	   r   vllm.sequencer   vllm.utils.network_utilsr   vllm.v1.outputsr   vllm.v1.worker.worker_baser   vllm.v1.core.sched.outputr   r   r   rO   rr   r   r'   ray.utilr   ray.util.placement_groupr   ray._private.stater   ImportErrorr   _state_available_resources_per_noder   rf   erS   rZ   rX   rd   rg   ry   r   r   r   rV   r   r   r"   r"   r"   r#   <module>   sv   
Z	

5
A
 
