o
    biv_                     @   s  d dl Z d dlZd dlZd dlmZmZmZmZmZm	Z	m
Z
 d dlZd dlZd dlmZmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dl m!Z! d dl"m#Z# d dl$m%Z% er~d dl&m'Z' d dl(m)Z) dgZ*e+e,Z-G dd dZ.dS )    N)TYPE_CHECKINGIteratorListOptionalTupleTypeUnion)get_memory_info_replyget_state_from_address)	RefBundle)SourceOperator)LogicalOperator)LogicalPlan)Read)DatasetStats)unify_ref_bundles_schema)BlockMetadataWithSchema)DataContext)omit_traceback_stdout)log_onceStreamingExecutor)Datasetscheduling_strategyc                   @   s|  e Zd ZdZdedefddZdefddZd:d
dZ	defddZ
ded defddZd;ddZd<ddZd<ddZdee fddZ	d=dedeedf fdd Zd!eedf fd"d#Zdeee  fd$d%Zdee fd&d'Zedeee eed	 f fd(d)Ze	d=d*edefd+d,Z e!defd-d.Z"d>d0d1Z#defd2d3Z$defd4d5Z%defd6d7Z&defd8d9Z'd/S )?ExecutionPlana  A lazy execution plan for a Dataset.

    This lazy execution plan builds up a chain of ``List[RefBundle]`` -->
    ``List[RefBundle]`` operators. Prior to execution, we apply a set of logical
    plan optimizations, such as operator fusion, in order to reduce Ray task
    overhead and data copies.

    Internally, the execution plan holds a snapshot of a computed list of
    blocks and their associated metadata under ``self._snapshot_bundle``,
    where this snapshot is the cached output of executing the operator chain.statsdata_contextc                 C   sF   || _ d| _d| _d| _d| _d| _d| _d| _d| _d| _	|| _
dS )zCreate a plan with no transformation operators.

        Args:
            stats: Stats for the base blocks.
            data_context: :class:`~ray.data.context.DataContext`
                object to use for execution.
        NF)	_in_stats_snapshot_operator_snapshot_stats_snapshot_bundle_snapshot_metadata_schema_schema_dataset_uuid
_run_index_dataset_name_has_started_execution_context)selfr   r    r*   K/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/plan.py__init__0   s   

zExecutionPlan.__init__returnc                 C   s   | j pd d| j d| j S )ziUnique ID of the dataset, including the dataset name,
        UUID, and current execution index.
        dataset_)r&   r$   r%   r)   r*   r*   r+   get_dataset_idZ   s   zExecutionPlan.get_dataset_idr   c                 C   s.   ddl m} |  jd7  _|| j|  }|S )z!Create an executor for this plan.r   r      )/ray.data._internal.execution.streaming_executorr   r%   r(   r1   )r)   r   executorr*   r*   r+   create_executorb   s   zExecutionPlan.create_executorc                 C   s   d| j  d| j dS )NzExecutionPlan(dataset_uuid=z, snapshot_operator=))r$   r   r0   r*   r*   r+   __repr__j   s   zExecutionPlan.__repr__dataset_clsr   c                    s  ddl m} d}d}|  s		d(dtdtdtf fdd  | jj\}}| jd	ur5| jj	}| j
 }ne| jd	urD| jj	}| jjj
}nVd
}| jj}t|tsct|jdkrYd}n
|jd }t|trO|rjd	}d	}n0t|tssJ |tti d	d| j}	|	t||	j |		 }|	 }n| j	d
d}| j
 }|d	u rd}
n5t|trt|}
n+g }
t|j|jD ]\}}t|dr|j}|
| d|  qd|
}
d|
 d }
|d	u rd}d	}||kr|  }|d	usJ | j d	urd!| j nd}|rd| dnd}d!|j||||
}d}d}d}|| }t||kr| | d|
 }t||krg }
