o
    i_:                     @   s  d dl Z d dlZd dlmZ d dlmZ d dlZd dlmZ d dlmZ d dl	m
Z
 d dl	mZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dl m!Z! d dl m"Z" d dl m#Z# d dl$m%Z% d dl&m'Z' d dl(m)Z* d dl+m,Z, ej-Z.ej/Z0e1edrej2ndZ3e1edrej4ndZ5ee6Z7e8de9ede%e j:ddd e%e j:d!d"d d# d$e;fd%d&Z<d$e9e;e;f fd'd(Z=e'e< Z>e>d)krej?j@ndZAe>d)krej?jBndZCG d*d+ d+ZDG d,d- d-ZEG d.d/ d/eEej/ZFG d0d1 d1eDej-ZGG d2d3 d3eEej4ZHG d4d5 d5eDej2ZId6d7 ZJd8d9 ZKd:d; ZLd<d= ZMd>d? ZNd@dA ZOdBdC ZPdDdE ZQdS )F    N)time)time_ns)config)Pin)_SPAN_MEASURED_KEY)	SPAN_KIND)trace_utils)SpanKind)	SpanTypes)kafka)core)	COMPONENT)MESSAGING_DESTINATION_NAME)MESSAGING_SYSTEM)
get_logger)schematize_messaging_operation)schematize_service_name)SpanDirection)ArgumentError)get_argument_value)set_argument_value)asbool)parse_version)HTTPPropagator)tracerSerializingProducerDeserializingConsumerr   DD_KAFKA_PROPAGATION_ENABLEDF)defaultDD_KAFKA_EMPTY_POLL_ENABLEDT)_default_servicedistributed_tracing_enabledtrace_empty_poll_enabledreturnc                   C   s   t tddS )N__version__ )getattrconfluent_kafka r(   r(   X/home/ubuntu/.local/lib/python3.10/site-packages/ddtrace/contrib/internal/kafka/patch.pyget_version6   s   r*   c                   C   s   ddiS )Nr'   z>=1.9.2r(   r(   r(   r(   r)   _supported_versions:   s   r+   )      r   c                       s*   e Zd Zd fdd	Zdd ZeZ  ZS )TracedProducerMixinNc                    sT   |s|}t t| j|g|R i | |dd ur"|d| _d S |d| _d S )Nzbootstrap.serverszmetadata.broker.list)superr.   __init__get_dd_bootstrap_serversselfr   argskwargs	__class__r(   r)   r0   F   s   zTracedProducerMixin.__init__c                 C   s   dS )NTr(   )r4   r(   r(   r)   __bool__R   s   zTracedProducerMixin.__bool__N)__name__
__module____qualname__r0   r9   __nonzero____classcell__r(   r(   r7   r)   r.   E   s    r.   c                       s   e Zd Zd fdd	Z  ZS )TracedConsumerMixinNc                    sJ   |s|}t t| j|g|R i | |dd| _t|dd| _d S )Nzgroup.idr%   zenable.auto.commitT)r/   r@   r0   r1   	_group_idr   _auto_commitr3   r7   r(   r)   r0   Y   s
   zTracedConsumerMixin.__init__r:   )r;   r<   r=   r0   r?   r(   r(   r7   r)   r@   X   s    r@   c                   @      e Zd ZdS )TracedConsumerNr;   r<   r=   r(   r(   r(   r)   rD   a       rD   c                   @   rC   )TracedProducerNrE   r(   r(   r(   r)   rG   e   rF   rG   c                   @   rC   )TracedDeserializingConsumerNrE   r(   r(   r(   r)   rH   i   rF   rH   c                   @   rC   )TracedSerializingProducerNrE   r(   r(   r(   r)   rI   m   rF   rI   c                  C   s   t tddrd S dt_tt_tt_td urtt_	t
d urtt_ttfD ]	} t| dt q#ttfD ]}t|dt t|dt q1ttdt t tj t tj t tj	 t tj d S )N_datadog_patchFTproducepollcommitconsume)r&   r'   rJ   rG   ProducerrD   Consumer_SerializingProducerrI   r   _DeserializingConsumerrH   r   r   wraptraced_producetraced_poll_or_consumetraced_commitr   ontoproducerconsumerr(   r(   r)   patchq   s&   r[   c                  C   s   t tddr	dt_ttfD ]} t| jrt| d qt	t
fD ]}t|jr.t|d t|jr:t|d q tt	jrGtt	d tt_tt_td urTtt_td ur]tt_d S d S )NrJ   FrK   rL   rM   rN   )r&   r'   rJ   rG   rI   r   	iswrappedrK   unwraprD   rH   rL   rM   rN   	_ProducerrO   	_ConsumerrP   rQ   r   rR   r   rX   r(   r(   r)   unpatch   s*   
r`   c              
   C   s`  t |}|r| s| |i |S t||ddpd}td| z	t||dd}W n ty5   d }Y nw |ddp=d}|dd	}t||d
dddpNi }	tj	t
tjdtjdt|tjtjd}
t||}td| |ry|
tj| td|||t|t|
f |
ttj |
ttjj |
tt j! |
tj"| |r|
t#| td urt|trt$||||	}|d ur|
tj%| n|
tj%| |
&tj'| |
tj(t)|d u  |
*t+d |j,d ur|
tj-|j, tjj.rt||d
ddpi }	t/0|
j1|	 t2||d
d|	dd\}}| |i |W  d    S 1 s)w   Y  d S )Nr   topicr%   kafka_topicr,   valuekey	partition   headersToptionalr   provider	direction)service	span_typekafka_cluster_idzkafka.produce.start)override_unset)3r   get_fromenabledr   r   set_itemr   r1   r   tracer   kafkaxPRODUCEr   OUTBOUNDr   ext_servicer   r   r
   WORKER_get_cluster_id_set_tag_str
