o
    iR                     @   sR  d dl Z d dlmZ d dlmZ d dlZd dlZd dlZd dlZd dl	Z	d dl
mZ d dl
mZ d dl
mZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlm Z  d dl!m"Z" ddl#m$Z$ ddl%m&Z& ddl'm(Z( ddl)m*Z* ddl+m,Z, ddl-m.Z. ddl/m0Z0 ddl/m1Z1 ddl2m3Z3 ddl4m5Z5 dd Z6	 e*e7Z8dZ9d Z:d!Z;	 e<e=e>e>f Z?G d"d# d#e@ZAed$d%e=fd&e>fd'e=fgZBed(d)e=fd%e=fd&e>fd'e=fgZCed*d+ee?eAf fd,eeBe>f fd-eeCe>f fgZDG d.d/ d/e,ZEG d0d1 d1ZFG d2d3 d3ZGd6d4d5ZHdS )7    N)defaultdict)partial)
NamedTuple)Optional)Union)compat)process_tags)register_on_exit_signal)DEFAULT_SERVICE_NAME)DDSketch)config)Lock)fnv1_64)fibonacci_backoff_with_jitter)__version__   )packb)get_connection)get_hostname)
get_logger)PeriodicService)_human_size   )decode_var_int_64)encode_var_int_64)SchemaBuilder)SchemaSamplerc                 C   s   t | dS )Nr   )gzipcompress)payload r    Z/home/ubuntu/.local/lib/python3.10/site-packages/ddtrace/internal/datastreams/processor.pygzip_compress&      r"   zdd-pathway-ctxzdd-pathway-ctx-base64   c                   @   s   e Zd ZdZdZdd ZdS )PathwayStatszAggregated pathway statistics.)full_pathway_latencyedge_latencypayload_sizec                 C   s   t  | _t  | _t  | _d S N)r   r&   r'   r(   selfr    r    r!   __init__K   s   zPathwayStats.__init__N)__name__
__module____qualname____doc__	__slots__r,   r    r    r    r!   r%   F   s    r%   PartitionKeytopic	partition
cluster_idConsumerPartitionKeygroupBucketpathway_statslatest_produce_offsetslatest_commit_offsetsc                       s*  e Zd ZdZ				d4dee dee dedef fd	d
Z	d5dedede	e dededededdfddZ
d6ddZd6ddZde	e fddZdeddfddZd7d d!Zdee ddfd"d#Zd$edd%fd&d'Zd$eeeef  dd%fd(d)Zd8d*d+Zd9d,d-Zd.d/ Zd0d1 Zd2d3 Z  ZS ):DataStreamsProcessorzeDataStreamsProcessor for computing, collecting and submitting data stream stats to the Datadog Agent.N      ?   	agent_urlintervaltimeoutretry_attemptsc                    s
  |d u rt tdpd}tt| j|d |ptj| _d| _	d| j| j	f | _
|| _t|d | _tdd | _t| _d	| jd
dd| _tt | _ttt| _t | _t | _d| _ i | _!t"|d| j# d|  d d| j$| _%t&t't(| d | )  d S )N_DD_TRACE_STATS_WRITER_INTERVALg      $@)r@   z/v0.1/pipeline_statsz%s%s    eAc                   S   s   t ttttttS r)   )r8   r   r%   intr    r    r    r!   <lambda>s   s    z/DataStreamsProcessor.__init__.<locals>.<lambda>pythonzapplication/msgpackr   )zDatadog-Meta-LangzDatadog-Meta-Tracer-VersionzContent-TypezContent-EncodingTg-?gS?r   )attemptsinitial_wait)obj)*floatosgetenvsuperr<   r,   agent_configtrace_agent_url
_agent_url	_endpoint_agent_endpoint_timeoutrE   _bucket_size_nsr   _bucketsr   _version_headersr   ensure_textr   	_hostnamer   _get_servicer
   _servicer   _lock	threadinglocal_current_context_enabled_schema_samplersr   r@   _flush_stats_flush_stats_with_backoffr	   r   _atexitstart)r+   r?   r@   rA   rB   	__class__r    r!   r,   b   s>   
