o
    $id                     @   s.  d dl Z d dlZd dlZd dlmZmZmZmZmZm	Z	m
Z
 d dlZd dlZd dlmZmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dl m!Z!m"Z" d dl#m$Z$ d dl%m&Z& d dl'm(Z( erd dl)m*Z* d dl+m,Z, dgZ-e.e/Z0G dd dZ1dS )    N)TYPE_CHECKINGIteratorListOptionalTupleTypeUnion)get_memory_info_replyget_state_from_address)	RefBundle)SourceOperator)LogicalOperator)LogicalPlan)Operator)Read)get_plan_conversion_fns)DatasetStats)BlockMetadataWithSchema_take_first_non_empty_schema)DataContext)omit_traceback_stdout)log_onceStreamingExecutor)Datasetscheduling_strategyc                   @   s  e Zd ZdZdedefddZdefddZdFd
dZ	defddZ
defddZe				dGdededededef
ddZded defddZdHd!d"ZdId#d$ZdId%d&Zdee fd'd(Z	dJd)edeed*f fd+d,Zd-eed*f fd.d/Zdeee  fd0d1Zdee fd2d3Zede e!e" eed	 f fd4d5Z#e	dJd6ede"fd7d8Z$e%defd9d:Z&dKd<d=Z'defd>d?Z(defd@dAZ)defdBdCZ*defdDdEZ+d;S )LExecutionPlana  A lazy execution plan for a Dataset.

    This lazy execution plan builds up a chain of ``List[RefBundle]`` -->
    ``List[RefBundle]`` operators. Prior to execution, we apply a set of logical
    plan optimizations, such as operator fusion, in order to reduce Ray task
    overhead and data copies.

    Internally, the execution plan holds a snapshot of a computed list of
    blocks and their associated metadata under ``self._snapshot_bundle``,
    where this snapshot is the cached output of executing the operator chain.statsdata_contextc                 C   sF   || _ d| _d| _d| _d| _d| _d| _d| _d| _d| _	|| _
dS )zCreate a plan with no transformation operators.

        Args:
            stats: Stats for the base blocks.
            data_context: :class:`~ray.data.context.DataContext`
                object to use for execution.
        NF)	_in_stats_snapshot_operator_snapshot_stats_snapshot_bundle_snapshot_metadata_schema_schema_dataset_uuid
_run_index_dataset_name_has_started_execution_context)selfr   r    r,   T/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/data/_internal/plan.py__init__1   s   

zExecutionPlan.__init__returnc                 C   s   | j pd d| j d| j S )ziUnique ID of the dataset, including the dataset name,
        UUID, and current execution index.
        dataset_)r(   r&   r'   r+   r,   r,   r-   get_dataset_id[   s   zExecutionPlan.get_dataset_idr   c                 C   s.   ddl m} |  jd7  _|| j|  }|S )z!Create an executor for this plan.r   r      )/ray.data._internal.execution.streaming_executorr   r'   r*   r3   )r+   r   executorr,   r,   r-   create_executorc   s   zExecutionPlan.create_executorc                 C   s   d| j  d| j dS )NzExecutionPlan(dataset_uuid=z, snapshot_operator=))r&   r!   r2   r,   r,   r-   __repr__k   s   zExecutionPlan.__repr__c                 C   s~   dd gt   }g d}| j}g }t||D ]#\}}||}| j|jdd\}}d| d}	|	 | }
||
 qd|S )	z@Return a string representation of the logical and physical plan.c                 S   s   | S Nr,   )xr,   r,   r-   <lambda>v   s    z'ExecutionPlan.explain.<locals>.<lambda>)zLogical PlanzLogical Plan (Optimized)zPhysical PlanzPhysical Plan (Optimized)T)show_op_reprz

-------- z
 --------
 )r   _logical_planzipgenerate_plan_stringdagappendjoin)r+   convert_fnstitlesplansectionstitle
convert_fnplan_strr1   bannersectionr,   r,   r-   explains   s   
zExecutionPlan.explainr>   r   TFopcurr_strdepthincluding_sourcer=   c           
      C   s   |st | tr||fS |}|rt| n| j}|dkr"|| d7 }nd|d d  }|| d| d7 }| jD ]}t|||d ||\}}	t||	}q7||fS )zXTraverse (DFS) the Plan DAG and
        return a string representation of the operators.r   
 r4      +- )
