o
    ni:9                     @   s  d Z ddlZddlZddlZddlZddlZddlmZ ddlmZm	Z	m
Z
mZmZ ddlZddlZddlZddlmZ ddlmZ ddlmZ ddlmZ dd	lmZmZmZmZ dd
lmZ ddl m!Z! G dd dej"Z#G dd dZ$G dd dZ%G dd dZ&dS )aB  streams: class that manages internal threads for each run.

StreamThread: Thread that runs internal.wandb_internal()
StreamRecord: All the external state for the internal thread (queues, etc)
StreamAction: Lightweight record for stream ops for thread safety
StreamMux: Container for dictionary of stream threads per runid
    N)Event)AnyCallableDictListOptional)wandb_internal_pb2)SettingsStatic)printer)progress)MailboxMailboxProbeMailboxProgressMailboxProgressAll)Run   )InterfaceRelayc                   @   s8   e Zd ZdZdedeeef ddfddZd
dd	Z	dS )StreamThreadz.Class to running internal process as a thread.targetkwargsreturnNc                 C   s(   t j|  d| _|| _|| _d| _d S )N	StreamThrT)	threadingThread__init__name_target_kwargsdaemon)selfr   r    r    W/home/ubuntu/SoloSpeech/.venv/lib/python3.10/site-packages/wandb/sdk/service/streams.pyr   )   s
   
zStreamThread.__init__c                 C   s   | j di | j d S )Nr    )r   r   r   r    r    r!   run0   s   zStreamThread.runr   N)
__name__
__module____qualname____doc__r   r   strr   r   r#   r    r    r    r!   r   &   s    r   c                   @   s   e Zd ZU ded< ded< ded< eed< eed< eed< eed	< d
ededdfddZ	deddfddZ
dddZdddZdddZedefddZdddZd
eddfddZdS ) StreamRecordzqueue.Queue[pb.Record]	_record_qzqueue.Queue[pb.Result]	_result_q_relay_q_iface_thread	_settings_startedsettingsmailboxr   Nc                 C   sZ   d| _ || _t | _t | _t | _t }t	| j| j| j|d| jd| _
|| _d S )NF)record_qresult_qrelay_qprocessprocess_checkr3   )r1   _mailboxqueueQueuer+   r,   r-   multiprocessingcurrent_processr   r.   r0   )r   r2   r3   r7   r    r    r!   r   >   s   



zStreamRecord.__init__threadc                 C   s   || _ |  |   d S N)r/   start_wait_thread_active)r   r>   r    r    r!   start_threadO   s   zStreamRecord.start_threadc                 C   s   | j  jdd d S )Ntimeout)r.   deliver_statuswaitr"   r    r    r!   rA   T      z StreamRecord._wait_thread_activec                 C   s"   | j   | jr| j  d S d S r?   )r.   joinr/   r"   r    r    r!   rI   W   s   
zStreamRecord.joinc                 C   s   d| j _d S NT)r.   _dropr"   r    r    r!   drop\   s   zStreamRecord.dropc                 C      | j S r?   )r.   r"   r    r    r!   	interface_      zStreamRecord.interfacec                 C   s
   d| _ d S rJ   )r1   r"   r    r    r!   mark_startedc      
zStreamRecord.mark_startedc                 C   
   || _ d S r?   )r0   )r   r2   r    r    r!   updatef   s   
zStreamRecord.updater$   )r%   r&   r'   __annotations__r   r   r	   boolr   r   rB   rA   rI   rL   propertyrN   rP   rS   r    r    r    r!   r*   5   s"   
 



r*   c                   @   s~   e Zd ZU eed< eed< eed< eed< ddededee fd	d
ZdefddZ	dddZ
dddZedefddZdS )StreamAction_action
_stream_id
_processed_dataNaction	stream_iddatac                 C   s   || _ || _|| _t | _d S r?   )rX   rY   r[   r   rZ   )r   r\   r]   r^   r    r    r!   r   r   s   zStreamAction.__init__r   c                 C   s   d| j  d| j dS )NzStreamAction(,))rX   rY   r"   r    r    r!   __repr__x   rH   zStreamAction.__repr__c                 C      | j   d S r?   )rZ   rG   r"   r    r    r!   wait_handled{      zStreamAction.wait_handledc                 C   rb   r?   )rZ   setr"   r    r    r!   set_handled~   rd   zStreamAction.set_handledc                 C   rM   r?   )rY   r"   r    r    r!   r]      rO   zStreamAction.stream_idr?   r$   )r%   r&   r'   r)   rT   r   r   r   r   ra   rc   rf   rV   r]   r    r    r    r!   rW   l   s   
 

