o
    ۷i]                     @  sF  U d dl mZ d dlZd dlmZ d dlmZ d dlmZ d dl	m
Z
 d dlmZ d dlmZ d d	lmZmZmZ eeZe
G d
d dZe
G dd dZe
G dd dZe
G dd dZddd fddd fddd fdZded< h dZh dZdhZdZded < eeeeZeeeeZ eeeeZ!G d!d" d"Z"dS )#    )annotationsN)defaultdict)Callable)contextmanager)	dataclass)Any)init_logger)_build_field_defs
_build_row_format_tablec                   @  s4   e Zd ZU dZded< dZded< eddd	Zd
S )
StageStatsr   inttotal_token        floattotal_gen_time_msreturnc                 C     | j dkr| jd | j  S dS Nr        @@r   )r   r   self r   M/home/ubuntu/vllm_env/lib/python3.10/site-packages/vllm_omni/metrics/stats.pyavg_tokens_per_s      zStageStats.avg_tokens_per_sNr   r   )__name__
__module____qualname__r   __annotations__r   propertyr   r   r   r   r   r      s
   
 r   c                   @  s   e Zd ZU ded< ded< ded< ded< ded< ded< ded	< ded
< ded< dZded< dZded< dZded< dZded< dZded< dZ	ded< e
dddZe
dddZdS )StageRequestStatsr   batch_id
batch_sizenum_tokens_innum_tokens_outr   stage_gen_time_msrx_transfer_bytesrx_decode_time_msrx_in_flight_time_msr   stage_statsNz
int | Nonestage_id
str | Nonefinal_output_type
request_idr   postprocess_time_mszdict[str, int]diffusion_metricsr   audio_generated_framesr   c                 C  s0   | j dkrt| j d tt| jdd  S dS )Nr   g       @gư>r   r   )r(   r   maxr)   r   r   r   r   rx_mbps-   s
   
 zStageRequestStats.rx_mbpsc                 C  r   r   )r'   r&   r   r   r   r   tokens_per_s5   r   zStageRequestStats.tokens_per_sr   )r   r   r   r    r,   r.   r/   r0   r1   r2   r!   r4   r5   r   r   r   r   r"      s(   
 r"   c                   @  sh   e Zd ZU ded< ded< ded< ded< ded< d	Zd
ed< dZded< dZded< edddZdS )TransferEdgeStatsr   
from_stageto_stagestrr/   
size_bytesr   
tx_time_msFboolused_shmr   r)   in_flight_time_msr   c                 C  s   t | jt | j t | j S N)r   r;   r)   r>   r   r   r   r   total_time_msE   r   zTransferEdgeStats.total_time_msNr   )	r   r   r   r    r=   r)   r>   r!   r@   r   r   r   r   r6   :   s   
 r6   c                   @  sD   e Zd ZU ded< ded< ded< ded< ded< edd
dZdS )RequestE2EStatsr9   r/   r   e2e_total_msr   e2e_total_tokenstransfers_total_time_mstransfers_total_bytesr   c                 C  s   | j dkr| j| j  S dS )Nr   r   )rC   rB   r   r   r   r   e2e_tptR   s   zRequestE2EStats.e2e_tptNr   )r   r   r   r    r!   rF   r   r   r   r   rA   J   s   
 rA   rx_transfer_kbytesc                 C     | d S Ng      @r   vr   r   r   <lambda>Z       rL   size_kbytesc                 C  rH   rI   r   rJ   r   r   r   rL   [   rM   transfers_total_kbytesc                 C  rH   rI   r   rJ   r   r   r   rL   \   rM   )r(   r:   rE   z+dict[str, tuple[str, Callable[[Any], Any]]]FIELD_TRANSFORMS>   r,   r/   r+   r.   r)   r(   r*   >   r8   r=   r7   r/   r/   zlist[str] | NoneOVERALL_FIELDSc                   @  s   e Zd ZdEddZdFddZdGddZdHddZdIdd ZdJd#d$ZdKd.d/Z		0dLdMd2d3Z
	0dLdNd4d5ZdOd7d8ZedPd9d:ZdQd;d<ZdRd>d?ZdSdAdBZdTdCdDZd0S )UOrchestratorAggregator
num_stagesr   	log_statsr<   wall_start_tsr   final_stage_id_for_e2edict[str, int] | intr   Nonec                 C  s:   t || _t|| _|| _| | i | _i | _g | _d S r?   )	r   rS   r<   rT   rV   init_run_statestage_eventstransfer_events
e2e_events)r   rS   rT   rU   rV   r   r   r   __init__t   s   



