o
    i]                     @   s  U d Z ddlmZ ddlmZmZmZmZ ddlmZm	Z	m
Z
mZmZ ddlmZmZ ddlmZ ddlmZ ddlmZmZ erSdd	lmZ dd
lmZ ddlmZ daee ed< dZdZ edgddd							dkde!de"dee! dee# dee! dee$ dee% dee# dee# ddfddZ&dd de"ddfd!d"Z'dld#d$Z(d%ed  ddfd&d'Z)d(e!ddfd)d*Z*d(e!de"ddfd+d,Z+		dmd-ee d.ee$ ddfd/d0Z,d1e!d2e!d3e%ddfd4d5Z-d(e!ddfd6d7Z.							8dndee! dee# d9ee! d:ee# d.ee$ dee# d;e%ddfd<d=Z/		dmd>ed?ee! d@ee% ddfdAdBZ0edCgdDdd			dodEe"dFee! dGee! dCee! ddf
dHdIZ1edCgdDdd		dmdGee! dCee! ddfdJdKZ2de!de#d9e!d:e#d2e!ddfdLdMZ3	dpdNee
 fdOdPZ4dldQdRZ5	dpdSed(ee! ddfdTdUZ6	dpdVe#dWee	 ddfdXdYZ7dZe#ddfd[d\Z8d]d^d_d^dWeddfd`daZ9dee fdbdcZ:dedd fdedfZ;dldgdhZ<de%fdidjZ=dS )qaT  
Simple, clean API for recording observability metrics.

This module provides a straightforward interface for Redis core code to record
metrics without needing to know about OpenTelemetry internals.

Usage in Redis core code:
    from redis.observability.recorder import record_operation_duration

    start_time = time.monotonic()
    # ... execute Redis command ...
    record_operation_duration(
        command_name='SET',
        duration_seconds=time.monotonic() - start_time,
        server_address='localhost',
        server_port=6379,
        db_namespace='0',
        error=None
    )
    )datetime)TYPE_CHECKINGCallableListOptional)AttributeBuilder	CSCReason	CSCResultGeoFailoverReasonPubSubDirection)CloseReasonRedisMetricsCollector)get_observability_instance)!get_observables_registry_instance)deprecated_argsstr_if_bytes)ConnectionPoolInterface)SyncDatabase)
OTelConfigN_metrics_collectorconnection_count	csc_items
batch_sizezXThe batch_size argument is no longer used and will be removed in the next major version.z7.2.1)args_to_warnreasonversioncommand_nameduration_secondsserver_addressserver_portdb_namespaceerroris_blockingretry_attemptsreturnc	           	      C   sT   t du rt a t du rdS zt j| |||||||||d
 W dS  ty)   Y dS w )a  
    Record a Redis command execution duration.

    This is a simple, clean API that Redis core code can call directly.
    If observability is not enabled, this returns immediately with zero overhead.

    Args:
        command_name: Redis command name (e.g., 'GET', 'SET')
        duration_seconds: Command execution time in seconds
        server_address: Redis server address
        server_port: Redis server port
        db_namespace: Redis database index
        error: Exception if command failed, None if successful
        is_blocking: Whether the operation is a blocking command
        batch_size: Number of commands in batch (for pipelines/transactions)
        retry_attempts: Number of retry attempts made

    Example:
        >>> start = time.monotonic()
        >>> # ... execute command ...
        >>> record_operation_duration('SET', time.monotonic() - start, 'localhost', 6379, '0')
    N)
r   r   r   r   r    
error_typenetwork_peer_addressnetwork_peer_portr"   r#   )r   _get_or_create_collectorrecord_operation_duration	Exception)	r   r   r   r   r    r!   r"   r   r#    r+   P/home/ubuntu/.local/lib/python3.10/site-packages/redis/observability/recorder.pyr)   1   s(   )r)   connection_poolr   c                 C   D   t du rt a t du rdS z
t j| |d W dS  ty!   Y dS w )as  
    Record connection creation time.

    Args:
        connection_pool: Connection pool implementation
        duration_seconds: Time taken to create connection in seconds

    Example:
        >>> start = time.monotonic()
        >>> # ... create connection ...
        >>> record_connection_create_time('ConnectionPool<localhost:6379>', time.monotonic() - start)
    Nr-   r   )r   r(   record_connection_create_timer*   r/   r+   r+   r,   r0   s   s   r0   c                  C   J   t du rt a t du rdS dd } z	t j| d W dS  ty$   Y dS w )zB
    Initialize observable gauge for connection count metric.
    Nc                 S   .   t  }|t}g }|D ]}||  q|S N)r   getCONNECTION_COUNT_REGISTRY_KEYextend__observables_registry	callbacksobservationscallbackr+   r+   r,   observable_callback      
z2init_connection_count.<locals>.observable_callbackr<   )r   r(   init_connection_countr*   r=   r+   r+   r,   r@         
r@   connection_poolsc                    sb   t du rt a t du rdS zddlm   fdd}t }|t| W dS  ty0   Y dS w )zG
    Add connection pools to connection count observable registry.
    Nr   Observationc                     s6   g } D ]}|  D ]\}}|  ||d q
