o
    i*                     @   s   d Z ddlZddlZddlZddlmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZ dd	lmZ eeZd
adadadadd Zdd Zdd Zdd Zdd Zdd Zg dZdd ZdS )a  IAST (Interactive Application Security Testing) analyzes code for security vulnerabilities.

To add new vulnerabilities analyzers (Taint sink) we should update `IAST_PATCH` in
`ddtrace/appsec/iast/_patch_modules.py`

Create new file with the same name: `ddtrace/appsec/iast/taint_sinks/[my_new_vulnerability].py`

Then, implement the `patch()` function and its wrappers.

In order to have the better performance, the Overhead control engine (OCE) helps us to control the overhead of our
wrapped functions. We should create a class that inherit from `ddtrace.appsec._iast.taint_sinks._base.VulnerabilityBase`
and register with `ddtrace.appsec._iast.oce`.

@oce.register
class MyVulnerability(VulnerabilityBase):
    vulnerability_type = "MyVulnerability"
    evidence_type = "kind_of_Vulnerability"

Before that, we should decorate our wrappers with `wrap` method and
report the vulnerabilities with `report` method. OCE will manage the number of requests, number of vulnerabilities
to reduce the overhead.

@WeakHash.wrap
def wrapped_function(wrapped, instance, args, kwargs):
    WeakHash.report(
        evidence_value=evidence,
    )
    return wrapped(*args, **kwargs)
    N)forksafe)
get_logger)ModuleWatchdog)config   )iast_listen)oceTFc               
   C   s   t jsdS z>ddlm}  ddlm} tr$td | d dt _W dS | s4td | d W dS td | d dt _W dS  t	y^ } ztjd	|d
d W Y d}~dS d}~ww )u  
    Handle IAST state after fork to prevent segmentation faults.

    This fork handler works in conjunction with the C++ pthread_atfork handler
    (registered in native.cpp) to provide complete fork-safety:

    **C++ pthread_atfork handler (automatic, runs first):**
    - Clears all taint maps (removes stale PyObject pointers from parent)
    - Resets taint_engine_context (recreates context array with fresh state)
    - Resets initializer (recreates memory pools)
    - Happens automatically for ALL forks before this Python handler runs

    **Python fork handler (this function, runs second):**
    - Detects fork type (early vs late)
    - Manages Python-level IAST_CONTEXT
    - Conditionally disables IAST for late forks (multiprocessing)

    Fork types:
    1. **Early forks (web framework workers)**: Gunicorn, uvicorn, etc. fork BEFORE
       IAST has active request contexts. Native state was reset by pthread_atfork.
       IAST remains enabled and works correctly in workers.

    2. **Late forks (multiprocessing)**: fork AFTER IAST has active contexts.
       Native state was reset by pthread_atfork, but we disable IAST in these
       processes for multiprocessing compatibility and to avoid confusion.

    Detection logic:
    - If is_iast_request_enabled() → Late fork → Disable IAST
    - If not is_iast_request_enabled() → Early fork → Keep IAST enabled

    This prevents segmentation faults in ALL fork scenarios while maintaining
    IAST coverage in web framework workers.
    Nr   )IAST_CONTEXT)is_iast_request_enabledzHIAST fork handler: Pytest mode detected, disabling IAST in child processFz~IAST fork handler: No active context (early fork/web worker). Native state auto-reset by pthread_atfork. IAST remains enabled.zIAST fork handler: Active context (late fork/multiprocessing). Native state auto-reset by pthread_atfork. Disabling IAST in child.zError in IAST fork handler: %sTexc_info)

asm_config_iast_enabled/ddtrace.appsec._iast._iast_request_context_baser	   r
   _iast_in_pytest_modelogdebugset	Exception)r	   r
   e r   Q/home/ubuntu/.local/lib/python3.10/site-packages/ddtrace/appsec/_iast/__init__.py_disable_iast_after_fork4   s2   "




