o
    پi/,                     @   s   d dl mZ d dlmZ d dlmZ d dlmZmZm	Z	 d dl
mZmZ d dlmZmZmZmZ d dlmZmZmZmZ d dlmZ G d	d
 d
eZdedefddZeddG dd deZG dd deZeddG dd deZdS )    )Optional)	dataclassN)BooleanInt32
const_expr)if_generateand_)MbarrierArrayCooperativeGroup
PipelineOppipeline_init_wait)PipelineAsyncPipelineTmaAsyncPipelineStatePipelineUserType)PipelineTmaUmmac                   @   s"   e Zd ZdefddZdd ZdS )PipelineStateWAdvancenum_iterationsc                 C   sH   |  j t|7  _ | jt| }|| j }|  j|N  _|| j | _d S N)_countr   _indexstages_phase)selfr   	new_indexnum_crossings r   B/home/ubuntu/.local/lib/python3.10/site-packages/quack/pipeline.pyadvance_iters   s
   
z#PipelineStateWAdvance.advance_itersc                 C   s(   t | jt|d t|d t|d S )Nr         )r   r   r   r   valuesr   r   r   __new_from_mlir_values__   s   "z.PipelineStateWAdvance.__new_from_mlir_values__N)__name__
__module____qualname__r   r   r#   r   r   r   r   r      s    	r   typer   c                 C   sP   | t ju rt|tdtdtdS | t ju r$t|tdtdtdS J d)zz
    Creates a pipeline state. Producers are assumed to start with an empty buffer and have a flipped phase bit of 1.
    r   r   FzBError: invalid PipelineUserType specified for make_pipeline_state.)r   Producerr   r   Consumer)r'   r   r   r   r   make_pipeline_state   s   

r*   T)frozenc                   @      e Zd ZdZedddddededededejd	e	ej
 d
e	e fddZ		ddede	e de	e fddZdefddZdS )PipelineTmaCpAsynczZ
    PipelineTmaCpAsync is used for CpAsync + TMA producers and AsyncThread consumers
    N)barrier_storagecta_layout_vmnktidx
num_stagesproducer_groupconsumer_grouptx_countr.   r/   r0   c                 C   s   t |tjstdt| tj}tj}||f}	||f}
t	|j
dd| |	|}t	|j
dd|  | |
}|du rBtj \}}}|du rKtd}t||\}}|du s^t|dkrad}n|}d}t| t||| |||S )a  
        This helper function computes any necessary attributes and returns an instance of PipelineTmaAsync.
        :param barrier_storage: Pointer to the smem address for this pipeline's mbarriers
        :type barrier_storage: cute.Pointer
        :param num_stages: Number of buffer stages for this pipeline
        :type num_stages: Int32
        :param producer_group: CooperativeGroup for the producer agent
        :type producer_group: CooperativeGroup
        :param consumer_group: CooperativeGroup for the consumer agent
        :type consumer_group: CooperativeGroup
        :param tx_count: Number of bytes expected to be written to the transaction barrier for one stage
        :type tx_count: int
        :param cta_layout_vmnk: Layout of the cluster shape
        :type cta_layout_vmnk: cute.Layout | None
        :param tidx: thread index to consumer async threads
        :type tidx: Int32 | None
        7Expected barrier_storage to be a cute.Pointer, but got    	min_alignN)r   r   r   r   r   )
isinstancecutePointer
ValueErrorr'   r   TmaLoadAsyncThreadr   _make_sync_objectalignarch
thread_idxmake_layoutr    init_empty_barrier_arrive_signalsizer   r-   )r1   r2   r3   r4   r.   r/   r0   producer_typeconsumer_typeproducerconsumersync_object_fullsync_object_empty_dst_rankis_signalling_threadproducer_maskr   r   r   create:   sF   

