o
    lQi>                     @  s   d dl mZ d dlZd dlZd dlZd dlZd dlZd dlmZ d dl	m
Z
 d dlZd dlmZ d dlmZ ddlmZ eeZG d	d
 d
ZdddZdddZdddZdddZdS )    )annotationsN)Path)Optional)TransferConfig)Config   )FinalExportConfigc                   @  sn   e Zd Zd%ddZdd Zd	d
 Zd&ddZdd Zdd Zdd Z	d'ddZ
d'ddZd(d d!Zd"d# Zd$S ))FinalExportReferenceStoreconfigr   
cache_rootr   c                 C  s  || _ |d | _|j| _|j| _|j| _|j| _	|j
| _d | _| j jdkrw| jjddd | jt|j | _| jt|j | _|jrJ| jt|j nd | _|jrX| jt|j nd | _	|jrf| jt|j nd | _|jrt| jt|j nd | _| jd | _d | _t | _d S )Nfinal_export_referencer2Tparentsexist_okzfinal_export_reference.duckdb)r
   	cache_dircanonical_segments_pathlocal_canonical_pathraw_transcripts_pathlocal_raw_transcripts_pathvalidation_pathlocal_validation_pathyoutube_meta_pathlocal_youtube_meta_pathvariants_pathlocal_variants_pathlocal_manifest_pathreference_modemkdir_cache_namecanonical_segments_r2_keyraw_transcripts_r2_keyvalidation_r2_keyyoutube_meta_r2_keyvariants_r2_keyreference_manifest_r2_keyduckdb_path_conasyncioLock_lock)selfr
   r    r,   #src/final_export_reference_store.py__init__   s>   
z"FinalExportReferenceStore.__init__c                   s"   t  }|d | jI d H  d S N)r(   get_running_looprun_in_executor_start_syncr+   loopr,   r,   r-   start=   s   zFinalExportReferenceStore.startc                   s0   | j d u rd S t }|d | jI d H  d S r/   )r'   r(   r0   r1   _close_syncr3   r,   r,   r-   closeA   s
   
zFinalExportReferenceStore.closevideo_idstrreturndict[str, dict]c                   sR   | j d u r	td| j d|g}dd |jD  dd  fdd| D D S )	Nz,Final export reference store not initializeda  
            SELECT
                c.video_id,
                c.recommended_action,
                c.queue_language,
                c.corrected_language,
                c.segment_file,
                c.speaker_id,
                c.parent_segment_file,
                c.is_split_segment,
                c.split_index_from_id,
                c.original_start_ms,
                c.original_end_ms,
                c.trimmed_start_ms,
                c.trimmed_end_ms,
                c.leading_pad_ms,
                c.trailing_pad_ms,
                c.expected_language_hint,
                c.tx_detected_language,
                c.gemini_lang,
                c.segment_language,
                c.transcription,
                c.tagged,
                c.num_unk,
                c.num_inaudible,
                c.num_event_tags,
                c.text_length_per_sec,
                c.tx_quality_score,
                c.asr_eligible,
                c.tts_clean_eligible,
                c.tts_expressive_eligible,
                c.lang_mismatch_flag,
                c.duration_s,
                c.final_validation_source,
                c.final_has_validation,
                c.final_bucket,
                c.input_script_profile,
                coalesce(nullif(c.native_script_text, ''), v.native_script_text) AS native_script_text,
                coalesce(nullif(c.romanized_text, ''), v.romanized_text) AS romanized_text,
                coalesce(nullif(c.variant_route, ''), v.variant_route) AS variant_route,
                coalesce(nullif(c.variant_validation_errors, ''), v.variant_validation_errors) AS variant_validation_errors,
                y.youtube_audio_language,
                y.youtube_default_language,
                y.channel_id,
                y.channel_title,
                y.title,
                y.description,
                y.tags,
                rt.transcription AS raw_transcription,
                rt.tagged AS raw_tagged,
                rt.detected_language AS raw_detected_language,
                rt.quality_score AS raw_quality_score,
                rt.speaker_emotion,
                rt.speaker_style,
                rt.speaker_pace,
                rt.speaker_accent,
                vs.lid_consensus,
                vs.lid_agree_count,
                vs.consensus_lang,
                vs.conformer_multi_ctc_normalized,
                vs.mms_confidence
            FROM canonical_segments c
            LEFT JOIN raw_transcripts rt
                ON rt.video_id = c.video_id
               AND rt.segment_file = c.segment_file
            LEFT JOIN variants_source v
                ON v.video_id = c.video_id
               AND v.segment_file = c.segment_file
            LEFT JOIN validation_source vs
                ON vs.video_id = c.video_id
               AND vs.segment_file = c.segment_file
            LEFT JOIN youtube_meta y
                ON y.video_id = c.video_id
            WHERE c.video_id = ?
            ORDER BY c.segment_file
            c                 S  s   g | ]}|d  qS )r   r,   ).0dr,   r,   r-   
