o
    i`                     @   sj  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlmZm	Z	 d dl
mZ d dlZd dlZd dlmZmZmZmZmZ d dlZd dlZzd dlmZ W n# eys   zd dlmZ W n eyp Z zededZ[ww Y nw dd	lmZ d
dlmZmZ d
dl m!Z! dZ"dZ#dZ$de%de%de%fddZ&de%de'dej(de%dej)f
ddZ*de%de%de%de+fddZ,dee% de%fdd Z-G d!d" d"eZ.erd d#l/m0Z0 d$d% Z1G d&d' d'Z2G d(d) d)e.Z3G d*d+ d+e.Z4eG d,d- d-Z5G d.d/ d/Z6G d0d1 d1Z7G d2d3 d3eZ8G d4d5 d5e8Z9G d6d7 d7e8Z:d8e%de+fd9d:Z;G d;d< d<Z<G d=d> d>Z=dS )?    N)ABCabstractmethod)	dataclass)AnyDictListOptionalTYPE_CHECKING)driver)cudaz^Could not import the 'cuda' module. Please install cuda-python that matches your CUDA version.   )checkCudaErrors   )create_dlpack_capsulepack_strided_memory)Mapping	   i   Fvalgranreturnc                 C   s   | | d |d  @ S )z6Efficient implementation assuming gran is a power of 2r    )r   r   r   r   K/home/ubuntu/vllm_env/lib/python3.10/site-packages/flashinfer/comm/mnnvl.pyround_up=   s   r   ptrshapedtype	device_idc           	      C   sZ   d}|D ]}||9 }qt jg |d }t| |||||}t jj|j}||_|	|S )a0  
    Create a PyTorch tensor from a CUDA memory pointer using DLPack.

    Args:
        ptr: CUDA memory pointer address as integer
        shape: Desired tensor shape
        dtype: PyTorch data type
        device_id: CUDA device ID

    Returns:
        PyTorch tensor that wraps the CUDA memory
    r   )r   )
torchtensorelement_sizer   utilsdlpackfrom_dlpackcapsule_capsule_wrapperview)	r   r   r   r   numeldimr   capsule_wrapperr   r   r   r   create_tensor_from_cuda_memoryB   s   

r)   sizec              
   C   s   z&t d|}t|}tt|| | tt| || td| d W dS  tyD } ztd| dd|  W Y d}~dS d}~ww )	a  
    Test if CUDA memory at ptr is accessible by trying to read/write a small amount.

    Args:
        ptr: CUDA memory pointer
        size: Size of memory region
        device_id: CUDA device ID

    Returns:
        True if memory is accessible, False otherwise
       z+DEBUG: Memory access test PASSED for ptr=0xxTz+DEBUG: Memory access test FAILED for ptr=0x: NF)min	bytearrayr   r   cuMemcpyDtoHcuMemcpyHtoDprint	Exception)r   r*   r   	test_size	host_dataer   r   r   test_cuda_memory_accessf   s   
r7   host_ptr_arrayc                 C   sV   | sdS t jt|  }||  }t |}tt|}tt|t || t	|S )zj
    A helper function that allocates memory on cuda and copies the data from the host to the device.
    N)
ctypesc_uint64lensizeofr   r   
cuMemAllocr1   	addressofint)r8   	ArrayTypec_arraysize_in_bytes
device_ptrr   r   r   alloc_and_copy_to_cuda   s   
rD   c                   @   s   e Zd ZdZedefddZedefddZededee fdd	Z	ede
d
ede
fddZedddZedededd fddZdS )CommBackendz(Abstract communication backend interfacer   c                 C      d S Nr   selfr   r   r   Get_rank      zCommBackend.Get_rankc                 C   rF   rG   r   rH   r   r   r   Get_size   rK   zCommBackend.Get_sizedatac                 C   rF   rG   r   rI   rM   r   r   r   	allgather   rK   zCommBackend.allgatherrootc                 C   rF   rG   r   rI   rM   rP   r   r   r   bcast   rK   zCommBackend.bcastNc                 C   rF   rG   r   rH   r   r   r   barrier   rK   zCommBackend.barriercolorkeyc                 C   rF   rG   r   rI   rT   rU   r   r   r   Split   rK   zCommBackend.Splitr   N)__name__
__module____qualname____doc__r   r?   rJ   rL   r   rO   r   rR   rS   rW   r   r   r   r   rE      s    rE   MPIc               
   C   s6   z	ddl m}  | W S  ty } ztd|d}~ww )zLazy import for mpi4pyr   r]   zmpi4py is not installedN)mpi4pyr^   ImportError)r^   errr   r   r   lazy_import_mpi   s   
rb   c                   @   sL   e Zd ZU dZeed< dZeed< edd ZedefddZ	d	d
 Z
dS )MpiCommN_comm_MPIc                 C   s"   | j d u rt | _ | j j| _| j S rG   )re   rb   
COMM_WORLDrd   )clsr   r   r   _get_mpi   s   

zMpiComm._get_mpinew_commc                 C   s   |    || _d S rG   )rh   rd   )rg   ri   r   r   r   set_mpi_comm   s   
zMpiComm.set_mpi_commc                 C   s   | j d u r	|   t| j |S rG   )rd   rh   getattr)rI   namer   r   r   __getattr__   s   
zMpiComm.__getattr__)rY   rZ   r[   rd   r   __annotations__re   classmethodrh   rj   rm   r   r   r   r   rc      s   
 
rc   c                   @   sz   e Zd Zdd ZdefddZdefddZdedee fd	d
Zde	dede	fddZ
dd ZdededefddZdS )
MPIBackendc                 C   s   t  | _d S rG   )rc   _mpicommrH   r   r   r   __init__      zMPIBackend.__init__r   c                 C   
   | j  S rG   )rq   rJ   rH   r   r   r   rJ         
zMPIBackend.Get_rankc                 C   rt   rG   )rq   rL   rH   r   r   r   rL      ru   zMPIBackend.Get_sizerM   c                 C      | j |S rG   )rq   rO   rN   r   r   r   rO      rs   zMPIBackend.allgatherrP   c                 C   s   | j ||S rG   )rq   rR   rQ   r   r   r   rR         zMPIBackend.bcastc                 C      | j   d S rG   )rq   BarrierrH   r   r   r   rS      rw   zMPIBackend.barrierrT   rU   c                 C   s   | j ||| _ t S rG   )rq   rW   rp   rV   r   r   r   rW      s   zMPIBackend.SplitN)rY   rZ   r[   rr   r?   rJ   rL   r   rO   r   rR   rS   rE   rW   r   r   r   r   rp      s    rp   c                   @   s   e Zd ZdZddee fddZdefddZdefd	d
Z	dede
e fddZdededefddZdddZdededd fddZdS )TorchDistBackendz-Communication backend using torch.distributedNgroupc                 C   s,   ddl m} | std|| _|| _dS )z
        Initialize TorchDistBackend.

        Args:
            group: Optional process group. If None, uses the default process group.
        r   Nz_torch.distributed is not initialized. Please call torch.distributed.init_process_group() first.)torch.distributeddistributedis_initializedRuntimeError_group_dist)rI   r{   distr   r   r   rr      s   
zTorchDistBackend.__init__r   c                 C      | j | jS rG   )r   get_rankr   rH   r   r   r   rJ      rw   zTorchDistBackend.Get_rankc                 C   r   rG   )r   get_world_sizer   rH   r   r   r   rL     rw   zTorchDistBackend.Get_sizerM   c                 C   s&   dg|    }| jj||| jd |S )z5All-gather arbitrary Python objects across all ranks.Nr{   )rL   r   all_gather_objectr   )rI   rM   output_listr   r   r   rO     s   zTorchDistBackend.allgatherrP   c                 C   s"   |g}| j j||| jd |d S )z1Broadcast a Python object from root to all ranks.)srcr{   r   )r   broadcast_object_listr   )rI   rM   rP   object_listr   r   r   rR     s   zTorchDistBackend.bcastc                 C   s   | j j| jd d S )Nr   )r   rS   r   rH   r   r   r   rS     s   zTorchDistBackend.barrierrT   rU   c                 C   s   |   }| |||f}i }|D ]\}}}||vrg ||< || ||f q|D ]}|| jdd d q)dd || D }	| jj|	d}
t|
dS )a  
        Split the communicator into sub-groups based on color.

        All processes with the same color will be in the same new group.
        The key determines the rank ordering within the new group.

        Args:
            color: Processes with the same color are placed in the same group
            key: Determines rank ordering within the new group (lower key = lower rank)

        Returns:
            New TorchDistBackend with the split process group
        c                 S   s   | d S )Nr   r   )r,   r   r   r   <lambda>0  s    z(TorchDistBackend.Split.<locals>.<lambda>)rU   c                 S   s   g | ]\}}|qS r   r   ).0_rr   r   r   
<listcomp>3  s    z*TorchDistBackend.Split.<locals>.<listcomp>)ranksr   )rJ   rO   appendsortr   	new_grouprz   )rI   rT   rU   global_rankall_infocolor_groupsckr   my_group_ranksr   r   r   r   rW     s   
zTorchDistBackend.SplitrG   rX   )rY   rZ   r[   r\   r   r   rr   r?   rJ   rL   r   rO   rR   rS   rW   r   r   r   r   rz      s    
rz   c                   @   s:   e Zd ZU dZdZee ed< dZe	ed< dZ
e	ed< dS )MnnvlConfigz)Configuration for MNNVL memory managementNcomm_backendr   allocation_granularity    fabric_page_size)rY   rZ   r[   r\   r   r   rE   rn   r   r?   r   r   r   r   r   r   ;  s
   
 r   c                   @   s  e Zd ZU dZeed< dZeed< dZeed< dZ	eed< dZ
eed< dZeed	< d
Zee ed< d
Zeed< i Zeeef ed< i Zeeef ed< d
Zee ed< dedefddZdd Zdd Zedd Zed0dedefddZedefddZedefddZedefd d!Zededefd"d#Z ededefd$d%Z!ed&efd'd(Z"ed1d*efd+d,Z#ed-efd.d/Z$d
S )2MnnvlMemoryFinitializedr   current_mem_offsetcurrent_rank_stridecurrent_start_addressr   r   r   Ncommdev_idallocated_mapaddress_refcntconfigmappingr*   c                 C   s&   || _ || _t| j |\| _| _d S rG   )r   segment_sizer   open_mnnvl_memoryr   rank_stride)rI   r   r*   r   r   r   rr   [  s   zMnnvlMemory.__init__c                 C   s   t  st| j d S d S rG   )sysis_finalizingr   close_mnnvl_memoryr   rH   r   r   r   __del__`  s   zMnnvlMemory.__del__c                 C   s$   t j }t| j| j| j||t jS rG   )r   r   rL   r   r   r   r   r   )rI   r   num_segmentsr   r   r   as_torch_strided_tensord  s   
z#MnnvlMemory.as_torch_strided_tensorc                  C   sL   t js$tjddd} zt  W n tjy   t  Y nw dt _d S d S )Nr   r   )deviceT)r   r   r   emptypynvmlnvmlDeviceGetCountNVMLError_UninitializednvmlInit)r   r   r   r   
initializeo  s   
zMnnvlMemory.initializec                 C   s:   |pt t dt_|j| j| j | j | j	}|t_
d S )N)r   )r   rp   r   r   r   rW   pp_rankcp_sizecp_ranktp_rankr   )r   r   r   r   r   r   set_comm_from_config{  s
   
z MnnvlMemory.set_comm_from_configc                 C   s8   t jd urt jS t | j| j | j | j}|t _|S rG   )r   r   rc   rW   r   r   r   r   )r   r   r   r   r   get_comm  s   
zMnnvlMemory.get_commc                 C   sb   t  }t jj|_| |_t  }t jj|_t	
  }d|v }|r't jj|_nt jj|_||_|S )Naarch64)r   CUmemLocationCUmemLocationTypeCU_MEM_LOCATION_TYPE_DEVICEtypeidCUmemAllocationPropCUmemAllocationTypeCU_MEM_ALLOCATION_TYPE_PINNEDplatformmachinelowerCUmemAllocationHandleTypeCU_MEM_HANDLE_TYPE_FABRICrequestedHandleTypes(CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTORlocation)r   r   allocation_proparchis_on_aarch64r   r   r   get_allocation_prop  s   