zDataStreamsProcessor.__init__r   
hash_valueparent_hash	edge_tagsnow_secedge_latency_secfull_pathway_latency_secr(   returnc                 C   s   | j sdS t|d }| j: ||| j  }	d|||f}
| j|	 j|
 }|j| |j	| |j
| || j|	 j|
< W d   dS 1 sKw   Y  dS )a  
        on_checkpoint_creation is called every time a new checkpoint is created on a pathway. It records the
        latency to the previous checkpoint in the pathway (edge latency),
        and the latency from the very first element in the pathway (full_pathway_latency)
        the pathway is hashed to reduce amount of information transmitted in headers.

        :param hash_value: hash of the pathway, it's a hash of the edge leading to this point, and the parent hash.
        :param parent_hash: hash of the previous step in the pathway
        :param edge_tags: all tags associated with the edge leading to this step in the pathway
        :param now_sec: current time
        :param edge_latency_sec: latency of the direct edge between the previous point
            in the pathway, and the current step
        :param full_pathway_latency_sec: latency from the very start of the pathway.
        :return: Nothing
        NrD   ,)ra   rE   r]   rU   joinrV   r9   r&   addr'   r(   )r+   ri   rj   rk   rl   rm   rn   r(   now_nsbucket_time_nsaggr_keystatsr    r    r!   on_checkpoint_creation   s   "z+DataStreamsProcessor.on_checkpoint_creation c           	      C   st   t |d }t|||}| j! ||| j  }t|| j| j| | j| j|< W d    d S 1 s3w   Y  d S NrD   )rE   r2   r]   rU   maxrV   r:   )	r+   r3   r4   offsetrl   r5   rs   keyrt   r    r    r!   track_kafka_produce   s   "z(DataStreamsProcessor.track_kafka_producec           
      C   sv   t |d }t||||}| j! ||| j  }	t|| j|	 j| | j|	 j|< W d    d S 1 s4w   Y  d S ry   )rE   r6   r]   rU   rz   rV   r;   )
r+   r7   r3   r4   r{   rl   r5   rs   r|   rt   r    r    r!   track_kafka_commit   s   "z'DataStreamsProcessor.track_kafka_commitc              	   C   sf  g }g }| j  D ]\}}g }g }|| |j D ](\}}|\}	}
}dd |	dD |
||j |j |j d}|| q|j	 D ](\}}dd|j
 d|j dt|j g}|jri|d	|j  |||d
 qI|j D ]$\}}dd|j dt|j g}|jr|d	|j  |||d
 qw||| j||d q	|D ]}| j |= q|S )z!Serialize and update the buckets.c                 S   s   g | ]}t |qS r    )r   rY   ).0tagr    r    r!   
<listcomp>   s    z;DataStreamsProcessor._serialize_buckets.<locals>.<listcomp>rp   )EdgeTagsHash
ParentHashPathwayLatencyEdgeLatencyPayloadSizeztype:kafka_commitzconsumer_group:ztopic:z
partition:zkafka_cluster_id:)TagsValueztype:kafka_produce)StartDurationStatsBacklogs)rV   itemsappendr9   splitr&   to_protor'   r(   r;   r7   r3   strr4   r5   r:   rU   )r+   serialized_bucketsserialized_bucket_keysrt   bucketbucket_aggr_statsbacklogsru   	stat_aggrrk   ri   rj   serialized_bucketconsumer_keyr{   commit_tagsproducer_keyproduce_tagsr|   r    r    r!   _serialize_buckets   sV   



z'DataStreamsProcessor._serialize_bucketsr   c                 C   s   zt | j| j}|d| j|| j | }W n ty(   tj	d| j
dd  w |jdkr5td d S |jdkrJtd|j|j| | j
 d S t	d	tt|| j
 d S )
