o
    iB                     @   s  U d Z ddlmZ ddlmZmZmZ ddlmZmZ ddl	m
Z
mZ ddlmZ ddlmZ ddlmZ erIdd	lmZ dd
lmZ ddlmZ daee ed< dZdee fddZded fddZ						dVdededee dee dee dee  dee! dee ddfddZ"dd deddfd!d"Z#dWd#d$Z$d%ed  ddfd&d'Z%d(eddfd)d*Z&d(ededdfd+d,Z'		dXd-ee
 d.ee  ddfd/d0Z(d1ed2ed3e!ddfd4d5Z)d(eddfd6d7Z*	8dYdeded9ed:ed.e ded;e!ddfd<d=Z+		dXd>ed?ee d@ee! ddfdAdBZ,		dXdCedDee dEee ddfdFdGZ-	dZdEee ddfdHdIZ.deded9ed:ed2eddfdJdKZ/dLdMdNdMdOeddfdPdQZ0dWdRdSZ1de!fdTdUZ2dS )[a  
Async-compatible API for recording observability metrics.

This module provides an async-safe interface for Redis async client code to record
metrics without needing to know about OpenTelemetry internals. It reuses the same
RedisMetricsCollector and configuration as the sync recorder.

Usage in Redis async client code:
    from redis.asyncio.observability.recorder import record_operation_duration

    start_time = time.monotonic()
    # ... execute Redis command ...
    await record_operation_duration(
        command_name='SET',
        duration_seconds=time.monotonic() - start_time,
        server_address='localhost',
        server_port=6379,
        db_namespace='0',
        error=None
    )
    )datetime)TYPE_CHECKINGListOptional)GeoFailoverReasonPubSubDirection)CloseReasonRedisMetricsCollector)get_observability_instance)!get_observables_registry_instance)str_if_bytes)ConnectionPool)AsyncDatabase)
OTelConfigN_async_metrics_collectorconnection_countreturnc                  C   sz   t durt S z#t  } | du s| jjsW dS |  tjtj	}t|| ja t W S  t
y3   Y dS  ty<   Y dS w )z
    Get or create the global metrics collector.

    Returns:
        RedisMetricsCollector instance if observability is enabled, None otherwise
    N)r   r
   get_provider_managerconfigenabled_telemetryget_meter_provider	get_meterr	   
METER_NAMEMETER_VERSIONImportError	Exception)managermeter r   X/home/ubuntu/.local/lib/python3.10/site-packages/redis/asyncio/observability/recorder.py_get_or_create_collector.   s    	
r    r   c                     s8   zt   } | du rW dS | jW S  ty   Y dS w )z
    Get the OTel configuration from the observability manager.

    Returns:
        OTelConfig instance if observability is enabled, None otherwise
    N)r
   r   r   r   )r   r   r   r   _get_configO   s   
r!   command_nameduration_secondsserver_addressserver_portdb_namespaceerroris_blockingretry_attemptsc           	         sN   t  }|du r
dS z|j| |||||||||d
 W dS  ty&   Y dS w )aM  
    Record a Redis command execution duration.

    This is an async-safe API that Redis async client code can call directly.
    If observability is not enabled, this returns immediately with zero overhead.

    Args:
        command_name: Redis command name (e.g., 'GET', 'SET')
        duration_seconds: Command execution time in seconds
        server_address: Redis server address
        server_port: Redis server port
        db_namespace: Redis database index
        error: Exception if command failed, None if successful
        is_blocking: Whether the operation is a blocking command
        retry_attempts: Number of retry attempts made

    Example:
        >>> start = time.monotonic()
        >>> # ... execute command ...
        >>> await record_operation_duration('SET', time.monotonic() - start, 'localhost', 6379, '0')
    N)
r"   r#   r$   r%   r&   
error_typenetwork_peer_addressnetwork_peer_portr(   r)   )r    record_operation_durationr   )	r"   r#   r$   r%   r&   r'   r(   r)   	collectorr   r   r   r-   _   s(   r-   connection_poolr   c                    >   t  }|du r
dS z
|j| |d W dS  ty   Y dS w )z
    Record connection creation time.

    Args:
        connection_pool: Connection pool implementation
        duration_seconds: Time taken to create connection in seconds
    N)r/   r#   )r    record_connection_create_timer   )r/   r#   r.   r   r   r   r1         r1   c                     sD   t  } | du r
dS dd }z	| j|d W dS  ty!   Y dS w )zB
    Initialize observable gauge for connection count metric.
    Nc                 S   s.   t  }|t}g }|D ]}||  q|S N)r   getCONNECTION_COUNT_REGISTRY_KEYextend)__observables_registry	callbacksobservationscallbackr   r   r   observable_callback   s   
z2init_connection_count.<locals>.observable_callback)r;   )r    init_connection_countr   )r.   r<   r   r   r   r=      s   
r=   connection_poolsc                    s\   t  }|du r
dS zddlm   fdd}t }|t| W dS  ty-   Y dS w )zG
    Add connection pools to connection count observable registry.
    Nr   )Observationc                     s6   g } D ]}|  D ]\}}|  ||d q
q| S )N)
attributes)get_connection_countappend)r:   r/   countr@   r?   r>   r   r   connection_count_callback   s   zBregister_pools_connection_count.<locals>.connection_count_callback)r    opentelemetry.metricsr?   r   registerr5   r   )r>   r.   rE   r8   r   rD   r   register_pools_connection_count   s   
rH   	pool_namec                    <   t  }|du r
dS z	|j| d W dS  ty   Y dS w )ze
    Record a connection timeout event.

    Args:
        pool_name: Connection pool identifier
    NrI   )r    record_connection_timeoutr   rI   r.   r   r   r   rL         	rL   c                    r0   )z
    Record time taken to obtain a connection from the pool.

    Args:
        pool_name: Connection pool identifier
        duration_seconds: Wait time in seconds
    N)rI   r#   )r    record_connection_wait_timer   )rI   r#   r.   r   r   r   rO      r2   rO   close_reasonr*   c                    r0   )z
    Record a connection closed event.

    Args:
        close_reason: Reason for closing (e.g. 'error', 'application_close')
        error_type: Error type if closed due to error
    N)rP   r*   )r    record_connection_closedr   )rP   r*   r.   r   r   r   rQ     r2   rQ   connection_namemaint_notificationrelaxedc                    @   t  }|du r