q| S )N
attributes)get_connection_countappend)r;   r-   countrG   rE   rC   r+   r,   connection_count_callback   s   zBregister_pools_connection_count.<locals>.connection_count_callback)r   r(   opentelemetry.metricsrE   r   registerr5   r*   )rC   rL   r9   r+   rK   r,   register_pools_connection_count   s   
rO   	pool_namec                 C   B   t du rt a t du rdS z	t j| d W dS  ty    Y dS w )z
    Record a connection timeout event.

    Args:
        pool_name: Connection pool identifier

    Example:
        >>> record_connection_timeout('ConnectionPool<localhost:6379>')
    NrP   )r   r(   record_connection_timeoutr*   rR   r+   r+   r,   rS         rS   c                 C   r.   )at  
    Record time taken to obtain a connection from the pool.

    Args:
        pool_name: Connection pool identifier
        duration_seconds: Wait time in seconds

    Example:
        >>> start = time.monotonic()
        >>> # ... wait for connection from pool ...
        >>> record_connection_wait_time('ConnectionPool<localhost:6379>', time.monotonic() - start)
    NrP   r   )r   r(   record_connection_wait_timer*   rU   r+   r+   r,   rV      s   rV   close_reasonr%   c                 C   r.   )a  
    Record a connection closed event.

    Args:
        close_reason: Reason for closing (e.g. 'error', 'application_close')
        error_type: Error type if closed due to error

    Example:
        >>> record_connection_closed('ConnectionPool<localhost:6379>', 'idle_timeout')
    NrW   r%   )r   r(   record_connection_closedr*   rX   r+   r+   r,   rY     s   rY   connection_namemaint_notificationrelaxedc                 C   F   t du rt a t du rdS zt j| ||d W dS  ty"   Y dS w )ab  
    Record a connection timeout relaxation event.

    Args:
        connection_name: Connection identifier
        maint_notification: Maintenance notification type
        relaxed: True to count up (relaxed), False to count down (unrelaxed)

    Example:
        >>> record_connection_relaxed_timeout('Connection<localhost:6379>', 'MOVING', True)
    NrZ   r[   r\   )r   r(   !record_connection_relaxed_timeoutr*   r^   r+   r+   r,   r_   *  s   r_   c                 C   rQ   )z
    Record a connection handoff event (e.g., after MOVING notification).

    Args:
        pool_name: Connection pool identifier

    Example:
        >>> record_connection_handoff('ConnectionPool<localhost:6379>')
    NrR   )r   r(   record_connection_handoffr*   rR   r+   r+   r,   r`   K  rT   r`   Tr&   r'   is_internalc              	   C   sN   t du rt a t du rdS zt j| ||||||d W dS  ty&   Y dS w )a  
    Record error count.

    Args:
        server_address: Server address
        server_port: Server port
        network_peer_address: Network peer address
        network_peer_port: Network peer port
        error_type: Error type (Exception)
        retry_attempts: Retry attempts
        is_internal: Whether the error is internal (e.g., timeout, network error)

    Example:
        >>> record_error_count('localhost', 6379, 'localhost', 6379, ConnectionError(), 3)
    Nr   r   r&   r'   r%   r#   ra   )r   r(   record_error_countr*   rb   r+   r+   r,   rc   f  s"   	rc   	directionchannelshardedc                 C   sj   t du rt a t du rdS |}|durt }|dur|jrd}zt j| ||d W dS  ty4   Y dS w )a5  
    Record a PubSub message (published or received).

    Args:
        direction: Message direction ('publish' or 'receive')
        channel: Pub/Sub channel name
        sharded: True if sharded Pub/Sub channel

    Example:
        >>> record_pubsub_message(PubSubDirection.PUBLISH, 'channel', False)
    N)rd   re   rf   )r   r(   _get_confighide_pubsub_channel_namesrecord_pubsub_messager*   )rd   re   rf   effective_channelconfigr+   r+   r,   ri     s$   ri   consumer_namez[The consumer_name argument is no longer used and will be removed in the next major version.lag_secondsstream_nameconsumer_groupc                 C   sj   t du rt a t du rdS |}|durt }|dur|jrd}zt j| ||d W dS  ty4   Y dS w )z
    Record the lag of a streaming message.

    Args:
        lag_seconds: Lag in seconds
        stream_name: Stream name
        consumer_group: Consumer group name
        consumer_name: Consumer name
    Nrm   rn   ro   )r   r(   rg   hide_stream_namesrecord_streaming_lagr*   )rm   rn   ro   rl   effective_stream_namerk   r+   r+   r,   rr     s$   rr   c                 C   s\  t du rt a t du rdS | sdS zt  }t }|duo!|j}t| trg| 	 D ]8\}}|r3dnt
