o
    iU                     @   sv  d dl Z d dlZd dlmZ d dlmZ d dlmZmZm	Z	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dlmZmZmZm Z  d dl!m"Z"m#Z#m$Z$ d dl%m&Z&m'Z'm(Z( d dl)m*Z* d dl+m,Z,m-Z- d dl.m/Z/ d dl0m1Z1 d dl2m3Z3 e 4e5Z6e3G dd deeZ7defddZ8G dd deeZ9G dd dZ:dS )    N)as_completed)ThreadPoolExecutor)AnyCallableListOptional)BackgroundScheduler)	NoBackoff)PubSubWorkerThread)CoreCommandsRedisModuleCommands)MaintNotificationsConfig)CircuitBreaker)State)DefaultCommandExecutor)DEFAULT_GRACE_PERIODDatabaseConfigInitialHealthCheckMultiDbConfig)Database	DatabasesSyncDatabase)InitialHealthCheckFailedErrorNoValidDatabaseExceptionUnhealthyDatabaseException)FailureDetector)HealthCheckHealthCheckPolicy)GeoFailoverReason)Retry)experimentalc                   @   s(  e Zd ZdZdefddZdd Zdefdd	Zd
e	ddfddZ
	d8dedefddZde	de	fddZd
efddZd
e	defddZdefddZdefdd Zd!d" Zd#d$ Zd%ed&gdf fd'd(Zd)d* Zd
e	defd+d,Zdeeef fd-d.Zd/d0 Zd1e d2e!d3e!fd4d5Z"d6d7 Z#dS )9MultiDBClientz
    Client that operates on multiple logical Redis databases.
    Should be used in Client-side geographic failover database setups.
    configc              
   C   s   |  | _|js| n|j| _|j| _|j|j	|j
| _|js%| n|j| _|jd u r2| n|j| _| j| j |j| _|j| _|j| _| jtf t| j| j| j| j|j|j| j| jd| _d| _t ! | _"t# | _$|| _%d S )N)failure_detectors	databasescommand_retryfailover_strategyfailover_attemptsfailover_delayevent_dispatcherauto_fallback_intervalF)&r$   
_databaseshealth_checksdefault_health_checks_health_checkshealth_check_interval_health_check_intervalhealth_check_policyvaluehealth_check_probeshealth_check_probes_delay_health_check_policyr#   default_failure_detectors_failure_detectorsr&   default_failover_strategy_failover_strategyset_databasesr*   _auto_fallback_intervalr)   _event_dispatcherr%   _command_retryupdate_supported_errorsConnectionRefusedErrorr   r'   r(   command_executorinitialized	threadingRLock_hc_lockr   _bg_scheduler_config)selfr"    rH   H/home/ubuntu/.local/lib/python3.10/site-packages/redis/multidb/client.py__init__+   sH   






zMultiDBClient.__init__c                 C   sp   |    | j| j| j d}| jD ]\}}|j| j |jj	t
jkr,|s,|| j_d}q|s3tdd| _dS )zT
        Perform initialization of databases to define their initial state.
        FTz4Initial connection failed - no active database foundN)_perform_initial_health_checkrE   run_recurringr0   _check_databases_healthr+   circuiton_state_changed!_on_circuit_state_change_callbackstateCBStateCLOSEDr@   _active_databaser   rA   )rG   is_active_db_founddatabaseweightrH   rH   rI   
initializeU   s"   
zMultiDBClient.initializereturnc                 C   s   | j S )zE
        Returns a sorted (by weight) list of all databases.
        )r+   rG   rH   rH   rI   get_databasesw   s   zMultiDBClient.get_databasesrV   Nc                 C   sv   d}| j D ]\}}||krd} nq|std| | |jjtjkr7| j dd \}}|tj	f| j
_dS td)zL
        Promote one of the existing databases to become an active.
        NT/Given database is not a member of database list   r   z1Cannot set active database, database is unhealthy)r+   
ValueError_check_db_healthrN   rQ   rR   rS   	get_top_nr   MANUALr@   active_databaser   )rG   rV   existsexisting_db_highest_weighted_dbrH   rH   rI   set_active_database}   s$   
z!MultiDBClient.set_active_databaseTskip_initial_health_checkc                 C   s  t dt d|jd< d|jvrtdd|jd< |jr(| jjj|jfi |j}n"|jr@|jt dt d | jjj|jd}n
| jjdi |j}|j	du rS|
 n|j	}t|||j|jd	}z| | W n tys   |sq Y nw | jd
