o
    1iM                     @  s   d Z ddlmZ ddlZddlZddlZddlZddlmZm	Z	 ddl
mZ ddlZddlmZ ejejddd eeZd	Zd
d ZdddZdddZdd Zedkr[e  dS dS ) a  
Download parquet shards from an R2 bucket to local disk.

By default the script drops `vox_speaker_embedding` (binary blobs) to save
30-40% space. This keeps the historical validation archive and the recover
validation archive compact enough for local DuckDB analytics.

Usage:
  python scripts/pull_shards.py
  python scripts/pull_shards.py --bucket validationsrecoverfinal       --output data/recover_validation_shards
  python scripts/pull_shards.py --keep-all-columns
    )annotationsN)ThreadPoolExecutoras_completed)Pathz%(asctime)s %(message)sz%H:%M:%S)levelformatdatefmt)vox_speaker_embeddingc                  C  sJ   ddl m}  | tt jjd  tjdt	dt	dt	ddd	S )
Nr   load_dotenvz.envs3R2_ENDPOINT_URLR2_ACCESS_KEY_IDR2_SECRET_ACCESS_KEYauto)endpoint_urlaws_access_key_idaws_secret_access_keyregion_name)
dotenvr   r   __file__resolveparentboto3clientosgetenvr
    r   scripts/pull_shards.pyget_s3    s   r   bucketstrprefixreturn
list[dict]c                 C  s^   g }|  d}|j||dD ]}|dg D ]}|d dr+||d |d d qq|S )z(List all .parquet objects under shards/.list_objects_v2)BucketPrefixContentsKey.parquetSize)keysize)get_paginatorpaginategetendswithappend)r   r    r"   shards	paginatorpageobjr   r   r   list_shards,   s   
r7   r,   
output_dirr   drop_columnsset[str]tuple[str, int, int]c                  s  | |dd}|| }|jjddd | r|d| jfS ddl}|jdt|d}	zzP| 	|||	 t
|	 j}
 r[tj|	 fd	d
t|	jD d}tj|t|dd nt
|	 | | j}||
|fW W zt|	 W S  ty}   Y S w  ty } z'td| d|  |ddfW  Y d}~W zt|	 W S  ty   Y S w d}~ww zt|	 W w  ty   Y w w )zbDownload a shard, drop heavy columns, write pruned parquet. Returns (key, orig_size, pruned_size).    Tparentsexist_okr   Nr*   )suffixdirc                   s   g | ]}| vr|qS r   r   .0cr9   r   r   
<listcomp>O   s    z&download_and_prune.<locals>.<listcomp>)columnszstd)compressionzFailed z: )replacer   mkdirexistsstatst_sizetempfilemktempr!   download_filer   pq
read_tableread_schemanameswrite_tabler   unlinkOSError	Exceptionloggerwarning)r   r    r"   r,   r8   r9   rel_path
local_pathrP   tmp	orig_sizetablepruned_sizeer   rF   r   download_and_prune7   sL   



rd   c                    s$  t jdd} | jdtdd | jdtdd | jdtd	d | jd
tdd | jddttdd | jdddd |  tj	j
ddd jdrRjnj djr^t ndd jD t }tdj d d t|j}tdd |D }tdt| d|d d d! rtd"t  ntd# fd$d% t }d&}d&}d&}d&}tjd' fd(d)|D }	t|	D ]x}
|
 \}}}|d*7 }|d&kr||7 }||7 }n |d&kr|d&kr|	|
 }|d+ d,d*  s|d*7 }|d- d&ks|t|krKt | }|d&kr*|| nd&}td.| dt| d/|d0d1|d d d2| d3|d4d5 qW d    n	1 sWw   Y  t | }td6| d7|d4d8|d d d9|d d d:|d d d;d*|t|d*  d< d4d=|  d S )>NzDownload parquet shards from R2)descriptionz--bucketzvalidation-results)typedefaultz--prefixzshards/z	--workers    z--outputzdata/validation_shardsz--drop-columnr2   z3Column to drop from downloaded parquet (repeatable))actionrg   helpz--keep-all-columns
store_truez4Do not prune any columns from the downloaded parquet)ri   rj   Tr>   /c                 S  s   h | ]}|r|qS r   r   rC   r   r   r   	<setcomp>x   s    zmain.<locals>.<setcomp>zListing shards in s3://z ...c                 s  s    | ]}|d  V  qdS )r-   Nr   rD   sr   r   r   	<genexpr>}   s    zmain.<locals>.<genexpr>zFound z	 shards (g    eAz.2fz
 GB in R2)zPruning columns: zKeeping all parquet columnsc                   s    t  }t| j| d dS )Nr,   )r    r"   r,   r8   r9   )r   rd   r    )shard	thread_s3)argsr9   r8   r"   r   r   _worker   s   zmain.<locals>._workerr   )max_workersc                   s   i | ]	}  ||qS r   )submitrn   )rt   poolr   r   
<dictcomp>   s    zmain.<locals>.<dictcomp>r=   r,   r<      [z] z.1fz shards/s | pruned z GB | failed z | z.0fro   z
Done: z shards in zs
  R2 size:    z GB
  Downloaded: z# GB (before pruning)
  On disk:    z GB
  Savings:    d   z%
  Failed:     ) argparseArgumentParseradd_argumentr!   intlistDEFAULT_DROP_COLUMNS
parse_argsr   outputrL   r"   r1   keep_all_columnssetdrop_columnr   r[   infor    r7   sumlensortedtimer   workersr   resultrK   rM   max)parserr   r3   total_r2_bytest0done
orig_totalpruned_totalfailedfuturesfutr,   origprunedrq   elapsedrater   )rt   rs   r9   r8   rw   r"   r   mainb   s   
"

r   __main__)r    r!   r"   r!   r#   r$   )r    r!   r"   r!   r,   r!   r8   r   r9   r:   r#   r;   )__doc__
__future__r   r|   loggingr   r   concurrent.futuresr   r   pathlibr   r   pyarrow.parquetparquetrS   basicConfigINFO	getLogger__name__r[   r   r   r7   rd   r   r   r   r   r   <module>   s(    


+V
