o
    $iC$                     @   sr  d dl mZmZmZmZmZmZmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZmZmZ d dlmZ d dlmZ d dl m!Z! d dl"m#Z# d dl$m%Z% d dl&m'Z'm(Z(m)Z)m*Z* d dl+m,Z,m-Z- d dl.m/Z/m0Z0 d dl1m2Z2 d dl3m4Z4 d dl5m6Z6 d dl7m8Z8 d dl9m:Z: d dl;m<Z< d dl=m>Z>m?Z?m@Z@mAZA d dlBmCZC d dlDmEZE ededZFeeFee	 eEge	f ZGde#dee	 d eEd!e	fd"d#ZHd$e!dee	 d eEd!e	fd%d&ZId'd( ZJd)d* ZKd+d, ZLd-d. ZMde%dee	 d eEd!e	fd/d0ZNde4dee	 d eEfd1d2ZOG d3d4 d4ZPded5eee eGf d!eeG fd6d7ZQd8S )9    )CallableDictListOptionalTupleTypeTypeVar)PhysicalOperator)AggregateNumRows)InputDataBuffer)JoinOperator)LimitOperator)OutputSplitter)UnionOperator)ZipOperator)LogicalOperatorLogicalPlanPhysicalPlan)AbstractAllToAll)Count)AbstractFrom)	InputData)Join)AbstractUDFMapFilterProjectStreamingRepartition)UnionZip)DownloadLimit)Read)StreamingSplit)Write)plan_all_to_all_op)plan_download_op)plan_read_op)plan_filter_opplan_project_opplan_streaming_repartition_opplan_udf_map_op)plan_write_op)DataContextLogicalOperatorType)bound
logical_opphysical_childrendata_contextreturnc                 C   s   t |dksJ t|| jdS )z>Get the corresponding DAG of physical operators for InputData.r   )
input_datalenr   r3   r/   r0   r1    r7   _/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/data/_internal/planner/planner.pyplan_input_data_op8   s
   r9   opc                 C   s   t |dksJ t|| jS )Nr   r4   )r:   r0   r1   r7   r7   r8   plan_from_opF   s   r;   c                 C       t |dksJ t|g|R  S N   )r5   r   _r0   r1   r7   r7   r8   plan_zip_opO      rA   c                 C   r<   r=   )r5   r   r?   r7   r7   r8   plan_union_opT   rB   rC   c                 C   s"   t |dksJ t| j|d |S )N   r   )r5   r   _limitr6   r7   r7   r8   plan_limit_opY   s   rF   c                 C   s&   t |dksJ t|d g|tjdS )NrD   r   )column_name)r5   r
   r   COLUMN_NAMEr6   r7   r7   r8   plan_count_op^   s   rI   c                 C   sF   t |dksJ t||d |d | j| j| j| j| j| j| j| j	dS )Nr>   r   rD   )r1   left_input_opright_input_op	join_typeleft_key_columnsright_key_columnsleft_columns_suffixright_columns_suffixnum_partitionspartition_size_hint#aggregator_ray_remote_args_override)
r5   r   
_join_type_left_key_columns_right_key_columns_left_columns_suffix_right_columns_suffix_num_outputs_partition_size_hint_aggregator_ray_remote_argsr6   r7   r7   r8   plan_join_ope   s   r\   c                 C   s,   t |dksJ t|d | j| j|| jdS )NrD   r   )nequalr1   locality_hints)r5   r   _num_splits_equal_locality_hintsr6   r7   r7   r8   plan_streaming_split_opz   s   rc   c                
   @   s   e Zd ZdZi eeeeee	e