<listcomp>   s    zFFinalExportReferenceStore.get_video_reference_rows.<locals>.<listcomp>c                 S  s.   i | ]}t |d d rt |d  |qS )segment_file )r9   getstrip)r<   rowr,   r,   r-   
<dictcomp>   s    zFFinalExportReferenceStore.get_video_reference_rows.<locals>.<dictcomp>c                 3  s    | ]
}t t |V  qd S r/   )dictzip)r<   itemcolsr,   r-   	<genexpr>   s    zEFinalExportReferenceStore.get_video_reference_rows.<locals>.<genexpr>)r'   RuntimeErrorexecutedescriptionfetchall)r+   r8   relr,   rH   r-   get_video_reference_rowsG   s   
LOz2FinalExportReferenceStore.get_video_reference_rowsc                 C  s0   | j jddd | jjdkr|   |   d S )NTr   r   )r   r   r
   r   _download_reference_files_open_duckdbr+   r,   r,   r-   r2      s   z%FinalExportReferenceStore._start_syncc                 C  s"   | j d ur| j   d | _ d S d S r/   )r'   r7   rS   r,   r,   r-   r6      s   


z%FinalExportReferenceStore._close_syncc              	   C  sP  dd l }|jd| jjj| jjj| jjjdtt| jj	dd}t
ddt| jj	ddd	}| jrB| jjrB| j|| jj| jj| jd d
 | j|| jj| jj| j|d
 | j|| jj| jj| j|d
 | jrv| jjrv| j|| jj| jj| j|d
 | jr| jjr| j|| jj| jj| j|d
 | jr| jjr| j|| jj| jj| j|d
 d S d S d S )Nr   s3auto)max_pool_connections)endpoint_urlaws_access_key_idaws_secret_access_keyregion_namer
   i   r   T)multipart_thresholdmultipart_chunksizemax_concurrencyuse_threads)rT   bucketkeydesttransfer)boto3clientr
   baser2_endpoint_urlr2_access_key_idr2_secret_access_key
BotoConfig_s3_pool_sizereference_download_concurrencyr   maxr   r%   _try_download_optionalreference_bucket_download_requiredr    r   r!   r   r   r"   r   r#   r   r$   )r+   rc   rT   rb   r,   r,   r-   rQ      s|   
z3FinalExportReferenceStore._download_reference_filesr_   r`   ra   c                C  s   | j |||||dd d S )NFrT   r_   r`   ra   rb   force)_download_object)r+   rT   r_   r`   ra   rb   r,   r,   r-   ro      s   z,FinalExportReferenceStore._download_requiredc             
   C  sR   z| j |||||dd W d S  ty( } ztd|| W Y d }~d S d }~ww )NFrp   z2Optional final export reference unavailable %s: %s)rr   	Exceptionloggerinfo)r+   rT   r_   r`   ra   rb   excr,   r,   r-   rm      s   z0FinalExportReferenceStore._try_download_optionalrq   boolc                C  s   |j ||d}t|ddpd}|s*| r*| j|kr*td|jt	| |S td|||t	| t

 }	|d urAd|ini }
|j||t|fi |
 td|jt

 |	  |S )N)BucketKeyContentLengthr   z-Reusing cached final export reference %s (%s)z8Downloading final export reference s3://%s/%s -> %s (%s)r   zDownloaded %s in %.1fs)head_objectintrA   existsstatst_sizert   ru   name_format_bytestimedownload_filer9   )r+   rT   r_   r`   ra   rb   rq   head
