o
    i                     @   s0  d dl Z d dlZd dlZd dlZd dlZd dlmZmZ d dlm	Z	 d dl
mZmZ d dlmZ d dlmZ d dlmZmZ d dlZd dlmZ d dlmZmZmZ d d	lmZ d
dlmZ d
dlmZ d
dl m!Z! d
dl"m#Z#m$Z$m%Z% G dd deZ&e 'e(Z)e)*e j+ eG dd dZ,eG dd dZ-e$ G dd deZ.G dd deZ/e$ G dd de/Z0e$ G dd de/Z1e%ddej2ddfd ej3d!ed"e!d#e4e4e5  d$ej6d%e7d&ee5 fd'd(Z8eG d)d* d*Z9e%d+d, Z:e$ G d-d. d.Z;e0e1d/Z<e$ G d0d1 d1Z=G d2d3 d3Z>dS )4    N)ABCabstractmethod)deque)	dataclassfield)Enum)partial)OptionalUnion)profilescheduletensorboard_trace_handler)tqdm   )Cache)PretrainedConfig)GenerationConfig)ContinuousBatchProcessorMetricsattach_tracertracedc                   @   s,   e Zd ZdZdZdZdZdZdZdZ	dZ
d	S )
RequestStatusz5Status of a generation request through its lifecycle.pending
prefillingprefilling_splitsplit_pending_remainderdecodingfinishedfailedN)__name__
__module____qualname____doc__PENDING
PREFILLINGPREFILLING_SPLITSPLIT_PENDING_REMAINDERDECODINGFINISHEDFAILED r)   r)   n/home/ubuntu/maya3_transcribe/venv/lib/python3.10/site-packages/transformers/generation/continuous_batching.pyr   '   s    r   c                   @   s   e Zd ZU dZeed< eedZee	 ed< eedZ
ee	 ed< eedZee ed< dZee ed< ejZeed	< eejdZeed
< dS )GenerationOutputa  Tracks the output of a generation request.

    Attributes:
        request_id (str): The ID of the generation request.
        prompt_ids (list[int]): The IDs of the prompt tokens.
        generated_tokens (list[int]): The generated tokens.
        logprobs (list[float]): The log probabilities of the generated tokens.
        error (Optional[str]): Any error message associated with the request. When None, the request was successful.
    
request_iddefault_factory
prompt_idsgenerated_tokenslogprobsNerrorstatuscreated_time)r   r   r    r!   str__annotations__r   listr/   intr0   r1   floatr2   r	   r   r"   r3   timer4   r)   r)   r)   r*   r+   8   s   
 
r+   c                   @   s  e Zd ZU dZeed< dZeee	  ed< dZ
eee	  ed< eedZee	 ed< eedZee	 ed< eedZee	 ed	< d
Ze	ed< ejZeed< dZe	ed< dZe	ed< eejdZeed< dZee ed< de	fddZde	fddZede	defddZdd Zdd ZdS )RequestStatezTracks the state of a generation request through its lifecycle.

    Attributes:
        status (RequestStatus): can be one of PENDING, PREFILLING, PREFILLING_SPLIT,
                                SPLIT_PENDING_REMAINDER, DECODING, FINISHED, FAILED
    r,   Nr/   full_prompt_idsr-   remaining_prompt_idsstatic_outputsallocated_blocksr   position_offsetr3      max_new_tokenseos_token_idr4   r2   returnc                 C   s   | j S )zCGet the current length of the sequence (prompt + generated tokens).)r@   selfr)   r)   r*   current_lend   s   zRequestState.current_lenc                 C   
   t | jS )z*Get the number of tokens generated so far.)lenr>   rF   r)   r)   r*   generated_lenh      
