o
    i9                     @   s  d dl Z d dlZd dlmZmZ d dlmZ d dlmZm	Z	m
Z
mZmZ d dlmZ d dlmZmZ d dlmZmZ d dlmZ G d	d
 d
eZG dd deZG dd deZG dd deZG dd deZG dd dZG dd deZG dd deZG dd dZ G dd dZ!G dd dZ"G dd  d Z#G d!d" d"Z$G d#d$ d$e$Z%G d%d& d&eZ&G d'd( d(eZ'G d)d* d*eZ(G d+d, d,eZ)G d-d. d.eZ*G d/d0 d0eZ+G d1d2 d2eZ,dS )3    N)ABCabstractmethod)Enum)DictListOptionalTypeUnion)TokenInterface)CredentialProviderStreamingCredentialProvider)init_connection_countregister_pools_connection_count)check_protocol_versionc                   @   "   e Zd ZdZedefddZdS )EventListenerInterfacez7
    Represents a listener for given event object.
    eventc                 C      d S N selfr   r   r   ?/home/ubuntu/.local/lib/python3.10/site-packages/redis/event.pylisten      zEventListenerInterface.listenN__name__
__module____qualname____doc__r   objectr   r   r   r   r   r          r   c                   @   r   )AsyncEventListenerInterfacez>
    Represents an async listener for given event object.
    r   c                       d S r   r   r   r   r   r   r         z"AsyncEventListenerInterface.listenNr   r   r   r   r   r"      r!   r"   c                   @   s^   e Zd ZdZedefddZedefddZedee	e e
eeef  f fdd	Zd
S )EventDispatcherInterfacezf
    Represents a dispatcher that dispatches events to listeners
    associated with given event.
    r   c                 C   r   r   r   r   r   r   r   dispatch*   r   z!EventDispatcherInterface.dispatchc                    r#   r   r   r   r   r   r   dispatch_async.   r$   z'EventDispatcherInterface.dispatch_asyncmappingsc                 C   s   dS )zRegister additional listeners.Nr   )r   r(   r   r   r   register_listeners2   s   	z+EventDispatcherInterface.register_listenersN)r   r   r   r   r   r    r&   r'   r   r   r   r	   r   r"   r)   r   r   r   r   r%   $   s    r%   c                       s*   e Zd ZdZdedef fddZ  ZS )EventExceptionzM
    Exception wrapper that adds an event object into exception context.
    	exceptionr   c                    s   || _ || _t | d S r   )r+   r   super__init__)r   r+   r   	__class__r   r   r-   C   s   zEventException.__init__)r   r   r   r   	Exceptionr    r-   __classcell__r   r   r.   r   r*   >   s    r*   c                   @   st   e Zd Z	ddeeee ee f  fddZ	defddZ