rW   c                   @   sN  e Zd ZU ejed< eeef ed< e	e
 ed< e	e
 ed< ded< eed< e	e ed< eed	< dQddZdRddZde
d
dfddZde
d
dfddZdeded
dfddZded
dfddZdeded
dfddZded
dfdd Zded
dfd!d"Zd#e
d
dfd$d%Zd
ee fd&d'Zded
efd(d)Zded
efd*d+Zd,ed
dfd-d.Zd,ed
dfd/d0Z d,ed
dfd1d2Z!d,ed
dfd3d4Z"d,ed
dfd5d6Z#d7e$d8ed
dfd9d:Z%d;e&d
dfd<d=Z'd>e(j)d?e*d
dfd@dAZ+dBeeef d#e
d
dfdCdDZ,d,ed
dfdEdFZ-d,ed
dfdGdHZ.d
efdIdJZ/dQdKdLZ0dQdMdNZ1dQdOdPZ2dS )S	StreamMux_streams_lock_streams_port_pidzqueue.Queue[StreamAction]	_action_q_stopped_pid_checked_tsr9   r   Nc                 C   sL   t  | _t | _d | _d | _t | _t	
 | _d | _t | _| j  d S r?   )r   Lockrh   dictri   rj   rk   r   rm   r:   r;   rl   rn   r   r9   enable_keepaliver"   r    r    r!   r      s   

zStreamMux.__init__r   c                 C   rM   r?   )rm   r"   r    r    r!   _get_stopped_event   rO   zStreamMux._get_stopped_eventportc                 C   rR   r?   )rj   )r   rs   r    r    r!   set_port   rQ   zStreamMux.set_portpidc                 C   rR   r?   )rk   )r   ru   r    r    r!   set_pid   rQ   zStreamMux.set_pidr]   r2   c                 C   &   t d||d}| j| |  d S )Naddr\   r]   r^   rW   rl   putrc   r   r]   r2   r\   r    r    r!   
add_stream      zStreamMux.add_streamc                 C   $   t d|d}| j| |  d S )Nr@   r\   r]   rz   r   r]   r\   r    r    r!   start_stream      zStreamMux.start_streamc                 C   rw   )NrS   ry   rz   r|   r    r    r!   update_stream   r~   zStreamMux.update_streamc                 C   r   )Ndelr   rz   r   r    r    r!   
del_stream   r   zStreamMux.del_streamc                 C   r   )NrL   r   rz   r   r    r    r!   drop_stream   r   zStreamMux.drop_stream	exit_codec                 C   s&   t dd|d}| j| |  d S )Nteardownnary   rz   )r   r   r\   r    r    r!   r      r~   zStreamMux.teardownc                 C   s<   | j  t| j }|W  d    S 1 sw   Y  d S r?   )rh   listri   keys)r   namesr    r    r!   stream_names   s   $zStreamMux.stream_namesc                 C   s4   | j  || jv W  d    S 1 sw   Y  d S r?   rh   ri   )r   r]   r    r    r!   
has_stream   s   $zStreamMux.has_streamc                 C   s8   | j  | j| }|W  d    S 1 sw   Y  d S r?   r   )r   r]   streamr    r    r!   
get_stream   s   
$zStreamMux.get_streamr\   c              	   C   s   t |j| jd}|j}ttjjjjt||j	|j
| j| jdd}|| | j || j|j< W d    d S 1 s:w   Y  d S )N)r3   )r2   r4   r5   rs   user_pid)r   r   )r*   r[   r9   r   wandb	wandb_sdkinternalwandb_internalrp   r+   r,   rj   rk   rB   rh   ri   rY   )r   r\   r   r2   r>   r    r    r!   _process_add   s    


