o
    bi                    @   s@  U d dl Z d dlZd dlZd dlZd dlZd dl mZ d dlmZ d dlm	Z	m
Z
 d dlmZmZmZmZmZmZmZmZ d dlmZ d dlZd dlZd dlmZ d dlmZ d d	lmZmZm Z m!Z!m"Z" d d
l#m$Z$m%Z% d dl&m'Z' d dl(m)Z) d dl*m+Z+ d dl,m-Z- d dl.m/Z/m0Z0m1Z1m2Z2 d dl3m4Z4 e5e6Z7dZ8dZ9dZ:ee;ee) f Z<de=de;fddZ>d3de?de?de;fddZ@G dd dZAG dd  d ZBejCd d!G d"d# d#ZDeE ZFejEeGd$< d%d& ZHG d'd( d(ZIeI ZJG d)d* d*ejKZLG d+d, d,ZMe-e	G d-d. d.ZNe	G d/d0 d0ZOe	G d1d2 d2ZPdS )4    N)defaultdict)contextmanager)	dataclassfields)AnyDictListMappingOptionalSetTupleUnion)uuid4)ActorHandle)	BlockList)NODE_UNKNOWNMetricsGroupMetricsTypeNodeMetricsOpRuntimeMetrics)Topologyget_dataset_metadata_exporter)capfirst)
BlockStats)DataContext)DeveloperAPI)CounterGauge	HistogramMetric)NodeAffinitySchedulingStrategydatasets_stats_actor_dataset_stats_actorunknownsecondsreturnc                 C   sR   | dkrt t| dd S | dkrt t| d dd S t t| d d dd S )N      sgMbP?  msus)strround)r$    r.   L/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/stats.pyfmt*   s
   r0      lvlspaces_per_indentc                 C   s   d| |  S )zReturns a string of spaces which contains `level` indents,
    each indent containing `spaces_per_indent` spaces. For example:
    >>> leveled_indent(2, 3)
    '      '
     r.   )r2   r3   r.   r.   r/   leveled_indent3      r5   c                   @   sp   e Zd ZdZdd ZedddZdeddfd	d
ZdefddZ	defddZ
defddZdefddZdS )Timerz8Helper class for tracking accumulated time (in seconds).c                 C   s    d| _ td| _d| _d| _d S )Nr   inf)_totalfloat_min_max_total_countselfr.   r.   r/   __init__?   s   

zTimer.__init__r%   Nc              
   c   s>    t  }zd V  W | t  |  d S | t  |  w N)timeperf_counteradd)r?   
time_startr.   r.   r/   timerE   s
   *zTimer.timervaluec                 C   s@   |  j |7  _ || jk r|| _|| jkr|| _|  jd7  _d S Nr&   )r9   r;   r<   r=   )r?   rG   r.   r.   r/   rD   M   s   

z	Timer.addc                 C      | j S rA   )r9   r>   r.   r.   r/   getU      z	Timer.getc                 C   rI   rA   )r;   r>   r.   r.   r/   minX   rK   z	Timer.minc                 C   rI   rA   )r<   r>   r.   r.   r/   max[   rK   z	Timer.maxc                 C   s   | j r	| j| j  S tdS )Nr8   )r=   r9   r:   r>   r.   r.   r/   avg^   s   z	Timer.avgr%   N)__name__
__module____qualname____doc__r@   r   rF   r:   rD   rJ   rL   rM   rN   r.   r.   r.   r/   r7   <   s    r7   c                   @   sN   e Zd ZdZdedddee fddZded	dfd
dZ	de
d	dfddZdS )_DatasetStatsBuilderzHelper class for building dataset stats.

    When this class is created, we record the start time. When build() is
    called with the final blocks of the new dataset, the time delta is
    saved as part of the stats.operator_nameparentDatasetStatsoverride_start_timec                 C   s   || _ || _|pt | _d S rA   )rU   rV   rB   rC   
start_time)r?   rU   rV   rX   r.   r.   r/   r@   i   s   z_DatasetStatsBuilder.__init__metadatar%   c                 C   s   i }t | D ].\}\}}t|}t|dkr1|dkr$||| j| < q||| jdd | < q||| j< qt|| j| jd}t	 | j
 |_|S )Nr&   r   z->)rZ   rV   	base_name)	enumerateitemsr   lenrU   splitrW   rV   rB   rC   rY   time_total_s)r?   rZ   op_metadataikvcapped_kstatsr.   r.   r/   build_multioperators   s   z(_DatasetStatsBuilder.build_multioperatorfinal_blocksc                 C   s,   t | j| i| jd}t | j |_|S )N)rZ   rV   )rW   rU   get_metadatarV   rB   rC   rY   ra   )r?   ri   rg   r.   r.   r/   build   s   z_DatasetStatsBuilder.buildN)rP   rQ   rR   rS   r,   r
   r:   r@   	StatsDictrh   r   rk   r.   r.   r.   r/   rT   b   s    

rT   )num_cpusc                   @   sP  e Zd ZdZd,ddZdedeedf deee	f fd	d
Z
deeef fddZdd Zdd Z	d-dedeeeeeef f  dee deeef deeeeeeeef f f  f
ddZd.ddZ		d/ddZdededee d ed!ef
d"d#Zdedeeef fd$d%Zd-dee fd&d'Z		d0ded(ee d)ee fd*d+ZdS )1_StatsActora  Actor holding stats for blocks created by LazyBlockList.

    This actor is shared across all datasets created in the same cluster.
    In order to cap memory usage, we set a max number of stats to keep
    in the actor. When this limit is exceeded, the stats will be garbage
    collected in FIFO order.

    TODO(ekl) we should consider refactoring LazyBlockList so stats can be
    extracted without using an out-of-band actor.r)   c                 C   s  t t| _i | _i | _|| _g | _d| _i | _	i | _
