o
    `۷i.                     @   sf  d dl Z d dlZd dlmZmZmZmZmZmZm	Z	 d dl
mZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dlmZm Z m!Z! d dl"m#Z#m$Z$m%Z%m&Z&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z-m.Z.m/Z/m0Z0m1Z1m2Z2 d dl3m4Z4m5Z5 d dl6m7Z7 d dl8m9Z9 d dl:m;Z; d dl<m=Z=m>Z>m?Z?m@Z@ d dlAmBZB d dlCmDZD d dlEmFZF e	dedZGeeGee eFgef ZHde)dee deFdefddZIde$dee deFdefd d!ZJd"d# ZKd$d% ZLd&d' ZMd(d) ZNde*dee deFdefd*d+ZOde/dee deFfd,d-ZPG d.d/ d/ZQded0eee eHf deeH fd1d2ZRdS )3    N)CallableDictListOptionalTupleTypeTypeVar)	ObjectRef)add_execution_callback)PhysicalOperator)AggregateNumRows)InputDataBuffer)JoinOperator)LimitOperator)OutputSplitter)UnionOperator)ZipOperator)LogicalOperatorLogicalPlanPhysicalPlan)AbstractAllToAllAbstractFromAbstractUDFMapCountDownloadFilter	InputDataJoinLimitProjectReadStreamingRepartitionStreamingSplitUnionWriteZip)#plan_read_op_with_checkpoint_filter$plan_write_op_with_checkpoint_writer)plan_all_to_all_op)plan_download_op)plan_read_op)plan_filter_opplan_project_opplan_streaming_repartition_opplan_udf_map_op)plan_write_opLoadCheckpointCallback)DataContextLogicalOperatorType)bound
logical_opphysical_childrendata_contextreturnc                 C   s   t |dksJ t|| jdS )z>Get the corresponding DAG of physical operators for InputData.r   )
input_datalenr   r9   r5   r6   r7    r=   X/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/data/_internal/planner/planner.pyplan_input_data_opA   s
   r?   opc                 C   s   t |dksJ t|| jS )Nr   r:   )r@   r6   r7   r=   r=   r>   plan_from_opO   s   rA   c                 C       t |dksJ t|g|R  S N   )r;   r   _r6   r7   r=   r=   r>   plan_zip_opX      rG   c                 C   rB   rC   )r;   r   rE   r=   r=   r>   plan_union_op]   rH   rI   c                 C   s"   t |dksJ t| j|d |S )N   r   )r;   r   limitr<   r=   r=   r>   plan_limit_opb   s   rL   c                 C   s&   t |dksJ t|d g|tjdS )NrJ   r   )column_name)r;   r   r   COLUMN_NAMEr<   r=   r=   r>   plan_count_opg   s   rO   c                 C   sF   t |dksJ t||d |d | j| j| j| j| j| j| j| j	dS )NrD   r   rJ   )r7   left_input_opright_input_op	join_typeleft_key_columnsright_key_columnsleft_columns_suffixright_columns_suffixnum_partitionspartition_size_hint#aggregator_ray_remote_args_override)
r;   r   rR   rS   rT   rU   rV   num_outputsrX   aggregator_ray_remote_argsr<   r=   r=   r>   plan_join_opn   s   r\   c                 C   s,   t |dksJ t|d | j| j|| jdS )NrJ   r   )nequalr7   locality_hints)r;   r   
num_splitsr^   r_   r<   r=   r=   r>   plan_streaming_split_op   s   ra   c                
   @   s  e Zd ZdZi eeeeee	e
eeeeeeeeeeeeeeeeeeeeee e!e"e#Z$efZ%dd Z&de'de(fddZ)de*de+fd	d
Z,de*de-de.e/e0e*e/f f fddZ1de2fddZ3de4g e5f de0e6e* e+f fddZ7de'de8fddZ9dS )PlannerzThe planner to convert optimized logical to physical operators.

    Note that planner is only doing operators conversion. Physical optimization work is
    done by physical optimizer.
    c                 C   s   d| _ i | _d S )NF)_supports_checkpointing_plan_fns_for_checkpointing)selfr=   r=   r>   __init__   s   
zPlanner.__init__logical_planr8   c                 C   s   |j j}|dur%| |r%d| _| |}t||j  |j}| || _n|dur5| |r0J t	
d | |j|j \}}t|||j }|S )z@Convert logical to physical operators recursively in post-order.NTzqYou've enabled checkpointing, but the logical plan doesn't support checkpointing. Checkpointing will be disabled.)contextcheckpoint_config_check_supports_checkpointingrc   _create_checkpoint_callbackr
   load_checkpoint_get_plan_fns_for_checkpointingrd   warningswarn_plan_recursivelydagr   )re   rg   ri   checkpoint_callbackrl   physical_dagop_mapphysical_planr=   r=   r>   plan   s*   
zPlanner.planr5   c                 C   sN   | j r| jsJ t|| j}|d ur|S t|| j}|d ur |S td| )Nz0Found unknown logical operator during planning: )rc   rd   find_plan_fn_DEFAULT_PLAN_FNS
ValueError)re   r5   plan_fnr=   r=   r>   get_plan_fn   s   
zPlanner.get_plan_fnr7   c                 C   s   i }g }|j D ]}| ||\}}|| || q| |}||||}	|	g}
|
rE|
 }|jr4n|| |||< |
|j  |
s,|||	< |	|fS )a]  Plan a logical operator and its input dependencies recursively.

        Args:
            logical_op: The logical operator to plan.
            data_context: The data context.

        Returns:
            A tuple of the physical operator corresponding to the logical operator, and
            a mapping from physical to logical operators.
        )	input_dependenciesrp   appendupdater{   pop_logical_operatorsset_logical_operatorsextend)re   r5   r7   rt   r6   childphysical_childchild_op_maprz   physical_opqueuecurr_physical_opr=   r=   r>   rp      s&   



zPlanner._plan_recursivelyc                 C   s   t |S )zFactory method to create the LoadCheckpointCallback.

        Subclasses can override this to use a different callback implementation.
        r0   )re   ri   r=   r=   r>   rk     s   z#Planner._create_checkpoint_callbackrl   c                 C   s   t tjt|dtti}|S )N)rl   )r    	functoolspartialr&   r$   r'   )re   rl   plan_fnsr=   r=   r>   rm     s   z'Planner._get_plan_fns_for_checkpointingc                    s6   t |jttfs
dS dtdtf fdd  |jS )zCheck if the logical plan supports checkpointing.

        Subclasses can override _CHECKPOINT_FILTER_OPS to support more operators.
        Fr@   r8   c                    s(   t | jrdS t fdd| jD S )NTc                 3   s    | ]} |V  qd S )Nr=   ).0	input_dep)$_all_paths_contain_checkpoint_filterr=   r>   	<genexpr>,  s
    
zfPlanner._check_supports_checkpointing.<locals>._all_paths_contain_checkpoint_filter.<locals>.<genexpr>)
isinstance_CHECKPOINT_FILTER_OPSallr|   )r@   r   re   r=   r>   r   )  s
   zSPlanner._check_supports_checkpointing.<locals>._all_paths_contain_checkpoint_filter)r   rq   r$   r"   r   bool)re   rg   r=   r   r>   rj   !  s   
z%Planner._check_supports_checkpointingN):__name__
__module____qualname____doc__r    r*   r   r?   r$   r/   r   rA   r   r+   r   r.   r   r(   r#   rI   r%   rG   r   rL   r   rO   r   r,   r!   r-   r   r\   r"   ra   r   r)   rx   r   rf   r   r   rv   r   PlanLogicalOpFnr{   r2   r   r   r   rp   r1   rk   r   r	   r   rm   r   rj   r=   r=   r=   r>   rb      sj    	

.

rb   r   c                 C   s(   |  D ]\}}t| |r|  S qdS )a  Find the plan function for a logical operator.

    This function goes through the plan functions in order and returns the first one
    that is an instance of the logical operator type.

    Args:
        logical_op: The logical operator to find the plan function for.
        plan_fns: The dictionary of plan functions.

    Returns:
        The plan function for the logical operator, or None if no plan function is
        found.
    N)itemsr   )r5   r   op_typerz   r=   r=   r>   rw   4  s
   
rw   )Sr   rn   typingr   r   r   r   r   r   r   rayr	   /ray.data._internal.execution.execution_callbackr
   'ray.data._internal.execution.interfacesr   9ray.data._internal.execution.operators.aggregate_num_rowsr   8ray.data._internal.execution.operators.input_data_bufferr   +ray.data._internal.execution.operators.joinr   5ray.data._internal.execution.operators.limit_operatorr   6ray.data._internal.execution.operators.output_splitterr   5ray.data._internal.execution.operators.union_operatorr   3ray.data._internal.execution.operators.zip_operatorr   %ray.data._internal.logical.interfacesr   r   r   $ray.data._internal.logical.operatorsr   r   r   r   r   r   r   r   r   r   r    r!   r"   r#   r$   r%   %ray.data._internal.planner.checkpointr&   r'   -ray.data._internal.planner.plan_all_to_all_opr(   +ray.data._internal.planner.plan_download_opr)   'ray.data._internal.planner.plan_read_opr*   *ray.data._internal.planner.plan_udf_map_opr+   r,   r-   r.   (ray.data._internal.planner.plan_write_opr/   ,ray.data.checkpoint.load_checkpoint_callbackr1   ray.data.contextr2   r3   r   r?   rA   rG   rI   rL   rO   r\   ra   rb   rw   r=   r=   r=   r>   <module>   s    $H

	

 #