o
    c۷i)                     @   s   d dl mZ d dlmZ d dlmZ d dlmZmZm	Z	 d dl
mZmZmZ d dlmZmZmZ d dlmZmZmZ d dlmZ d d	lmZmZ G d
d deZdedefddZeddG dd deZG dd deZeddG dd deZdS )    )Optional)	dataclassN)BooleanInt32
const_expr)if_generateand_dsl_user_op)MbarrierArrayCooperativeGroup
PipelineOp)PipelineTmaAsyncPipelineStatePipelineUserType)PipelineTmaUmma)Agent
agent_syncc                   @   s.   e Zd ZeddddefddZdd ZdS )PipelineStateWAdvanceNlocipnum_iterationsc                C   sH   |  j t|7  _ | jt| }|| j }|  j|N  _|| j | _d S N)_countr   _indexstages_phase)selfr   r   r   	new_indexnum_crossings r    D/home/ubuntu/vllm_env/lib/python3.10/site-packages/quack/pipeline.pyadvance_iters   s
   
z#PipelineStateWAdvance.advance_itersc                 C   s(   t | jt|d t|d t|d S )Nr         )r   r   r   r   valuesr    r    r!   __new_from_mlir_values__   s   "z.PipelineStateWAdvance.__new_from_mlir_values__)__name__
__module____qualname__r	   r   r"   r'   r    r    r    r!   r      s    	r   typer   c                 C   sP   | t ju rt|tdtdtdS | t ju r$t|tdtdtdS J d)zz
    Creates a pipeline state. Producers are assumed to start with an empty buffer and have a flipped phase bit of 1.
    r   r#   FzBError: invalid PipelineUserType specified for make_pipeline_state.)r   Producerr   r   Consumer)r+   r   r    r    r!   make_pipeline_state    s   

r.   T)frozenc                
   @   sf   e Zd ZdZedd Ze		dddddedee	 d	ee	 fd
dZ
eddddefddZdS )PipelineTmaCpAsynczZ
    PipelineTmaCpAsync is used for CpAsync + TMA producers and AsyncThread consumers
    c                  O   s"   t j| i |}t|dt |S )N	__class__)r   createobject__setattr__r0   )argskwargsobjr    r    r!   r2   <   s   zPipelineTmaCpAsync.createNTr   statetry_acquire_tokenis_tma_warpc                   sL   t |du p|dk fdd d t | fdd d dS )zk
        TMA producer commit conditionally waits on buffer empty and sets the transaction barrier.
        Nr   c                         j jjj dS Nr   sync_object_emptywaitindexphaser    r   r   r   r8   r    r!   <lambda>S       z5PipelineTmaCpAsync.producer_acquire.<locals>.<lambda>r   c                         j jjj dS r<   sync_object_fullarriver@   producer_maskr    rB   r    r!   rC   [   rD   )r   r   r8   r9   r:   r   r   r    rB   r!   producer_acquireD   s   
z#PipelineTmaCpAsync.producer_acquirec                C   "   t jj| j|||d||d dS zJ
        We need the mbarrier to track the completion of cp.async
        r   Ncutearchcp_async_mbarrier_arrive_noincproducer_get_barrierr   r8   r   r   r    r    r!   producer_cpasync_commit`      
z*PipelineTmaCpAsync.producer_cpasync_commitNT)r(   r)   r*   __doc__staticmethodr2   r	   r   r   r   rK   rT   r    r    r    r!   r0   6   s&    
r0   c                   @   s^   e Zd Ze		dddddejdedeee	f dede
e d	dfd
dZdd Zdd ZdS )MbarrierArrayWDropCountr   Nr   barrier_storage
num_stagesagenttx_count
drop_countreturnc                C   s   || _ || _|| _|\| _| _| jj| _|| _| jdkr td| jdkr)td| jt	j
u r8| jdk r8tdt|d urD| j| | _| j | _| j||d d S )Nr   z3Error: Mbarrier stage count must be greater than 0.z4Error: Mbarrier arrive count must be greater than 0.z=Error: Mbarrier tx count must not be less than 0 for TMA ops.r   )rZ   r]   r[   op_typecgsizearrive_countr^   
ValueErrorr   TmaLoadr   mbarrier_basembarrier_init)r   rZ   r[   r\   r]   r^   r   r   r    r    r!   __init__k   s    


z MbarrierArrayWDropCount.__init__c                 C   s   | j | jgS r   )rZ   r^   )r   r    r    r!   __extract_mlir_values__   s   z/MbarrierArrayWDropCount.__extract_mlir_values__c                 C   s$   t |d | j| j| jf| j|d S )Nr   r#   )rY   r[   r`   ra   r]   r%   r    r    r!   r'      s   z0MbarrierArrayWDropCount.__new_from_mlir_values__)r   N)r(   r)   r*   r	   rO   Pointerinttupler   r   r   r   rh   ri   r'   r    r    r    r!   rY   j   s,    

