o
    `۷i                     @   s   d Z ddlZddlZddlZddlZddlZddlZddlm  m	  m
Z
 ddlm  m	  mZ ddlmZ ddlmZ ddlmZ eeZG dd dejZG dd	 d	Zd
d ZG dd dejZdS )zNThis file responds to log stream requests and forwards logs
with its handler.
    N)"global_worker_stdstream_dispatcher)print_worker_logs)CLIENT_SERVER_MAX_THREADSc                       s,   e Zd Z fddZdejfddZ  ZS )LogstreamHandlerc                    s   t    || _|| _d S N)super__init__queuelevel)selfr	   r
   	__class__ X/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/util/client/server/logservicer.pyr      s   

zLogstreamHandler.__init__recordc                 C   s2   t  }| |_|j|_|j|_| j| d S r   )	ray_client_pb2LogData
getMessagemsglevelnor
   namer	   put)r   r   logdatar   r   r   emit   s
   
zLogstreamHandler.emit)__name__
__module____qualname__r   logging	LogRecordr   __classcell__r   r   r   r   r      s    r   c                   @   s,   e Zd Zdd Zdd Zdd Zdd Zd	S )
StdStreamHandlerc                 C   s   || _ tt | _d S r   )r	   struuiduuid4id)r   r	   r   r   r   r   $   s   zStdStreamHandler.__init__c                 C   sx   t  }|d r
dnd|_|d rdnd|_t }t|| | |_W d    n1 s/w   Y  | j	
| d S )Nis_errstderrstdout)r   r   r
   r   ioStringIOr   getvaluer   r	   r   )r   datar   filer   r   r   handle(   s   

zStdStreamHandler.handlec                 C   s   t | j| j d S r   )r   add_handlerr$   r/   r   r   r   r   register_global1   s   z StdStreamHandler.register_globalc                 C   s   t | j d S r   )r   remove_handlerr$   r1   r   r   r   unregister_global4   s   z"StdStreamHandler.unregister_globalN)r   r   r   r   r/   r2   r4   r   r   r   r   r    #   s
    	r    c              
   C   s$  t | }d }td}| }zjz4|D ]/}|d ur'|| || |  |js-d }qt| |j	}|
  || ||j	 qW n tjy_ } ztd|  W Y d }~nd }~ww W |d urs|| || |  | d  d S |d ur|| || |  | d  w )Nrayz8closing log thread grpc error reading request_iterator: )r    r   	getLoggergetEffectiveLevelsetLevelremoveHandlerr4   enabledr   loglevelr2   
addHandlergrpcRpcErrorloggerdebugr   )	log_queuerequest_iteratorstd_handlercurrent_handlerroot_loggerdefault_levelreqer   r   r   log_status_change_thread8   sB   







rI   c                       s$   e Zd Z fddZdd Z  ZS )LogstreamServicerc                    s   t    d| _t | _d S )Nr   )r   r   num_clients	threadingLockclient_lockr1   r   r   r   r   U   s   
zLogstreamServicer.__init__c           
      c   s   d}| j B td }| jd |kr/|tjj td| j d| d 	 W d    d S |  jd7  _d}t	d| j  W d    n1 sKw   Y  t
 }tjt||fdd	}|  z^zt|jd }|D ]}|d u rt n|V  qlW n tjy }	 ztd
|	  W Y d }	~	nd }	~	ww W |  | j  |r|  jd8  _W d    d S W d    d S 1 sw   Y  d S |  | j  |r|  jd8  _W d    w W d    w 1 sw   Y  w )NF      zLogstream: Num clients z has reached the threshold z. Rejecting new connection.Tz0New logs connection established. Total clients: )targetargsdaemonzClosing log channel: )rN   r   rK   set_coder=   
StatusCodeRESOURCE_EXHAUSTEDr?   warninginfor	   QueuerL   ThreadrI   startitergetr>   r@   join)
r   rB   contextinitialized	thresholdrA   thread
queue_iterr   rH   r   r   r   	LogstreamZ   sh   
	
"zLogstreamServicer.Logstream)r   r   r   r   rd   r   r   r   r   r   rJ   T   s    rJ   )__doc__r*   r   r	   rL   r"   r=   !ray.core.generated.ray_client_pb2core	generatedr   &ray.core.generated.ray_client_pb2_grpcray_client_pb2_grpcray._private.ray_loggingr   ray._private.workerr   ray.util.client.commonr   r6   r   r?   Handlerr   r    rI   RayletLogStreamerServicerrJ   r   r   r   r   <module>   s"    
