o
    `۷i                     @   sh   d Z ddlmZmZmZmZmZ erddlmZ ded deeg df  fdd	Z	d
edefddZ
dS )z2Utility functions for expression-based operations.    )TYPE_CHECKINGAnyCallableListOptional)Exprexprsr   returnNc                    s   ddl m} g }| D ]}| }|| ||  q
|s dS i  |D ]}|j }| vr3g  |<  | | q$ fdd}|S )a  Create an init_fn to initialize all callable class UDFs in expressions.

    This function collects all _CallableClassUDF instances from the given expressions,
    groups them by their callable_class_spec key, and returns an init_fn that
    initializes each group at actor startup. UDFs with the same key (same class and
    constructor args) share a single instance to ensure all are properly initialized.

    Args:
        exprs: List of expressions to collect callable class UDFs from.

    Returns:
        An init_fn that initializes all callable class UDFs, or None if there are
        no callable class UDFs in the expressions.
    r   )_CallableClassUDFCollectorNc                     s<      D ]} | d }|  | dd  D ]}|j|_qqd S )Nr      )valuesinit	_instance)udfs_with_same_key	first_udf	other_udfudfs_by_key T/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/data/util/expression_utils.pyinit_fn1   s   
z2create_callable_class_udf_init_fn.<locals>.init_fn)>ray.data._internal.planner.plan_expression.expression_visitorsr
   visitextendget_callable_class_udfscallable_class_specmake_keyappend)r   r
   callable_class_udfsexpr	collectorudfkeyr   r   r   r   !create_callable_class_udf_init_fn	   s    

	r#   instancec                    sf   ddl }ddl}|jr| i S |jr, fdd}|| S  i S )a  Call a UDF instance, bridging from sync context to async if needed.

    This handles the complexity of calling callable class UDF instances that may
    be sync, async coroutine, or async generator functions.

    Args:
        instance: The callable instance to call
        *args: Positional arguments
        **kwargs: Keyword arguments

    Returns:
        The result of calling the instance
    r   Nc                     sp   g }  i 2 z3 d H W }|  | q
6 | sd S t| dkr%| d S dd l}|dt|  d | d S )Nr   r   zAsync generator yielded zp values in expression context; only the last (most recent) is returned. Use map_batches for multi-yield support.)r   lenloggingwarning)resultsitemr'   argsr$   kwargsr   r   _collectX   s   z6_call_udf_instance_with_async_bridge.<locals>._collect)asyncioinspectiscoroutinefunction__call__runisasyncgenfunction)r$   r,   r-   r/   r0   r.   r   r+   r   $_call_udf_instance_with_async_bridge=   s   r5   )__doc__typingr   r   r   r   r   ray.data.expressionsr   r#   r5   r   r   r   r   <module>   s    
4