o
    ciR                     @   s   d Z ddlZddlZddlZddlZddlZddlZddlmZ ddl	m
  m  mZ ddlm
  m  mZ ddlmZ erFddlmZ eeZde_G dd dZdS )	zaThis file implements a threaded stream controller to return logs back from
the ray clientserver.
    N)TYPE_CHECKING)log_once)WorkerFc                   @   s   e Zd ZdddefddZdejfddZdd
dZde	j
defddZdedefddZdedefddZdefddZdddZdddZd	S )LogstreamClientclient_workerr   metadatac                 C   s4   || _ || _t | _|  | _| j  d| _dS )zInitializes a thread-safe log stream over a Ray Client gRPC channel.

        Args:
            client_worker: The Ray Client worker that manages this client
            metadata: metadata to pass to gRPC requests
        N)	r   	_metadataqueueQueuerequest_queue_start_logthread
log_threadstartlast_req)selfr   r    r   N/home/ubuntu/.local/lib/python3.10/site-packages/ray/util/client/logsclient.py__init__   s   



zLogstreamClient.__init__returnc                 C   s   t j| jdddS )Nr   T)targetargsdaemon)	threadingThread	_log_mainr   r   r   r   r   +   s   z LogstreamClient._start_logthreadNc              
   C   s
  d}| j js|rt | _| jr| j| j t| j j	}z|j
t| jjd | jd}W n ty;   td Y qw z|D ]}|jdk rO| j|j|jd | j|j|jd q?W d S  tjy| } z| |}|srW Y d }~d S W Y d }~nd }~ww | j jrd S d S )NF)r         ?r   levelmsg)r   _in_shutdownr	   r
   r   r   putray_client_pb2_grpcRayletLogStreamerStubchannel	Logstreamitergetr   
ValueErrortimesleepr   	stdstreamr   loggrpcRpcError_process_rpc_error)r   reconnectingstub
log_streamrecorder   r   r   r   .   s:   




zLogstreamClient._log_mainr4   c                 C   sV   | j |rtdrtd td td dS td | j js)t	d dS )	z
        Processes RPC errors that occur while reading from data stream.
        Returns True if the error can be recovered from, False otherwise.
        lost_reconnect_logszLog channel is reconnecting. Logs produced while the connection was down can be found on the head node of the cluster in `ray_client_server_[port].out`zLog channel dropped, retrying.r   TzShutting down log channel.zUnexpected exception:F)
r   _can_reconnectr   loggerwarningdebugr)   r*   r    	exception)r   r4   r   r   r   r/   M   s   



z"LogstreamClient._process_rpc_errorr   r   c                 C   s   t j||d dS )zLog the message from the log stream.
        By default, calls logger.log but this can be overridden.

        Args:
            level: The loglevel of the received log message
            msg: The content of the message
        r   N)r7   r,   )r   r   r   r   r   r   r,   b   s   zLogstreamClient.logc                 C   s&   |dkrt jnt j}t||dd dS )zLog the stdout/stderr entry from the log stream.
        By default, calls print but this can be overridden.

        Args:
            level: The loglevel of the received log message
            msg: The content of the message
         )fileendN)sysstderrstdoutprint)r   r   r   
print_filer   r   r   r+   l   s   zLogstreamClient.stdstreamc                 C   s4   t | t }d|_||_| j| || _d S )NT)	r7   setLevelray_client_pb2LogSettingsRequestenabledloglevelr   r!   r   )r   r   reqr   r   r   set_logstream_levelw   s   

z#LogstreamClient.set_logstream_levelc                 C   s(   | j d  | jd ur| j  d S d S )N)r   r!   r   joinr   r   r   r   close   s   
zLogstreamClient.closec                 C   s$   t  }d|_| j| || _d S )NF)rE   rF   rG   r   r!   r   )r   rI   r   r   r   disable_logs   s   
zLogstreamClient.disable_logs)r   N)__name__
__module____qualname__listr   r   r   r   r   r-   r.   boolr/   intstrr,   r+   rJ   rL   rM   r   r   r   r   r      s    


r   )__doc__r?   loggingr	   r   r)   r-   typingr   !ray.core.generated.ray_client_pb2core	generatedrE   &ray.core.generated.ray_client_pb2_grpcr"   ray.util.debugr   ray.util.client.workerr   	getLoggerrN   r7   	propagater   r   r   r   r   <module>   s     