t | _d}tdd|d| _tdd|d| _tdd	|d| _td
d|d| _tdd|d| _tdd|d| _tdd|d| _| jtj|d| _| jtj|d| _| jtj|d| _| jtj|d| _| jtj|d| _ | jtj!|d| _"| # | _$d}tdd|d| _%tdd|d| _&tdd|d| _'d}tdd|d| _(tdd|d| _)tdd d!*d"d# t+D  d$|d| _,d}td%d&|d| _-td'd(|d| _.td)d*|d| _/td+d,d!*d-d# t+D  d$|d| _0d S ).Nr   datasetoperatordata_spilled_byteszBytes spilled by dataset operators.
                DataContext.enable_get_object_locations_for_metrics
                must be set to True to report this metricdescriptiontag_keysdata_freed_bytesz Bytes freed by dataset operatorsdata_current_bytesz9Bytes currently in memory store used by dataset operatorsdata_cpu_usage_coresz#CPUs allocated to dataset operatorsdata_gpu_usage_coresz#GPUs allocated to dataset operatorsdata_output_bytesz$Bytes outputted by dataset operatorsdata_output_rowsz#Rows outputted by dataset operators)metrics_groupru   )rp   data_iter_total_blocked_secondsz0Seconds user thread is blocked by iter_batches()data_iter_user_secondszSeconds spent in user codedata_iter_initialize_secondsz-Seconds spent in iterator initialization coderp   job_idrY   #data_dataset_estimated_total_blocksz&Total work units in blocks for dataset!data_dataset_estimated_total_rowsz$Total work units in rows for datasetdata_dataset_statezState of dataset (z, c                 S      g | ]}|j  d |j qS =rG   name.0r(   r.   r.   r/   
<listcomp>.      z(_StatsActor.__init__.<locals>.<listcomp>)$data_operator_estimated_total_blocksz'Total work units in blocks for operator"data_operator_estimated_total_rowsz%Total work units in rows for operatordata_operator_queued_blocksz$Number of queued blocks for operatordata_operator_statezState of operator (c                 S   r   r   r   r   r.   r.   r/   r   D  r   )1collectionsr   dictrZ   	last_timerY   	max_stats
fifo_queuenext_dataset_iddatasets_ray_nodes_cacher   _metadata_exporterr   spilled_bytesfreed_bytescurrent_bytescpu_usage_coresgpu_usage_coresoutput_bytesoutput_rows0_create_prometheus_metrics_for_execution_metricsr   INPUTSexecution_metrics_inputsOUTPUTSexecution_metrics_outputsTASKSexecution_metrics_tasksOBJECT_STORE_MEMORY"execution_metrics_obj_store_memoryACTORSexecution_metrics_actorsMISCexecution_metrics_misc/_create_prometheus_metrics_for_per_node_metricsper_node_metricsiter_total_blocked_siter_user_siter_initialize_sr   r   joinDatasetStater   r   r   r   r   )r?   r   op_tags_keysiter_tag_keysdataset_tagsoperator_tagsr.   r.   r/   r@      s  						
z_StatsActor.__init__r|   ru   .r%   c                 C   s   i }t  D ]H}|j|ksqd|j }|j}|jtjkr(t|||d||j< q|jtjkr>t|f||d|j	||j< q|jtj
krNt
|||d||j< q|S )Ndata_rs   )r   get_metricsr|   r   rt   metrics_typer   r   r   metrics_argsr   )r?   r|   ru   metricsmetricmetric_namemetric_descriptionr.   r.   r/   r   H  s:   
z<_StatsActor._create_prometheus_metrics_for_execution_metricsc                 C   s8   i }t tD ]}d|j d}t|ddd||j< q|S )Nr   	_per_node )rp   node_iprs   )r   r   r   r   )r?   r   fieldr   r.   r.   r/   r   f  s   z;_StatsActor._create_prometheus_metrics_for_per_node_metricsc                 C   s   t | j}|  jd7  _|S rH   )r,   r   )r?   
dataset_idr.   r.   r/   get_dataset_idq  s   
z_StatsActor.get_dataset_idc                 C   s,   |D ]}| j |  q|D ]}| j|  qd S rA   )update_execution_metricsupdate_iteration_metrics)r?   execution_metricsiteration_metricsr   r.   r.   r/   update_metricsv  s
   z_StatsActor.update_metricsNdataset_tag
op_metricsr   stater   c              	   C   sZ  	 ddt dtttf dtttf fdd}t||D ]\}}| ||}	| j	|
dd|	 | j	|
dd|	 | j	|
d	d|	 | j	|
d
d|	 | j	|
dd|	 | j	|
dd|	 | j	|
dd|	 | j D ]\}
}|||
|
d|	 qv| j D ]\}
}|||
|
d|	 q| j D ]\}
}|||
|
d|	 q| j D ]\}
}|||
|
d|	 q| j D ]\}
}|||
|
d|	 q| j D ]\}
}|||
|
d|	 qq|d ur%| D ]1\}}|| jvr|   | j
|t}| j||d}	| D ]\}}| j| }||||	 qq| || d S )Nprom_metricrG   tagsc                 S   sR   t | tr| || d S t | tr| || d S t | tr'| || d S d S rA   )
isinstancer   setr   incr   observe)r   rG   r   r.   r.   r/   _record  s   


z5_StatsActor.update_execution_metrics.<locals>._recordobj_store_mem_spilledr   obj_store_mem_freedobj_store_mem_usedbytes_task_outputs_generatedrow_outputs_taken	cpu_usage	gpu_usage)r   node_ip_tagrA   )r   r   intr:   r   r,   zip_create_tagsr   r   rJ   r   r   r   r   r   r   r   r^   r   r   r   r   r   r   _rebuild_ray_nodes_cacher   r   update_dataset)r?   r   r   r   r   r   r   rg   operator_tagr   
field_namer   node_idnode_metricsr   r   metric_valuer.   r.   r/   r   |  sX   	





