o
    ciGJ                     @   sh  d dl Z d dlZd dlZd dlZd dlmZ d dlmZ d dlmZ d dl	m
Z
 d dlmZmZmZmZmZmZmZmZmZmZ d dlZd dlZd dlmZmZmZ d dlmZ eeZ G d	d
 d
Z!da"da#de$fddZ%dd Z&dee fddZ'dedefddZ(G dd de)Z*dee
e+f de
fddZ,G dd dZ-edd ded! fd"d#Z.d$e+fd%d&Z/d'ed(ef de+fd)d*Z0d'ed(ef de+fd+d,Z1d-ee+ed(ef f d.ee+ed(ef f fd/d0Z2d-ee+ed(ef f d.ee+ed(ef f de+fd1d2Z3d-ee+ed(ef f d.ee+ed(ef f de+fd3d4Z4d5d6 Z5d7d8 Z6d9d: Z7d;d< Z8d=d> Z9dS )?    N)contextmanager)wraps)	Parameter)
ModuleType)
AnyCallableDict	GeneratorListMutableMappingOptionalSequenceUnioncast)is_class_methodis_function_or_methodis_static_method)get_runtime_contextc                   @   sP   e Zd ZdZh dZdd Zdd Zdd Zd	d
 Zdd Z	dd Z
dd ZdS )_OpenTelemetryProxya  
    This proxy makes it possible for tracing to be disabled when opentelemetry
    is not installed on the cluster, but is installed locally.

    The check for `opentelemetry`'s existence must happen where the functions
    are executed because `opentelemetry` may be present where the functions
    are pickled. This can happen when `ray[full]` is installed locally by `ray`
    (no extra dependencies) is installed on the cluster.
    >   traceContextcontext	propagatec                 C   s*   |t jv rt| d|  S td| )N_zAttribute does not exist: )r   allowed_functionsgetattrAttributeError)selfname r   S/home/ubuntu/.local/lib/python3.10/site-packages/ray/util/tracing/tracing_helper.py__getattr__/   s   
z_OpenTelemetryProxy.__getattr__c                 C   
   |  dS )Nzopentelemetry.trace_try_importr   r   r   r    _trace5      
z_OpenTelemetryProxy._tracec                 C   r"   )Nzopentelemetry.contextr#   r%   r   r   r    _context8   r'   z_OpenTelemetryProxy._contextc                 C   r"   )Nzopentelemetry.propagater#   r%   r   r   r    
_propagate;   r'   z_OpenTelemetryProxy._propagatec                 C   s   |   }|r
|jjS d S N)r(   r   r   )r   r   r   r   r    _Context>   s   z_OpenTelemetryProxy._Contextc                 C   s$   |    |   |   |   d S r*   )r&   r(   r)   r+   r%   r   r   r    try_allE   s   z_OpenTelemetryProxy.try_allc                 C   s>   zt |W S  ty   tdd dv rtdY d S w )NRAY_TRACING_ENABLEDFalse)true1zInstall opentelemetry with 'pip install opentelemetry-api==1.0.0rc1' and 'pip install opentelemetry-sdk==1.0.0rc1' to enable tracing. See more at docs.ray.io/tracing.html)	importlibimport_moduleImportErrorosgetenvlower)r   moduler   r   r    r$   K   s   z_OpenTelemetryProxy._try_importN)__name__
__module____qualname____doc__r   r!   r&   r(   r)   r+   r,   r$   r   r   r   r    r   "   s    
r   Freturnc                   C   s   t S )zgChecks environment variable feature flag to see if tracing is turned on.
    Tracing is off by default.)_global_is_tracing_enabledr   r   r   r    _is_tracing_enabled\   s   r>   c                   C   s   da t at  d S )NT)r=   r   _opentelemetryr,   r   r   r   r    _enable_tracingb   s   r@   params_listc                 C   s8   t | D ]\}}|jtjkr| | |  | S q| S )z]Given a list of Parameters, if a kwargs Parameter exists,
    move it to the end of the list.)	enumeratekindr   VAR_KEYWORDappendpop)rA   iparamr   r   r    _sort_params_listi   s   rI   function	new_paramc                    sP   t | }t|j }t fdd|D r|S t| g }|j|d}|S )z/Add additional Parameter to function signature.c                 3   s    | ]	}|j  j kV  qd S r*   )r   ).0rH   rK   r   r    	<genexpr>x   s    z*_add_param_to_signature.<locals>.<genexpr>)
parameters)inspect	signaturelistrO   valuesanyrI   replace)rJ   rK   old_sigold_sig_list_repr
new_paramsnew_sigr   rM   r    _add_param_to_signatures   s   
rZ   c                   @   s   e Zd ZdS )_ImportFromStringErrorN)r8   r9   r:   r   r   r   r    r[      s    r[   
import_strc           	   
   C   s   t | ts| S | d\}}}|r|sd}t|j| dzt|}W n tyB } z|j|kr4|dd}t|j|dd}~ww |}z|	dD ]}t
