o
    -i^9                     @   s  d dl Z d dlmZmZ d dlmZ d dlZd dlm  m	Z
 d dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZ d dlmZ d d	lmZ d d
lmZ d dlmZ ddlmZ ddlm Z m!Z!m"Z" ddl#m$Z$ ddl%m&Z&m'Z' ee(Z)dedeej* f dedej*f fddZ+G dd dZ,G dd de,Z-G dd de,Z.e/ Z0G dd de,Z1G dd de,Z2G d d! d!e'Z3dS )"    N)CallableSequence)Any)PatternMatcherPass)
VllmConfig)Range)get_tp_group tensor_model_parallel_all_reduce)$get_tensor_model_parallel_world_size)init_logger)kFp8StaticTensorSym)current_platform   )enable_fake_mode)MatcherFusedAddRMSNormMatcherQuantFP8MatcherRMSNorm)NoOpEliminationPass)VllmInductorPassVllmPatternMatcherPassfn.returnc                    s&   t  dtdtjf fdd}|S )Nargsr   c                     s    |  d S )Nr    )r   r   r   b/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/vllm/compilation/sequence_parallelism.pywrapper"   s   z&get_first_out_wrapper.<locals>.wrapper)	functoolswrapsr   torchTensor)r   r   r   r   r   get_first_out_wrapper   s   r!   c                   @   sr   e Zd ZdZdedejdedB ddfddZd	ej	dej	fd
dZ
d	ej	dej	fddZd	ej	dej	fddZdS )_SequenceParallelPatternHelperz)Helper for sequence parallelism patterns.epsilondtypedeviceNr   c                 C   s&   || _ || _|| _t | _t | _d S N)r#   r$   r%   r   tp_groupr
   tp_sizeselfr#   r$   r%   r   r   r   __init__,   s
   z'_SequenceParallelPatternHelper.__init__xc                 C   s   t |S r&   )r	   r*   r,   r   r   r   _all_reduce8   s   z*_SequenceParallelPatternHelper._all_reducec                 C      t jjjj|d| j| jjdS Nr   )dim
world_size
group_name)r   opsvllmreduce_scatterdefaultr(   r'   unique_namer-   r   r   r   _reduce_scatter;      
z._SequenceParallelPatternHelper._reduce_scatterc                 C   r/   r0   )r   r4   r5   
all_gatherr7   r(   r'   r8   r-   r   r   r   _all_gather@   r:   z*_SequenceParallelPatternHelper._all_gather)__name__
__module____qualname____doc__floatr   r$   strr+   r    r.   r9   r<   r   r   r   r   r"   )   s    
r"   c                       Z   e Zd ZdedejdedB ddf fddZdeej	 fdd	Z
d
eddfddZ  ZS )FirstAllReduceRMSNormPatternr#   r$   r%   Nr   c                       t  ||| t|| _d S r&   )superr+   r   rmsnorm_matcherr)   	__class__r   r   r+   G      z%FirstAllReduceRMSNormPattern.__init__c                 C   s6   t jg d| j| jd}t jdg| j| jd}||gS )Nr         r%   r$   rM   r   emptyr%   r$   )r*   inputarg3_1r   r   r   
get_inputsK   s   z'FirstAllReduceRMSNormPattern.get_inputspm_passc                    sp   dt jdt jdtt jt jf f fdd}dt jdt jdtt jt jf f fdd}t||  tj| d S )NrQ   rR   r   c                    s     | } ||}||fS r&   r.   rG   )rQ   rR   
all_reducermsnormr*   r   r   patternR   s   
z6FirstAllReduceRMSNormPattern.register.<locals>.patternc                    s(     | } ||} |}||fS r&   )r9   rG   r<   )rQ   rR   r6   rW   r;   rX   r   r   replacement[   s   

z:FirstAllReduceRMSNormPattern.register.<locals>.replacementr   r    tuplepmregister_replacementrS   fwd_onlyr*   rT   rY   rZ   r   rX   r   registerQ   s"   	
z%FirstAllReduceRMSNormPattern.registerr=   r>   r?   rA   r   r$   rB   r+   listr    rS   r   ra   __classcell__r   r   rH   r   rD   F   s    $rD   c                       rC   )MiddleAllReduceRMSNormPatternr#   r$   r%   Nr   c                    rE   r&   )rF   r+   r   rG   r)   rH   r   r   r+   k   rJ   z&MiddleAllReduceRMSNormPattern.__init__c                 C   sR   t jddg| j| jd}t jddg| j| jd}t jddg| j| jd}|||gS )NrM   rN   rO   )r*   mm_1residualrms_norm_weightsr   r   r   rS   o   s   z(MiddleAllReduceRMSNormPattern.get_inputsrT   c              
      s   dt jdt jdt jdtt jt jf f fdd}dt jdt jdt jdtt jt jf f fdd}t||  tj| tt|t|  tj| d S )	Nrg   rf   rh   r   c                    s(     |} ||| }|d |d fS )Nr   r   rU   )rg   rf   rh   rV   rW   rX   r   r   rY   |   s   
