o
    `۷i                     @   s  U d dl Z d dlmZmZmZmZmZ d dlZd dlm	Z	 d dl
mZmZ d dlmZ d dlmZ d dlmZmZ d dlmZ erEd dlZG d	d
 d
eZi aeeef ed< i aeeef ed< e  adaedddedee dee fddZ g dZ!e dddge e ddge e ddge e ddge deddfddZ"d d!dee	 fd"d#Z#d$d%d&ede$fd'd(Z%d&edefd)d*Z&d&ed+efd,d-Z'd.eded/ fd0d1Z(dS )2    N)TYPE_CHECKINGDictList
NamedTupleOptional)	ObjectRef)GLOOTensorTransportNCCLTensorTransport)CudaIpcTransport)NixlTensorTransport)TensorTransportManagerTensorTransportMetadata)	PublicAPIc                   @   s&   e Zd ZU ee ed< ee ed< dS )TransportManagerInfotransport_manager_classdevicesN)__name__
__module____qualname__typer   __annotations__r   str r   r   ^/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/experimental/gpu_object_manager/util.pyr      s   
 r   transport_manager_infotransport_managersFalpha)	stabilitytransport_namer   r   c                 C   s^   |   } | tv rtd|  dt|tstd|j dt||t| < | tvr-dadS dS )aZ  
    Register a new tensor transport for use in Ray. Note that this needs to be called
    before you create the actors that will use the transport. The actors also
    need to be created in the same process from which you call this function.

    Args:
        transport_name: The name of the transport protocol.
        devices: List of PyTorch device types supported by this transport (e.g., ["cuda", "cpu"]).
        transport_manager_class: A class that implements TensorTransportManager.
    Raises:
        ValueError: If transport_manager_class is not a subclass of TensorTransportManager.
    z
Transport z already registered.ztransport_manager_class z. must be a subclass of TensorTransportManager.TN)	upperr   
ValueError
issubclassr   r   r   DEFAULT_TRANSPORTShas_custom_transports)r   r   r   r   r   r   register_tensor_transport*   s   
r$   )NIXLGLOONCCLCUDA_IPCr%   cudacpur&   r'   r(   returnr   c                 C   st   t . | tv rt|  W  d   S | tvrtd|  t|   t| < t|  W  d   S 1 s3w   Y  dS )a  Get the tensor transport manager for the given tensor transport protocol.

    Args:
        transport_name: The tensor transport protocol to use for the GPU object.

    Returns:
        TensorTransportManager: The tensor transport manager for the given tensor transport protocol.
    N'Unsupported tensor transport protocol: )transport_managers_lockr   r   r    r   )r   r   r   r   get_tensor_transport_managerY   s   $r.   actorzray.actor.ActorHandlec                 C   s4   t sdS dtttf fdd}| jjdd|tS )z
    If there's no custom transports to register, returns None.
    Otherwise returns an object ref for a task on the actor that will register the custom transports.
    Nowner_transport_manager_infoc                 S   s>   ddl m}m} | D ]\}}||vr|||j|j qd S )Nr   )r$   r   )(ray.experimental.gpu_object_manager.utilr$   r   itemsr   r   )selfr0   r$   r   r   transport_infor   r   r   register_transport_on_actor   s   zOregister_custom_tensor_transports_on_actor.<locals>.register_transport_on_actor_ray_system)concurrency_group)r#   r   r   r   __ray_call__optionsremoter   )r/   r5   r   r   r   *register_custom_tensor_transports_on_actoru   s   


r;   deviceztorch.devicetensor_transportc                 C   s&   |t vrtd| | jt | jv S )z*Check if the device matches the transport.r,   )r   r    r   r   )r<   r=   r   r   r   device_match_transport   s   r>   c                 C   s"   |   } | tvrtd|  | S )NzInvalid tensor transport: )r   r   r    )r=   r   r   r   'normalize_and_validate_tensor_transport   s   r?   ray_usage_funcc                 C   s(   t |  j std|  d| dd S )Nz*Trying to use two-sided tensor transport: z for zS. This is only supported for one-sided transports such as NIXL or the OBJECT_STORE.)r   r   is_one_sidedr    )r=   r@   r   r   r   validate_one_sided   s   rB   tensor_transport_metaztorch.Tensorc                 C   sD   dd l }g }| j}| jD ]}|\}}|j|||d}|| q|S )Nr   )dtyper<   )torchtensor_devicetensor_metaemptyappend)rC   rE   tensorsr<   metashaperD   tensorr   r   r   "create_empty_tensors_from_metadata   s   
rN   ))	threadingtypingr   r   r   r   r   rayray._rayletr   ?ray.experimental.gpu_object_manager.collective_tensor_transportr   r	   6ray.experimental.gpu_object_manager.cuda_ipc_transportr
   9ray.experimental.gpu_object_manager.nixl_tensor_transportr   <ray.experimental.gpu_object_manager.tensor_transport_managerr   r   ray.util.annotationsr   rE   r   r   r   r   r   Lockr-   r#   r   r$   r"   r.   r;   boolr>   r?   rB   rN   r   r   r   r   <module>   s^   
 &

"		
