o
    
۾iHf                     @   sj  U d dl Z d dlmZ d dlmZ d dlmZ d dlmZ d dl	m
Z
mZ d dlZd dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZmZmZ d dlmZmZ d dlmZmZ d dlm Z  d dl!m"Z"m#Z#m$Z$m%Z% d dl&m'Z' e%durd dl(m)Z) d dl*m+Z+ ndZ)e
rd dl,m-Z- ee.Z/e Z0ee'dB  e1d< e02d eG dd dZ3G dd de Z4dS )    N)defaultdict)Callable)Future)	dataclass)TYPE_CHECKINGAny)init_logger)current_platform)get_env_vars_to_copy)get_distributed_init_methodget_ipget_open_port)GrammarOutputSchedulerOutput)ReconfigureDistributedRequestReconfigureRankType)Executor)FutureWrapperRayWorkerWrapperinitialize_ray_clusterray)ModelRunnerOutput)ActorHandle) PlacementGroupSchedulingStrategy)PlacementGroupCOMPLETED_NONE_FUTUREc                   @   s:   e Zd ZU dZeed< eed< dZeed< dZe	ed< dS )	RayWorkerMetaDataz
    Metadata for a Ray worker.
    The order of ray worker creation can be random,
    and we need to reset the rank after creating all workers.
    workercreated_rankadjusted_rank ipN)
__name__
__module____qualname____doc__r   __annotations__intr    r"   str r*   r*   Q/home/ubuntu/.local/lib/python3.10/site-packages/vllm/v1/executor/ray_executor.pyr   0   s   
 r   c                   @   s  e Zd ZU dZh dZddhZdZeed< dZ	eed< d8d
dZ
edefddZd8ddZdeeef fddZdd Zdd Zd9ddZdedd	fddZ	d:dededed	B eed	B  B fd d!Z	d:d"d#deded	B eed	B  B fd$d%Z	d:ded"d#deded	B eed	B  B fd&d'Z			(			d;d)eeB d*ed	B d+e d,eeef d	B dede!e ee!e  B fd-d.Z"d/d0 Z#d1efd2d3Z$d4d5 Z%d8d6d7Z&d	S )<RayDistributedExecutorzRay-based distributed executor>   
LOCAL_RANKVLLM_HOST_IPVLLM_HOST_PORTHIP_VISIBLE_DEVICESCUDA_VISIBLE_DEVICESROCR_VISIBLE_DEVICESHF_TOKENHUGGING_FACE_HUB_TOKENTuses_raysupports_ppreturnNc                 C   s   d | _ t st rdtjd< | jsJ t| j | jj	}tj
dd}|dkr.dtjd< | | | jjd u| _| jjjdkoL| jjd u pL| jjj | _d | _d S )Nshm&VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPERAY_USAGE_STATS_ENABLED01pooling)forward_dagr	   is_tpuis_xpuosenvironr5   r   parallel_configplacement_groupget_init_workers_rayvllm_configkv_transfer_confighas_connectormodel_configrunner_typeec_transfer_configis_ec_produceruses_samplerscheduler_output)selfrD   	ray_usager*   r*   r+   _init_executorR   s    






z%RayDistributedExecutor._init_executorc                 C   s    | j j}|dkr| jjrdS |S )zRay distributed executor supports pipeline parallelism,
        meaning that it allows PP size batches to be executed concurrently.
              )rC   pipeline_parallel_sizescheduler_configasync_scheduling)rP   pp_sizer*   r*   r+   max_concurrent_batcheso   s   z-RayDistributedExecutor.max_concurrent_batchesc                 C   s\   t rt d t| dr*| jd ur,| j  dd l}| jD ]}|| qd | _d S d S d S )NzShutting down Ray distributed executor. If you see error log from logging.cc regarding SIGTERM received, please ignore because this is the expected termination process in Ray.r>   r   )loggerinfohasattrr>   teardownr   workerskill)rP   r   r   r*   r*   r+   shutdownw   s   