d \}}| j||j | || dS )z
        Adds a new database to the database list.

        Args:
            config: DatabaseConfig object that contains the database configuration.
            skip_initial_health_check: If True, adds the database even if it is unhealthy.
        r   )retriesbackoffretrymaint_notifications_configF)enabled)connection_poolN)clientrN   rW   health_check_urlr]   rH   )r   r	   client_kwargsr   from_urlrF   client_class	from_pool	set_retryrN   default_circuit_breakerr   rW   rp   r_   r   r+   r`   add_change_active_database)rG   r"   rh   ro   rN   rV   rf   highest_weightrH   rH   rI   add_database   sH   

zMultiDBClient.add_databasenew_databasehighest_weight_databasec                 C   s4   |j |j kr|jjtjkr|tjf| j_d S d S d S N)	rW   rN   rQ   rR   rS   r   	AUTOMATICr@   rb   )rG   r{   r|   rH   rH   rI   rx      s   z%MultiDBClient._change_active_databasec                 C   sP   | j |}| j dd \}}||kr$|jjtjkr&|tjf| j	_
dS dS dS )z<
        Removes a database from the database list.
        r]   r   N)r+   remover`   rN   rQ   rR   rS   r   ra   r@   rb   )rG   rV   rW   rf   ry   rH   rH   rI   remove_database   s   zMultiDBClient.remove_databaserW   c                 C   sh   d}| j D ]\}}||krd} nq|std| j dd \}}| j || ||_| || dS )z<
        Updates a database from the database list.
        NTr\   r]   r   )r+   r^   r`   update_weightrW   rx   )rG   rV   rW   rc   rd   re   rf   ry   rH   rH   rI   update_database_weight   s   z$MultiDBClient.update_database_weightfailure_detectorc                 C   s   | j | dS )z>
        Adds a new failure detector to the database.
        N)r7   append)rG   r   rH   rH   rI   add_failure_detector   s   z"MultiDBClient.add_failure_detectorhealthcheckc                 C   s8   | j  | j| W d   dS 1 sw   Y  dS )z:
        Adds a new health check to the database.
        N)rD   r.   r   )rG   r   rH   rH   rI   add_health_check  s   "zMultiDBClient.add_health_checkc                 O   s    | j s|   | jj|i |S )zB
        Executes a single command and return its result.
        )rA   rX   r@   execute_commandrG   argsoptionsrH   rH   rI   r     s   zMultiDBClient.execute_commandc                 C   s   t | S )z:
        Enters into pipeline mode of the client.
        )PipelinerZ   rH   rH   rI   pipeline  s   zMultiDBClient.pipelinefuncr   c                 O   s&   | j s|   | jj|g||R  S )z3
        Executes callable as transaction.
        )rA   rX   r@   execute_transaction)rG   r   watchesr   rH   rH   rI   transaction  s   zMultiDBClient.transactionc                 K   s   | j s|   t| fi |S )z
        Return a Publish/Subscribe object. With this object, you can
        subscribe to channels and listen for messages that get published to
        them.
        )rA   rX   PubSub)rG   kwargsrH   rH   rI   pubsub%  s   zMultiDBClient.pubsubc                 C   sP   | j | j|}|s|jjtjkrtj|j_|S |r&|jjtjkr&tj|j_|S )zO
        Runs health checks on the given database until first failure.
        )r5   executer.   rN   rQ   rR   OPENrS   )rG   rV   
is_healthyrH   rH   rI   r_   0  s   

zMultiDBClient._check_db_healthc                    s   t tjd`  fddjD }i }z>t|jdD ]4}z|| }| ||< W q tyR } z|j}tj	|j
_tjd|jd d||< W Y d}~qd}~ww W n ty_   td	w W d   |S 1 skw   Y  |S )
zk
        Runs health checks as a recurring task.
        Runs health checks against all databases.
        )max_workersc                    s    i | ]\}}  j||qS rH   )submitr_   ).0rV   re   executorrG   rH   rI   
<dictcomp>G  s    z9MultiDBClient._check_databases_health.<locals>.<dictcomp>)timeoutz%Health check failed, due to exception)exc_infoFNz4Health check execution exceeds health_check_interval)r   lenr+   r   r0   resultr   rV   rR   r   rN   rQ   loggerdebugoriginal_exceptionTimeoutError)rG   futuresresultsfuturerV   eunhealthy_dbrH   r   rI   rM   @  sB   