zMnnvlMemory.get_allocation_propc                 C   sF   t jdkrt jS t | }ttjj}ttj||d}|t _t jS )Nr   )propoption)r   r   r   r    CUmemAllocationGranularity_flags$CU_MEM_ALLOC_GRANULARITY_RECOMMENDEDr   cuMemGetAllocationGranularity)r   r   r   granularityr   r   r   get_allocation_granularity  s   

z&MnnvlMemory.get_allocation_granularityc                 C   sx   |t j d t j }|t j }td|  t | }| }|| }tt|t jdd}t	|t _
|t _dt _d S )Nr   z+[MnnvlMemory] creating address with stride=r   )r   r   logginginfor   rL   r   r   cuMemAddressReserver?   r   r   r   )r   r*   
page_countr   r   	comm_sizeaddress_sizer   r   r   r   new_mnnvl_memory_address  s    



z$MnnvlMemory.new_mnnvl_memory_addressc           #         sp  t t }t|}tjd u r|t_|tjks"J d| dtj t| }| }| }|	 }t
||ks<J t fdd|D sKJ dt|} | d | | }	tj|	 tjkrht| |	 tj|	 tjksrJ t|}
t tj|	|
dd}t t||
jd}|
jtjjkr|	|j}n|	|}|	t }tjd d	d
}|j}d}d}g }|D ]%}|||d}|dk rt }td| d| dt| || qg }t ||d	dD ]@\}}||||d}|dk r!t }d| d| d| dt| d	}|dkr|d7 }t||d7 }t||| q|}t! }|
j"|_"tj#j$|_%d g| }t&|D ]H\}}tj'tj|  tj }||krc|||< t t(||	d|d nt t)||
j} | ||< t t(||	d| d t t*||	|gd q?tj'tj }!tj}"| |	|tj'tjtjftj+|!< tj,-tj'dd tj,tj'< t j|	7  _|!|"fS )NzDifferent dev_id found dev_id=z but MnnvlMemory.dev_id=c                 3   s    | ]}| kV  qd S rG   r   )r   r,   r*   r   r   	<genexpr>  s    z0MnnvlMemory.open_mnnvl_memory.<locals>.<genexpr>z"Not all rank allocating same size.r   r   )flagsT)	use_errnoi  i  zpidfd_open(z) failed with errno r-   )strictzpidfd_getfd(pidfd=z, fd=.zj Permission denied. If running in a container, try adding --cap-add=SYS_PTRACE to your docker run command.z9 This may be due to kernel version (requires Linux 5.6+).).r   r   cuCtxGetDevicer?   r   r   r   rJ   rL   rO   r;   allr   r   r   r   r   cuMemCreatecuMemExportToShareableHandler   r   r   rM   osgetpidr9   CDLLsyscall	get_errnor   strerrorr   zipCUmemAccessDescr   CUmemAccess_flags"CU_MEM_ACCESS_FLAGS_PROT_READWRITEr   	enumerater   cuMemMapcuMemImportFromShareableHandlecuMemSetAccessr   r   get)#r   r*   devr   r   	comm_rankr   all_rank_allocate_sizesr   aligned_sizer   allocated_mem_handleexported_fabric_handleall_handles_dataall_pidslibcr   SYS_pidfd_openSYS_pidfd_getfdpidfdspidpidfdra   
remote_fdsfd	remote_fd	error_msgmadescmem_handlesiremote_handle_datarank_ptrimported_mem_handler   strider   r   r   r     s   






