o
    oi\8                     @   s   d dl Z d dlZd dlZd dlmZmZ d dlmZ ddl	m
Z
mZ ddlmZ ddlmZmZ dd	lmZ dd
lmZ ddlmZ G dd dZdS )    N)load_datasetconcatenate_datasets)List   )ConfigManagerDatasetConfig   )DatasetProcessor)worker_processAudioWorker)reader_worker_process)SpeakerEmbeddingProcessor)SpeakerEmbeddingGrouperc                   @   sV   e Zd ZdZddefddZdd Zdd	 Zd
d Zde	fddZ
dd Zdd ZdS )PipelineManagerz,Manages the entire audio processing pipelineconfig.yamlconfig_pathc                 C   sv   t || _| j | _| j | _| j | _tj	
 | _tj| jjdd z
tjddd W d S  ty:   Y d S w )NT)exist_okspawn)force)r   config_managerget_base_settingsbase_settingsget_save_settingssave_settingsget_speaker_embedding_settingsspeaker_emb_settingstorchcudadevice_countnum_gpusosmakedirsOUT_DIRmpset_start_methodRuntimeError)selfr    r'   K/home/ubuntu/kanitts-2-dataset-pipeline/utils/nanocodec/pipeline_manager.py__init__   s   
zPipelineManager.__init__c                    s   | j jsdS | j jsdS | j j | j }t fdd|D }|s$dS td td td td  d td	  d
 td td td	 }|sRd}|| j _td| d td dS )z
        Check whether the clustering speaker column name conflicts with any
        add_constant key in the dataset configs. If a conflict is found, prompt
        the user to supply an alternative column name.
        Nc                 3   s    | ]	} |  v V  qd S N)get_constant_columns).0dsconflict_columnr'   r(   	<genexpr>0   s
    

z=PipelineManager._check_clustering_conflict.<locals>.<genexpr>=
============================================================u<   ⚠️  WARNING: Clustering column naming conflict detected!<============================================================z&  'do_clusters: true' will produce a 'z' column (integer cluster IDs),z  but 'z<' is also defined in 'add_constant' (string constant value).zC  Please provide a different name for the clustering output column.<------------------------------------------------------------z4  New clustering column name [default: cluster_id]: 
cluster_idu(     ✅ Clustering column will be named: ''=============================================================
)
r   add_speaker_embdo_clustersclustering_speaker_columnr   get_datasetsanyprintinputstrip)r&   datasetshas_conflictnew_namer'   r.   r(   _check_clustering_conflict"   s@   


z*PipelineManager._check_clustering_conflictc                 C   s@  | j j}|r	|jsdS |j}| j }g }|D ]}|| v r)|d|j d q| j }|j	r=|j
r=|j|kr=|d |D ]}|j|krP|d|j d q?|rbtd| dd|  dS td	 td
 td td| d td td td td  }|dkrd|_td ntd td dS )a  
        Warn the user early if 'group_sp_emb.do_this: true' but the
        group_by_column_name cannot be traced to any known source:
          - add_constant in any dataset config
          - clustering output (do_clusters: true + matching column name)
          - speaker_column_name of any dataset config

        If no source is found, prompt the user to disable grouping via
        interactive input.
        Nzadd_constant in 'r5   zclustering (do_clusters: true)zspeaker_column_name in 'z#[Validation] group_sp_emb: column 'z' provided by: z, r1   u6   ⚠️  WARNING: group_sp_emb column source not found!r2   z0  'group_sp_emb.do_this: true' requires column 'zR  but it will NOT be produced by add_constant, clustering, or speaker_column_name.uN     It may already exist in the source dataset — proceed only if you are sure.r3   z,  Disable group_sp_emb for this run? [Y/n]: nFu     ✅ group_sp_emb disabled.uB     ⚠️  Proceeding — will fail at runtime if column is absent.r6   )r   	group_embdo_thisgroup_by_column_namer   r:   r+   appendnamer7   r8   r9   speaker_column_namer<   joinr=   r>   lower)r&   rD   	group_colr?   sourcesr-   spanswerr'   r'   r(   _check_group_emb_feasibilityQ   s`   






z,PipelineManager._check_group_emb_feasibilityc                 C   sJ   t d | j  |   |   | jdkrtdt d| j d dS )z&Validate configuration and environmentu    🔍 Validating configuration...r   u!   ❌ ERROR: No CUDA devices found!u
   ✅ Found z GPU(s)N)r<   r   validate_datasetsrB   rP   r   r%   r&   r'   r'   r(   validate   s   

zPipelineManager.validatedataset_configc                    s  t dd  t dj  t dj  t d  t}|jjjd jj}jj	rct dd  t d t d  t
j}|| j}||_| |_t d t d	|j  n8|r|jrt dd  t d
 t d  t|jjd}|| }||_| |_t d t d	|j  tjjjdt d t dtj   t dj  t djj  t djj  t djj  t djjd t djj  t d fddtjD }|D ]}|   q|   fddtjjD }	g tjjD ]}