isinstancer   reprnameinput_dependenciesr   rA   max)
rO   rP   rQ   rR   r=   curr_max_depthop_strtrailing_spaceinputinput_max_depthr,   r,   r-   rA      s   

z"ExecutionPlan.generate_plan_stringdataset_clsr   c                 C   s  ddl m} d}d}|  s| j| jjdd\}}| jdur(| jj}| j }ne| j	dur7| j	j}| j	j
j}nVd}| jj}t|tsVt|jdkrLd}n
|jd }t|trB|r]d}d}n0t|tsfJ |tti dd	| j}	|	t||	j |	 }|	 }n| jdd
}| j }|du rd}
n5t|trt|}
n+g }
t|j|jD ]\}}t|dr|j}|
| d|  qd|
}
d|
 d }
|du rd}d}||kr|  }|dusJ | jdurd | jnd}|rd| dnd}d |j||||
}d}d}d}|| }t||kr| | d|
 }t||kr~g }
t|j|jD ]A\}}t|dr7|j}| |d  | d| }t||kredt| }t!|t| |}|d|  | }|
| q*d|
}
d|
 d| |  d }
| jdurd| | d| j dnd}|rd| | d| dnd}|j d | | d| | d!| d| | d|
 d| d"}|dkr||7 }|S |||d   d#| 7 }|S )$zCreate a cosmetic string representation of this execution plan.

        Returns:
            The string representation of this execution plan.
        r   )MaterializedDatasetr>   F)rR   Nr4   Tmetadataparent)fetch_if_missingzUnknown schema__name__z: z, {}?z	name={}, znum_blocks=z{}({}{}num_rows={}, schema={})P   
   z   zschema=   z...: z,
z{
rS   zname=,(z	num_rows=r8   rV   )"ray.data.datasetrb   has_computed_outputrA   r?   rB   r#   schemanum_rowsr$   rd   rW   r   lenrZ   r   r   r*   link_logical_planr   
meta_counttypestrr@   namestypeshasattrrg   rC   rD   initial_num_blocksr(   formatr[   )r+   ra   rb   rK   plan_max_depthrr   counthas_n_ary_operatorrB   rG   
schema_strnt
num_blocksname_strnum_blocks_strdataset_strSCHEMA_LINE_CHAR_LIMITMIN_FIELD_LENGTH
INDENT_STRr^   schema_str_on_new_linecol_strshortened_suffixchars_left_for_col_namer,   r,   r-   get_plan_as_string   s   














	


	z ExecutionPlan.get_plan_as_stringlogical_planr   c                 C   s   || _ | j| j _dS )zLink the logical plan into this execution plan.

        This is used for triggering execution for optimizer code path in this legacy
        execution plan.
        N)r?   r*   )r+   r   r,   r,   r-   ru   N  s   zExecutionPlan.link_logical_planc                 C   s>   t | j| jd}| jdur| j|_| j|_| j|_| j|_|S )zCreate a shallow copy of this execution plan.

        This copy can be executed without mutating the original, but clearing the copy
        will also clear the original.

        Returns:
            A shallow copy of this execution plan.
        r   N)r   r    r*   r#   r!   r"   r(   r+   	plan_copyr,   r,   r-   copyW  s   	
zExecutionPlan.copyc                 C   sV   t t| j| j d}| jr%t| j|_t| j|_t| j|_| j|_|S )zCreate a deep copy of this execution plan.

        This copy can be executed AND cleared without mutating the original.

        Returns:
            A deep copy of this execution plan.
        r   )r   r   r    r*   r#   r!   r"   r(   r   r,   r,   r-   	deep_copyl  s   
