o
    Á¿i&  ã                   @   s
  U d dl Z d dlmZ d dlmZ d dlZd dlZd dlZd dlm	Z	 d dlm
Z
 d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ ddlmZ ddlmZ ddlm Z  ddlm!Z! ddlm"Z" ddlm#Z# ddlm$Z$ ddlm%Z% ddlm&Z& ddl'm(Z( ee)ƒZ*e			d6de+de+de+de	e+ d e	e
eef  d!e,fd"d#„ƒZ-G d$d%„ d%ƒZ.da/e	e. e0d&< d'e.fd(d)„Z1d*ed'dfd+d,„Z2d7d-e+d.e	e d'dfd/d0„Z3d1ed'dfd2d3„Z4d1ed'dfd4d5„Z5dS )8é    N)Úcontextmanager)Úchain)ÚOptional)ÚUnion)ÚJobInfo)Úconfig)Útracer)ÚContext)ÚSpan)Ú	ERROR_MSG)Ú	SPAN_KIND)ÚSpanKind)Ú
get_logger)ÚLocké   )ÚDD_PARTIAL_VERSION)ÚDD_WAS_LONG_RUNNING)ÚRAY_COMPONENT)ÚRAY_JOB_MESSAGE)ÚRAY_JOB_STATUS)ÚRAY_STATUS_FAILED)ÚRAY_STATUS_FINISHED)ÚRAY_STATUS_RUNNING)ÚRAY_SUBMISSION_ID_TAG)Ú!_inject_ray_span_tags_and_metricsTÚ	span_nameÚserviceÚ	span_typeÚresourceÚchild_ofÚactivatec              	   c   sž    t j| |||||d9}| ttj¡ t|ƒ t|ƒ zz|V  W n ty1   |j	t
 ¡ Ž  ‚ w W t|ƒ nt|ƒ w W d  ƒ dS 1 sHw   Y  dS )zNContext manager that handles Ray span creation and long-running span lifecycle)Únamer   r   r   r   r    N)r   Ú
start_spanÚ_set_tag_strr   r   ÚCONSUMERr   Ústart_long_running_spanÚBaseExceptionÚset_exc_infoÚsysÚexc_infoÚstop_long_running_span)r   r   r   r   r   r    Úspan© r,   ú]/home/ubuntu/.local/lib/python3.10/site-packages/ddtrace/contrib/internal/ray/span_manager.pyÚlong_running_ray_span%   s"   €
ÿ
þÿ"ór.   c                   @   sÜ   e Zd Zddd„Zdedee fdd„Zddd	„Zdeddfd
d„Z	dede
ddfdd„Zdedefdd„Zdeddfdd„Zd dedee ddfdd„Zdeddfdd„Zdeddfdd„Zdedee ddfdd„ZdS )!ÚRaySpanManagerÚreturnNc                 C   sV   i | _ i | _i | _tƒ | _d| _z	t | j¡ W d S  t	y*   t
jddd Y d S w )NFz(SpanManager initializing during shutdownT)r)   )Ú_timersÚ
_job_spansÚ_root_spansr   Ú_lockÚ_is_shutting_downÚatexitÚregisterÚcleanup_on_exitr&   ÚlogÚdebug)Úselfr,   r,   r-   Ú__init__@   s   þzRaySpanManager.__init__r+   c                 C   s
   |  t¡S ©N)Úget_tagr   )r;   r+   r,   r,   r-   Ú_get_submission_idP   s   
z!RaySpanManager._get_submission_idc                 C   s¨   | j = d| _t| j ¡ ƒD ]\}}| ¡  qg }| j ¡ D ]}| t| ¡ ƒ¡ q| j 	¡  | j 	¡  | j
 	¡  W d  ƒ n1 sCw   Y  |D ]}|  |¡ qJdS )z.Clean up all resources when the process exits.TN)r4   r5   Úlistr1   ÚitemsÚcancelr2   ÚvaluesÚextendÚclearr3   Ú_finish_span)r;   Ú_ÚtimerÚspans_to_closeÚ
spans_dictr+   r,   r,   r-   r8   S   s   


óÿzRaySpanManager.cleanup_on_exitc                 C   sr  t  ¡ }| t¡d u r| t|¡ | tt¡ |  |¡}| tt¡ | t|¡ | 	¡  t
j}g }|jB |j|jv ro|j|j }g }g }|jD ]}|jrY|j|jkrY| |¡ qH| |¡ qH||jd d …< | jt|ƒ8  _W d   ƒ n1 syw   Y  |g| }	z"|	}
t|j|j|j|j|jgƒD ]}| |
¡pœg }
|
s£ W d S q”W n ty°   |	}
Y nw |j |
¡ d S r=   )ÚtimeÚtime_nsÚ
get_metricr   Ú
set_metricr#   r   r   Ú_recreate_job_spanÚfinishr   Ú_span_aggregatorr4   Útrace_idÚ_tracesÚspansÚfinishedÚspan_idÚappendÚnum_finishedÚlenr   Údd_processorsÚuser_processorsÚsampling_processorÚtags_processorÚservice_name_processorÚprocess_traceÚ	ExceptionÚwriterÚwrite)r;   r+   Úpartial_versionÚpartial_spanÚ
aggregatorÚfinished_spansÚtraceÚremaining_spansÚsÚspans_to_writerT   Útpr,   r,   r-   Ú_emit_partial_spang   sP   

