from typing import Any

import torch
import torch.distributed as dist
from torch import Tensor

from vllm_omni.platforms import current_omni_platform

__all__ = [
    "all_to_all_4D",
    "all_to_all_5D",
    "SeqAllToAll4D",
    "SeqAllToAll5D",
    "RingComm",
]


def all_to_all_4D(
    input: torch.Tensor,
    scatter_idx: int = 2,
    gather_idx: int = 1,
    group=None,
    use_sync: bool = False,
) -> torch.Tensor:
    """
    all-to-all for QKV

    Args:
        input (torch.tensor): a tensor sharded along the scatter dimension
        scatter_idx (int): default 2
        gather_idx (int): default 1
        group (torch.distributed.ProcessGroup): torch process group
        use_sync (bool): whether to synchronize after all-to-all

    Returns:
        torch.tensor: resharded tensor, (bs, seqlen, hc/P, hs) for the default
            scatter_idx=2 / gather_idx=1 direction, or (bs, seqlen/P, hc, hs)
            for the reverse direction (scatter_idx=1, gather_idx=2)
       zinput must be 4D tensor, got  and shape r
   r   r   group   z8scatter_idx must be 1 or 2 and gather_idx must be 1 or 2dimshapedistget_world_sizereshape	transpose
contiguoustorch
empty_likeall_to_all_singler   synchronizeRuntimeError)r   r   r   r   r   seq_world_sizebsshard_seqlenhchsseqlenshard_hcinput_toutput r,   Z/home/ubuntu/vllm_env/lib/python3.10/site-packages/vllm_omni/diffusion/distributed/comm.pyr      sN   &



r   c                   @   s<   e Zd Ze	ddedejdededede	defd	d
Z
dS )r   Fctxr   r   r   r   r   r   c                 C   *   || _ || _|| _|| _t|||||dS N)r   r   )r   r   r   r   r   r.   r   r   r   r   r   r,   r,   r-   forwardh   s
   	zSeqAllToAll4D.forwardN)F__name__


def all_to_all_5D(
    input: torch.Tensor,
    scatter_idx: int = 3,
    gather_idx: int = 1,
    group=None,
    use_sync: bool = False,
) -> torch.Tensor:
    """
    all-to-all for QKV
    forward (bs, seqlen/N, 3, hc, hs) -> (bs, seqlen, 3, hc/N, hs)

    Args:
        input (torch.tensor): a tensor sharded along the scatter dimension
        scatter_idx (int): default 3
        gather_idx (int): default 1
        group (torch.distributed.ProcessGroup): torch process group
        use_sync (bool): whether to synchronize after all-to-all

    Returns:
        torch.tensor: resharded tensor, (bs, seqlen, 3, hc/P, hs) for the default
            scatter_idx=3 / gather_idx=1 direction, or (bs, seqlen/P, 3, hc, hs)
            for the reverse direction (scatter_idx=1, gather_idx=3)
       zinput must be 5D tensor, got r   r   r   r   r   r
   r   z8scatter_idx must be 1 or 3 and gather_idx must be 1 or 3r   )r   r   r   r   r   r#   r$   r%   t_cntr&   r'   r(   r)   r*   r+   _r,   r,   r-   r   x   sP   &
 


r   c                   @   s@   e Zd Ze			ddedejdededed	e	d
efddZ
dS )r   r   r   Fr.   r   r   r   r   r   r   c                 C   r/   r0   )r   r   r   r   r   r1   r,   r,   r-   r2      s
   	zSeqAllToAll5D.forwardN)r   r   Fr3   r,   r,   r,   r-   r      s(    r   c                   @   sR   e Zd ZdZdejfddZddejdejdB dejfd	d


class RingComm:
    """Ring communication utility for Ring Attention P2P communication."""

    def __init__(self, process_group: dist.ProcessGroup):
        self._process_group = process_group
        self._ops = []
        self.rank = dist.get_rank(self._process_group)
        self.world_size = dist.get_world_size(self._process_group)
        self._reqs = None

        # Neighbours on the ring; these ranks are local to the group.
        self.send_rank = (self.rank + 1) % self.world_size
        self.recv_rank = (self.rank - 1) % self.world_size

        if process_group is not None:
            # P2POp expects global ranks, so translate the group-local ranks.
            self.send_rank = dist.get_global_rank(self._process_group, self.send_rank)
            self.recv_rank = dist.get_global_rank(self._process_group, self.recv_rank)

    def send_recv(self, to_send: torch.Tensor, recv_tensor: torch.Tensor | None = None) -> torch.Tensor:
        if not to_send.is_contiguous():
            to_send = to_send.contiguous()
        if recv_tensor is None:
            res = torch.empty_like(to_send, memory_format=torch.contiguous_format)
        else:
            res = recv_tensor
            if not res.is_contiguous():
                res = res.contiguous()

        # Queue the send/recv pair; nothing is launched until commit().
        send_op = dist.P2POp(dist.isend, to_send, self.send_rank, group=self._process_group)
        recv_op = dist.P2POp(dist.irecv, res, self.recv_rank, group=self._process_group)
        self._ops.append(send_op)
        self._ops.append(recv_op)
        return res

    def commit(self):
        if self._reqs is not None:
            raise RuntimeError("commit called twice")
        self._reqs = dist.batch_isend_irecv(self._ops)

    def wait(self):
        if self._reqs is None:
            raise RuntimeError("wait called before commit")
        for req in self._reqs:
            req.wait()
        self._reqs = None
        self._ops = []
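

if __name__ == "__main__":
    # Minimal single-process smoke test (illustrative, not part of the original
    # module; assumes a torch build with the gloo backend). With world_size == 1
    # the all-to-all collapses to a reshape, so this only exercises the shape
    # bookkeeping.
    import os

    os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
    os.environ.setdefault("MASTER_PORT", "29500")
    dist.init_process_group("gloo", rank=0, world_size=1)

    x = torch.randn(2, 8, 4, 16)  # (bs, seqlen/P, hc, hs) with P == 1
    y = all_to_all_4D(x, scatter_idx=2, gather_idx=1)
    assert y.shape == (2, 8, 4, 16)
    assert torch.equal(x, y)  # identity when P == 1

    qkv = torch.randn(2, 8, 3, 4, 16)  # (bs, seqlen/P, 3, hc, hs)
    z = all_to_all_5D(qkv, scatter_idx=3, gather_idx=1)
    assert z.shape == (2, 8, 3, 4, 16)

    dist.destroy_process_group()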