o
     i                     @   s0  U d dl Z d dlmZmZmZmZmZmZmZ d dl	Z	d dl
mZ d dlZ	d dlmZ d ZdZdZde	jfddZG d	d
 d
Zi Zeeee f ed< dejdefddZde	jdejdee fddZde	jjfddZ eddddddde	j!de	j!dejdee	j! dedee	j! deee	j!ee	j! f  dee	j de	j!fddZ"eddddddde	j!dee	j! dejdeee	j!  dedee	j! deee	j!ee	j! f  dee	j dee	j! fd dZ"ddddddde	j!dee	j!ee	j! f dejdeee	j!ee	j! f  dedee	j! deee	j!ee	j! f  dee	j dee	j!ee	j! f fd!dZ"e	j#j$d"d#hd$d%de	j!d&ee	j! d'e%d#ee	j! ded(ed)ede	j!d*eee	j!  ddfd+d,Z&dd-d.ee	j! d/eee	j! eeg e	jjf gdf dejdeddf
d0d1Z'edddddd2d3e	j!de	j!dejdee	j! ded4ee	j! deee	j!ee	j! f  dee	j de	j!fd5d6Z(edddddd2d3e	j!dee	j! dejdeee	j!  ded4ee	j! deee	j!ee	j! f  dee	j dee	j! fd7d6Z(dddddd2d3e	j!dee	j!ee	j! f dejdeee	j!ee	j! f  ded4ee	j! deee	j!ee	j! f  dee	j dee	j!ee	j! f fd8d6Z(e	j#j$d9d:hd$d%d3e	j!d&ee	j! d'e%d:ee	j! ded(ed)ed4e	j!d*eee	j!  ddfd;d<Z)dd-d/eee	j! eeg e	jjf gdf d:ee	j! dejdeddf
d=d>Z*dS )?    N)CallableDictListOptionalSequenceUnionoverload)get_symm_mem_workspace   i  dtc                 C   s   | j o
t| jdkS N   )is_floating_pointtorchfinfobits)r    r   \/home/ubuntu/.local/lib/python3.10/site-packages/xformers/ops/sequence_parallel_fused_ops.py_is_fp8_dtype   s   r   c                   @   s   e Zd ZdZdejdejfddZdej	j
deg ej	j
f fdd	Z	
	
ddeej deeej eeg ej	j
f gdf dededef
ddZ	
	
ddeeej eeg ej	j
f gdf deej deej dededefddZdS )_FusedSequenceParallelaI	  Set up a communication ring and perform fused ops on it

    Stores the persistent state needed to support a ring of connections between
    processes, and the logic that can do fused comms + matmuls on it.

    We want to achieve overlap between:
    - a computation which reads from the data we received from a remote GPU
    - and the communication where we send some data to another GPU
    And in order to do that we need some staging buffers and a way to
    synchronize access to them across processes.

    To perform the communication over NVLink we make the processes exchange
    their staging buffers using IPC (Inter-Process Communication) handles, which
    "mounts"/"mmaps" an allocation on one GPU into the virtual address space of
    another GPU: the memory remains backed by the original GPU but the other GPU
    can access it as if it were local. We exchange these IPC handles using
    multiprocessing Connections (and the "reductions" provided by PyTorch),
    which we establish over UNIX domain sockets, whose addresses we exchange by
    using a ProcessGroup.

    To synchronize accesses we use a set of counters/sequence numbers that are
    also allocated in memory shared over IPC handles. Processes signal that they
    completed an operation by launching a kernel that increases that value, and
    they wait for anoher process to complete an operation by launching a kernel
    that busy-waits for that value to increase. Currently we implement these
    kernels manually, but on recent CUDA drivers (515.43.04+, corresponding to
    CUDA 11.7) we could use standard stream memory operations (see
    https://docs.nvidia.com/cuda/archive/11.7.0/cuda-driver-api/group__CUDA__MEMOP.html).

    We prefer to use these kernels (or the stream memory ops) over IPC events
    because IPC events require signaling between processes at launch time to
    ensure that the wait on one process occurs after the record on another
    process. This signaling means that _launching_ our fused operation becomes a
    synchronization barrier, which can increase the launch overhead. It would
    also behave differently from NCCL, where launching is async and all the
    synchronization happens on device in the kernels. A previous version of this
    code which uses IPC events can be found here:
    https://github.com/fairinternal/xformers/pull/504.

    devicegroupc                 C   sf   || _ | | _| | _|| _tj | _	tjjdd| _
tjjdd| _tjjdd| _d| _d S )N)priorityr   )	my_devicerankmy_ranksize
world_sizer   r   cudaStreamsecond_streammemcpy_streamcompute_wait_streammemcpy_wait_streamnext_stream_idx)selfr   r   r   r   r   __init__D   s   


