o
    `۷i&                     @   s  d dl Z d dlZd dlZd dlZd dlmZmZmZmZ d dl	m
Z
mZ d dlmZ zd dlZW n ey;   dZY nw eeZG dd dZG dd deZG d	d
 d
eZG dd deZG dd deZG dd deZG dd deZG dd dZdS )    N)quoteunquoteurljoinurlparse)
httpclientioloop)SERVE_LOGGER_NAMEc                   @   s$   e Zd Zdd Zdd Zdd ZdS )
BrokerBasec                 O   sb   t |}|j| _|j| _|jdd  | _|j}|j}|r t|n|| _|r,t|| _d S || _d S )N   )	r   hostnamehostportpathvhostusernamepasswordr   )self
broker_url___purlr   r    r   O/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/serve/_private/broker.py__init__   s   zBrokerBase.__init__c                       t NNotImplementedErrorr   namesr   r   r   queues&      zBrokerBase.queuesc                 C   s   dS )z=Close any open connections. Override in subclasses as needed.Nr   r   r   r   r   close)   s   zBrokerBase.closeN)__name__
__module____qualname__r   r    r#   r   r   r   r   r	      s    r	   c                       s2   e Zd Zd fdd	Zdd Zedd Z  ZS )	RabbitMQNc              
      s   t  | |ptj | _| jpd| _| jpd| _| jdkr't	| jdp&dn| j| _| j
p/d| _
| jp5d| _|sOd| j
 d| j d| j d| j d	| j 
}z| | W n tye   td
| Y nw || _d S )N	localhosti8=  / guestzhttp://:@z/api/zInvalid broker api url: %s)superr   r   IOLoopinstanceio_loopr   r   r   r   r   r   validate_http_api
ValueErrorloggererrorhttp_api)r   r   r6   r1   r   	__class__r   r   r   /   s"    ,
zRabbitMQ.__init__c           
   
      s  t | jd| j }t| j}t|jpdp| j}t|jpdp"| j}t }z:z|j	|||ddddI d H }W n# t
jtjfy[ } ztd| g W  Y d }~W |  S d }~ww W |  n|  w |jdkr}t|j }	 fd	d
|	D S |  d S )Nzqueues/r*   g      ?g       @F)auth_usernameauth_passwordconnect_timeoutrequest_timeoutvalidate_certz'RabbitMQ management API call failed: %s   c                    s   g | ]
}|d   v r|qS )namer   .0xr   r   r   
<listcomp>[   s    z#RabbitMQ.queues.<locals>.<listcomp>)r   r6   r   r   r   r   r   r   AsyncHTTPClientfetchsocketr5   	HTTPErrorr4   r#   codejsonloadsbodydecoderethrow)
r   r   urlapi_urlr   r   http_clientresponseeinfor   rC   r   r    C   s6   


zRabbitMQ.queuesc                 C   s&   t |}|jdvrtd|j d S )N)httphttpszInvalid http api schema: )r   schemer3   )clsr6   rO   r   r   r   r2   ^   s   
zRabbitMQ.validate_http_apir   )r$   r%   r&   r   r    classmethodr2   __classcell__r   r   r7   r   r'   .   s
    r'   c                       s@   e Zd ZdZg dZ fddZdd Zdd Zd	d
 Z  Z	S )	RedisBasez)r         	   c                    s\   t  | d | _tstd|di }|d| j| _|d| j| _|dd| _	d S )Nzredis library is requiredbroker_optionspriority_stepssepglobal_keyprefixr*   )
r.   r   redisImportErrorgetDEFAULT_PRIORITY_STEPSr`   DEFAULT_SEPra   broker_prefix)r   r   r   kwargsr_   r7   r   r   r   i   s   zRedisBase.__init__c                 C   s4   || j vr	tddj|r|| j|f S |ddf S )NzPriority not in priority stepsz	{0}{1}{2}r*   )r`   r3   formatra   )r   queueprir   r   r   
_q_for_priw   s   
"zRedisBase._q_for_pric                    sL   g }|D ]  fddj D }| tfdd|D d q|S )Nc                    s   g | ]}j  | qS r   )rh   rm   )rA   rl   r?   r   r   r   rD      s    z$RedisBase.queues.<locals>.<listcomp>c                 3   s    | ]	} j |V  qd S r   )rc   llenr@   r"   r   r   	<genexpr>   s    z#RedisBase.queues.<locals>.<genexpr>)r?   messages)r`   appendsum)r   r   queue_statspriority_namesr   rn   r   r    }   s   zRedisBase.queuesc                 C   s"   | j dur| j   d| _ dS dS )zClose the Redis connection.N)rc   r#   r"   r   r   r   r#      s   