eeeeeeeeeeeeeeeeeeeeee e!e"e#Z$de%de&fddZ'de(de)fddZ*de(d	e+de,e-e.e(e-f f fd
dZ/dS )PlannerzThe planner to convert optimized logical to physical operators.

    Note that planner is only doing operators conversion. Physical optimization work is
    done by physical optimizer.
    logical_planr2   c                 C   s&   |  |j|j\}}t|||j}|S )z@Convert logical to physical operators recursively in post-order.)_plan_recursivelydagcontextr   )selfre   physical_dagop_mapphysical_planr7   r7   r8   plan   s
   zPlanner.planr/   c                 C   s&   t || j}|d ur|S td| )Nz0Found unknown logical operator during planning: )find_plan_fn_DEFAULT_PLAN_FNS
ValueError)ri   r/   plan_fnr7   r7   r8   get_plan_fn   s   zPlanner.get_plan_fnr1   c                 C   s   i }g }|j D ]}| ||\}}|| || q| |}||||}	|	g}
|
rE|
 }|jr4n|| |||< |
|j  |
s,|||	< |	|fS )a]  Plan a logical operator and its input dependencies recursively.

        Args:
            logical_op: The logical operator to plan.
            data_context: The data context.

        Returns:
            A tuple of the physical operator corresponding to the logical operator, and
            a mapping from physical to logical operators.
        )	input_dependenciesrf   appendupdaterr   pop_logical_operatorsset_logical_operatorsextend)ri   r/   r1   rk   r0   childphysical_childchild_op_maprq   physical_opqueuecurr_physical_opr7   r7   r8   rf      s&   



zPlanner._plan_recursivelyN)0__name__
__module____qualname____doc__r!   r&   r   r9   r#   r+   r   r;   r   r'   r   r*   r   r$   r   rC   r   rA   r    rF   r   rI   r   r(   r   r)   r   r\   r"   rc   r   r%   ro   r   r   rm   r   PlanLogicalOpFnrr   r,   r   r	   r   rf   r7   r7   r7   r8   rd      sX    	
	rd   plan_fnsc                 C   s(   |  D ]\}}t| |r|  S qdS )a  Find the plan function for a logical operator.

    This function goes through the plan functions in order and returns the first one
    that is an instance of the logical operator type.

    Args:
        logical_op: The logical operator to find the plan function for.
        plan_fns: The dictionary of plan functions.

    Returns:
        The plan function for the logical operator, or None if no plan function is
        found.
    N)items
isinstance)r/   r   op_typerq   r7   r7   r8   rn      s
   
rn   N)Rtypingr   r   r   r   r   r   r   'ray.data._internal.execution.interfacesr	   9ray.data._internal.execution.operators.aggregate_num_rowsr
   8ray.data._internal.execution.operators.input_data_bufferr   +ray.data._internal.execution.operators.joinr   5ray.data._internal.execution.operators.limit_operatorr   6ray.data._internal.execution.operators.output_splitterr   5ray.data._internal.execution.operators.union_operatorr   3ray.data._internal.execution.operators.zip_operatorr   %ray.data._internal.logical.interfacesr   r   r   8ray.data._internal.logical.operators.all_to_all_operatorr   3ray.data._internal.logical.operators.count_operatorr   3ray.data._internal.logical.operators.from_operatorsr   8ray.data._internal.logical.operators.input_data_operatorr   2ray.data._internal.logical.operators.join_operatorr   1ray.data._internal.logical.operators.map_operatorr   r   r   r   3ray.data._internal.logical.operators.n_ary_operatorr   r   8ray.data._internal.logical.operators.one_to_one_operatorr   r    2ray.data._internal.logical.operators.read_operatorr!   =ray.data._internal.logical.operators.streaming_split_operatorr"   3ray.data._internal.logical.operators.write_operatorr#   -ray.data._internal.planner.plan_all_to_all_opr$   +ray.data._internal.planner.plan_download_opr%   'ray.data._internal.planner.plan_read_opr&   *ray.data._internal.planner.plan_udf_map_opr'   r(   r)   r*   (ray.data._internal.planner.plan_write_opr+   ray.data.contextr,   r-   r   r9   r;   rA   rC   rF   rI   r\   rc   rd   rn   r7   r7   r7   r8   <module>   s   $ 

	

Z