zRequestState.generated_lentoken_idc                 C   s`   | j tjkrdS || jko| jdk}|  | jk}|r|r$| j|g |s(|r.tj| _ dS dS )zUpdate the request with a newly generated token and check for completion.

        Args:
            token_id: The token ID to add to the output sequence

        Returns:
            bool: True if the request is now complete, False otherwise
        FrC   T)	r3   r   r&   rD   rK   rB   r>   extendr'   )rG   rM   is_eos
is_max_lenr)   r)   r*   update_with_tokenl   s   zRequestState.update_with_tokenc                 C   s\   d| j  d| j d|   dt| j dt| j d| j dt| j d| j d	| j	 d
S )NzRequestState(
	request_id=z
,
	status=z,
	out_tokens=z,
	query_length=z, 
	remaining_tokens=z, 
	kv_length=z
	full_prompt_lenght=z,
	allocated_blocks=z,
	generated_tokens=z
))
r,   r3   rK   rJ   r/   r=   r@   r<   r?   r>   rF   r)   r)   r*   __repr__   s   \zRequestState.__repr__c                 C   s   t | j| j| j| jg | jdS )z7Convert the request state to a GenerationOutput object.)r,   r/   r3   r0   r1   r2   )r+   r,   r<   r3   r>   r2   rF   r)   r)   r*   to_generation_output   s   z!RequestState.to_generation_output) r   r   r    r!   r5   r6   r/   r	   r7   r8   r<   r   r=   r>   r?   r@   r   r"   r3   rB   rD   r:   r4   r9   r2   rH   rK   r   boolrQ   rR   rS   r)   r)   r)   r*   r;   M   s(   
 r;   c                   @   s  e Zd Zejddfdededejdejde	e
eeeejef f  de	eee   ddfd	d
Zedededee fddZededdfddZdefddZdedee fddZededee dee fddZedejdejdedeejejf fddZdS )PagedAttentionCacheNconfiggeneration_configdevicedtypelayer_device_mapinitial_prompt_shapesrE   c                 C   st  t |dddu r|jn|j| _t|dr|jn|j|j | _|j| _t |dd}t |dd}|du s6|du rTtd t	||||pBg |dd\}}td	| d
|  || _
|| _| j|| j
| jf| _|| _|| _g | _g | _t|jD ]8}	|dur||	 n|}
tj| j| j|
d}tj| j| j|
d}tj| tj| | j| | j| qutt|| _i | _dS )a  Initialize a paged attention cache for efficient memory usage.

        Args:
            config: Model configuration
            generation_config: Generation configuration containing cache parameters
            device: Device for the cache tensors
            dtype: Data type for the cache tensors
            layer_device_map: Optional mapping of layer indices to devices
            initial_prompt_shapes: Optional sample prompts to help calculate optimal cache size
        num_key_value_headsNhead_dim
num_blocks
block_sizez,Calculating optimal block size and number...   )median_prefill_lengthzUsing calculated num_blocks=z, block_size=rY   rX   )getattrnum_attention_headsr\   hasattrr]   hidden_sizenum_hidden_layersloggerinfocompute_optimal_blocksr_   r^   cache_shaperY   rX   	key_cachevalue_cacherangetorchzeros_dynamomark_static_addressappendr   _free_blocks_block_tables)rG   rV   rW   rX   rY   rZ   r[   r^   r_   idxlayer_devicenew_layer_key_cachenew_layer_value_cacher)   r)   r*   __init__   s@   


zPagedAttentionCache.__init__n_blocksr,   c                 C   s\   t | j|k r	dS g }t|D ]
}|| j  q|| jvr$g | j|< | j| | |S )z*Allocates n_blocks for a given request_id.F)rJ   rt   rn   rs   popleftru   rN   )rG   r{   r,   	allocated_r)   r)   r*   allocate_blocks   s   

z#PagedAttentionCache.allocate_blocksc                 C   s:   || j v r| j |}| j| dS td|  dS )z.Frees all blocks associated with a request_id.z6Attempted to free blocks for non-existent request_id: N)ru   poprt   rN   rh   warning)rG   r,   blocks_to_freer)   r)   r*   free_blocks   s   
zPagedAttentionCache.free_blocksc                 C   rI   )z,Returns the number of free blocks available.)rJ   rt   rF   r)   r)   r*   get_num_free_blocks   rL   z'PagedAttentionCache.get_num_free_blocksc                 C   s   | j |g S )z&Returns the block table for a request.)ru   getrG   r,   r)   r)   r*   get_block_table      z#PagedAttentionCache.get_block_tablestatelogical_indicesc                 C   s   |j }| j|}|std| | j}g }|D ],}|| }|| }	|t|kr6td| d| d| || }
|
| |	 }|| q|S )a  
        Maps logical sequence indices to physical cache indices using the block table, using PyTorch.

        Args:
            request_id: The request ID.
            logical_indices: A list of logical indices.

        Returns:
            A list of physical indices.

        Raises:
            ValueError: If no block table is found for the request ID.
            IndexError: If a logical index maps to a block index that is out of bounds.
        z!No block table found for request zLogical index z maps to block index z$ which is out of bounds for request )r,   ru   r   
ValueErrorr_   rJ   
IndexErrorrs   )rG   r   r   r,   block_tabler_   physical_indicesrv   	block_idxblock_offsetphysical_block_numphysical_indexr)   r)   r*   _get_physical_indices   s&   z)PagedAttentionCache._get_physical_indices
key_statesvalue_states	layer_idxc           
      K   s   | j | j }| j| | j|| j}| j| | j|| j}	|d |d d |d d f< |d |	d d |d d f< |d d d |d d f |	d d d |d d f fS Nr   )r^   r_   rl   viewr\   r]   rm   )
rG   r   r   r   
read_indexwrite_indexkwargstotal_slotsk_cache_flatv_cache_flatr)   r)   r*   update   s   0zPagedAttentionCache.update)r   r   r    ro   float16r   r   rX   rY   r	   dictr8   r
   r5   r7   rz   r   r   r   r   r   r;   r   Tensortupler   r)   r)   r)   r*   rU      sJ    
@ 'rU   c                   @   s   e Zd ZdZddedefddZedefdd	Z	ed
e
dee fddZedefddZeddedefddZededee
 fddZdS )	Schedulerz
    Abstract base class for scheduling requests in the continuous batch processor.
    It is expected that cache allocation and scheduling logic will be implemented in subclasses.
    Fcacheretain_cache_on_finishc                 C   s$   i | _ i | _t | _|| _|| _d S N)active_requestswaiting_requestsr   waiting_requests_orderr   r   )rG   r   r   r)   r)   r*   rz   9  s
   
zScheduler.__init__r   c                 C      dS z"Add a request to the waiting list.Nr)   )rG   r   r)   r)   r*   add_waiting_request@     zScheduler.add_waiting_requesttoken_budgetrE   c                 C   s   d S r   r)   )rG   r   r)   r)   r*   schedule_batchE  s   zScheduler.schedule_batchc                 C   s   | j p| jS )z2Check if there are requests ready to be processed.)r   r   rF   r)   r)   r*   has_pending_requestsI  s   zScheduler.has_pending_requestsTr,   evict_from_cachec                 C   r   )z:Finish processing a request and free its allocated blocks.Nr)   rG   r,   r   r)   r)   r*   finish_requestN  r   zScheduler.finish_requestc                 C   s   || j v r| j | jS g S r   )r   r>   r   r)   r)   r*   !get_active_request_static_outputsS  s   
z+Scheduler.get_active_request_static_outputsNFT)r   r   r    r!   rU   rT   rz   r   r;   r   r8   r7   r   r   r   r5   r   r   r)   r)   r)   r*   r   3  s    r   c                   @      e Zd ZededefddZedddededee fd	d
Z	edefddZ
ededee fddZeddedefddZdS )FIFOSchedulerr   len_next_tokensc                 C   t   |  }t|j| jj | }||k st|jdkr8|| d | jj d }| j||j}|s2dS |j| dS Nr      FTrH   rJ   r?   r   r_   r   r,   rN   rG   r   r   rH   	occupancyblocks_neededr}   r)   r)   r*   _allocate_blocks_if_needed\     z(FIFOScheduler._allocate_blocks_if_neededprepare_request	span_namer   "request_ids_to_remove_from_waitingc                 C      |j tjkr	|jn|j}t||k r?|j tjkr*|| j|j< tj	|_ |
|j dS |j tjkr=tj	|_ |j|_g |_dS dS |j tjkrV|| j|j< tj|_ |
|j n
|j tjkr`tj|_ ||d |_|d| |_dS z6Prepare a request for processing in the current batch.Nr3   r   r%   r=   r/   rJ   r"   r   r,   r#   addr$   rG   r   r   r   request_tokensr)   r)   r*   _prepare_request_for_processingj  (   
z-FIFOScheduler._prepare_request_for_processingc                 C   d   | j r#|j| jv r#| j|j}|jt|jd |_|j|_|j|_|| j	|j< | j
|j dS r   r   r,   r   r   r/   rJ   r<   r?   r@   r   r   rs   rG   r   	old_stater)   r)   r*   r        z!FIFOScheduler.add_waiting_requestrE   c           
         s,  g }g }g j  D ]}|jtjkr|| |jtjkr#|| qjD ]
}|j|  q'|| }t	  |D ]K}
||  t|j}|t|js]tjjdkr\ n+q;tdtffdd}|| ||8 }tdtf fdd}	|	| |dkr nq;t fddjD _S )	Nr   r   c                         |  d S r   rs   r   scheduled_requestsr)   r*   _add_to_scheduled_requests  r   z@FIFOScheduler.schedule_batch.<locals>._add_to_scheduled_requestsc                    *   | j }|jv rj|=  | d S d S r   r,   r   r   r   req_idr   rG   r)   r*   _remove_from_waiting_requests  
   
