o
    iT                     @   sn  d dl Z d dlZd dlmZmZmZmZmZmZ d dl	m
Z
 d dlmZ d dlmZmZmZmZ d dlmZmZmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZ d dlm Z  d dl!m"Z"m#Z# d dl$m%Z% d dl$m&Z' d dl(m)Z)m*Z*m+Z+ d dl,m-Z- d dl.m/Z/m0Z0m1Z1 d dl2m3Z3 e4e5Z6e3G dd de#e"Z7de%fddZ8G dd de#e"Z9G dd dZ:dS )    N)Any	AwaitableCallableListOptionalUnion)PubSubHandler)DefaultCommandExecutor)DEFAULT_GRACE_PERIODDatabaseConfigInitialHealthCheckMultiDbConfig)AsyncDatabaseDatabase	Databases)AsyncFailureDetector)HealthCheckHealthCheckPolicy)Retry)BackgroundScheduler)	NoBackoff)AsyncCoreCommandsAsyncRedisModuleCommands)CircuitBreaker)State)InitialHealthCheckFailedErrorNoValidDatabaseExceptionUnhealthyDatabaseException)GeoFailoverReason)ChannelT
EncodableTKeyT)experimentalc                   @   sh  e Zd ZdZdefddZdCddZd	d
 Zdd Zde	fddZ
deddfddZ	dDdedefddZdedefddZdefddZdedefddZd efd!d"Zd#efd$d%Zd&d' Zd(d) Zdd*dd+d,ed-geeee f f d.ed/ee  d0ed1ee f
d2d3Z!d4d5 Z"de#e$ef fd6d7Z%d8d9 Z&dedefd:d;Z'd<e(d=e)d>e)fd?d@Z*dAdB Z+dS )EMultiDBClientz
    Client that operates on multiple logical Redis databases.
    Should be used in Client-side geographic failover database setups.
    configc              
   C   s   |  | _|js| n|j| _|j| _|j|j	|j
| _|js%| n|j| _|jd u r2| n|j| _| j| j |j| _|j| _|j| _| jtg t| j| j| j| j|j|j| j| jd| _d| _t ! | _"t# | _$|| _%d | _&g | _'d | _(d S )N)failure_detectors	databasescommand_retryfailover_strategyfailover_attemptsfailover_delayevent_dispatcherauto_fallback_intervalF))r&   
_databaseshealth_checksdefault_health_checks_health_checkshealth_check_interval_health_check_intervalhealth_check_policyvaluehealth_check_probeshealth_check_delay_health_check_policyr%   default_failure_detectors_failure_detectorsr(   default_failover_strategy_failover_strategyset_databasesr,   _auto_fallback_intervalr+   _event_dispatcherr'   _command_retryupdate_supported_errorsConnectionRefusedErrorr	   r)   r*   command_executorinitializedasyncioLock_hc_lockr   _bg_scheduler_config_recurring_hc_task	_hc_tasks_half_open_state_task)selfr$    rM   P/home/ubuntu/.local/lib/python3.10/site-packages/redis/asyncio/multidb/client.py__init__)   sN   






zMultiDBClient.__init__rL   returnc                    s   | j s|  I d H  | S N)rC   
initializerL   rM   rM   rN   
__aenter__W   s   zMultiDBClient.__aenter__c                    s:   | j r	| j   | jr| j  | jD ]}|  qd S rQ   )rI   cancelrK   rJ   )rL   exc_type	exc_value	tracebackhc_taskrM   rM   rN   	__aexit__\   s   



zMultiDBClient.__aexit__c                    s   |   I dH  t| j| j| j| _d}| jD ]\}}|j	
| j |j	jtjkr4|s4|| j_d}q|s;tdd| _dS )zT
        Perform initialization of databases to define their initial state.
        NFTz4Initial connection failed - no active database found)_perform_initial_health_checkrD   create_taskrG   run_recurring_asyncr2   _check_databases_healthrI   r-   circuiton_state_changed!_on_circuit_state_change_callbackstateCBStateCLOSEDrB   _active_databaser   rC   )rL   is_active_db_founddatabaseweightrM   rM   rN   rR   d   s(   
zMultiDBClient.initializec                 C   s   | j S )zE
        Returns a sorted (by weight) list of all databases.
        )r-   rS   rM   rM   rN   get_databases   s   zMultiDBClient.get_databasesrg   Nc                    s   d}| j D ]\}}||krd} nq|std| |I dH  |jjtjkr?| j dd \}}| j	|t
jI dH  dS td)zL
        Promote one of the existing databases to become an active.
        NT/Given database is not a member of database list   r   z1Cannot set active database, database is unhealthy)r-   
