o
    `۷i6O                     @   sh  d dl Z d dlZd dlZd dlZd dlmZ d dlmZ d dlmZ d dl	m
Z
 d dlmZmZmZmZmZmZmZmZmZmZ d dlZd dlZd dlmZmZmZ d dlmZ eeZ G d	d
 d
Z!da"da#de$fddZ%dd Z&dee fddZ'dedefddZ(G dd de)Z*dee
e+f de
fddZ,G dd dZ-edd ded! fd"d#Z.d$e+fd%d&Z/d'ed(ef de+fd)d*Z0d'ed(ef de+fd+d,Z1d-ee+ed(ef f d.ee+ed(ef f fd/d0Z2d-ee+ed(ef f d.ee+ed(ef f de+fd1d2Z3d-ee+ed(ef f d.ee+ed(ef f de+fd3d4Z4d5d6 Z5d7d8 Z6d9d: Z7d;d< Z8d=d> Z9dS )?    N)contextmanager)wraps)	Parameter)
ModuleType)
AnyCallableDict	GeneratorListMutableMappingOptionalSequenceUnioncast)is_class_methodis_function_or_methodis_static_method)get_runtime_contextc                   @   sP   e Zd ZdZh dZdd Zdd Zdd Zd	d
 Zdd Z	dd Z
dd ZdS )_OpenTelemetryProxya  
    This proxy makes it possible for tracing to be disabled when opentelemetry
    is not installed on the cluster, but is installed locally.

    The check for `opentelemetry`'s existence must happen where the functions
    are executed because `opentelemetry` may be present where the functions
    are pickled. This can happen when `ray[full]` is installed locally by `ray`
    (no extra dependencies) is installed on the cluster.
    >   traceContextcontext	propagatec                 C   s*   |t jv rt| d|  S td| )N_zAttribute does not exist: )r   allowed_functionsgetattrAttributeError)selfname r   U/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/util/tracing/tracing_helper.py__getattr__/   s   
z_OpenTelemetryProxy.__getattr__c                 C   
   |  dS )Nzopentelemetry.trace_try_importr   r   r   r    _trace5      
z_OpenTelemetryProxy._tracec                 C   r"   )Nzopentelemetry.contextr#   r%   r   r   r    _context8   r'   z_OpenTelemetryProxy._contextc                 C   r"   )Nzopentelemetry.propagater#   r%   r   r   r    
_propagate;   r'   z_OpenTelemetryProxy._propagatec                 C   s   |   }|r
|jjS d S N)r(   r   r   )r   r   r   r   r    _Context>   s   z_OpenTelemetryProxy._Contextc                 C   s$   |    |   |   |   d S r*   )r&   r(   r)   r+   r%   r   r   r    try_allE   s   z_OpenTelemetryProxy.try_allc                 C   s0   zt |W S  ty   t rtdY d S w )Na  Install OpenTelemetry with 'pip install opentelemetry-api==1.34.1 opentelemetry-sdk==1.34.1 opentelemetry-exporter-otlp==1.34.1' to enable tracing. See the Ray documentation for details: https://docs.ray.io/en/latest/ray-observability/user-guides/ray-tracing.html#installation)	importlibimport_moduleImportError_is_tracing_enabled)r   moduler   r   r    r$   K   s   z_OpenTelemetryProxy._try_importN)__name__
__module____qualname____doc__r   r!   r&   r(   r)   r+   r,   r$   r   r   r   r    r   "   s    
r   Freturnc                   C   s   t S )zgChecks environment variable feature flag to see if tracing is turned on.
    Tracing is off by default.)_global_is_tracing_enabledr   r   r   r    r0   \   s   r0   c                   C   s   da t at  d S )NT)r7   r   _opentelemetryr,   r   r   r   r    _enable_tracingb   s   r9   params_listc                 C   s8   t | D ]\}}|jtjkr| | |  | S q| S )z]Given a list of Parameters, if a kwargs Parameter exists,
    move it to the end of the list.)	enumeratekindr   VAR_KEYWORDappendpop)r:   iparamr   r   r    _sort_params_listi   s   rB   function	new_paramc                    sP   t | }t|j }t fdd|D r|S t| g }|j|d}|S )z/Add additional Parameter to function signature.c                 3   s    | ]	}|j  j kV  qd S r*   )r   ).0rA   rD   r   r    	<genexpr>x   s    z*_add_param_to_signature.<locals>.<genexpr>)
parameters)inspect	signaturelistrH   valuesanyrB   replace)rC   rD   old_sigold_sig_list_repr
new_paramsnew_sigr   rF   r    _add_param_to_signatures   s   
rS   c                   @   s   e Zd ZdS )_ImportFromStringErrorN)r2   r3   r4   r   r   r   r    rT      s    rT   
import_strc           	   
   C   s   t | ts| S | d\}}}|r|sd}t|j| dzt|}W n tyB } z|j|kr4|dd}t|j|dd}~ww |}z|	dD ]}t
||}qKW |S  tyg   d}t|j||d	w )
zRGiven a string that is in format "<module>:<attribute>",
    import the attribute.:zEImport string "{import_str}" must be in format"<module>:<attribute>".)rU   Nz'Could not import module "{module_str}".)
module_str.z;Attribute "{attrs_str}" not found in module "{module_str}".)	attrs_strrW   )
isinstancestr	partitionrT   formatr-   r.   r/   r   splitr   r   )	rU   rW   r   rY   messager1   excinstanceattr_strr   r   r    _import_from_string   s8   

rc   c                   @   s<   e Zd Zdeeef fddZdeeef ddfddZdS )	_DictPropagatorr6   c                  C   s   i } t j|  | S )z*Inject trace context into otel propagator.)r8   r   injectcontext_dictr   r   r    inject_current_context   s   z&_DictPropagator.inject_current_contextrg   _opentelemetry.Contextc                 C   s   t tjtj| S )z,Given a trace context, extract as a Context.)r   r8   r   r   extractrf   r   r   r    rj      s   z_DictPropagator.extractN)r2   r3   r4   r   r   rh   rj   r   r   r   r    rd      s    rd   parent_contextri   )NNNc              	   c   sL    | dur| }nt  }t j|}zdV  W t j| dS t j| w )z(Uses the Ray trace context for the span.N)r8   r   r   attachdetach)rk   new_contexttokenr   r   r    _use_context   s   rp   function_namec                 C   sv   t  }d| tt | | d}tjjj	j
tjjjkr(| }|r(||d< ttjjj	dd}|r9| |d< |S )zXGet the Attributes of the function that will be reported as attributes
    in the trace.rC   )
ray.remoteray.functionray.pid
ray.job_idray.node_idzray.task_id	worker_idNray.worker_id)r   r[   osgetpid
get_job_idget_node_idray_privateworkerglobal_workermodeWORKER_MODEget_task_idr   hex)rq   runtime_context	span_argstask_idrw   r   r   r    _function_hydrate_span_args   s   
	r   func.c                 C   
   |  dS )z>Returns the function span name that has span kind of producer. ray.remoter   r   r   r   r    _function_span_producer_name      
r   c                 C   r   )z>Returns the function span name that has span kind of consumer. ray.remote_workerr   r   r   r   r    _function_span_consumer_name   r   r   class_methodc                 C   s   t | r| j} t |r|j}t }d| ||  d| tt | | d}tj	j
jjtj	j
jkr=| }|r=||d< ttj	j
jdd}|rN| |d< |S )zUGet the Attributes of the actor that will be reported as attributes
    in the trace.actorrX   )rr   zray.actor_classzray.actor_methodrs   rt   ru   rv   ray.actor_idrw   Nrx   )callabler2   r   r[   ry   rz   r{   r|   r}   r~   r   r   r   r   get_actor_idr   r   )r   r   r   r   actor_idrw   r   r   r    _actor_hydrate_span_args   s*   
r   c                 C   0   t | ts| j} t |ts|j}|  d| dS )z;Returns the actor span name that has span kind of producer.rX   r   rZ   r[   r2   r   r   r   r   r    _actor_span_producer_name  
   

r   c                 C   r   )z;Returns the actor span name that has span kind of consumer.rX   r   r   r   r   r   r    _actor_span_consumer_name  r   r   c                    s<   t  		d	dtdtttf dtdtdtf
 fdd}|S )
zfTrace the execution of a remote task. Inject
    the current span context into kwargs for propagation.Nargskwargs_args_kwargsr6   c                    s   t  r| jr|d urd|vsJ  | ||g|R i |S d|vs#J tjt}|jt| jtjj	j
t| jd t |d<  | ||g|R i |W  d    S 1 sXw   Y  d S )N_ray_trace_ctxr<   
attributes)r0   _is_cross_languager8   r   
get_tracerr2   start_as_current_spanr   _function_nameSpanKindPRODUCERr   rd   rh   )r   r   r   r   r   tracerr   r   r    _invocation_remote_span)  s   
$z9_tracing_task_invocation.<locals>._invocation_remote_spanNN)r   r   r   )r   r   r   r   r    _tracing_task_invocation%  s   
r   c              
      sb   t  s S t tjdtjjdd _t dddtdtt	t
tf  dtdtf fdd	}|S )
zWrap the function argument passed to RemoteFunction's __init__ so that
    future execution of that function will include tracing.
    Use the provided trace context from kwargs.
    r   Ndefaultr   r   r   r6   c              	      s   | d u r |i |S t jt} jd  j }tt| 6 |jt	|t jj
jt|d  |i |W  d    W  d    S 1 sIw   Y  W d    d S 1 sYw   Y  d S )NrX   r   )r8   r   r   r2   r3   rp   rd   rj   r   r   r   CONSUMERr   )r   r   r   r   rq   rC   r   r    _function_with_tracingU  s   Rz=_inject_tracing_into_function.<locals>._function_with_tracing)r0   rS   rI   r   KEYWORD_ONLY__signature__r   r   r   r   r[   )rC   r   r   r   r    _inject_tracing_into_functionF  s(   
r   c              
      s:   t  t dfdtdtttf dtdtf fdd}|S )z`Trace the creation of an actor. Inject
    the current span context into kwargs for propagation.Nr   r   r   r   c           
         s   |d u ri }t  sd|vsJ  | ||g|R i |S | jj}d}d|vs(J tjt}|jt||tjj	j