zCFIFOScheduler.schedule_batch.<locals>._remove_from_waiting_requestsc                       g | ]}| vr|qS r)   r)   .0r   r   r)   r*   
<listcomp>      z0FIFOScheduler.schedule_batch.<locals>.<listcomp>)r   valuesr3   r   r&   rs   r%   r   r   setr   rJ   r/   r   r   rt   r   r;   r   
rG   r   priority_statessecond_priority_statesr   r   
candidatesrequest_lenr   r   r)   r   r   rG   r*   r     sH   




zFIFOScheduler.schedule_batchTr,   r   c                 C   .   |r| j | || jv r| j|= d S d S d S r   r   r   r   r   r)   r)   r*   r        
zFIFOScheduler.finish_requestNr   r   r   r    r   r;   r8   r   r   r5   r   r   r7   r   rT   r   r)   r)   r)   r*   r   Z  "    
6r   c                   @   r   )PrefillFirstSchedulerr   r   c                 C   r   r   r   r   r)   r)   r*   r     r   z0PrefillFirstScheduler._allocate_blocks_if_neededr   r   r   r   c                 C   r   r   r   r   r)   r)   r*   r     r   z5PrefillFirstScheduler._prepare_request_for_processingc                 C   r   r   r   r   r)   r)   r*   r     r   z)PrefillFirstScheduler.add_waiting_requestrE   c           
         s.  g }g }g j  D ]}|jtjkr|| q|jtjkr$|| qjD ]
}|j|  q(|| }t	  |D ]K}
||  t|j}|t|js^tjjdkr] n+q<tdtffdd}|| ||8 }tdtf fdd}	|	| |dkr nq<t fddjD _S )	Nr   r   c                    r   r   r   r   r   r)   r*   r   &  r   zHPrefillFirstScheduler.schedule_batch.<locals>._add_to_scheduled_requestsc                    r   r   r   r   r   r)   r*   r   .  r   zKPrefillFirstScheduler.schedule_batch.<locals>._remove_from_waiting_requestsc                    r   r)   r)   r   r   r)   r*   r   ;  r   z8PrefillFirstScheduler.schedule_batch.<locals>.<listcomp>)r   r   r3   r   r%   rs   r&   r   r   r   r   rJ   r/   r   r   rt   r   r;   r   r   r)   r   r*   r   	  sH   



