o
    xi'                     @  sf  d Z ddlmZ ddlZddlZddlZddlZddlZddl	m
Z
mZmZ ddlmZmZmZmZ ddlmZ ddlmZ ddlmZ erjdd	lmZ dd
lmZ ddlmZmZmZ ddlmZ G dd deZ eg df Z!eg df Z"edge#f Z$e%e&Z'G dd deZ(G dd deZ)G dd deZ*G dd deZ+ee(e)e*e+f Z,G dd dZ-dS )z*Batching file prepare requests to our API.    )annotationsN)MutableMappingMutableSequence
MutableSet)TYPE_CHECKINGCallable
NamedTupleUnion)	termerror)
upload_job)LogicalPath)	TypedDict)stats)file_streaminternal_apiprogress)SettingsStaticc                   @  s6   e Zd ZU ded< ded< ded< ded< ded	< d
S )ArtifactStatusboolfinalizeintpending_countcommit_requestedzMutableSet[PreCommitFn]pre_commit_callbacksz+MutableSet[concurrent.futures.Future[None]]result_futuresN__name__
__module____qualname____annotations__ r    r    N/home/ubuntu/.local/lib/python3.10/site-packages/wandb/filesync/step_upload.pyr      s   
 r   zprogress.ProgressFnc                   @  sF   e Zd ZU ded< ded< ded< ded< ded	< d
ed< ded< dS )RequestUploadstrpathr   	save_namez
str | Noneartifact_idmd5r   copiedzSaveFn | Nonesave_fndigestNr   r    r    r    r!   r"   '   s   
 r"   c                   @  s.   e Zd ZU ded< ded< ded< ded< d	S )
RequestCommitArtifactr#   r&   r   r   PreCommitFnbefore_commitzconcurrent.futures.Future[None]result_futureNr   r    r    r    r!   r+   1   s
   
 r+   c                   @  s   e Zd ZU ded< dS )RequestFinishzOnRequestFinishFn | NonecallbackNr   r    r    r    r!   r/   8   s   
 r/   c                   @  s   e Zd ZU ded< ded< dS )EventJobDoner"   jobzBaseException | NoneexcNr   r    r    r    r!   r1   <   s   
 r1   c                   @  s   e Zd Z	d0d1ddZd2ddZd3ddZd4ddZd4ddZd4ddZd5d!d"Z	d5d#d$Z
d6d'd(Zd5d)d*Zd2d+d,Zd7d.d/ZdS )8
StepUploadNapiinternal_api.Apir   stats.Statsevent_queuequeue.Queue[Event]max_threadsr   r   file_stream.FileStreamApisettingsSettingsStatic | NonereturnNonec                 C  sr   || _ || _|| _|| _tj| jd| _d| j_t	j
jd|d| _i | _g | _i | _|r4t|j| _d S d| _d S )N)targetTzwandb-upload)thread_name_prefixmax_workersF)_api_stats_event_queue_file_stream	threadingThread_thread_body_threaddaemon
concurrentfuturesThreadPoolExecutor_pool_running_jobs_pending_jobs
_artifactsr   silent)selfr5   r   r8   r:   r   r<   r    r    r!   __init__E   s   	zStepUpload.__init__c                 C  s   d }	 | j  }t|tr|j}n| | q	 z	| j dd}W n tjy-   d }Y nw |r6| | n| jsG| j	j
dd |rE|  d S q)NTg?F)wait)rE   get
isinstancer/   r0   _handle_eventqueueEmptyrP   rO   shutdown)rT   finish_callbackeventr    r    r!   rI   c   s,   


zStepUpload._thread_bodyr^   Eventc                 C  s  t |trZ|j}|jd urtjd|j|jd |jrA|jd u r2| j|j d  d8  < | 	|j n| j
s9td | |j|j | j|j | jrX| jd}| | d S d S t |tr|j| jvrk| |j d| j|j d< |j| j|j d	< | j|j d
 |j | j|j d |j | 	|j d S t |tr|jd ur|j| jvr| |j | j|j d  d7  < | | d S td|)NzFailed to upload file: %s)exc_infor      z<Uploading artifact file failed. Artifact won't be committed.r   Tr   r   r   r   zEvent has unexpected type: )rX   r1   r2   r3   logger	exceptionr$   r&   rR   _maybe_commit_artifactrS   r
   _fail_artifact_futuresrP   popr%   rQ   _start_upload_jobr+   _init_artifactr   addr-   r.   r"   	TypeErrorrT   r^   r2   r    r    r!   rY      sN   