z_FusedSequenceParallel.__init__current_streamreturnc                    s    fdd}|S )Nc                     s0    j gj }  jd7  _ jd;  _| S )Nr
      )r!   r%   )streamr(   r&   r   r   result\   s   z:_FusedSequenceParallel.make_stream_factory.<locals>.resultr   )r&   r(   r-   r   r,   r   make_stream_factoryY   s   z*_FusedSequenceParallel.make_stream_factoryTscattered_inputs	my_matmulN	timeout_s_wait_memcpyc              	      sT  d j  tfddD sJ t fddD sJ dd D ttjj& tjj	j
  j  fddtj
D }W d   n1 sZw   Y  tj }td	j
D ](}j| j
 |rtj| t W d   n1 sw   Y  qjj| j| j| |}	td	j
D ]}j| j
 }
|rtjj j|
t|t d
 W d   n1 sw   Y  jj |rtjj t||
 D ]\}}|j | qW d   n	1 sw   Y  |r@tjj j|
j
t j d	d	d W d   n	1 s;w   Y  q|j|	 td	j
D ]M}j| j
 |r}tjj jt|t d
 W d   n	1 sxw   Y  |j jj |fdd|j D |	 qN|j |j dS )z5Perform a fused all-gather followed by a linear layerr   c                 3       | ]	}|j  jkV  qd S Nr   r   .0sir&   r   r   	<genexpr>q       z>_FusedSequenceParallel.allgather_and_linear.<locals>.<genexpr>c                 3       | ]}|j  kV  qd S r5   dtyper7   r>   r   r   r;   r       c                 S      g | ]}|  qS r   numelr7   r   r   r   
<listcomp>t       z?_FusedSequenceParallel.allgather_and_linear.<locals>.<listcomp>c              	      >   g | ]}fd dt |jg jddD qS )c                    $   g | ]\}}|  jf|j qS r   viewr   shape)r8   sr9   r:   r   r   rD   ~       zJ_FusedSequenceParallel.allgather_and_linear.<locals>.<listcomp>.<listcomp>r   dimzip
get_bufferr   splitr8   r   )r?   scattered_input_numelsr/   r&   symm_memtotal_scattered_input_numelr   r   rD   }       

Nr
   
timeout_msvalcountc                       g | ]}|  qS r   r   r8   rK   src_rankr   r   rD      rE   )r?   allsumr   r   r   r   r	   r   
group_namer   itemsizeranger(   r   r+   
put_signalOP_FINISHED_CHANNELr!   wait_streamr#   r$   r.   wait_signalMS_IN_Sr"   rP   copy_memset32get_signal_padCOMMS_READY_CHANNEL)r&   r/   r0   r1   r2   r3   buffersr(   iter_stream_factorydst_rankbsr9   r   )r?   rT   r/   r&   r`   rU   rV   r   allgather_and_lineard   s   



z+_FusedSequenceParallel.allgather_and_lineargathered_outputsscattered_outputsc              	      s  |d j tfdd|D sJ tfdd|D sJ tfddD s,J tfddD s9J dd D ttjj& tjj	j
 j fd	dtj
D }W d
   n1 stw   Y  tj }tdj
D ](}	j|	 j
 }
|rtj| |
t W d
   n1 sw   Y  qj| j| j| |}tdj
D ]}	j|	 j
  |rtjj j t|t d W d
   n1 sw   Y  |j jj | fdd|j D  | |jgjd d  }|| |j |rWtj| j j
t j ddd W d
   n	1 sRw   Y  q|fdd|D j| tdj
D ]i}	j|	 j
 }
|rtjj j|
t|t d W d
   n	1 sw   Y  jj |rtjj t|||
 D ]\}}||
 |j  qW d
   n	1 sw   Y  ql|j |j t|D ]\}}tj|d|d qd