defdd	Zd
eee eeeef  f fddZdS )EventDispatcherNevent_listenersc                 C   s`   t t gtt t gtt gtt gt	t
 gtt gi| _t | _d| _|r.| | dS dS )z]
        Dispatcher that dispatches events to listeners associated with given event.
        N)AfterConnectionReleasedEventReAuthConnectionListener(AfterPooledConnectionsInstantiationEvent"RegisterReAuthForPooledConnections&InitializeConnectionCountObservability'AfterSingleConnectionInstantiationEvent!RegisterReAuthForSingleConnection'AfterPubSubConnectionInstantiationEventRegisterReAuthForPubSub#AfterAsyncClusterInstantiationEvent"RegisterReAuthForAsyncClusterNodes!AsyncAfterConnectionReleasedEventAsyncReAuthConnectionListener_event_listeners_mapping	threadingLock_lock_async_lockr)   )r   r3   r   r   r   r-   K   s,   
zEventDispatcher.__init__r   c                 C   sR   | j  | jt|g }|D ]}|| qW d    d S 1 s"w   Y  d S r   )rD   rA   gettyper   r   r   	listenerslistenerr   r   r   r&   n   s   "zEventDispatcher.dispatchc              	      s   | j d u rt | _ | j 4 I d H " | jt|g }|D ]
}||I d H  qW d   I d H  d S 1 I d H s:w   Y  d S r   )rE   asynciorC   rA   rF   rG   r   rH   r   r   r   r'   u   s   

.zEventDispatcher.dispatch_asyncr(   c                 C   sp   | j + |D ]}|| jv rtt| j| ||  | j|< q|| | j|< qW d    d S 1 s1w   Y  d S r   )rD   rA   listset)r   r(   
event_typer   r   r   r)      s   
"z"EventDispatcher.register_listenersr   )r   r   r   r   r   r   r    r   r   r-   r&   r'   r	   r"   r)   r   r   r   r   r2   I   s     
#
r2   c                   @   s$   e Zd ZdZdd Zedd ZdS )r4   zA
    Event that will be fired before each command execution.
    c                 C   s
   || _ d S r   _connection)r   
connectionr   r   r   r-         
z%AfterConnectionReleasedEvent.__init__c                 C      | j S r   rO   r   r   r   r   rQ         z'AfterConnectionReleasedEvent.connectionN)r   r   r   r   r-   propertyrQ   r   r   r   r   r4      s
    r4   c                   @      e Zd ZdS )r?   Nr   r   r   r   r   r   r   r?          r?   c                   @   s   e Zd ZdZdZdS )
ClientType)sync)asyncN)r   r   r   SYNCASYNCr   r   r   r   rZ      s    rZ   c                   @   sf   e Zd ZdZ	ddededee fddZe	dd	 Z
e	d
efddZe	d
eedf fddZdS )r6   zQ
    Event that will be fired after pooled connection instances was created.
    Nconnection_poolsclient_typecredential_providerc                 C      || _ || _|| _d S r   )_connection_pools_client_type_credential_provider)r   r_   r`   ra   r   r   r   r-         
z1AfterPooledConnectionsInstantiationEvent.__init__c                 C   rS   r   )rc   rT   r   r   r   r_      rU   z9AfterPooledConnectionsInstantiationEvent.connection_poolsreturnc                 C   rS   r   rd   rT   r   r   r   r`      rU   z4AfterPooledConnectionsInstantiationEvent.client_typec                 C   rS   r   re   rT   r   r   r   ra      rU   z<AfterPooledConnectionsInstantiationEvent.credential_providerr   )r   r   r   r   r   rZ   r   r   r-   rV   r_   r`   r	   ra   r   r   r   r   r6      s     


r6   c                   @   sj   e Zd ZdZdedeejej	f fddZ
edd Zedefd	d
Zedeejej	f fddZdS )r9   z
    Event that will be fired after single connection instances was created.

    :param connection_lock: For sync client thread-lock should be provided,
    for async asyncio.Lock
    r`   connection_lockc                 C   rb   r   )rP   rd   _connection_lock)r   rQ   r`   rj   r   r   r   r-      rf   z0AfterSingleConnectionInstantiationEvent.__init__c                 C   rS   r   rO   rT   r   r   r   rQ      rU   z2AfterSingleConnectionInstantiationEvent.connectionrg   c                 C   rS   r   rh   rT   r   r   r   r`      rU   z3AfterSingleConnectionInstantiationEvent.client_typec                 C   rS   r   rk   rT   r   r   r   rj      rU   z7AfterSingleConnectionInstantiationEvent.connection_lockN)r   r   r   r   rZ   r	   rB   RLockrK   rC   r-   rV   rQ   r`   rj   r   r   r   r   r9      s    


 r9   c                   @   sr   e Zd Zdedeejejf fddZ	e
dd Ze
dd Ze
d	efd
dZe
d	eejejf fddZdS )r;   r`   rj   c                 C   s   || _ || _|| _|| _d S r   )_pubsub_connection_connection_poolrd   rk   )r   pubsub_connectionconnection_poolr`   rj   r   r   r   r-      s   
z0AfterPubSubConnectionInstantiationEvent.__init__c                 C   rS   r   )rn   rT   r   r   r   rp      rU   z9AfterPubSubConnectionInstantiationEvent.pubsub_connectionc                 C   rS   r   )ro   rT   r   r   r   rq      rU   z7AfterPubSubConnectionInstantiationEvent.connection_poolrg   c                 C   rS   r   rh   rT   r   r   r   r`      rU   z3AfterPubSubConnectionInstantiationEvent.client_typec                 C   rS   r   rl   rT   r   r   r   rj      rU   z7AfterPubSubConnectionInstantiationEvent.connection_lockN)r   r   r   rZ   r	   rB   rm   rK   rC   r-   rV   rp   rq   r`   rj   r   r   r   r   r;      s    


 r;   c                   @   sV   e Zd ZdZ	ddedee fddZedefdd	Z	ede
