o
    `۷iE                     @   s  d dl Z d dlZd dlmZmZmZmZ d dlZd dlm	Z	 d dl
mZmZmZmZ d dlmZ d dlmZ d dlmZmZmZ e eZedd	d
edefddZedd	dee fddZedd	dee fddZedd	defddZedd		d*ded
ededeeee f  def
ddZ!edd			d+ded
edeeee f  dedeeeee f f f
ddZ"edd		d,deded ed!edeeef f
d"d#Z#edd	G d$d% d%Z$edd	e		d+ded&ed'eeee f  dede$f
d(d)Z%dS )-    N)DictListOptionalTuple)TPUAcceleratorManager)VALID_TPU_TYPESget_chips_per_hostget_num_chips_from_topologyreserve_tpu_slice)client_mode_wrap)	PublicAPI)PlacementGroupplacement_groupremove_placement_groupalpha)	stabilityaccelerator_typereturnc                 C   sb   |   }|dr|dd}n|dr|dd}n|}|tvr/td|  dtt d|S )a  Extracts the version from the accelerator type.

    Args:
        accelerator_type: The full accelerator type string (e.g. "TPU-V6E").

    Returns:
        The version string (e.g. "v6e").

    Raises:
        ValueError: If the accelerator type is invalid.
    ztpu- tpuvzInvalid accelerator_type: z. Must be one of z/ or start with 'TPU-' followed by a valid type.)lower
startswithreplacer   
ValueErrorlist)r   accel_type_lowerversion r   B/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/util/tpu.pyget_tpu_version_from_type   s   

r    c                  C   s   t  } | dkr
d} | S )z
    Return the name of the TPU pod that the worker is a part of.

    Returns:
        The name of the TPU pod. Returns None if not part of a TPU pod.
    r   N)r   get_current_node_tpu_name)tpu_namer   r   r   get_current_pod_name7   s   r#   c                   C      t  S )z
    Count the number of workers associated with the TPU pod that the worker belongs to.

    Returns:
        The total number of workers in the TPU pod. Returns None if the worker is not
        part of a TPU pod.
    )r   "get_num_workers_in_current_tpu_podr   r   r   r   get_current_pod_worker_countE   s   	r&   c                   C   r$   )z
    Return the number of TPU chips on the node.
    Returns:
        The total number of chips on the TPU node. Returns 0 if none are found.
    )r   !get_current_node_num_acceleratorsr   r   r   r   get_num_tpu_chips_on_nodeQ   s   r(   topologynum_workersresources_per_workerc                 C   sZ   | r|sdS zt | ||dd\}}|dkrW dS tdt|| W S  ty,   Y dS w )a  
    Calculates the number of slices needed to accommodate the specified number of workers.

    Args:
        topology: The TPU topology string.
        accelerator_type: The accelerator type string.
        num_workers: The desired number of workers.
        resources_per_worker: Optional dict of resources per worker.

    Returns:
        The number of slices required. Returns 1 if inputs are invalid or incomplete.
       r)   r   resources_per_unit
num_slicesr   )get_tpu_worker_resourcesmaxmathceil	Exception)r)   r   r*   r+   workers_per_slice_r   r   r   get_tpu_num_slices_for_workers[   s   
r7   r,   r.   r/   c                 C   s   t |}t| |}t| }|| }|r| ni }d|vr!d|d< d|vr)||d< |d }	|	dkr5td||	 dkrFtd| d|	 d||	 dkrWtd	|	 d
| dt||	 }
|
|fS )aF  
    Calculates the number of workers and the resources required for each worker
    to run based on a TPU topology.

    Args:
        topology: The TPU topology string.
        accelerator_type: The accelerator string.
        resources_per_unit: Optional manual override for resources per unit. If
            unspecified, the number of TPU chips in a host is assumed.
        num_slices: The number of TPU slices.

    Returns:
        A tuple containing:
        - num_workers: Total workers required.
        - unit_resources: The resource dictionary for a single worker.
    CPUr,   TPUr   zTPU resources must be positive.zTotal chips (z,) not divisible by TPUs requested per unit (z).z$The requested resources per bundle (zE TPU chips) do not divide evenly into the chips available per slice (zi). This configuration results in an uneven distribution of workers across slices, which is not supported.)r    r   r	   copyr   int)r)   r   r.   r/   accelerator_versionchips_per_hosttotal_chips_per_slicetotal_chips_availablefinal_resourcestpus_per_unitr*   r   r   r   r0      s4   
r0   8081coordinator_addressslice_idcoordinator_portc                 C   s   | |t |t |dS )a  
    Returns the environment variables required for JAX multi-slice coordination.

    Args:
        coordinator_address: The IP address or hostname of the coordinator.
        num_slices: The total number of slices in the cluster.
        slice_id: The index of the current slice.
        coordinator_port: The port the coordinator is listening on.

    Returns:
        A dictionary mapping environment variable names to their values.
    )MEGASCALE_COORDINATOR_ADDRESSMEGASCALE_PORTMEGASCALE_NUM_SLICESMEGASCALE_SLICE_ID)str)rC   r/   rD   rE   r   r   r   get_tpu_coordinator_env_vars   s
   rK   c                   @   sb  e Zd ZdZ					d,dededeeeef  d	ed
edee defddZ	defddZ
dd Z			d-d	ed
edee defddZedefddZedefddZedefddZedefddZedefddZedefd d!Zedefd"d#Zedee fd$d%Zedeeeef  fd&d'Zedeeef fd(d)Zd*d+ ZdS ).SlicePlacementGroupa.
  
    A handle to a placement group reservation for a TPU slice.

    The following definitions are added for clarity:

    - Accelerator type: A string describing the accelerator type and version (e.g. TPU-V2, TPU-V6E).
    - Accelerator version: The accelerator generation only (e.g. v6e, v5p, v5litepod).
    - Pod type: The TPU accelerator version and the number of chips in a topology. (e.g. v6e-128, v5p-8).
    - Accelerator topology: The physical topology representing the structure (e.g. 2x2x2, 16x16).

        Args:
            topology: The TPU topology string (e.g. "2x2x2").
            accelerator_version: The TPU accelerator generation (e.g. "v6e", "v5p", "v4").
            resources_per_bundle: Optionally specify the resources to include in every worker bundle.
            strategy: PlacementGroup parameter. The strategy to create the placement group. Currently default to "SPREAD"

             - "PACK": Packs Bundles into as few nodes as possible.
             - "SPREAD": Places Bundles across distinct nodes as even as possible.
             - "STRICT_PACK": Packs Bundles into one node. The group is
               not allowed to span multiple nodes.
             - "STRICT_SPREAD": Packs Bundles across distinct nodes.

            lifetime: PlacementGroup parameter. Either `None`, which defaults to the placement group
                will fate share with its creator and will be deleted once its
                creator is dead, or "detached", which means the placement group
                will live as a global object independent of the creator.

            num_slices: Number of TPU slices in the SlicePlacementGroup. Defaults to 1 when unspecified.

        Examples:

        .. testcode:: python
            :skipif: True

            import ray
            from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
            from ray.util.tpu import SlicePlacementGroup

            slice_handle = SlicePlacementGroup(topology="4x4", accelerator_version="v6e")
            slice_pg = slice_handle.placement_group
            ray.get(slice_pg.ready(), timeout=10)

            @ray.remote(num_cpus=0, resources={'TPU': 4})
            def spmd_task(world, rank):
                print(f"Current TPU is rank {rank} of {world}")

            tasks = [
                spmd_task.options(
                    scheduling_strategy=PlacementGroupSchedulingStrategy(
                        placement_group=slice_pg,
                    )
                ).remote(world=4, rank=i)
                for i in range(slice_handle.num_hosts)
            ]

    NSPREADr   r,   r)   r<   resources_per_bundlestrategynamelifetimer/   c           
      C   s   |   | _|   | _|pi | _|| _t| j| j|| jd\| _| _t	| j| j| _
t| j}td|| j
 }	|	| j | _g | _g | _|   d | _| |||| _d S )Nr-   r,   )stripr   	_topology_accelerator_version_resources_per_bundle_num_slicesr0   _num_bundles_bundle_resourcesr   _chips_per_hostr	   r1   
_num_hosts	_head_pgs_bundle_label_selector_validate_tpu_config_placement_group_reserve_slice)
selfr)   r<   rN   rO   rP   rQ   r/   total_chipshosts_per_slicer   r   r   __init__  s2   


zSlicePlacementGroup.__init__c                 C   s    |t vrtd| dt  d S )NzInvalid accelerator version: z. Must be one of: )r   r   )r`   r<   r   r   r   _accelerator_version_checkH  s
   z.SlicePlacementGroup._accelerator_version_checkc                 C   s<   |  | j tj| j| jdstd| j d| j dd S )N)tpu_accelerator_versiontpu_topologyzInvalid accelerator topology: 'z' for accelerator version: '')rd   r<   r   !is_valid_tpu_accelerator_topologyrS   r   r`   r   r   r   r]   N  s   
z(SlicePlacementGroup._validate_tpu_configr   c              	      s   g  _ g } j j }d j  }zSt jD ]@}t j|}|s2t	d j d j d| d|\}	}
 j
|
 tjj|	i} j |g|  | fddt|D 7 }qt|||| j d}|W S  tyq       w )	z8Performs the two-step scheduling to reserve a TPU slice.zTPU-z'Failed to reserve TPU slice. Requested z slice(s) of topology 'z' with accelerator type 'zE'. Ensure that sufficient TPU resources are available in the cluster.c                    s   g | ]} j  qS r   )rX   r:   ).0r6   ri   r   r   
<listcomp>|  s    
z6SlicePlacementGroup._reserve_slice.<locals>.<listcomp>)bundlesrO   rP   rQ   bundle_label_selector)r\   rW   rV   r<   upperranger/   r
   rS   RuntimeErrorr[   appendray_rayletRAY_NODE_TPU_SLICE_NAME_KEYextendr   r4   shutdown)r`   rO   rP   rQ   rl   bundles_per_slicer   r6   reservation
