o
    xi
c                     @  sv  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlZd dlmZ d dlmZmZmZmZ erXd dlmZ G dd deZG dd	 d	eZd dlZd dlZd d
lmZ d dlmZ d dlmZ ddlmZ ee Z!G dd deZ"G dd dZ#G dd de#Z$G dd de#Z%G dd dZ&G dd de#Z'G dd dZ(dZ)d'd%d&Z*dS )(    )annotationsN)TracebackType)TYPE_CHECKINGAnyCallable
NamedTuple)	TypedDictc                   @  s   e Zd ZU ded< ded< dS )ProcessedChunkintoffsetz	list[str]contentN__name__
__module____qualname____annotations__ r   r   R/home/ubuntu/.local/lib/python3.10/site-packages/wandb/sdk/internal/file_stream.pyr	         
 r	   c                   @  s&   e Zd ZU ded< ded< ded< dS )ProcessedBinaryChunkr
   r   strr   encodingNr   r   r   r   r   r      s   
 r   )util)
get_sentry)internal_api   )file_stream_utilsc                   @     e Zd ZU ded< ded< dS )Chunkr   filenamedataNr   r   r   r   r   r   )   r   r   c                   @  s,   e Zd ZddddZdddZdddZdS )DefaultFilePolicyr   start_chunk_idr
   returnNonec                 C  s   || _ d| _d S NF)	_chunk_idhas_debug_logselfr"   r   r   r   __init__/   s   
zDefaultFilePolicy.__init__chunkslist[Chunk]Cbool | ProcessedChunk | ProcessedBinaryChunk | list[ProcessedChunk]c                 C  s,   | j }|  j t|7  _ |dd |D dS )Nc                 S  s   g | ]}|j qS r   )r    ).0cr   r   r   
<listcomp>8   s    z4DefaultFilePolicy.process_chunks.<locals>.<listcomp>r   r   )r&   len)r)   r+   chunk_idr   r   r   process_chunks3   s   z DefaultFilePolicy.process_chunksr    r   c                 C  sv   | j s	tjdsd S t|}t|tsd S dd | D }dd |D }t	j
d|d  d| dd	 d
| _ d S )NWANDB_DEBUG_FILESTREAM_LOGc                 S  s"   g | ]\}}|t t|fqS r   )r2   jsondumpsr.   kvr   r   r   r0   D   s   " z0DefaultFilePolicy._debug_log.<locals>.<listcomp>c                 S  s&   g | ]\}}| d |d ddqS )z: i   z.5fz MBr   r8   r   r   r   r0   E   s   & zStep: _stepz | FrepeatT)r'   osenvirongetr6   loads
isinstancedictitemswandb	termerror)r)   r    loaded	key_sizeskey_msgr   r   r   
_debug_log;   s   


zDefaultFilePolicy._debug_logNr   r"   r
   r#   r$   )r+   r,   r#   r-   )r    r   )r   r   r   r*   r4   rJ   r   r   r   r   r!   .   s    
