o
    bi#                     @   s,  U d dl mZmZmZmZmZmZ d dlmZ d dl	m
Z
 d dlmZmZmZ d dlmZ d dlmZ d dlmZ eded	Zeeee egef Zi Zeee ef ed
< edee defddZedd Zdd Ze  G dd dZededeee ef dedeeeeef f fddZdS )    )CallableDictListTupleTypeTypeVar)PhysicalOperator)JoinOperator)LogicalOperatorLogicalPlanPhysicalPlan)Join)DataContext)DeveloperAPILogicalOperatorType)bound_PLAN_LOGICAL_OP_FNSlogical_op_typeplan_fnc                 C   s   |t | < dS )z5Register a plan function for a logical operator type.N)r   )r   r    r   V/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/planner/planner.pyregister_plan_logical_op_fn   s   r   c                   C   s   t  S )N)r   copyr   r   r   r   get_plan_logical_op_fns!   s   r   c                     s  ddl m  ddlm ddlm ddlm ddlm	 ddl
m}  ddlm dd	lm} dd
lm} ddlm}m}m}m} ddlm}m} ddlm}	 ddlm}
 ddlm} ddl m!} ddl"m#} ddl$m%}m&}m'}m(} ddl)m*} t+|
| d|dt,t- dt.dt-ffdd}t+|| t+|| d|dt,t- dt.dt-ffdd}t+|| t+|| t+|| t+| | fdd}t+|| fdd }t+|| fd!d"}t+|	|  fd#d$}t+| t+|| t+|| dt/dt,t- dt.dt-fd%d&}t+t/| d S )'Nr   )AggregateNumRowsInputDataBufferLimitOperatorUnionOperatorZipOperator)AbstractAllToAll)Count)AbstractFrom)	InputData)AbstractUDFMapFilterProjectStreamingRepartition)UnionZip)Limit)Read)Write)plan_all_to_all_op)plan_read_op)plan_filter_opplan_project_opplan_streaming_repartition_opplan_udf_map_op)plan_write_op
logical_opphysical_childrendata_contextreturnc                    s   t |dksJ  || jdS )z>Get the corresponding DAG of physical operators for InputData.r   )
input_datalenr;   r7   r8   r9   r   r   r   plan_input_data_opJ   s
   zA_register_default_plan_logical_op_fns.<locals>.plan_input_data_opopc                    s   t |dksJ  || jS )Nr   r<   )r@   r8   r9   r   r   r   plan_from_opZ   s   z;_register_default_plan_logical_op_fns.<locals>.plan_from_opc                    s$   t |dksJ  |d |d |S )N   r      r=   _r8   r9   r!   r   r   plan_zip_opj   s   z:_register_default_plan_logical_op_fns.<locals>.plan_zip_opc                    s    t |dksJ  |g|R  S )NrB   rD   rE   r   r   r   plan_union_opp   s   z<_register_default_plan_logical_op_fns.<locals>.plan_union_opc                    s"   t |dksJ  | j|d |S )NrC   r   )r=   _limitr>   r   r   r   plan_limit_opv   s   z<_register_default_plan_logical_op_fns.<locals>.plan_limit_opc                    s&   t |dksJ  |d g|jdS )NrC   r   )column_name)r=   COLUMN_NAMEr>   )r   r$   r   r   plan_count_op|   s   z<_register_default_plan_logical_op_fns.<locals>.plan_count_opc                 S   sT   t |dksJ | jd usJ t||d |d | j| j| j| j| j| j| j| j	dS )NrB   r   rC   )r9   left_input_opright_input_op	join_typeleft_key_columnsright_key_columnsleft_columns_suffixright_columns_suffixnum_partitionspartition_size_hint#aggregator_ray_remote_args_override)
r=   _num_outputsr	   
_join_type_left_key_columns_right_key_columns_left_columns_suffix_right_columns_suffix_partition_size_hint_aggregator_ray_remote_argsr>   r   r   r   plan_join_op   s   z;_register_default_plan_logical_op_fns.<locals>.plan_join_op)09ray.data._internal.execution.operators.aggregate_num_rowsr   8ray.data._internal.execution.operators.input_data_bufferr   5ray.data._internal.execution.operators.limit_operatorr   5ray.data._internal.execution.operators.union_operatorr    3ray.data._internal.execution.operators.zip_operatorr"   8ray.data._internal.logical.operators.all_to_all_operatorr#   3ray.data._internal.logical.operators.count_operatorr$   3ray.data._internal.logical.operators.from_operatorsr%   8ray.data._internal.logical.operators.input_data_operatorr&   1ray.data._internal.logical.operators.map_operatorr'   r(   r)   r*   3ray.data._internal.logical.operators.n_ary_operatorr+   r,   8ray.data._internal.logical.operators.one_to_one_operatorr-   2ray.data._internal.logical.operators.read_operatorr.   3ray.data._internal.logical.operators.write_operatorr/   -ray.data._internal.planner.plan_all_to_all_opr0   'ray.data._internal.planner.plan_read_opr1   *ray.data._internal.planner.plan_udf_map_opr2   r3   r4   r5   (ray.data._internal.planner.plan_write_opr6   r   r   r   r   r   )r#   r%   r&   r'   r(   r)   r*   r+   r,   r-   r.   r/   r0   r1   r2   r3   r4   r5   r6   r?   rA   rG   rH   rJ   rM   r`   r   )r   r$   r   r   r    r"   r   %_register_default_plan_logical_op_fns&   s~   













rs   c                   @   s"   e Zd ZdZdedefddZdS )PlannerzThe planner to convert optimized logical to physical operators.

    Note that planner is only doing operators conversion. Physical optimization work is
    done by physical optimizer.
    logical_planr:   c                 C   s,   t  }t|j||j\}}t|||j}|S )z@Convert logical to physical operators recursively in post-order.)r   plan_recursivelydagcontextr   )selfru   plan_fnsphysical_dagop_mapphysical_planr   r   r   plan   s   
zPlanner.planN)__name__
__module____qualname____doc__r   r   r~   r   r   r   r   rt      s    rt   r7   rz   r9   r:   c                 C   s   i }g }| j D ]}t|||\}}|| || qd}| D ]\}	}
t| |	r3|
| ||} nq"|du r?td|  |g}|rY| }|jrLn|	|  |
|j  |sD| ||< ||fS )a  Plan a logical operator and its input dependencies recursively.

    Args:
        logical_op: The logical operator to plan.
        plan_fns: A dictionary of planning functions for different logical operator
            types.
        data_context: The data context.

    Returns:
        A tuple of the physical operator corresponding to the logical operator, and
        a mapping from physical to logical operators.
    Nz0Found unknown logical operator during planning: )input_dependenciesrv   appendupdateitems
isinstance
ValueErrorpop_logical_operatorsset_logical_operatorsextend)r7   rz   r9   r|   r8   childphysical_childchild_op_mapphysical_opop_typer   queuecurr_physical_opr   r   r   rv      s4   



	rv   N)typingr   r   r   r   r   r   'ray.data._internal.execution.interfacesr   +ray.data._internal.execution.operators.joinr	   %ray.data._internal.logical.interfacesr
   r   r   2ray.data._internal.logical.operators.join_operatorr   ray.data.contextr   ray.util.annotationsr   r   PlanLogicalOpFnr   __annotations__r   r   rs   rt   rv   r   r   r   r   <module>   sB   " 
{