z7MiddleAllReduceRMSNormPattern.register.<locals>.patternc                    sH     |}| d|ddf }  ||| } |d }||d fS )Nr   .r   )r9   sizerG   r<   )rg   rf   rh   r6   rW   r;   rX   r   r   rZ      s
   
z;MiddleAllReduceRMSNormPattern.register.<locals>.replacementr   r    r\   r]   r^   rS   r_   r!   r`   r   rX   r   ra   {   s8   	z&MiddleAllReduceRMSNormPattern.registerrb   r   r   rH   r   re   j   s    $re   c                       rC   )%FirstAllReduceRMSNormStaticFP8Patternr#   r$   r%   Nr   c                    (   t  ||| t|| _tt| _d S r&   )rF   r+   r   rG   r   r   quant_matcherr)   rH   r   r   r+      s   
z.FirstAllReduceRMSNormStaticFP8Pattern.__init__c                 C   sL   t jg d| j| jd}t jdg| j| jd}t jd| jt jd}|||gS )NrK   rN   rM   g      ?)r   zerosr%   r$   rP   tensorfloat32)r*   rQ   weightscaler   r   r   rS      s   
z0FirstAllReduceRMSNormStaticFP8Pattern.get_inputsrT   c              
      s|   dt jdt jdt jdtt jt jf f fdd}dt jdt jdt jdtt jt jf f fdd}t||  tj| d S )	NrQ   rq   rr   r   c                    s.     | } ||} ||\}}||fS r&   r.   rG   rm   )rQ   rq   rr   rV   rmsquant_rX   r   r   rY      s   
z?FirstAllReduceRMSNormStaticFP8Pattern.register.<locals>.patternc                    s8     | } ||} ||\}} |}||fS r&   )r9   rG   rm   r<   )rQ   rq   rr   r6   rt   ru   rv   r;   rX   r   r   rZ      s
   

zCFirstAllReduceRMSNormStaticFP8Pattern.register.<locals>.replacementr[   r`   r   rX   r   ra      s*   
z.FirstAllReduceRMSNormStaticFP8Pattern.registerrb   r   r   rH   r   rk      s    
rk   c                       rC   )&MiddleAllReduceRMSNormStaticFP8Patternr#   r$   r%   Nr   c                    rl   r&   )rF   r+   r   rG   r   r   rm   r)   rH   r   r   r+      s   
z/MiddleAllReduceRMSNormStaticFP8Pattern.__init__c                 C   sl   t jddg| j| jd}t jddg| j| jd}t jddg| j| jd}t jddg| jt jd}||||gS )NrM   rN   r   )r   rP   r%   r$   rp   )r*   rf   rg   rh   rr   r   r   r   rS      s
   z1MiddleAllReduceRMSNormStaticFP8Pattern.get_inputsrT   c                    s   dt jdt jdt jdt jdtt jt jf f
 fdd}dt jdt jdt jdt jdtt jt jf f
 fdd	}t||  tj| tt|t|  tj| d S )