edf fd
dZdS )r=   z
    Event that will be fired after async cluster instance was created.

    Async cluster doesn't use connection pools,
    instead ClusterNode object manages connections.
    Nnodesra   c                 C      || _ || _d S r   )_nodesre   )r   rr   ra   r   r   r   r-   
     
z,AfterAsyncClusterInstantiationEvent.__init__rg   c                 C   rS   r   )rt   rT   r   r   r   rr     rU   z)AfterAsyncClusterInstantiationEvent.nodesc                 C   rS   r   ri   rT   r   r   r   ra     rU   z7AfterAsyncClusterInstantiationEvent.credential_providerr   )r   r   r   r   dictr   r   r-   rV   rr   r	   ra   r   r   r   r   r=     s    

r=   c                   @   sF   e Zd ZdZdedefddZedefddZedefd	d
Z	dS )OnCommandsFailEventzD
    Event fired whenever a command fails during the execution.
    commandsr+   c                 C   rs   r   )	_commands
_exception)r   rx   r+   r   r   r   r-      ru   zOnCommandsFailEvent.__init__rg   c                 C   rS   r   )ry   rT   r   r   r   rx   (  rU   zOnCommandsFailEvent.commandsc                 C   rS   r   )rz   rT   r   r   r   r+   ,  rU   zOnCommandsFailEvent.exceptionN)
r   r   r   r   tupler0   r-   rV   rx   r+   r   r   r   r   rw     s    
rw   c                   @   rW   )AsyncOnCommandsFailEventNrX   r   r   r   r   r|   1  rY   r|   c                   @      e Zd ZdZdefddZdS )r5   zG
    Listener that performs re-authentication of given connection.
    r   c                 C   s   |j   d S r   rQ   re_authr   r   r   r   r   :  s   zReAuthConnectionListener.listenN)r   r   r   r   r4   r   r   r   r   r   r5   5      r5   c                   @   r}   )r@   zM
    Async listener that performs re-authentication of given connection.
    r   c                    s   |j  I d H  d S r   r~   r   r   r   r   r   C  s   z$AsyncReAuthConnectionListener.listenN)r   r   r   r   r?   r   r   r   r   r   r@   >  r   r@   c                   @   R   e Zd ZdZdd ZdefddZdd Zd	d
 Zde	fddZ
de	fddZdS )r7   z
    Listener that registers a re-authentication callback for pooled connections.
    Required by :class:`StreamingCredentialProvider`.
    c                 C   
   d | _ d S r   _eventrT   r   r   r   r-   M  rR   z+RegisterReAuthForPooledConnections.__init__r   c                 C   sb   t |jtr/|| _|jtjkr|j| j |j	| j
 d S |j| j |j	| j d S d S r   )
