o
    /i(%                     @  s  U d Z ddlmZ ddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlmZmZ ddlmZmZ ddlmZ ddlZddlZddlZddlmZ ddlmZ ddlmZ ed	Zeed
  ejej dd e!e"Z#dZ$dZ%ddgdgdgdgdgdgdgdgdgd	Z&de'd< ddddddd d!d"d#d$d%Z(d&e'd'< d(Z)d)Z*d*Z+d+Z,dId/d0Z-dJd4d5Z.dId6d7Z/dKd:d;Z0eG d<d= d=Z1eG d>d? d?Z2d@dA Z3dLdDdEZ4dFdG Z5e"dHkre5  dS dS )MuN  
Patch all Stage B final-shard metadata.parquet files in R2.

Three fixes applied to every row in every shard:
  1. Normalize Unicode decimal digits → ASCII (all text fields)
  2. Null transcription_native where script doesn't match segment_language
  3. Clean transcription_romanized: IAST→ASCII, strip leaked Indic/Arabic chars
    )annotationsN)ThreadPoolExecutoras_completed)	dataclassfield)Path)Config)load_dotenvz/home/ubuntu/transcriptsz.envz%%(asctime)s %(levelname)s %(message)s)levelformatfinalsftdatazproduction-20260312) 	  i	  )i  i  )i	  i	  )i 
  i
  )i
  i
  )i   i  )i  i  )i   i  )i  i  )i     )	
DevanagariBengaliGurmukhiGujaratiOriyaTamilTeluguKannada	Malayalamz dict[str, list[tuple[int, int]]]SCRIPT_RANGESr   r   r   r   r   r   r   r   r   )himrbnaspaguortateknmlzdict[str, str]LANG_TO_TARGETr   r   i   i  text
str | Nonereturnc                 C  sN   | s| S g }| D ]}t |d}|dkr|t| q|| qd|S )Nr    )unicodedatadecimalappendstrjoin)r%   outchd r2   scripts/patch_shard_metadata.pynormalize_digits>   s   
r4   r-   target_scriptboolc                 C  sV   t |g }|s
dS | D ]}t|}|D ]\}}||  kr%|kr'  dS  qqqdS )NTF)r   getord)r%   r5   rangesr0   cplor   r2   r2   r3   has_target_script_charsK   s   r<   c                 C  s   | s| S t d| }g }|D ]0}t |}|drqt|}t|  kr)tkr,n nqt|  kr6tkr9n nq|	| qd
|S )NNFKDMr)   )r*   	normalizecategory
startswithr8   INDIC_BLOCK_MININDIC_BLOCK_MAXARABIC_BLOCK_MINARABIC_BLOCK_MAXr,   r.   )r%   nfkdr/   r0   catr:   r2   r2   r3   clean_romanizedW   s   


rH   databytesc                 C  s   t |  S )N)hashlibsha256	hexdigest)rI   r2   r2   r3   sha256_bytesi   s   rN   c                   @  s.   e Zd ZU ded< ded< ded< ded< dS )	ShardInfor-   shard_idlanguagemetadata_keymanifest_keyN)__name__
__module____qualname____annotations__r2   r2   r2   r3   rO   m   s
   
 rO   c                   @  sh   e Zd ZU dZded< dZded< dZded< dZded< dZded< dZ	ded< e
ed	Zd
ed< dS )
PatchStatsr   intshards_processedshards_failed
rows_totaldigits_patchednative_nulledroman_cleaned)default_factoryz	list[str]errorsN)rT   rU   rV   rZ   rW   r[   r\   r]   r^   r_   r   listra   r2   r2   r2   r3   rX   u   s   
 rX   c                
   C  s4   t jdtjd tjd tjd dtdddid	d
S )Ns3R2_ENDPOINT_URLR2_ACCESS_KEY_IDR2_SECRET_ACCESS_KEYauto@   max_attempts   )max_pool_connectionsretries)endpoint_urlaws_access_key_idaws_secret_access_keyregion_nameconfig)boto3clientosenvironr   r2   r2   r2   r3   make_s3   s   rv   sharddictc              
   C  sl  |j ddddd d d d}z
| jt|jdd  }tt|}|	 }t
||d< t|j}d}|D ]p}|d}	|d}
|d	}t|	}t|
}t|}||	ks_||
ks_||kri|d
  d7  < d}||d< ||d< ||d	< |r|rt||sd|d< |d  d7  < d}t|d	 }||d	 kr||d	< |d  d7  < d}q6|s|W S tjj||jd}t }tj||dd | }t||d< t