zOrchestratorAggregator.__init__c                 C  s   dd t | jD | _d| _d| _d| _t | _t|| _	t|| _
dd t | jD | _dd t | jD | _tdd | _td	d | _d S )
Nc                 S     g | ]}d qS )r   r   .0_r   r   r   
<listcomp>       z9OrchestratorAggregator.init_run_state.<locals>.<listcomp>r   r   c                 S  r^   r?   r   r_   r   r   r   rb      rc   c                 S  r^   r?   r   r_   r   r   r   rb      rc   c                   S     t tS r?   r   r   r   r   r   r   rL      rM   z7OrchestratorAggregator.init_run_state.<locals>.<lambda>c                   S  rd   r?   re   r   r   r   r   rL      rM   )rangerS   stage_total_tokensrB   rC   	e2e_countsete2e_doner   rU   last_finish_tsstage_first_tsstage_last_tsr   accumulated_gen_time_msr1   )r   rU   r   r   r   rY      s   


z%OrchestratorAggregator.init_run_stater7   r8   r/   r9   r6   c              
   C  sD   |||f}| j |}|d u r t|||dddddd}|| j |< |S )Nr   r   F)r7   r8   r/   r:   r;   r=   r)   r>   )r[   getr6   )r   r7   r8   r/   keyevtr   r   r   _get_or_create_transfer_event   s   


z4OrchestratorAggregator._get_or_create_transfer_eventr   r:   r;   r=   TransferEdgeStats | Nonec                 C  sj   z*|  t|t|t|}| jt|7  _| jt|7  _|jp&t||_|W S  ty4   Y d S w r?   )	rr   r   r9   r:   r;   r   r=   r<   	Exception)r   r7   r8   r/   r:   r;   r=   rq   r   r   r   record_transfer_tx   s   	z)OrchestratorAggregator.record_transfer_txstatsr"   c                 C  s   zG|j d u s|j dkrW d S t|j d }t|j }t|j}| |||}|jdkr1t|j|_| jt|j7  _| j	t|j
7  _	|W S  tyQ   Y d S w )Nr      )r,   r   r9   r/   rr   r:   r(   r)   r   r>   r*   rt   )r   rv   r7   r8   rid_keyrq   r   r   r   record_transfer_rx   s   


z)OrchestratorAggregator.record_transfer_rxoutput_to_yieldr,   c                 C  s   z[|j dkrS|jd }d urVt|dkrYtdd t|tr!|n|gD }| j|g }|rI|D ]}|j|krB| j	|7  _	 nq2W d S W d S t
d|| W d S W d S W d S W d S  tym   t
jd|dd Y d S w )	Naudior   c                 s  s,    | ]}|j d krt|jd  ndV  qdS )r   rw   N)ndimr   shape)r`   tr   r   r   	<genexpr>   s
    
zGOrchestratorAggregator.record_audio_generated_frames.<locals>.<genexpr>zXFailed to record audio generated frames for request %s at stage %s: no stage event foundz,Failed to record audio frames for request %sT)exc_info)r.   multimodal_outputro   lensum
isinstancelistrZ   r,   r2   loggerwarningrt   debug)r   rz   r,   r/   r   nframesstage_events_for_reqstage_eventr   r   r   record_audio_generated_frames   s>   


z4OrchestratorAggregator.record_audio_generated_framesresultdict[str, Any]
stage_typereq_idengine_outputsfinishedr.   r-   
Any | Nonec                  s   zj| d}	|	dur'| j|    |	j7  < | ||| |r'|  ||	| |du r.W dS |s6i |_W dS i |_t fddt| j |g D d}
|
dura|
j	dkra|
j
|
j|
j|
j	d|_| | | W dS  ty{   td | Y dS w )a  Process and record stage metrics.

        Args:
            result: Result dict containing metrics from stage
            stage_type: Type of the stage (e.g., 'llm', 'diffusion')
            stage_id: Stage identifier
            req_id: Request identifier
            engine_outputs: Engine output object
            finished: Whether stage processing is finished
            final_output_type: Type of final output (e.g., 'text', 'audio')
            output_to_yield: Output object to attach metrics to
        metricsNc                 3      | ]
}|j  kr|V  qd S r?   r,   r`   rq   r   r   r   r   *      z?OrchestratorAggregator.process_stage_metrics.<locals>.<genexpr>text)r%   r&   r,   r.   z.Failed to process metrics for stage %s, req %s)ro   rn   r'   accumulate_diffusion_metricson_stage_metricsr   nextreversedrZ   r.   r%   r&   r,   r   rt   r   	exception)r   r   r   r,   r   r   r   r.   rz   _mr   r   r   r   process_stage_metrics   s>   