z$PrefillFirstScheduler.schedule_batchTr,   r   c                 C   r   r   r   r   r)   r)   r*   r   @  r   z$PrefillFirstScheduler.finish_requestNr   r   r)   r)   r)   r*   r     r   r   T
standaloneg?rX   rV   rW   inputsrY   safety_marginra   c                 C   s  t |d|j|j }t |d|j}t |dd}	| jdkr9tj| }
|
j}tj| }tj	| }|t
|| }n| jdkrEtd dS td	| j d
 dS t|| }|dkrbtd dS tjg |d }d| | | |	 }t |dd}|du r|rdd |D }|rtt|nd}n|du rd}|| }d}|| }t
dt|| }|| }t
|||d  }d|d  > }t
d|| }td| d| d|| |  d| d	 t|t|fS )a  Calculate optimal number and size of blocks for the KV cache.

    Args:
        device: The device where the model runs
        config: The model configuration
        generation_config: The generation configuration
        inputs: Sample input sequences to estimate memory requirements
        dtype: Data type for cache tensors
        safety_margin: Fraction of available memory to use
        median_prefill_length: Override for median prefill length calculation

    Returns:
        Tuple of (num_blocks, block_size)
    r]   r\   rg   (   cudampszBMPS memory estimation is approximate. Using conservative defaults.)      zUnsupported device type z/ for optimal block calculation. Using defaults.)       r   z9Not enough available memory. Using minimum configuration.)   r   )rY   r   rB   rA   Nc                 S   s   g | ]}|rt |qS r)   )rJ   )r   seqr)   r)   r*   r     r   z*compute_optimal_blocks.<locals>.<listcomp>@      r   zOptimal cache: z blocks of size z (can handle ~z sequences of length ))rc   rf   rd   typero   r   get_device_propertiestotal_memorymemory_allocatedmemory_reservedmaxrh   r   r8   tensorelement_size
statisticsmedian
bit_lengthri   )rX   rV   rW   r   rY   r   ra   r]   num_kv_headsrg   device_propertiesr  allocated_memoryreserved_memoryavailable_memory
dtype_sizememory_per_tokentokens_to_generatenon_empty_inputs
seq_lengthMIN_BLOCK_SIZEper_sequence_memorymax_concurrent_sequencestotal_tokensinitial_block_sizer_   r^   r)   r)   r*   rj   H  sT   




rj   c                   @   s   e Zd ZU ejed< ejed< ejed< ejed< ejed< eed< eed< ejed< ejed	< ejed
< eee	e f ed< e
ed< dZeed< dS )PagedAttentionArgs	input_idsattention_maskposition_idscumulative_seqlens_qcumulative_seqlens_kmax_seqlen_qmax_seqlen_kr   r   logits_indicesblock_tablesr   F	use_cacheN)r   r   r    ro   r   r6   r8   r   r5   r7   rU   r*  rT   r)   r)   r)   r*   r     s   
 







