o
    i                     @   s   d dl Z d dlmZ d dlmZ d dlmZ d dlmZ d dl	m
Z
 d dlmZ d dlmZ d d	lmZ d d
lmZ efZdZdZdZdaeeZdd Zdd Zdd Zejrlede ede ede dS dS )    N)TopicPartition)config)core)DsmPathwayCodec)_calculate_byte_size)
get_logger)ArgumentError)get_argument_value)set_argument_value      keyFc              
      s^  ddl m td}td t||tddd}t||ttdd}|di }d	}	|	t	|7 }	|	t	|7 }	|	t	|7 }	d
d| dg}
 rN|

dt    j|
|	|d}tsct|| ||d< d}d}d z	t||||W n ty   |sd}d}t||||ddY nw  fdd}zt|||||\}}W d S  ty   |||< Y d S w )Nr   data_streams_processorkafka_topickafka_cluster_idvalueToptionalheadersr   zdirection:outtopic:
type:kafkakafka_cluster_id:payload_sizespanon_delivery   callback   c                    s~   | d u r#t | tr| nd} j| | |t  d n|  dkr2ts2dat	
d d ur=| | d S d S )N
cluster_idTzKafka Broker responded with UNKNOWN_SERVER_ERROR (-1). Please look at broker logs for more information. Tracer message header injection for Kafka is disabled.)
isinstanceoffset	INT_TYPEStrack_kafka_producetopic	partitiontimecodedisable_header_injectionlogerror)errmsgreported_offsetr"   r   	processor V/home/ubuntu/.local/lib/python3.10/site-packages/ddtrace/internal/datastreams/kafka.pywrapped_callback;   s   z3dsm_kafka_message_produce.<locals>.wrapped_callback) r   r   	find_itemr	   MESSAGE_ARG_POSITIONKEY_ARG_POSITIONKEY_KWARG_NAMEgetr   appendstrset_checkpointr+   r   encoder   r
   )instanceargskwargsis_serializingr   r'   messager   r   r   	edge_tagsctxon_delivery_kwargon_delivery_argr5   r3   r1   r4   dsm_kafka_message_produce   sF   

rI   c                 C   s  ddl m} dd | pg D }td}td}| j}d}t|dr,|| 7 }n|t|	 7 }|t|
 7 }|t|7 }t|| }	d	d
| d| dg}
|r^|
dt|  |	j|
||d | jrt| trv| d nd}| j| j| | |t |d d S d S )Nr   r   c                 S   s   i | ]	}|d  |d qS )r   r   r3   ).0headerr3   r3   r4   
<dictcomp>U   s    z-dsm_kafka_message_consume.<locals>.<dictcomp>r   r   r   lenzdirection:inzgroup:r   r   r   r   r    r!   )r6   r   r   r   r7   	_group_idhasattrrM   r   r   r   r   decoder<   r=   r>   _auto_commitr#   r$   r%   track_kafka_commitr'   r(   r)   )r@   rD   r   r2   r   r'   r"   groupr   rF   rE   r0   r3   r3   r4   dsm_kafka_message_consumeR   s>   



rT   c           	   	   C   s   ddl m} tdpd}t||dddd}g }|d ur7t| tr)| d nd	}t|	 |
 |g}n
t||dd
dp@g }|D ]}t|jtrN|jnd	}| j| j|j	|j
|t |d qCd S )Nr   r   r   r6   r   rD   Tr   r    offsetsr!   )r6   r   r   r7   r	   r#   r$   r%   r   r'   r(   rR   rN   r)   )	r@   rA   rB   r2   r"   rD   rU   r0   r$   r3   r3   r4   dsm_kafka_message_commit~   s&   rV   zkafka.produce.startzkafka.consume.startzkafka.commit.start)r)   confluent_kafkar   ddtracer   ddtrace.internalr   &ddtrace.internal.datastreams.processorr   "ddtrace.internal.datastreams.utilsr   ddtrace.internal.loggerr   ddtrace.internal.utilsr   r	   r
   intr%   r8   r9   r:   r+   __name__r,   rI   rT   rV   _data_streams_enabledonr3   r3   r3   r4   <module>   s0    9,