"rY   c                   @   s   e Zd ZdZeedddddddddededed	ed
ej	de
ej deeef dede
e fddZe		dddddede
e de
e fddZeddddefddZdS )PipelineTmaCpAsyncUmmazr
    PipelineTmaCpAsync is used for CpAsync + TMA producers and UMMA consumers
    (e.g. Blackwell mainloops)
    N)r#   r#   F)rZ   cta_layout_vmnkmcast_mode_mn
defer_syncproducer_drop_countr   r   r[   producer_groupconsumer_groupr]   rZ   rn   ro   rp   rq   c              	   C   sT  t |tjstdt| tj}tj}||f}||f}t|j	dd| ||||	|
d}t
j|j	dd|  | ||	|
d}|du sJtj||	|
ddkrOd}d}nt
j|||	|
d}t
j||	|
d}|du sptj|d	g|	|
d
dkrvtjjjjntjjjj}|}|stj  |du stj||	|
ddkrttj nttjdd t||| ||||S )a  Creates and initializes a new PipelineTmaUmma instance.

        :param num_stages: Number of buffer stages for this pipeline
        :type num_stages: int
        :param producer_group: CooperativeGroup for the producer agent
        :type producer_group: CooperativeGroup
        :param consumer_group: CooperativeGroup for the consumer agent
        :type consumer_group: CooperativeGroup
        :param tx_count: Number of bytes expected to be written to the transaction barrier for one stage
        :type tx_count: int
        :param barrier_storage: Pointer to the shared memory address for this pipeline's mbarriers
        :type barrier_storage: cute.Pointer, optional
        :param cta_layout_vmnk: Layout of the cluster shape
        :type cta_layout_vmnk: cute.Layout, optional
        :param mcast_mode_mn: Tuple specifying multicast modes for m and n dimensions (each 0 or 1)
        :type mcast_mode_mn: tuple[int, int], optional
        :raises ValueError: If barrier_storage is not a cute.Pointer instance
        :return: A new PipelineTmaUmma instance configured with the provided parameters
        :rtype: PipelineTmaUmma
        z7Expected barrier_storage to be a cute.Pointer, but got    )	min_align)r^   r   r   r   Nr#   Tr   )moder   r   )
is_relaxed)
isinstancerO   rj   	TypeErrorr+   r   re   
TCGen05MmarY   alignr   _make_sync_objectrb   _compute_mcast_arrival_mask_compute_is_leader_ctanvgputcgen05CtaGroupONETWOrP   mbarrier_init_fencer   r   ThreadBlockThreadBlockClusterrm   )r[   rr   rs   r]   rZ   rn   ro   rp   rq   r   r   producer_typeconsumer_typeproducerconsumerrG   r>   rI   is_leader_cta	cta_groupconsumer_maskr    r    r!   r2      sd   $
	 

zPipelineTmaCpAsyncUmma.createTr   r8   r9   r:   c                   sT   t |du p|dk fdd d t tj| fdd d dS )z
        TMA producer commit conditionally waits on buffer empty and sets the
        transaction barrier for leader threadblocks.
        Nr   c                      r;   r<   r=   r    rB   r    r!   rC     rD   z9PipelineTmaCpAsyncUmma.producer_acquire.<locals>.<lambda>r   c                      rE   r<   rF   r    rB   r    r!   rC     rD   )r   r   r   rJ   r    rB   r!   rK     s   

z'PipelineTmaCpAsyncUmma.producer_acquirec                C   rL   rM   rN   rS   r    r    r!   rT     rU   z.PipelineTmaCpAsyncUmma.producer_cpasync_commitrV   )r(   r)   r*   rW   r	   rX   rk   r   rO   rj   r   Layoutrl   boolr   r2   r   r   rK   rT   r    r    r    r!   rm      sZ    
	
brm   )typingr   dataclassesr   cutlass.cuterO   cutlassr   r   r   cutlass.cutlass_dslr   r   r	   cutlass.pipeliner
   r   r   r   r   r   r   r   r   r   rk   r.   r0   rY   rm   r    r    r    r!   <module>   s    3-