o
    -wi                     @   s   d dl Zd dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	m
Z
mZ d dlZd dlZd dlmZmZmZ d dlmZ e	rZd dlmZ d dlmZ d dlmZmZ d dlmZ eeZG d	d
 d
Z dS )    N)TYPE_CHECKINGOptionalTuple)statsstep_checksumstep_upload)LogicalPath)ArtifactManifest)SaveFn)file_streaminternal_api)SettingsStaticc                	   @   s   e Zd ZdZdZ	d1ddddded	 d
dfddZd2ddZd
ee	e
jf fddZd3de	d
dfddZd
e
jfddZd3dedede	fddZdddedd d
dfd!d"Zdd#ded$e	d%ejd&d'fd(d)Zd1d*eej fd+d,Zd2d-d.Zd
e	fd/d0ZdS )4
FilePushera0  Parallel file upload class.

    This manages uploading multiple files in parallel. It will restart a given file's
    upload job if it receives a notification that that file has been modified. The
    finish() method will block until all events have been processed and all uploads are
    complete.
    @   Napizinternal_api.Apir   zfile_stream.FileStreamApisettingsr   returnc                 C   s   || _ td| _t | _t | _	t | _
t| j | j| j	| j
| j| _| j  tj| j | j| j
| j||d| _| j  t | _tjdr]tj| jddd| _| j  d S d S )Nwandb)r   r   WANDB_DEBUGTFPStatsThread)targetdaemonname)_apitempfileTemporaryDirectory_tempdirr   Stats_statsqueueQueue_incoming_queue_event_queuer   StepChecksum_step_checksumstartr   
StepUploadMAX_UPLOAD_JOBS_step_upload	threadingEvent_stats_thread_stoposenvirongetThread_file_pusher_stats_stats_thread)selfr   r   r    r3   [/home/ubuntu/sommelier/.venv/lib/python3.10/site-packages/wandb/sdk/internal/file_pusher.py__init__$   s>   





zFilePusher.__init__c                 C   s:   | j  std| jj  td | j  rd S d S )NzFilePusher stats:    )r+   is_setloggerinfor   timesleepr2   r3   r3   r4   r0   S   s   

zFilePusher._file_pusher_statsc                 C   s   |   }| j }||fS N)is_aliver   summary)r2   runningr?   r3   r3   r4   
get_statusX   s   
zFilePusher.get_statusTprefixc                 C   s   d}g d}d}	 |   sd}| j }d|jd dd|jd dd	|jd dd
}||d  | }|d7 }tj|d|d |rCnt	d q	|jdkrV|jt
|j nd}|dkrgtjd|d  |d tjd|d d S )Nr   )-\|/FT g      0Az.2fzMB of zMB uploaded (zMB deduped)   r6   )newlinerB   g      ?g{Gz?z5W&B sync reduced upload amount by %.1f%%             d   )rB   zO                                                                               )r>   r   r?   uploaded_bytestotal_bytesdeduped_bytesr   termlogr:   r;   float)r2   rB   stepspinner_statesstopr?   linededupe_fractionr3   r3   r4   print_status]   s6   
0

zFilePusher.print_statusc                 C   s
   | j  S r=   )r   file_counts_by_categoryr<   r3   r3   r4   rV   z   s   
z"FilePusher.file_counts_by_category	save_namepathcopyc                 C   sN   t j|rt j|sdS t j|dkrdS t|||}| j| dS )a  Tell the file pusher that a file's changed and should be uploaded.

        Args:
            save_name: string logical location of the file relative to the run
                directory.
            path: actual string path of the file to upload on the filesystem.
        Nr   )	r,   rX   existsisfilegetsizer   RequestUploadr!   put)r2   rW   rX   rY   eventr3   r3   r4   file_changed}   s   	zFilePusher.file_changedmanifestr	   artifact_idsave_fnr
   c                 C   s   t |||}| j| d S r=   )r   RequestStoreManifestFilesr!   r^   )r2   ra   rb   rc   r_   r3   r3   r4   store_manifest_files   s   zFilePusher.store_manifest_files)finalizerf   before_commitresult_futurezconcurrent.futures.Future[None]c                C   s    t ||||}| j| d S r=   )r   RequestCommitArtifactr!   r^   )r2   rb   rf   rg   rh   r_   r3   r3   r4   commit_artifact   s   zFilePusher.commit_artifactcallbackc                 C   s*   t d | jt| | j  d S )Nzshutting down file pusher)r8   r9   r!   r^   r   RequestFinishr+   set)r2   rk   r3   r3   r4   finish   s   
zFilePusher.finishc                 C   s2   t d |  rtd |  s	| j  d S )Nzwaiting for file pusherg      ?)r8   r9   r>   r:   r;   r   cleanupr<   r3   r3   r4   join   s
   

zFilePusher.joinc                 C   s   | j  p	| j S r=   )r$   r>   r(   r<   r3   r3   r4   r>      s   zFilePusher.is_aliver=   )r   N)T)__name__
__module____qualname____doc__r'   r   r5   r0   r   boolr   SummaryrA   rU   FileCountsByCategoryrV   r   strr`   re   r   PreCommitFnrj   OnRequestFinishFnrn   rp   r>   r3   r3   r3   r4   r      sP    

/


r   )!concurrent.futures
concurrentloggingr,   r   r   r)   r:   typingr   r   r   r   
wandb.utilwandb.filesyncr   r   r   wandb.sdk.lib.pathsr   %wandb.sdk.artifacts.artifact_manifestr	   "wandb.sdk.artifacts.artifact_saverr
   wandb.sdk.internalr   r   "wandb.sdk.internal.settings_staticr   	getLoggerrq   r8   r   r3   r3   r3   r4   <module>   s&    
