o
    ir0                     @   s6  d dl mZmZ d dlmZmZ d dlmZmZmZm	Z	m
Z
 d dlmZmZmZ d dlmZmZ d dlmZ d dlmZ d dlmZmZmZ d d	lmZmZmZmZ d d
l m!Z!m"Z"m#Z#m$Z$m%Z% d dl&m'Z' d dl(m)Z) d dl*m+Z+ d dl,m-Z- G dd deZ.G dd de.Z/G dd de.Z0G dd de0e/Z1dS )    )ABCabstractmethod)datetime	timedelta)AnyCallableListOptionalTuple)PipelinePubSubPubSubWorkerThread)EventDispatcherInterfaceOnCommandsFailEvent)State)DEFAULT_AUTO_FALLBACK_INTERVAL)Database	DatabasesSyncDatabase)ActiveDatabaseChanged&CloseConnectionOnActiveDatabaseChangedRegisterCommandFailure"ResubscribeOnActiveDatabaseChanged)DEFAULT_FAILOVER_ATTEMPTSDEFAULT_FAILOVER_DELAYDefaultFailoverStrategyExecutorFailoverStrategyFailoverStrategyExecutor)FailureDetector)GeoFailoverReason)record_geo_failover)Retryc                   @   s>   e Zd ZeedefddZejededdfddZdS )CommandExecutorreturnc                 C      dS )zReturns auto-fallback interval.N selfr%   r%   R/home/ubuntu/.local/lib/python3.10/site-packages/redis/multidb/command_executor.pyauto_fallback_interval      z&CommandExecutor.auto_fallback_intervalr)   Nc                 C   r$   )zSets auto-fallback interval.Nr%   r'   r)   r%   r%   r(   r)   $   r*   )__name__
__module____qualname__propertyr   floatr)   setterr%   r%   r%   r(   r"      s    r"   c                   @   sR   e Zd ZefdefddZedefddZejde	ddfddZdd	d
Z
dS )BaseCommandExecutorr)   c                 C   s   || _ |  d S N_auto_fallback_intervalr+   r%   r%   r(   __init__,   s   zBaseCommandExecutor.__init__r#   c                 C      | j S r3   r4   r&   r%   r%   r(   r)   3      z*BaseCommandExecutor.auto_fallback_intervalNc                 C   
   || _ d S r3   r4   r+   r%   r%   r(   r)   7      
c                 C   s(   | j dk rd S t t| j d | _d S )Nr   )seconds)r5   r   nowr   _next_fallback_attemptr&   r%   r%   r(   _schedule_next_fallback;   s
   
z+BaseCommandExecutor._schedule_next_fallback)r#   N)r,   r-   r.   r   r0   r6   r/   r)   r1   intr>   r%   r%   r%   r(   r2   +   s    
r2   c                   @   s`  e Zd ZeedefddZeedee fddZ	ededdfdd	Z
eedee fd
dZejedeeef ddfddZeedee fddZejededdfddZeedefddZeedefddZedd Zedd ZedefddZedeegdf fddZed efd!d"Z ed#e!de"fd$d%Z#dS )&SyncCommandExecutorr#   c                 C   r$   )zReturns a list of databases.Nr%   r&   r%   r%   r(   	databasesE   r*   zSyncCommandExecutor.databasesc                 C   r$   )z$Returns a list of failure detectors.Nr%   r&   r%   r%   r(   failure_detectorsK   r*   z%SyncCommandExecutor.failure_detectorsfailure_detectorNc                 C   r$   )z=Adds a new failure detector to the list of failure detectors.Nr%   r'   rC   r%   r%   r(   add_failure_detectorQ      z(SyncCommandExecutor.add_failure_detectorc                 C   r$   )z"Returns currently active database.Nr%   r&   r%   r%   r(   active_databaseV   r*   z#SyncCommandExecutor.active_databasevaluec                 C   r$   )zSets the currently active database.

        Args:
            value: A tuple of (database, reason) where database is the new active
                   database and reason is the GeoFailoverReason for the change.
        Nr%   )r'   rH   r%   r%   r(   rG   \   s   	c                 C   r$   )z Returns currently active pubsub.Nr%   r&   r%   r%   r(   active_pubsubg   r*   z!SyncCommandExecutor.active_pubsubpubsubc                 C   r$   )zSets currently active pubsub.Nr%   r'   rJ   r%   r%   r(   rI   m   r*   c                 C   r$   )z#Returns failover strategy executor.Nr%   r&   r%   r%   r(   failover_strategy_executors   r*   z.SyncCommandExecutor.failover_strategy_executorc                 C   r$   )zReturns command retry object.Nr%   r&   r%   r%   r(   command_retryy   r*   z!SyncCommandExecutor.command_retryc                 K   r$   )z:Initializes a PubSub object on a currently active databaseNr%   )r'   kwargsr%   r%   r(   rJ      rF   zSyncCommandExecutor.pubsubc                 O   r$   )z*Executes a command and returns the result.Nr%   )r'   argsoptionsr%   r%   r(   execute_command   rF   z#SyncCommandExecutor.execute_commandcommand_stackc                 C   r$   )z)Executes a stack of commands in pipeline.Nr%   )r'   rR   r%   r%   r(   execute_pipeline   rF   z$SyncCommandExecutor.execute_pipelinetransactionc                 O   r$   )z1Executes a transaction block wrapped in callback.Nr%   )r'   rT   watchesrP   r%   r%   r(   execute_transaction   s   z'SyncCommandExecutor.execute_transactionmethod_namec                 O   r$   )z*Executes a given method on active pub/sub.Nr%   )r'   rW   rO   rN   r%   r%   r(   execute_pubsub_method   rF   z)SyncCommandExecutor.execute_pubsub_method
sleep_timec                 K   r$   )z!Executes pub/sub run in a thread.Nr%   )r'   rY   rN   r%   r%   r(   execute_pubsub_run   rF   z&SyncCommandExecutor.execute_pubsub_run)$r,   r-   r.   r/   r   r   rA   r   r   rB   rE   r	   r   rG   r1   r
   r   r   r   rI   r   rL   r!   rM   rJ   rQ   tuplerS   r   r   rV   strrX   r0   r   rZ   r%   r%   r%   r(   r@   D   sR    	

r@   c                       s  e Zd Zeeefdee dede	de
dedededef fd	d
ZedefddZedee fddZdeddfddZede	fddZedee fddZejdeeef ddfddZedee fddZejdeddfddZedefddZd d! Zd"efd#d$Z d%e!e"gdf fd&d'Z#d(d) Z$d*e%fd+d,Z&d>d.d/Z'd?d1e!d2efd3d4Z(d5d6 Z)d7d8 Z*d9efd:d;Z+d<d= Z,  Z-S )@DefaultCommandExecutorrB   rA   rM   failover_strategyevent_dispatcherfailover_attemptsfailover_delayr)   c	           
         sn   t  | |D ]}	|	j| d q|| _|| _|| _t|||| _|| _d| _	d| _
i | _|   |   dS )a  
        Initialize the DefaultCommandExecutor instance.

        Args:
            failure_detectors: List of failure detector instances to monitor database health
            databases: Collection of available databases to execute commands on
            command_retry: Retry policy for failed command execution
            failover_strategy: Strategy for handling database failover
            event_dispatcher: Interface for dispatching events
            failover_attempts: Number of failover attempts
            failover_delay: Delay between failover attempts
            auto_fallback_interval: Time interval in seconds between attempts to fall back to a primary database
        )command_executorN)superr6   set_command_executor
_databases_failure_detectors_command_retryr   _failover_strategy_executor_event_dispatcher_active_database_active_pubsub_active_pubsub_kwargs_setup_event_dispatcherr>   )
r'   rB   rA   rM   r^   r_   r`   ra   r)   fd	__class__r%   r(   r6      s   zDefaultCommandExecutor.__init__r#   c                 C   r7   r3   )re   r&   r%   r%   r(   rA      r8   z DefaultCommandExecutor.databasesc                 C   r7   r3   )rf   r&   r%   r%   r(   rB      r8   z(DefaultCommandExecutor.failure_detectorsrC   Nc                 C   s   | j | d S r3   )rf   appendrD   r%   r%   r(   rE      s   z+DefaultCommandExecutor.add_failure_detectorc                 C   r7   r3   )rg   r&   r%   r%   r(   rM      r8   z$DefaultCommandExecutor.command_retryc                 C   r7   r3   )rj   r&   r%   r%   r(   rG      r8   z&DefaultCommandExecutor.active_databaserH   c                 C   s^   |\}}| j }|| _ |d ur+||ur-t|||d | jt|| j | fi | j d S d S d S )N)	fail_fromfail_toreason)rj   r    ri   dispatchr   rl   )r'   rH   databasert   
old_activer%   r%   r(   rG      s&   c                 C   r7   r3   rk   r&   r%   r%   r(   rI      r8   z$DefaultCommandExecutor.active_pubsubrJ   c                 C   r9   r3   rx   rK   r%   r%   r(   rI      r:   c                 C   r7   r3   )rh   r&   r%   r%   r(   rL      r8   z1DefaultCommandExecutor.failover_strategy_executorc                    s    fdd} | S )Nc                     s"   j jj i }   | S r3   )rj   clientrQ   _register_command_executionresponserO   rP   r'   r%   r(   callback   s   
z8DefaultCommandExecutor.execute_command.<locals>.callback_execute_with_failure_detection)r'   rO   rP   r~   r%   r}   r(   rQ      s   z&DefaultCommandExecutor.execute_commandrR   c                    s    fdd} | S )Nc                     sd   j j "}  D ]\}}| j|i | q	|  }  |W  d    S 1 s+w   Y  d S r3   )rj   ry   pipelinerQ   executerz   )pipecommandrP   r|   rR   r'   r%   r(   r~     s   
$z9DefaultCommandExecutor.execute_pipeline.<locals>.callbackr   )r'   rR   r~   r%   r   r(   rS     s   	z'DefaultCommandExecutor.execute_pipelinerT   c                    s    fdd} |S )Nc                     s*   j jjgR i  } d | S Nr%   )rj   ry   rT   rz   r{   rP   r'   rT   rU   r%   r(   r~     s   
z<DefaultCommandExecutor.execute_transaction.<locals>.callbackr   )r'   rT   rU   rP   r~   r%   r   r(   rV     s   
z*DefaultCommandExecutor.execute_transactionc                    s    fdd} |S )Nc                      s*   j d u rjjjdi  _  _d S r   )rk   rj   ry   rJ   rl   r%   rN   r'   r%   r(   r~      s   
z/DefaultCommandExecutor.pubsub.<locals>.callbackr   )r'   rN   r~   r%   r   r(   rJ     s   
zDefaultCommandExecutor.pubsubrW   c                    s$    fdd}j |g R  S )Nc                     s(   t j} |  i }  |S r3   )getattrrI   rz   )methodr|   rO   rN   rW   r'   r%   r(   r~   )  s   
z>DefaultCommandExecutor.execute_pubsub_method.<locals>.callbackr   )r'   rW   rO   rN   r~   r%   r   r(   rX   (  s   z,DefaultCommandExecutor.execute_pubsub_methodr   c                    s    fdd} |S )Nc                      s   j jfi  S r3   )rk   run_in_threadr%   rN   r'   rY   r%   r(   r~   2  s   z;DefaultCommandExecutor.execute_pubsub_run.<locals>.callbackr   )r'   rY   rN   r~   r%   r   r(   rZ   1  s   
z)DefaultCommandExecutor.execute_pubsub_runr%   r~   cmdsc                    s.    fddj fddfddS )zO
        Execute a commands execution callback with failure detection.
        c                      s        S r3   )_check_active_databaser%   )r~   r'   r%   r(   wrapper<  s   zGDefaultCommandExecutor._execute_with_failure_detection.<locals>.wrapperc                      s     S r3   r%   r%   )r   r%   r(   <lambda>B  s    zHDefaultCommandExecutor._execute_with_failure_detection.<locals>.<lambda>c                    s   j | g R  S r3   )_on_command_fail)error)r   r'   r%   r(   r   C  s    )rg   call_with_retry)r'   r~   r   r%   )r~   r   r'   r   r(   r   7  s
   
z6DefaultCommandExecutor._execute_with_failure_detectionc                 G   s   | j t|| d S r3   )ri   ru   r   )r'   r   rO   r%   r%   r(   r   F  s   z'DefaultCommandExecutor._on_command_failc                 C   sX   | j du s| j jjtjks| jdkr(| jt kr*| j	
 tjf| _|   dS dS dS )zB
        Checks if active a database needs to be updated.
        Nr   )rj   circuitstateCBStateCLOSEDr5   r=   r   r<   rh   r   r   	AUTOMATICrG   r>   r&   r%   r%   r(   r   I  s   

z-DefaultCommandExecutor._check_active_databasecmdc                 C   s   | j D ]}|| qd S r3   )rf   register_command_execution)r'   r   detectorr%   r%   r(   rz   [  s   
z2DefaultCommandExecutor._register_command_executionc                 C   s4   t | j}t }t }| jt|gt||gi dS )z0
        Registers necessary listeners.
        N)r   rf   r   r   ri   register_listenersr   r   )r'   failure_listenerresubscribe_listenerclose_connection_listenerr%   r%   r(   rm   _  s   
z.DefaultCommandExecutor._setup_event_dispatcher)r#   r   )r%   ).r,   r-   r.   r   r   r   r   r   r   r!   r   r   r?   r0   r6   r/   rA   rB   rE   rM   r	   r   rG   r1   r
   r   r   rI   r   rL   rQ   r[   rS   r   r   rV   rJ   r\   rX   rZ   r   r   r   rz   rm   __classcell__r%   r%   ro   r(   r]      sf    	*
	
	r]   N)2abcr   r   r   r   typingr   r   r   r	   r
   redis.clientr   r   r   redis.eventr   r   redis.multidb.circuitr   r   redis.multidb.configr   redis.multidb.databaser   r   r   redis.multidb.eventr   r   r   r   redis.multidb.failoverr   r   r   r   r   redis.multidb.failure_detectorr   redis.observability.attributesr   redis.observability.recorderr    redis.retryr!   r"   r2   r@   r]   r%   r%   r%   r(   <module>   s$    \