z$_StatsActor.update_execution_metricsc                 C   sH   t  }|D ]}|dd }|dd }|d ur!|d ur!|| j|< qd S )NNodeIDNodeName)raynodesrJ   r   )r?   current_nodesnoder   	node_namer.   r.   r/   r     s   
z$_StatsActor._rebuild_ray_nodes_cacherg   rW   c                 C   sJ   |  |}| j|j | | j|j | | j|j | d S rA   )r   r   r   rJ   r   r   )r?   rg   r   r   r.   r.   r/   r     s   
z$_StatsActor.update_iteration_metricsr   topologydata_contextc           	   	   C   sn   t   }|tjjddd|d dd |D d| j|< | jd ur5ddlm} ||||||d}| j| d S d S )Nr   c                 S   s    i | ]}|t jjd d d dqS )r   )r   progresstotalqueued_blocks)r   RUNNINGr   )r   rq   r.   r.   r/   
<dictcomp>  s    z0_StatsActor.register_dataset.<locals>.<dictcomp>)r   r   r   r   
total_rowsrY   end_time	operators)DatasetMetadata)r   r   r   rY   r   )	rB   r   r   r   r   r   $ray.data._internal.metadata_exporterr   export_dataset_metadata)	r?   r   r   r   r   r   rY   r   dataset_metadatar.   r.   r/   register_dataset  s.   
z_StatsActor.register_datasetc                 C   s<  | j | | | j | }| j | dd}tt| j | dd}|||d}| j|dd| | j|dd| |dtj	j
}t|}| j|j| |d	i  D ]?\}}	||d
}
| j|	dd|
 | j|	dd|
 | j|	dd|
 |	dtj	j
}t|}| j|j|
 q\d S )Nr   NonerY   r   r   r   r   r   r   ro   r   )r   updaterJ   r,   r   r   r   r   r   UNKNOWNr   from_stringr   rG   r^   r   r   r   r   )r?   r   r   r   rY   r   state_string
state_enumrq   op_stater   r.   r.   r/   r     sD   


z_StatsActor.update_datasetc                    s"    s| j S  fdd| j  D S )Nc                    s"   i | ]\}}|d   kr||qS r   r.   r   rd   re   r  r.   r/   r   +  s   " z,_StatsActor.get_datasets.<locals>.<dictcomp>)r   r^   )r?   r   r.   r  r/   get_datasets(  s   z_StatsActor.get_datasetsr   r   c                 C   s,   d|i}|d ur||d< |d ur||d< |S )Nrp   rq   r   r.   )r?   r   r   r   r   r.   r.   r/   r   -  s   z_StatsActor._create_tags)r)   rA   rO   )rg   rW   )NN)rP   rQ   rR   rS   r@   r   r   r,   r   r   r   r   r   r   r   r   r   r   r:   r   r
   r   r   r   r   r   r   r   r	  r   r.   r.   r.   r/   rn      sh    

 .





G


',rn   _stats_actor_lockc                  C   sp   t  } | j}tjjj stt 	 dd}t
 tjttdd|d W  d    S 1 s1w   Y  d S )NF)softTdetached)r   	namespaceget_if_existslifetimescheduling_strategy)r   get_currentr  r   utilclientis_connectedr    get_runtime_contextget_node_idr
  rn   optionsSTATS_ACTOR_NAMESTATS_ACTOR_NAMESPACEremote)ctxr  r.   r.   r/   _get_or_create_stats_actor@  s"   
$r  c                   @   s   e Zd ZdZdZdZdd Zd$dee fddZ	d	d
 Z
dee deeeeeeeef f f  fddZ	d%dedee dee deeef def
ddZdefddZdddefddZdefddZdedee dedefdd Zdefd!d"Zd#S )&_StatsManageraf  A Class containing util functions that manage remote calls to _StatsActor.

    This class collects stats from execution and iteration codepaths and keeps
    track of the latest snapshot.

    An instance of this class runs a single background thread that periodically
    forwards the latest execution/iteration stats to the _StatsActor.

    This thread will terminate itself after being inactive (meaning that there are
    no active executors or iterators) for STATS_ACTOR_UPDATE_THREAD_INACTIVITY_LIMIT
    iterations. After terminating, a new thread will start if more calls are made
    to this class.
       c                 C   s6   d | _ d | _i | _i | _t | _d | _t | _d S rA   )	_stats_actor_handle_stats_actor_cluster_id_last_execution_stats_last_iteration_stats	threadingLock_stats_lock_update_thread_update_thread_lockr>   r.   r.   r/   r@   j  s   
z_StatsManager.__init__Tr%   c                 C   sz   t jjjd u rtdt jjjj}| jd u s| j|kr:|r"t | _nz
t j	t
td| _W n
 ty6   Y d S w || _| jS )NzGlobal node is not initialized.)r   r  )r   _privateworker_global_nodeRuntimeError
cluster_idr  r   r  	get_actorr  r  
ValueError)r?   create_if_not_existscurrent_cluster_idr.   r.   r/   _stats_actor|  s    


