o
    i                  
   @   s  d dl Z d dlmZ d dlZd dlmZ d dlmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dl m!Z! d dl"m#Z# d dl"m$Z$ d dl%m&Z& d dl'm(Z) d dl*m+Z+ e,de-ede&e j.dddd  d!e/fd"d#Z0d!e-e/e/f fd$d%Z1d&d' Z2d(d) Z3d*d+ Z4d,d- Z5d.d/ Z6d0d1 Z7d2d3 Z8d4d5 Z9d6d7 Z:dS )8    N)time_ns)wrap_function_wrapper)config)	SPAN_KIND)trace_utils)SpanKind)	SpanTypes)CONSUME)GROUP_ID)	HOST_LIST)PRODUCE)SERVICE)TOPIC)core)	COMPONENT)MESSAGING_DESTINATION_NAME)MESSAGING_SYSTEM)schematize_messaging_operation)schematize_service_name)SpanDirection)get_argument_value)set_argument_value)asbool)unwrap)HTTPPropagatoraiokafkakafkaDD_KAFKA_PROPAGATION_ENABLEDF)default)_default_servicedistributed_tracing_enabledreturnc                   C   s   t tddS )N__version__ )getattrr    r%   r%   [/home/ubuntu/.local/lib/python3.10/site-packages/ddtrace/contrib/internal/aiokafka/patch.pyget_version)   s   r'   c                   C   s   ddiS )Nr   z>=0.9.0r%   r%   r%   r%   r&   _supported_versions-   s   r(   c              
   C   s   t tjjt| t| ttt|iS N)	r   r   r   integration_namer   r   r   r   r   )topicbootstrap_serversr%   r%   r&   common_aiokafka_tags1   s   r-   c                 C   s"   t | |}|ttjt|i |S r)   )r-   updater   r   CONSUMERr
   )r+   r,   group_idtagsr%   r%   r&   common_consume_aiokafka_tags;   s   
r2   c           	      C   sn   t ||dd}t ||ddd}t ||dddpd }t ||dd	d}t ||d
ddp*g }| jj}||||||fS )Nr   r+      valueT   key   	partition   headers)r   client_bootstrap_servers)	instanceargskwargsr+   r4   r6   r8   r:   serversr%   r%   r&   
parse_sendF   s   rA   c                    s  t |||\}}}}}}	tjdttdtjdtjt	
d tjt||	dW td|||| |f t||dd|dd	\}}z| |i |I d H }
W n tyh } ztd
 t|||jfd f |d }~ww  fdd}|
| |
W  d    S 1 sw   Y  d S )Nzaiokafka.sendr   provider	direction)	span_name	span_typeservicer1   zaiokafka.send.startr9   r:   T)override_unsetaiokafka.send.completedc              
      sf   z|   }td d|f W d S  ty2 } ztd t|||jfd f W Y d }~d S d }~ww )NrI   )NNN)resultr   dispatch	Exceptiontype__traceback__)futurerJ   ectxr%   r&   sent_callbackd   s   ,z"traced_send.<locals>.sent_callback)rA   r   context_with_datar   r   r   OUTBOUNDr   WORKERr   ext_servicer   r   r-   rK   r   BaseExceptionrM   rN   add_done_callback)funcr=   r>   r?   r+   r4   r:   r8   r6   r,   rJ   rP   rS   r%   rQ   r&   traced_sendQ   s.   
$r[   c                    s|  t  }d }d }d }|j}|jj}	zpz | |i |I d H }tjjr1|jr1dd |jD }
t	|
}W n t
yB } z|}|d }~ww W tjddttdtjdtjtd tj|tt|dd |	|d}td	|||||f W d    |S 1 s{w   Y  |S tjddttdtjdtjtd tj|tt|dd |	|d}td	|||||f W d    w 1 sw   Y  w )
Nc                 S   s>   i | ]\}}|d ur|t |ttfr|jdddnt|qS )Nzutf-8ignore)errors)
isinstancebytes	bytearraydecodestr).0r6   valr%   r%   r&   
<dictcomp>}   s
    $z!traced_getone.<locals>.<dictcomp>zaiokafka.getoneFr   rB   r+   )
call_tracerE   rF   rG   distributed_contextr1   zaiokafka.getone.message)r   	_group_id_clientr<   r   r   r    r:   r   extractrL   r   rT   r   r	   r   INBOUNDr   rV   r   rW   r2   r$   rK   )rZ   r=   r>   r?   start_nserrmessage
parent_ctxr0   r,   
dd_headersrP   rR   r%   r%   r&   traced_getoneo   s`   



rq   c              
      s   |j }|jj}tjddttdtjdt	j
td tjtd ||d}| |i |I d H }td|||f |W  d    S 1 sCw   Y  d S )Nzaiokafka.getmanyFr   rB   )rf   rE   rF   rG   r1   zaiokafka.getmany.message)rh   ri   r<   r   rT   r   r	   r   rk   r   rV   r   rW   r   r   r2   rK   )rZ   r=   r>   r?   r0   r,   rR   messagesr%   r%   r&   traced_getmany   s    
$rs   c                    s,   | |i |I d H }t d|||f |S )Nzaiokafka.commit.end)r   rK   )rZ   r=   r>   r?   rJ   r%   r%   r&   traced_commit   s   rt   c                   C   sJ   t tddrd S dt_tddt tddt tddt tdd	t d S )
N_datadog_patchFTr   zAIOKafkaProducer.sendzAIOKafkaConsumer.getonezAIOKafkaConsumer.getmanyz#aiokafka.consumer.group_coordinatorzGroupCoordinator.commit_offsets)r$   r   ru   _wr[   rq   rs   rt   r%   r%   r%   r&   patch   s   rw   c                   C   sN   t tddsd S dt_ttjd ttjd ttjd ttjjjd d S )Nru   Fsendgetonegetmanycommit_offsets)	r$   r   ru   _uAIOKafkaProducerAIOKafkaConsumerconsumergroup_coordinatorGroupCoordinatorr%   r%   r%   r&   unpatch   s   r   );ostimer   r   wraptr   rv   ddtracer   ddtrace.constantsr   ddtrace.contribr   ddtrace.extr   r   ddtrace.ext.kafkar	   r
   r   r   r   r   ddtrace.internalr   ddtrace.internal.constantsr   r   r   ddtrace.internal.schemar   r   -ddtrace.internal.schema.span_attribute_schemar   ddtrace.internal.utilsr   r   ddtrace.internal.utils.formatsr   ddtrace.internal.utils.wrappersr   r|   ddtrace.propagation.httpr   _adddictgetenvrb   r'   r(   r-   r2   rA   r[   rq   rs   rt   rw   r   r%   r%   r%   r&   <module>   sZ    	
%