o
    ni!                     @   s   d Z ddlZddlmZ ddlmZmZmZ ddlm	Z
 ddlmZ ddlmZ er0dd	lmZ eeZd
Zdddee fddZdddefddZdddefddZeG dd dZG dd dZG dd dZG dd deZG dd deZdS )ar  Flow Control.

States:
    FORWARDING
    PAUSING

New messages:
    pb.SenderMarkRequest    writer -> sender (empty message)
    pb.StatusReportRequest  sender -> writer (reports current sender progress)
    pb.SenderReadRequest    writer -> sender (requests read of transaction log)

Thresholds:
    Threshold_High_MaxOutstandingData      - When above this, stop sending requests to sender
    Threshold_Mid_StartSendingReadRequests - When below this, start sending read requests
    Threshold_Low_RestartSendingData       - When below this, start sending normal records

State machine:
    FORWARDING
      -> PAUSED if should_pause
         There is too much work outstanding to the sender thread, after the current request
         lets stop sending data.
    PAUSING
      -> FORWARDING if should_unpause
      -> PAUSING if should_recover
      -> PAUSING if should_quiesce

    N)	dataclass)TYPE_CHECKINGCallableOptional)wandb_internal_pb2)fsm   )SettingsStatic)Recordi   recordr
   returnc                 C   s&   |  d}|dkrd S | j d}|S )Nrecord_typerequestrequest_type)
WhichOneofr   )r   r   r    r   ]/home/ubuntu/SoloSpeech/.venv/lib/python3.10/site-packages/wandb/sdk/internal/flow_control.py_get_request_type0   s
   
r   c                 C   s   | j jS N)controlflow_controlr   r   r   r   _is_control_record8      r   c                 C   s   | j jo| j j S r   )r   localr   r   r   r   r   _is_local_non_control_record<   s   r   c                   @   s2   e Zd ZU dZeed< dZeed< dZeed< dS )StateContextr   last_forwarded_offsetlast_sent_offsetlast_written_offsetN)__name__
__module____qualname__r   int__annotations__r   r   r   r   r   r   r   @   s   
 r   c                   @   s   e Zd ZU ejdef ed< 			ddededgdf dedge	f deg df d	ee	e	gdf d
e	de	de	ddfddZ
dddZdddZdS )FlowControlr
   _fsmr   settingsforward_recordNwrite_recordpause_markerrecover_records_threshold_bytes_high_threshold_bytes_mid_threshold_bytes_lowr   c	                 C   s   |dks|dks|dkr|j pt}	|	}|	d }|	d }||  kr&|ks)J  J t|||d}
t||||d}tj|
|gtt|
jt|
jgtt|j	t|j
t|jt|jt|jt|jgid| _d S )Nr         )r(   r*   threshold_pause)r(   r+   threshold_recoverthreshold_forward)statestable)x_network_bufferDEFAULT_THRESHOLDStateForwardingStatePausingr   FsmWithContextFsmEntry_should_pause_pause_should_unpause_unpause_should_recover_recover_should_quiesce_quiescer&   )selfr'   r(   r)   r*   r+   r,   r-   r.   	thresholdstate_forwardingstate_pausingr   r   r   __init__J   s^   
zFlowControl.__init__c                 C   s   d S r   r   rD   r   r   r   flush   s   zFlowControl.flushr   c                 C   s   | j | d S r   )r&   inputrD   r   r   r   r   flow      zFlowControl.flow)r   r   r   r   Nr   r
   r   N)r    r!   r"   r   r:   r   r$   r	   r   r#   rH   rJ   rM   r   r   r   r   r%   G   s4   
 	
	


Ar%   c                   @   s   e Zd ZU eed< dddZddd	Zdd
dZdddZdddZ	dddefddZ
dddeddfddZedefddZdS )StateShared_contextr   Nc                 C   s   t  | _d S r   )r   rR   rI   r   r   r   rH         zStateShared.__init__r   r
   c                 C   s   |j j}|r|| j_d S d S r   )r   