t|j|jD ]A\}}t|drE|j}| |d  | d| }t||krsdt| }t"|t| |}|d	|  | }|
| q8d|
}
d |
 d!| |  d }
| j d	urd!| | d"| j  d#nd}|rd!| | d| d#nd}|j d$| | d!| | d%| d| | d|
 d!| d&}|dkr||7 }|S |||d   d'| 7 }|S ))zCreate a cosmetic string representation of this execution plan.

        Returns:
            The string representation of this execution plan.
        r   )MaterializedDataset opcurr_strdepthc                    s   t | tr	||fS |}| j}|dkr|| d7 }nd|d d  }|| d| d7 }| jD ]} |||d \}}t||}q/||fS )zgTraverse (DFS) the LogicalPlan DAG and
                return a string representation of the operators.r   
 r2      +- )
isinstancer   nameinput_dependenciesmax)r;   r<   r=   curr_max_depthop_nametrailing_spaceinputinput_max_depthgenerate_logical_plan_stringr*   r+   rL      s   


zFExecutionPlan.get_plan_as_string.<locals>.generate_logical_plan_stringNFr2   Tmetadataparent)fetch_if_missingzUnknown schema__name__z: z, {}?z	name={}, znum_blocks=z{}({}{}num_rows={}, schema={})P   
   z   zschema=   z...: z,
z{
r>   zname=,(z	num_rows=r6   rA   )r:   r   )#ray.data.datasetr9   has_computed_outputr   strint_logical_plandagr!   schemanum_rowsr"   rN   rB   r   lenrD   r   r   r(   link_logical_planr   
meta_counttypezipnamestypeshasattrrQ   appendjoininitial_num_blocksr&   formatrE   )r)   r8   r9   plan_strplan_max_depthr`   counthas_n_ary_operatorr_   plan
schema_strnt
num_blocksname_strnum_blocks_strdataset_strSCHEMA_LINE_CHAR_LIMITMIN_FIELD_LENGTH
INDENT_STRrH   schema_str_on_new_linecol_strshortened_suffixchars_left_for_col_namer*   rK   r+   get_plan_as_stringr   s  













	


	z ExecutionPlan.get_plan_as_stringlogical_planr   c                 C   s   || _ | j| j _dS )zLink the logical plan into this execution plan.

        This is used for triggering execution for optimizer code path in this legacy
        execution plan.
        N)r^   r(   )r)   r   r*   r*   r+   rc   *  s   zExecutionPlan.link_logical_planc                 C   s>   t | j| jd}| jdur| j|_| j|_| j|_| j|_|S )zCreate a shallow copy of this execution plan.

        This copy can be executed without mutating the original, but clearing the copy
        will also clear the original.

        Returns:
            A shallow copy of this execution plan.
        r   N)r   r   r(   r!   r   r    r&   r)   	plan_copyr*   r*   r+   copy3  s   	
zExecutionPlan.copyc                 C   sV   t t| j| j d}| jr%t| j|_t| j|_t| j|_| j|_|S )zCreate a deep copy of this execution plan.

        This copy can be executed AND cleared without mutating the original.

        Returns:
            A deep copy of this execution plan.
        r   )r   r   r   r(   r!   r   r    r&   r   r*   r*   r+   	deep_copyH  s   
zExecutionPlan.deep_copyc                 C   s   | j j S )zGet the estimated number of blocks from the logical plan
        after applying execution plan optimizations, but prior to
        fully executing the dataset.)r^   r_   estimated_num_outputsr0   r*   r*   r+   rl   \  s   z ExecutionPlan.initial_num_blocksFrP   zpyarrow.lib.Schemac                 C   s   | j dur| j S d}|  r| jj}n4| jj }|du rG|rG|  \}}}| |D ]}|jdur7|j} nq+W d   n1 sBw   Y  | | | j S )aK  Get the schema after applying all execution plan optimizations,
        but prior to fully executing the dataset
        (unless `fetch_if_missing` is set to True).

        Args:
            fetch_if_missing: Whether to execute the plan to fetch the schema.

        Returns:
            The schema of the output dataset.
        N)	r#   r[   r!   r`   r^   r_   infer_schemaexecute_to_iteratorcache_schema)r)   rP   r`   iter_ref_bundlesr/   r4   bundler*   r*   r+   r`   b  s$   