|}|D ])}	|	D ]$}
|
\}}t
|}|d\}}td|t|d  }t j|||d q=q9q+W dS | D ]7}t
|d }|rudn|}|d D ]$}
|
\}}t
|}|d\}}td|t|d  }t j|||d q{qiW dS  ty   Y dS w )aQ  
    Record streaming lag from XREAD/XREADGROUP response.

    Parses the response and calculates lag for each message based on message ID timestamp.

    Args:
        response: Response from XREAD/XREADGROUP command
        consumer_group: Consumer group name (for XREADGROUP)
        consumer_name: Consumer name (for XREADGROUP)
    N-g        i  rp   r      )r   r(   r   now	timestamprg   rq   
isinstancedictitemsr   splitmaxintrr   r*   )responsero   rl   rv   rk   rq   rn   stream_messagesrs   messagesmessage
message_id_rw   rm   stream_entryr+   r+   r,   "record_streaming_lag_from_response  s\   
r   c                 C   sJ   t du rt a t du rdS zt j| ||||d W dS  ty$   Y dS w )a  
    Record a maintenance notification count.

    Args:
        server_address: Server address
        server_port: Server port
        network_peer_address: Network peer address
        network_peer_port: Network peer port
        maint_notification: Maintenance notification type (e.g., 'MOVING', 'MIGRATING')

    Example:
        >>> record_maint_notification_count('localhost', 6379, 'localhost', 6379, 'MOVING')
    Nr   r   r&   r'   r[   )r   r(   record_maint_notification_countr*   r   r+   r+   r,   r   6  s   r   resultc                 C   rQ   )zm
    Record a Client Side Caching (CSC) request.

    Args:
        result: CSC result ('hit' or 'miss')
    Nr   )r   r(   record_csc_requestr*   r   r+   r+   r,   r   ]     r   c                  C   r1   )z;
    Initialize observable gauge for CSC items metric.
    Nc                 S   r2   r3   )r   r4   CSC_ITEMS_REGISTRY_KEYr6   r7   r+   r+   r,   r=     r>   z+init_csc_items.<locals>.observable_callbackr?   )r   r(   init_csc_itemsr*   rA   r+   r+   r,   r   u  rB   r   r<   c                    sd   t du rt a t du rdS ddlm   fdd}zt }|t| W dS  ty1   Y dS w )z
    Adds given callback to CSC items observable registry.

    Args:
        callback: Callback function that returns the cache size
        pool_name: Connection pool name for observability
    Nr   rD   c                      s     t jddgS )NrR   rF   )r   build_csc_attributesr+   rE   r<   rP   r+   r,   csc_items_callback  s
   
z7register_csc_items_callback.<locals>.csc_items_callback)r   r(   rM   rE   r   rN   r   r*   )r<   rP   r   r9   r+   r   r,   register_csc_items_callback  s   r   rJ   r   c                 C   r.   )z
    Record a Client Side Caching (CSC) eviction.

    Args:
        count: Number of evictions
        reason: Reason for eviction
    NrJ   r   )r   r(   record_csc_evictionr*   r   r+   r+   r,   r     s   r   bytes_savedc                 C   rQ   )z
    Record the number of bytes saved by using Client Side Caching (CSC).

    Args:
        bytes_saved: Number of bytes saved
    Nr   )r   r(   record_csc_network_savedr*   r   r+   r+   r,   r     r   r   	fail_fromr   fail_toc                 C   r]   )z
    Record a geo failover.

    Args:
        fail_from: Database failed from
        fail_to: Database failed to
        reason: Reason for the failover
    Nr   r   r   )r   r(   record_geo_failoverr*   r   r+   r+   r,   r     s   r   c                  C   sj   z!t   } | du s| jjsW dS |  tjtj}t|| jW S  t	y+   Y dS  t
y4   Y dS w )z
    Get or create the global metrics collector.

    Returns:
        RedisMetricsCollector instance if observability is enabled, None otherwise
    N)r   get_provider_managerrk   enabled_telemetryget_meter_provider	get_meterr   
METER_NAMEMETER_VERSIONImportErrorr*   )managermeterr+   r+   r,   r(     s   
r(   r   c                  C   s6   zt   } | du rW dS | jW S  ty   Y dS w )z
    Get the OTel configuration from the observability manager.

    Returns:
        OTelConfig instance if observability is enabled, None otherwise
    N)r   r   rk   r*   )r   r+   r+   r,   rg   "  s   
rg   c                   C   s   da dS )zM
    Reset the global collector (used for testing or re-initialization).
    N)r   r+   r+   r+   r,   reset_collector2  s   r   c                   C   s   t du rt a t duS )zw
    Check if observability is enabled.

    Returns:
        True if metrics are being collected, False otherwise
    N)r   r(   r+   r+   r+   r,   
is_enabled:  s   	r   )NNNNNNN)r$   N)NN)NNNNNNT)NNNr3   )>__doc__r   typingr   r   r   r   redis.observability.attributesr   r   r	   r
   r   redis.observability.metricsr   r   redis.observability.providersr   redis.observability.registryr   redis.utilsr   r   redis.connectionr   redis.multidb.databaser   redis.observability.configr   r   __annotations__r5   r   strfloatr}   r*   boolr)   r0   r@   rO   rS   rV   rY   r_   r`   rc   ri   rr   r   r   r   r   r   r   r   r   r(   rg   r   r   r+   r+   r+   r,   <module>   s   	
=

!
 

!

!

/
('J
(


&



