o
    i>                     @   s  d dl Z d dlZd dlmZ d dlmZ d dlmZ d dlmZ d dl	m
Z
 d dlmZ d dlmZ d d	lmZ d d
lmZ dd Zzd dlmZ W n eyU   dZY nw zd dlmZ W n eyi   dZY nw e
e ZddiZdedefddZdeeef fddZe di  dd Z!de"dede"e#ef fddZ$dd Z%d d! Z&d"d# Z'd$d% Z(d&d' Z)d(d) Z*d*d+ Z+d,d- Z,d.d/ Z-d0d1 Z.d2d3 Z/dS )4    N)config)unwrap)wrap)get_argument_value)parse_version)LANGGRAPH_ASTREAM_OUTPUT)LANGGRAPH_SPAN_TRACES_ASTREAM)LangGraphIntegration)tracerc                  C   s   ddl m}  t| ddS )Nr   version__version__ )	langgraphr   getattrr    r   \/home/ubuntu/.local/lib/python3.10/site-packages/ddtrace/contrib/internal/langgraph/patch.pyget_version   s   r   Pregel)ParentCommandzlanggraph._internal._runnablezlanggraph.utils.runnablemodule_namereturnc                 C   s   t | | S )zhNormalize the module name to the original module name used for langgraph<0.6.0 to avoid breaking changes)LANGGRAPH_MODULE_MAPget)r   r   r   r   _get_module_name'   s   r   c                   C   s   ddiS )Nr   *r   r   r   r   r   _supported_versions,   s   r   r   c                 C   s,   t | dg }|r|d nd}t |dd|fS )zKGets the name of the first step in a RunnableSeq instance as the node name.stepsr   Nname)r   )instancer   
first_stepr   r   r   _get_node_name3   s   r"   argskwargsc                 C   sh   t | \}}|dv rd|fS trt|ts|dkr0t||ddddp#i }d|di d	< d|fS d|fS )
a=  
    Determines if a node should be traced. If the first step is a writing or routing step, or
    the node represents a subgraph, we should not trace it. If the node is a subgraph, mark it
    as such in the config metadata for use in `traced_pregel_loop_tick`.

    Returns a tuple of (should_trace, node_name)
    )_write_routeF	LangGraph   r   T)optionalmetadata_dd.subgraph)r"   LangGraphPregel
isinstancer   r   )r    r#   r$   	node_namer!   r   r   r   r   _should_trace_node:   s   r/   c           
      C   s   t j}t|||\}}|s| |i |S |jdt|j|jj|f dd}d}z8z	| |i |}W n tyN }	 zt	du sBt
|	t	sI|jt    d}	~	ww W |j||||dd |  |S |j||||dd |  w )a  
    Traces an invocation of a RunnableSeq, which represents a node in a graph.
    It represents the sequence containing node invocation (function, graph, callable), the channel write,
    and then any routing logic.

    We utilize `instance.steps` to grab the first step as the node.

    One caveat is that if the node represents a subgraph (LangGraph), we should skip tracing at this step, as
    we will trace the graph invocation separately with `traced_pregel_stream`.
    %s.%s.%sTsubmit_to_llmobsNnoder#   r$   response	operationr   _datadog_integrationr/   tracer   
__module__	__class____name__	ExceptionLangGraphParentCommandErrorr-   set_exc_infosysexc_infollmobs_set_tagsfinish
funcr    r#   r$   integrationshould_tracer.   spanresulter   r   r   traced_runnable_seq_invokeL   s.   
rK   c           
         s   t j}t|||\}}|s| |i |I dH S |jdt|j|jj|f dd}d}z;z| |i |I dH }W n tyU }	 zt	du sIt
|	t	sP|jt    d}	~	ww W |j||||dd |  |S |j||||dd |  w )z,Async version of traced_runnable_seq_invoke.Nr0   Tr1   r3   r4   r7   rD   r   r   r   traced_runnable_seq_ainvoken   s0   
rL   c              
      s   t jt| \}}|s|  i S jdt|j|jj|f ddt	d dz	|  i W n t
yS   jt   j ddd    w  fdd}| S )	z
    This function returns a generator wrapper that yields the results of RunnableSeq.astream(),
    ending the span after the stream is consumed, otherwise following the logic of traced_runnable_seq_ainvoke().
    r0   Tr1   Nr3   r4   c               
     s   d } d }d}	 z*  I d H } |r,z|d u r| n||  }W n ty+   | }d}Y nw | }| V  W nB tyJ   j |dd   Y d S  tyt } ztd u sZt|tsajt	
   j d dd    d }~ww q)NTFr3   r4   )	__anext__	TypeErrorStopAsyncIterationrB   rC   r=   r>   r-   r?   r@   rA   )itemr5   add_supportedrJ   r#   rF   r$   rI   rH   r   r   _astream   s:   