r   c                 C   s`  | dd  | d d k}|dd  |d d k}t | | }| d |d  } |d |d  }| d }|d }tj|| jd}tj||jd}tj|| dd  dd}	tj||dd  dd}
|	d d d f |
d d d f k}| dd  | d d  dk | dd   }tj||ddd d d f |
k}tjtj|||	jddd }|	||@ d |S )Nr   rC   )rX   T)rightFdiagonal)
minsumro   arangerX   	bucketizetriuonesrT   masked_fill_)r$  r%  valid_docs_qvalid_docs_knum_valid_docstotal_qtotal_k	q_indices	k_indices	q_doc_ids	k_doc_idsdoc_mask	is_causalapply_causalcausal_maskr)   r)   r*   create_document_mask  s"    *  rB  c                   @   s.  e Zd Z		d3dedededejdejdej	de
jd	e
jd
ededefddZedddd Zee
 dd ZdefddZdd Zedddd Zedd Zedefdd Zed!d" Zed#d$ Zed%d& Zeded'efd(d)Zed*d+ Z edefd,d-Z!ed.d/ Z"ed0d1 Z#d2S )4ContinuousBatchProcessorFr   rV   rW   input_queueoutput_queue
stop_eventmodel_devicemodel_dtype	scheduler	streamingmanual_evictionc                 C   sh   || _ || _|| _|| _|| _|| _|| _|| _|	| _|
| _	|| _
g | _|   t| j| _|   dS )a  Initialize the continuous batch processor.

        Args:
            cache: The paged attention cache to use
            generation_config: The generation configuration
            input_queue: Queue for incoming requests
            output_queue: Queue for outgoing results
            stop_event: Event to signal processing should stop
            model_device: Device for model inputs/outputs
            model_dtype: Data type for model inputs/outputs
            streaming: Whether to stream tokens as they're generated
        N)r   rV   rW   rD  rE  rF  rG  rH  rI  rJ  rK  requests_in_batch_configure_batch_parametersr   max_batch_tokensmetricssetup_static_tensors)rG   r   rV   rW   rD  rE  rF  rG  rH  rI  rJ  rK  r)   r)   r*   rz     s   z!ContinuousBatchProcessor.__init__Tr   c                 C   s  | j }| jj| jj }tj| jd}|| _tjd|ffi || _	tjd|ffi || _
tjdd||f| j| jd| _tj|d ffi || _tj|d ffi || _tj|ffi || _tj|ffi || _tj|fdfi || _d| _d| _tjd|fdfi || _d S )Nrb   r   rC   r   )rN  r   r^   r_   ro   int32rG  tensor_metadatarp   r!  r#  rH  r"  r$  r%  r   r   fullr(  r&  r'  
output_ids)rG   Tmax_token_budgetrR  r)   r)   r*   rP    s"   z-ContinuousBatchProcessor.setup_static_tensorsc                 C   s|   | j   | j  | jt| jj | j	  | j
  | jd | jd | jd d| _d| _| j  dS )z(Reset static tensors for the next batch.rC   r   N)r!  zero_r#  r"  fill_ro   finforH  r.  r$  r%  r   r   r(  r&  r'  rT  rF   r)   r)   r*   reset_static_tensors  s   



z-ContinuousBatchProcessor.reset_static_tensorsrE   c                 C   s:   | j | j| j| j| j| j| j| j| j| j	| j
j| j
ddS )z2Get model keyword arguments for the current batch.F)r!  r#  r"  r$  r%  r   r   r(  r&  r'  r)  r   r*  )r!  r#  r"  r$  r%  r   r   r(  r&  r'  r   ru   rF   r)   r)   r*   get_model_kwargs(  s   z)ContinuousBatchProcessor.get_model_kwargsc              	   C   s6   d| j  d| j d| jj d| jj d	|    S )Nz%ContinuousBatchProcessor(input_queue=z, output_queue=z, active_requests=z, waiting_requests=r  )rD  rE  rI  r   r   r[  rR   rF   r)   r)   r*   rR   ;  s   (
z!ContinuousBatchProcessor.__repr__c                 C   sn   | j j| j j }t| jdd}|dur|| _nt| jdd| _t|d | j}td|| _t| jdd| _dS )z>Set up batch processing parameters based on generation config.rN  Nmax_position_embeddingsr   r  r  )	r   r^   r_   rc   rW   rN  max_context_lenr.  r  )rG   total_cache_tokensuser_batch_tokensrecommended_batch_sizer)   r)   r*   rM  A  s   z4ContinuousBatchProcessor._configure_batch_parametersc              
   C   s   | j  sVz| j  }|du rW q | j| W n6 tjy#   Y dS  tyN } z tj	d| dd t
 d}|durD| || W Y d}~nd}~ww | j  rdS dS )z?Pull new requests from the input queue and add to waiting list.NzError processing new request: Texc_infor   )rD  empty
get_nowaitrI  r   queueEmpty	Exceptionrh   r2   localsr   _handle_request_error)rG   r   er)   r)   r*   _get_new_requestsT  s    

z*ContinuousBatchProcessor._get_new_requestsr   c                 C   s\   t j|_t||_t|jtr| j|j|_	ng |_	| j
|j|j | j|  dS )z(Handle general request processing error.N)r   r(   r3   r5   r2   
isinstancer,   rI  r   r>   rO  record_request_completionr4   rE  putrS   )rG   r2   r   r)   r)   r*   ri  f  s   
z.ContinuousBatchProcessor._handle_request_errorc                 C   s  |    | j sdS | jt| jjt| jj | j| j	| _
| j
s'dS |   g }g }g }g }dg}dg}g }| j| j
 | j
D ]u}|j}	||	 |j}
t|	}||
 }tt|}||
d }| j||}|| d }|| || || ||d |  ||d |  t|jdkr||d d  t| j|| _t| j|| _| j|7  _qEtdt| j
 dt| jj dt| jj d|d  d	|d  d
| j   | ||||||| | j| j dS )z=Prepare tensors and metadata for the next model forward pass.Nr   rC   r   zScheduled: z, Waiting: z
, Active: z	. cum Q: z
. cum KV: z, free blocks: )rk  rI  r   rO  record_queue_metricsrJ   r   r   r   rN  rL  rZ  record_batch_metricsr/   rN   r@   r7   rn   r   r   rs   r=   r  r&  r'  rh   r   r   _build_tensorsrecord_kv_cache_memory_metrics)rG   r#  r!  r   r   r$  r%  r(  r   next_input_idspast_lengthquery_length
key_lengthcache_indexpositions_to_addread_indiceswrite_indicesr)   r)   r*   prepare_next_batchu  sb   





J
z+ContinuousBatchProcessor.prepare_next_batchc                 C   s  t tjfi | j}||| jd d d t|f< ||| jd d d t|f< ||| jd t|< ||| jd t|< ||| j	d t|< ||| j
d t|< ||| jd t|< t| jj}	| jjdkrtt|d D ]t}
||
d  ||
  ||
d  ||
  k r||
d  ||
  dkr||
d  ||
d  ||
   d }|||
  }nd}t||
 ||
d  }t||
 ||
d  }tjtj| jd||f j|	| j| jd|d}|| jd||f< qtd S d S )Npaged_attentionr   .rb   r,  )r   ro   r  rR  r!  rJ   r#  r   r   r$  r%  r(  rY  rH  r.  rV   _attn_implementationrn   slicer2  rS  r"  shaperG  )rG   r!  r#  r   r   r$  r%  r(  	to_tensor	min_valueir-  query_range	key_rangemaskr)   r)   r*   rq    sB   "	z'ContinuousBatchProcessor._build_tensorsc                 C   s   | j  d S r   )rT  tolistrF   r)   r)   r*   _sync  r   zContinuousBatchProcessor._synctokenc                 C   sD   | j r||_| j|  dS |jtjkr | j|  dS dS )zCSend output to the queue based on streaming mode and request state.N)rJ  
next_tokenrE  rn  rS   r3   r   r'   )rG   r   r  r)   r)   r*   _maybe_send_output  s   z+ContinuousBatchProcessor._maybe_send_outputc                 C   s   |   }g }t| jD ]U\}}|j}t|jdkrV| j|j|j t	j
|_|| j|  }|g|_||rO| j|j|j | jj|j| j d || | || q|jt	jkr`t	j|_qdS )z0Update request states based on generated tokens.r   )r   N)r  	enumeraterL  r,   rJ   r=   rO  record_ttft_metricr4   r   r&   r3   r(  r/   rQ   rm  rI  r   rK  rs   r  r$   r%   )rG   
out_tokensfinished_request_idsr  r   r   r  r)   r)   r*   update_batch  s$   