$




	
zMnnvlMemory.open_mnnvl_memoryr   c                 C   s   t j| \}}}}}}t |}| }t|D ]}	||	|  | }
tt|
| tt	||	  qt j
|  d8  < t j
| dkrjt j
| t|}tt|||  |t jkrldt _dt _dt _d S d S d S )Nr   r   )r   r   popr   rL   ranger   r   
cuMemUnmapcuMemReleaser   CUdeviceptrcuMemAddressFreer   r   r   )r   r   r   r  start_addressr   address_offsetr   r   r  r  rC   r   r   r   r   I  s2   
	



zMnnvlMemory.close_mnnvl_memoryTneed_all_upc              	   C   s   t j }t|}tj}d}d}t|D ]&}zt||tjr0|d7 }t	||}|r0|d7 }W q tj
y;   Y qw | rF||koE|dkS |dkS )Nr   r   )r   r   current_devicer   nvmlDeviceGetHandleByIndexNVML_NVLINK_MAX_LINKSr  nvmlDeviceGetNvLinkCapabilityNVML_NVLINK_CAP_P2P_SUPPORTEDnvmlDeviceGetNvLinkStateNVMLError_NotSupported)r  r   handle
link_countactive_linksavailable_linkslink_idx	is_activer   r   r   support_nvlinkd  s.   