Nrg   rf   rh   rr   r   c           	         s4     |} ||| \}} ||\}}||fS r&   rs   )	rg   rf   rh   rr   rV   rt   residual_outru   rv   rX   r   r   rY      s   
z@MiddleAllReduceRMSNormStaticFP8Pattern.register.<locals>.patternc           
         sT     |}| d|ddf }  ||| \}} ||\}} |}	|	|fS )Nr   .)r9   ri   rG   rm   r<   )
rg   rf   rh   rr   r6   rt   rx   ru   rv   r;   rX   r   r   rZ      s   


zDMiddleAllReduceRMSNormStaticFP8Pattern.register.<locals>.replacementrj   r`   r   rX   r   ra      s@   z/MiddleAllReduceRMSNormStaticFP8Pattern.registerrb   r   r   rH   r   rw      s    $rw   c                       sZ   e Zd ZdZededdf fddZdedefdd	Z	e
jd
ejddfddZ  ZS )SequenceParallelismPassa  
    This pass enables sequence parallelism for models.
    It identifies patterns where an AllReduce operation is followed by
    an RMSNorm (or RMSNorm and then Quantization) operation.
    These patterns are replaced with a ReduceScatter operation, followed by
    a local RMSNorm/Quantization, and then an AllGather operation.

    The general transformation is:
    Input -> AllReduce -> RMSNorm -> Output
    becomes
    Input -> ReduceScatter -> RMSNorm -> AllGather -> Output

    While this pass itself does not directly yield performance improvements,
    it lays the groundwork for subsequent fusion passes, such as
    GEMM + ReduceScatter and AllGather + GEMM fusions. These fusions can
    significantly reduce communication overhead and improve overall model
    performance.


    This pass splits up the residual tensor across TP ranks and hence divides its size.
    Because the pattern matcher starts at the end of the graph, the replacement
    contains a slice that temporarily conforms the input residual to the correct size.
    After all patterns have been matched, we use a NoOpEliminationPass to clean up
    what have now become no-op slices.

    Note that an older version of the pass did not need this as it operated only on
    custom rms_norm and fused_rms_norm_add custom ops which did not complain about
    mismatched shapes during replacement. So this approach has the same assumption that
    correctness is only maintained if all rms_norm operations are split across ranks.

    Correctness-wise, this is approach strictly better than before - before,
    the graph was incorrect semantically and shape-wise during the pass.
    With this approach there's only semantic incorrectness during the pass.
    Both approaches restore a correct graph once all patterns are matched.
    configr   Nc                    s   t  | t|| _| j d| jj | j_tdd| _dD ]2}t|| j| j	
| j t|| j| j	
| j t|| j| j	
| j t|| j| j	
| j q| || j d S )N.sequence_parallelism_pass)	pass_name)gh㈵>gư>)rF   r+   r   noop_cleanupr}   r   patternsrk   model_dtyper%   ra   rw   rD   re   dump_patterns)r*   rz   r#   rH   r   r   r+   4  s0   







z SequenceParallelismPass.__init__compile_rangec                 C   s4   | j jr| j jr
dS t }| o|j| dk}|S )NTr   )compilation_configsplitting_opsuse_inductor_graph_partitionr
   is_single_sizeend)r*   r   r(   resultr   r   r   is_applicable_for_rangeU  s   z/SequenceParallelismPass.is_applicable_for_rangegraphc                 C   s*   | j || _td| j | | d S )NzReplaced %s patterns)r   applymatched_countloggerdebugr~   )r*   r   r   r   r   __call__n  s   z SequenceParallelismPass.__call__)r=   r>   r?   r@   r   r   r+   r   boolr   r   time_and_logfxGraphr   rd   r   r   rH   r   ry     s    $ ry   )4r   collections.abcr   r   typingr   r   torch._inductor.pattern_matcher	_inductorpattern_matcherr]   torch.fxr   r   vllm.configr   vllm.config.utilsr   vllm.distributedr   r	   vllm.distributed.parallel_stater
   vllm.loggerr   9vllm.model_executor.layers.quantization.utils.quant_utilsr   vllm.platformsr   inductor_passr   matcher_utilsr   r   r   noop_eliminationr   vllm_inductor_passr   r   r=   r   r    r!   r"   rD   re   	fp8_dtype	FP8_DTYPErk   rw   ry   r   r   r   r   <module>   s>   

$7->