ValueError_check_db_healthr_   rb   rc   rd   	get_top_nrB   set_active_databaser   MANUALr   )rL   rg   existsexisting_db_highest_weighted_dbrM   rM   rN   ro      s&   
z!MultiDBClient.set_active_databaseTskip_initial_health_checkc                    s  |j dtdt di |jr| jjj|jfi |j }n"|jr7|jtdt d | jjj|jd}n
| jjdi |j }|j	du rJ|
 n|j	}t|||j|jd}z
| |I dH  W n tym   |sk Y nw | jdd \}}| j||j | ||I dH  dS )	z
        Adds a new database to the database list.

        Args:
            config: DatabaseConfig object that contains the database configuration.
            skip_initial_health_check: If True, adds the database even if it is unhealthy.
        retryr   )retriesbackoff)connection_poolN)clientr_   rh   health_check_urlrk   rM   )client_kwargsupdater   r   from_urlrH   client_class	from_pool	set_retryr_   default_circuit_breakerr   rh   r{   rm   r   r-   rn   add_change_active_database)rL   r$   ru   rz   r_   rg   rt   highest_weightrM   rM   rN   add_database   sD   
zMultiDBClient.add_databasenew_databasehighest_weight_databasec                    s>   |j |j kr|jjtjkr| j|tjI d H  d S d S d S rQ   )	rh   r_   rb   rc   rd   rB   ro   r   	AUTOMATIC)rL   r   r   rM   rM   rN   r      s   z%MultiDBClient._change_active_databasec                    sZ   | j |}| j dd \}}||kr)|jjtjkr+| j|t	j
I dH  dS dS dS )z<
        Removes a database from the database list.
        rk   r   N)r-   removern   r_   rb   rc   rd   rB   ro   r   rp   )rL   rg   rh   rt   r   rM   rM   rN   remove_database   s   zMultiDBClient.remove_databaserh   c                    sp   d}| j D ]\}}||krd} nq|std| j dd \}}| j || ||_| ||I dH  dS )z<
        Updates a database from the database list.
        NTrj   rk   r   )r-   rl   rn   update_weightrh   r   )rL   rg   rh   rq   rr   rs   rt   r   rM   rM   rN   update_database_weight   s   z$MultiDBClient.update_database_weightfailure_detectorc                 C   s   | j | dS )z>
        Adds a new failure detector to the database.
        N)r9   append)rL   r   rM   rM   rN   add_failure_detector  s   z"MultiDBClient.add_failure_detectorhealthcheckc              	      sN   | j 4 I dH  | j| W d  I dH  dS 1 I dH s w   Y  dS )z:
        Adds a new health check to the database.
        N)rF   r0   r   )rL   r   rM   rM   rN   add_health_check  s   .zMultiDBClient.add_health_checkc                    s.   | j s|  I dH  | jj|i |I dH S )zB
        Executes a single command and return its result.
        N)rC   rR   rB   execute_commandrL   argsoptionsrM   rM   rN   r     s   zMultiDBClient.execute_commandc                 C   s   t | S )z:
        Enters into pipeline mode of the client.
        )PipelinerS   rM   rM   rN   pipeline  s   zMultiDBClient.pipelineF
shard_hintvalue_from_callablewatch_delayfuncr   watchesr   r   r   c                   s:   | j s|  I dH  | jj|g|R |||dI dH S )z3
        Executes callable as transaction.
        Nr   )rC   rR   rB   execute_transaction)rL   r   r   r   r   r   rM   rM   rN   transaction#  s   zMultiDBClient.transactionc                    s&   | j s|  I dH  t| fi |S )z
        Return a Publish/Subscribe object. With this object, you can
        subscribe to channels and listen for messages that get published to
        them.
        N)rC   rR   PubSub)rL   kwargsrM   rM   rN   pubsub9  s   zMultiDBClient.pubsubc                    s   z3i  g | _ | jD ]\}}t| |}| |< | j | q
tjtj| j ddi| jdI dH }W n tj	yA   t	dw  fddt
| j |D }| D ]'\}}t|trq|j}tj|j_tjd|jd	 d
||< qSt|trzd
||< qS|S )zk
        Runs health checks as a recurring task.
        Runs health checks against all databases.
        return_exceptionsT)timeoutNz4Health check execution exceeds health_check_intervalc                    s   i | ]	\}} | |qS rM   rM   ).0taskresult