zExecutionPlan.schemar`   c                 C   s
   || _ d S N)r#   )r)   r`   r*   r*   r+   r     s   
zExecutionPlan.cache_schemac                 C   s   | j j jS )z1Get the input files of the dataset, if available.)r^   r_   infer_metadatainput_filesr0   r*   r*   r+   r     s   zExecutionPlan.input_filesc                 C   sN   | j j}|  rtdd | jjD }|S | jdur#| j}|S d}|S )zGet the number of rows after applying all plan optimizations, if possible.

        This method will never trigger any computation.

        Returns:
            The number of records of the result Dataset, or None.
        c                 s       | ]}|j V  qd S r   )ra   ).0mr*   r*   r+   	<genexpr>      z+ExecutionPlan.meta_count.<locals>.<genexpr>N)r^   r_   r[   sumr!   rN   r   ra   )r)   r_   ra   r*   r*   r+   rd     s   
zExecutionPlan.meta_countc                 C   s   d| _ |  r|  }t|g| jdfS ddlm} |  }||| }t|}zt	t
|g|}W n	 ty;   Y nw | | _|| j|fS )a  Execute this plan, returning an iterator.

        This will use streaming execution to generate outputs.

        NOTE: Executor will be shutdown upon either of the 2 following conditions:

            - Iterator is fully exhausted (ie until StopIteration is raised)
            - Executor instances is garbage-collected

        Returns:
            Tuple of iterator over output RefBundles, DatasetStats, and the executor.
        TNr   )!execute_to_legacy_bundle_iterator)r'   r[   executeiterr    *ray.data._internal.execution.legacy_compatr   r5   	itertoolschainnextStopIteration	get_stats)r)   r   r   r4   bundle_itergenr*   r*   r+   r     s   

z!ExecutionPlan.execute_to_iteratorpreserve_orderc              
      s  d| _ | j}t dstdrtd |  sddl	m
}m} t| jjtr[| jj dur[|| | jj }| jj }tdd	 |D }t|}td
d |D ||d}n?|  }	||	| | j|d}
tt|
 |
j|
 d}W d   n1 sw   Y  |	  jdd}|jrt| z%t t!t" j#}|j$j%dkrt&|j$j'_(|j$j)dkrt&|j$j*_+W n t,y } zt-d|  W Y d}~nd}~ww d_. fdd   || _/| jj| _0| _1| j| j1_2| j/S )zExecutes this plan (eagerly).

        Args:
            preserve_order: Whether to preserve order in execution.

        Returns:
            The blocks of the output dataset.
        TCPUcpu_warninga<  Warning: The Ray cluster currently does not have any available CPUs. The Dataset job will hang unless more CPUs are freed up. A common reason is that cluster resources are used by Actors or Tune trials; see the following link for more details: https://docs.ray.io/en/latest/data/data-internals.html#ray-data-and-tuner   )_get_initial_stats_from_planexecute_to_legacy_block_listNc                 s   r   r   )owns_blocks)r   r   r*   r*   r+   r     r   z(ExecutionPlan.execute.<locals>.<genexpr>c                 S   s$   g | ]}|j D ]\}}||fqqS r*   )blocks)r   r   blockrN   r*   r*   r+   