r   c                   C   s.   t stjrtt da td dS dS dS )z
    Register the fork handler if IAST is enabled and it hasn't been registered yet.

    This is called during IAST initialization to ensure the fork handler is only
    registered once and only when IAST is actually being used.
    Tz#IAST fork safety handler registeredN)_fork_handler_registeredr   r   r   registerr   r   r   r   r   r   r   _register_fork_handler   s
   
	
r   c                  C   s   t jsdS ddl} ddlm} |  jjd }tj	| }z||\}}W n t
y5   tjddd Y dS w |s?td	 dS t||d
}t|}|j  |j|j t||j dS )ap  
    Patch the code inside the Flask main app source code file (typically "app.py") so
    Runtime Code Analysis (IAST) works also for the functions and methods defined inside it.
    This must be called on the top level or inside the `if __name__ == "__main__"`
    and must be before the `app.run()` call. It also requires `DD_IAST_ENABLED` to be
    activated.
    Nr   r   )astpatch_module__name__z'Unexpected exception while AST patchingTr   z9Main flask module not patched, probably it was not neededexec)r   r   inspect_ast.ast_patchingr   currentframef_back	f_globalssysmodulesr   r   r   compiletypes
ModuleType__dict__clearupdater   )r   r   module_namemodulemodule_pathpatched_astcompiled_code
new_moduler   r   r   ddtrace_iast_flask_patch   s(   



r2   c                  C   s`   t jr.ddlm}  ddlm} ddlm} trdS t	d t
| | da|  t  dS dS )z+Add IAST AST patching in the ModuleWatchdogr   _should_iast_patch_exec_iast_patched_module)initialize_native_stateNz$iast::instrumentation::starting IASTT)r   _iast_propagation_enabled&ddtrace.appsec._iast._ast.ast_patchingr4   ddtrace.appsec._iast._loaderr6   $ddtrace.appsec._iast._taint_trackingr7   r   r   r   register_pre_exec_module_hookr   )r4   r6   r7   r   r   r   enable_iast_propagation   s   

r=   c                   C   sn   t jsdS datjdpdtjd< tjdpdtjd< tjdp$dtjd< d	t _d
t _dt _t	
  dS )a  Configure IAST settings for pytest execution.

    This function sets up IAST configuration but does NOT create a request context.
    Request contexts should be created per-test or per-request to avoid threading issues.

    Also sets a global flag to indicate we're in pytest mode, which ensures IAST is
    disabled in forked child processes to prevent segfaults when tests use multiprocessing.
    NTDD_IAST_REQUEST_SAMPLINGz100.0 _DD_APPSEC_DEDUPLICATION_ENABLEDfalse#DD_IAST_VULNERABILITIES_PER_REQUEST1000g      Y@Fi  )r   r   r   osenvironget_iast_request_sampling_deduplication_enabled&_iast_max_vulnerabilities_per_requestsr   reconfigurer   r   r   r   _iast_pytest_activation   s   rJ   c                  C   sZ   ddl m}  ddlm} tsdS zt| | W dadS  ty,   t	d Y dadS w )zKRemove IAST AST patching from the ModuleWatchdog. Only for testing proposesr   r3   r5   Nz;IAST is already disabled and it's not in the ModuleWatchdogF)
r9   r4   r:   r6   r8   r   remove_pre_exec_module_hookKeyErrorr   warning)r4   r6   r   r   r   disable_iast_propagation   s   rN   )r2   r=   rN   c                   C   s   t rt  t  da dS dS )z&Lazily load the iast module listeners.FN)_IAST_TO_BE_LOADEDr   r   r   r   r   r   	load_iast
  s
   rP   )__doc__rC   r$   r'   ddtrace.internalr   ddtrace.internal.loggerr   ddtrace.internal.moduler   ddtrace.internal.settings.asmr   r   	_listenerr   _overhead_control_enginer   r   r   rO   r8   r   r   r   r   r2   r=   rJ   rN   __all__rP   r   r   r   r   <module>   s.    V(