||}qKW |S  tyg   d}t|j||d	w )
zRGiven a string that is in format "<module>:<attribute>",
    import the attribute.:zEImport string "{import_str}" must be in format"<module>:<attribute>".)r\   Nz'Could not import module "{module_str}".)
module_str.z;Attribute "{attrs_str}" not found in module "{module_str}".)	attrs_strr^   )
isinstancestr	partitionr[   formatr1   r2   r3   r   splitr   r   )	r\   r^   r   r`   messager7   excinstanceattr_strr   r   r    _import_from_string   s8   

rj   c                   @   s<   e Zd Zdeeef fddZdeeef ddfddZdS )	_DictPropagatorr<   c                  C   s   i } t j|  | S )z*Inject trace context into otel propagator.)r?   r   injectcontext_dictr   r   r    inject_current_context   s   z&_DictPropagator.inject_current_contextrn   _opentelemetry.Contextc                 C   s   t tjtj| S )z,Given a trace context, extract as a Context.)r   r?   r   r   extractrm   r   r   r    rq      s   z_DictPropagator.extractN)r8   r9   r:   r   r   ro   rq   r   r   r   r    rk      s    rk   parent_contextrp   )NNNc              	   c   sL    | dur| }nt  }t j|}zdV  W t j| dS t j| w )z(Uses the Ray trace context for the span.N)r?   r   r   attachdetach)rr   new_contexttokenr   r   r    _use_context   s   rw   function_namec                 C   sv   t  }d| tt | | d}tjjj	j
tjjjkr(| }|r(||d< ttjjj	dd}|r9| |d< |S )zXGet the Attributes of the function that will be reported as attributes
    in the trace.rJ   )
ray.remoteray.functionray.pid
ray.job_idray.node_idzray.task_id	worker_idNray.worker_id)r   rb   r4   getpid
get_job_idget_node_idray_privateworkerglobal_workermodeWORKER_MODEget_task_idr   hex)rx   runtime_context	span_argstask_idr~   r   r   r    _function_hydrate_span_args   s   
	r   func.c                 C   
   |  dS )z>Returns the function span name that has span kind of producer. ray.remoter   r   r   r   r    _function_span_producer_name      
r   c                 C   r   )z>Returns the function span name that has span kind of consumer. ray.remote_workerr   r   r   r   r    _function_span_consumer_name   r   r   class_methodc                 C   s   t | r| j} t |r|j}t }d| ||  d| tt | | d}tj	j
jjtj	j
jkr=| }|r=||d< ttj	j
jdd}|rN| |d< |S )zUGet the Attributes of the actor that will be reported as attributes
    in the trace.actorr_   )ry   zray.actor_classzray.actor_methodrz   r{   r|   r}   ray.actor_idr~   Nr   )callabler8   r   rb   r4   r   r   r   r   r   r   r   r   r   get_actor_idr   r   )r   r   r   r   actor_idr~   r   r   r    _actor_hydrate_span_args   s*   
r   c                 C   0   t | ts| j} t |ts|j}|  d| dS )z;Returns the actor span name that has span kind of producer.r_   r   ra   rb   r8   r   r   r   r   r    _actor_span_producer_name  
   

r   c                 C   r   )z;Returns the actor span name that has span kind of consumer.r_   r   r   r   r   r   r    _actor_span_consumer_name  r   r   c                    s<   t  		d	dtdtttf dtdtdtf
 fdd}|S )
zfTrace the execution of a remote task. Inject
    the current span context into kwargs for propagation.Nargskwargs_args_kwargsr<   c                    s   t  r| jr|d urd|vsJ  | ||g|R i |S d|vs#J tjt}|jt| jtjj	j
t| jd t |d<  | ||g|R i |W  d    S 1 sXw   Y  d S )N_ray_trace_ctxrC   
attributes)r>   _is_cross_languager?   r   
get_tracerr8   start_as_current_spanr   _function_nameSpanKindPRODUCERr   rk   ro   )r   r   r   r   r   tracerr   r   r    _invocation_remote_span)  s   
$z9_tracing_task_invocation.<locals>._invocation_remote_spanNN)r   r   r   )r   r   r   r   r    _tracing_task_invocation%  s   
r   c              
      sb   t  s S t tjdtjjdd _t dddtdtt	t
tf  dtdtf fdd	}|S )
zWrap the function argument passed to RemoteFunction's __init__ so that
    future execution of that function will include tracing.
    Use the provided trace context from kwargs.
    r   Ndefaultr   r   r   r<   c              	      s   | d u r |i |S t jt} jd  j }tt| 6 |jt	|t jj
jt|d  |i |W  d    W  d    S 1 sIw   Y  W d    d S 1 sYw   Y  d S )Nr_   r   )r?   r   r   r8   r9   rw   rk   rq   r   r   r   CONSUMERr   )r   r   r   r   rx   rJ   r   r    _function_with_tracingU  s   Rz=_inject_tracing_into_function.<locals>._function_with_tracing)r>   rZ   rP   r   KEYWORD_ONLY__signature__r   r   r   r   rb   )rJ   r   r   r   r    _inject_tracing_into_functionF  s(   
r   c              
      s:   t  t dfdtdtttf dtdtf fdd}|S )z`Trace the creation of an actor. Inject
    the current span context into kwargs for propagation.Nr   r   r   r   c           
         s   |d u ri }t  sd|vsJ  | ||g|R i |S | jj}d}d|vs(J tjt}|jt||tjj	j