zRayDistributedExecutor.shutdownc                 C   s&   | di }|dddddi |S )Nruntime_envnsightzcuda,cudnn,cublasz'worker_process_%p'node)tozcuda-graph-trace)
setdefaultupdate)rP   ray_remote_kwargsra   r*   r*   r+   !_configure_ray_workers_use_nsight   s   
z8RayDistributedExecutor._configure_ray_workers_use_nsightc                 C   s2   | di }| di }|dd tjD  |S )Nra   env_varsc                 S   s   i | ]}|d qS )r<   r*   ).0env_varr*   r*   r+   
<dictcomp>       zHRayDistributedExecutor._update_noset_device_env_vars.<locals>.<dictcomp>)rf   rg   r	   ray_noset_device_env_vars)rP   rh   ra   rj   r*   r*   r+   _update_noset_device_env_vars   s   z4RayDistributedExecutor._update_noset_device_env_varsc                 C   s   | j S N)_env_vars_for_all_workersrP   r*   r*   r+   _get_env_vars_to_be_updated   s   z2RayDistributedExecutor._get_env_vars_to_be_updatedrD   r   c           %   
      s  t j}d | _g | _g | _| jjr| |}| | t j	rNt
ttt j	d}t|| jjks<J d|d| jjtt|t|ksMJ d|n g }t|jD ]\}}|tjdre|| qU|d | jj }g }t  t|D ]B\}}t|d|d}	tjdkrtjd*d||	d	|tj|d
}
ntjd*ddtj|i|	d|tj|d
}
|t|
|d qwtdd |D }t||D ]\}}||_qt d| t d| j i |D ]}|dd |< qdtf fdd}t!||d}t|D ]\}}||_"qdd |D | _dd |D }| j#d|fd g }| jg| j D ]}
|
d u r5q,|t|
j$  q,t%t
}t%t
t|D ]\}\}}|| | dd |D }| &| qN' D ]\}}t!||< qot| g }t|}t|}||krt(d| dt
|)  d| d| d 	fd!d|D }t*| j+ttj,-| j.d"d#}|D ]}|D ]}|t/j0v rt/j0| ||< qq|| _1| j#d$| 2 fd tdkrd% t3 t4 }g }t|D ](\}\}} || 5|}!t6| j7|!||| j p|| jj8 dkd&}"||" q| j#d'|fd | #d( | #d) t9| jj:D ]<}#| jg  t9| jj8D ],}$|#| jj8 |$ }t| j|# |$ks\J |#t| jk sfJ | j|# | j|  qFq8d S )+N,zZVLLM_RAY_BUNDLE_INDICES must have the same size as the world size, but got bundle_indices=z% and self.parallel_config.world_size=zMVLLM_RAY_BUNDLE_INDICES cannot have duplicate values, but got bundle_indices=r   T)rD   #placement_group_capture_child_tasksplacement_group_bundle_indexGPU)num_cpusnum_gpusscheduling_strategy)rpc_rank)ry   rz   	resourcesr{   )r   r   c                 S   s   g | ]}|j j qS r*   )r   get_node_ipremote)rk   eachr*   r*   r+   
<listcomp>       
z<RayDistributedExecutor._init_workers_ray.<locals>.<listcomp>zworkers: %szdriver_dummy_worker: %srS   itemc                    s    | j }| kr	dnd| |fS )a  
            Sort the workers based on 3 properties:
            1. If the worker is on the same node as the driver (vllm engine),
                it should be placed first.
            2. Then, if the worker is on a node with fewer workers, it should
                be placed first.
            3. Finally, if the work is on a node with smaller IP address, it
                should be placed first.
            r   rS   )r"   )r   r"   )	driver_ip	ip_countsr*   r+   sort_by_driver_then_worker_ip   s   