zExecutionPlan.deep_copyc                 C   s   | j j S )zGet the estimated number of blocks from the logical plan
        after applying execution plan optimizations, but prior to
        fully executing the dataset.)r?   rB   estimated_num_outputsr2   r,   r,   r-   r|     s   z ExecutionPlan.initial_num_blocksrf   zpyarrow.lib.Schemac                 C   s   | j dur| j S d}|  r| jj}n.| jj }|du rA|rA|  \}}}| tdd |D }W d   n1 s<w   Y  | 	| | j S )aK  Get the schema after applying all execution plan optimizations,
        but prior to fully executing the dataset
        (unless `fetch_if_missing` is set to True).

        Args:
            fetch_if_missing: Whether to execute the plan to fetch the schema.

        Returns:
            The schema of the output dataset.
        Nc                 s       | ]}|j V  qd S r:   rr   .0bundler,   r,   r-   	<genexpr>      
z'ExecutionPlan.schema.<locals>.<genexpr>)
r%   rq   r#   rr   r?   rB   infer_schemaexecute_to_iteratorr   cache_schema)r+   rf   rr   iter_ref_bundlesr1   r6   r,   r,   r-   rr     s   



zExecutionPlan.schemarr   c                 C   s
   || _ d S r:   )r%   )r+   rr   r,   r,   r-   r     s   
zExecutionPlan.cache_schemac                 C   s   | j j jS )z1Get the input files of the dataset, if available.)r?   rB   infer_metadatainput_filesr2   r,   r,   r-   r     s   zExecutionPlan.input_filesc                 C   sN   | j j}|  rtdd | jjD }|S | jdur#| j}|S d}|S )zGet the number of rows after applying all plan optimizations, if possible.

        This method will never trigger any computation.

        Returns:
            The number of records of the result Dataset, or None.
        c                 s   r   r:   )rs   )r   mr,   r,   r-   r         z+ExecutionPlan.meta_count.<locals>.<genexpr>N)r?   rB   rq   sumr#   rd   r   rs   )r+   rB   rs   r,   r,   r-   rv     s   
zExecutionPlan.meta_countc                 C   s   d| _ |  r|  }t|g| jdfS ddlm} |  }||| }t|}zt	t
|g|}W n	 ty;   Y nw | | _|| j|fS )a  Execute this plan, returning an iterator.

        This will use streaming execution to generate outputs.

        NOTE: Executor will be shutdown upon either of the 2 following conditions:

            - Iterator is fully exhausted (ie until StopIteration is raised)
            - Executor instances is garbage-collected

        Returns:
            Tuple of iterator over output RefBundles, DatasetStats, and the executor.
        TNr   )!execute_to_legacy_bundle_iterator)r)   rq   executeiterr"   *ray.data._internal.execution.legacy_compatr   r7   	itertoolschainnextStopIteration	get_stats)r+   r   r   r6   bundle_itergenr,   r,   r-   r     s   

z!ExecutionPlan.execute_to_iteratorpreserve_orderc              
      s  d| _ | j}t dstdrtd |  sddl	m
}m} t| jjtrZ| jj durZ|| | jj }tdd	 |D }td
d	 |D }tdd |D ||d}n?|  }	||	| | j|d}
tt|
 |
j|
 d}W d   n1 sw   Y  |	  jdd}|jrt| z%tt t! j"}|j#j$dkrt%|j#j&_'|j#j(dkrt%|j#j)_*W n t+y } zt,d|  W Y d}~nd}~ww d_- fdd   || _.| jj| _/| _0| j| j0_1| j.S )zExecutes this plan (eagerly).

        Args:
            preserve_order: Whether to preserve order in execution.

        Returns:
            The blocks of the output dataset.
        TCPUcpu_warninga<  Warning: The Ray cluster currently does not have any available CPUs. The Dataset job will hang unless more CPUs are freed up. A common reason is that cluster resources are used by Actors or Tune trials; see the following link for more details: https://docs.ray.io/en/latest/data/data-internals.html#ray-data-and-tuner   )_get_initial_stats_from_planexecute_to_legacy_block_listNc                 s   r   r:   )owns_blocksr   r,   r,   r-   r     r   z(ExecutionPlan.execute.<locals>.<genexpr>c                 s   r   r:   r   r   r,   r,   r-   r     r   c                 S   s$   g | ]}|j D ]\}}||fqqS r,   )blocks)r   r   blockrd   r,   r,   r-   
