#!/usr/bin/env python3
"""
Pipeline API routes for processing new videos.

Features:
- Submit new YouTube URLs for processing
- WebSocket endpoint for real-time progress updates
- Job status tracking
- Integration with existing pipeline
"""

import asyncio
import logging
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, Optional
from enum import Enum

from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect, BackgroundTasks, UploadFile, File, Form
from fastapi.responses import JSONResponse
from pydantic import BaseModel, HttpUrl
import shutil
import hashlib
import subprocess
import json
import time

# Import pipeline modules
import sys
root_dir = Path(__file__).parent.parent.parent.parent
sys.path.insert(0, str(root_dir))

from src.config import Config
from src.download import download_audio, extract_video_id, VideoValidationError, DownloadError

# === FIX: Convert config.output_dir to absolute path from project root ===
# When backend runs from visualizer/backend/, relative paths resolve incorrectly
# Must use project root (4 parents up) as base for output_dir
def get_absolute_output_dir() -> Path:
    """Return the configured output directory as a resolved path.

    A fresh ``Config`` is created on every call; its ``output_dir`` is joined
    onto the module-level ``root_dir`` and the result is resolved.
    """
    configured_dir = Config().output_dir
    anchored_dir = root_dir / configured_dir
    return anchored_dir.resolve()

# Module logger; the dotted name makes it a child of the "VisualizerBackend" logger.
logger = logging.getLogger("VisualizerBackend.Pipeline")

# Router on which every endpoint in this module is registered.
router = APIRouter()

# === Job Management ===
class JobStatus(str, Enum):
    """Lifecycle states reported for a processing job.

    Members are also ``str`` instances, so they compare equal to their plain
    string value (e.g. ``JobStatus.QUEUED == "queued"``).
    """
    # Job accepted but background work has not started yet.
    QUEUED = "queued"
    VALIDATING = "validating"
    # Also used by the upload path while audio is extracted from the file.
    DOWNLOADING = "downloading"
    # NOTE(review): PROCESSING_VAD and PROCESSING_CLUSTERING are not assigned
    # anywhere in the code visible in this file.
    PROCESSING_VAD = "processing_vad"
    PROCESSING_DIARIZATION = "processing_diarization"
    PROCESSING_EMBEDDINGS = "processing_embeddings"
    PROCESSING_CLUSTERING = "processing_clustering"
    FINALIZING = "finalizing"
    # Terminal states.
    COMPLETED = "completed"
    FAILED = "failed"


class ProcessRequest(BaseModel):
    """Request to process a new video"""
    # URL of the video to process; the video ID is extracted from it.
    url: str
    # When True, a video whose metadata.json already exists is not reprocessed.
    skip_if_exists: bool = True


class ProcessResponse(BaseModel):
    """Response after submitting processing job"""
    # UUID of the created job, or the literal "cached" when processing was skipped.
    job_id: str
    # Video ID extracted from the submitted URL.
    video_id: str
    # QUEUED for a new job, COMPLETED when an existing result was reused.
    status: JobStatus
    # Human-readable summary of what happened.
    message: str


class JobInfo(BaseModel):
    """Job status information"""
    job_id: str
    video_id: str
    status: JobStatus
    progress: float  # 0.0 to 1.0
    # Short label of the stage currently running (e.g. "Queued", "Validation").
    current_stage: str
    # Human-readable detail for the current stage.
    message: str
    # Required. ISO-8601 timestamp string recorded when the job is created.
    started_at: str
    # ISO-8601 timestamp string, set when the job completes or fails.
    completed_at: Optional[str] = None
    # String form of the exception when the job failed.
    error: Optional[str] = None


# In-memory job tracking (for simple implementation), keyed by job_id.
# Being process-local, the registry is lost on restart; the code visible in
# this file never removes entries from it.
# TODO: Use Redis or database for production
jobs: Dict[str, JobInfo] = {}

# WebSocket connections for progress updates: job_id -> currently connected clients.
ws_connections: Dict[str, list[WebSocket]] = {}