CLUSTER_IDdispatch
isinstancerQ   r   SERVICEr   integration_namer   r	   PRODUCERTOPICr   serialize_keyMESSAGE_KEYset_tag	PARTITION	TOMBSTONEstr
set_metricr   r2   	HOST_LISTr!   
Propagatorinjectcontextr   )funcinstancer5   r6   pinra   rc   message_keyre   rh   span
cluster_idserialized_keyr(   r(   r)   rT      s^   



&rT   c           	      C   s"  t |}|r| s| |i |S t }d }d }zJz	| |i |}W n ty3 } z|}|d }~ww W t|tjrFt|g|||| |S t|t	rUt||||| |S t
jjrbtd g|||| |S t|tjrtt|g|||| w t|t	rt||||| w t
jjrtd g|||| w w r:   )r   rr   rs   r   	Exceptionr   r'   Message_instrument_messagelistr   r   r"   )	r   r   r5   r6   r   start_nserrresulter(   r(   r)   rU      s<   


rU   c              	   C   s  d }t | r
| d nd }|d ur!tjjr!| r!tt| }tj	t
tjdtjdt|tjtj|d ur>|jd ur>|ntj dd}||_d }| D ],}	|	d urz|d urzt|t| }td| tdt|  td||	|f qN|ttj |t tjj! |t"t#j$ |r|tj%| |tj&t|d u |tj'|j( |d ur|) pd	}
|* pd
}t| }|tj+| |t,| t-d urt.|t-rt.|
tst.|
t/r|tj0|
 |1tj2|3  d}zt |dk}W n
 t4y   Y nw |tj5t| |1tj6| |7t8d |d ur3|j9t:;   W d    d S W d    d S 1 s?w   Y  d S )Nr   r   rk   T)namern   ro   child_ofactivaterp   rb   zkafka.consume.startr%   rf   Fr,   )<lenr   r   r!   rh   r   extractdictr   
start_spanr   rv   CONSUMEr   
PROCESSINGr   ry   r
   rz   trace_idcontext_provideractiver   r{   r   ra   r   rt   r~   r|   r   r   r   r   r   r	   CONSUMERr}   RECEIVED_MESSAGEGROUP_IDrA   rd   offsetr   r   rR   r   bytesr   r   r   re   	TypeErrorr   MESSAGE_OFFSETr   r   set_exc_infosysexc_info)messagesr   r   r   r   ctxfirst_messager   r   messager   message_offsetra   is_tombstoner(   r(   r)   r      sn   

3$r   c                 C   s   t |}|r| s| |i |S t|dd}|sGt||dddd}|d ur4t|dr4t|| }nt||dd	dd}|rGt||d j}t	d
| t
d|||f | |i |S )N_dd_cluster_idr%   r   r   Tri   ra   r,   offsetsrp   zkafka.commit.start)r   rr   rs   r&   r   hasattrr{   ra   r   rt   r~   )r   r   r5   r6   r   r   r   r   r(   r(   r)   rV   7  s   
rV   c                 C   s   t d ur?td urAt |tj|}t| dr5| jd ur5z	| ||}|W S  ty4   tdt| Y d S w t	dt| d S d S d S )N_key_serializerz(Failed to set Kafka Consumer key tag: %szNFailed to set Kafka Consumer key tag, no method available to serialize key: %s)
_SerializationContext_MessageFieldKEYr   r   r   logdebugr   warning)r   ra   rd   rh   r   r(   r(   r)   r   K  s   r   c                 C   s   | rt | dd r| jS t | dd}t | dk rdS t | dd d u r$dS z| j|dd}|r<t |d	d r?|j| _|jW S W dS W dS  tyT   t | _td
 Y dS w )Nr   _dd_cluster_id_failure_timer   i,  r%   list_topicsg      ?)ra   timeoutr   z:Failed to get Kafka cluster ID, will retry after 5 minutes)	r&   r   r   r   r   r   r   r   r   )r   ra   last_failurecluster_metadatar(   r(   r)   r{   Z  s*   r{   )Rosr   r   r   r'   ddtracer   ddtrace._trace.pinr   ddtrace.constantsr   r   ddtrace.contribr   ddtrace.extr	   r
   r   rv   ddtrace.internalr   ddtrace.internal.constantsr   r   r   ddtrace.internal.loggerr   ddtrace.internal.schemar   r   -ddtrace.internal.schema.span_attribute_schemar   ddtrace.internal.utilsr   r   r   ddtrace.internal.utils.formatsr   ddtrace.internal.utils.versionr   ddtrace.propagation.httpr   r   ddtrace.tracer   rO   r^   rP   r_   r   r   rQ   r   rR   r;   r   _addr   getenvr   r*   r+   KAFKA_VERSION_TUPLEserializationSerializationContextr   MessageFieldr   r.   r@   rD   rG   rH   rI   r[   r`   rT   rU   r   rV   r   r{   r(   r(   r(   r)   <module>   sz    

	9>