€ô
ýÿúÿz!RaySpanManager._emit_partial_spanÚsubmission_idrK   c                 C   s`   | j rdS ztj|| j|gd}d|_|| j|< | ¡  W dS  ty/   | j |d¡ Y dS w )z+This function should be called under a lockN)ÚargsT)	r5   Ú	threadingÚTimerÚ_resubmit_long_running_spansÚdaemonr1   Ústartr`   Úpop)r;   rm   rK   rH   r,   r,   r-   Ú_create_resubmit_timer’   s   
ÿz%RaySpanManager._create_resubmit_timerÚjob_spanc              
   C   sX   t |j|j|j|j|j|j|j|jd}| 	dt
¡ |j|_|j ¡ |_|j ¡ |_|S )N)r!   r   r   r   rR   rV   Ú	parent_idÚcontextÚ	component)r
   r!   r   r   r   rR   rV   rw   rx   r#   r   Ústart_nsÚ_metaÚcopyÚ_metrics)r;   rv   Únew_spanr,   r,   r-   rO   Ÿ   s   ø
z!RaySpanManager._recreate_job_spanc                 C   sˆ   | j rd S | j( || jvr	 W d   ƒ d S |  |ttjƒ¡ t| j|  ¡ ƒ}W d   ƒ n1 s3w   Y  |D ]}|  	|¡ q:d S r=   )
r5   r4   r2   ru   Úfloatr   Ú_long_running_flush_intervalr@   rC   rl   )r;   rm   Ú	job_spansr+   r,   r,   r-   rq   ±   s   
þüÿz+RaySpanManager._resubmit_long_running_spansÚjob_infoc                 C   s|   |  t¡d ur|jt= | td¡ | tt¡ |r8| t|j¡ | 	t
|j¡ t|jƒtkr8d|_| 	t|j¡ | ¡  d S )Nr   )rM   r   r}   rN   r   r#   r   r   ÚstatusÚset_tagr   ÚmessageÚstrr   Úerrorr   rP   )r;   r+   r‚   r,   r,   r-   rF   ¾   s   zRaySpanManager._finish_spanc                 C   sr   |   |¡}| j' || jvri | j|< |  |ttjƒ¡ || j| |j|jf< W d   ƒ d S 1 s2w   Y  d S r=   )	r?   r4   r2   ru   r   r   Ú$_long_running_initial_flush_intervalrR   rV   )r;   r+   rm   r,   r,   r-   Úadd_spanÏ   s   


"ûzRaySpanManager.add_spanÚspan_to_stopc                 C   s¸   |   |¡ |  |¡}|j|jf}| j? | j |¡}|s%	 W d   ƒ d S | |d ¡ |r6	 W d   ƒ d S | j |d ¡}|rC| 	¡  | j |d ¡ W d   ƒ d S 1 sUw   Y  d S r=   )
rF   r?   rR   rV   r4   r2   Úgetrt   r1   rB   )r;   rŠ   rm   Úspan_keyr   rH   r,   r,   r-   r*   Ù   s"   

ýù
"óz%RaySpanManager.stop_long_running_spanc                 C   sl   | j " | j| }| j |d ¡}|r| ¡  | j|= | j|= W d   ƒ n1 s(w   Y  | j||d d S )N)r‚   )r4   r3   r1   rt   rB   r2   rF   )r;   rm   r‚   rv   rH   r,   r,   r-   Ústop_long_running_jobî   s   

ø
z$RaySpanManager.stop_long_running_job)r0   Nr=   )Ú__name__Ú
__module__Ú__qualname__r<   r
   r   r†   r?   r8   rl   r   ru   rO   rq   r   rF   r‰   r*   r   r,   r,   r,   r-   r/   ?   s    

+
r/   Ú_ray_span_managerr0   c                   C   s   t d u rtƒ a t S r=   )r‘   r/   r,   r,   r,   r-   Úget_span_managerÿ   s   r’   rv   c                 C   sL   t ƒ }| | ¡}|j | |j|< W d   ƒ n1 sw   Y  t| ƒ d S r=   )r’   r?   r4   r3   r%   )rv   Úmanagerrm   r,   r,   r-   Ústart_long_running_job  s   
ÿr”   rm   r‚   c                 C   s   t ƒ  | |¡ d S r=   )r’   r   )rm   r‚   r,   r,   r-   r     s   r   r+   c                 C   ó   t ƒ  | ¡ d S r=   )r’   r‰   ©r+   r,   r,   r-   r%     ó   r%   c                 C   r•   r=   )r’   r*   r–   r,   r,   r-   r*     r—   r*   )NNTr=   )6r6   Ú
contextlibr   Ú	itertoolsr   r(   ro   rK   Útypingr   r   Ú ray.dashboard.modules.job.commonr   Úddtracer   r   Úddtrace._trace.contextr	   Úddtrace._trace.spanr
   Úddtrace.constantsr   r   Úddtrace.extr   Úddtrace.internal.loggerr   Úddtrace.internal.threadsr   Ú	constantsr   r   r   r   r   r   r   r   r   Úutilsr   rŽ   r9   r†   Úboolr.   r/   r‘   Ú__annotations__r’   r”   r   r%   r*   r,   r,   r,   r-   Ú<module>   sl   
 úÿþýüûú >