NPOSTz9failed to submit pathway stats to the Datadog agent at %sTexc_infoi  zHDatadog agent does not support data streams monitoring. Upgrade to 7.34+i  zXfailed to send data stream stats payload, %s (%s) (%s) response from Datadog agent at %szsent %s to %s)r   rQ   rT   requestrR   rX   getresponse	ExceptionlogdebugrS   statuserrorreasonreadr   len)r+   r   connrespr    r    r!   rc      s(   


z!DataStreamsProcessor._flush_statsc                 C   s   | j  |  }W d    n1 sw   Y  |s td d S | j| jd|| jd}tjr6t	
tj|d< tjrAt	
tj|d< tj }rJ||d< t|}t|}z| | W d S  tym   tjd| jdd	 Y d S w )
Nz,No data streams reported. Skipping flushing.rG   )ServiceTracerVersionLangr   HostnameEnvVersionProcessTagszHretry limit exceeded submitting pathway stats to the Datadog agent at %sTr   )r]   r   r   r   r\   rW   rZ   r   envr   rY   versionr   process_tags_listr   r"   rd   r   r   rS   )r+   serialized_statsraw_payloadp_tagsr   
compressedr    r    r!   periodic  s:   


zDataStreamsProcessor.periodicc                 C   s   |    | | d S r)   )r   stop)r+   rA   r    r    r!   shutdown2  s   zDataStreamsProcessor.shutdowndataDataStreamsCtxc              
   C   s   z4t d|d d d }|dd  }t|\}}t|\}}t| |t|d t|d }|| j_|W S  ttt j	fyE   | 
  Y S w )N<Q   r        @@)structunpackr   r   rK   r`   valueEOFError	TypeErrorr   new_pathway)r+   r   ri   pathway_start_mscurrent_edge_start_msctxr    r    r!   decode_pathway6  s   z#DataStreamsProcessor.decode_pathwayc                 C   s>   |s|   S t|tr|d}n|}t|}| |}|S Nutf-8)r   
isinstancer   encodebase64	b64decoder   )r+   r   binary_pathwayencoded_pathwaydata_streams_contextr    r    r!   decode_pathway_b64C  s   


z'DataStreamsProcessor.decode_pathway_b64c                 C   s&   |st   }t| d||}|| j_|S )z
        type: (Optional[int]) -> DataStreamsCtx
        :param now_sec: optional start time of this path. Use for services like Kinesis which
                           we aren't getting path information for.
        r   )timer   r`   r   )r+   rl   r   r    r    r!   r   P  s
   z DataStreamsProcessor.new_pathwayc                 C   sh   |st   }t| jdr| jj}n|  }|| j_d|v r)|t| tt 7 }|j||||d |S )a@  
        type: (list[str], Optional[int], Optional[int]) -> DataStreamsCtx
        :param tags: a list of strings identifying the pathway and direction
        :param now_sec: The time in seconds to count as "now" when computing latencies
        :param payload_size: The size of the payload being sent in bytes
        r   zdirection:out)rl   r(   span)	r   hasattrr`   r   r   r   
encode_b64PROPAGATION_KEY_BASE_64set_checkpoint)r+   tagsrl   r(   r   r   r    r    r!   r   ]  s   
z#DataStreamsProcessor.set_checkpointc                 C   &   t   d }| j|t }||S Ni  )r   rb   
setdefaultr   
try_sampler+   r3   now_mssamplerr    r    r!   try_sample_schemas     
z&DataStreamsProcessor.try_sample_schemac                 C   r   r   )r   rb   r   r   
can_sampler   r    r    r!   can_sample_schemay  r   z&DataStreamsProcessor.can_sample_schemac                 C   s   t ||S r)   )r   
get_schema)r+   schema_nameiteratorr    r    r!   r     r#   zDataStreamsProcessor.get_schema)NNr=   r>   )r   )rx   )ro   Nr)   )Nr   N)r-   r.   r/   r0   r   r   rK   rE   r,   listrw   r}   r~   dictr   bytesrc   r   r   r   r   r   r   r   r   r   r   __classcell__r    r    rg   r!   r<   _   s\    1	

(
		6
 

r<   c                
   @   sb   e Zd Zdededededdf
ddZdefd	d
Zde	fddZ
dd Z					dddZdS )r   	processorri   pathway_start_seccurrent_edge_start_secro   Nc                 C   sR   || _ || _|| _|| _ttt| _	ttj
pd| _
d| _d| _|| _d S )Nnonerx   r   )r   r   r   hashr   rY   r   r[   r
   servicer   previous_directionclosest_opposite_direction_hash%closest_opposite_direction_edge_start)r+   r   ri   r   r   r    r    r!   r,     s   
zDataStreamsCtx.__init__c                 C   s2   t d| jtt| jd  tt| jd  S )Nr   r   )r   packr   r   rE   r   r   r*   r    r    r!   r     s   zDataStreamsCtx.encodec                 C   s    |   }t|}|d}|S r   )r   r   	b64encodedecode)r+   r   r   r   r    r    r!   r     s   

zDataStreamsCtx.encode_b64c                 C   s\   dd }|| j || j tj }|D ]}|||7 }qt|}ttd|td| S )Nc                 S   s   t | ddS )Nr   )encoding)r   )sr    r    r!   	get_bytes  r#   z/DataStreamsCtx._compute_hash.<locals>.get_bytesr   )r   r   r   base_hash_bytesr   r   r   )r+   r   rj   r   bt	node_hashr    r    r!   _compute_hash  s   zDataStreamsCtx._compute_hashr   c              	   C   s   |st   }t|}d}|D ]}|dr|} nq|| jkr4| j| _| jdkr/|| _|| _n| j| _n
|| _| j| _|| _|rC|| _|rH|| _| j}	| 	||	}
|r[|
dt|
 t|| j d}t|| j d}|
| _|| _| jj|
|	|||||d dS )a  
        type: (list[str], float, float, float) -> None

        :param tags: an list of tags identifying the pathway and direction
        :param now_sec: The time in seconds to count as "now" when computing latencies
        :param edge_start_sec_override: Use this to override the starting time of an edge
        :param pathway_start_sec_override: Use this to override the starting time of a pathway
        rx   z
direction:r   zpathway.hashg        )r(   N)r   sorted
startswithr   r   r   r   r   r   r  _set_tag_strr   rz   r   rw   )r+   r   rl   edge_start_sec_overridepathway_start_sec_overrider(   r   	directionr  rj   ri   rm   pathway_latency_secr    r    r!   r     sB   




zDataStreamsCtx.set_checkpoint)NNNr   N)r-   r.   r/   r<   rE   rK   r,   r   r   r   r   r  r   r    r    r    r!   r     s*    
r   c                   @   sD   e Zd ZdZedededdfddZedededefd	d
Z	dS )DsmPathwayCodecz
    DsmPathwayCodec is responsible for:
        - encoding and injecting DSM pathway context into produced message headers
        - extracting and decoding DSM pathway context from consumed message headers
    r   carrierro   Nc                 C   s(   t | tr
| r
| jsd S |  |t< d S r)   )r   r   r   r   r   )r   r  r    r    r!   r     s   zDsmPathwayCodec.encodedata_streams_processorc                 C   s   | s|  S d }t| v r|| t }n%t| v r9|| t }|jdkr9z	|| t }W n ty8   d }Y nw |s?|  S |S )Nr   )r   r   r   PROPAGATION_KEYr   r   r   )r  r  r   r    r    r!   r     s    
zDsmPathwayCodec.decode)
r-   r.   r/   r0   staticmethodr   r   r   r<   r   r    r    r    r!   r    s    r  c              
   C   s\   z|  t W d S  ty- } ztjr"tdt| W Y d }~d S W Y d }~d S d }~ww )Nz-Failed to shutdown data streams processor: %s)r   SHUTDOWN_TIMEOUTr   r   _data_streams_enabledr   warningrepr)rJ   er    r    r!   re   	  s   re   r)   )Ir   collectionsr   	functoolsr   r   rL   r   r^   r   typingr   r   r   ddtrace.internalr   r   ddtrace.internal.atexitr	   ddtrace.internal.constantsr
   ddtrace.internal.nativer    ddtrace.internal.settings._agentr   rO   !ddtrace.internal.settings._configddtrace.internal.threadsr   ddtrace.internal.utils.fnvr   ddtrace.internal.utils.retryr   ddtrace.versionr   	_encodingr   agentr   hostnamer   loggerr   r   r   writerr   r   r   r   schemas.schema_builderr   schemas.schema_samplerr   r"   r-   r   r  r   r  tupler   rE   PathwayAggrKeyobjectr%   r2   r6   r8   r<   r   r  re   r    r    r    r!   <module>   s~   
  &a%