z%ContinuousBatchProcessor.update_batchc                 C   s
   | j  S )z2Check if there are any active or waiting requests.)rI  r   rF   r)   r)   r*   r     s   
z-ContinuousBatchProcessor.has_pending_requestsc                 C   s.   | j }|D ]}| || | j|j qdS )z&Handle errors during batch processing.N)rL  ri  rI  r   r,   )rG   r2   failed_reqsreqr)   r)   r*   handle_batch_error	  s
   z+ContinuousBatchProcessor.handle_batch_errorc                 C   sl   | j j D ]}| || | j |j qt| j j D ]}| j j	|}| || q| j j
  dS )zFail all active requests with the given error.

        Args:
            error: The error to report in the failure message
        N)rI  r   r   ri  r   r,   r7   r   keysr   r   clear)rG   r2   r   r   r)   r)   r*   fail_all_requests  s   z*ContinuousBatchProcessor.fail_all_requestsN)FF)$r   r   r    rU   r   r   re  Queue	threadingEventro   rX   rY   r   rT   rz   r   rP  no_gradrZ  r   r[  rR   rM  rk  r;   ri  r{  rq  r  r8   r  r  r   r  r  r)   r)   r)   r*   rC    sn    	

0



>
.


rC  )fifoprefill_firstc                	   @   s  e Zd ZdZ			d>dededefdd	Zed
d Zdd Z	d?dede
e fddZd@de
e fddZ	dAdee de
e de
e defddZdeee  fddZd@de
e fddZd d! Zed"d# Zed$efd%d&Zed'd(d)d* Zed+d(d,d- Zed.d(d$efd/d0Zd1d2 Zed3d(dBd$ed4efd5d6Zed7d(d8d9 Zed$e
e fd:d;Zedefd<d=Z dS )CContinuousBatchingManagerzManager for handling continuous batching of generation requests.

    This class provides the user interface for submitting generation requests,
    retrieving results, and managing the background generation thread.
    Fr   TrW   rK  rJ  c                 C   s   || _ || _tj|d| _t | _t | _|| _	t
|dd| _d| _d| _t | _d| j j_t
|dd| _| j | j j| _t
|dd| _t
|d	d| _|| _d| _dS )
aS  Initialize the continuous batching manager.

        Args:
            model: The language model for generation
            generation_config: Configuration for generation parameters
            max_queue_size: Maximum size of the request queue (0 = unlimited)
            streaming: Whether to stream tokens as they are generated
        )maxsizelog_prob_generationFNr   	do_sampleTuse_cuda_graphr   )modelrW   re  r  rD  rE  r  r  rF  rJ  rc   r  _generation_thread_request_counterLock_request_locktop_pr  _get_logits_processorlogit_processorr  r   rK  batch_processor)rG   r  rW   rK  max_queue_sizerJ  r)   r)   r*   rz   4  s"   




z"ContinuousBatchingManager.__init__c                 C   sT   | j dur| j  rtd dS t | _tj| j	d| _ | j 
  td dS )z'Start the background generation thread.Nz"Manager thread is already running.)targetz$Continuous batching manager started.)r  is_aliverh   r   re  r  _result_queuer  Thread_run_generation_loopstartri   rF   r)   r)   r*   r  V  s   


zContinuousBatchingManager.startc                 C   s   | j duo	| j  S )z5Check if the background generation thread is running.N)r  r  rF   r)   r)   r*   
is_runningb  s   z$ContinuousBatchingManager.is_runningNblocktimeoutc                 C   sL   | j du rtd dS | j s| j  td |r$| | dS dS )zSignal the background thread to stop.

        Args:
            block: Whether to wait for the thread to stop
            timeout: Maximum time to wait for the thread to stop
        NzManager not started.z'Stopping continuous batching manager...)r  rh   r   rF  is_setr   ri   join)rG   r  r  r)   r)   r*   stopf  s   




