o
    i#                     @   s<  d dl Z d dlZd dlmZ d dlmZ d dlmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZ eeZd	d
 Zdd Zdd Zdd Zd,ddZdd Zd,ddZdd Zd,ddZdd Zdd Zdd  ZG d!d" d"eZd#d$ Z d%d& Z!ej"re	#d'e e	#d(e e	#d)e e	#d*e e	#d+e! dS dS )-    N)Any)parse)config)core)DsmPathwayCodec)_calculate_byte_size)
get_loggerc                 C   s$   | d }t |}|jddd S )zy
    :params: contains the params for the current botocore action

    Return the name of the queue given the params
    QueueUrl/   )r   urlparsepathrsplit)params	queue_urlurl r   Y/home/ubuntu/.local/lib/python3.10/site-packages/ddtrace/internal/datastreams/botocore.pyget_queue_name   s   
r   c                 C   s   | d }|S )zy
    :params: contains the params for the current botocore action

    Return the name of the topic given the params
    TopicArnr   )r   sns_arnr   r   r   get_topic_arn   s   r   c                 C   s   |  d|  dd}|S )zz
    :params: contains the params for the current botocore action

    Return the name of the stream given the params
    	StreamARN
StreamName )get)r   streamr   r   r   
get_stream'   s   r   c                 C   s   ddl m} d|}d}|dkrt|| }n|dkr!t|| }n	|dkr*t|| }|s2td| | jd	d
||g|d}t	
||  dS )z
    :endpoint_service: the name  of the service (i.e. 'sns', 'sqs', 'kinesis')
    :dsm_identifier: the identifier for the topic/queue/stream/etc

    Set the data streams monitoring checkpoint and inject context to carrier
    r   data_streams_processorztype:{}Nsqssnskinesisz3pathway being generated with unrecognized service: zdirection:outztopic:{}payload_size)r   r    formatcalculate_sqs_payload_sizecalculate_sns_payload_sizecalculate_kinesis_payload_sizelogdebugset_checkpointr   encode)
trace_dataendpoint_servicedsm_identifiermessage	processor	path_typer%   ctxr   r   r   inject_context2   s   

r5   c                 C   sj   t | dd}|t | di 7 }|r|t dd|di7 }|t | di 7 }|t | dd7 }|S )	NMessageBodyr   MessageAttributes_datadogString)DataTypeStringValueMessageSystemAttributesMessageGroupIdr   r   r1   r.   r%   r   r   r   r'   N   s   r'   c                 C   sf   t | dd}|t | di 7 }|t dd|di7 }|t | dd7 }|t | dd7 }|S )	NMessager   r7   r8   Binary)r:   BinaryValueSubjectr=   r>   r?   r   r   r   r(   Y   s   r(   c                 C   sP   t | dd}|t | dd7 }|t | dd7 }|r&|t d|i7 }|S )NDatar   ExplicitHashKeyPartitionKeyr8   r>   r?   r   r   r   r)   c   s   r)   c                 G   s8   t jrd|vri |d< |rt|d d|| d S d S d S )Nr8   r#   )r   _data_streams_enabledr5   )r4   r   dd_ctx_jsonrecordargsr   r   r   handle_kinesis_producen   s   rK   c                 C   s@   |s|}d }|dkrt |}n|dkrt|}t|||| d S )Nr!   r"   )r   r   r5   )r4   spanr/   r.   r   r1   r0   r   r   r   handle_sqs_sns_producev   s   
rM   c                 C   sL   d| vr|  ddgi d S d| d vr$|  dt| d dg i d S d S )NMessageAttributeNamesr8   )updatelist)r   r   r   r   handle_sqs_prepare   s
    rQ   c              	   C   s(  d}| }z|  d}|rt|}W n ttfy"   td Y nw | dp,| d}|s7td|  dS d|vrCtd|  dS |d }| dd	krc| dd
kratt|d 	 }|S d|v rpt|d }|S d|v r}t|d }|S d|v rt|d 	 }|S td|  |S )aK  
    Formats we're aware of:
        - message.Body.MessageAttributes._datadog.Value.decode() (SQS)
        - message.MessageAttributes._datadog.StringValue (SNS -> SQS)
        - message.MessageAttributes._datadog.BinaryValue.decode() (SNS -> SQS, raw)
        - message.messageAttributes._datadog.stringValue (SQS -> lambda)
    NBodyz7Unable to parse message body as JSON, treat as non-jsonr7   messageAttributeszDataStreams skipped message: %rr8   TypeNotificationrA   Valuer;   stringValuerB   z&DataStreams did not handle message: %r)
r   jsonloads
ValueError	TypeErrorr*   r+   base64	b64decodedecode)r1   context_jsonmessage_bodybodymessage_attributesdatadog_attrr   r   r   get_datastreams_context   sB   


rd   c           
   	   G   s   ddl m} t|}|dg D ]/}zt|}t|}t|| }	|	jdd| dg|d W q t	y?   t
jdd	d
 Y qw d S )Nr   r   Messagesdirection:intopic:ztype:sqsr$   z@Error receiving SQS message with data streams monitoring enabledTexc_info)r   r    r   r   rd   r'   r   r^   r,   	Exceptionr*   r+   )
_r   resultrJ   r2   
queue_namer1   r_   r%   r4   r   r   r   handle_sqs_receive   s   rn   c                   @   s   e Zd ZdS )StreamMetadataNotFoundN)__name__
__module____qualname__r   r   r   r   ro      s    ro   c                 C   s`   ddl m} t| }|std|  t t|}t|| }|j	dd| dg|||d d S )Nr   r   zIUnable to determine StreamARN and/or StreamName for request with params: rf   rg   ztype:kinesis)edge_start_sec_overridepathway_start_sec_overrider%   )
r   r    r   r*   r+   ro   r)   r   r^   r,   )r   time_estimater_   rI   r2   r   r%   r4   r   r   r   +record_data_streams_path_for_kinesis_stream   s   
rv   c                 G   s8   z
t |||| W d S  ty   tjddd Y d S w )Nz9Failed to report data streams monitoring info for kinesisTrh   )rv   rj   r*   warning)rk   r   ru   r_   rI   rJ   r   r   r   handle_kinesis_receive   s
   rx   zbotocore.kinesis.update_recordz botocore.sqs_sns.update_messageszbotocore.sqs.ReceiveMessage.prez botocore.sqs.ReceiveMessage.postz botocore.kinesis.GetRecords.post)N)$r\   rX   typingr   urllibr   ddtracer   ddtrace.internalr   &ddtrace.internal.datastreams.processorr   "ddtrace.internal.datastreams.utilsr   ddtrace.internal.loggerr   rp   r*   r   r   r   r5   r'   r(   r)   rK   rM   rQ   rd   rn   rj   ro   rv   rx   rG   onr   r   r   r   <module>   s@    



/