# === API Routes ===
@router.post("/process", response_model=ProcessResponse)
async def process_video(request: ProcessRequest, background_tasks: BackgroundTasks):
    """
    Queue a YouTube URL for background processing.

    Args:
        request: URL to process plus the ``skip_if_exists`` option
        background_tasks: FastAPI background tasks used to schedule the work

    Returns:
        ProcessResponse carrying the job_id to poll, or ``job_id="cached"``
        with COMPLETED status when existing results are reused.

    Raises:
        HTTPException: 400 for any error raised while creating the job.
    """
    logger.info(f"🎬 New processing request: {request.url}")

    try:
        video_id = extract_video_id(request.url)

        # Look for earlier results under the absolute output directory so the
        # check does not depend on the process working directory.
        metadata_file = get_absolute_output_dir() / video_id / "metadata.json"

        if request.skip_if_exists and metadata_file.exists():
            logger.info(f"✅ Video {video_id} already processed, skipping")
            return ProcessResponse(
                job_id="cached",
                video_id=video_id,
                status=JobStatus.COMPLETED,
                message=f"Video already processed: {video_id}",
            )

        # Register a fresh job, then hand the heavy lifting to a background task.
        new_job_id = str(uuid.uuid4())
        jobs[new_job_id] = JobInfo(
            job_id=new_job_id,
            video_id=video_id,
            status=JobStatus.QUEUED,
            progress=0.0,
            current_stage="Queued",
            message="Job queued for processing",
            started_at=datetime.utcnow().isoformat(),
        )
        background_tasks.add_task(process_video_task, new_job_id, request.url)

        logger.info(f"✅ Job {new_job_id} created for video {video_id}")

        return ProcessResponse(
            job_id=new_job_id,
            video_id=video_id,
            status=JobStatus.QUEUED,
            message=f"Processing started: {new_job_id}",
        )

    except Exception as exc:
        logger.error(f"❌ Failed to create processing job: {exc}")
        raise HTTPException(status_code=400, detail=str(exc))


@router.get("/jobs/{job_id}", response_model=JobInfo)
async def get_job_status(job_id: str):
    """
    Look up a processing job in the in-memory registry.

    Args:
        job_id: Job identifier

    Returns:
        The stored JobInfo (status, progress, stage, timestamps).

    Raises:
        HTTPException: 404 when no job with this id is registered.
    """
    job = jobs.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail=f"Job {job_id} not found")
    return job


@router.websocket("/ws/progress/{job_id}")
async def websocket_progress(websocket: WebSocket, job_id: str):
    """
    Stream real-time progress updates for one job over a WebSocket.

    The socket is registered under ``job_id`` so that progress broadcasts
    reach it. The current job state, if the job is known, is sent right after
    the connection is accepted. Any text from the client is answered with
    ``{"type": "pong"}``; after 30 seconds of silence a
    ``{"type": "heartbeat"}`` is sent instead.

    Progress message format:
    {
        "job_id": "...",
        "status": "processing_vad",
        "progress": 0.45,
        "current_stage": "Voice Activity Detection",
        "message": "Processing audio...",
        "timestamp": "2024-01-01T12:00:00"
    }
    """
    await websocket.accept()
    logger.info(f"📡 WebSocket connected for job {job_id}")

    # Register this socket so progress broadcasts can reach it.
    ws_connections.setdefault(job_id, []).append(websocket)

    try:
        # A client connecting mid-job receives the current state immediately.
        known_job = jobs.get(job_id)
        if known_job is not None:
            await websocket.send_json({
                "job_id": job_id,
                "status": known_job.status,
                "progress": known_job.progress,
                "current_stage": known_job.current_stage,
                "message": known_job.message,
                "timestamp": datetime.utcnow().isoformat()
            })

        # Serve ping/pong until the client goes away.
        while True:
            try:
                await asyncio.wait_for(websocket.receive_text(), timeout=30.0)
                # The payload itself is ignored; any message gets a pong.
                await websocket.send_json({"type": "pong"})
            except asyncio.TimeoutError:
                # Nothing received within the window: send a heartbeat.
                await websocket.send_json({"type": "heartbeat"})

    except WebSocketDisconnect:
        logger.info(f"📡 WebSocket disconnected for job {job_id}")
    except Exception as exc:
        logger.error(f"❌ WebSocket error for job {job_id}: {exc}")
    finally:
        # Unregister the socket and drop the job entry once nobody listens.
        listeners = ws_connections.get(job_id)
        if listeners is not None:
            listeners.remove(websocket)
            if not listeners:
                del ws_connections[job_id]