zORayDistributedExecutor._init_workers_ray.<locals>.sort_by_driver_then_worker_ip)keyc                 S   s   g | ]}|j qS r*   )r   rk   r   r*   r*   r+   r     rn   c                 S   s   i | ]}|j |jqS r*   )r   r    r   r*   r*   r+   rm     s    
z<RayDistributedExecutor._init_workers_ray.<locals>.<dictcomp>adjust_rankargsc                 S   s   g | ]}t |qS r*   )r(   )rk   xr*   r*   r+   r   -  s    z0Every node should have a unique IP address. Got z nodes with node ids z and z unique IP addresses z. Please check your network configuration. If you set `VLLM_HOST_IP` environment variable, make sure it is unique for each node.c              	      s*   g | ]\}}t jd tt | iqS )ru   )r	   device_control_env_varjoinmapr)   )rk   node_id_)	node_gpusr*   r+   r   J  s    r^   )exclude_varsadditional_varsdestinationupdate_environment_variablesz	127.0.0.1)rG   
local_rankrankdistributed_init_methodis_driver_workerinit_workerinit_device
load_modelr*   );envsVLLM_RAY_PER_WORKER_GPUSdriver_dummy_workerr^   pp_tp_workersrC   ray_workers_use_nsightri   rp   VLLM_RAY_BUNDLE_INDICESlistr   r(   splitlen
world_sizeset	enumeratebundle_specsrE   r	   ray_device_keyappendr   r   r   r   r   r   zipr"   rZ   debugsortedr    collective_rpcget_node_and_gpu_idsr   extenditemsRuntimeErrorkeysr
   WORKER_SPECIFIC_ENV_VARSadditional_env_varsunionADDITIONAL_ENV_VARSrA   rB   rr   rt   r   r   indexdictrG   tensor_parallel_sizerangerU   )%rP   rD   rh   rz   bundle_indices	bundle_idbundleworker_metadatar   r{   r   
worker_ipsr   r"   r   sorted_worker_metadatair   rerank_mappingworker_node_and_gpu_idsnode_workersr   gpu_idsall_ipsn_ipsn_nodes(all_args_to_update_environment_variablesenv_vars_to_copyr   namer   
all_kwargsr   r   kwargspp_ranktp_rankr*   )r   r   r   r+   rF      sD  









	
	


