o
    i                     @   s  d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlm Z  d dl!m"Z" ddl#m$Z$ ddl%m&Z& ddl%m'Z' ddl%m(Z( ddl%m)Z) ddl%m*Z* de+fddZ,e-dee j.dd d!ej/pe j.d"e$d!d# e Z0de1e+e+f fd$d%Z2d&d' Z3d(d) Z4d*d+ Z5d,d- Z6dS ).    N)config)Pin)_SPAN_MEASURED_KEY)	SPAN_KIND)trace_utils)SpanKind)	SpanTypes)kombu)core)	COMPONENT)schematize_messaging_operation)schematize_service_name)SpanDirection)get_argument_value)asbool)unwrap)HTTPPropagator)tracer   )DEFAULT_SERVICE)
HEADER_POS)extract_conn_tags)get_body_length_from_args)get_exchange_from_args)get_routing_key_from_argsreturnc                   C   s
   t tjS )N)strr	   __version__ r   r   X/home/ubuntu/.local/lib/python3.10/site-packages/ddtrace/contrib/internal/kombu/patch.pyget_version$   s   
r    r	   DD_KOMBU_DISTRIBUTED_TRACINGTdefaultDD_KOMBU_SERVICE_NAME)distributed_tracing_enabledservice_namec                   C   s   ddiS )Nr	   z>=4.6.6r   r   r   r   r   _supported_versions5   s   r'   c                  C   s   t tddrdS dt_tj} | ddt | ddt tjr d}nt	j
dtd	}tt|d
tjj tttjd d
tjj dS )zPatch the instrumented methods

    This duplicated doesn't look nice. The nicer alternative is to use an ObjectProxy on top
    of Kombu. However, it means that any "import kombu.Connection" won't be instrumented.
    _datadog_patchFNTr	   zProducer._publishzConsumer.receiver$   r"   )servicer&   )getattrr	   r(   wraptwrap_function_wrappertraced_publishtraced_receiver   r)   osgetenvr   r   r   onto	messagingProducerConsumer)_wprod_servicer   r   r   patch9   s   "r7   c                   C   s2   t tddrdt_ttjd ttjd d S d S )Nr(   F_publishreceive)r*   r	   r(   r   r3   r4   r   r   r   r   unpatch[   s
   r:   c           	      C   s  t |}|r| s| |i |S t||dd}tjt|jtj	d tj
ttjdtjd|jtjdQ}|ttj	j |ttj |td |jd }||_|tj| |t|j j! |tj"|jd  | |i |}t#$d	|||g |W  d    S 1 sw   Y  d S )
Nr   message)request_headers
int_configr	   provider	directionr)   	span_typeexchangerouting_keyzkombu.amqp.receive.post)%r   get_fromenabledr   r   activate_distributed_headersr   headersr   r	   tracer   kombuxRECEIVE_NAMEr   
PROCESSINGr)   r   WORKER_set_tag_strr   integration_namer   r   CONSUMER
set_metricr   delivery_inforesourceEXCHANGEset_tagsr   channel
connectionROUTING_KEYr
   dispatch)	funcinstanceargskwargspinr;   srC   resultr   r   r   r.   g   s.   

$r.   c                 C   s2  t |}|r| s| |i |S tjttjdtj	d|j
tjdl}|ttjj |ttj |td t|}||_|tj| |jrP||j |tjt| |t|jj  |tj!t"| tjj#rxt$%|j&|t'  t()d|||g | |i |W  d    S 1 sw   Y  d S )Nr	   r>   rA   r   zkombu.amqp.publish.pre)*r   rE   rF   r   rI   r   rJ   PUBLISH_NAMEr   OUTBOUNDr)   r   rM   rN   r   r   r	   rO   r   r   PRODUCERrQ   r   r   rS   rT   tagsrU   rX   r   r   rV   rW   BODY_LENr   r%   
propagatorinjectcontextr   r
   rY   )rZ   r[   r\   r]   r^   r_   exchange_namer   r   r   r-      s6   

$r-   )7r/   r	   r+   ddtracer   ddtrace._trace.pinr   ddtrace.constantsr   r   ddtrace.contribr   ddtrace.extr   r   rJ   ddtrace.internalr
   ddtrace.internal.constantsr   ddtrace.internal.schemar   r   -ddtrace.internal.schema.span_attribute_schemar   ddtrace.internal.utilsr   ddtrace.internal.utils.formatsr   ddtrace.internal.utils.wrappersr   ddtrace.propagation.httpr   ddtrace.tracer   	constantsr   utilsr   r   r   r   r   r   r    _addr0   r)   rf   dictr'   r7   r:   r.   r-   r   r   r   r   <module>   sP    ""