r!   c                   @     e Zd ZdddZdS )	JsonlFilePolicyr+   r,   r#   r	   c                 C  s   | j }|  j t|7  _ g }|D ]9}t|jtjkrCdttj dtt|j d}tj|dd t j	|dd | 
|j q||j q||dS )Nz$Metric data exceeds maximum size of z ()Fr<   r1   )r&   r2   r    r   MAX_LINE_BYTESto_human_sizerE   rF   r   messagerJ   append)r)   r+   r3   
chunk_datachunkmsgr   r   r   r4   K   s   &zJsonlFilePolicy.process_chunksN)r+   r,   r#   r	   r   r   r   r4   r   r   r   r   rN   J       rN   c                   @  rM   )	SummaryFilePolicyr+   r,   r#   bool | ProcessedChunkc                 C  sd   |d j }t|tjkr,dttj d}tj|dd t j|dd | 	| dS d|gdS )Nz%Summary data exceeds maximum size of z. Dropping it.Fr<   r   r1   )
r    r2   r   rP   rQ   rE   rF   r   rR   rJ   )r)   r+   r    rV   r   r   r   r4   `   s   

z SummaryFilePolicy.process_chunksN)r+   r,   r#   rZ   rW   r   r   r   r   rY   _   rX   rY   c                   @  s4   e Zd ZU dZded< ded< ded< dd	d
ZdS )StreamCRStatea  Stream state that tracks carriage returns.

    There are two streams: stdout and stderr. We create two instances for each stream.
    An instance holds state about:
        found_cr:       if a carriage return has been found in this stream.
        cr:             most recent offset (line number) where we found \r.
                        We update this offset with every progress bar update.
        last_normal:    most recent offset without a \r in this stream.
                        i.e. the most recent "normal" line.
    boolfound_crz
int | Nonecrlast_normalr#   r$   c                 C  s   d| _ d | _d | _d S r%   )r^   r_   r`   r)   r   r   r   r*   {   s   
zStreamCRState.__init__Nr#   r$   )r   r   r   __doc__r   r*   r   r   r   r   r\   k   s   
 r\   c                      sJ   e Zd ZdZdd fddZedddZedddZdddZ  Z	S )CRDedupeFilePolicyaP  File stream policy for removing carriage-return erased characters.

    This is what a terminal does. We use it for console output to reduce the amount of
    data we need to send over the network (eg. for progress bars), while preserving the
    output's appearance in the web app.

    CR stands for "carriage return", for the character \r. It tells the terminal to move
    the cursor back to the start of the current line. Progress bars (like tqdm) use \r
    repeatedly to overwrite a line with newer updates. This gives the illusion of the
    progress bar filling up in real-time.
    r   r"   r
   r#   r$   c                   s.   t  j|d d | _d| _t | _t | _d S )N)r"   r   )superr*   _prev_chunkglobal_offsetr\   stderrstdoutr(   	__class__r   r   r*      s
   zCRDedupeFilePolicy.__init__consoledict[int, str]list[list[int]]c                 C  sv   t t|  }g }t|D ]*\}}|dkr|||g q|d d }||d kr1||d d< q|||g q|S )a  Compress consecutive line numbers into an interval.

        Args:
            console: Dict[int, str] which maps offsets (line numbers) to lines of text.
            It represents a mini version of our console dashboard on the UI.

        Returns:
            A list of intervals (we compress consecutive line numbers into an interval).

        Example:
            >>> console = {2: "", 3: "", 4: "", 5: "", 10: "", 11: "", 20: ""}
            >>> get_consecutive_offsets(console)
            [(2, 5), (10, 11), (20, 20)]
        r   r[      )sortedlistkeys	enumeraterS   )rl   offsets	intervalsinumlargestr   r   r   get_consecutive_offsets   s   z*CRDedupeFilePolicy.get_consecutive_offsetsrU   r   tuple[str, str]c                 C  sN   d}| j dd\}}|dkr||d 7 }|dd\}}||d 7 }||fS )aL  Split chunks.

        Args:
            chunk: object with two fields: filename (str) & data (str)
            `chunk.data` is a str containing the lines we want. It usually contains \n or \r or both.
            `chunk.data` has two possible formats (for the two streams - stdout and stderr):
                - "2020-08-25T20:38:36.895321 this is my line of text\nsecond line\n"
                - "ERROR 2020-08-25T20:38:36.895321 this is my line of text\nsecond line\nthird\n".

                Here's another example with a carriage return \r.
                - "ERROR 2020-08-25T20:38:36.895321 \r progress bar\n"

        Returns:
            A 2-tuple of strings.
            First str is prefix, either "ERROR {timestamp} " or "{timestamp} ".
            Second str is the rest of the string.

        Example:
            >>> chunk = Chunk(
            ...     filename="output.log",
            ...     data="ERROR 2020-08-25T20:38 this is my line of text\n",
            ... )
            >>> split_chunk(chunk)
            ("ERROR 2020-08-25T20:38 ", "this is my line of text\n")
          ro   ERROR)r    split)rU   prefixtokenrestr   r   r   split_chunk   s   zCRDedupeFilePolicy.split_chunkr+   r,   list[ProcessedChunk]c                   sD  i  t j}|D ]q}| |\}}||}|D ]`}|dr!| jn| j}|dra|jr4|jdur4|jn|j	p8d}	|	|_d|_||dd  d  |	< |
|dkr`||d
ddkr`d	|_q|rw|| d  | j< | j|_	|  jd7  _qq|  }
g }|
D ]\}}| j|  fd
dt||d D d}|| q|S )a  Process chunks.

        Args:
            chunks: List of Chunk objects. See description of chunk above in `split_chunk(...)`.

        Returns:
            List[Dict]. Each dict in the list contains two keys: an `offset` which holds the line number
            and `content` which maps to a list of consecutive lines starting from that offset.
            `offset` here means global line number in our console on the UI.

        Example:
            >>> chunks = [
                Chunk("output.log", "ERROR 2020-08-25T20:38 this is my line of text\nboom\n"),
                Chunk("output.log", "2020-08-25T20:38 this is test\n"),
            ]
            >>> process_chunks(chunks)
            [
                {"offset": 0, "content": [
                    "ERROR 2020-08-25T20:38 this is my line of text\n",
                    "ERROR 2020-08-25T20:38 boom\n",
                    "2020-08-25T20:38 this is test\n"
                    ]
                }
            ]
        zERROR Nr   Tro   
r{   Fc                   s   g | ]} | qS r   r   )r.   rv   rl   r   r   r0     s    z5CRDedupeFilePolicy.process_chunks.<locals>.<listcomp>r1   )r>   linesepr   r~   
startswithrh   ri   r^   r_   r`   countreplacerg   ry   r&   rangerS   )r)   r+   sepr/   r   logs_strlogslinestreamr   ru   retabprocessed_chunkr   r   r   r4      sF   