t||d&}t |d<  | ||g|R i |}	|d|	j  |	W  d    S 1 shw   Y  d S )Nr   __init__r   r<   r   r   )r0   __ray_metadata__
class_namer8   r   r   r2   r   r   r   r   r   rd   rh   set_attribute_ray_actor_idr   )
r   r   r   r   r   r   method_namer   spanresultr   r   r    #_invocation_actor_class_remote_spanr  s(   $zD_tracing_actor_creation.<locals>._invocation_actor_class_remote_span)r   tupler   r   )r   r   r   r   r    _tracing_actor_creationn  s   
!r   c                    s@   t  		d	dtt dtttf dtdtdtf
 fdd}|S )
z(Trace the invocation of an actor method.Nr   r   r   r   r6   c           	         s   t  r| jjr|d urd|vsJ  | ||g|R i |S | jjj}| j}d|vs,J tjt	}|j
t||tjjjt||d%}t |d< |d| jj   | ||g|R i |W  d    S 1 skw   Y  d S )Nr   r   r   )r0   _actor_ray_is_cross_language'_ray_actor_creation_function_descriptorr   _method_namer8   r   r   r2   r   r   r   r   r   rd   rh   r   r   r   )	r   r   r   r   r   r   r   r   r   r   r   r    _start_span  s$   	
$z5_tracing_actor_method_invocation.<locals>._start_spanr   )r   r   r   r   )r   r   r   r   r     _tracing_actor_method_invocation  s   
r   c                 C   s   dt dtf dtfdd}dt dtf dtfdd}t| t}|D ]W\}}t| |s/t|r0q"t|s:t|r;q"|dkr@q"t	|}t
|tjd	tjjd
d|_t|ddrZq"t|rht|||}nt|||}d|_t| || q"| S )zZGiven a class that will be made into an actor,
    inject tracing into all of the methods.r   .r6   c                    8   d ddt dt dtttt f  dt dt f
 fdd}|S )	Nr   r   r   r   r   r6   c             	      s   t  r|du r | g|R i |S tjt}tt|@ |jt	| j