t}|	|
 |_|j|_!| qfddtjjD }|D ]}|   qCzn|D ]}|"  qNtjD ]	}
#t$j% q[|D ]}|"  qgt d t dj d t&j'(jjrfddt&)jjD }|rt*fd d!|D }t d"t+| d#|d$ d%d& W d+S W d+S W d+S  t,y   t d' |D ]}|-  q|D ]}|-  q|D ]	}|j"d(d) q|D ]	}|j"d(d) qt d*  w ),z-Process a single dataset through the pipeline
r2   u   🎯 Processing dataset: u   📝 Dataset prefix: )num_procu   ────────────────────────────────────────────────────────────u'   🎙️  Speaker Embedding: starting...u   ✅ Speaker Embedding done.z+   Columns preserved through tokenization: u0   📊  Group Embeddings (standalone): starting...)settingsembedding_columnu   ✅ Group Embeddings done.)maxsizeu"   
🚀 Starting processing pipelineu   💻 CUDA available: u   🔥 GPU workers: u   📖 Reader workers: u    ⚙️  Dataset load processes: u   📁 Output directory: u   🗂️  Lines per file: ,u   📦 Queue size: r3   c                    sF   g | ]}t jt|jj jjjjjjjjj	jj
f	d qS )targetargs)r#   Processr
   r   r"   dataset_prefix
gzip_levelbuffer_sizelines_per_filenum_readersaudio_codecr,   i)rT   qr&   r'   r(   
<listcomp>   s     z:PipelineManager.process_single_dataset.<locals>.<listcomp>c                    s   g | ]} j jj|d qS ))
num_shardsindex)shardr   rc   re   )datasetr&   r'   r(   rh      s    c                    s*   g | ]}t jt|jj|  fd qS r[   )r#   r^   r   r   rc   re   )rg   r&   shard_processorsr'   r(   rh     s    r1   u   🎉 Dataset z processed successfully!c                    s   g | ]
}|  jr|qS r'   )
startswithr_   r,   f)rT   r'   r(   rh   %  s    

c                 3   s*    | ]}t jt j jj|V  qd S r*   )r    pathgetsizerJ   r   r"   ro   rR   r'   r(   r0   (  s
    
z9PipelineManager.process_single_dataset.<locals>.<genexpr>u   📊 Generated z files for this dataset, size: i   @z.2fz GBu.   
⚠️  Interrupted! Terminating processes...
   )timeoutu   🛑 All processes terminatedN).r<   rH   r_   r	   r   r   load_dataset_num_procr   rD   r7   r   processget_datasetaudio_column_namerl   get_preserved_columnspreserve_columnsrE   r   rX   groupr#   Queueqsizer   r   is_availabler   rc   r"   rb   rangestartrG   rJ   putr   SENTINELr    rq   existslistdirsumlenKeyboardInterrupt	terminate)r&   rT   	processorrD   sp_processorupdated_datasetgrouperworkerspsharded_datasetsrf   
shard_procreadersprfiles
total_sizer'   )rl   rT   rg   r&   rm   r(   process_single_dataset   s   










z&PipelineManager.process_single_datasetc                 C   s   t dd  t d t d  tj| jjd}t d|  td| jjdddd	}t d
t| d | jj	rMt d| jj	  |
| jj	 t d | jjrgt d| jj  |j| jjdd t d t dd  t d t d  dS )z@Assemble all processed shards into final dataset and save/uploadrU   r2   u0   🔨 Assembling final dataset from all shards...z
*.jsonl.gzu   📂 Loading shards from: jsontrain	no_checks)data_dir
data_filessplitverification_modeu   ✅ Final dataset assembled: z samplesu!   
💾 Saving dataset locally to: u   ✅ Dataset saved to disku+   
☁️  Uploading dataset to HuggingFace: T)privateu'   ✅ Dataset uploaded to HuggingFace Hubu%   🎊 Pipeline completed successfully!N)r<   r    rq   rJ   r   r"   r   r   r   localsave_to_disk	hf_uploadpush_to_hub)r&   shard_filesfinal_datasetr'   r'   r(   assemble_and_save_final_datasetA  s0   
z/PipelineManager.assemble_and_save_final_datasetc                 C   sp   |    | j }tdt| d t|dD ]\}}td| dt|  | | q|   td dS )z*Run the complete pipeline for all datasetsu   
📋 Found z dataset(s) to processr   u   
🔄 Processing dataset /u   
👋 Pipeline finished!N)rS   r   r:   r<   r   	enumerater   r   )r&   r?   idxrT   r'   r'   r(   rune  s   
zPipelineManager.runN)r   )__name__
__module____qualname____doc__strr)   rB   rP   rS   r   r   r   r   r'   r'   r'   r(   r      s    /H $r   )r   multiprocessingr#   r    r?   r   r   typingr   r   r   r   dataset_processorr	   audio_workerr
   r   reader_workerr   speaker_emb.processorr   speaker_emb.group_embeddingsr   r   r'   r'   r'   r(   <module>   s    