# === Background Processing Task ===
async def update_job_progress(job_id: str, status: JobStatus, progress: float, stage: str, message: str):
    """Update job progress and notify WebSocket clients.

    Args:
        job_id: Job to update; unknown ids are ignored silently.
        status: New job status.
        progress: Completion fraction, 0.0 to 1.0.
        stage: Short label stored as ``current_stage``.
        message: Human-readable detail for the stage.

    A failed send to one client is logged and does not stop delivery to the
    remaining clients.
    """
    job = jobs.get(job_id)
    if job is None:
        return

    job.status = status
    job.progress = progress
    job.current_stage = stage
    job.message = message

    listeners = ws_connections.get(job_id)
    if not listeners:
        return

    update_msg = {
        "job_id": job_id,
        "status": status,
        "progress": progress,
        "current_stage": stage,
        "message": message,
        "timestamp": datetime.utcnow().isoformat()
    }

    # Iterate over a snapshot: every send awaits, and while this coroutine is
    # suspended a WebSocket handler may remove its socket from the live list.
    # Mutating a list during iteration would make the loop skip a client.
    for ws in list(listeners):
        try:
            await ws.send_json(update_msg)
        except Exception as e:
            logger.error(f"Failed to send WebSocket update: {e}")


async def _mark_job_failed(job_id: str, error: Exception, message: str) -> None:
    """Record ``error`` on the job and broadcast the FAILED state with ``message``."""
    job = jobs[job_id]
    job.status = JobStatus.FAILED
    job.error = str(error)
    job.completed_at = datetime.utcnow().isoformat()

    await update_job_progress(job_id, JobStatus.FAILED, 0.0, "Failed", message)


async def process_video_task(job_id: str, video_url: str):
    """
    Background task to process video through the pipeline.

    Uses the main pipeline's process_single_video which handles everything:
    download, VAD, diarization, embeddings, clustering, etc.

    Args:
        job_id: Id of an already registered job in ``jobs``.
        video_url: URL of the video to process.

    Exceptions are never propagated: any failure is stored on the job
    (status FAILED, ``error``, ``completed_at``) and broadcast to listeners.
    """
    logger.info(f"🚀 Starting processing for job {job_id}: {video_url}")

    try:
        config = Config()
        # FIX: Convert output_dir to absolute path BEFORE passing to pipeline
        # Otherwise process_single_video will resolve relative to cwd (visualizer/backend/)
        config.output_dir = str(get_absolute_output_dir())
        video_id = extract_video_id(video_url)

        # === Stage 1: Validation ===
        await update_job_progress(
            job_id, JobStatus.VALIDATING, 0.05,
            "Validation", "Validating video..."
        )

        from src.download import validate_video
        # NOTE(review): validate_video is called synchronously on the event
        # loop; if it performs network I/O, consider asyncio.to_thread.
        is_valid, message, _info = validate_video(video_url)

        if not is_valid:
            raise VideoValidationError(f"Video validation failed: {message}")

        # === Stage 2: Download + Full Processing ===
        await update_job_progress(
            job_id, JobStatus.DOWNLOADING, 0.15,
            "Processing", "Starting full pipeline (download, VAD, diarization, clustering)..."
        )

        # Import and run the main pipeline (handles everything)
        from pipeline import process_single_video

        # Run the full pipeline in a thread (it handles all stages internally).
        # No progress callback is passed, so no intermediate updates are
        # published while the pipeline runs.
        await asyncio.to_thread(process_single_video, video_url, config)

        # === Video download DISABLED (user request) ===
        # Was downloading 500MB+ video just for visualization, not used in actual workflow.
        # To re-enable: uncomment the block below
        # try:
        #     from services.video_downloader import download_video as download_video_file
        #     video_path = await asyncio.to_thread(
        #         download_video_file, video_url, Path(config.output_dir) / video_id
        #     )
        #     logger.info(f"✅ Video downloaded for visualization: {video_path}")
        # except Exception as e:
        #     logger.warning(f"⚠️ Video download failed (audio-only mode): {e}")

        # === Stage 3: Finalize ===
        await update_job_progress(
            job_id, JobStatus.FINALIZING, 0.95,
            "Finalizing", "Processing complete, verifying results..."
        )

        # Verify metadata was created
        metadata_file = Path(config.output_dir) / video_id / "metadata.json"
        if not metadata_file.exists():
            raise RuntimeError(f"Pipeline completed but metadata not found: {metadata_file}")

        # Mark as completed. completed_at is set before the broadcast;
        # update_job_progress writes status, progress, stage and message.
        jobs[job_id].completed_at = datetime.utcnow().isoformat()
        await update_job_progress(
            job_id, JobStatus.COMPLETED, 1.0,
            "Completed", f"Processing complete! Video ID: {video_id}"
        )

        logger.info(f"✅ Processing complete for job {job_id}")

    except VideoValidationError as e:
        logger.error(f"❌ Validation error for job {job_id}: {e}")
        await _mark_job_failed(job_id, e, f"Validation failed: {str(e)}")

    except Exception as e:
        # logger.exception keeps the traceback for unexpected failures.
        logger.exception(f"❌ Processing failed for job {job_id}: {e}")
        await _mark_job_failed(job_id, e, f"Processing failed: {str(e)}")


