o
    i                     @   s   d dl mZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dl	m
Z
 d dlmZ d d	lmZ eeZd
d Zdd ZejrSede ede dS dS )    )config)
HEADER_POS)PUBLISH_BODY_IDX)get_exchange_from_args)get_routing_key_from_args)core)DsmPathwayCodec)_calculate_byte_size)
get_loggerc                 C   s   ddl m} t| }t| }d}|t| t 7 }|t| t 7 }tt|	 }g }dd|fd|fdfD ]\}	}
|
d urF|
|	 d|
  q4| j|||d	}t|| t  d S )
N   data_streams_processorr   )	directionoutexchangehas_routing_key)typerabbitmq:payload_sizespan) r   r   r   r	   r   r   strboollowerappendset_checkpointr   encode)argskwargsr   	processorrouting_keydsm_identifierr   r   pathway_tagsprefixvaluectx r(   V/home/ubuntu/.local/lib/python3.10/site-packages/ddtrace/internal/datastreams/kombu.pyhandle_kombu_produce   s$   r*   c                 C   sz   ddl m} d}|t|j7 }|t|j7 }t|j| }t| jdkr+| jd j	nd}|j
dd| dg||d d S )	Nr   r   r   r   zdirection:inztopic:ztype:rabbitmqr   )r   r   r	   bodyheadersr   decodelenqueuesnamer   )instancemessager   r!   r   r'   queuer(   r(   r)   handle_kombu_consume(   s    r4   zkombu.amqp.publish.prezkombu.amqp.receive.postN)ddtracer   $ddtrace.contrib.internal.kombu.utilsr   r   r   r   ddtrace.internalr   &ddtrace.internal.datastreams.processorr   "ddtrace.internal.datastreams.utilsr	   ddtrace.internal.loggerr
   __name__logr*   r4   _data_streams_enabledonr(   r(   r(   r)   <module>   s     