end_offsetrR   r   )rD   r   rT   r   r   r   _update_written_offset   s   z"StateShared._update_written_offsetc                 C   s   | j j| j _d S r   )rR   r   r   rI   r   r   r   _update_forwarded_offset   rN   z$StateShared._update_forwarded_offsetc                 C   s:   t |}|sd S d| }t| |d }|sd S || d S )N	_process_)r   getattr)rD   r   r   process_strprocess_handlerr   r   r   _process   s   
zStateShared._processc                 C   s   |j jj}|| j_d S r   )r   status_reportsent_offsetrR   r   )rD   r   r]   r   r   r   _process_status_report   s   
z"StateShared._process_status_reportc                 C   s   | j S r   rR   rL   r   r   r   on_exit   s   zStateShared.on_exitcontextc                 C   s
   || _ d S r   r_   )rD   r   ra   r   r   r   on_enter   s   
zStateShared.on_enterc                 C   s   | j j| j j S r   )rR   r   r   rI   r   r   r   _behind_bytes   s   zStateShared._behind_bytesrO   rP   )r    r!   r"   r   r$   rH   rU   rV   r[   r^   r`   rb   propertyr#   rc   r   r   r   r   rQ      s   
 




rQ   c                       s   e Zd ZU edgdf ed< eg df ed< eed< dedgdf deg df ded	df fd
dZddd	efddZdddZ	dddZ
  ZS )r8   r
   N_forward_record_pause_marker_threshold_pauser(   r*   r1   r   c                    s    t    || _|| _|| _d S r   )superrH   re   rf   rg   )rD   r(   r*   r1   	__class__r   r   rH      s   

zStateForwarding.__init__r   c                 C   s   | j | jkS r   )rc   rg   rL   r   r   r   r<      rS   zStateForwarding._should_pausec                 C   s   |    d S r   )rf   rL   r   r   r   r=      rS   zStateForwarding._pausec                 C   s2   |  | | | t|s| | |   d S r   )rU   r[   r   re   rV   rL   r   r   r   on_check   s
   


zStateForwarding.on_checkrP   )r    r!   r"   r   r$   r#   rH   boolr<   r=   rk   __classcell__r   r   ri   r   r8      s    
 

r8   c                
       s   e Zd ZU edgdf ed< eeegdf ed< eed< eed< dedgdf deeegdf d	ed
eddf
 fddZdddefddZdddZ	dddefddZ
dddZdddefddZdddZdddZ  ZS )r9   r
   Nre   _recover_records_threshold_recover_threshold_forwardr(   r+   r2   r3   r   c                    s&   t    || _|| _|| _|| _d S r   )rh   rH   re   rn   ro   rp   )rD   r(   r+   r2   r3   ri   r   r   rH      s
   

zStatePausing.__init__r   c                 C      | j | jk S r   )rc   rp   rL   r   r   r   r>      rS   zStatePausing._should_unpausec                 C      |  | d S r   rC   rL   r   r   r   r?         zStatePausing._unpausec                 C   rq   r   )rc   ro   rL   r   r   r   r@      rS   zStatePausing._should_recoverc                 C   rr   r   rs   rL   r   r   r   rA      rt   zStatePausing._recoverc                 C   s   t |S r   )r   rL   r   r   r   rB      r   zStatePausing._should_quiescec                 C   sB   | j j}| j j}||kr| || t|r| | |   d S r   )rR   r   r   rn   r   re   rV   )rD   r   startendr   r   r   rC      s   
zStatePausing._quiescec                 C   s   |  | | | d S r   )rU   r[   rL   r   r   r   rk     s   
zStatePausing.on_checkrP   )r    r!   r"   r   r$   r#   rH   rl   r>   r?   r@   rA   rB   rC   rk   rm   r   r   ri   r   r9      s.   
 


	r9   )__doc__loggingdataclassesr   typingr   r   r   wandb.protor   pbwandb.sdk.libr   settings_staticr	   wandb.proto.wandb_internal_pb2r
   	getLoggerr    loggerr7   strr   rl   r   r   r   r%   rQ   r8   r9   r   r   r   r   <module>   s(    
L)