z,OrchestratorAggregator.process_stage_metricsNr   c                 C  sL   |}||_ ||_||_|| jv r!dd | j|i  D |_|S d|_|S )z,Convert dict to StageRequestStats if needed.c                 S  s   i | ]	\}}|t |qS r   )r   )r`   krK   r   r   r   
<dictcomp>L      zBOrchestratorAggregator._as_stage_request_stats.<locals>.<dictcomp>N)r,   r/   r.   r1   popitemsr   r,   r   r   r.   rv   r   r   r   _as_stage_request_stats?  s   
z.OrchestratorAggregator._as_stage_request_statsc                 C  sv   |  ||||}| j|j  t|j7  < |jdkr'| j|j  t|j7  < | jt|j	g 
| | | d S )Nr   )r   rg   r,   r   r&   r%   rZ   
setdefaultr9   r/   appendry   r   r   r   r   r   R  s   
z'OrchestratorAggregator.on_stage_metricspostproc_time_msc                 C  sJ   || j v r| j | D ]}|j|krt||_ d S q
d S td|| d S )NzRFailed to record postprocess time for request %s at stage %s: no stage event found)rZ   r,   r   r0   r   r   )r   r,   r   r   rv   r   r   r   record_stage_postprocess_timea  s   


z4OrchestratorAggregator.record_stage_postprocess_timec                 c  sV    t  }zdV  W t  | d }| ||| dS t  | d }| ||| w )zContext manager for measuring and recording stage postprocessing time.

        Usage:
            with metrics.stage_postprocess_timer(stage_id, request_id):
                next_inputs = next_stage.process_engine_inputs(...)
        Nr   )timeperf_counterr   )r   r,   r   _t0_postproc_msr   r   r   stage_postprocess_timern  s   z.OrchestratorAggregator.stage_postprocess_timerc                 C  sx   |dkrdS t |tr|r|d n|}t|di }t |tr"|d }|r8| D ]\}}| j| |  |7  < q(dS dS )zAccumulate diffusion metrics for a request.

        Handles extraction and accumulation of diffusion stage metrics.

        Args:
            req_id: Request ID
            engine_outputs: Engine output object containing metrics
        	diffusionNr   r   )r   r   getattrr   r1   )r   r   r   r   engine_outputr1   rp   valuer   r   r   r   }  s   	
z3OrchestratorAggregator.accumulate_diffusion_metricstx_msc                 C  s6   | j | d u rt | j |< | j||||||d d S )N)r7   r8   r/   r:   r;   r=   )rl   r   ru   )r   r7   r8   r   r:   r   r=   r   r   r   
on_forward  s   

z!OrchestratorAggregator.on_forwardreq_start_tsc              
     s:  t |  | jv rd S t|}t }| j| }|d u r|nt||| j|< t| j|| _|| d }d} | jv rU| j  D ]}	|	jdkrM|t	|	j
7 }|t	|	j7 }q?|  j|7  _|  j|7  _|  jd7  _| j  t ||tt fdd| j D t	t fdd| j D d}
| j|
 d S )Nr   r   rw   c                 3       | ]}|j  kr|jV  qd S r?   )r/   r@   r   rx   r   r   r         z=OrchestratorAggregator.on_finalize_request.<locals>.<genexpr>c                 3  r   r?   )r/   r:   r   r   r   r   r     r   )r/   rB   rC   rD   rE   )r9   rj   r   r   rm   r3   rk   rZ   r,   r   r%   r&   rB   rC   rh   addrA   r   r[   valuesr\   r   )r   r,   r   r   r   _t1	prev_laste2e_mstotal_tokensrq   per_req_recordr   r   r   on_finalize_request  s>   



z*OrchestratorAggregator.on_finalize_requestc           !   
     sV  j si S tdjj d }jdkr|j nd}|dkr'jd | nd}tjtr7dtji}nj}fddt	j
D }tjt|tjt|t|d}t|D ]\}}||d| d	< q^g }	tptt| D ]}
||
d }|d
vr|	|
 qu|	rtdtd||	 ttj dd jD B }g }g }g }|D ]u t fddjD d }|rt|t}|d i| t }| D ]\}
}|dvr||
 qt|}|rtdtd  d||d tj g dd d}t }t dd |D }|r|d t!t"|t#}g }|D ]"}d|j$it||}|j%r8|&|j% |'dd  || q"| |d |rt }|D ]}| D ]}
|
dkrg||
 q[qUg }t|D ]"}d}|D ]}||d }|d
vrd} nqw|s|| qq|rtdtd   d|d|d! t fd"dj() D d#d d}d$d |D } | | d% | r"t }| D ]}| D ]}
|
d&kr||
 q֐qg }t|D ]"}d}| D ]}||d }|d
vrd} nq|s|| q|r"tdtd'  d| d&|d! q|||||d(S ))Nr   r   r   *c                   sD   g | ]} j | d ur j| d ur j|  j |  d ndqS )Nr   r   )rl   rm   )r`   ir   r   r   rb     s    z@OrchestratorAggregator.build_and_log_summary.<locals>.<listcomp>)e2e_requestse2e_wall_time_msrC   e2e_avg_time_per_request_mse2e_avg_tokens_per_s
e2e_stage__wall_time_ms)r   r   r   N z
%szOverall Summaryc                 S  s   h | ]}|j qS r   r/   r`   er   r   r   	<setcomp>  s    z?OrchestratorAggregator.build_and_log_summary.<locals>.<setcomp>c                 3  r   r?   r   r   ridr   r   r     r   z?OrchestratorAggregator.build_and_log_summary.<locals>.<genexpr>r/   )r   r   Nr   zRequestE2EStats [request_id=])value_fieldsc                 S  s   | j d ur| j S dS )Nr   r   r   r   r   rL     s    z>OrchestratorAggregator.build_and_log_summary.<locals>.<lambda>)rp   c                 s  s    | ]	}t |d dV  qdS )r1   N)r   r   r   r   r   r   !  s    r0   r,   r1   )r/   stagesTFzStageRequestStats [request_id=)
column_keyr   c                   s   g | ]	}|j  kr|qS r   r   r   r   r   r   rb   Q  r   c                 S  s   | j | jfS r?   )r7   r8   r   r   r   r   rL   R  s    c                 S  s,   g | ]}d |j  d|j it|tqS )edgez->)r7   r8   r
   TRANSFER_FIELDSr   r   r   r   rb   T  s    )r/   	transfersr   zTransferEdgeStats [request_id=)final_stage_idoverall_summarystage_tabletrans_table	e2e_table)*rT   r3   rk   rU   rh   rC   r   rV   r   rf   rS   r   	enumeraterQ   r   keysro   r   r   infor   sortedri   rZ   r\   r   r
   
E2E_FIELDSr   r   STAGE_EXCLUDEcopyanyr	   r"   rP   r,   r1   updater   r[   r   )!r   wall_time_mse2e_avg_reqe2e_avg_tokfinal_stage_id_mapstage_wall_time_msr   idx	wall_timeoverall_fieldsr   rK   all_request_idsresult_stage_tableresult_trans_tableresult_e2e_tablee2e_evte2e_datanonzero_e2e_fieldsvalue_fields_e2e
stage_evtslocal_excludehas_diffusion_metricslocal_stage_fields
stage_rowsrq   rowall_value_fieldsvalue_fields_listfieldall_zerotransfer_evtstransfer_rowsr   )r   r   r   build_and_log_summary  s$  


"















z,OrchestratorAggregator.build_and_log_summary)
rS   r   rT   r<   rU   r   rV   rW   r   rX   )rU   r   r   rX   )r7   r   r8   r   r/   r9   r   r6   )r7   r   r8   r   r/   r   r:   r   r;   r   r=   r<   r   rs   )rv   r"   r   rs   )rz   r   r,   r   r/   r9   r   rX   )r   r   r   r9   r,   r   r   r9   r   r   r   r<   r.   r-   rz   r   r   rX   r?   )
r,   r   r   r9   r   r"   r.   r-   r   r"   )
r,   r   r   r   r   r"   r.   r-   r   rX   )r,   r   r   r   r   r   r   rX   )r,   r   r   r   )r   r9   r   r   r   r   r   rX   )r7   r   r8   r   r   r   r:   r   r   r   r=   r<   r   rX   )r,   r   r   r   r   r   r   rX   )r   r   )r   r   r   r]   rY   rr   ru   ry   r   r   r   r   r   r   r   r   r   r   r  r   r   r   r   rR   s   s&    






#H



+rR   )#
__future__r   r   collectionsr   collections.abcr   
contextlibr   dataclassesr   typingr   vllm.loggerr   vllm_omni.metrics.utilsr	   r
   r   r   r   r   r"   r6   rA   rP   r    r   TRANSFER_EXCLUDEE2E_EXCLUDErQ   STAGE_FIELDSr   r   rR   r   r   r   r   <module>   s<    	


	