S )z9Perform a fused linear layer followed by a reduce-scatterr   c                 3   r4   r5   r6   r8   gor:   r   r   r;      r<   zB_FusedSequenceParallel.linear_and_reducescatter.<locals>.<genexpr>c                 3   r=   r5   r>   rw   r>   r   r   r;      r@   c                 3   r4   r5   r6   r8   sor:   r   r   r;      r<   c                 3   r=   r5   r>   ry   r>   r   r   r;      r@   c                 S   rA   r   rB   ry   r   r   r   rD      rE   zC_FusedSequenceParallel.linear_and_reducescatter.<locals>.<listcomp>c              	      rF   )c                    rG   r   rH   )r8   rK   rz   r:   r   r   rD      rL   zN_FusedSequenceParallel.linear_and_reducescatter.<locals>.<listcomp>.<listcomp>r   rM   rO   rS   )r?   scattered_output_numelsrv   r&   rU   total_scattered_output_numelr   r   rD      rW   Nr
   rX   c                    r]   r   r   r^   rr   r   r   rD     rE   r*   rZ   c                    s   g | ]}| j  qS r   )r   )r8   or:   r   r   rD   0  s    )rN   out) r?   ra   rb   r   r   r   r   r	   r   rc   r   rd   re   r(   r   r+   rf   rg   r!   rh   r#   r$   r.   ri   rj   r%   rl   rm   rn   r"   rP   rk   )r&   r0   ru   rv   r1   r2   r3   ro   r(   rp   r`   rq   final_streamrx   rs   rz   r   )rr   r?   r{   rv   r&   rU   r|   r   linear_and_reducescatter   s   




z/_FusedSequenceParallel.linear_and_reducescatter)TT)__name__
__module____qualname____doc__r   r   distProcessGroupr'   r   r    r   r.   r   Tensorintboolrt   r   r   r   r   r   r      sX    )


u	r   CACHEr   r)   c                 C   s   |   dkS r   )r   )r   r   r   r   -_can_ranks_communicate_all_to_all_over_nvlinkV  s   
r   r   c                 C   sz   |  }z	tt| }W |S  ty<   ttjddr d }n|dkr'd }nt|s.d }nt	| |}|tt|< Y |S w )NDISABLE_FUSED_SEQUENCE_PARALLEL0r
   )
r   r   idKeyErrorr   osenvirongetr   r   )r   r   r   objr   r   r   
_lazy_initc  s   
r   c                   C   s
   t j S r5   )r   r   r(   r   r   r   r   _default_stream_factoryv  s   
r   i  )r   r1   scale_scattered_inputscale_weight	out_dtypescattered_inputweightr   r1   r   r   r   c          	      K      d S r5   r   	r   r   r   r   r1   r   r   r   private_args_DO_NOT_USEr   r   r   fused_allgather_and_linearz     r   c          	      K   r   r5   r   r   r   r   r   r     r   c                   sJ  |  }	t|tr|n|g}
|du |du ksJ |durWt|tt|tks(J t|tr/|n|g}t|
t|ks<J tjsCJ tdd |
D sNJ dusVJ dndgt|
 }tdd |
D siJ jdkspJ tfdd|
D s}J  sJ |	fj	   fdd	|
D }|durt|tt|tksJ t|tr|n|g}t|t|ksJ td
d t
||D sJ tdd |D sJ durt|tr|D ]	}|jksJ qn|jksJ n
fdd	|D }tjjj|
|j|||dd|dd||d	 t|trdd	 |D S |d ddS )a  Performs a fused all-gather followed by a linear op

    It is equivalent to the following plain PyTorch code:

    # like scattered_input but with first dim multiplied by group's world size
    gathered_input = scattered_input.new_empty(...)
    dist.all_gather_into_tensor(gathered_input, scattered_input, group=group)
    return torch.nn.functional.linear(gathered_input, weight)

    It achieves this by breaking down the matmul into smaller partial ops (as
    many as the world size), each needing as input a different "contribution"
    to the all-gather (by a different rank), and writing to a different chunk of
    the output. Then, on one stream, it sends the local contribution to all
    other ranks (first one rank over, then two, ...) while, on another stream,
    it launches the sub-matmuls in the order in which the remote contributions
    (which are the sub-matmuls' inputs) are supposed to arrive, so that ideally
    none of the sub-matmuls will ever have to wait.

    The idea comes from this paper: https://arxiv.org/abs/2302.05442

    This method uses a staging buffer, which persists across calls, of the same
    size as the all-gathered input tensor (i.e., the input's size times the
    world size). If multiple inputs of multiple sizes are used, the staging
    buffer will be the maximum needed by any of them. Each call, when it starts,
    must first wait for the previous call to finish using the staging buffer. In
    normal conditions, where there's some other operation between two calls,
    this isn't an issue.

    Supports FP8 gemm for tensor-wise quantized weight and input tensors.
    To enable FP8 gemm:
    1. pass scattered_input and weight as quantized FP8 datatype
    2. pass scale_scattered_input and scale_weight, the scales used to
    quantize input and weight, respectively.
    3. set out_dtype, if not specified, will be inferred from scattered_input type.

    Nc                 s       | ]}t |jV  qd S r5   r   r?   r8   wr   r   r   r;     r@   z-fused_allgather_and_linear.<locals>.<genexpr>!output_dtype is required with FP8c                 s       | ]}|j d kV  qdS r*   Nndimr   r   r   r   r;     r@   r*   c                 3   $    | ]} j d  |j d  kV  qdS r   NrJ   r   )r   r   r   r;        " c                    s&   g | ]} d d |j d d  qS Nr   r   r   )gathered_input_shaper   r   rD     s   & z.fused_allgather_and_linear.<locals>.<listcomp>c                 s       | ]