zContinuousBatchingManager.stopc                 C   sH   | j dur"| j j|d | j  rtd dS td d| _ dS dS )zWait for the background thread to finish.

        Args:
            timeout: Maximum time to wait for the thread to stop
        Nr  z2Generation thread did not exit after join timeout.z$Continuous Batching Manager stopped.)r  r  r  rh   r   ri   )rG   r  r)   r)   r*   r  x  s   



zContinuousBatchingManager.joinr!  r,   rB   rE   c                 C   s   |du r$| j  d| j }|  jd7  _W d   n1 sw   Y  |du r,| jjn|}t|t|t||| jjd}| jj|ddd t	
d| d	 |S )
a/  Add a new generation request to the queue.

        Args:
            input_ids: Input token IDs to use as prompt
            request_id: Optional custom request ID (auto-generated if None)
            **kwargs: Additional generation parameters

        Returns:
            str: The request ID
        Nreq_r   )r,   r/   r<   rB   rD   T
   r  r  zAdded request z
 to queue.)r  r  rW   rB   r;   r7   rD   rD  rn  rh   debug)rG   r!  r,   rB   r   r)   r)   r*   add_request  s    	z%ContinuousBatchingManager.add_requestr   c                 K   s6   t |D ]\}}d| }| j|fd|i| qd S )N
batch_req_r,   )r  r  )rG   r   r   r  r!  r   r)   r)   r*   add_requests  s   
z&ContinuousBatchingManager.add_requestsc                 C   sX   | j du r| j rdS z| jjd|d}td|j  |W S  tjy+   Y dS w )zRetrieve one result from the output queue.

        Args:
            timeout: Maximum time to wait for a result

        Returns:
            Optional[Dict]: The result data or None if timeout
        NTr  zRetrieved result for request )	r  rE  rc  r   rh   r  r,   re  rf  )rG   r  resultr)   r)   r*   
get_result  s   	z$ContinuousBatchingManager.get_resultc                 c   s`    | j dur| j  s| j s.| jdd}|dur|V  | j dur'| j  s| j rdS dS )z.Iterate over results as they become available.Ng?r  )r  r  rE  rc  r  )rG   r  r)   r)   r*   __iter__  s   &z"ContinuousBatchingManager.__iter__c                 C   s   t j }|t j  t j| | | W d    n1 s#w   Y  t j | t j | _t j| j | | W d    d S 1 sNw   Y  d S r   )	ro   r   Streamwait_streamcurrent_streamstream_generation_step	CUDAGraphgraph)rG   r  r  r)   r)   r*   warmup  s   
"z ContinuousBatchingManager.warmupr  c                 C   sj   |  }t # | |}| jr|j| | ||}| || W d   dS 1 s.w   Y  dS )z6Perform a single generation step. This is cuda graphedN)	r[  ro   r  _model_forwardr  output_probscopy__process_logit_sample)rG   r  
batch_datalogitsprobsr)   r)   r*   r    s   

"z*ContinuousBatchingManager._generation_stepmodel_forwardr   c                 C   s   | j di |jS )Nr)   )r  r  )rG   r  r)   r)   r*   r    s   z(ContinuousBatchingManager._model_forwardlogit_processingc                 C   s   |  |d |S )Nr!  )r  )rG   r  r  r)   r)   r*   r    s   z(ContinuousBatchingManager._process_logitsamplingc                 C   sN   | j rtjj|dd}tj|d ddd}ntj|dd}|j	| d S )NrC   )dimr   r   )num_samples)
r  nn
functionalsoftmaxro   multinomialsqueezeargmaxrT  r  )rG   r  r  next_tokensr)   r)   r*   r    s
   z!ContinuousBatchingManager._samplec           
      C   s  d}zzt | jj| j| jj| jj}d}t| jdr1t| jj	}|du r0t
d| d t}nt}t|| jj| j| j| j| j| jj| jj||| j| j| j}|| _d}| jrtdddd	d
d}tdddd}tjjjtjjjg}t|||ddd(}| j r| r| || |rd}|  | j r| sW d   n1 sw   Y  n| j r| r| || |rd}| j r| sW n" t y }	 zt
j!d|	 dd | "|	| W Y d}	~	nd}	~	ww W t
#d dS W t
#d dS t
#d w )z6Main processing loop running in the background thread.NrI  zScheduler 'z ' not found. Defaulting to FIFO.Tr      r`   d   r   )
skip_firstr  activerepeatwaitz/fsx/arthur/transformerspaged_compile)dir_nameuse_gzipworker_nameF)
activitiesr   on_trace_readyrecord_shapes
with_stackzError in generation loop: ra  zGeneration loop finished.)$rU   r  rV   rW   rX   rY   re   SCHEDULER_MAPPINGr   rI  rh   r   r   rC  rD  rE  rF  rK  rJ  r  r   r   r   ro   profilerProfilerActivityCPUCUDAr  r   _inner_generation_loopsteprg  r2   _handle_critical_errorri   )
rG   r  paged_attention_cacherI  is_firsttracing_scheduletrace_handlerr  profrj  r)   r)   r*   r    s   
z.ContinuousBatchingManager._run_generation_loopgeneration_loopr  c              
   C   s   t j r