zMnnvlMemory.support_nvlinkr   c                  C   s   t d} | S )NT)r   r,  )support_nvlink_and_all_upr   r   r   supports_mnnvl|  s   
zMnnvlMemory.supports_mnnvlrG   T)%rY   rZ   r[   r   boolrn   r   r?   r   r   r   r   r   r   rE   r   r   r   r   r   r   r   r   rr   r   r   staticmethodr   r   r   r   r   r   r   r   r,  r.  r   r   r   r   r   D  sH   
 
	 r   c                   @   sP   e Zd ZdZddedefddZdded	ed
ee fddZdd Zdd Z	dS )	IpcSocketz2Unix Domain Socket for IPC file descriptor passingTrankop_idc                 C   s   || _ || _|| _ttjtj| _d| d|d}|r#d| | _n|| _t	t
 t| W d   n1 s;w   Y  | j| j dS )z
        Initialize IPC socket

        Args:
            rank: Process rank
            op_id: Unique operation ID (hash)
            use_abstract: Use Linux abstract socket namespace
        /tmp/mcastmem-socket--r,    N)r3  r4  use_abstractsocketAF_UNIX
SOCK_DGRAMsocksocket_path
contextlibsuppressFileNotFoundErrorr   unlinkbind)rI   r3  r4  r8  socket_namer   r   r   rr     s   	zIpcSocket.__init__Nr  	dest_rank
dest_op_idc           	      C   sn   |p| j }d| d|d}| jrd| }n|}d}td|g}tjtj| fg}| j|g|d| dS )	z
        Send a file descriptor to another process

        Args:
            fd: File descriptor to send
            dest_rank: Destination process rank
            dest_op_id: Destination operation ID
        r5  r6  r,   r7      r  r   N)	r4  r8  arrayr9  
