o
    3NPi+J                  	   @   s   d Z ddlZddlZddlZddlZddlmZmZ ddlm	Z	m
Z
mZmZ ddlmZ edZeG dd dZeG d	d
 d
ZG dd dZe ZddedededefddZdS )a  
Compute Resource Detection & Monitoring - Adaptive Pipeline Configuration

Auto-detects: nproc, vCPUs, GPU vRAM, CPU cores
Monitors: CPU/GPU utilization at each stage
Adapts: Worker counts, batch sizes, parallelism based on available resources

Key principle: Use ALL available compute at every stage.
- CPU-bound: Maximize workers up to core count
- GPU-bound: Keep GPU saturated, prepare CPU work in parallel
- Memory-bound: Adapt batch sizes to available RAM/VRAM
    N)	dataclassfield)DictOptionalCallableAny)contextmanagerzFastPipelineV5.Computec                   @   s   e Zd ZU dZdZeed< dZeed< dZeed< dZ	e
ed< dZe
ed< d	Zeed
< dZeed< dZe
ed< dZe
ed< dZeed< dZe
ed< dZeed< dZeed< dZeed< dZeed< dZeed< dZeed< dS )ComputeResourcesz2Detected compute resources for the current system.   	cpu_countcpu_count_logicalnproc        ram_total_gbram_available_gbFgpu_available gpu_namegpu_vram_total_gbgpu_vram_free_gbgpu_compute_capabilityg?max_utilization   optimal_vad_workersoptimal_embedding_batch@   optimal_music_batch   optimal_download_workersoptimal_cpu_workersoptimal_chunk_workersN)__name__
__module____qualname____doc__r   int__annotations__r   r   r   floatr   r   boolr   strr   r   r   r   r   r   r   r   r   r     r*   r*   Q/home/ubuntu/.cursor/worktrees/maya3data__SSH__216.81.248.184_/zxg/src/compute.pyr	      s&   
 r	   c                   @   s   e Zd ZU dZeed< dZeed< dZeed< dZ	eed< dZ
eed< dZeed< dZeed	< dZeed
< dZeed< dZeed< dZeed< dS )StageMetricsz$Metrics for a single pipeline stage.
stage_namer   
start_timeend_timedurationcpu_percent_avgcpu_percent_maxgpu_percent_avggpu_percent_maxgpu_memory_used_gbr   items_processed
throughputN)r!   r"   r#   r$   r)   r&   r.   r'   r/   r0   r1   r2   r3   r4   r5   r6   r%   r7   r*   r*   r*   r+   r,   :   s   
 r,   c                       s   e Zd ZdZdZ fddZdd Zdd Zd	d
 Zde	e
ef fddZdefddZd%ddZed&de
defddZdd Zdd Zdd Zdefdd Zde
fd!d"Zd#d$ Z  ZS )'ComputeMonitorz
    Singleton compute resource monitor.
    
    Features:
    1. Auto-detect system resources on init
    2. Compute optimal worker counts
    3. Track utilization during stages
    4. Suggest optimizations
    Nc                    s&   | j d u rt | | _ d| j _| j S )NF)	_instancesuper__new___initialized)cls	__class__r*   r+   r;   V   s   
zComputeMonitor.__new__c                 C   sH   | j rd S d| _ t | _i | _d| _d | _d | _g | _g | _| 	  d S )NTF)
r<   r	   	resourcesstage_metrics_monitoring_monitor_thread_current_stage_cpu_samples_gpu_samples_detect_resourcesselfr*   r*   r+   __init__\   s   zComputeMonitor.__init__c              
   C   s  t d t d t d zWddl}| | j_zddl}|jddp(| jj| j_W n ty;   | jj| j_Y nw zddl}|j	dgddd	d
}t
|j | j_W n tyd   | jj| j_Y nw W n' ty } zt d| d d| j_d| j_d| j_W Y d}~nd}~ww t d| jj d| jj d t d| jj d z*ddl}| }|jd | j_|jd | j_t d| jjdd| jjdd W n ty   t d Y nw zxddl}|j rTd| j_|jd| j_|jdjd }|jdd }	|jdd }
|| j_||
 | j_ |jd}|j! d|j" | j_#t d| jj  t d| jj dd| jjdd t d| jj#  nt d W n tyu } zt d|  W Y d}~nd}~ww | $  t d  t d!| jj%d" d#d$ t d%| jj&  t d&| jj'  t d'| jj(  t d(| jj)  t d)| jj*  t d*| jj+  t d dS )+z'Detect all available compute resources.F======================================================================u8   🖥️  DETECTING COMPUTE RESOURCES (Adaptive Pipeline)r   NF)logicalr   T   )capture_outputtexttimeoutzCPU detection failed: z, using defaultsr   z   CPU: z physical / z logical coresz
   nproc: z usable processors   @z   RAM: .1fz / z GB availablez.psutil not available, memory detection skipped.z   GPU: z	   VRAM: z GB freez   Compute: SM z&   GPU: None available (CPU-only mode)zGPU detection failed: F----------------------------------------------------------------------u'   🎯 OPTIMAL SETTINGS (auto-computed @ d   .0fz% cap):z   VAD workers: z   Embedding batch: z   Music batch: z   Download workers: z   CPU workers: z   Diarization workers: ),loggerinfomultiprocessingr   r@   r   psutilImportError