task_to_dbrM   rN   
<dictcomp>\  s    z9MultiDBClient._check_databases_health.<locals>.<dictcomp>z%Health check failed, due to exception)exc_infoF)rJ   r-   rD   r\   rm   r   wait_forgatherr2   TimeoutErrorzipitems
isinstancer   rg   rc   OPENr_   rb   loggerdebugoriginal_exception	Exception)rL   rg   rs   r   results
db_resultsr   unhealthy_dbrM   r   rN   r^   D  sB   





z%MultiDBClient._check_databases_healthc                    s   |   I dH }d}| jjtjkrd| v}n!| jjtjkr,t| t|d k}n| jjtj	kr9d| v }|sDt
d| jj dS )zj
        Runs initial health check and evaluate healthiness based on initial_health_check_policy.
        NTF   z:Initial health check failed. Initial health check policy: )r^   rH   initial_health_check_policyr   ALL_AVAILABLEvaluesMAJORITY_AVAILABLEsumlenONE_AVAILABLEr   )rL   r   
is_healthyrM   rM   rN   r[   p  s    z+MultiDBClient._perform_initial_health_checkc                    sX   | j | j|I dH }|s|jjtjkrtj|j_|S |r*|jjtjkr*tj|j_|S )zO
        Runs health checks on the given database until first failure.
        N)r7   executer0   r_   rb   rc   r   rd   )rL   rg   r   rM   rM   rN   rm     s   


zMultiDBClient._check_db_healthr_   	old_state	new_statec                 C   s   t  }|tjkrt | |j| _d S |tjkr0|tj	kr0t
d|j d |tt| |tjkrF|tjkrHt
d|j d d S d S d S )Nz	Database z- is unreachable. Failover has been initiated.z is reachable again.)rD   get_running_looprc   	HALF_OPENr\   rm   rg   rK   rd   r   r   warning
call_laterr
   _half_open_circuitinfo)rL   r_   r   r   looprM   rM   rN   ra     s   

z/MultiDBClient._on_circuit_state_change_callbackc                    s&   | j jr| j jj I d H  d S d S rQ   )rB   active_databaserz   acloserS   rM   rM   rN   r     s   zMultiDBClient.aclose)rL   r#   rP   r#   )T),__name__
__module____qualname____doc__r   rO   rT   rZ   rR   r   ri   r   ro   r   boolr   r   r   floatr   r   r   r   r   r   r   r   r   r   r   r!   r   strr   r   dictr   r^   r[   rm   r   rc   ra   r   rM   rM   rM   rN   r#   "   sf    
.$
1
	

,
r#   r_   c                 C   s   t j| _d S rQ   )rc   r   rb   )r_   rM   rM   rN   r        r   c                   @   s   e Zd ZdZdefddZdddZd	d
 Zdd Zdd Z	de
fddZdefddZdddZdddZd ddZdd Zdee fddZdS )!r   zG
    Pipeline implementation for multiple logical Redis databases.
    rz   c                 C   s   g | _ || _d S rQ   )_command_stack_client)rL   rz   rM   rM   rN   rO     s   
zPipeline.__init__rL   rP   c                       | S rQ   rM   rS   rM   rM   rN   rT        zPipeline.__aenter__c                    s*   |   I d H  | j|||I d H  d S rQ   )resetr   rZ   rL   rV   rW   rX   rM   rM   rN   rZ     s   zPipeline.__aexit__c                 C   s   |    S rQ   )_async_self	__await__rS   rM   rM   rN   r     r   zPipeline.__await__c                    r   rQ   rM   rS   rM   rM   rN   r     r   zPipeline._async_selfc                 C   s
   t | jS rQ   )r   r   rS   rM   rM   rN   __len__  s   
zPipeline.__len__c                 C   s   dS )z1Pipeline instances should always evaluate to TrueTrM   rS   rM   rM   rN   __bool__  s   zPipeline.__bool__Nc                    s   g | _ d S rQ   )r   rS   rM   rM   rN   r     s   
zPipeline.resetc                    s   |   I dH  dS )zClose the pipelineN)r   rS   rM   rM   rN   r     s   zPipeline.aclosec                 O   s   | j ||f | S )ar  
        Stage a command to be executed when execute() is next called

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        )r   r   r   rM   rM   rN   pipeline_execute_command  s   z!Pipeline.pipeline_execute_commandc                 O   s   | j |i |S )zAdds a command to the stack)r   rL   r   r   rM   rM   rN   r     s   zPipeline.execute_commandc                    sV   | j js| j  I dH  z| j jt| jI dH W |  I dH  S |  I dH  w )z0Execute all the commands in the current pipelineN)r   rC   rR   rB   execute_pipelinetupler   r   rS   rM   rM   rN   r     s   
 zPipeline.execute)rL   r   rP   r   rP   N)rP   r   )r   r   r   r   r#   rO   rT   rZ   r   r   intr   r   r   r   r   r   r   r   r   r   rM   rM   rM   rN   r     s    