SOL_SOCKET
SCM_RIGHTStobytesr<  sendmsg)	rI   r  rD  rE  dest_socket_name	dest_path
dummy_datafds	ancillaryr   r   r   send_fd  s   


zIpcSocket.send_fdc           	      C   s   t  d}| jdt|j\}}}}|D ],\}}}|tjkrB|tjkrBt  d}||dt	|t	||j    |d   S qt
d)z|
        Receive a file descriptor from another process

        Returns:
            int: Received file descriptor
        r  r   Nr   zNo file descriptor received)rG  r<  recvmsgr9  
CMSG_SPACEitemsizerH  rI  	frombytesr;   r   )	rI   rO  msgancdatar   addr
cmsg_level	cmsg_type	cmsg_datar   r   r   recv_fd  s    
	
zIpcSocket.recv_fdc                 C   sZ   | j   | js)| jr+tt t| j W d   dS 1 s"w   Y  dS dS dS )zClose the socketN)	r<  closer8  r=  r>  r?  r@  r   rA  rH   r   r   r   r]    s   
"zIpcSocket.closer/  rG   )
rY   rZ   r[   r\   r?   rr   r   rQ  r\  r]  r   r   r   r   r2    s    r2  c                   @   s~   e Zd ZdZdddedefddZeedej	fd	d
Z
edefddZedefddZedddZedddZdS )HandleExchangerzFAbstract interface for exchanging CUDA shareable handles across ranks.r   rE   
group_rank
group_sizec                 C   s   || _ || _|| _d S rG   )r   r3  r*   rI   r   r_  r`  r   r   r   rr     s   
zHandleExchanger.__init__r   c                 C      dS )z/The CUDA handle type this exchanger works with.Nr   rH   r   r   r   handle_type  s   zHandleExchanger.handle_typec                 C   rb  )z,All-gather shareable handles from all ranks.Nr   rI   local_handler   r   r   rO        zHandleExchanger.allgatherrP   c                 C   rb  )z*Broadcast a handle from root to all ranks.Nr   rI   r&  rP   r   r   r   	broadcast  rf  zHandleExchanger.broadcastNc                 C   rF   rG   r   rI   r&  r   r   r   cleanup  rK   zHandleExchanger.cleanupc                 C   rF   rG   r   rH   r   r   r   r]    rK   zHandleExchanger.closerX   )rY   rZ   r[   r\   r?   rr   propertyr   r   r   rc  r   rO   rh  rj  r]  r   r   r   r   r^    s    r^  c                   @   sT   e Zd ZdZedejfddZdefddZ	de
fdd	ZdddZdddZd
S )FabricHandleExchangerzEHandle exchange using CUDA Fabric handles via MPI/collective backend.r   c                 C      t jjS rG   )r   r   r   rH   r   r   r   rc       z!FabricHandleExchanger.handle_typec                 C   s   | j |jS rG   )r   rO   rM   rd  r   r   r   rO     rw   zFabricHandleExchanger.allgatherrP   c                 C   s    | j j|r|j|dS d |dS )NrP   )r   rR   rM   rg  r   r   r   rh    s    zFabricHandleExchanger.broadcastNc                 C   rF   rG   r   ri  r   r   r   rj    rK   zFabricHandleExchanger.cleanupc                 C   rF   rG   r   rH   r   r   r   r]    rK   zFabricHandleExchanger.closerX   )rY   rZ   r[   r\   rk  r   r   rc  r   rO   r?   rh  rj  r]  r   r   r   r   rl  	  s    
rl  c                       s   e Zd ZdZdddedef fddZdefd	d
Zede	j
fddZdefddZdefddZdddZdddZ  ZS )PosixFDHandleExchangerz=Handle exchange using POSIX file descriptors via IPC sockets.r   rE   r_  r`  c                    s   t  ||| |  | _d S rG   )superrr   _init_ipc_socket_socketra  	__class__r   r   rr      s   zPosixFDHandleExchanger.__init__r   c                 C   s8   | j dkrtdd}nd }| jj|dd}t| j |S )Nr   l    ro  )r3  randomrandintr   rR   r2  )rI   opIdr   r   r   rr  $  s
   
z'PosixFDHandleExchanger._init_ipc_socketc                 C   rm  rG   )r   r   r   rH   r   r   r   rc  ,  rn  z"PosixFDHandleExchanger.handle_typec                 C   sh   d g| j  }t| j D ]&}| j  | j|| j| | j   | j| j  | | j  }| j ||< q|S rG   )r*   r  r   rS   rs  rQ  r3  r\  )rI   re  resultr  r   r   r   r   rO   0  s   
z PosixFDHandleExchanger.allgatherrP   c                 C   s   | j |krtd| jD ]}| j  | j|| q|S t| j D ]}| j  q!| j }t| j| j  d D ]}| j  q8|S )Nr   )r3  r  r*   r   rS   rs  rQ  r\  )rI   r&  rP   pr   ry  r   r   r   rh  9  s   