<listcomp>  s    z)ExecutionPlan.execute.<locals>.<listcomp>)r   rr   )dataset_uuidr   F)include_parentzLSkipping recording memory spilled and restored statistics due to exception: c                    s0    j | jdd7  _ | jD ]} | qd S )Nobj_store_mem_spilledr   )dataset_bytes_spilledextra_metricsgetparents)	cur_statsre   collect_statsr   r,   r-   r   K  s   

z,ExecutionPlan.execute.<locals>.collect_stats)2r)   r*   rayavailable_resourcesr   r   loggerwarningrq   r   r   r   rW   r?   rB   r   output_dataallr   r   r7   r&   tupleiter_blocks_with_metadata_owned_by_consumer
get_schemar   
to_summary	to_stringenable_auto_log_statsinfor	   r
   get_runtime_contextgcs_addressstore_statsspill_time_total_sintspilled_bytes_totalglobal_bytes_spilledrestore_time_total_srestored_bytes_totalglobal_bytes_restored	Exceptiondebugr   r#   r!   r"   r   )r+   r   contextr   r   output_bundlesr   rr   r   r6   r   stats_summary_stringreplyer,   r   r-   r     s   




zExecutionPlan.executec                 C   s   | j S )zBReturn ``True`` if this plan has been partially or fully executed.)r)   r2   r,   r,   r-   has_started_execution\  s   z#ExecutionPlan.has_started_executionNc                 C   s   d| _ d| _d| _dS )z;Clear the snapshot kept in the plan to the beginning state.N)r#   r!   r"   r2   r,   r,   r-   clear_snapshota  s   
zExecutionPlan.clear_snapshotc                 C   s   | j s	ti ddS | j S )zqReturn stats for this plan.

        If the plan isn't executed, an empty stats object will be returned.
        Nrc   )r"   r   r2   r,   r,   r-   r   g  s   zExecutionPlan.statsc                 C   s   t dd | j D S )z/Return whether this plan has lazy input blocks.c                 s   s    | ]}t |tV  qd S r:   )rW   r   )r   rO   r,   r,   r-   r   r  s    z/ExecutionPlan.has_lazy_input.<locals>.<genexpr>)r   r?   sourcesr2   r,   r,   r-   has_lazy_inputp  s   zExecutionPlan.has_lazy_inputc                 C   s   | j duo| j| jjkS )ztWhether this plan has a computed snapshot for the final operator, i.e. for
        the output of this plan.
        N)r#   r!   r?   rB   r2   r,   r,   r-   rq   t  s   
z!ExecutionPlan.has_computed_outputc                 C   sB   ddl m} ddlm} | jj D ]}t|||fr dS qdS )z-Whether this plan requires to preserve order.r   )Sort)ZipTF)8ray.data._internal.logical.operators.all_to_all_operatorr   3ray.data._internal.logical.operators.n_ary_operatorr   r?   rB   post_order_iterrW   )r+   r   r   rO   r,   r,   r-   require_preserve_order}  s   z$ExecutionPlan.require_preserve_order)r/   r   )r>   r   TF)r   r   )r/   r   )F)r/   N),rg   
__module____qualname____doc__r   r   r.   rx   r3   r7   r9   rN   staticmethodr   r   boolrA   r   r   ru   r   r   r   r|   r   rw   rr   r   r   r   rv   r   r   r   r   r   r   propertyr   r   r   r   rq   r   r,   r,   r,   r-   r   %   sx    
*
 

	


!%v
		r   )2r   r   loggingtypingr   r   r   r   r   r   r   pyarrowr   ray._private.internal_apir	   r
   'ray.data._internal.execution.interfacesr   %ray.data._internal.logical.interfacesr   6ray.data._internal.logical.interfaces.logical_operatorr   2ray.data._internal.logical.interfaces.logical_planr   .ray.data._internal.logical.interfaces.operatorr   2ray.data._internal.logical.operators.read_operatorr   %ray.data._internal.logical.optimizersr   ray.data._internal.statsr   ray.data.blockr   r   ray.data.contextr   ray.data.exceptionsr   ray.util.debugr   r5   r   rp   r   INHERITABLE_REMOTE_ARGS	getLoggerrg   r   r   r,   r,   r,   r-   <module>   s2    $