z_StatsManager._stats_actorc                    sv    j .  jd u s j s) fdd}tj|dd _ j  W d    d S W d    d S 1 s4w   Y  d S )Nc                     s   d} 	  j s	 jr=z" jdd}|d u rW q|jjt j t j  d d} W n! ty<   tj	ddd Y d S w | d7 } | t
jkrMt	d	 d S ttj q)
Nr   TF)r/  )r   r   z1Error occurred during remote call to _StatsActor.)exc_infor&   z2Terminating StatsManager thread due to inactivity.)r"  r!  r1  r   r  listvalues	Exceptionloggerdebugr  UPDATE_THREAD_INACTIVITY_LIMITrB   sleepStatsManager#STATS_ACTOR_UPDATE_INTERVAL_SECONDS)iter_stats_inactivitystats_actorr>   r.   r/   _run_update_loop  sF   zD_StatsManager._start_thread_if_not_running.<locals>._run_update_loopT)targetdaemon)r'  r&  is_aliver#  Threadstart)r?   r>  r.   r>   r/   _start_thread_if_not_running  s   )"z*_StatsManager._start_thread_if_not_runningr   c              	   C   sl   t  jsdS tdd }|D ]$}|j D ]\}}|| }ttD ]}||j  t	||j7  < q"qq|S )aR  
        Aggregate per-node metrics from a list of OpRuntimeMetrics objects.

        If per-node metrics are disabled in the current DataContext, returns None.
        Otherwise, it sums up all NodeMetrics fields across the provided metrics and
        returns a nested dictionary mapping each node ID to a dict of field values.
        Nc                   S   s   t tS rA   )r   r   r.   r.   r.   r/   <lambda>  s    z;_StatsManager._aggregate_per_node_metrics.<locals>.<lambda>)
r   r  enable_per_node_metricsr   _per_node_metricsr^   r   r   r   getattr)r?   r   aggregated_by_noder   r   r   agg_node_metricsfr.   r.   r/   _aggregate_per_node_metrics  s   

z)_StatsManager._aggregate_per_node_metricsFr   r   r   force_updatec           	      C   sz   dd |D }|  |}|||||f}|r|  jj|  d S | j || j|< W d    n1 s2w   Y  |   d S )Nc                 S      g | ]}|  qS r.   )as_dict)r   r   r.   r.   r/   r         z:_StatsManager.update_execution_metrics.<locals>.<listcomp>)rL  r1  r   r  r%  r!  rD  )	r?   r   r   r   r   rM  op_metrics_dictsr   argsr.   r.   r/   r     s   
z&_StatsManager.update_execution_metricsc                 C   N   | j  || jv r| j|= W d    d S W d    d S 1 s w   Y  d S rA   )r%  r!  r?   r   r.   r.   r/   clear_last_execution_stats  s   

"z(_StatsManager.clear_last_execution_statsrg   rW   c                 C   s@   | j  ||f| j|< W d    n1 sw   Y  |   d S rA   )r%  r"  rD  )r?   rg   r   r.   r.   r/   r     s   z&_StatsManager.update_iteration_metricsc                 C   rS  rA   )r%  r"  rT  r.   r.   r/   clear_iteration_metrics  s   

"z%_StatsManager.clear_iteration_metricsr   r   c                 C   s$   |   jt  |||| dS )a1  Register a dataset with the stats actor.

        Args:
            dataset_tag: Tag for the dataset
            operator_tags: List of operator tags
            topology: Optional Topology representing the DAG structure to export
            data_context: The DataContext attached to the dataset
        N)r1  r   r  r   r  
get_job_id)r?   r   r   r   r   r.   r.   r/   register_dataset_to_stats_actor  s   

z-_StatsManager.register_dataset_to_stats_actorc                 C   s2   zt |  j W S  ty   t j Y S w rA   )r   rJ   r1  r   r  r5  r   hexr>   r.   r.   r/   get_dataset_id_from_stats_actor  s
   z-_StatsManager.get_dataset_id_from_stats_actorN)T)F)rP   rQ   rR   rS   r;  r8  r@   r
   r   r1  rD  r   r   r	   r,   r   r   r:   rL  r   r   boolr   rU  r   rV  r   r   rX  rZ  r.   r.   r.   r/   r  T  sJ    5


	
r  c                   @   s4   e Zd ZdZdZdZdZdZdd Ze	dd	 Z
d
S )r   zDEnum representing the possible states of a dataset during execution.r   r&   r'   r1   c                 C   rI   rA   )r   r>   r.   r.   r/   __str__2  rK   zDatasetState.__str__c                 C   s$   z| | W S  t y   | j Y S w )zGet enum by name.)KeyErrorr  )clstextr.   r.   r/   r  5  s
   

zDatasetState.from_stringN)rP   rQ   rR   rS   r  r   FINISHEDFAILEDr\  classmethodr  r.   r.   r.   r/   r   *  s    r   c                   @   s~   e Zd ZdZdddedeed  ed  f defddZ	e
d	d
 Z	ddedee defddZdddZdefddZdS )rW   zHolds the execution times for a given Dataset.

    This object contains a reference to the parent Dataset's stats as well,
    but not the Dataset object itself, to allow its blocks to be dropped from
    memory.N)r\   rZ   rV   r\   c                C   s   || _ |durt|ts|g}|pg | _| jsdntdd | jD d | _|| _d| _d| _t	 | _