subprocessrunr%   stdoutstripr   	Exceptionwarningvirtual_memorytotalr   	availabler   torchcudais_availabler   get_device_namer   get_device_propertiestotal_memorymemory_allocatedmemory_reservedr   r   majorminorr   _compute_optimal_settingsr   r   r   r   r   r   r    )rI   rY   rZ   r\   resultememre   rc   	allocatedreservedpropsr*   r*   r+   rG   l   s   


 ($

z ComputeMonitor._detect_resourcesc                 C   s:  | j }|j}tt|j|j| }tdt|d|_t|j| d d }t|j||_td|j|_|j	rI|j
| }tdtt|d d|_nd|_|j	rb|j
| }tdtt|d d	|_nd|_td
tdt|j| d |_tdt|j| d |_|j	r|j| }d}tdt|| |_dS d|_dS )z
        Compute optimal worker counts based on detected resources.
        
        All settings respect max_utilization cap (default 80%) to leave headroom
        for system stability and unexpected spikes.
        r   r   i   i,  r   (                g       @r
   N)r@   r   r%   maxr   r   minr   r   r   r   r   r   r   r   r   r    )rI   rcapbase_workers
max_by_ramusable_vramvram_per_diarizationr*   r*   r+   ro      s,   

 

z(ComputeMonitor._compute_optimal_settingsreturnc                 C   s$   | j }|j|j|j|j|j|jdS )z3Return optimal configuration dict for Config class.)vad_workersmax_workersdownload_workerschunk_workersembedding_batch_sizemusic_batch_size)r@   r   r   r   r    r   r   )rI   r}   r*   r*   r+   get_optimal_config   s   z!ComputeMonitor.get_optimal_configr~   c                 C   s<   t dtd|}|| j_|   td|d dd dS )z
        Set max utilization cap and recompute optimal settings.
        
        Args:
            cap: Utilization cap between 0.1 and 1.0 (default 0.8 = 80%)
        皙?      ?u$   🎚️  Utilization cap updated to rU   rV   z%, settings recomputedN)r{   r|   r@   r   ro   rW   rX   )rI   r~   r*   r*   r+   set_max_utilization  s   z"ComputeMonitor.set_max_utilizationc                 C   s4   |   }| D ]\}}t||rt||| qdS )a  
        Apply optimal settings to a Config object.
        
        This updates the config's parallelism and batch size settings
        based on auto-detected system resources.
        
        Args:
            config: Config dataclass instance to update
        N)r   itemshasattrsetattr)rI   configoptimalkeyvaluer*   r*   r+   apply_to_config  s   

zComputeMonitor.apply_to_configr   r-   r   c              	   c   s
   t ||d}t |_|| _g | _g | _|   zx|V  W |   t |_|j|j |_	| jrBt
| jt| j |_t| j|_| jrtdd | jD }dd | jD }|r_t
|t| nd|_|rht|nd|_|rqt|nd|_|dkr|j	dkr||j	 |_|| j|< d| _| | dS |   t |_|j|j |_	| jrt
| jt| j |_t| j|_| jrdd | jD }dd | jD }|rt
|t| nd|_|rt|nd|_|rt|nd|_|dkr|j	dkr||j	 |_|| j|< d| _| | w )z
        Context manager to monitor a pipeline stage.
        
        Usage:
            with COMPUTE.monitor_stage("VAD", items=100) as metrics:
                # do work
            print(metrics.duration, metrics.cpu_percent_avg)
        )r-   r6   c                 S      g | ]}|d  qS r   r*   .0sr*   r*   r+   
<listcomp>F      z0ComputeMonitor.monitor_stage.<locals>.<listcomp>c                 S   r   )r
   r*   r   r*   r*   r+   r   G  r   r   N)r,   timer.   rD   rE   rF   _start_monitoring_stop_monitoringr/   r0   sumlenr1   r{   r2   r3   r4   r5   r7   rA   _log_stage_summary)rI   r-   r   metrics	gpu_utilsgpu_memsr*   r*   r+   monitor_stage#  sV   