isinstancera   r   r   r`   rZ   r]   on_next_re_authon_error_raise_on_error_re_auth_async_raise_on_error_asyncr   r   r   r   r   P  s   z)RegisterReAuthForPooledConnections.listenc                 C   s   | j jD ]}|| qd S r   r   r_   re_auth_callbackr   tokenpoolr   r   r   r   [  s   z+RegisterReAuthForPooledConnections._re_authc                    s$   | j jD ]
}||I d H  qd S r   r   r   r   r   r   r   _  s   z1RegisterReAuthForPooledConnections._re_auth_asyncerrorc                 C      t || jr   r*   r   r   r   r   r   r   r   c     z2RegisterReAuthForPooledConnections._raise_on_errorc                       t || jr   r   r   r   r   r   r   f     z8RegisterReAuthForPooledConnections._raise_on_error_asyncN)r   r   r   r   r-   r6   r   r   r   r0   r   r   r   r   r   r   r7   G  s    r7   c                   @   r   )r:   z
    Listener that registers a re-authentication callback for single connection.
    Required by :class:`StreamingCredentialProvider`.
    c                 C   r   r   r   rT   r   r   r   r-   p  rR   z*RegisterReAuthForSingleConnection.__init__r   c                 C   sl   t |jjtr4|| _|jtjkr"|jj| j	 |jj
| j d S |jj| j |jj
| j d S d S r   )r   rQ   ra   r   r   r`   rZ   r]   r   r   r   r   r   r   r   r   r   r   r   s  s   z(RegisterReAuthForSingleConnection.listenc                 C   sV   | j j | j jd|d|  | j j  W d    d S 1 s$w   Y  d S NAUTHoidr   rj   rQ   send_commandtry_get	get_valueread_responser   r   r   r   r   r     s   
"z*RegisterReAuthForSingleConnection._re_authc              	      sx   | j j4 I d H & | j jd|d| I d H  | j j I d H  W d   I d H  d S 1 I d H s5w   Y  d S r   r   r   r   r   r   r     s   
.z0RegisterReAuthForSingleConnection._re_auth_asyncr   c                 C   r   r   r   r   r   r   r   r     r   z1RegisterReAuthForSingleConnection._raise_on_errorc                    r   r   r   r   r   r   r   r     r   z7RegisterReAuthForSingleConnection._raise_on_error_asyncN)r   r   r   r   r-   r9   r   r   r   r0   r   r   r   r   r   r   r:   j  s    r:   c                   @   s>   e Zd Zdd ZdefddZdefddZd	efd
dZ	dS )r>   c                 C   r   r   r   rT   r   r   r   r-     rR   z+RegisterReAuthForAsyncClusterNodes.__init__r   c                 C   s6   t |jtr|| _|j| j |j| j d S d S r   )r   ra   r   r   r   r   r   r   r   r   r   r   r     s
   z)RegisterReAuthForAsyncClusterNodes.listenr   c                    s,   | j jD ]}| j j| |I d H  qd S r   )r   rr   r   )r   r   keyr   r   r   r     s   z+RegisterReAuthForAsyncClusterNodes._re_authr   c                    r   r   r   r   r   r   r   r     r   z2RegisterReAuthForAsyncClusterNodes._raise_on_errorN)
r   r   r   r-   r=   r   r
   r   r0   r   r   r   r   r   r>     s
    r>   c                   @   sZ   e Zd Zdd ZdefddZdefddZdefd	d
Zde	fddZ
de	fddZdS )r<   c                 C   s"   d | _ d | _d | _d | _d | _d S r   )rP   ro   rd   rk   r   rT   r   r   r   r-     s
   
z RegisterReAuthForPubSub.__init__r   c                 C   s   t |jjtrLt|j drN|| _|j| _|j| _	|j
| _|j| _| jtjkr:| jj| j | jj| j d S | jj| j | jj| j d S d S d S )N   )r   rp   ra   r   r   get_protocolr   rP   rq   ro   r`   rd   rj   rk   rZ   r]   r   r   r   r   r   r   r   r   r   r   r     s$   zRegisterReAuthForPubSub.listenr   c                 C   sZ   | j  | jd|d|  | j  W d    n1 s w   Y  | j| d S r   rk   rP   r   r   r   r   ro   r   r   r   r   r   r     s   z RegisterReAuthForPubSub._re_authc              	      s   | j 4 I d H # | jd|d| I d H  | j I d H  W d   I d H  n1 I d H s1w   Y  | j|I d H  d S r   r   r   r   r   r   r     s   
(z&RegisterReAuthForPubSub._re_auth_asyncr   c                 C   r   r   r   r   r   r   r   r     r   z'RegisterReAuthForPubSub._raise_on_errorc                    r   r   r   r   r   r   r   r     r   z-RegisterReAuthForPubSub._raise_on_error_asyncN)r   r   r   r-   r;   r   r
   r   r   r0   r   r   r   r   r   r   r<     s    		r<   c                   @   r}   )r8   zC
    Listener that initializes connection count observability.
    r   c                 C   s   t   t|j d S r   )r   r   r_   r   r   r   r   r     s   z-InitializeConnectionCountObservability.listenN)r   r   r   r   r6   r   r   r   r   r   r8     r   r8   )-rK   rB   abcr   r   enumr   typingr   r   r   r   r	   redis.auth.tokenr
   redis.credentialsr   r   redis.observability.recorderr   r   redis.utilsr   r   r"   r%   r0   r*   r2   r4   r?   rZ   r6   r9   r;   r=   rw   r|   r5   r@   r7   r:   r>   r<   r8   r   r   r   r   <module>   s<    

J		#-4