z%MultiDBClient._check_databases_healthc                 C   s   |   }d}| jjtjkrd| v}n!| jjtjkr(t| t|d k}n| jjtj	kr5d| v }|s@t
d| jj dS )zj
        Runs initial health check and evaluate healthiness based on initial_health_check_policy.
        TF   z:Initial health check failed. Initial health check policy: N)rM   rF   initial_health_check_policyr   ALL_AVAILABLEvaluesMAJORITY_AVAILABLEsumr   ONE_AVAILABLEr   )rG   r   r   rH   rH   rI   rK   e  s   z+MultiDBClient._perform_initial_health_checkrN   	old_state	new_statec                 C   s   |t jkr| |j d S |t jkr)|t jkr)td|j d | j	t
t| |t jkr?|t jkrAtd|j d d S d S d S )Nz	Database z- is unreachable. Failover has been initiated.z is reachable again.)rR   	HALF_OPENr_   rV   rS   r   r   warningrE   run_oncer   _half_open_circuitinfo)rG   rN   r   r   rH   rH   rI   rP   }  s   
z/MultiDBClient._on_circuit_state_change_callbackc                 C   s.   | j r| j   | jjr| jjj  dS dS )z:
        Closes the client and all its resources.
        N)rE   stopr@   rb   ro   closerZ   rH   rH   rI   r     s
   
zMultiDBClient.close)T)$__name__
__module____qualname____doc__r   rJ   rX   r   r[   r   rg   r   boolrz   rx   r   r   floatr   r   r   r   r   r   r   r   r   r   r_   dictrM   rK   r   rR   rP   r   rH   rH   rH   rI   r!   $   sH    *"
8
		%
r!   rN   c                 C   s   t j| _d S r}   )rR   r   rQ   )rN   rH   rH   rI   r        r   c                   @   s   e Zd ZdZdefddZdddZdd	 Zd
d Zde	fddZ
defddZdddZdddZdddZdd Zdee fddZdS )r   zG
    Pipeline implementation for multiple logical Redis databases.
    ro   c                 C   s   g | _ || _d S r}   )_command_stack_client)rG   ro   rH   rH   rI   rJ     s   
zPipeline.__init__rY   c                 C      | S r}   rH   rZ   rH   rH   rI   	__enter__     zPipeline.__enter__c                 C      |    d S r}   reset)rG   exc_type	exc_value	tracebackrH   rH   rI   __exit__  r   zPipeline.__exit__c                 C   $   z|    W d S  ty   Y d S w r}   r   	ExceptionrZ   rH   rH   rI   __del__  s
   zPipeline.__del__c                 C   s
   t | jS r}   )r   r   rZ   rH   rH   rI   __len__     
zPipeline.__len__c                 C   s   dS )z1Pipeline instances should always evaluate to TrueTrH   rZ   rH   rH   rI   __bool__  s   zPipeline.__bool__Nc                 C   s
   g | _ d S r}   )r   rZ   rH   rH   rI   r     r   zPipeline.resetc                 C   s   |    dS )zClose the pipelineNr   rZ   rH   rH   rI   r        zPipeline.closec                 O   s   | j ||f | S )ar  
        Stage a command to be executed when execute() is next called

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        )r   r   r   rH   rH   rI   pipeline_execute_command  s   z!Pipeline.pipeline_execute_commandc                 O   s   | j |i |S )zAdds a command to the stack)r   rG   r   r   rH   rH   rI   r     s   zPipeline.execute_commandc                 C   s<   | j js	| j   z| j jt| jW |   S |   w )z0Execute all the commands in the current pipeline)r   rA   rX   r@   execute_pipelinetupler   r   rZ   rH   rH   rI   r     s   
zPipeline.execute)rY   r   rY   N)r   r   r   r   r!   rJ   r   r   r   intr   r   r   r   r   r   r   r   r   r   rH   rH   rH   rI   r     s    