zStepUpload._handle_eventr"   c                 C  s*   |j | jv r| j| d S | | d S N)r%   rP   rQ   append_spawn_upload)rT   r^   r    r    r!   rg      s   zStepUpload._start_upload_jobc                   s,    j  j< d fdd}j| dS )a}  Spawn an upload job, and handles the bookkeeping of `self._running_jobs`.

        Context: it's important that, whenever we add an entry to `self._running_jobs`,
        we ensure that a corresponding `EventJobDone` message will eventually get handled;
        otherwise, the `_running_jobs` entry will never get removed, and the StepUpload
        will never shut down.

        The sole purpose of this function is to make sure that the code that adds an entry
        to `self._running_jobs` is textually right next to the code that eventually enqueues
        the `EventJobDone` message. This should help keep them in sync.
        r>   r?   c                     sL   z   W jt t d d d S jt t d d w )Nra   )r3   )
_do_uploadrE   putr1   sysr`   r    r^   rT   r    r!   run_and_notify   s   >z0StepUpload._spawn_upload.<locals>.run_and_notifyNr>   r?   )rP   r%   rO   submit)rT   r^   rs   r    rr   r!   rn      s   zStepUpload._spawn_uploadc                 C  s@   t | j| j| j| j|j|j|j|j	|j
|j|j}|  d S rl   )r   	UploadJobrD   rC   rF   rS   r%   r$   r&   r'   r(   r)   r*   runrk   r    r    r!   ro      s   zStepUpload._do_uploadr&   r#   c                 C  s   dddt  t  d| j|< d S )NFr   )r   r   r   r   r   )setrR   )rT   r&   r    r    r!   rh      s   zStepUpload._init_artifactc              
   C  s   | j | }|d dkrT|d rVz|d D ]}|  q|d r$| j| W n' tyL } ztd| d tt| | || W Y d }~d S d }~ww | | d S d S d S )Nr   r   r   r   r   z%Committing artifact failed. Artifact z won't be finalized.)rR   rC   commit_artifact	Exceptionr
   r#   re   _resolve_artifact_futures)rT   r&   artifact_statuspre_callbackr3   r    r    r!   rd      s(   

z!StepUpload._maybe_commit_artifactr3   BaseExceptionc                 C  s.   | j | d }|D ]}|| q	|  d S Nr   )rR   set_exceptionclear)rT   r&   r3   rM   r.   r    r    r!   re   	     z!StepUpload._fail_artifact_futuresc                 C  s.   | j | d }|D ]}|d  q	|  d S r   )rR   
set_resultr   )rT   r&   rM   r.   r    r    r!   r{     r   z$StepUpload._resolve_artifact_futuresc                 C  s   | j   d S rl   )rJ   startrT   r    r    r!   r     s   zStepUpload.startr   c                 C  s
   | j  S rl   )rJ   is_aliver   r    r    r!   r     s   
zStepUpload.is_aliverl   )r5   r6   r   r7   r8   r9   r:   r   r   r;   r<   r=   r>   r?   rt   )r^   r_   r>   r?   )r^   r"   r>   r?   )r&   r#   r>   r?   )r&   r#   r3   r~   r>   r?   )r>   r   )r   r   r   rU   rI   rY   rg   rn   ro   rh   rd   re   r{   r   r   r    r    r    r!   r4   D   s    


-


#

	


r4   ).__doc__
__future__r   concurrent.futuresrL   loggingrZ   rq   rG   collections.abcr   r   r   typingr   r   r   r	   wandb.errors.termr
   wandb.filesyncr   wandb.sdk.lib.pathsr   r   r   wandb.sdk.internalr   r   r   "wandb.sdk.internal.settings_staticr   r   r,   OnRequestFinishFnr   SaveFn	getLoggerr   rb   r"   r+   r/   r1   r_   r4   r    r    r    r!   <module>   s8    

