o
    i                     @   s:  d dl Z d dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlm	Z	 d dlm
Z
 d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ ee Z!d ddZ"dd Z#d ddZ$d ddZ%dd Z&dS )!    N)signals)config)_DD_PIN_NAME)Pin)_SPAN_MEASURED_KEY)	SPAN_KIND)trace_utils)trace_after_publish)trace_before_publish)trace_failure)trace_postrun)trace_prerun)trace_retry)SpanKind)	SpanTypes)core)ddtrace_after_in_parent)ddtrace_before_fork)
get_logger)tracerc              	   C   s   t | ddrdS d| _|pttjd tjd}||  tddttjd	d
d  tddttjd |tj	j
 tddttjd tddt tjjtdd tjjtdd tjjtdd tjjtdd tjjtdd tjjtdd | S )z\Attach the Pin class to the application and connect
    our handlers to Celery signals.
    __datadog_patchFNTworker_service_name)service_configzcelery.beatzScheduler.apply_entryapply_entryc                 S   s
   | d j S )Nr   )name)args r   W/home/ubuntu/.local/lib/python3.10/site-packages/ddtrace/contrib/internal/celery/app.py<lambda>0   s   
 zpatch_app.<locals>.<lambda>zScheduler.ticktickzcelery.app.taskzTask.apply_asyncapply_asynczcelery.platformsclose_open_fds)weak)getattrr   r   r   celeryontor   wrap_traced_beat_functionbeat	Scheduler_traced_apply_async_function_patched_close_open_fdsr   task_prerunconnectr   task_postrunr   before_task_publishr
   after_task_publishr	   task_failurer   
task_retryr   apppinr   r   r   	patch_app   s0   
r7   c                 C   s   t | ddsdS d| _t| }|durt| t ttj	j
d ttj	j
d ttjjjd ttjd tjt tjt tjt tjt tjt tjt dS )zoRemove the Pin instance from the application and disconnect
    our handlers from Celery signal framework.
    r   FNr   r    r!   r"   )r$   r   r   get_fromdelattrr   r   unwrapr%   r)   r*   r5   taskTask	platformsr   r-   
disconnectr   r/   r   r0   r
   r1   r	   r2   r   r3   r   r4   r   r   r   unpatch_appH   s    

r?   c                    s    fdd}|S )Nc                    s   t |}|r| s| |i |S tjd tjt	|d#}r*||_
|ttj |td | |i |W  d    S 1 sHw   Y  d S )Nzcelery.beat.{})	span_typer      )r   r8   enabledr   traceformatr   WORKERr   ext_serviceresource_set_tag_strr   r   PRODUCER
set_metricr   )funcinstancer   kwargsr6   spanfn_nameintegration_configresource_fnr   r   _traced_beat_innerb   s   


$z1_traced_beat_function.<locals>._traced_beat_innerr   )rQ   rP   rR   rS   r   rO   r   r(   a   s   r(   c                 C   s   dd }|S )ao  
    When apply_async is called, it calls various Celery signals in order, which gets used
    to start and close the span.
    Example: before_task_publish starts the span while after_task_publish closes the span.
    If an exception occurs anywhere inside Celery or its dependencies, this can interrupt the
    closing signals.
    The purpose of _traced_apply_async_function is to close the spans even if one of the closing
    signals don't get called over the course of the apply_task lifecycle.
    This is done by fetching the stored span and closing it if it hasn't already been closed by a
    closing signal.
    c                 S   s   t dM z8z!| |i |W W t d}|r!td| |  W  d    S  ty>   t d}|r=|jt	    w t d}|rQtd| |  w w 1 sUw   Y  d S )Ntask_context	task_spanzJThe after_task_publish signal was not called, so manually closing span: %r)
r   context_with_data	find_itemlogdebugfinish	Exceptionset_exc_infosysexc_info)rK   rL   r   rM   rU   r   r   r   _traced_apply_async_inner   s4   



	
z?_traced_apply_async_function.<locals>._traced_apply_async_innerr   )rQ   rP   rR   r_   r   r   r   r+   w   s   r+   c              	   C   sH   t d t  z| |i |}W t  t d |S t  t d w )z
    Celery closes all open file descriptors to isolate some fork child.
    This causes the native runtime to panic because it expects to have a valid fd.
    We call fork hook to avoid panics when the native runtime interacts with closed fds.
    z/Shutting down native runtime before closing fdsz+Restarting native runtime after closing fds)rX   rY   r   r   )rK   rL   r   rM   resultr   r   r   r,      s   

r,   )N)'r]   r%   r   ddtracer   ddtrace._trace.pinr   r   ddtrace.constantsr   r   ddtrace.contribr   'ddtrace.contrib.internal.celery.signalsr	   r
   r   r   r   r   ddtrace.extr   r   ddtrace.internalr   ddtrace.internal.forksafer   r   ddtrace.internal.loggerr   ddtrace.tracer   __name__rX   r7   r?   r(   r+   r,   r   r   r   r   <module>   s8    
*

%