o
    bi                     @   sx  d dl Z d dlZd dlmZ d dlZd dlZd dlmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZ eG dd	 d	eZed
krd dlZe Zejddeddd ejddeddd e ZejjddddZejdd ddZejdd ddZee	ddddeiejeje ddZe d e!  e"  ze d!#ej$j%j&d"d# W dS  e'y   e d$ Y dS w dS )%    N)Optional)train)ScalingConfig)
DataConfig)DataParallelTrainer)DeveloperAPIc                
       sd   e Zd ZdZddddddee deded	ee f fd
dZededed	ee fddZ	  Z
S )DummyTrainera  A Trainer that does nothing except read the data for a given number of epochs.

    It prints out as much debugging statistics as possible.

    This is useful for debugging data ingest problem. This trainer supports normal
    scaling options same as any other Trainer (e.g., num_workers, use_gpu).

    Args:
        scaling_config: Configuration for how to scale training. This is the same
            as for :class:`~ray.train.base_trainer.BaseTrainer`.
        num_epochs: How many many times to iterate through the datasets for.
        prefetch_batches: The number of batches to prefetch ahead of the
            current block during the scan. This is the same as
            :meth:`~ray.data.Dataset.iter_batches`
    N   i   )scaling_config
num_epochsprefetch_batches
batch_sizer
   r   r   r   c                   s4   |st dd}t j|t||||d| d S )Nr	   )num_workers)train_loop_per_workerr
   )r   super__init__r   make_train_loop)selfr
   r   r   r   argskwargs	__class__ M/home/ubuntu/.local/lib/python3.10/site-packages/ray/air/util/check_ingest.pyr   #   s   	

zDummyTrainer.__init__c                    s    fdd}|S )zAMake a debug train loop that runs for the given amount of epochs.c               
      s  dd l } t  }td}t }d\}}}g }td| tD ]k}|d7 }t }	|j	 dD ]Y}
t |	 }|
| |d7 }t|
| jrY|t|
jddd 7 }n%t|
tjre||
j7 }nt|
trw|
 D ]}||j7 }qnn|t|
7 }tt||||d	 t }	q4q#t | }td
|d tdt|dt|dt| td| td| tdt|d dd tdt|d | dd |dkrtd|  d S d S )Nr   r   )r   r   r   zStarting train loop on workerr	   )r   r   T)indexdeep)
bytes_readbatches_readepochs_readbatch_delayzTime to read all datasecondszP50/P95/Max batch delay (s)g      ?gffffff?zNum epochs readzNum batches readzNum bytes readi      MiBzMean throughputzMiB/szIngest stats from rank=0:

{})pandasr   get_contextget_world_rankget_dataset_shardtimeperf_counterprintrangeiter_batchesappend
isinstance	DataFrameintmemory_usagesumnpndarraynbytesdictvaluessys	getsizeofreportquantilemaxroundformatstats)pdrank
data_shardstartr   r   r   batch_delaysepochbatch_startbatchr   arrdeltar   r   r   r   r   r   ?   sn   










z;DummyTrainer.make_train_loop.<locals>.train_loop_per_workerr   )r   r   r   r   r   rI   r   r   7   s   =zDummyTrainer.make_train_loop)__name__
__module____qualname____doc__r   r   r/   r   staticmethodr   __classcell__r   r   r   r   r      s.    r   __main__z--num-epochsz-er	   zNumber of epochs to read.)typedefaulthelpz--prefetch-batchesz-bz0Number of batches to prefetch when reading data.iP  )P   rT      d   )shapeoverride_num_blocksc                 C      | d S Nr!   r   dfr   r   r   <lambda>       r]   r#   )batch_formatc                 C   rY   rZ   r   r[   r   r   r   r]      r^   F)r   use_gpur   )r
   datasetsr   r   dataset_configr   zDataset configz"Memory stats at end of ingest:

{}T)
stats_onlyzError getting Ray memory stats)(r7   r'   typingr   numpyr2   rayr   ray.air.configr   	ray.trainr   ray.train.data_parallel_trainerr   ray.util.annotationsr   r   rJ   argparseArgumentParserparseradd_argumentr/   
parse_argsr   datarange_tensordsmap_batchesr   r   trainerr)   get_dataset_configfitr=   _privateinternal_apimemory_summary	Exceptionr   r   r   r   <module>   sb   m