zPipelineTmaCpAsync.createTstatetry_acquire_tokenis_tma_warpc                    s8   t |du p|dk fdd t | fdd dS )zk
        TMA producer commit conditionally waits on buffer empty and sets the transaction barrier.
        Nr   c                          j jjS r   rK   waitindexphaser   r   rQ   r   r   <lambda>       z5PipelineTmaCpAsync.producer_acquire.<locals>.<lambda>c                          j j jS r   rJ   arriverW   rO   r   rY   r   r   rZ      r[   )r   r   rQ   rR   rS   r   rY   r   producer_acquire   s   	z#PipelineTmaCpAsync.producer_acquirec                 C      t j| | dS zJ
        We need the mbarrier to track the completion of cp.async
        Nr:   rA   cp_async_mbarrier_arrive_noincproducer_get_barrierrY   r   r   r   producer_cpasync_commit      z*PipelineTmaCpAsync.producer_cpasync_commitNTr$   r%   r&   __doc__staticmethodintr
   r:   r;   r   Layoutr   rP   r   r   r`   rf   r   r   r   r   r-   4   s>    I
r-   c                   @   sR   e Zd Z		ddejdedeeef dede	e
 ddfd	d
Zdd Zdd ZdS )MbarrierArrayWDropCountr   Nr.   r1   agentr4   
drop_countreturnc                 C   s   || _ || _|| _|\| _| _| jj| _|| _| jdkr td| jdkr)td| jt	j
u r8| jdk r8tdt|d urD| j| | _| j | _|   d S )Nr   z3Error: Mbarrier stage count must be greater than 0.z4Error: Mbarrier arrive count must be greater than 0.z=Error: Mbarrier tx count must not be less than 0 for TMA ops.)r.   r4   r1   op_typecgrE   arrive_countrp   r<   r   r=   r   mbarrier_basembarrier_init)r   r.   r1   ro   r4   rp   r   r   r   __init__   s    


z MbarrierArrayWDropCount.__init__c                 C   s   | j | jgS r   )r.   rp   )r   r   r   r   __extract_mlir_values__   s   z/MbarrierArrayWDropCount.__extract_mlir_values__c                 C   s$   t |d | j| j| jf| j|d S )Nr   r   )rn   r1   rr   rs   r4   r!   r   r   r   r#      s   z0MbarrierArrayWDropCount.__new_from_mlir_values__)r   N)r$   r%   r&   r:   r;   rl   tupler   r
   r   r   rw   rx   r#   r   r   r   r   rn      s$    

rn   c                   @   r,   )PipelineTmaCpAsyncUmmazr
    PipelineTmaCpAsync is used for CpAsync + TMA producers and UMMA consumers
    (e.g. Blackwell mainloops)
    N)r.   r/   producer_drop_countr1   r2   r3   r4   r.   r/   r{   c                 C   s   t |tjstdt| tj}tj}||f}	||f}
t|j	dd| |	||d}t
|j	dd|  | |
}|du sBt|dkrGd}d}n
t|}t|}|du s_tj|dgd	dkretjjjjntjjjj}|}t| t||| ||||S )
aB  
        This helper function computes any necessary attributes and returns an instance of PipelineTmaUmma.
        :param barrier_storage: Pointer to the smem address for this pipeline's mbarriers
        :type barrier_storage: cute.Pointer
        :param num_stages: Number of buffer stages for this pipeline
        :type num_stages: Int32
        :param producer_group: `CooperativeGroup` for the producer agent
        :type producer_group: CooperativeGroup
        :param consumer_group: `CooperativeGroup` for the consumer agent
        :type consumer_group: CooperativeGroup
        :param tx_count: Number of bytes expected to be written to the transaction barrier for one stage
        :type tx_count: int
        :param cta_layout_vmnk: Layout of the cluster shape
        :type cta_layout_vmnk: cute.Layout | None
        r5   r6   r7   )rp   Nr   Tr   )mode)r9   r:   r;   r<   r'   r   r=   
TCGen05Mmarn   r@   r   r?   rE   r   _compute_mcast_arrival_mask_compute_is_leader_ctanvgputcgen05CtaGroupONETWOr   rz   )r1   r2   r3   r4   r.   r/   r{   rF   rG   rH   rI   rJ   rK   rO   is_leader_cta	cta_groupconsumer_maskr   r   r   rP      sL   



zPipelineTmaCpAsyncUmma.createTrQ   rR   rS   c                    s@   t |du p|dk fdd t t j| fdd dS )z
        TMA producer commit conditionally waits on buffer empty and sets the
        transaction barrier for leader threadblocks.
        Nr   c                      rT   r   rU   r   rY   r   r   rZ   %  r[   z9PipelineTmaCpAsyncUmma.producer_acquire.<locals>.<lambda>c                      r\   r   r]   r   rY   r   r   rZ   +  r[   )r   r   r   r_   r   rY   r   r`     s   

z'PipelineTmaCpAsyncUmma.producer_acquirec                 C   ra   rb   rc   rY   r   r   r   rf   .  rg   z.PipelineTmaCpAsyncUmma.producer_cpasync_commitrh   ri   r   r   r   r   rz      s>    O
rz   )typingr   dataclassesr   cutlass.cuter:   cutlassr   r   r   cutlass.cutlass_dslr   r   cutlass.pipeliner	   r
   r   r   r   r   r   r   r   r   rl   r*   r-   rn   rz   r   r   r   r   <module>   s   g)