z PosixFDHandleExchanger.broadcastNc                 C   s   t | d S rG   )r   r]  ri  r   r   r   rj  H  rw   zPosixFDHandleExchanger.cleanupc                 C   rx   rG   )rs  r]  rH   r   r   r   r]  K  rw   zPosixFDHandleExchanger.closerX   )rY   rZ   r[   r\   r?   rr   r2  rr  rk  r   r   rc  r   rO   rh  rj  r]  __classcell__r   r   rt  r   rp    s    	
rp  
device_idxc                 C   s   t ttjj| }|dkrdS t  z-t| }t }t	|t
| |jtjkr;|jd dkr;W t  dS W t  dS t  w )Nr   FT)r   r   cuDeviceGetAttributeCUdevice_attribute0CU_DEVICE_ATTRIBUTE_HANDLE_TYPE_FABRIC_SUPPORTEDr   r   r   c_nvmlGpuFabricInfoV_tnvmlDeviceGetGpuFabricInfoVr9   byrefstateNVML_GPU_FABRIC_STATE_COMPLETEDclusterUuidnvmlShutdown)r|  fabric_handle_supportedr&  fabric_infor   r   r   is_mnnvl_fabric_supportedO  s&   
r  c                   @   s6  e Zd ZdZ			d4dededededee d	ed
efddZdd Z	de
e fddZde
e fddZdefddZdefddZdedefddZdefddZdefddZdefdd Zdefd!d"Zdefd#d$Zded	efd%d&Zd'd( Zdefd)d*Zd+d, Zd-d. Zd/d0 Zded1ejfd2d3ZdS )5SymmDeviceMemoryz1Python port of SymmDeviceMemory from TensorRT-LLMNTbuf_sizer`  r_  r|   comm_backend_for_handle_transferenable_multicastallocate_signal_padsc              
   C   s  t t|}t t|}	t t|	 ddlm}
 |
 r%dd lm} n	dd l	m
  m} t || || _|| _|| _|| _d| _d| _|pKt | _d| _g | _g | _d| _d| _d| _g | _d| _t| _t ttjj |}|dkrzt!dt"|| j| _t#$d| d| d| d| j  t%|rt&| j| j| j| _'n
t(| j| j| j| _'| )|| |rdg| j | _t*| jD ]}| j| | j | j|< || jkrt t+| j| d| j qt,| j| _t,| j| _d S )	Nr   )has_cuda_cudart   z8[SymmDeviceMemory] Device does not support multicasting.z[SymmDeviceMemory] Rank: z, Group size: z, device_idx: z, Signal pad offset: )-r   r   cuDeviceGetcuDevicePrimaryCtxRetaincuCtxSetCurrentflashinfer.utilsr  cuda.cudartcudartcuda.bindings.runtimebindingsruntimecudaSetDevicer|  r`  r_  r  signal_pad_offsetallocation_sizerp   r   mc_ptruc_ptrssignal_padssignal_pads_devuc_ptrs_dev	mc_handle
uc_handlesSIGNAL_PAD_ALIGNMENTSIGNAL_PAD_SIZEr}  r~  'CU_DEVICE_ATTRIBUTE_MULTICAST_SUPPORTEDr   r   r   r   r  rl  
_exchangerrp  _alloc_mn_mcast_memr  
cuMemsetD8rD   )rI   r  r`  r_  r|  r  r  r  	cu_deviceprimary_ctxr  r  multicast_supportedr  r   r   r   rr   l  s|   

zSymmDeviceMemory.__init__c                 C   s  t | dr
| j  t rdS zt  W n ty0 } ztd|  W Y d}~dS d}~ww | j	r<t
t| j	 | jrGt
t| j t | dr| jrt| jD ]J}| j| dkrz$t
t| j|  |t| jk r| j| rt
t| j| | j W qT ty } ztd| d|  W Y d}~qTd}~ww qTt | dr| jrt
t| j| j t | d	r| jr| jdkrzt
t| j| j t
t| j| j t
t| j W dS  ty } ztd
|  W Y d}~dS d}~ww dS dS dS )z%Destructor - cleanup allocated memoryr  Nz4Destructor: CUDA context invalid, skipping cleanup: r  r   z1Destructor: Failed to release UC handle for rank r-   uc_base_ptrr  z)Destructor: Failed to release MC handle: )hasattrr  r]  r   r   r   cuCtxGetCurrentr3   r2   r  r   	cuMemFreer  r  r  r`  r  r;   r  r  r  r  r  total_uc_sizer  r  )rI   r6   r3  r   r   r   r     sd   