r   c                   @   s   e Zd ZdZdefddZd&ddZd'd	d
Zdd Ze	de
fddZdefddZdedefddZdefddZdedefddZdd Z	d(de
dee fdd Zdd!d"d#eddfd$d%ZdS ))r   z2
    PubSub object for multi database client.
    rz   c                 K   s   || _ | j jjdi | dS )zInitialize the PubSub object for a multi-database client.

        Args:
            client: MultiDBClient instance to use for pub/sub operations
            **kwargs: Additional keyword arguments to pass to the underlying pubsub implementation
        NrM   )r   rB   r   )rL   rz   r   rM   rM   rN   rO      s   zPubSub.__init__rP   c                    r   rQ   rM   rS   rM   rM   rN   rT     r   zPubSub.__aenter__Nc                    s   |   I d H  d S rQ   )r   r   rM   rM   rN   rZ     s   zPubSub.__aexit__c                    s   | j jdI d H S )Nr   r   rB   execute_pubsub_methodrS   rM   rM   rN   r     s   zPubSub.aclosec                 C   s   | j jjjS rQ   )r   rB   active_pubsub
subscribedrS   rM   rM   rN   r     s   zPubSub.subscribedr   c                    s   | j jjdg|R  I d H S )Nr   r   rL   r   rM   rM   rN   r     s   zPubSub.execute_commandr   c                    $   | j jjdg|R i |I dH S )aE  
        Subscribe to channel patterns. Patterns supplied as keyword arguments
        expect a pattern name as the key and a callable as the value. A
        pattern's callable will be invoked automatically when a message is
        received on that pattern rather than producing a message via
        ``listen()``.
        
psubscribeNr   r   rM   rM   rN   r        zPubSub.psubscribec                       | j jjdg|R  I dH S )zj
        Unsubscribe from the supplied patterns. If empty, unsubscribe from
        all patterns.
        punsubscribeNr   r   rM   rM   rN   r   )     zPubSub.punsubscribec                    r   )aR  
        Subscribe to channels. Channels supplied as keyword arguments expect
        a channel name as the key and a callable as the value. A channel's
        callable will be invoked automatically when a message is received on
        that channel rather than producing a message via ``listen()`` or
        ``get_message()``.
        	subscribeNr   r   rM   rM   rN   r   2  r   zPubSub.subscribec                    r   )zi
        Unsubscribe from the supplied channels. If empty, unsubscribe from
        all channels
        unsubscribeNr   r   rM   rM   rN   r   >  r   zPubSub.unsubscribeF        ignore_subscribe_messagesr   c                    s   | j jjd||dI dH S )a  
        Get the next message if one is available, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number or None to wait indefinitely.
        get_message)r   r   Nr   )rL   r   r   rM   rM   rN   r   G  s   
zPubSub.get_messageg      ?)exception_handlerpoll_timeoutr   c                   s   | j jj||| dI dH S )a  Process pub/sub messages using registered callbacks.

        This is the equivalent of :py:meth:`redis.PubSub.run_in_thread` in
        redis-py, but it is a coroutine. To launch it as a separate task, use
        ``asyncio.create_task``:

            >>> task = asyncio.create_task(pubsub.run())

        To shut it down, use asyncio cancellation:

            >>> task.cancel()
            >>> await task
        )
sleep_timer   r   N)r   rB   execute_pubsub_run)rL   r   r   rM   rM   rN   runW  s   z
PubSub.run)rP   r   r   )Fr   )r   r   r   r   r#   rO   rT   rZ   r   propertyr   r   r    r   r   r   r   r   r   r   r   r   r   r   r   rM   rM   rM   rN   r     s4    

	

r   );rD   loggingtypingr   r   r   r   r   r   redis.asyncio.clientr   &redis.asyncio.multidb.command_executorr	   redis.asyncio.multidb.configr
   r   r   r   redis.asyncio.multidb.databaser   r   r   &redis.asyncio.multidb.failure_detectorr   !redis.asyncio.multidb.healthcheckr   r   redis.asyncio.retryr   redis.backgroundr   redis.backoffr   redis.commandsr   r   redis.multidb.circuitr   r   rc   redis.multidb.exceptionr   r   r   redis.observability.attributesr   redis.typingr   r    r!   redis.utilsr"   	getLoggerr   r   r#   r   r   r   rM   rM   rM   rN   <module>   s8     
   D