z-traced_runnable_seq_astream.<locals>._astream)r   r8   r/   r9   r   r:   r;   r<   _set_ctx_itemr   r=   r?   r@   rA   rB   rC   )rE   r    r#   r$   rG   r.   rS   r   rR   r   traced_runnable_seq_astream   s(   rU   c                    sT   t j}| |i |I dH }|jr(t }|s|S |tpd}|r(|t| |S )a  
    Modifies the span tracing RunnableSeq.astream() to internally include its final output, as that iterator
    does not yield the final output in versions >=0.3.29. Instead, the final output is aggregated
    and returned as a single value by _consume_aiter().
    NF)	r   r8   llmobs_enabledr
   current_span_get_ctx_itemr   rT   r   )rE   r    r#   r$   rF   outputrH   from_astreamr   r   r   !traced_runnable_seq_consume_aiter   s   r[   c              
         t jt|ddjdt|j|jjf d|dz	|  i W n# tyG   j	t
   j i diddd    w  fd	d
}| S )a  
    Trace the streaming of a Pregel (CompiledGraph) instance.
    This operation represents the parent execution of an individual graph.
    This graph could be standalone, or embedded as a subgraph in a node of a larger graph.
    Under the hood, this graph will `tick` through until all computed tasks are completed.

    Calling `invoke` on a graph calls `stream` under the hood.
    r   r'   r0   Tr2   r    Ngraphr4   c               
   3   s    d } 	 z	t } | V  W nY ty6   t| tr| d n| }j i di|dd   Y d S  tyf } z%td u sFt|tsMjt	
   j i did dd    d }~ww qNTr   r^   r4   )nextStopIterationr-   tuplerB   rC   r=   r>   r?   r@   rA   rP   r5   rJ   r#   rF   r$   r   rI   rH   r   r   _stream   s@   
z%traced_pregel_stream.<locals>._streamr   r8   r   r9   r   r:   r;   r<   r=   r?   r@   rA   rB   rC   )rE   r    r#   r$   rf   r   re   r   traced_pregel_stream   s"   	 rh   c              
      r\   )z&Async version of traced_pregel_stream.r   r'   r0   Tr]   Nr^   r4   c               
     s   d } 	 z  I d H } | V  W nY ty9   t| tr | d n| }j i di|dd   Y d S  tyi } z%td u sIt|tsPjt	
   j i did dd    d }~ww qr_   )rM   rO   r-   rc   rB   rC   r=   r>   r?   r@   rA   rd   re   r   r   rS   $  s@   
z'traced_pregel_astream.<locals>._astreamrg   )rE   r    r#   r$   rS   r   re   r   traced_pregel_astream  s"    ri   c                 C   s&   t j}| |i |}|||| |S )N)r   r8   llmobs_handle_agent_manifest)rE   r    r#   r$   rF   agentr   r   r   patched_create_react_agentE  s   rl   c           	      C   sp   t j}|js| |i |S t|di }| |i |}t|di }t|di di dd}||||| |S )zPNo tracing is done, and processing only happens if LLM Observability is enabled.tasksr   r*   r+   F)r   r8   rV   r   r   llmobs_handle_pregel_loop_tick)	rE   r    r#   r$   rF   finished_tasksrI   
next_tasksis_subgraph_noder   r   r   patched_pregel_loop_tickN  s   rr   c               	   C   sr   t tdd} | stt z ddlm} t |dd}|s*t|dt t|dd W d S W d S  ttfy8   Y d S w )N_datadog_patchFr   prebuiltcreate_react_agentT)	r   r   _patch_graph_modulesru   r   rl   setattrImportErrorAttributeError)graph_patchedru   prebuilt_patchedr   r   r   patch\  s   r}   c                 C   s   d| _ ttjd}|| _ddlm} tdk r#ddlm	} ddl
m} nddlm} ddlm	} t|dt t|d	t t|d
t t|dt t|d
t t|dt tdkrotdk ret| jjdt d S t| jjdt d S d S )NT)integration_configr   r   r      r   
PregelLoopRunnableSeqinvokeainvokeastreamstreamtickr         _consume_aiter)rs   r	   r   r   r8   langgraph.pregelr   LANGGRAPH_VERSIONlanggraph.pregel.loopr   langgraph.utils.runnabler   langgraph._internal._runnablelanggraph.pregel._loopr   rK   rL   rU   rh   ri   rr   utilsrunnabler[   	_internal	_runnable)r   rF   r   r   r   r   r   r   rw   r  s(   rw   c                  C   s  t tddrldt_ddlm}  ddlm} tdk r&ddlm} ddl	m
} nddlm
} ddlm} t|d	 t|d
 t|d t|d t|d t|d tdkrgtdk r`ttjjd nttjjd ttd ttdrt tjddrdtj_t| d d S d S d S )Nrs   Fr   rt   r   r   r   r   r   r   r   r   r   r   r   r8   ru   rv   )r   r   rs   ru   r   r   r   r   r   r   r   r   r   r   r   r   r   r   delattrhasattr)ru   r   r   r   r   r   r   unpatch  s0   






r   )0r@   r   ddtracer   ddtrace.contrib.trace_utilsr   r   ddtrace.internal.utilsr   ddtrace.internal.utils.versionr   &ddtrace.llmobs._integrations.constantsr   r   &ddtrace.llmobs._integrations.langgraphr	   ddtrace.tracer
   r   r   r   r,   ry   langgraph.errorsr   r>   r   r   strr   dictr   _addr"   rc   boolr/   rK   rL   rU   r[   rh   ri   rl   rr   r}   rw   r   r   r   r   r   <module>   sR    
"<:3	