o
    xi                     @  s   d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
mZ d dlZd dlZd dlmZmZmZ d dlmZ er\d dlmZ d dlmZ d dlmZmZ d d	lmZ eeZG d
d dZ dS )    )annotationsN)TYPE_CHECKING)statsstep_checksumstep_upload)LogicalPath)ArtifactManifest)SaveFn)file_streaminternal_api)SettingsStaticc                   @  s   e Zd ZdZdZ	d9d:ddZd;ddZd<ddZd=d>ddZd?ddZ	d=d@d d!Z
dAd'd(Zdd)dBd/d0Zd9dCd3d4Zd;d5d6ZdDd7d8ZdS )E
FilePushera0  Parallel file upload class.

    This manages uploading multiple files in parallel. It will restart a given file's
    upload job if it receives a notification that that file has been modified. The
    finish() method will block until all events have been processed and all uploads are
    complete.
    @   Napiinternal_api.Apir
   file_stream.FileStreamApisettingsSettingsStatic | NonereturnNonec                 C  s   || _ td| _t | _t | _	t | _
t| j | j| j	| j
| j| _| j  tj| j | j| j
| j||d| _| j  t | _tjdr]tj| jddd| _| j  d S d S )Nwandb)r
   r   WANDB_DEBUGTFPStatsThread)targetdaemonname)_apitempfileTemporaryDirectory_tempdirr   Stats_statsqueueQueue_incoming_queue_event_queuer   StepChecksum_step_checksumstartr   
StepUploadMAX_UPLOAD_JOBS_step_upload	threadingEvent_stats_thread_stoposenvirongetThread_file_pusher_stats_stats_thread)selfr   r
   r    r6   R/home/ubuntu/.local/lib/python3.10/site-packages/wandb/sdk/internal/file_pusher.py__init__&   s>   





zFilePusher.__init__c                 C  s:   | j  std| jj  td | j  rd S d S )NzFilePusher stats:    )r.   is_setloggerinfor!   timesleepr5   r6   r6   r7   r3   U   s   

zFilePusher._file_pusher_statstuple[bool, stats.Summary]c                 C  s   |   }| j }||fS N)is_aliver!   summary)r5   runningrC   r6   r6   r7   
get_statusZ   s   
zFilePusher.get_statusTprefixboolc                 C  s   d}g d}d}	 |   sd}| j }d|jd dd|jd dd	|jd dd
}||d  | }|d7 }tj|d|d |rCnt	d q	|jdkrV|jt
|j nd}|dkrgtjd|d  |d tjd|d d S )Nr   )-\|/FT g      0Az.2fzMB of zMB uploaded (zMB deduped)   r9   )newlinerF   g      ?g{Gz?z5W&B sync reduced upload amount by %.1f%%             d   )rF   zO                                                                               )rB   r!   rC   uploaded_bytestotal_bytesdeduped_bytesr   termlogr=   r>   float)r5   rF   stepspinner_statesstoprC   linededupe_fractionr6   r6   r7   print_status_   s6   
0

zFilePusher.print_statusstats.FileCountsByCategoryc                 C  s
   | j  S rA   )r!   file_counts_by_categoryr?   r6   r6   r7   r\   |   s   
z"FilePusher.file_counts_by_category	save_namer   pathstrcopyc                 C  sN   t j|rt j|sdS t j|dkrdS t|||}| j| dS )a  Tell the file pusher that a file's changed and should be uploaded.

        Args:
            save_name: string logical location of the file relative to the run
                directory.
            path: actual string path of the file to upload on the filesystem.
        Nr   )	r/   r^   existsisfilegetsizer   RequestUploadr$   put)r5   r]   r^   r`   eventr6   r6   r7   file_changed   s   	zFilePusher.file_changedmanifestr   artifact_idsave_fnr	   c                 C  s   t |||}| j| d S rA   )r   RequestStoreManifestFilesr$   re   )r5   rh   ri   rj   rf   r6   r6   r7   store_manifest_files   s   zFilePusher.store_manifest_files)finalizerm   before_commitstep_upload.PreCommitFnresult_futureconcurrent.futures.Future[None]c                C  s    t ||||}| j| d S rA   )r   RequestCommitArtifactr$   re   )r5   ri   rm   rn   rp   rf   r6   r6   r7   commit_artifact   s   zFilePusher.commit_artifactcallback$step_upload.OnRequestFinishFn | Nonec                 C  s*   t d | jt| | j  d S )Nzshutting down file pusher)r;   r<   r$   re   r   RequestFinishr.   set)r5   rt   r6   r6   r7   finish   s   
zFilePusher.finishc                 C  s2   t d |  rtd |  s	| j  d S )Nzwaiting for file pusherg      ?)r;   r<   rB   r=   r>   r   cleanupr?   r6   r6   r7   join   s
   

zFilePusher.joinc                 C  s   | j  p	| j S rA   )r'   rB   r+   r?   r6   r6   r7   rB      s   zFilePusher.is_aliverA   )r   r   r
   r   r   r   r   r   )r   r   )r   r@   )T)rF   rG   r   r   )r   r[   )r]   r   r^   r_   r`   rG   )rh   r   ri   r_   rj   r	   r   r   )ri   r_   rm   rG   rn   ro   rp   rq   )rt   ru   )r   rG   )__name__
__module____qualname____doc__r*   r8   r3   rE   rZ   r\   rg   rl   rs   rx   rz   rB   r6   r6   r6   r7   r      s     
/



r   )!
__future__r   concurrent.futures
concurrentloggingr/   r"   r   r,   r=   typingr   r   
wandb.utilwandb.filesyncr   r   r   wandb.sdk.lib.pathsr   %wandb.sdk.artifacts.artifact_manifestr   "wandb.sdk.artifacts.artifact_saverr	   wandb.sdk.internalr
   r   "wandb.sdk.internal.settings_staticr   	getLoggerr{   r;   r   r6   r6   r6   r7   <module>   s(    