||d< | jt|j|d | jt|jdd  }t|d}|d |d< |d |d< tdt |d< | jt|jtj |dd!dd W |S  t"y5 } zt#|d d |d< W Y d }~|S d }~ww )Nr   )rP   rowsr]   r^   r_   errornew_metadata_sha256new_metadata_size)BucketKeyBodyry   Ftranscription_mixedtranscription_nativetranscription_romanizedr]      Tr)   r^   r_   )schemazstd)compressionr{   r|   )r}   r~   r   zutf-8metadata_sha256metadata_size_bytesz%Y-%m-%dT%H:%M:%SZ
patched_at   )indenti,  rz   )$rP   
get_objectBUCKETrR   readpq
read_tableioBytesIO	to_pylistlenr$   r7   rQ   r4   r<   rH   r   Tablefrom_pylistr   write_tablegetvaluerN   
put_objectrS   jsonloadsdecodetimestrftimegmtimedumpsencode	Exceptionr-   )rc   rw   result	meta_bodytablery   r5   changedrow
mixed_orignative_orig
roman_orig	mixed_new
native_new	roman_newr_   	new_tablebufnew_meta_bytesmanifest_bodymanifestexcr2   r2   r3   patch_one_shard   st   


r   c                    s  t tjd } |  }|dtf dd | D }|   t	
dt| ttjdkr6ttjd nd}t }t t }t|d  fd	d
|D }tt|dD ]\}}|| }	| }
| j|
d 7  _| j|
d 7  _| j|
d 7  _| j|
d 7  _|
d r| jd7  _|j|	j d|
d   t	d|t||	j|
d d d  qY| jd7  _|d dks|t|krt | }|dkr|| nd}|dkrt|| | nd}t	
d|t||||j|j|j qYW d    n	1 sw   Y  t | }t	
d t	
d|j|j|j| t	
d|j|j|j |jd d D ]	}t	 d| q1|jdkrHt	d|j t tjd } d| _!|  }|dtf t	
d|j" |   d S )NDATABASE_URLz
        SELECT shard_id, language, metadata_key, manifest_key
        FROM final_export_shards
        WHERE run_id = %s
        ORDER BY language, shard_id
    c                 S  s,   g | ]}t |d  |d |d |d dqS )r   r   r   rj   )rP   rQ   rR   rS   )rO   ).0rr2   r2   r3   
<listcomp>   s    zmain.<locals>.<listcomp>zLoaded %d shards to patchr   0   )max_workersc                   s   i | ]
}  t||qS r2   )submitr   )r   rw   poolrc   r2   r3   
<dictcomp>   s    zmain.<locals>.<dictcomp>ry   r]   r^   r_   rz   z: z[%d/%d] FAIL %s: %sx   d   r   zG[%d/%d] ok  rate=%.1f/s  ETA=%.0fs  digits=%d native_nulled=%d roman=%dz=== PATCH COMPLETE ===z:shards_processed=%d shards_failed=%d rows=%d elapsed=%.1fsz3digits_patched=%d native_nulled=%d roman_cleaned=%d   z  %sz Re-run to retry %d failed shardsTz
        UPDATE final_export_shards
        SET metadata_json = jsonb_set(
            coalesce(metadata_json, '{}'::jsonb),
            '{metadata_patched_at}',
            to_jsonb(now()::text)
        )
        WHERE run_id = %s
    z*Updated %d shard rows with patch timestamp)#psycopg2connectrt   ru   cursorexecuteRUN_IDfetchallcloseloggerinfor   sysargvrY   rX   rv   r   r   	enumerater   r   r\   r]   r^   r_   r[   ra   r,   rP   warningrZ   rz   
autocommitrowcount)conncurshardsr   statst0futuresifuturerw   r   elapsedrateetaerrr2   r   r3   main   sx    &
	r   __main__)r%   r&   r'   r&   )r%   r-   r5   r-   r'   r6   )rI   rJ   r'   r-   )rw   rO   r'   rx   )6__doc__
__future__r   rK   r   r   loggingrt   r   r   r*   concurrent.futuresr   r   dataclassesr   r   pathlibr   rr   r   pyarrowr   pyarrow.parquetparquetr   botocore.configr   dotenvr	   ROOTbasicConfigINFO	getLoggerrT   r   r   r   r   rW   r$   rB   rC   rD   rE   r4   r<   rH   rN   rO   rX   rv   r   r   r2   r2   r2   r3   <module>   sr    






DJ