t	 | _t	 | _t	 | _t	 | _t	 | _t	 | _t	 | _t	 | _t	 | _t	 | _i | _d| _d| _d| _d| _d| _d| _t	 | _dS )a  Create dataset stats.

        Args:
            metadata: Dict of operators used to create this Dataset from the
                previous one. Typically one entry, e.g., {"map": [...]}.
            parent: Reference to parent Dataset's stats, or a list of parents
                if there are multiple.
            base_name: The name of the base operation for a multi-operator operation.
        Nr   c                 s       | ]}|j V  qd S rA   )numberr   pr.   r.   r/   	<genexpr>[      z(DatasetStats.__init__.<locals>.<genexpr>r&   unknown_uuid)rZ   r   r3  parentsrM   rd  r\   dataset_uuidra   r7   streaming_exec_schedule_siter_wait_s
iter_get_siter_next_batch_siter_format_batch_siter_collate_batch_siter_finalize_batch_sr   r   r   iter_total_sextra_metricsiter_blocks_localiter_blocks_remoteiter_unknown_locationglobal_bytes_spilledglobal_bytes_restoreddataset_bytes_spilledstreaming_split_coordinator_s)r?   rZ   rV   r\   r.   r.   r/   r@   E  s8   
 zDatasetStats.__init__c                 C   s   t  S rA   )r  r>   r.   r.   r/   r=    s   zDatasetStats.stats_actorr   rX   r%   c                 C   s   t || |S )z>Start recording stats for an op of the given name (e.g., map).)rT   )r?   r   rX   r.   r.   r/   child_builder  s   zDatasetStats.child_builderDatasetStatsSummaryc                 C   s   g }t | jdk}| j D ]\}}|tj|||d qt| j| j| j	| j
| j| j| j| j| j| j| j| j| j| j}g }| jdurLdd | jD }| jrT| j nd}t|||| j| j| j| j| j| j| j| j |S )zGenerate a `DatasetStatsSummary` object from the given `DatasetStats`
        object, which can be used to generate a summary string.r&   )is_sub_operatorNc                 S   rN  r.   )
to_summaryre  r.   r.   r/   r     rP  z+DatasetStats.to_summary.<locals>.<listcomp>r   )!r_   rZ   r^   appendOperatorStatsSummaryfrom_block_metadataIterStatsSummaryrm  rn  ro  rp  rq  rr  r   r   r   rs  r{  ru  rv  rw  rj  rl  rJ   r}  rd  rk  ra   r\   rt  rx  ry  rz  )r?   operators_statsr~  r   rg   
iter_statsstats_summary_parentsrl  r.   r.   r/   r    s^   
zDatasetStats.to_summaryc                 C   s   |    S )ax  Generate a string representing the runtime metrics of a Dataset. This is
        a high level summary of the time spent in Ray Data code broken down by operator.
        It also includes the time spent in the scheduler. Times are shown as the total
        time for each operator and percentages of time are shown as a fraction of the
        total time for the whole dataset.)r  runtime_metricsr>   r.   r.   r/   r    r6   zDatasetStats.runtime_metricsrA   )r%   r}  )rP   rQ   rR   rS   rl   r   r
   r   r,   r@   propertyr=  r:   rT   r|  r  r  r.   r.   r.   r/   rW   >  s,    
@


5rW   c                   @   s2  e Zd ZU ed ed< ded< ed  ed< eed< eed< eed< eed	< eee	f ed
< eed< eed< eed< eed< 			d)de
ee  dedefddZedd ded  fddZedd deeef fddZdefddZd*defdd Zdefd!d"Zdefd#d$Zdefd%d&Zdefd'd(ZdS )+r}  r  r  r  r  rj  rd  rk  ra   r\   rt  rx  ry  rz  rl  NTalready_printedinclude_parentr%   c                 C   s  |du rt  }d}| jr%|r%| jD ]}|j|dd}|r$||7 }|d7 }qd}t| jdkrY| jd }|j}| j| }	|d| j|7 }|	|v rM|d	7 }n\|	|	 |t
|7 }nPt| jdkrt| jd
}
|
dkrld}
|d| j| j|
7 }t| jD ],\}}|j}| j| }	|d7 }|d||7 }|	|v r|d7 }q||	|	 |t
|7 }q|t j}|r| jr|r|jrdnd}||7 }|dt
| j d 7 }|t
| j7 }t| jdkrS|rSt| jd }t| jd }|s|r|d7 }|d|7 }|d|7 }t| jd }|r|d7 }|d|7 }| jd j}|r$|d nd}|  }|  }|rS|rS|rS|d7 }|d7 }|d||  d7 }|d||  d7 }|ra|ra|d|   7 }|S )a  Return a human-readable summary of this Dataset's stats.

        Args:
            already_printed: Set of operator IDs that have already had its stats printed
               out.
            include_parent: If true, also include parent stats summary; otherwise, only
               log stats of the latest operator.
            add_global_stats: If true, includes global stats to this summary.
        Returns:
            String with summary statistics for executing the Dataset.
        Nr   F)add_global_stats
r&   r   zOperator {} {}: z[execution cached]
r'   z Operator {} {}: executed in {}s
z	Suboperator {} {}: z	[execution cached]
	z* Extra metrics:     .Az
Cluster memory:
z* Spilled to disk: {}MB
z* Restored from disk: {}MB
z
Dataset memory:
r[   sumzDataset throughput:
	* Ray Data throughput:  rows/s
%	* Estimated single node throughput: )r   rj  	to_stringr_   r  rU   rk  formatrd  rD   r,   r-   ra   r\   r]   r   r  verbose_stats_logsrt  r~  r  rx  ry  rz  output_num_rowsget_total_wall_timeget_total_time_all_blocksr  )r?   r  r  r  outrf  
parent_sumoperators_stats_summaryrU   operator_uuidrounded_totalnr  indent
mb_spilledmb_restoreddataset_mb_spilledr  total_num_out_rows	wall_timetotal_time_all_blocksr.   r.   r/   r    s   












zDatasetStatsSummary.to_stringcurrc                 C   s4   g }| j D ]}|r|j r|t| q|| g S rA   )rj  extendr}   _collect_dataset_stats_summaries)r  summsrf  r.   r.   r/   r  Q  s   