zRedisBase.close)
r$   r%   r&   rg   rf   r   rm   r    r#   rZ   r   r   r7   r   r[   e   s    r[   c                       4   e Zd Z fddZdd Zdd Zdd Z  ZS )	Redisc                    sN   t  j|g|R i | | jpd| _| jpd| _| | j| _|  | _d S )Nr(   i  )r.   r   r   r   _prepare_virtual_hostr   _get_redis_clientrc   r   r   argsri   r7   r   r   r      s
   zRedis.__init__c              
   C   p   t |tjs6|r|dkrd}n|dr|dd  }zt|}W |S  ty5 } ztd| |d }~ww |S Nr)   r   r
   z-Database is int between 0 and limit - 1, not 
isinstancenumbersIntegral
startswithintr3   r   r   excr   r   r   rx      "   

zRedis._prepare_virtual_hostc                 C   s   | j | j| j| j| jdS )N)r   r   dbr   r   )r   r   r   r   r   r"   r   r   r   _get_redis_client_args   s   zRedis._get_redis_client_argsc                 C   s   t jdi |  S )Nr   )rc   rw   r   r"   r   r   r   ry      s   zRedis._get_redis_client)r$   r%   r&   r   rx   r   ry   rZ   r   r   r7   r   rw      s
    	rw   c                       rv   )	RedisSentinelc                    sv   t  j|g|R i | |di }|dd }| jpd| _| jp#d| _| | j| _| || _| 	||| _
d S )Nr_   broker_use_sslr(   ig  )r.   r   re   r   r   rx   r   _prepare_master_namemaster_namery   rc   )r   r   r{   ri   r_   r   r7   r   r   r      s   zRedisSentinel.__init__c              
   C   r|   r}   r~   r   r   r   r   rx      r   z#RedisSentinel._prepare_virtual_hostc              
   C   s2   z|d }W |S  t y } ztd|d }~ww )Nr   z+master_name is required for Sentinel broker)KeyErrorr3   )r   r_   r   r   r   r   r   r      s   

z"RedisSentinel._prepare_master_namec                 C   s\   | j |dd}t|trd|d< || tjj| j| j	fgfi |}|
| j}|S )Nsentinel_kwargs)r   r   Tssl)r   re   r   dictupdaterc   sentinelSentinelr   r   
master_forr   )r   r_   r   connection_kwargsr   redis_clientr   r   r   ry      s   

zRedisSentinel._get_redis_client)r$   r%   r&   r   rx   r   ry   rZ   r   r   r7   r   r      s
    
r   c                       s   e Zd Z fddZ  ZS )RedisSocketc                    s6   t  j|g|R i | tjd| j | jd| _d S )Nr)   )unix_socket_pathr   )r.   r   rc   rw   r   r   rz   r7   r   r   r      s   zRedisSocket.__init__)r$   r%   r&   r   rZ   r   r   r7   r   r      s    r   c                       s,   e Zd ZdZ fddZ fddZ  ZS )RedisSslz
    Redis SSL class offering connection to the broker over SSL.
    This does not currently support SSL settings through the url, only through
    the broker_use_ssl celery configuration.
    c                    s<   d|vrt d|di | _t j|g|R i | d S )Nr   z%rediss broker requires broker_use_ssl)r3   re   r   r.   r   rz   r7   r   r   r      s   zRedisSsl.__init__c                    s.   t   }d|d< t| jtr|| j |S )NTr   )r.   r   r   r   r   r   )r   client_argsr7   r   r   r      s
   
zRedisSsl._get_redis_client_args)r$   r%   r&   __doc__r   r   rZ   r   r   r7   r   r      s    r   c                   @   s    e Zd ZdZdd Zdd ZdS )BrokeraV  Factory returning the appropriate broker client based on URL scheme.

    Supported schemes:
    ``amqp`` or ``amqps``  -> :class:`RabbitMQ`
    ``redis``              -> :class:`Redis`
    ``rediss``             -> :class:`RedisSsl`
    ``redis+socket``       -> :class:`RedisSocket`
    ``sentinel``           -> :class:`RedisSentinel`
    c                 O   s   t |j}|dv rt|g|R i |S |dkr#t|g|R i |S |dkr2t|g|R i |S |dkrAt|g|R i |S |dkrPt|g|R i |S t)N)amqpamqpsrc   redisszredis+socketr   )r   rW   r'   rw   r   r   r   r   )rX   r   r{   ri   rW   r   r   r   __new__  s   
zBroker.__new__c                    r   r   r   r   r   r   r   r      r!   zBroker.queuesN)r$   r%   r&   r   r   r    r   r   r   r   r     s    
r   )rJ   loggingr   rG   urllib.parser   r   r   r   tornador   r   ray.serve._private.constantsr   rc   rd   	getLoggerr4   r	   r'   r[   rw   r   r   r   r   r   r   r   r   <module>   s*   
7.#0