zComputeMonitor.monitor_stagec                 C   s&   d| _ tj| jdd| _| j  dS )z(Start background utilization monitoring.T)targetdaemonN)rB   	threadingThread_monitor_looprC   startrH   r*   r*   r+   r   U  s   z ComputeMonitor._start_monitoringc                 C   s"   d| _ | jr| jjdd dS dS )zStop background monitoring.Fr   )rP   N)rB   rC   joinrH   r*   r*   r+   r   [  s   zComputeMonitor._stop_monitoringc           
      C   sB  zddl }d}W n ty   d}Y nw zddl}|j }W n ty*   d}Y nw | jrz]|r=|jdd}| j| |rz)ddl	}|
  |d}||}||}	| j|j|	jd f |  W n ty   ddl}|j d }	| jd|	f Y nw td W n ty   td Y nw | js.dS dS )	z3Background thread that samples CPU/GPU utilization.r   NTFr   )intervalrQ   g      ?)rZ   r[   re   rf   rg   rB   cpu_percentrE   appendpynvmlnvmlInitnvmlDeviceGetHandleByIndexnvmlDeviceGetUtilizationRatesnvmlDeviceGetMemoryInforF   gpuusednvmlShutdownr`   rk   r   sleep)
rI   rZ   
has_psutilre   	has_torchcpur   handleutilrr   r*   r*   r+   r   a  sH   


zComputeMonitor._monitor_loopr   c                 C   s   |j dkrdn|j dk rdnd}|jdkrdn|jdk rdnd}td|j d|jdd	|j d
d| d|jd
d| d|jdd |j dk rW|jdkrWtd|j  |jdkrm|jdk ro|jdkrqtd dS dS dS dS )z"Log summary for a completed stage.F   u   ✅ HIGH   u
   ⚠️ LOWu   ➖ OKu   📊 : rR   	s | CPU: rV   z% (z	) | GPU: z
) | VRAM: GBrM   u<      💡 HINT: CPU underutilized - consider more workers for r   u@      💡 HINT: GPU underutilized - could run CPU work in parallelN)r1   r3   rW   rX   r-   r0   r5   )rI   r   
cpu_status
gpu_statusr*   r*   r+   r     s$     
z!ComputeMonitor._log_stage_summaryc                 C   s"  g d}d}d}d}| j  D ]2\}}|d|dd|jdd|jdd	|jdd
	 ||j7 }||j|j 7 }||j|j 7 }q|dkr|| }|| }|d |dddd|dd|dd	|dd
	 || d }	|	dkrvd}
n	|	dkr}d}
nd}
|d|
  |d d|S )zReturn full pipeline summary.)zG
======================================================================u    📊 COMPUTE UTILIZATION SUMMARYrK   r   z   20sr   z6.1fr   z5.1fz	% | GPU: %rT   TOTALrz   <   u   🟢 EXCELLENTrv   u	   🟡 GOODu   🔴 NEEDS OPTIMIZATIONz   Efficiency: rK   
)rA   r   r   r0   r1   r3   r   )rI   lines
total_timecpu_weightedgpu_weightednamemavg_cpuavg_gpu
efficiencyratingr*   r*   r+   summary  s.   2

,

zComputeMonitor.summaryc                 C   sd   z'ddl }|j r%|jdd }|jdd }| jj| | j_W dS W dS  ty1   Y dS w )z5Refresh GPU memory stats (call after clearing cache).r   NrQ   )	re   rf   rg   rk   rl   r@   r   r   r`   )rI   re   rs   rt   r*   r*   r+   refresh_gpu_memory  s   
z!ComputeMonitor.refresh_gpu_memory)r   Nr   )r!   r"   r#   r$   r9   r;   rJ   rG   ro   r   r)   r   r   r'   r   r   r   r%   r   r   r   r   r,   r   r   r   __classcell__r*   r*   r>   r+   r8   J   s$    	U;
1)%r8     segment_lengths
base_batchmax_memory_mbr   c                 C   sL   | s|S t | t|  }|d }|dkr|S t|| }tdt||d S )z
    Compute adaptive batch size based on segment lengths.
    
    Shorter segments = larger batches possible
    Longer segments = smaller batches to avoid OOM
    g?r   r
   rz   )r   r   r%   r{   r|   )r   r   r   
avg_lengthavg_memory_per_seg_mbr   r*   r*   r+   get_adaptive_batch_size  s   r   )r   )r$   osr   loggingr   dataclassesr   r   typingr   r   r   r   
contextlibr   	getLoggerrW   r	   r,   r8   COMPUTElistr%   r   r*   r*   r*   r+   <module>   s&   
    