z4DatasetStatsSummary._collect_dataset_stats_summariessummc                 C   s0   t dd | jD }tdd | jD }||fS )Nc                 s   rc  rA   )earliest_start_timer   opsr.   r.   r/   rg  ^  rh  z:DatasetStatsSummary._find_start_and_end.<locals>.<genexpr>c                 s   rc  rA   )latest_end_timer  r.   r.   r/   rg  _  rh  )rL   r  rM   )r  earliest_start
latest_endr.   r.   r/   _find_start_and_end\  s   z'DatasetStatsSummary._find_start_and_endc                    s   |    dtdtdtf fdd}t| }d}|D ]}t|jdkr6t|\}}|| }|||j|7 }q||d| j	7 }||d	 7 }|S )
Nr   rB   r%   c                    s6    dkr|  nd}d|  dt | d|d ddS )Nr   z* : z (d   z.3fz%)
r0   )r   rB   fractiontotal_wall_timer.   r/   fmt_linee  s   "z5DatasetStatsSummary.runtime_metrics.<locals>.fmt_linezRuntime Metrics:
r   
SchedulingTotal)
r  r,   r:   r}  r  r_   r  r  r\   rl  )r?   r  	summariesr  r  r  r  op_total_timer.   r  r/   r  b  s   
z#DatasetStatsSummary.runtime_metricsr   c                    s  t  }d fdd| jD }d fdd| jD }d fdd| j D }|r8d| d| dnd	}|rEd| d| dnd	}|rRd| d| dnd	}d	g | d
| d| j d| d| j d| d| j d| d| d| d| d| d| j	
 d  d| d| jd  d| d| jd  d| d| jd  d| d| d| dS )Nr  c                       g | ]	}|  d  qS r'   __repr__r   sslevelr.   r/   r   y      z0DatasetStatsSummary.__repr__.<locals>.<listcomp>c                    r  r  r  )r   psr  r.   r/   r   {  r  c                 3   s0    | ]\}}t  d   | d| dV  qdS )r'   r  ,N)r5   r  r  r.   r/   rg  |  s
    
z/DatasetStatsSummary.__repr__.<locals>.<genexpr>,
z   r   zDatasetStatsSummary(
z   dataset_uuid=z   base_name=z
   number=z   extra_metrics={z},
z   operators_stats=[z],
z   iter_stats=r&   z   global_bytes_spilled=r  zMB,
z   global_bytes_restored=z   dataset_bytes_spilled=z   parents=[r   )r5   r   r  rj  rt  r^   rk  r\   rd  r  r  rx  ry  rz  )r?   r  r  r  parent_statsrt  r.   r  r/   r  v  sp   		


zDatasetStatsSummary.__repr__c                 C   sP   dd t | D }t|dkrdS tdd |D }tdd |D }|| S )zCalculate the total wall time for the dataset, this is done by finding
        the earliest start time and latest end time for any block in any operator.
        The wall time is the difference of these two times.
        c                 S   s$   g | ]}t |jd krt|qS r   )r_   r  r}  r  r   r  r.   r.   r/   r     s
    z;DatasetStatsSummary.get_total_wall_time.<locals>.<listcomp>r   c                 s       | ]}|d  V  qdS )r   Nr.   r   	start_endr.   r.   r/   rg        z:DatasetStatsSummary.get_total_wall_time.<locals>.<genexpr>c                 s   r  )r&   Nr.   r  r.   r.   r/   rg    r  )r}  r  r_   rL   rM   )r?   
start_endsr  r  r.   r.   r/   r    s   z'DatasetStatsSummary.get_total_wall_timec                 C   s   t | }tdd |D S )zGCalculate the sum of the wall times across all blocks of all operators.c                 s   s$    | ]}t d d |jD V  qdS )c                 s   s(    | ]}|j r|j d dndV  qdS r  r   N)r  rJ   r  r.   r.   r/   rg    s
    
zJDatasetStatsSummary.get_total_time_all_blocks.<locals>.<genexpr>.<genexpr>N)r  r  r  r.   r.   r/   rg    s    
z@DatasetStatsSummary.get_total_time_all_blocks.<locals>.<genexpr>)r}  r  r  )r?   r  r.   r.   r/   r    s   
z-DatasetStatsSummary.get_total_time_all_blocksc                 C   s,   t dd | jD }|t dd | jD  S )Nc                 s   s    | ]}|  V  qd S rA   )get_total_cpu_timere  r.   r.   r/   rg    r  z9DatasetStatsSummary.get_total_cpu_time.<locals>.<genexpr>c                 s   s    | ]
}|j d dV  qdS r  )cpu_timerJ   r  r.   r.   r/   rg    s    
)r  rj  r  )r?   r  r.   r.   r/   r    s   

z&DatasetStatsSummary.get_total_cpu_timec                 C   sF   dd | j D }|rt|nd}| js|S t|gdd | jD R  S )Nc                 S   rN  r.   )get_max_heap_memoryre  r.   r.   r/   r     rP  z;DatasetStatsSummary.get_max_heap_memory.<locals>.<listcomp>r   c                 S   s   g | ]	}|j d dqS )rM   r   )memoryrJ   r  r.   r.   r/   r     r  )rj  rM   r  )r?   parent_memory
parent_maxr.   r.   r/   r    s   z'DatasetStatsSummary.get_max_heap_memory)NTTr  )rP   rQ   rR   r   __annotations__r   r,   r:   r   r   r
   r   r[  r  staticmethodr  r   r  r  r  r  r  r  r  r.   r.   r.   r/   r}    sJ   
 

t
 r}  c                	   @   s>  e Zd ZU eed< eed< eed< eed< eed< eed< dZee	eef  ed< dZ