\}}|j |kV  qd S r5   r   )r8   rx   gosr   r   r   r;     s    
c                 s       | ]}|  V  qd S r5   is_contiguousrw   r   r   r   r;         c                    s(   g | ]}j | d ur njdqS Nr>   	new_emptyr?   r8   r   )r   r   r   r   rD         r2   Tr3   )r1   r2   r3   r   scales_weightsc                 S   s   g | ]}| d dqS )r   r
   )flattenrw   r   r   r   rD         r   r
   )r   
isinstancelistlenr   r?   ra   r   r   rJ   rP   r   opsxformers_python _fused_allgather_and_linear_implrc   r   r   )r   r   r   r   r1   r   r   r   r   r   weightsr   gathered_output_shapesru   r~   r   )r   r   r   r   r     sd   0


z1xformers_python::_fused_allgather_and_linear_implru   r   )mutates_argsdevice_typesr   process_group_namer2   r3   r   c	                    s\   t j|}	dttj dtdtg tjj	f dd f fdd}
t
| g|
|	|||d d S )Ninputsr`   rq   r)   c              
      s   t  D ]G\}}}tj| 3 d ur0|d ur0tj| d | || j||| d ntj| d | || d W d    n1 sHw   Y  qd S )Nr   r   scale_ascale_br   r   rP   r   r   r+   
_scaled_mmtr?   matmul)r   r`   rq   r   r   rx   ru   r   r   r   r   r   r0     s    	z8_fused_allgather_and_linear_custom_op.<locals>.my_matmulr   r1   r2   r3   )r   distributed_c10d_resolve_process_groupr   r   r   r   r   r   r    fused_allgather_and_anything)r   r   r   ru   r1   r2   r3   r   r   process_groupr0   r   r   r   %_fused_allgather_and_linear_custom_op  s$   
r   )r1   r/   r0   c          
   	      s^  |  t dkrtD ]|g t qd S tdd  D s$J t fdd D s1J t fdd D s>J fdd D }t d j|}dkr[| dt d S |d u rd	d t |D }t |D ]\}}	tj	|	||d
 qntD ]|fdd|D t qd S  d j|j
ksJ |j |||dd|ddd d S )Nr   c                 s   r   r5   r   r7   r   r   r   r;   G  r   z/fused_allgather_and_anything.<locals>.<genexpr>c                 3        | ]}|j  d  j kV  qdS r   Nr   r7   r/   r   r   r;   H      c                 3   r   r   r>   r7   r   r   r   r;   I  r   c                       g | ]} f|j  qS r   r   r7   r   r   r   rD   K  r   z0fused_allgather_and_anything.<locals>.<listcomp>r
   c                 S      g | ]	\}}| |qS r   r   )r8   r9   gisr   r   r   rD   T      )output_tensorinput_tensorr   c                    r]   r   r   )r8   gir_   r   r   rD   \  rE   r2   Tr3   r1   r2   r3   )r   r   re   r   ra   r   r   rP   r   all_gather_into_tensorr   rt   r   )