<listcomp>  s    z)ExecutionPlan.execute.<locals>.<listcomp>)r   r`   )dataset_uuidr   F)include_parentzLSkipping recording memory spilled and restored statistics due to exception: c                    s0    j | jdd7  _ | jD ]} | qd S )Nobj_store_mem_spilledr   )dataset_bytes_spilledextra_metricsgetparents)	cur_statsrO   collect_statsr   r*   r+   r   '  s   

z,ExecutionPlan.execute.<locals>.collect_stats)3r'   r(   rayavailable_resourcesr   r   loggerwarningr[   r   r   r   rB   r^   r_   r   output_datar   allr   r   r5   r$   tupleiter_blocks_with_metadata_owned_by_consumer
get_schemar   
to_summary	to_stringenable_auto_log_statsinfor	   r
   get_runtime_contextgcs_addressstore_statsspill_time_total_sr]   spilled_bytes_totalglobal_bytes_spilledrestore_time_total_srestored_bytes_totalglobal_bytes_restored	Exceptiondebugr   r!   r   r    r   )r)   r   contextr   r   output_bundlesr`   r   r   r4   r   stats_summary_stringreplyer*   r   r+   r     s   




zExecutionPlan.executec                 C   s   | j S )zBReturn ``True`` if this plan has been partially or fully executed.)r'   r0   r*   r*   r+   has_started_execution8  s   z#ExecutionPlan.has_started_executionNc                 C   s   d| _ d| _d| _dS )z;Clear the snapshot kept in the plan to the beginning state.N)r!   r   r    r0   r*   r*   r+   clear_snapshot=  s   
zExecutionPlan.clear_snapshotc                 C   s   | j s	ti ddS | j S )zqReturn stats for this plan.

        If the plan isn't executed, an empty stats object will be returned.
        NrM   )r    r   r0   r*   r*   r+   r   C  s   zExecutionPlan.statsc                 C   s   t dd | j D S )z/Return whether this plan has lazy input blocks.c                 s   s    | ]}t |tV  qd S r   )rB   r   )r   r;   r*   r*   r+   r   N  s    z/ExecutionPlan.has_lazy_input.<locals>.<genexpr>)r   r^   sourcesr0   r*   r*   r+   has_lazy_inputL  s   zExecutionPlan.has_lazy_inputc                 C   s   | j duo| j| jjkS )ztWhether this plan has a computed snapshot for the final operator, i.e. for
        the output of this plan.
        N)r!   r   r^   r_   r0   r*   r*   r+   r[   P  s   
z!ExecutionPlan.has_computed_outputc                 C   sB   ddl m} ddlm} | jj D ]}t|||fr dS qdS )z-Whether this plan requires to preserve order.r   )Sort)ZipTF)8ray.data._internal.logical.operators.all_to_all_operatorr   3ray.data._internal.logical.operators.n_ary_operatorr   r^   r_   post_order_iterrB   )r)   r   r   r;   r*   r*   r+   require_preserve_orderY  s   z$ExecutionPlan.require_preserve_order)r-   r   )r   r   )r-   r   )F)r-   N)(rQ   
__module____qualname____doc__r   r   r,   r\   r1   r5   r7   r   r   rc   r   r   r   r]   rl   boolr   re   r`   r   r   r   rd   r   r   r   r   r   r   propertyr   r   r   r   r[   r   r*   r*   r*   r+   r   $   sV    
*
 
9
	


"%u
		r   )/r   r   loggingtypingr   r   r   r   r   r   r   pyarrowr   ray._private.internal_apir	   r
   'ray.data._internal.execution.interfacesr   %ray.data._internal.logical.interfacesr   6ray.data._internal.logical.interfaces.logical_operatorr   2ray.data._internal.logical.interfaces.logical_planr   2ray.data._internal.logical.operators.read_operatorr   ray.data._internal.statsr   ray.data._internal.utilr   ray.data.blockr   ray.data.contextr   ray.data.exceptionsr   ray.util.debugr   r3   r   rZ   r   INHERITABLE_REMOTE_ARGS	getLoggerrQ   r   r   r*   r*   r*   r+   <module>   s0    $