zSymmDeviceMemory.__del__r   c                 C      | j S zFGet the raw array of signal pad pointers to all ranks (including self))r  rH   r   r   r   get_signal_pad_ptrs_host     z)SymmDeviceMemory.get_signal_pad_ptrs_hostc                 C   r  zCGet the raw array of unicast pointers to all ranks (including self))r  rH   r   r   r   get_buffer_ptrs_host  r  z%SymmDeviceMemory.get_buffer_ptrs_hostc                 C   r  r  )r  rH   r   r   r   get_signal_pad_ptrs_dev  r  z(SymmDeviceMemory.get_signal_pad_ptrs_devc                 C   r  r  )r  rH   r   r   r   get_buffer_ptrs_dev  r  z$SymmDeviceMemory.get_buffer_ptrs_devr3  c                 C   s<   |t | jkrtd| dt | jd  d| j| }|S )+Get the raw unicast pointer to a given rankzRank z out of range (0-r   ))r;   r  
ValueError)rI   r3  data_ptrr   r   r   get_unicast_ptr  s    
z SymmDeviceMemory.get_unicast_ptrc                 C   s
   t | jS zGet the raw multicast pointer)r?   r  rH   r   r   r   get_multicast_ptr"  s   
z"SymmDeviceMemory.get_multicast_ptrc                 C   r  )z(Get the rank of this device in the group)r_  rH   r   r   r   r   (  r  zSymmDeviceMemory.get_rankc                 C   r  )z,Get the total number of devices in the group)r`  rH   r   r   r   r   ,  r  zSymmDeviceMemory.get_world_sizec                 C   r  )z4Get the total allocation size (including signal pad))r  rH   r   r   r   get_allocation_size0  r  z$SymmDeviceMemory.get_allocation_sizec                 C   s   | j | j S )z1Get the usable buffer size (excluding signal pad))r  r  rH   r   r   r   get_usable_buffer_size4     z'SymmDeviceMemory.get_usable_buffer_sizec                 C   s6   |    | |\}}| | |r| | dS dS )z0Allocate multi-node multicast memory using MNNVLN)_verify_cuda_context_get_allocation_prop_allocate_unicast_buffers_setup_multicast)rI   r  r  r   mc_propr   r   r   r  8  s   
z$SymmDeviceMemory._alloc_mn_mcast_memc              
   C   sr   zt t }t|| jkrtd| d| j  W dS W dS  ty8 } ztd|  W Y d}~dS d}~ww )z1Verify CUDA context is set to the correct device.z'CUDA context device mismatch! Current: z, Expected: zError checking CUDA context: N)r   r   r   r?   r|  r2   r3   )rI   r  r6   r   r   r   r  F  s   
z%SymmDeviceMemory._verify_cuda_contextc                 C   s   t  }| jj|_t jj|_t  |_	t j
j|j	_| j|j	_d|j_tt |t jj}t|| j || _t  }| j|_| j|_| jj|_tt |t jj| _t| j| j| _||fS )zCCompute allocation size and return allocation/multicast properties.r   ) r   r   r  rc  r   r   r   r   r   r   r   r   r|  r   
allocFlagsgpuDirectRDMACapabler   r   r   r   r   r  r  CUmulticastObjectPropr`  
numDevicesr*   handleTypescuMulticastGetGranularityCUmulticastGranularity_flags$CU_MULTICAST_GRANULARITY_RECOMMENDED_mc_granularity)rI   r  r   alloc_granularityr  r   r   r   r  Q  s:   





z%SymmDeviceMemory._get_allocation_propc           
   	   C   sR  dg| j  | _tt| j|d| j| j< tt| j| j | jj	d}| j
|}t  t| j D ]}|| jkrQtt|| | jj	| j|< | j||  q3dg| j  | _| j| j  }|| _tt|| jdd}|| _t| j D ]"}| j| }t|| | j|< tt| j| | jd| j| d qu|  }	tt|||	gd dS )zFAllocate local UC memory, exchange handles with peers, and map memory.r   r   N)r`  r  r   r   r   r  r_  r   r  rc  rO   cuCtxSynchronizer  r   rj  r  r  r   r  r  r?   r   _get_mem_access_descr   )
rI   r   local_shareable_uc_handleall_shareable_uc_handlesrz  r  r  r  offsetaccess_descr   r   r   r  z  sT   
	


