o
    i                     @   s   d dl mZ d dlmZ d dlmZ d dlmZ d dlm	Z	 ddl
mZ ddl
mZ eZeg d	Zd
ededefddZde	deeef ddfddZd!ddZd!ddZd!ddZd!ddZd!ddZdd  ZdS )"    )Any)WeakValueDictionary)set_flattened_tags)HTTPPropagator)Span   )CTX_KEY)SPAN_KEY))compressionzcelery.compression)correlation_idzcelery.correlation_id)	countdownzcelery.countdown)delivery_infozcelery.delivery_info)etaz
celery.eta)exchangezcelery.exchange)expireszcelery.expires)hostnamecelery.hostname)idz	celery.id)priorityzcelery.priority)queuezcelery.queue)reply_tozcelery.reply_to)retrieszcelery.retries)routing_keyzcelery.routing_key)
serializerzcelery.serializer)	timelimitzcelery.timelimit)originr   )statezcelery.statekeyvaluereturnc                 C   sJ   |d u s|dkr
dS | dkrt dd |D rdS | dkr#|dkr#dS dS )	N Tr   c                 s   s    | ]}|d u V  qd S )N ).0_r!   r!   Y/home/ubuntu/.local/lib/python3.10/site-packages/ddtrace/contrib/internal/celery/utils.py	<genexpr>0   s    z,should_skip_context_value.<locals>.<genexpr>r   r   F)all)r   r   r!   r!   r$   should_skip_context_value)   s   r'   spancontextNc                 C   sD   g }t D ]\}}||}t||rq|||f qt| | dS )z3Helper to extract meta values from a Celery ContextN)TAG_KEYSgetr'   appendr   )r(   r)   context_tagsr   tag_namer   r!   r!   r$   set_tags_from_context:   s   

r/   Fc                 C   sJ   i }t |j| t| td }|d u rt }t| t| ||||df< d S )Ndistributed_context)
propagatorinjectr)   getattrr   dictsetattr)tasktask_idr(   
is_publishtrace_headerscontext_dictr!   r!   r$   attach_span_contextH   s   r;   c                 C   s(   t | td}|du rdS |||dfS )HHelper to retrieve an active `Span` stored in a `Task`
    instance
    Nr0   )r3   r   r+   )r6   r7   r8   r:   r!   r!   r$   retrieve_span_contextU   s   r=   c                 C   s6   t | td}|du rt }t| t| ||||f< dS )a  Helper to propagate a `Span` for the given `Task` instance. This
    function uses a `WeakValueDictionary` that stores a Datadog Span using
    the `(task_id, is_publish)` as a key. This is useful when information must be
    propagated from one Celery signal to another.

    DEV: We use (task_id, is_publish) for the key to ensure that publishing a
         task from within another task does not cause any conflicts.

         This mostly happens when either a task fails and a retry policy is in place,
         or when a task is manually retried (e.g. `task.retry()`), we end up trying
         to publish a task with the same id as the task currently running.

         Previously publishing the new task would overwrite the existing `celery.run` span
         in the `weak_dict` causing that span to be forgotten and never finished.

         NOTE: We cannot test for this well yet, because we do not run a celery worker,
         and cannot run `task.apply_async()`
    N)r3   r	   r   r5   )r6   r7   r(   r8   	weak_dictr!   r!   r$   attach_spana   s
   r?   c                 C   s>   t | td}|du rdS z|||f= W dS  ty   Y dS w )zHelper to remove a `Span` in a Celery task when it's propagated.
    This function handles tasks where the `Span` is not attached.
    N)r3   r	   KeyErrorr6   r7   r8   r>   r!   r!   r$   detach_span|   s   rB   c                 C   s&   t | td}|du rdS |||fS )r<   N)r3   r	   r+   rA   r!   r!   r$   retrieve_span   s   rC   c                 C   sH   |  d}|  d}|rd|v r| dS |r d|v r"| dS dS dS )zHelper to retrieve the `Task` identifier from the message `body`.
    This helper supports Protocol Version 1 and 2. The Protocol is well
    detailed in the official documentation:
    http://docs.celeryproject.org/en/latest/internals/protocol.html
    headersbodyr   N)r+   )r)   rD   rE   r!   r!   r$   retrieve_task_id   s   



rF   )F)typingr   weakrefr   $ddtrace.contrib.internal.trace_utilsr   ddtrace.propagation.httpr   ddtrace.tracer   	constantsr   r	   r1   	frozensetr*   strboolr'   r4   r/   r;   r=   r?   rB   rC   rF   r!   r!   r!   r$   <module>   s&    