ee	eef  ed	< dZee	eef  ed
< dZee	eef  ed< dZee	eef  ed< dZee	eef  ed< dZee	eef  ed< dZee	eef  ed< ededee dedd fddZdefddZddefddZdS )r  rU   r~  ra   r  r  block_execution_summary_strNr  r  udf_timer  r  output_size_bytes
node_count	task_rowsblock_statsr%   c                 C   sV  dd |D }d}d}d\}}|r't dd |D }tdd |D }|| }|r1dt|}	n|rGt|d	}|dkr>d}d
t||}	nd}	|	d7 }	tt}
|D ]}|jdurk|j	durk|
|j	j
  |j7  < qTd}t|
dkrt |
 t|
 ttt|
 t|
d}dt|
|	}	d\}}}}|r!t dd |D tdd |D tdd |D tdd |D d}t dd |D tdd |D tdd |D tdd |D d}dd |D }t |t|tt|d}t dd |D tdd |D tdd |D tdd |D d}d}d d |D }|r?t |t|tt|t|d}d}d!d |D }|r]t |t|tt|t|d}d}|rtt}|D ]}||j |j
 qid"d# | D }t | t| ttt| t|d}t||||||	||||||||d$S )%a  Calculate the stats for a operator from a given list of blocks,
        and generates a `OperatorStatsSummary` object with the results.

        Args:
            block_stats: List of `BlockStats` to calculate stats of
            operator_name: Name of operator associated with `blocks`
            is_sub_operator: Whether this set of blocks belongs to a sub operator.
        Returns:
            A `OperatorStatsSummary` object initialized with the calculated statistics
        c                 S      g | ]
}|j d ur|j qS rA   )
exec_statsr   mr.   r.   r/   r         z<OperatorStatsSummary.from_block_metadata.<locals>.<listcomp>r   )r   r   c                 s   rc  rA   )start_time_sr   r.   r.   r/   rg    rh  z;OperatorStatsSummary.from_block_metadata.<locals>.<genexpr>c                 s   rc  rA   )
end_time_sr   r.   r.   r/   rg    rh  z{} blocks produced
r'   z{} blocks produced in {}sr   r  N)rL   rM   meancountz{} tasks executed, {})NNNNc                 S      g | ]}|j qS r.   wall_time_sr   er.   r.   r/   r   #      c                 S   r  r.   r  r  r.   r.   r/   r   $  r  c                 S   r  r.   r  r  r.   r.   r/   r   %  r  c                 S   r  r.   r  r  r.   r.   r/   r   &  r  )rL   rM   r  r  c                 S   r  r.   
cpu_time_sr  r.   r.   r/   r   )  r  c                 S   r  r.   r  r  r.   r.   r/   r   *  r  c                 S   r  r.   r  r  r.   r.   r/   r   +  r  c                 S   r  r.   r  r  r.   r.   r/   r   ,  r  c                 S   s    g | ]}t |jp	d d dqS )r   i   r'   )r-   max_uss_bytesr  r.   r.   r/   r   /  s    )rL   rM   r  c                 S   r  r.   
udf_time_sr  r.   r.   r/   r   9  r  c                 S   r  r.   r  r  r.   r.   r/   r   :  r  c                 S   r  r.   r  r  r.   r.   r/   r   ;  r  c                 S   r  r.   r  r  r.   r.   r/   r   <  r  c                 S   r  rA   )num_rowsr  r.   r.   r/   r   @  r  c                 S   r  rA   )
size_bytesr  r.   r.   r/   r   J  s    c                 S      i | ]	\}}|t |qS r.   )r_   )r   r   tasksr.   r.   r/   r   [  r  z<OperatorStatsSummary.from_block_metadata.<locals>.<dictcomp>)rU   r~  ra   r  r  r  r  r  r  r  r  r  r  r  )rL   rM   r  r_   r-   r   r   r   r  r  task_idxr4  npr  r3  r  r   r   rD   r^   r  )r^  rU   r  r~  r  r  ra   r  r  exec_summary_strr  metatask_rows_statswall_time_stats	cpu_statsmemory_stats	udf_statsmemory_stats_mboutput_num_rows_statsr  output_size_bytes_statsr  node_counts_stats
node_tasksr(   node_countsr.   r.   r/   r    s   






z(OperatorStatsSummary.from_block_metadatac              	   C   sT  | j rdnd}| j}| j}|r-||7 }|dt|d t|d t|d t|d 7 }| j}|rP||7 }|dt|d t|d t|d t|d 7 }| j}|rs||7 }|d	t|d t|d t|d t|d 7 }| j}|r||7 }|d
|d |d |d 7 }| j}|r||7 }|d|d |d |d |d 7 }| j	}|r||7 }|d|d |d |d |d 7 }| j
}	|	r||7 }|d|	d |	d |	d |	d 7 }| j}
|
r||7 }|d|
d |
d |
d |
d 7 }|r(| jr(|r(|d }||7 }|d7 }||d|| j  d 7 }||d||d   d 7 }|S )E  For a given (pre-calculated) `OperatorStatsSummary` object (e.g. generated from
        `OperatorStatsSummary.from_block_metadata()`), returns a human-friendly string
        that summarizes operator execution statistics.

        Returns:
            String with summary statistics for executing the given operator.
        r  r   z6* Remote wall time: {} min, {} max, {} mean, {} total
rL   rM   r  r  z5* Remote cpu time: {} min, {} max, {} mean, {} total
z.* UDF time: {} min, {} max, {} mean, {} total
z8* Peak heap memory usage (MiB): {} min, {} max, {} mean
z?* Output num rows per block: {} min, {} max, {} mean, {} total
zA* Output size bytes per block: {} min, {} max, {} mean, {} total
z?* Output rows per task: {} min, {} max, {} mean, {} tasks used
r  z9* Tasks per node: {} min, {} max, {} mean; {} nodes used
z* Operator throughput:
r  r  r  )r~  r  r  r  r0   r  r  r  r  r  r  r  ra   )r?   r  r  r  r  r   r  r  r  r  node_count_statsr  r.   r.   r/   r\  t  s   











			
zOperatorStatsSummary.__str__r   c           
      C   s  t |}|| jrt dnd7 }dd | jpi  D }dd | jp"i  D }dd | jp.i  D }dd | jp:i  D }dd | jpFi  D }d	d | jpRi  D }d	g | d