slice_namehead_pgselectorpgr   ri   r   r_   [  sD   

z"SlicePlacementGroup._reserve_slicec                 C      | j S )z%The underlying PlacementGroup object.)r^   ri   r   r   r   r        z#SlicePlacementGroup.placement_groupc                 C   r}   )z0The number of chips per host for this TPU slice.)rY   ri   r   r   r   r=     s   z"SlicePlacementGroup.chips_per_hostc                 C   r}   )z5The total number of hosts in the SlicePlacementGroup.)rZ   ri   r   r   r   	num_hosts  r~   zSlicePlacementGroup.num_hostsc                 C   r}   )z7The total number of bundles in the SlicePlacementGroup.)rW   ri   r   r   r   num_bundles  r~   zSlicePlacementGroup.num_bundlesc                 C   r}   )z'The physical topology of the TPU slice.)rS   ri   r   r   r   r)     r~   zSlicePlacementGroup.topologyc                 C   r}   )z&The TPU accelerator type of the slice.)rT   ri   r   r   r   r<     r~   z'SlicePlacementGroup.accelerator_versionc                 C   r}   )z8The number of TPU slices this SlicePlacementGroup spans.)rV   ri   r   r   r   r/     r~   zSlicePlacementGroup.num_slicesc                 C   r}   )z1The internal head PGs used to reserve the slices.)r[   ri   r   r   r   head_placement_groups  r~   z)SlicePlacementGroup.head_placement_groupsc                 C   r}   )z1The bundle label selector list for the worker PG.)r\   ri   r   r   r   rm     r~   z)SlicePlacementGroup.bundle_label_selectorc                 C   r}   )z/The resources that are assigned to each bundle.)rX   ri   r   r   r   bundle_resources  r~   z$SlicePlacementGroup.bundle_resourcesc                 C   s4   | j rt| j  d| _ | jD ]}t| qg | _dS )z=Removes the worker placement group and all internal head PGs.N)r^   r   r[   )r`   rz   r   r   r   rv     s   



zSlicePlacementGroup.shutdown)NrM   r   Nr,   )rM   r   N)__name__
__module____qualname____doc__rJ   r   r   floatr;   rc   rd   r]   r   r_   propertyr   r=   r   r   r)   r<   r/   r   r   rm   r   rv   r   r   r   r   rL      sr    =

-
2rL   r<   rN   c                 K   s   t d| |||d|S )a9  Asynchronously creates a PlacementGroup for a TPU slice.

    A slice placement group reserves num_slices TPU slice(s) and creates a placement
    group for scheduling tasks or actors.

    Args:
        topology: The desired TPU pod topology (e.g. "4x4", "2x8").
        accelerator_version: The TPU accelerator generation, (e.g. "v4", "v5p", "v6e").
        resources_per_bundle: Specify the number of resources to reserve per bundle.
            When unspecified, SlicePlacementGroup defaults to reserving 1 bundle per TPU host in
            a topology, with the bundle resources set to the number of TPU in a host.
            Ex: Specifying {"TPU": 1} for a 4x4 topology would result in 16 bundles, each with 1 TPU.
            If resources_per_bundle=None for the same topology, there would be 4 bundles with 4 TPU each.
        num_slices: The number of tpu slices within the placement group
        **kwargs: Additional arguments for the placement group, such as 'name', 'lifetime', or 'strategy'.

    Returns:
        The handle for the created SlicePlacementGroup.
    )r)   r<   rN   r/   Nr   )rL   )r)   r<   rN   r/   kwargsr   r   r   slice_placement_group  s   r   )N)Nr,   )rB   )&loggingr2   typingr   r   r   r   rr   ray._private.acceleratorsr   ray._private.accelerators.tpur   r   r	   r
   ray._private.client_mode_hookr   ray.util.annotationsr   ray.util.placement_groupr   r   r   	getLoggerr   loggerrJ   r    r#   r;   r&   r(   r   r7   r0   rK   rL   r   r   r   r   r   <module>   s    
	(@
 j