z!CRDedupeFilePolicy.process_chunksrK   rL   )rl   rm   r#   rn   )rU   r   r#   rz   )r+   r,   r#   r   )
r   r   r   rc   r*   staticmethodry   r   r4   __classcell__r   r   rj   r   rd      s    	"rd   c                   @  s   e Zd ZdZG dd deZG dd deZG dd deZdZ			
dJdKddZ	dLddZ
dLddZdMdd ZdMd!d"ZedNd$d%ZdNd&d'ZdOd)d*ZdLd+d,ZdLd-d.ZdPd1d2ZdQdRd8d9ZdSd;d<ZdLd=d>ZdTd@dAZdUdDdEZdVdHdIZd
S )WFileStreamApia  Pushes chunks of files to our streaming endpoint.

    This class is used as a singleton. It has a thread that serializes access to
    the streaming endpoint and performs rate-limiting and batching.

    TODO: Differentiate between binary/text encoding.
    c                   @  s   e Zd ZU ded< dS )zFileStreamApi.Finishr
   exitcodeNr   r   r   r   r   Finish)  s   
 r   c                   @  s   e Zd ZdS )zFileStreamApi.PreemptingN)r   r   r   r   r   r   r   
Preempting,  s    r   c                   @  r   )zFileStreamApi.PushSuccessr   artifact_id	save_nameNr   r   r   r   r   PushSuccess/  r   r   i'  r   Napiinternal_api.Apirun_idr   
start_timefloattimeoutsettingsdict | Noner#   r$   c                 C  s   |pt  }d | _|| _|| _|| _|| _t | _|pd}|dkr,t	j
| jj|d| j_|jjjj| j_| jj|jjjp>i  | jj|jjjpJi  | jj|jjjjpWi  i | _d| _t | _tj| jd| _d| j_d| j_|   d S )Nr   )r   )targetFileStreamThreadT) rC   	_exc_info	_settings_api_run_id_start_timerequestsSession_client	functoolspartialpostclient	transportsessionauthheadersupdatecookiesproxies_file_policies_dropped_chunksqueueQueue_queue	threadingThread_thread_except_body_threadnamedaemon_init_endpoint)r)   r   r   r   r   r   r   r   r   r*   5  s,   


