o
    xiK                     @  s   d Z ddlmZ ddlZddlZddlZddlmZmZ ddl	m
Z
mZmZmZ e
r4ddlmZmZmZ G dd deZG d	d
 d
eZG dd deZeeef Zd%ddZejfd&ddZd'd!d"ZG d#d$ d$ZdS )(z*Batching file prepare requests to our API.    )annotationsN)MappingSequence)TYPE_CHECKINGCallable
NamedTupleUnion)ApiCreateArtifactFileSpecInputCreateArtifactFilesResponseFilec                   @  s   e Zd ZU ded< ded< dS )RequestPreparer
   	file_specqueue.Queue[ResponsePrepare]response_channelN__name__
__module____qualname____annotations__ r   r   O/home/ubuntu/.local/lib/python3.10/site-packages/wandb/filesync/step_prepare.pyr      s   
 r   c                   @  s   e Zd ZdS )RequestFinishN)r   r   r   r   r   r   r   r      s    r   c                   @  s>   e Zd ZU ded< ded< ded< ded< ded< d	ed
< dS )ResponsePreparestrbirth_artifact_idz
str | None
upload_urlzSequence[str]upload_headers	upload_idstorage_pathzdict[int, str] | Nonemultipart_upload_urlsNr   r   r   r   r   r      s   
 r   xfloatlowhighreturnc                 C  s   t |t| |S N)maxminr    r"   r#   r   r   r   _clamp)   s   r)   request_queuequeue.Queue[Request]
batch_timeinter_event_timemax_batch_sizeintclockCallable[[], float]%tuple[bool, Sequence[RequestPrepare]]c           
      C  s   | }|}|   }t|trdg fS |g}|dkrZt||k rZz#| j t|d|dd}	t|	tr5d|fW S ||	 || |  }W n tjyO   Y d|fS w |dkrZt||k sd|fS )NTr   g-q=r(   )timeoutF)get
isinstancer   lenr)   appendqueueEmpty)
r*   r,   r-   r.   r0   batch_start_timeremaining_timefirst_requestbatchrequestr   r   r   gather_batch-   s2   



r?   responser   c                 C  s`   |  d}|r|d ng }dd |D pd }t| d d | d | d |o(| d	|  d
|dS )NuploadMultipartUrlsuploadUrlPartsc                 S  s   i | ]	}|d  |d qS )
partNumber	uploadUrlr   ).0ur   r   r   
<dictcomp>U   s    z$prepare_response.<locals>.<dictcomp>artifactidrD   uploadHeadersuploadIDstoragePath)r   r   r   r   r   r   )r4   r   )r@   multipart_resp	part_listmultipart_partsr   r   r   prepare_responseR   s   

rP   c                   @  sd   e Zd ZdZ	d%d&ddZd'ddZd(ddZd)ddZd'ddZd'ddZ	d*d!d"Z
d'd#d$ZdS )+StepPreparezA thread that batches requests to our file prepare API.

    Any number of threads may call prepare() in parallel. The PrepareBatcher thread
    will batch requests up and send them all to the backend at once.
    Napir	   r,   r!   r-   r.   r/   r*   queue.Queue[Request] | Noner$   Nonec                 C  sB   || _ || _|| _|| _|pt | _tj| j	d| _
d| j
_d S )N)targetT)_api_inter_event_time_batch_time_max_batch_sizer8   Queue_request_queue	threadingThread_thread_body_threaddaemon)selfrR   r,   r-   r.   r*   r   r   r   __init__h   s   zStepPrepare.__init__c                 C  sf   	 t | j| j| j| jd\}}|r.| |}|D ]}|jd }|| }t|}|j	| q|r2d S q)NT)r*   r,   r-   r.   name)
r?   r[   rX   rW   rY   _prepare_batchr   rP   r   put)ra   finishr=   batch_responseprepare_requestrc   response_filer@   r   r   r   r^   x   s"   


zStepPrepare._thread_bodyr=   Sequence[RequestPrepare]-Mapping[str, CreateArtifactFilesResponseFile]c                 C  s   | j dd |D S )ax  Execute the prepareFiles API call.

        Args:
            batch: List of RequestPrepare objects
        Returns:
            dict of (save_name: ResponseFile) pairs where ResponseFile is a dict with
                an uploadUrl key. The value of the uploadUrl key is None if the file
                already exists, or a url string if the file should be uploaded.
        c                 S  s   g | ]}|j qS r   )r   )rE   reqr   r   r   
<listcomp>   s    z.StepPrepare._prepare_batch.<locals>.<listcomp>)rV   create_artifact_files)ra   r=   r   r   r   rd      s   zStepPrepare._prepare_batchr   r
   r   c                 C  s   t  }| jt|| |S r%   )r8   rZ   r[   re   r   )ra   r   response_queuer   r   r   prepare   s   zStepPrepare.preparec                 C  s   | j   d S r%   )r_   startra   r   r   r   rq      s   zStepPrepare.startc                 C  s   | j t  d S r%   )r[   re   r   rr   r   r   r   rf      s   zStepPrepare.finishboolc                 C  s
   | j  S r%   )r_   is_aliverr   r   r   r   rt      s   
zStepPrepare.is_alivec                 C  s   |    | j  d S r%   )rf   r_   joinrr   r   r   r   shutdown   s   zStepPrepare.shutdownr%   )rR   r	   r,   r!   r-   r!   r.   r/   r*   rS   r$   rT   )r$   rT   )r=   rj   r$   rk   )r   r
   r$   r   )r$   rs   )r   r   r   __doc__rb   r^   rd   rp   rq   rf   rt   rv   r   r   r   r   rQ   a   s    





rQ   )r    r!   r"   r!   r#   r!   r$   r!   )r*   r+   r,   r!   r-   r!   r.   r/   r0   r1   r$   r2   )r@   r   r$   r   )rw   
__future__r   r8   r\   timecollections.abcr   r   typingr   r   r   r   wandb.sdk.internal.internal_apir	   r
   r   r   r   r   Requestr)   	monotonicr?   rP   rQ   r   r   r   r   <module>   s$    	
	
%