t j  |  t j rR| jrR|r| | n9t| drLz|   W n- t	yK } zt
jd| dd || W Y d }~d S d }~ww | | n| | t j rat j  |  d S )Nr  zModel forward pass failed: Tra  )ro   r   is_availablesynchronizer{  r  r  re   _graph_replayrg  rh   r2   r  r  r  )rG   r  r  rj  r)   r)   r*   r  8  s(   






z0ContinuousBatchingManager._inner_generation_loopgraph_replayc                 C   s   | j   d S r   )r  replayrF   r)   r)   r*   r  O  r   z'ContinuousBatchingManager._graph_replayc                 C   s\   | j   z	 | j }|dur||| q tjy    Y nw |dur,|| dS dS )z:Handle critical errors that terminate the generation loop.TN)rF  r   rD  rd  ri  re  rf  r  )rG   r2   r  req_datar)   r)   r*   r  S  s   

z0ContinuousBatchingManager._handle_critical_errorc                 C   s.   | j std| jdur| jj| dS dS )zSEvict a request from the cache. It is assumed that the request is already finished.z0Manual eviction is not enabled for this manager.N)rK  RuntimeErrorr  rI  r   r   r)   r)   r*   evict_request_from_cachef  s
   
z2ContinuousBatchingManager.evict_request_from_cache)Fr   T)FNr   )NNr   )!r   r   r    r!   r   rT   rz   r   r  r  r	   r9   r  r  r7   r8   r5   r  r  r+   r  r  r  rC  r  r  r  r  r  r  r  r  r  r)   r)   r)   r*   r  ,  sb    

"

!	



F
r  c                   @   s|   e Zd ZdZ				ddee dededed	ef
d
dZ	e
e 		ddeee  dee ded	eee  fddZdS )ContinuousMixinz?Mixin class for models to add continuous batching capabilities.NFr   rW   rK  r  rJ  rE   c                 C   st   t | drt | drt | dstd|dur|n| j}|du r$td|jdu r1td d|_t| ||||d	S )
a  Initialize a manager for continuous batching inference.

        Args:
            generation_config: Custom generation configuration
            max_queue_size: Maximum size of the input request queue
            streaming: Whether to stream tokens as they are generated

        Returns:
            `ContinuousBatchingManager`: The manager instance to add requests and retrieve results.
        rV   rX   rY   z;Model must have 'config', 'device', and 'dtype' attributes.Nz8A GenerationConfig must be provided or set in the model.zE`eos_token_id` not set in GenerationConfig. Setting to -1 (disabled).rC   )r  rW   rK  r  rJ  )re   AttributeErrorrW   r   rD   rh   r   r  )rG   rW   rK  r  rJ  
gen_configr)   r)   r*   init_continuous_batchingr  s   

z(ContinuousMixin.init_continuous_batchingTr   progress_barc              
   K   s  |sg S | j |d}|  i }t|}zzvddlm} |tga t|| d| dddD}	|j|fi | d}
|
|k rl|jdd	}|r^|j	}|j
tjkr]|||< |
d7 }
|	d n
| shtd
 n|
|k s?W d   n1 svw   Y  W d   n1 sw   Y  W n ty } ztjd| dd W Y d}~nd}~ww W |jddd |S W |jddd |S |jddd w )a=  Generate sequences for a batch of prompts using continuous batching.

        Args:
            inputs: List of input token sequences (prompts)
            generation_config: Optional generation configuration
            **kwargs: Additional generation parameters

        Returns:
            `list[list[int]]`: A list containing the generated sequences (including prompt tokens
                                if not handled otherwise) for each input prompt, in the same order.
                                Returns an empty list `[]` for requests that failed.
        )rW   r   )logging_redirect_tqdmzSolving z	 requestsrequest)totaldisabledescunitr   r  z*Generation thread terminated unexpectedly.NzError during batch generation: Tra  g      @r  )r  r  rJ   tqdm.contrib.loggingr  rh   r   r  r  r,   r3   r   r'   r   r  r2   rg  r  )rG   r   rW   r  r   managerresultsnum_requestsr  pbarfinished_countr  r   rj  r)   r)   r*   generate_batch  s^   


 zContinuousMixin.generate_batch)NFr   F)NT)r   r   r    r!   r	   r   rT   r8   r  r  r   ro   inference_moder7   r  r)   r)   r)   r*   r  o  s<    
%

r  )?loggingre  r  r  r:   abcr   r   collectionsr   dataclassesr   r   enumr   	functoolsr   typingr	   r
   ro   torch.nnr  torch.profilerr   r   r   r   cache_utilsr   configuration_utilsr   generation.configuration_utilsr   utils.metricsr   r   r   r   	getLoggerr   rh   setLevelWARNINGr+   r;   rU   r   r   r   bfloat16rX   r7   r8   rY   r9   rj   r   rB  rC  r  r  r  r)   r)   r)   r*   <module>   s   
H 'vv
Y
  U  D