z(RayDistributedExecutor._init_workers_rayreconfig_requestc                 C   s,   | j d|fd |jtjkr|   d S d S )Nreinitialize_distributedr   )r   new_data_parallel_rankr   SHUTDOWN_CURRENT_RANKr`   )rP   r   r*   r*   r+   r     s   z/RayDistributedExecutor.reinitialize_distributedFrO   	non_blockc                 C   s>   | j d ur	td| jr|js| |d |S || _ |rtS d S )NzOState error: sample_tokens() must be called after execute_model() returns None.)rO   r   rN   total_num_scheduled_tokens_execute_dagr   )rP   rO   r   r*   r*   r+   execute_model  s   
z$RayDistributedExecutor.execute_modelgrammar_outputzGrammarOutput | Nonec                 C   s.   | j }|du r|rtS dS d| _ | |||S )as  Execute the model on the Ray workers.

        The scheduler output to use should have been provided in
        a prior call to execute_model().

        Args:
            grammar_output: The structured outputs grammar bitmask, if applicable.
            non_block: If True, the method will return a Future.

        Returns:
            The model runner output.
        N)rO   r   r   )rP   r   r   rO   r*   r*   r+   sample_tokens  s
   z$RayDistributedExecutor.sample_tokensc                 C   sz   | j d u r| jdd| _ | j ||f}| js%|s|d  S t|d S | jd us,J |s7| jt|S t|| jS )NF)enable_asyncior   )	r>   _compiled_ray_dagexecuterI   rE   r   kv_output_aggregator	aggregater   )rP   rO   r   r   refsr*   r*   r+   r     s   
z#RayDistributedExecutor._execute_dagr*   methodtimeoutr   r   c                    sX   t |tr|nt|~du ri  fdd| jD }|r%t|S tj||dS )z%Runs the given method on all workers.Nc                    s&   g | ]}|j jg R i qS r*   )execute_methodr   )rk   r   r   r   sent_methodr*   r+   r     s    z9RayDistributedExecutor.collective_rpc.<locals>.<listcomp>)r   )
isinstancer)   cloudpickledumpsr^   r   r   rE   )rP   r   r   r   r   r   ray_worker_outputsr*   r   r+   r     s   	z%RayDistributedExecutor.collective_rpcc                 C   s   dd l }ddlm} |d}||jd}||k r&td| d| dd l}|jd}|d u r8td|jd	}|d u rKt	j
d
krMtdd S d S )Nr   )versionz2.43.0r   zRay version z is required, but found z!ray.experimental.compiled_dag_refzQRay Compiled Graph is not installed. Run `pip install ray[cgraph]` to install it.cupyncclzcupy is not installed but required since VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE is set to 'nccl'. Run `pip install ray[cgraph]` and check cupy installation.)importlib.metadata	packagingr   parsemetadata
ValueErrorimportlib.utilutil	find_specr   r9   )rP   	importlibr   required_versioncurrent_versioncgraph_spec	cupy_specr*   r*   r+   _check_ray_cgraph_installation  s,   
z5RayDistributedExecutor._check_ray_cgraph_installationr   c                    sv  | j jsJ |   tjdd ddlm}m} t	
dtjd  t	
dtj t	
dtj tj}|dvr?td	| d
| I  fdd| jd D t| jD ]+\}}fddt|D t| jd }||k rtjdkrtjfddD qT|}W d    n1 sw   Y  tjrddlm}	 ddlm}
 |	d|
d t	
d nt	
d |j|tjdS )NRAY_CGRAPH_get_timeout300r   )	InputNodeMultiOutputNodez#RAY_CGRAPH_get_timeout is set to %sz+VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE = %sz+VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMM = %s)autor   r8   z:Invalid value for VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE: z-. Valid values are: 'auto', 'nccl', or 'shm'.c                    s   g | ]} qS r*   r*   )rk   r   )
input_datar*   r+   r   L  s    z<RayDistributedExecutor._compiled_ray_dag.<locals>.<listcomp>c                    s    g | ]\}}|j  | qS r*   )execute_model_raybind)rk   r   r   )outputsr*   r+   r   P  s    rS   r8   c                    s   g | ]}|j  d qS )	transport)with_tensor_transport)rk   outputr  r*   r+   r   ^  r   )register_accelerator_context)RayPPCommunicatorcuda)torch_module_namecommunicator_clszeUsing RayPPCommunicator (which wraps vLLM _PP GroupCoordinator) for Ray Compiled Graph communication.zCUsing Ray's NCCL communicator for Ray Compiled Graph communication.)r   _overlap_gpu_communication)rC   use_rayr   rA   rB   rf   ray.dagr   r   rZ   r[   r   r9   &VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMMr   r   r   r   VLLM_USE_RAY_WRAPPED_PP_COMM,ray.experimental.channel.accelerator_contextr
  6vllm.distributed.device_communicators.ray_communicatorr  experimental_compile)rP   r   r   r   channel_typer   tp_grouplast_pp_rankr>   r
  r  r*   )r  r  r  r+   r   !  sn   




#z(RayDistributedExecutor._compiled_ray_dagc                 C   s   |    d S rq   )r`   rs   r*   r*   r+   __del__  s   zRayDistributedExecutor.__del__c                 C   s   d S rq   r*   rs   r*   r*   r+   check_health  s   z#RayDistributedExecutor.check_health)r7   N)rD   r   )F)Nr*   NF)'r#   r$   r%   r&   r   r   r5   boolr'   r6   rR   propertyr(   rY   r`   r   r)   r   ri   rp   rt   rF   r   r   r   r   r   r   r   r   r   floattupler   r   r   r   r  r  r*   r*   r*   r+   r,   >   s   
 


	
 q



"
_r,   )5rA   collectionsr   collections.abcr   concurrent.futuresr   dataclassesr   typingr   r   r   	vllm.envsr   vllm.loggerr   vllm.platformsr	   vllm.ray.ray_envr
   vllm.utils.network_utilsr   r   r   vllm.v1.core.sched.outputr   r   vllm.v1.enginer   r   vllm.v1.executor.abstractr   vllm.v1.executor.ray_utilsr   r   r   r   vllm.v1.outputsr   	ray.actorr   ray.util.scheduling_strategiesr   ray.util.placement_groupr   r#   rZ   r   r'   
set_resultr   r,   r*   r*   r*   r+   <module>   s:   