zFileStreamApi.__init__c                 C  s<   | j  }|| j dj|d |d |d | jd| _d S )Nz1{base}/files/{entity}/{project}/{run}/file_streambase_urlentityproject)baser   r   run)r   r   r   r   formatr   	_endpoint)r)   r   r   r   r   r   Z  s   
zFileStreamApi._init_endpointc                 C  s   |    | j  d S N)r   r   startra   r   r   r   r   d  s   zFileStreamApi.startr   file_policyr!   c                 C  s   || j vr|| j |< dS dS )z@Set an upload policy for a file unless one has already been set.Nr   r)   r   r   r   r   r   set_default_file_policyh  s   
z%FileStreamApi.set_default_file_policyc                 C  s   || j |< d S r   r   r   r   r   r   set_file_policyo  s   zFileStreamApi.set_file_policyint | floatc                 C  s   | j jd }|S )Nheartbeat_seconds)r   dynamic_settings)r)   r   r   r   r   r   r  s   zFileStreamApi.heartbeat_secondsc                 C  sJ   t   | j }|dk rtd| jd S |dk rtd| jd S td| jS )N<   g      ?   ,  g      @   g      @)timer   maxr   )r)   run_timer   r   r   rate_limit_secondsx  s   z FileStreamApi.rate_limit_secondsrq   c                 C  s   t | j| j|  S r   )r   read_many_from_queuer   MAX_ITEMS_PER_PUSHr   ra   r   r   r   _read_queue  s   
zFileStreamApi._read_queuec           
   	   C  sb  t   }t   }g }t }d }|d u r|  }|D ]9}t|| jr$|}qt|| jr@t| jj| j	dd| j
t|dd t }qt|| jrM||j q|| qt   }|ru|sc|| |  kru|}|}| j||d}	g }|	rut }|| | jkr|}tt| jj| j	dd| j
t|ddtst }|d u st| jj| j	dt|j| j
t|dd d S )NFT)complete
preemptingdroppeduploadedr6   )r   r   failedr   r   )r   r   r   r   )r   setr   rB   r   r   request_with_retryr   r   r   r   rq   r   addr   rS   r   _sendr   	Exceptionr
   r   )
r)   posted_data_timeposted_anything_timeready_chunksr   finishedrD   itemcur_timesuccessr   r   r   _thread_body  sx   

<
zFileStreamApi._thread_bodyc                 C  sD   z|    W d S  ty!   t }|| _td t |  w )Nz&generic exception in filestream thread)r   r   sysexc_infor   logger	exceptionr   )r)   r   r   r   r   r     s   
z!FileStreamApi._thread_except_bodyresponseException | requests.Responsec                 C  s   t |trtd td|  |  jd7  _dS d}z| }W n	 ty,   Y nw t |trE|	d}t |trG| j
j| dS dS dS )z0Log dropped chunks and updates dynamic settings.z;Dropped streaming file chunk (see wandb/debug-internal.log)zdropped chunk ro   Nlimits)rB   r   rE   rF   r   r   r   r6   rC   r@   r   r   r   )r)   r   parsedr   r   r   r   _handle_response  s$   



zFileStreamApi._handle_responser+   r,   r   set[str] | Noner]   c           	   	   C  s   t |pg }i }|jdd d t|dd D ] \}}t |}| |t  | j| |||< || s8||= qtj	|t
jdD ]}| t| jj| j|| jd| jjd qA|oltt| jj| jdd| j|d	d
t S )Nc                 S     | j S r   r   r/   r   r   r   <lambda>       z%FileStreamApi._send.<locals>.<lambda>)keyc                 S  r  r   r  r  r   r   r   r    r  )	max_bytes)filesr   )r6   retry_callbackFr   r   )rq   sort	itertoolsgroupbyr   r!   r   r4   r   split_filesr   rP   r   r   r   r   r   r   r   r	  rB   r   )	r)   r+   r   uploaded_listr  r   file_chunksfile_chunks_listfsr   r   r   r     sF   



