o
    iB                     @   s  d dl Z d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dl	m
Z
 d dlmZ d d	lmZ d d
lmZ d dlmZ erZd dlmZ d dlmZ d dlmZ d dlmZ eeZdedee dee deeeef  de
j deddfddZ!de
j deee" ee# ee f deddfddZ$ddde
j d ee% d!ed" dee# ddfd#d$Z&ddd%e
j d&ee'd'ed" f  ddfd(d)Z(dd*d+ed,eddfd-d.Z)ej*re
+d/e! e
+d0e$ e
+d1e& e
+d2e( e
+d3e) dS dS )4    N)TracebackType)TYPE_CHECKING)Any)Optional)config)core)DsmPathwayCodec)_calculate_byte_size)
get_logger)get_argument_value)AIOKafkaConsumer)GroupCoordinator)ConsumerRecord)TopicPartitiontopicvaluekeyheadersspan_ctx
_partitionreturnc                 C   s   ddl m} | }|sd S |j}d}	|	t|7 }	|	t|7 }	z	dd |D }
W n ty9   tjddd i }
Y nw |	t|
7 }	d	d
|  dg}|j||	|d}i }t	|| |
 D ]\}}|||	df q\d S )N   data_streams_processorr   c                 S   s   i | ]\}}||qS  r   ).0kvr   r   Y/home/ubuntu/.local/lib/python3.10/site-packages/ddtrace/internal/datastreams/aiokafka.py
<dictcomp>-   s    z+dsm_aiokafka_send_start.<locals>.<dictcomp>z5Error converting headers for payload size calculationT)exc_infozdirection:outtopic:
type:kafkapayload_sizespanutf-8) r   r%   r	   	Exceptionlogdebugset_checkpointr   encodeitemsappend)r   r   r   r   r   r   	processordsm_processorr%   r$   header_dict	edge_tagsctxdsm_headers
header_keyheader_valuer   r   r   dsm_aiokafka_send_start   s,   r7   _ctx_errorrecord_metadatac                 C   sX   ddl m} | }|sd S |d ur*t|jtr|jnd}||j|j|t  d S d S )Nr   r   )	r'   r   
isinstanceoffsetinttrack_kafka_producer   	partitiontime)r8   r9   r:   r/   r0   reported_offsetr   r   r   dsm_aiokafka_send_completed<   s   rC   instancer   	_start_nsmessager   c                 C   s   ddl m} | }|r|sd S |j}dd |jpg D }| j}	d}
|
t|j7 }
|
t|j7 }
|
t|7 }
t	||}|j
dd|	 d|j d	g|
|d
 | jrnt|jtr\|jd nd}|| j|j|j|t  d S d S )Nr   r   c                 S   s>   i | ]\}}|d ur|t |ttfr|jdddnt|qS )Nr&   ignore)errors)r<   bytes	bytearraydecodestr)r   r   valr   r   r   r   ]   s
    $z0dsm_aiokafka_message_consume.<locals>.<dictcomp>r   zdirection:inzgroup:r!   r"   r#   r;   )r'   r   r%   r   	_group_idr	   r   r   r   rK   r+   r   _enable_auto_commitr<   r=   r>   track_kafka_commitr@   rA   )rD   r   rE   rF   r9   r/   r0   r%   r   groupr$   r3   rB   r   r   r   dsm_aiokafka_message_consumeN   s.   rR   r3   messagesr   c                 C   s<   |d ur|  D ]\}}|D ]
}t| |d |d  qqd S d S )N)r-   rR   )rD   r3   rS   _recordsrecordr   r   r   "dsm_aiokafka_many_messages_consumew   s   rW   r   argskwargsc           	   	   C   s   ddl m} | }|sd S t||dddd}|rP| D ]5\}}t|tr(|}nt|tr?t|dkr?t|d tr?|d }nd}|| j	|j
|j|t  qd S d S )Nr   r   offsetsT)optionalr   r;   )r'   r   r   r-   r<   r>   tuplelenrP   group_idr   r@   rA   )	rD   rX   rY   r/   r0   rZ   tpr=   rB   r   r   r   dsm_aiokafka_message_commit   s   
$
r`   zaiokafka.send.startzaiokafka.send.completedzaiokafka.getone.messagezaiokafka.getmany.messagezaiokafka.commit.end),rA   typesr   typingr   r   r   ddtracer   ddtrace.internalr   &ddtrace.internal.datastreams.processorr   "ddtrace.internal.datastreams.utilsr	   ddtrace.internal.loggerr
   ddtrace.internal.utilsr   aiokafkar   #aiokafka.consumer.group_coordinatorr   aiokafka.structsr   r   __name__r)   rL   rI   listr\   ExecutionContextr7   typeBaseExceptionrC   r>   rR   dictrW   r`   _data_streams_enabledonr   r   r   r   <module>   s    
#

)