dS z|j| ||d W dS  ty   Y dS w )z
    Record a connection timeout relaxation event.

    Args:
        connection_name: Connection identifier
        maint_notification: Maintenance notification type
        relaxed: True to count up (relaxed), False to count down (unrelaxed)
    N)rR   rS   rT   )r    !record_connection_relaxed_timeoutr   )rR   rS   rT   r.   r   r   r   rV   '     rV   c                    rJ   )z
    Record a connection handoff event (e.g., after MOVING notification).

    Args:
        pool_name: Connection pool identifier
    NrK   )r    record_connection_handoffr   rM   r   r   r   rX   B  rN   rX   Tr+   r,   is_internalc              	      sH   t  }|du r
dS z|j| ||||||d W dS  ty#   Y dS w )at  
    Record error count.

    Args:
        server_address: Server address
        server_port: Server port
        network_peer_address: Network peer address
        network_peer_port: Network peer port
        error_type: Error type (Exception)
        retry_attempts: Retry attempts
        is_internal: Whether the error is internal (e.g., timeout, network error)
    N)r$   r%   r+   r,   r*   r)   rY   )r    record_error_countr   )r$   r%   r+   r,   r*   r)   rY   r.   r   r   r   rZ   W  s"   	rZ   	directionchannelshardedc                    st   t  }|du r
dS |}|dur$t I dH }|dur |jr d}nt|}z|j| ||d W dS  ty9   Y dS w )z
    Record a PubSub message (published or received).

    Args:
        direction: Message direction ('publish' or 'receive')
        channel: Pub/Sub channel name
        sharded: True if sharded Pub/Sub channel
    N)r[   r\   r]   )r    r!   hide_pubsub_channel_namesr   record_pubsub_messager   )r[   r\   r]   r.   effective_channelr   r   r   r   r_   ~  s&   r_   lag_secondsstream_nameconsumer_groupc                    sj   t  }|du r
dS |}|durt I dH }|dur|jrd}z|j| ||d W dS  ty4   Y dS w )z
    Record the lag of a streaming message.

    Args:
        lag_seconds: Lag in seconds
        stream_name: Stream name
        consumer_group: Consumer group name
    Nra   rb   rc   )r    r!   hide_stream_namesrecord_streaming_lagr   )ra   rb   rc   r.   effective_stream_namer   r   r   r   rf     s$   rf   c                    s\  t  }|du r
dS | sdS zt  }t I dH }|duo!|j}t| trg|  D ]8\}}|r3dnt	|}|D ])}	|	D ]$}
|
\}}t	|}|
d\}}td|t|d  }|j|||d q=q9q+W dS | D ]7}t	|d }|rudn|}|d D ]$}
|
\}}t	|}|
d\}}td|t|d  }|j|||d q{qiW dS  ty   Y dS w )a  
    Record streaming lag from XREAD/XREADGROUP response.

    Parses the response and calculates lag for each message based on message ID timestamp.

    Args:
        response: Response from XREAD/XREADGROUP command
        consumer_group: Consumer group name (for XREADGROUP)
    N-g        i  rd   r      )r    r   now	timestampr!   re   
isinstancedictitemsr   splitmaxintrf   r   )responserc   r.   rj   r   re   rb   stream_messagesrg   messagesmessage
message_id_rk   ra   stream_entryr   r   r   "record_streaming_lag_from_response  s\   
ry   c                    sD   t  }|du r
dS z|j| ||||d W dS  ty!   Y dS w )a=  
    Record a maintenance notification count.

    Args:
        server_address: Server address
        server_port: Server port
        network_peer_address: Network peer address
        network_peer_port: Network peer port
        maint_notification: Maintenance notification type (e.g., 'MOVING', 'MIGRATING')
    N)r$   r%   r+   r,   rS   )r    record_maint_notification_countr   )r$   r%   r+   r,   rS   r.   r   r   r   rz   	  s   rz   	fail_fromr   fail_toreasonc                    rU   )z
    Record a geo failover.

    Args:
        fail_from: Database failed from
        fail_to: Database failed to
        reason: Reason for the failover
    N)r{   r|   r}   )r    record_geo_failoverr   )r{   r|   r}   r.   r   r   r   r~   *  rW   r~   c                   C   s   da dS )zS
    Reset the global async collector (used for testing or re-initialization).
    N)r   r   r   r   r   reset_collectorE  s   r   c                     s   t  } | duS )zw
    Check if observability is enabled.

    Returns:
        True if metrics are being collected, False otherwise
    N)r    )r.   r   r   r   
is_enabledM  s   r   )NNNNNN)r   N)NN)Tr3   )3__doc__r   typingr   r   r   redis.observability.attributesr   r   redis.observability.metricsr   r	   redis.observability.providersr
   redis.observability.registryr   redis.utilsr   redis.asyncio.connectionr   redis.asyncio.multidb.databaser   redis.observability.configr   r   __annotations__r5   r    r!   strfloatrq   r   boolr-   r1   r=   rH   rL   rO   rQ   rV   rX   rZ   r_   rf   ry   rz   r~   r   r   r   r   r   r   <module>   s>   !	
4








)
'
$
D
!