z*SymmDeviceMemory._allocate_unicast_buffersc              	   C   s  | j dkrtt|| _tt| j| jjd}nd}| jj|dd}t	  | j dkr>tt
|| jj| _| j| tt| j| j tt| j| jdd| _tt| j| jd| jd |  }tt| j| j|gd tt| jd| j| j  d| jd dS )z?Create multicast object, exchange handle, map memory, and bind.r   Nro  r   )r_  r   r   cuMulticastCreater  r   r  rc  rh  r  r   rj  cuMulticastAddDevicer|  r   r  r  r  r   r  r   cuMulticastBindMemr  )rI   r  shareable_mc_handler  r   r   r   r    sR   


z!SymmDeviceMemory._setup_multicastc                 C   s6   t  }t  |_t jj|j_| j|j_t j	j
|_|S )z0Create memory access descriptor for this device.)r   r   r   r   r   r   r   r|  r   r   r   r   )rI   r  r   r   r   r    s   


z%SymmDeviceMemory._get_mem_access_descr   c                 C   s|   |t jks
|t jkrd}d}tj}n|t jkrd}d}tj}ntd| | j| j	 | }t
|t| j| j || d S )Ni   r   l        r+   zUnsupported dtype: )r   bfloat16float16r   cuMemsetD16float32cuMemsetD32r  r  r  r   r?   r  r_  )rI   r3  r   neg_zerodsizememset_funcnum_elementsr   r   r   lamport_initialize  s   
z#SymmDeviceMemory.lamport_initialize)NTT)rY   rZ   r[   r\   r?   r   rE   r0  rr   r   r   r  r  r  r  r  r  r   r   r  r  r  r  r  r  r  r  r   r   r  r   r   r   r   r  i  sL    
`<
);8	r  c                   @   s   e Zd ZdZ	ddedededejdee f
dd	Z	d
edej
fddZ	ddedej
dedejfddZ	ddedej
dedejfddZdefddZd
edefddZdefddZdS )McastGPUBufferz
    Wrapper class for SymmDeviceMemory to facilitate PyTorch tensor creation.
    It manages a buffer accessible via unicast or multicast for multi-node communication.

    Python port of McastGPUBuffer from TensorRT-LLM
    Nr  r`  r_  r   r  c                 C   s*   t ||||j|| _| j | _|| _dS )a&  
        Constructor for McastGpuBuffer.

        Args:
            buf_size: The requested size of the buffer in bytes. The actual usable size may differ due to alignment requirements.
            group_size: The number of ranks in the communication group
            group_rank: The rank of the local process within the group
            device: The CUDA device for buffer allocation
            mn_nvlink: Flag indicating if multi-node NVLink is used
            comm_backend_for_handle_transfer: Communication backend for handle transfer
        N)r  indexmcast_device_memoryr  r  local_device)rI   r  r`  r_  r   r  r   r   r   rr     s   
zMcastGPUBuffer.__init__r3  r   c                 C   s   | j || d S rG   )r  r  )rI   r3  r   r   r   r   r  0  s   z!McastGPUBuffer.lamport_initializer   sizesstorage_offsetr   c                 C      t d)a|  
        Returns a PyTorch tensor view of the multicast buffer portion.

        Args:
            sizes: The desired shape (dimensions) of the tensor
            dtype: The data type of the tensor elements
            storage_offset: The offset in elements from the start of the buffer

        Returns:
            A PyTorch tensor wrapping the multicast buffer section
        Not implemented yetNotImplementedErrorrI   r  r   r  r   r   r   get_multicast_buffer3  s   z#McastGPUBuffer.get_multicast_bufferc                 C   r  )zN
        Returns a PyTorch tensor view of the unicast buffer portion.
        r  r  r  r   r   r   get_unicast_bufferE  s   z!McastGPUBuffer.get_unicast_bufferc                 C   rt   r  )r  r  rH   r   r   r   r  O     
z McastGPUBuffer.get_multicast_ptrc                 C   rv   )r  )r  r  )rI   r3  r   r   r   r  S  r  zMcastGPUBuffer.get_unicast_ptrc                 C   rt   )z$Get the buffer pointers device array)r  r  rH   r   r   r   r  W  r  z"McastGPUBuffer.get_buffer_ptrs_devrG   )r   )rY   rZ   r[   r\   r?   r   r   r   rE   rr   r   r  tupleTensorr  r  r  r  r  r   r   r   r   r  
  sL    



r  )>r9   r   r   r9  rG  rv  r>  abcr   r   dataclassesr   r   r   typingr   r   r   r   r	   r   r   cuda.bindingsr
   r   r`   r6   
cuda_utilsr   dlpack_utilsr   r   r   r   OMPI_COMM_TYPE_HOSTr  MNNVL_DEBUGr?   r   r  r   r  r)   r0  r7   rD   rE   r_   r^   rb   rc   rp   rz   r   r   r2  r^  rl  rp  r  r  r  r   r   r   r   <module>   s   
$
P  Ec2   $