# ============================================================================
# FILE UPLOAD PROCESSING
# ============================================================================

class UploadResponse(BaseModel):
    """Response for file upload"""
    # UUID of the background processing job created for the upload.
    job_id: str
    # Generated identifier of the form "upload_<8 hex chars>".
    video_id: str
    # Filename as supplied by the client.
    filename: str
    # Number of bytes written to disk.
    file_size: int
    message: str


@router.post("/upload", response_model=UploadResponse)
async def upload_and_process(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
):
    """
    Upload a video/audio file and process it through the diarization pipeline.

    Supports:
    - Video files: MP4, MKV, AVI, MOV, WebM
    - Audio files: WAV, MP3, FLAC, M4A, AAC

    No file size limit - uses streaming upload.
    Processing runs in background - use /api/jobs/{job_id} to check status.

    Args:
        background_tasks: FastAPI background tasks used to schedule processing
        file: The uploaded file

    Returns:
        UploadResponse with the job_id to poll and the generated video_id.

    Raises:
        HTTPException: 400 for a missing filename or unsupported extension,
            500 when the file cannot be written to disk.
    """
    logger.info(f"📤 Upload received: {file.filename} ({file.content_type})")

    video_extensions = {'.mp4', '.mkv', '.avi', '.mov', '.webm'}
    audio_extensions = {'.wav', '.mp3', '.flac', '.m4a', '.aac'}
    allowed_extensions = video_extensions | audio_extensions

    # The filename is optional on an upload; Path(None) would raise TypeError.
    if not file.filename:
        raise HTTPException(status_code=400, detail="Uploaded file has no filename")

    # Validate file type
    file_ext = Path(file.filename).suffix.lower()

    if file_ext not in allowed_extensions:
        raise HTTPException(
            status_code=400,
            # sorted() keeps the message stable; set iteration order is arbitrary.
            detail=f"Unsupported file type: {file_ext}. Allowed: {', '.join(sorted(allowed_extensions))}"
        )

    # Generate video_id from filename + timestamp hash. Only the hash reaches
    # the filesystem, never the client-supplied name.
    filename_stem = Path(file.filename).stem
    unique_str = f"{filename_stem}_{int(time.time())}"
    video_id = f"upload_{hashlib.md5(unique_str.encode()).hexdigest()[:8]}"

    # Create output directory
    output_dir = get_absolute_output_dir() / video_id
    output_dir.mkdir(parents=True, exist_ok=True)

    # Save uploaded file (streaming - no memory limit)
    is_video = file_ext in video_extensions

    if is_video:
        saved_file = output_dir / f"{video_id}{file_ext}"
    else:
        saved_file = output_dir / f"{video_id}_original{file_ext}"

    file_size = 0
    try:
        with open(saved_file, "wb") as buffer:
            while chunk := await file.read(1024 * 1024):  # 1MB chunks
                buffer.write(chunk)
                file_size += len(chunk)

        logger.info(f"✅ File saved: {saved_file} ({file_size / (1024*1024):.1f} MB)")
    except Exception as e:
        logger.error(f"❌ Failed to save file: {e}")
        # Best-effort cleanup so a truncated upload is not left on disk.
        try:
            saved_file.unlink(missing_ok=True)
        except OSError:
            pass
        raise HTTPException(status_code=500, detail=f"Failed to save file: {e}")

    # Create job for tracking. JobInfo requires `started_at`; it has no
    # `video_url` or `created_at` fields, so passing those instead made
    # validation fail on every upload.
    job_id = str(uuid.uuid4())
    job = JobInfo(
        job_id=job_id,
        video_id=video_id,
        status=JobStatus.QUEUED,
        progress=0.0,
        current_stage="Queued",
        message=f"File uploaded: {file.filename}",
        started_at=datetime.utcnow().isoformat()
    )
    jobs[job_id] = job

    # Start background processing
    background_tasks.add_task(
        process_uploaded_file_task,
        job_id,
        video_id,
        str(saved_file),
        filename_stem,
        is_video
    )

    return UploadResponse(
        job_id=job_id,
        video_id=video_id,
        filename=file.filename,
        file_size=file_size,
        message="File uploaded successfully. Processing started."
    )