| d| j
 d| d| j d| dt| j d| d| j | d|pd d| d|pd d| d|pd d| d|pd d| d|pd d| d|pd d| d}	|	S )r  r&   r   c                 S   r  r.   r  r  r.   r.   r/   r     r  z1OperatorStatsSummary.__repr__.<locals>.<dictcomp>c                 S   r  r.   r  r  r.   r.   r/   r     r  c                 S   r  r.   r  r  r.   r.   r/   r     r  c                 S   r  r.   r  r  r.   r.   r/   r         c                 S   r  r.   r  r  r.   r.   r/   r     r	  c                 S   r  r.   r  r  r.   r.   r/   r     r  zOperatorStatsSummary(
z   operator_name='z',
z   is_suboperator=r  z   time_total_s=z   block_execution_summary_str=z   wall_time=Nz   cpu_time=z
   memory=z   output_num_rows=z   output_size_bytes=z   node_count=r   )r5   r~  r  r^   r  r  r  r  r  r   rU   r0   ra   r  )
r?   r  r  r  r  r  r  r  node_conut_statsr  r.   r.   r/   r    sr   		


zOperatorStatsSummary.__repr__r  )rP   rQ   rR   r,   r  r[  r:   r  r
   r   r  r  r  r  r  r  r  rb  r   r   r  r\  r  r.   r.   r.   r/   r    s8   
  zr  c                   @   s   e Zd ZU eed< eed< eed< eed< eed< eed< eed< eed< eed	< eed
< eed< eed< eed< eed< defddZdefddZddefddZ	dS )r  	wait_timeget_time	next_timeformat_timecollate_timefinalize_batch_time
block_time	user_timeinitialize_time
total_timestreaming_split_coord_timeru  rv  rw  r%   c                 C   s   |   S rA   )r  r>   r.   r.   r/   r\  3  s   zIterStatsSummary.__str__c              	   C   s  d}| j  s&| j s&| j s&| j s&| j s&| j s&| j rc|d7 }| j r;|dt	| j 7 }| j
 rL|dt	| j
 7 }| j  r]|dt	| j  7 }| j rn|dt	| j 7 }|d7 }| j r|dt	| j t	| j t	| j t	| j 7 }| j rd	}||t	| j t	| j t	| j t	| j 7 }| j rd
}||t	| j t	| j t	| j t	| j 7 }| j r|dt	| j t	| j t	| j t	| j 7 }| j r)d}||t	| j t	| j t	| j t	| j 7 }t jrK|d7 }|d| j7 }|d| j7 }|d| j7 }| j dkrc|d7 }|t	| j  d7 }|S )Nr   z"
Dataset iterator time breakdown:
z* Total time overall: {}
z>    * Total time in Ray Data iterator initialization code: {}
zE    * Total time user thread is blocked by Ray Data iter_batches: {}
z/    * Total execution time for user thread: {}
zC* Batch iteration time breakdown (summed across prefetch threads):
z5    * In ray.get(): {} min, {} max, {} avg, {} total
z:    * In batch creation: {} min, {} max, {} avg, {} total
z<    * In batch formatting: {} min, {} max, {} avg, {} total
z6    * In collate_fn: {} min, {} max, {} avg, {} total
zA    * In host->device transfer: {} min, {} max, {} avg, {} total
zBlock locations:
z    * Num blocks local: {}
z    * Num blocks remote: {}
z&    * Num blocks unknown location: {}
r   z+Streaming split coordinator overhead time: r  )r  rJ   r  r  r  r  r  r  r  r0   r  r  rL   rM   rN   r   r  'enable_get_object_locations_for_metricsru  rv  rw  r  )r?   r  batch_creation_str
format_strr.   r.   r/   r  6  s   	






zIterStatsSummary.to_stringr   c                 C   s:  t |}dg d| dt| j pd  d| dt| j p'd  d| d| jp4d  d| d| jpAd  d| d| jpNd  d| d	t| j	 p_d  d| d
t| j
 ppd  d| dt| j pd  d| dt| j pd  d| dS )Nr   zIterStatsSummary(
z   wait_time=r  z   get_time=z   iter_blocks_local=z   iter_blocks_remote=z   iter_unknown_location=z   next_time=z   format_time=z   user_time=z   total_time=r   )r5   r   r0   r  rJ   r  ru  rv  rw  r  r  r  r  )r?   r  r  r.   r.   r/   r    sR   
		

zIterStatsSummary.__repr__Nr  )
rP   rQ   rR   r7   r  r   r,   r\  r  r  r.   r.   r.   r/   r    s$   
 Xr  )r   r1   )Qr   enumloggingr#  rB   r   
contextlibr   dataclassesr   r   typingr   r   r   r	   r
   r   r   r   uuidr   numpyr  r   	ray.actorr   ray.data._internal.block_listr   :ray.data._internal.execution.interfaces.op_runtime_metricsr   r   r   r   r   r   r   r   ray.data._internal.utilr   ray.data.blockr   ray.data.contextr   ray.util.annotationsr   ray.util.metricsr   r   r   r   ray.util.scheduling_strategiesr    	getLoggerrP   r6  r  r  r  r,   rl   r:   r0   r   r5   r7   rT   r  rn   RLockr
  r  r  r  r:  IntEnumr   rW   r}  r  r  r.   r.   r.   r/   <module>   sl   
 (
		&
-   0 T  x  P