j tjjjt| j
j d  | g|R i |W  d   W  d   S 1 sRw   Y  W d   dS 1 sbw   Y  dS zr
            Wrap the user's function with a function that
            will extract the trace context
            Nr   r0   r8   r   r   r2   rp   rd   rj   r   r   	__class__r   r   r   r   r   r   r   r   r   r   r    _resume_span  s    RzF_inject_tracing_into_class.<locals>.span_wrapper.<locals>._resume_spanr   r   r   r[   r   r   r   r   r    span_wrapper     z0_inject_tracing_into_class.<locals>.span_wrapperc                    r   )	Nr   r   r   r   r   r6   c             	      s   t  r|du r | g|R i |I dH S tjt}tt|E |jt	| j
j jtjjjt| j
j jd  | g|R i |I dH W  d   W  d   S 1 s[w   Y  W d   dS 1 skw   Y  dS r   r   r   r   r   r    r     s"   
	RzL_inject_tracing_into_class.<locals>.async_span_wrapper.<locals>._resume_spanr   r   r   r   r    async_span_wrapper  r   z6_inject_tracing_into_class.<locals>.async_span_wrapper__del__r   Nr   __ray_tracing_wrapped__FT)r   r   rI   
getmembersr   r   r   isgeneratorfunctionisasyncgenfunctionunwraprS   r   r   r   r   iscoroutinefunctionr   r   setattr)_clsr   r   methodsr   r   unwrapped_methodwrapped_methodr   r   r    _inject_tracing_into_class  s2     



r   ):r-   rI   loggingry   
contextlibr   	functoolsr   r   typesr   typingr   r   r   r	   r
   r   r   r   r   r   r}   ray._private.workerray._private.inspect_utilr   r   r   ray.runtime_contextr   	getLoggerr2   loggerr   r7   r8   boolr0   r9   rB   rS   	ExceptionrT   r[   rc   rd   rp   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r    <module>   sr    0
6
"
$

!()%