zFileStreamApi._sendpathc                   sT   | dd  t|}|  fdd|D  W d    d S 1 s#w   Y  d S )N/r[   c                   s   g | ]}t  |qS r   )r   )r.   r   r   r   r   r0   )  s    z-FileStreamApi.stream_file.<locals>.<listcomp>)r~   openr   )r)   r  fr   r  r   stream_file&  s   
"zFileStreamApi.stream_filec                 C  s   | j |   d S r   )r   putr   ra   r   r   r   enqueue_preempting+  s   z FileStreamApi.enqueue_preemptingr    c                 C  s   | j t|| dS )zPush a chunk of a file to the streaming endpoint.

        Args:
            filename: Name of file to append to.
            data: Text to append to the file.
        N)r   r  r   )r)   r   r    r   r   r   push.  s   zFileStreamApi.pushr   r   c                 C  s   | j | || dS )zNotification that a file upload has been successfully completed.

        Args:
            artifact_id: ID of artifact
            save_name: saved name of the uploaded file
        N)r   r  r   )r)   r   r   r   r   r   push_success7  s   zFileStreamApi.push_successr   r
   c                 C  sr   t d | j| | | j  t d | jr5t jd| jd | jd dur7| jd 	| jd dS dS )zClean up.

        Anything pushed after finish will be dropped.

        Args:
            exitcode: The exitcode of the watched process.
        zfile stream finish calledzfile stream finish is donezFileStream exception)r   ro   Nr   )
r   infor   r  r   r   joinr   errorwith_traceback)r)   r   r   r   r   finish@  s   


zFileStreamApi.finish)r   N)r   r   r   r   r   r   r   r   r   r   r#   r$   rb   )r   r   r   r!   r#   r$   )r#   r   )r#   rq   )r   r   r#   r$   r   )r+   r,   r   r   r#   r]   )r  r   r#   r$   )r   r   r    r   r#   r$   )r   r   r   r   r#   r$   )r   r
   r#   r$   )r   r   r   rc   r   r   r   r   r   r*   r   r   r   r   propertyr   r   r   r   r   r   r   r  r  r  r  r   r   r   r   r   r      s4    
%





	

M

,


		r   r   funcr   argsr   kwargsr#   -requests.Response | requests.RequestExceptionc                 O  s  | dd}| dd}d}d}	 z| |i |}|  |W S  tjjtjjtjjfy } zzt|tjjrJ|jdurJ|jj	dv rJ|W  Y d}~S ||krV|W  Y d}~S |d	7 }|t

 d
 |  }	t|tjjr|jdur|jj	dkrd|	dd}
|r||jj	|
 t|
 n	td|| || t|	 |d9 }|tkrt}W Y d}~n6d}~w tjjy } z$d}z| d }W n	 ty   Y nw td|  |W  Y d}~S d}~ww q)a  Perform a requests http call, retrying with exponential backoff.

    Args:
        func:        An http-requesting function to call, like requests.post
        max_retries: Maximum retries before giving up.
                     By default, we retry 30 times in ~2 hours before dropping the chunk
        *args:       passed through to func
        **kwargs:    passed through to func
    max_retries   r	  Nr   r   T>           ro   g      ?i  z,Filestream rate limit exceeded, retrying in z.1fz
 seconds. zWrequests_with_retry encountered retryable exception: %s. func: %s, args: %s, kwargs: %szunknown errorr  zrequests_with_retry error: )popraise_for_statusr   
exceptionsConnectionError	HTTPErrorTimeoutrB   r   status_coderandomr   r  warningr   sleepMAX_SLEEP_SECONDSRequestExceptionr6   r   r   )r"  r#  r$  r&  r	  r5  retry_countr   edelayerr_strerror_messager   r   r   r   W  sj   

r   )r"  r   r#  r   r$  r   r#   r%  )+
__future__r   r   r  r6   loggingr>   r   r3  r   r   r   typesr   typingr   r   r   r   r   r	   r   r   rE   r   wandb.analyticsr   wandb.sdk.internalr   libr   	getLoggerr   r   r   r!   rN   rY   r\   rd   r   r6  r   r   r   r   r   <module>   sH    
    6