o
    if0                     @   sB  d dl mZ d dlmZ d dlmZ d dlmZmZmZm	Z	m
Z
mZ d dlmZ d dlmZmZ d dlmZmZmZ d dlmZmZmZmZ d d	lmZmZmZmZmZ d d
l m!Z! d dl"m#Z# d dl$m%Z% d dl&m'Z'm(Z( d dl)m*Z+ d dl,m-Z-m.Z. d dl/m0Z0 d dl1m2Z2 d dl3m4Z4 G dd de.Z5G dd de-e5Z6dS )    )abstractmethod)iscoroutinefunction)datetime)Any	AwaitableCallableListOptionalUnion)RedisCluster)PipelinePubSub)AsyncDatabaseDatabase	Databases)AsyncActiveDatabaseChanged&CloseConnectionOnActiveDatabaseChangedRegisterCommandFailure"ResubscribeOnActiveDatabaseChanged)DEFAULT_FAILOVER_ATTEMPTSDEFAULT_FAILOVER_DELAYAsyncFailoverStrategyDefaultFailoverStrategyExecutorFailoverStrategyExecutor)AsyncFailureDetector)record_geo_failover)Retry)AsyncOnCommandsFailEventEventDispatcherInterface)State)BaseCommandExecutorCommandExecutor)DEFAULT_AUTO_FALLBACK_INTERVAL)GeoFailoverReason)KeyTc                   @   sV  e Zd ZeedefddZeedee fddZ	ededdfdd	Z
eedee fd
dZedededdfddZeedee fddZejededdfddZeedefddZeedefddZedd Zedd ZedefddZedeegdf fd d!Zed"efd#d$Zed%e de!fd&d'Z"dS )(AsyncCommandExecutorreturnc                 C      dS )zReturns a list of databases.N selfr(   r(   Z/home/ubuntu/.local/lib/python3.10/site-packages/redis/asyncio/multidb/command_executor.py	databases"      zAsyncCommandExecutor.databasesc                 C   r'   )z$Returns a list of failure detectors.Nr(   r)   r(   r(   r+   failure_detectors(   r-   z&AsyncCommandExecutor.failure_detectorsfailure_detectorNc                 C   r'   )z=Adds a new failure detector to the list of failure detectors.Nr(   r*   r/   r(   r(   r+   add_failure_detector.   s   z)AsyncCommandExecutor.add_failure_detectorc                 C   r'   )z"Returns currently active database.Nr(   r)   r(   r(   r+   active_database3   r-   z$AsyncCommandExecutor.active_databasedatabasereasonc                       dS )zSets the currently active database.

        Args:
            database: The new active database.
            reason: The reason for the failover.
        Nr(   )r*   r3   r4   r(   r(   r+   set_active_database9   s   
z(AsyncCommandExecutor.set_active_databasec                 C   r'   )z Returns currently active pubsub.Nr(   r)   r(   r(   r+   active_pubsubE   r-   z"AsyncCommandExecutor.active_pubsubpubsubc                 C   r'   )zSets currently active pubsub.Nr(   r*   r8   r(   r(   r+   r7   K   r-   c                 C   r'   )z#Returns failover strategy executor.Nr(   r)   r(   r(   r+   failover_strategy_executorQ   r-   z/AsyncCommandExecutor.failover_strategy_executorc                 C   r'   )zReturns command retry object.Nr(   r)   r(   r(   r+   command_retryW   r-   z"AsyncCommandExecutor.command_retryc                    r5   )z:Initializes a PubSub object on a currently active databaseNr(   r*   kwargsr(   r(   r+   r8   ]      zAsyncCommandExecutor.pubsubc                    r5   )z*Executes a command and returns the result.Nr(   )r*   argsoptionsr(   r(   r+   execute_commandb   r>   z$AsyncCommandExecutor.execute_commandcommand_stackc                    r5   )z)Executes a stack of commands in pipeline.Nr(   )r*   rB   r(   r(   r+   execute_pipelineg   r>   z%AsyncCommandExecutor.execute_pipelinetransactionc                    r5   )z1Executes a transaction block wrapped in callback.Nr(   )r*   rD   watchesr@   r(   r(   r+   execute_transactionl   s   z(AsyncCommandExecutor.execute_transactionmethod_namec                    r5   )z*Executes a given method on active pub/sub.Nr(   )r*   rG   r?   r=   r(   r(   r+   execute_pubsub_methods   r>   z*AsyncCommandExecutor.execute_pubsub_method
sleep_timec                    r5   )z!Executes pub/sub run in a thread.Nr(   )r*   rI   r=   r(   r(   r+   execute_pubsub_runx   r>   z'AsyncCommandExecutor.execute_pubsub_run)#__name__
__module____qualname__propertyr   r   r,   r   r   r.   r1   r	   r   r2   r#   r6   r   r7   setterr   r:   r   r;   r8   rA   tuplerC   r   r   rF   strrH   floatr   rJ   r(   r(   r(   r+   r%   !   s\    

r%   c                       s  e Zd Zeeefdee dede	de
dedededef fd	d
ZedefddZedee fddZdeddfddZedee fddZdededdfddZedee fddZejdeddfddZedefddZede	fd d!Zd"d# Zd$d% Zd&e fd'd(Z!dd)dd*d+e"d,ge#e$e%e$ f f d-e&d.ee' d/e(d0ee f
d1d2Z)d3e'fd4d5Z*	dGd6ede$fd7d8Z+	9dHd:e"d;e fd<d=Z,d>d? Z-d@dA Z.dBe fdCdDZ/dEdF Z0  Z1S )IDefaultCommandExecutorr.   r,   r;   failover_strategyevent_dispatcherfailover_attemptsfailover_delayauto_fallback_intervalc	           
         sn   t  | |D ]}	|	j| d q|| _|| _|| _t|||| _|| _d| _	d| _
i | _|   |   dS )a  
        Initialize the DefaultCommandExecutor instance.

        Args:
            failure_detectors: List of failure detector instances to monitor database health
            databases: Collection of available databases to execute commands on
            command_retry: Retry policy for failed command execution
            failover_strategy: Strategy for handling database failover
            event_dispatcher: Interface for dispatching events
            failover_attempts: Number of failover attempts
            failover_delay: Delay between failover attempts
            auto_fallback_interval: Time interval in seconds between attempts to fall back to a primary database
        )command_executorN)super__init__set_command_executor
_databases_failure_detectors_command_retryr   _failover_strategy_executor_event_dispatcher_active_database_active_pubsub_active_pubsub_kwargs_setup_event_dispatcher_schedule_next_fallback)
r*   r.   r,   r;   rT   rU   rV   rW   rX   fd	__class__r(   r+   r[      s   zDefaultCommandExecutor.__init__r&   c                 C      | j S N)r]   r)   r(   r(   r+   r,         z DefaultCommandExecutor.databasesc                 C   rj   rk   )r^   r)   r(   r(   r+   r.      rl   z(DefaultCommandExecutor.failure_detectorsr/   Nc                 C   s   | j | d S rk   )r^   appendr0   r(   r(   r+   r1      s   z+DefaultCommandExecutor.add_failure_detectorc                 C   rj   rk   )rb   r)   r(   r(   r+   r2      rl   z&DefaultCommandExecutor.active_databaser3   r4   c                    sd   | j }|| _ |d ur.||ur0t|||dI d H  | jt|| j | fi | jI d H  d S d S d S )N)	fail_fromfail_tor4   )rb   r   ra   dispatch_asyncr   rd   )r*   r3   r4   
old_activer(   r(   r+   r6      s&   z*DefaultCommandExecutor.set_active_databasec                 C   rj   rk   rc   r)   r(   r(   r+   r7      rl   z$DefaultCommandExecutor.active_pubsubr8   c                 C   s
   || _ d S rk   rr   r9   r(   r(   r+   r7      s   
c                 C   rj   rk   )r`   r)   r(   r(   r+   r:      rl   z1DefaultCommandExecutor.failover_strategy_executorc                 C   rj   rk   )r_   r)   r(   r(   r+   r;      rl   z$DefaultCommandExecutor.command_retryc                 K   sD   | j d u r t| jjtrtd| jjjdi || _ || _d S d S )Nz(PubSub is not supported for RedisClusterr(   )rc   
isinstancerb   clientr   
ValueErrorr8   rd   r<   r(   r(   r+   r8      s   

zDefaultCommandExecutor.pubsubc                    s$    fdd} | I d H S )Nc                     s0   j jj i I d H }  I d H  | S rk   )rb   rt   rA   _register_command_executionresponser?   r@   r*   r(   r+   callback   s   z8DefaultCommandExecutor.execute_command.<locals>.callback_execute_with_failure_detection)r*   r?   r@   rz   r(   ry   r+   rA      s   z&DefaultCommandExecutor.execute_commandrB   c                    s"    fdd} | I d H S )Nc               	      s   j j 4 I d H +}  D ]\}}| j|i | q|  I d H } I d H  |W  d   I d H  S 1 I d H s<w   Y  d S rk   )rb   rt   pipelinerA   executerv   )pipecommandr@   rx   rB   r*   r(   r+   rz      s   0z9DefaultCommandExecutor.execute_pipeline.<locals>.callbackr{   )r*   rB   rz   r(   r   r+   rC      s   	z'DefaultCommandExecutor.execute_pipelineF
shard_hintvalue_from_callablewatch_delayfuncr   rE   r   r   r   c                   s(    fdd} |I d H S )Nc                     s<   j jj gR dI d H } dI d H  | S )Nr   r(   )rb   rt   rD   rv   rw   r   r*   r   r   r   rE   r(   r+   rz     s   z<DefaultCommandExecutor.execute_transaction.<locals>.callbackr{   )r*   r   r   r   r   rE   rz   r(   r   r+   rF      s   z*DefaultCommandExecutor.execute_transactionrG   c                    s,    fdd}j |g R  I d H S )Nc                     sN   t j} t| r|  i I d H }n|  i } I d H  |S rk   )getattrr7   r   rv   )methodrx   r?   r=   rG   r*   r(   r+   rz     s   z>DefaultCommandExecutor.execute_pubsub_method.<locals>.callbackr{   )r*   rG   r?   r=   rz   r(   r   r+   rH     s   
z,DefaultCommandExecutor.execute_pubsub_methodrI   c                    s$    fdd} |I d H S )Nc                      s   j j dI d H S )N)poll_timeoutexception_handlerr8   )rc   runr(   r   r8   r*   rI   r(   r+   rz      s   z;DefaultCommandExecutor.execute_pubsub_run.<locals>.callbackr{   )r*   rI   r   r8   rz   r(   r   r+   rJ     s   z)DefaultCommandExecutor.execute_pubsub_runr(   rz   cmdsc                    s6    fddj fddfddI dH S )zO
        Execute a commands execution callback with failure detection.
        c                      s     I d H    I d H S rk   )_check_active_databaser(   )rz   r*   r(   r+   wrapper0  s   zGDefaultCommandExecutor._execute_with_failure_detection.<locals>.wrapperc                      s     S rk   r(   r(   )r   r(   r+   <lambda>6  s    zHDefaultCommandExecutor._execute_with_failure_detection.<locals>.<lambda>c                    s   j | g R  S rk   )_on_command_fail)error)r   r*   r(   r+   r   7  s    N)r_   call_with_retry)r*   rz   r   r(   )rz   r   r*   r   r+   r|   )  s   

z6DefaultCommandExecutor._execute_with_failure_detectionc                    sh   | j du s| j jjtjks| jdkr0| jt kr2| 	| j
 I dH tjI dH  |   dS dS dS )zB
        Checks if active a database needs to be updated.
        Nr   )rb   circuitstateCBStateCLOSED_auto_fallback_interval_next_fallback_attemptr   nowr6   r`   r~   r#   	AUTOMATICrf   r)   r(   r(   r+   r   :  s   


z-DefaultCommandExecutor._check_active_databasec                    s   | j t||I d H  d S rk   )ra   rp   r   )r*   r   r?   r(   r(   r+   r   L  s   z'DefaultCommandExecutor._on_command_failcmdc                    s"   | j D ]
}||I d H  qd S rk   )r^   register_command_execution)r*   r   detectorr(   r(   r+   rv   Q  s   
z2DefaultCommandExecutor._register_command_executionc                 C   s4   t | j}t }t }| jt|gt||gi dS )z0
        Registers necessary listeners.
        N)r   r^   r   r   ra   register_listenersr   r   )r*   failure_listenerresubscribe_listenerclose_connection_listenerr(   r(   r+   re   U  s   
z.DefaultCommandExecutor._setup_event_dispatcher)NN)r(   )2rK   rL   rM   r   r   r"   r   r   r   r   r   r   intrR   r[   rN   r,   r.   r1   r	   r   r2   r#   r6   r   r7   rO   r   r:   r;   r8   rA   rP   rC   r   r
   r   r   r$   rQ   boolrF   rH   rJ   r|   r   r   rv   re   __classcell__r(   r(   rh   r+   rS   ~   s    	*




rS   N)7abcr   asyncior   r   typingr   r   r   r   r	   r
   redis.asyncior   redis.asyncio.clientr   r   redis.asyncio.multidb.databaser   r   r   redis.asyncio.multidb.eventr   r   r   r   redis.asyncio.multidb.failoverr   r   r   r   r   &redis.asyncio.multidb.failure_detectorr   $redis.asyncio.observability.recorderr   redis.asyncio.retryr   redis.eventr   r   redis.multidb.circuitr   r   redis.multidb.command_executorr    r!   redis.multidb.configr"   redis.observability.attributesr#   redis.typingr$   r%   rS   r(   r(   r(   r+   <module>   s(     ]