r   c                   @   s   e Zd ZdZdefddZd.ddZd/d	d
Zd/ddZd/ddZ	e
defddZdd Zdd Zdd Zdd Zdd Zdd Zdd Z	 d0d!ed"efd#d$Z	 d0d!ed"efd%d&Z	 			d1d'ed(ed)ee d*edd+f
d,d-ZdS )2r   z2
    PubSub object for multi database client.
    ro   c                 K   s   || _ | j jjdi | dS )zInitialize the PubSub object for a multi-database client.

        Args:
            client: MultiDBClient instance to use for pub/sub operations
            **kwargs: Additional keyword arguments to pass to the underlying pubsub implementation
        NrH   )r   r@   r   )rG   ro   r   rH   rH   rI   rJ     s   zPubSub.__init__rY   c                 C   r   r}   rH   rZ   rH   rH   rI   r     r   zPubSub.__enter__Nc                 C   r   r}   r   rZ   rH   rH   rI   r     s
   zPubSub.__del__c                 C   s   | j jdS )Nr   r   r@   execute_pubsub_methodrZ   rH   rH   rI   r     s   zPubSub.resetc                 C   r   r}   r   rZ   rH   rH   rI   r      r   zPubSub.closec                 C   s   | j jjjS r}   )r   r@   active_pubsub
subscribedrZ   rH   rH   rI   r     r   zPubSub.subscribedc                 G      | j jjdg|R  S )Nr   r   rG   r   rH   rH   rI   r     s
   zPubSub.execute_commandc                 O      | j jjdg|R i |S )aE  
        Subscribe to channel patterns. Patterns supplied as keyword arguments
        expect a pattern name as the key and a callable as the value. A
        pattern's callable will be invoked automatically when a message is
        received on that pattern rather than producing a message via
        ``listen()``.
        
psubscriber   r   rH   rH   rI   r        zPubSub.psubscribec                 G   r   )zj
        Unsubscribe from the supplied patterns. If empty, unsubscribe from
        all patterns.
        punsubscriber   r   rH   rH   rI   r     
   zPubSub.punsubscribec                 O   r   )aR  
        Subscribe to channels. Channels supplied as keyword arguments expect
        a channel name as the key and a callable as the value. A channel's
        callable will be invoked automatically when a message is received on
        that channel rather than producing a message via ``listen()`` or
        ``get_message()``.
        	subscriber   r   rH   rH   rI   r   !  r   zPubSub.subscribec                 G   r   )zi
        Unsubscribe from the supplied channels. If empty, unsubscribe from
        all channels
        unsubscriber   r   rH   rH   rI   r   -  s   zPubSub.unsubscribec                 O   r   )az  
        Subscribes the client to the specified shard channels.
        Channels supplied as keyword arguments expect a channel name as the key
        and a callable as the value. A channel's callable will be invoked automatically
        when a message is received on that channel rather than producing a message via
        ``listen()`` or ``get_sharded_message()``.
        
ssubscriber   r   rH   rH   rI   r   4  r   zPubSub.ssubscribec                 G   r   )zu
        Unsubscribe from the supplied shard_channels. If empty, unsubscribe from
        all shard_channels
        sunsubscriber   r   rH   rH   rI   r   @  r   zPubSub.sunsubscribeF        ignore_subscribe_messagesr   c                 C      | j jjd||dS )a  
        Get the next message if one is available, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number, or None, to wait indefinitely.
        get_messager   r   r   rG   r   r   rH   rH   rI   r   I  
   
zPubSub.get_messagec                 C   r   )a&  
        Get the next message if one is available in a sharded channel, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number, or None, to wait indefinitely.
        get_sharded_messager   r   r   rH   rH   rI   r   Y  r   zPubSub.get_sharded_message
sleep_timedaemonexception_handlersharded_pubsubr
   c                 C   s   | j jj|||| |dS )N)r   r   r   r   )r   r@   execute_pubsub_run)rG   r   r   r   r   rH   rH   rI   run_in_threadi  s   zPubSub.run_in_thread)rY   r   r   )Fr   )r   FNF)r   r   r   r   r!   rJ   r   r   r   r   propertyr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rH   rH   rH   rI   r     sV    


	
	


r   );loggingrB   concurrent.futuresr   concurrent.futures.threadr   typingr   r   r   r   redis.backgroundr   redis.backoffr	   redis.clientr
   redis.commandsr   r   redis.maint_notificationsr   redis.multidb.circuitr   r   rR   redis.multidb.command_executorr   redis.multidb.configr   r   r   r   redis.multidb.databaser   r   r   redis.multidb.exceptionr   r   r   redis.multidb.failure_detectorr   redis.multidb.healthcheckr   r   redis.observability.attributesr   redis.retryr   redis.utilsr    	getLoggerr   r   r!   r   r   r   rH   rH   rH   rI   <module>   s:    
  wC