async def process_uploaded_file_task(
    job_id: str,
    video_id: str,
    file_path: str,
    original_filename: str,
    is_video: bool
) -> None:
    """
    Background task to process an uploaded file.

    Similar to process_video_task but handles local files instead of YouTube URLs.

    Stages: extract/convert audio with ffmpeg, measure duration, run VAD,
    chunking, diarization, optional quality filtering, embeddings, speaker
    merging and optional chunk reassignment, then build and write
    ``metadata.json`` into the video's output directory.

    Args:
        job_id: Id of an already registered job in ``jobs``.
        video_id: Identifier used for the output directory and file names.
        file_path: Path of the saved upload on disk.
        original_filename: Name used for logging and stored as ``video_title``.
        is_video: True when the upload is a video container.

    Exceptions are never propagated: any failure is stored on the job
    (status FAILED, ``error``, ``completed_at``) and broadcast to listeners.
    """
    logger.info(f"🚀 Starting processing for uploaded file: {original_filename}")

    try:
        output_dir = get_absolute_output_dir() / video_id
        audio_path = output_dir / f"{video_id}_trimmed.wav"

        # === Stage 1: Extract audio (if video) ===
        await update_job_progress(
            job_id, JobStatus.DOWNLOADING, 0.1,
            "Extracting Audio", "Converting to 16kHz WAV..."
        )

        if is_video or not file_path.endswith('.wav'):
            # Extract audio using ffmpeg: no video, 16-bit PCM, 16 kHz, mono
            cmd = [
                "ffmpeg", "-y",
                "-i", file_path,
                "-vn",
                "-acodec", "pcm_s16le",
                "-ar", "16000",
                "-ac", "1",
                str(audio_path)
            ]
            # NOTE(review): subprocess.run is synchronous, so the event loop is
            # blocked until ffmpeg exits (or the 600 s timeout fires).
            result = subprocess.run(cmd, capture_output=True, timeout=600)
            if result.returncode != 0:
                raise Exception(f"FFmpeg error: {result.stderr.decode()[:500]}")

            # Copy video file to standard name if needed
            if is_video:
                video_dest = output_dir / f"{video_id}.mp4"
                if not video_dest.exists() and Path(file_path).suffix == '.mp4':
                    shutil.copy(file_path, video_dest)
                elif not video_dest.exists():
                    # Convert to MP4 for visualization
                    # NOTE(review): the return code is not checked, so a failed
                    # conversion is silently ignored.
                    convert_cmd = ["ffmpeg", "-y", "-i", file_path, "-c:v", "copy", "-c:a", "aac", str(video_dest)]
                    subprocess.run(convert_cmd, capture_output=True, timeout=600)
        else:
            # Already WAV, just copy/rename
            # NOTE(review): the file is copied as-is, without the 16 kHz / mono
            # conversion applied in the ffmpeg branch.
            shutil.copy(file_path, audio_path)

        logger.info(f"✅ Audio extracted: {audio_path}")

        # === Stage 2: Get audio duration ===
        import torchaudio
        # Duration = samples / sample rate.
        # assumes waveform is (channels, samples) — TODO confirm
        waveform, sr = torchaudio.load(str(audio_path))
        total_duration = waveform.shape[1] / sr
        logger.info(f"   Duration: {total_duration:.1f}s ({total_duration/60:.1f}min)")

        # === Stage 3: Run full pipeline ===
        #
        # TODO(REFACTOR): This is a duplicated implementation of the main pipeline.py
        # Should be refactored to call a shared process_audio_file() function.
        # For now, we keep them in sync manually. Any new pipeline features must be
        # added to BOTH pipeline.py AND this file upload processing path.
        # See: https://github.com/... (create issue to track this)
        #
        await update_job_progress(
            job_id, JobStatus.PROCESSING_DIARIZATION, 0.2,
            "Diarization", "Running VAD, diarization, embeddings, clustering..."
        )

        # Import pipeline modules
        from src.config import Config
        from src.models import MODELS
        from src.vad import run_vad_parallel
        from src.diarization import create_chunks, run_diarization
        from src.overlap_detection import detect_nonspeech_regions, split_on_overlaps, filter_by_quality
        from src.unified_embeddings import UnifiedChunkEmbeddingCache, UnifiedEmbeddingConfig, build_segment_embeddings_from_cache, execute_chunk_reassignment_cached
        from src.clustering import merge_speakers, merge_adjacent_segments
        from src.audio_buffer import AudioBuffer
        from collections import defaultdict
        import numpy as np

        config = Config()
        # output_dir.parent is the absolute output root (output_dir is root / video_id).
        config.output_dir = str(output_dir.parent)

        # Load models if not already loaded
        if not MODELS._loaded:
            logger.info("Loading models...")
            await asyncio.to_thread(MODELS.load_all, config)

        # Load audio buffer
        audio_buffer = AudioBuffer.from_file(audio_path)

        # VAD
        logger.info("Running VAD...")
        vad_segments = await asyncio.to_thread(
            run_vad_parallel, str(audio_path), config, audio_buffer
        )

        await update_job_progress(
            job_id, JobStatus.PROCESSING_DIARIZATION, 0.35,
            "Chunking", f"VAD complete: {len(vad_segments)} speech segments"
        )

        # Chunking
        # NOTE(review): the meaning of the trailing True is defined by
        # create_chunks, which is not visible here.
        chunks = await asyncio.to_thread(
            create_chunks, str(audio_path), vad_segments, config, audio_buffer, True
        )

        # Diarization
        await update_job_progress(
            job_id, JobStatus.PROCESSING_DIARIZATION, 0.45,
            "Diarization", "Running speaker diarization..."
        )

        segments, overlap_segments = await asyncio.to_thread(
            run_diarization, chunks, config, config.detect_overlap
        )
        nonspeech_segments = detect_nonspeech_regions(vad_segments, total_duration)

        # Split overlaps
        if overlap_segments and config.detect_overlap:
            segments = split_on_overlaps(segments, overlap_segments, min_segment=config.min_segment_duration)

        # Quality filter
        if config.filter_by_quality:
            segments, low_quality_segments = await asyncio.to_thread(
                filter_by_quality, segments, str(audio_path),
                config.min_snr_db, config.min_quality_score, audio_buffer
            )
        else:
            low_quality_segments = []

        await update_job_progress(
            job_id, JobStatus.PROCESSING_EMBEDDINGS, 0.55,
            "Embeddings", "Extracting speaker embeddings..."
        )

        # Embeddings
        embedding_config = UnifiedEmbeddingConfig(
            chunk_duration=1.5,
            min_speech_ratio=config.chunk_reassignment_min_speech,
            batch_size=128,
            severe_threshold=config.chunk_reassignment_severe,
            normal_threshold=config.chunk_reassignment_threshold,
        )

        embedding_cache = UnifiedChunkEmbeddingCache(
            audio_buffer=audio_buffer,
            vad_segments=vad_segments,
            embedding_model=MODELS.embedding_model,
            vad_model=MODELS.silero_vad,
            vad_utils=MODELS.silero_utils,
            config=embedding_config,
            device=MODELS.get_device()
        )
        await asyncio.to_thread(embedding_cache.build)

        embeddings = build_segment_embeddings_from_cache(segments, embedding_cache)

        await update_job_progress(
            job_id, JobStatus.PROCESSING_EMBEDDINGS, 0.7,
            "Clustering", "Merging speakers..."
        )

        # Clustering
        segments = await asyncio.to_thread(
            merge_speakers, str(audio_path), segments, embeddings, config
        )

        # Chunk reassignment
        if config.enable_chunk_reassignment:
            # Group embeddings by speaker, looked up by segment index.
            # NOTE(review): this presumes segment indices still line up with
            # `embeddings` after merge_speakers — verify.
            speaker_embeddings = defaultdict(list)
            for i, seg in enumerate(segments):
                if i in embeddings and seg.get('speaker') not in ['OVERLAP', 'NON_SPEECH']:
                    speaker_embeddings[seg['speaker']].append(embeddings[i])

            # One centroid per speaker: the mean of that speaker's embeddings.
            speaker_centroids = {
                spk: np.mean(embs, axis=0) 
                for spk, embs in speaker_embeddings.items() 
                if embs
            }

            # NOTE(review): the positional constants (0.55, 0.10, True) are
            # interpreted by execute_chunk_reassignment_cached, not visible here.
            segments, _ = await asyncio.to_thread(
                execute_chunk_reassignment_cached,
                segments, embedding_cache, speaker_centroids,
                config.chunk_reassignment_min_portion * 2,
                config.chunk_reassignment_min_portion,
                0.55, 0.10, True
            )

        await update_job_progress(
            job_id, JobStatus.FINALIZING, 0.85,
            "Finalizing", "Building output..."
        )

        # === Stage 4: Finalize ===
        # 1) Merge adjacent same-speaker segments.
        #
        # Include low-quality segments as UNUSABLE boundaries so we never merge
        # across them (prevents "smearing" bad audio into clean clips).
        speaker_segments_for_merge = list(segments)
        for seg in low_quality_segments:
            # Mutates the low-quality segment dicts in place.
            seg['status'] = 'unusable'
            seg['unusable_reason'] = seg.get('unusable_reason', 'low_quality')
            speaker_segments_for_merge.append(seg)

        # Treat overlap regions as hard boundaries so we never merge a "clean"
        # speaker segment across an overlap gap (would re-include multi-speaker audio).
        for seg in overlap_segments:
            speaker_segments_for_merge.append({
                'start': seg['start'], 'end': seg['end'], 'duration': seg['duration'],
                'speaker': 'OVERLAP', 'status': 'unusable', 'unusable_reason': 'overlap'
            })

        speaker_segments_for_merge.sort(key=lambda x: x['start'])
        merged_all_segments = merge_adjacent_segments(speaker_segments_for_merge, config)
        merged_speaker_segments = [
            s for s in merged_all_segments
            if s.get('speaker') not in ['OVERLAP', 'NON_SPEECH']
        ]
        overlap_output_segments = [s for s in merged_all_segments if s.get('speaker') == 'OVERLAP']

        # 2) Drop short speaker segments entirely (post-merge)
        if config.min_tts_duration > 0:
            kept = []
            dropped = 0
            for seg in merged_speaker_segments:
                if seg['duration'] >= config.min_tts_duration:
                    kept.append(seg)
                else:
                    dropped += 1

            if dropped > 0:
                logger.info(
                    f"   Duration filter (post-merge): dropped {dropped} speaker segments "
                    f"<{config.min_tts_duration}s"
                )

            merged_speaker_segments = kept

        segments = merged_speaker_segments + overlap_output_segments

        # 3) Add filtered non-speech (only inter-speaker gaps)
        #
        # NOTE: this is containment-based, and we run it AFTER adjacent-merge so
        # pauses between same-speaker fragments become internal and get filtered.
        speaker_segs = [s for s in segments if s.get('speaker') not in ['OVERLAP', 'NON_SPEECH', None]]
        overlap_segs = [s for s in segments if s.get('speaker') == 'OVERLAP']

        for seg in nonspeech_segments:
            ns_start, ns_end = seg['start'], seg['end']
            # Internal = fully contained in one speaker segment and not
            # intersecting any overlap region.
            is_internal = False
            for sp in speaker_segs:
                if sp['start'] <= ns_start and ns_end <= sp['end']:
                    has_overlap_nearby = any(
                        ov['start'] < ns_end and ov['end'] > ns_start
                        for ov in overlap_segs
                    )
                    if not has_overlap_nearby:
                        is_internal = True
                        break

            if not is_internal:
                segments.append({
                    'start': seg['start'], 'end': seg['end'], 'duration': seg['duration'],
                    'speaker': 'NON_SPEECH', 'status': 'unusable', 'unusable_reason': 'non_speech'
                })

        segments.sort(key=lambda x: x['start'])

        # === OVERLAP DENSITY FILTER (v6.9) ===
        # Mark short segments sandwiched between overlaps as unusable
        # This catches "islands" of supposedly clean audio in high-overlap regions
        from src.overlap_detection import filter_overlap_sandwich_segments
        segments = filter_overlap_sandwich_segments(segments, config)

        # Stats (matching main pipeline's output format)
        all_speakers = list(set(s['speaker'] for s in segments if s['speaker'] not in ['OVERLAP', 'NON_SPEECH']))
        usable_segs = [s for s in segments if s.get('status') == 'usable']
        overlap_segs_final = [s for s in segments if s.get('speaker') == 'OVERLAP']
        nonspeech_segs = [s for s in segments if s.get('speaker') == 'NON_SPEECH']
        low_quality_segs_final = [s for s in segments if s.get('unusable_reason') == 'low_quality']
        overlap_proximity_segs = [s for s in segments if s.get('unusable_reason') == 'overlap_proximity']  # v6.9

        usable_duration = sum(s['duration'] for s in usable_segs)
        overlap_duration = sum(s['duration'] for s in overlap_segs_final)
        nonspeech_duration = sum(s['duration'] for s in nonspeech_segs)
        low_quality_duration = sum(s['duration'] for s in low_quality_segs_final)
        overlap_proximity_duration = sum(s['duration'] for s in overlap_proximity_segs)

        # Build output segments (matching main pipeline's format with original_start/end)
        output_segments = []
        for seg in segments:
            output_segments.append({
                'start': round(seg['start'], 3),
                'end': round(seg['end'], 3),
                'duration': round(seg['duration'], 3),
                'speaker': seg['speaker'],
                # Segments without an explicit status are reported as 'usable'.
                'status': seg.get('status', 'usable'),
                # For file uploads, original timestamps = processed timestamps (no intro skip)
                'original_start': round(seg['start'], 3),
                'original_end': round(seg['end'], 3),
            })
            if 'unusable_reason' in seg:
                output_segments[-1]['unusable_reason'] = seg['unusable_reason']
            if 'quality' in seg:
                output_segments[-1]['quality_score'] = seg['quality'].get('quality_score')

        # Build metadata (matching main pipeline's output format)
        result = {
            'video_id': video_id,
            'video_title': original_filename,
            # Uploads have no YouTube URL; a file:// reference to the saved upload is stored instead.
            'youtube_url': f"file://{file_path}",
            'original_duration': round(total_duration, 2),
            'intro_skipped': 0.0,  # No intro skip for file uploads
            'processed_duration': round(total_duration, 2),
            'num_speakers': len(all_speakers),
            'speakers': all_speakers,
            'total_segments': len(segments),
            'segments': output_segments,
            'quality_stats': {
                'usable_segments': len(usable_segs),
                'usable_duration': round(usable_duration, 2),
                'usable_percentage': round(usable_duration / total_duration * 100, 1) if total_duration > 0 else 0,
                'overlap_segments': len(overlap_segs_final),
                'overlap_duration': round(overlap_duration, 2),
                'overlap_percentage': round(overlap_duration / total_duration * 100, 1) if total_duration > 0 else 0,
                'nonspeech_segments': len(nonspeech_segs),
                'nonspeech_duration': round(nonspeech_duration, 2),
                'nonspeech_percentage': round(nonspeech_duration / total_duration * 100, 1) if total_duration > 0 else 0,
                'low_quality_segments': len(low_quality_segs_final),
                'low_quality_duration': round(low_quality_duration, 2),
                # v6.9 overlap density filter
                'overlap_proximity_segments': len(overlap_proximity_segs),
                'overlap_proximity_duration': round(overlap_proximity_duration, 2),
            },
            'processed_at': datetime.utcnow().isoformat(),
            'pipeline_version': 'v6.9-upload',  # Updated to match main pipeline version
        }

        # Save metadata
        with open(output_dir / "metadata.json", 'w') as f:
            json.dump(result, f, indent=2)

        logger.info(f"✅ Processing complete: {len(usable_segs)} usable segments")

        # Mark job complete
        job = jobs[job_id]
        job.status = JobStatus.COMPLETED
        job.progress = 1.0
        job.current_stage = "Completed"
        job.message = f"Processing complete! {len(all_speakers)} speakers, {len(usable_segs)} usable segments"
        job.completed_at = datetime.utcnow().isoformat()

        # NOTE(review): update_job_progress assigns job.message again, so the
        # speakers/segments summary set above is replaced by the text below.
        await update_job_progress(
            job_id, JobStatus.COMPLETED, 1.0,
            "Completed", f"Processing complete! Video ID: {video_id}"
        )

    except Exception as e:
        logger.error(f"❌ Processing failed for uploaded file: {e}")
        import traceback
        # Traceback goes to stderr, not through the module logger.
        traceback.print_exc()

        job = jobs[job_id]
        job.status = JobStatus.FAILED
        job.error = str(e)
        job.completed_at = datetime.utcnow().isoformat()

        await update_job_progress(
            job_id, JobStatus.FAILED, 0.0,
            "Failed", f"Processing failed: {str(e)}"
        )