t||d&}t |d<  | ||g|R i |}	|d|	j  |	W  d    S 1 shw   Y  d S )Nr   __init__r   rC   r   r   )r>   __ray_metadata__
class_namer?   r   r   r8   r   r   r   r   r   rk   ro   set_attribute_ray_actor_idr   )
r   r   r   r   r   r   method_namer   spanresultr   r   r    #_invocation_actor_class_remote_spanr  s(   $zD_tracing_actor_creation.<locals>._invocation_actor_class_remote_span)r   tupler   r   )r   r   r   r   r    _tracing_actor_creationn  s   
!r   c                    s@   t  		d	dtt dtttf dtdtdtf
 fdd}|S )
z(Trace the invocation of an actor method.Nr   r   r   r   r<   c           	         s   t  r| jjr|d urd|vsJ  | ||g|R i |S | jjj}| j}d|vs,J tjt	}|j
t||tjjjt||d%}t |d< |d| jj   | ||g|R i |W  d    S 1 skw   Y  d S )Nr   r   r   )r>   _actor_ray_is_cross_language'_ray_actor_creation_function_descriptorr   _method_namer?   r   r   r8   r   r   r   r   r   rk   ro   r   r   r   )	r   r   r   r   r   r   r   r   r   r   r   r    _start_span  s$   	
$z5_tracing_actor_method_invocation.<locals>._start_spanr   )r   r   r   r   )r   r   r   r   r     _tracing_actor_method_invocation  s   
r   c                 C   s   dt dtf dtfdd}dt dtf dtfdd}t| t}|D ]H\}}t| |s/t|r0q"t|s:t|r;q"|dkr@q"t	|tj
d	tj
jd
d|_t|r\t|||}nt|||}t| || q"| S )zZGiven a class that will be made into an actor,
    inject tracing into all of the methods.r   .r<   c                    8   d ddt dt dtttt f  dt dt f
 fdd}|S )	Nr   r   r   r   r   r<   c             	      s   t  r|du r | g|R i |S tjt}tt|@ |jt	| j
j tjjjt| j
j d  | g|R i |W  d   W  d   S 1 sRw   Y  W d   dS 1 sbw   Y  dS zr
            Wrap the user's function with a function that
            will extract the trace context
            Nr   r>   r?   r   r   r8   rw   rk   rq   r   r   	__class__r   r   r   r   r   r   r   r   r   r   r    _resume_span  s    RzF_inject_tracing_into_class.<locals>.span_wrapper.<locals>._resume_spanr   r   r   rb   r   r   r   r   r    span_wrapper     z0_inject_tracing_into_class.<locals>.span_wrapperc                    r   )	Nr   r   r   r   r   r<   c             	      s   t  r|du r | g|R i |I dH S tjt}tt|E |jt	| j
j jtjjjt| j
j jd  | g|R i |I dH W  d   W  d   S 1 s[w   Y  W d   dS 1 skw   Y  dS r   r   r   r   r   r    r     s"   
	RzL_inject_tracing_into_class.<locals>.async_span_wrapper.<locals>._resume_spanr   r   r   r   r    async_span_wrapper  r   z6_inject_tracing_into_class.<locals>.async_span_wrapper__del__r   Nr   )r   r   rP   
getmembersr   r   r   isgeneratorfunctionisasyncgenfunctionrZ   r   r   r   iscoroutinefunctionr   setattr)_clsr   r   methodsr   r   wrapped_methodr   r   r    _inject_tracing_into_class  s*     


r   ):r1   rP   loggingr4   
contextlibr   	functoolsr   r   typesr   typingr   r   r   r	   r
   r   r   r   r   r   r   ray._private.workerray._private.inspect_utilr   r   r   ray.runtime_contextr   	getLoggerr8   loggerr   r=   r?   boolr>   r@   rI   rZ   	Exceptionr[   rb   rj   rk   rw   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r    <module>   sr    0
6
"
$

!()%