r/   r0   r   r1   r   gathered_input_shapesr   gathered_inputsr9   r   r   )r/   r`   r   r   r   6  sB   
	


r   )r   r1   scale_gathered_inputr   r   gathered_inputr   c          	      K   r   r5   r   	r   r   r   r   r1   r   r   r   r   r   r   r   fused_linear_and_reducescatterm  r   r   c          	      K   r   r5   r   r   r   r   r   r   }  r   c                   s  |  }	t|tr|n|g}
|du |du ksJ |durWt|tt|tks(J t|tr/|n|g}t|
t|ks<J t jsCJ tdd |
D sNJ dusVJ dndgt|
 }tdd |
D siJ  jdkspJ t fdd|
D s}J   sJ  j	d |	 dksJ  
|	 j	d |	 f j	d	d    fd
d|
D }dd |D }|durt|tt|tksJ t|tr|n|g}t||ksJ t fdd|D sJ t fdd|D sJ tdd t||D sJ durt|tr|D ]}|jksJ q	n|jksJ n
 fdd|D }tjjj |
|j|||dd|dd||d	 t|trJ|S |d S )a	  Performs a fused linear op followed by a reduce-scatter

    It is equivalent to the following plain PyTorch code:

    gathered_output = torch.nn.functional.linear(gathered_input, weight)
    # like gathered_output but with first dim divided by group's world size
    scattered_output = gathered_output.new_empty(...)
    dist.reduce_scatter_tensor(scattered_output, gathered_output, group=group)

    Supports FP8 gemm with tensor-wise quantized weights. To enable FP8 gemm:
    1. pass weight and gathered_input as FP8 tensors
    2. Set `scale_gathered_input` and `scale_weight` to the scales used to quantize
    inputs and weight, respectively.
    3. Set out_dtype to the desired output dtype. If not specified, it will be inferred from
    gathered_input datatype.
    Nc                 s   r   r5   r   r   r   r   r   r;     r@   z1fused_linear_and_reducescatter.<locals>.<genexpr>r   c                 s   r   r   r   r   r   r   r   r;     r@   r*   c                 3   r   r   r   r   r   r   r   r;     r   r   r
   c                    s(   g | ]} j d d |j d d  qS r   r   r   r   r   r   rD     s   ( z2fused_linear_and_reducescatter.<locals>.<listcomp>c                 S   s   g | ]}|d d qS )r
   Nr   r   r   r   r   rD     r   c                 3       | ]	}|j  j kV  qd S r5   r   ry   r   r   r   r;     r<   c                 3   r   r5   r>   ry   r   r   r   r;     r<   c                 s   r   r5   r   )r8   rz   sosr   r   r   r;     s
    
c                    s(   g | ]} j |d urn jdqS r   r   )r8   r   r   r   r   r   rD     r   r2   Tr3   )r1   r2   r3   r   r   )r   r   r   r   r   r?   ra   r   r   rJ   rI   rP   r   r   r   $_fused_linear_and_reducescatter_implrc   r   )r   r   r   r   r1   r   r   r   r   r   r   r   r   scattered_output_shapesrv   r~   r   r   r   r     sn   



z5xformers_python::_fused_linear_and_reducescatter_implrv   c	                    sZ   t j|}	dttj dtdtg tjj	f dd f fdd}
t
|
||	|||d d S )Noutputsrr   rq   r)   c              
      s   t | D ]A\}}}tj| - d ur,|d ur,tj | | |j||d ntj | | |d W d    n1 sBw   Y  qd S )Nr   r   r   )r   rr   rq   r   r   r~   r   r   r   r   r   r   r0     s    	z<_fused_linear_and_reducescatter_custom_op.<locals>.my_matmulr   )r   r   r   r   r   r   r   r   r   r     fused_anything_and_reducescatter)r   r   r   rv   r1   r2   r3   r   r   r   r0   r   r   r   )_fused_linear_and_reducescatter_custom_op  s$   
r   c          
   
      sr  |  tdkrtD ] | g  t qd S tdd D s$J tfddD s1J tfddD s>J fddD }td j|}dkr[| dt d S |d u rd	d t|D }tD ] |  fd
d|D  t qmt|D ]\}}	tj	|	||d qd S d j|j
ksJ fdd|D }|j| |||dd|ddd d S )Nr   c                 s   r   r5   r   ry   r   r   r   r;   ,  r   z3fused_anything_and_reducescatter.<locals>.<genexpr>c                 3   r   r   r   ry   rv   r   r   r;   -  r   c                 3   r   r   r>   ry   r   r   r   r;   .  r   c                    r   r   r   ry   r   r   r   rD   0  r   z4fused_anything_and_reducescatter.<locals>.<listcomp>r
   c                 S   r   r   r   )r8   rz   r   r   r   r   rD   9  r   c                    r]   r   r   rw   r}   r   r   rD   ?  rE   )outputinputr   c                    s   g | ]	} d   |qS )r   r   r   r   r   r   rD   I  s    r2   Tr3   r   )r   r   re   r   ra   r   r   rP   r   reduce_scatter_tensorr   r   r   )
r0   rv   r   r1   r   r   r   ru   rx   rz   r   )rr   rv   r   r   r     sJ   




r   )+r   typingr   r   r   r   r   r   r   r   torch.distributeddistributedr    torch.multiprocessing.reductions#torch.distributed._symmetric_memoryr	   rg   rn   rj   r?   r   r   r   r   __annotations__r   r   r   r   r   r   r    r   r   r   library	custom_opstrr   r   r   r   r   r   r   r   r   <module>   s  
$  ;
			
m	
1	
7			
_	
1	