"zStreamMux._process_addc                 C   s<   | j  | j|j   W d    d S 1 sw   Y  d S r?   )rh   ri   rY   rP   r   r\   r    r    r!   _process_start   s   "zStreamMux._process_startc                 C   s@   | j  | j|j |j W d    d S 1 sw   Y  d S r?   )rh   ri   rY   rS   r[   r   r    r    r!   _process_update   s   "zStreamMux._process_updatec                 C   sB   | j  | j|j}|  W d    d S 1 sw   Y  d S r?   )rh   ri   poprY   rI   r   r\   r   r    r    r!   _process_del   s   
"zStreamMux._process_delc                 C   sf   | j & |j| jv r!| j|j}|  |  W d    d S W d    d S 1 s,w   Y  d S r?   )rh   rY   ri   r   rL   rI   r   r    r    r!   _process_drop   s   
"zStreamMux._process_dropprobe_handler   c                 C   sD   |  }|r|jddd}|sd S || |j }|| d S )Nr   F)rE   release)get_mailbox_handlerG   set_probe_resultrN   deliver_poll_exitset_mailbox_handle)r   r   r   handleresultr    r    r!   _on_probe_exit   s   

zStreamMux._on_probe_exitprogress_handlec                 C      d S r?   r    )r   r   r    r    r!   _on_progress_exit     zStreamMux._on_progress_exitprogress_printerprogress_all_handlec           	      C   st   g }|  }|D ]	}||  q|sJ |  r| j  g }|D ]}| }|r2||jj	 q#|
| d S r?   )get_progress_handlesextendget_probe_handles_check_orphanedrm   re   get_probe_resultappendresponsepoll_exit_responserS   )	r   r   r   probe_handlesprogress_handlesr   poll_exit_responsesr   r   r    r    r!   _on_progress_exit_all  s   
zStreamMux._on_progress_exit_allstreamsc              	   C   s  |sd S t  }g }i }i }| D ]\}}|jr|n|}	||	|< q| D ]}|j|}
|
| j |
	t
j| j|d ||
 q&t|}| jj|dt
| j|d}|s]J W d    n1 sgw   Y  | D ]`\}}|j }|j }|j }|j }|jdd}|sJ |jj}|jdd}|sJ |jj}|jdd}|sJ |jj}|jdd}|sJ |jj}tj|||||j|d |   qp| D ]}|   qd S )N)r   rC   )handlesrE   on_progress_allrD   )sampled_historyfinal_summaryr   internal_messages_responser2   r
   )!
printerlibnew_printeritemsr1   valuesrN   deliver_exitadd_progressr   	add_probe	functoolspartialr   r   r   r   r9   wait_allr   r   deliver_get_summarydeliver_request_sampled_historydeliver_internal_messagesrG   r   r   r   sampled_history_responseget_summary_responser   _footerr0   rI   )r   r   r   r
   exit_handlesstarted_streamsnot_started_streamsr]   r   dr   r   
got_result_sidpoll_exit_handlefinal_summary_handlesampled_history_handleinternal_messages_handler   r   r   r   r   r    r    r!   _finish_all  sl   







zStreamMux._finish_allc                 C   s~   |j }| j | j }W d    n1 sw   Y  | || | j t | _W d    n1 s3w   Y  | j  d S r?   )r[   rh   ri   copyr   rp   rm   re   )r   r\   r   streams_copyr    r    r!   _process_teardownk  s   
zStreamMux._process_teardownc                 C   s   |j dkr| | d S |j dkr| | d S |j dkr$| | d S |j dkr0| | d S |j dkr<| | d S |j dkrH| | d S td|j  )Nrx   rS   r@   r   rL   r   zUnsupported action: )rX   r   r   r   r   r   r   AssertionErrorr   r    r    r!   _process_actionu  s&   











zStreamMux._process_actionc                 C   s>   | j sdS t }| jr|| jd k rdS || _t| j  S )NFr   )rk   timern   psutil
pid_exists)r   time_nowr    r    r!   r     s   zStreamMux._check_orphanedc                 C   sx   | j  s5|  r| j   z	| jjdd}W n
 tjy!   Y q w | | |	  | j
  | j  r| j  d S )N   rD   )rm   is_setr   re   rl   getr:   Emptyr   rf   	task_donerI   r   r    r    r!   _loop  s   




zStreamMux._loopc              
   C   s,   z|    W d S  ty } z|d }~ww r?   )r   	Exception)r   er    r    r!   loop  s   zStreamMux.loopc                 C   r   r?   r    r"   r    r    r!   cleanup  r   zStreamMux.cleanupr$   )r   r   )3r%   r&   r'   r   ro   rT   r   r)   r*   r   intr   floatr   r   rr   rt   rv   r	   r}   r   r   r   r   r   r   r   rU   r   r   rW   r   r   r   r   r   r   r   r   r   r   ProgressPrinterr   r   r   r   r   r   r   r   r   r    r    r    r!   rg      sV   
 




O



rg   )'r(   r   r<   r:   r   r   r   typingr   r   r   r   r   r   r   
wandb.utilwandb.protor   pb"wandb.sdk.internal.settings_staticr	   wandb.sdk.libr
   r   r   wandb.sdk.lib.mailboxr   r   r   r   wandb.sdk.wandb_runr   interface.interface_relayr   r   r   r*   rW   rg   r    r    r    r!   <module>   s,    7