size_bytest0extrar,   r,   r-   rr      s"   z*FinalExportReferenceStore._download_objectc                 C  sB  t t| j| _tt pdt| j	j
d}| jd|  | jd t| j}t| j}| jd| d | jd| d | jr\| j r\t| j}| jd| d	 n| jd
 | jrz| j rzt| j}| jd| d	 n| jd | jr| j rt| j}| jd| d d S | jd d S )N   r   zSET threads = z$SET preserve_insertion_order = falsea  
            CREATE OR REPLACE VIEW canonical_segments AS
            SELECT
                *,
                coalesce(
                    nullif(tx_detected_language, ''),
                    nullif(gemini_lang, ''),
                    nullif(corrected_language, ''),
                    nullif(queue_language, '')
                ) AS segment_language
            FROM read_parquet('z')
            zk
            CREATE OR REPLACE VIEW raw_transcripts AS
            SELECT *
            FROM read_parquet('a:  
                CREATE OR REPLACE VIEW variants_source AS
                SELECT
                    coalesce(video_id, regexp_extract(row_id, '^([^/]+)/', 1)) AS video_id,
                    coalesce(segment_id, regexp_extract(row_id, '^[^/]+/(.*)$', 1)) AS segment_file,
                    native_script_text,
                    romanized_text,
                    coalesce(variant_route, processing_route) AS variant_route,
                    coalesce(variant_validation_errors, validation_errors) AS variant_validation_errors
                FROM read_parquet('z')
                a  
                CREATE OR REPLACE VIEW variants_source AS
                SELECT
                    ''::VARCHAR AS video_id,
                    ''::VARCHAR AS segment_file,
                    ''::VARCHAR AS native_script_text,
                    ''::VARCHAR AS romanized_text,
                    ''::VARCHAR AS variant_route,
                    ''::VARCHAR AS variant_validation_errors
                WHERE FALSE
                az  
                CREATE OR REPLACE VIEW validation_source AS
                SELECT
                    video_id,
                    segment_file,
                    lid_consensus,
                    lid_agree_count,
                    consensus_lang,
                    conformer_multi_ctc_normalized,
                    mms_confidence
                FROM read_parquet('a  
                CREATE OR REPLACE VIEW validation_source AS
                SELECT
                    ''::VARCHAR AS video_id,
                    ''::VARCHAR AS segment_file,
                    false AS lid_consensus,
                    0::INTEGER AS lid_agree_count,
                    ''::VARCHAR AS consensus_lang,
                    NULL::DOUBLE AS conformer_multi_ctc_normalized,
                    NULL::DOUBLE AS mms_confidence
                WHERE FALSE
                a  
                CREATE OR REPLACE VIEW youtube_meta AS
                SELECT
                    video_id,
                    regexp_extract(lower(coalesce(default_audio_language, '')), '^([a-z]+)', 1) AS youtube_audio_language,
                    regexp_extract(lower(coalesce(default_language, '')), '^([a-z]+)', 1) AS youtube_default_language,
                    channel_id,
                    channel_title,
                    title,
                    description,
                    tags
                FROM read_csv_auto('z ', header=true)
                a  
                CREATE OR REPLACE VIEW youtube_meta AS
                SELECT
                    ''::VARCHAR AS video_id,
                    ''::VARCHAR AS youtube_audio_language,
                    ''::VARCHAR AS youtube_default_language,
                    ''::VARCHAR AS channel_id,
                    ''::VARCHAR AS channel_title,
                    ''::VARCHAR AS title,
                    ''::VARCHAR AS description,
                    ''::VARCHAR AS tags
                WHERE FALSE
                )duckdbconnectr9   r&   r'   minos	cpu_countrl   r
   duckdb_threadsrL   	_sql_pathr   r   r   r}   r   r   )r+   threadscanonical_pathraw_tx_pathr   r   youtube_pathr,   r,   r-   rR     s\   



	


z&FinalExportReferenceStore._open_duckdbN)r
   r   r   r   )r8   r9   r:   r;   )r_   r9   r`   r9   ra   r   )r_   r9   r`   r9   ra   r   rq   rw   )__name__
__module____qualname__r.   r5   r7   rP   r2   r6   rQ   ro   rm   rr   rR   r,   r,   r,   r-   r	      s    
'
Y
A

r	   r   r|   r:   r9   c                 C  sR   | dkrdS t | }dD ]}|dk s|dkr|d|   S |d }q|  dS )Nr   0B)BKBMBGBTBg      @r   z.1fr   )float)r   valueunitr,   r,   r-   r     s   

r   pathr   c                 C  s   |   ddS )N'z'')as_posixreplace)r   r,   r,   r-   r     s   r   r`   c                 C  s   |  dddS )N/__)rB   r   )r`   r,   r,   r-   r     s   r   download_concurrencyc                 C  s   t | d}t d|d S )Nr          )rl   )r   concurrencyr,   r,   r-   rj     s   
rj   )r   r|   r:   r9   )r   r   r:   r9   )r`   r9   r:   r9   )r   r|   r:   r|   )
__future__r   r(   jsonloggingr   r   pathlibr   typingr   r   boto3.s3.transferr   botocore.configr   ri   final_export_configr   	getLoggerr   rt   r	   r   r   r   rj   r,   r,   r,   r-   <module>   s(    
  
p

