o
    bi~s                     @   s  U d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZmZ d dl	m
Z
mZmZmZmZmZ d dlZd dlmZmZmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ e
red d
lm Z  e!e"Z#da$de%d< e& Z'eddG dd de(ej)Z*dZ+dZ,dZ-dZ.dZ/dZ0dZ1dZ2dZ3e4ej56ddZ7ej56de*j8Z9eddZ:dZ;dZ<dZ=dZ>dZ?e4e@ej56d d!ZAdZBed"dZCed#dZDed$dZEdZFdZGe4e@ej56d%d!ZHed&dZIed'dZJe4e@ej56d(d)ZKe4ed*d  ZLed+dZMdZNd,ZOd-ZPd.ZQdZRed/dZSeTej56d0d1ZUd ZVd2ZWd3ZXd4ZYd5ZZdZ[ed6d7Z\d8Z]e4e@ej56d9d!Z^ed:d;Z_ed<d=Z`ed>d?ZaeTe%d@< edAdBZbeTe%dC< eeG dDdE dEZcdNdHdIZddFe*fdJdKZeeeG dLdM dMZfefZgdS )O    N)	dataclassfield)TYPE_CHECKINGAnyDictListOptionalUnion)env_bool	env_floatenv_integer)WORKER_MODE) update_dataset_logger_for_worker)DeveloperAPI)log_once)SchedulingStrategyTExecutionOptionszOptional[DataContext]_default_contextalpha)	stabilityc                   @   s   e Zd ZdZdZdZdZdS )ShuffleStrategyzkShuffle strategy determines shuffling algorithm employed by operations
    like aggregate, repartition, etcsort_shuffle_pull_basedsort_shuffle_push_basedhash_shuffleN)__name__
__module____qualname____doc__SORT_SHUFFLE_PULL_BASEDSORT_SHUFFLE_PUSH_BASEDHASH_SHUFFLE r"   r"   D/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/context.pyr      s
    r   i   i   @g      ?i   i   T   FRAY_DATA_PUSH_BASED_SHUFFLE!RAY_DATA_DEFAULT_SHUFFLE_STRATEGY%RAY_DATA_MAX_HASH_SHUFFLE_AGGREGATORS@   SPREADDEFAULTi   RAY_DATA_EAGER_FREE0 RAY_DATA_DEFAULT_MIN_PARALLELISM(RAY_DATA_ENABLE_TENSOR_EXTENSION_CASTINGRAY_DATA_USE_ARROW_TENSOR_V2RAY_DATA_TRACE_ALLOCATIONS+RAY_DATA_LOG_INTERNAL_STACK_TRACE_TO_STDOUT%RAY_DATA_RAISE_ORIGINAL_MAP_EXCEPTIONRAY_TQDM1RAY_DATA_DISABLE_PROGRESS_BARS,RAY_DATA_ENABLE_PROGRESS_BAR_NAME_TRUNCATION)AWS Error INTERNAL_FAILUREAWS Error NETWORK_CONNECTIONAWS Error SLOW_DOWN#AWS Error UNKNOWN (HTTP status 503))r7   r8   r9   r:   zAWS Error SERVICE_UNAVAILABLEl        'RAY_DATA_ENABLE_OP_RESOURCE_RESERVATIONRAY_DATA_OP_RESERVATION_RATIOz0.5u   ⚠️ u   ✔️ i      &RAY_DATA_DEFAULT_WAIT_FOR_MIN_ACTORS_S   RAY_DATA_PER_NODE_METRICS3RAY_DATA_MIN_HASH_SHUFFLE_AGGREGATOR_WAIT_TIME_IN_Si,  :RAY_DATA_HASH_SHUFFLE_AGGREGATOR_HEALTH_WARNING_INTERVAL_S   4RAY_DATA_DEFAULT_ACTOR_POOL_UTIL_UPSCALING_THRESHOLDg       @+DEFAULT_ACTOR_POOL_UTIL_UPSCALING_THRESHOLD6RAY_DATA_DEFAULT_ACTOR_POOL_UTIL_DOWNSCALING_THRESHOLDg      ?-DEFAULT_ACTOR_POOL_UTIL_DOWNSCALING_THRESHOLDc                   @   s&   e Zd ZU eZeed< eZeed< dS )AutoscalingConfig#actor_pool_util_upscaling_threshold%actor_pool_util_downscaling_thresholdN)	r   r   r   rF   rJ   float__annotations__rH   rK   r"   r"   r"   r#   rI      s
   
 
rI   returnr   c                  C   s   ddl m}  |  S )Nr   r   )'ray.data._internal.execution.interfacesr   r   r"   r"   r#   _execution_options_factory  s   rP   c                  C   sJ   t r
td tjS dd tD } t| v s#J dd|  dt dtS )NzqRAY_DATA_PUSH_BASED_SHUFFLE is deprecated, please use RAY_DATA_DEFAULT_SHUFFLE_STRATEGY to set shuffling strategyc                 S   s   g | ]}|qS r"   r"   ).0sr"   r"   r#   
<listcomp>  s    z5_deduce_default_shuffle_algorithm.<locals>.<listcomp>z8RAY_DATA_DEFAULT_SHUFFLE_STRATEGY has to be one of the [,z] (got ))DEFAULT_USE_PUSH_BASED_SHUFFLEloggerwarningr   r    DEFAULT_SHUFFLE_STRATEGYjoin)vsr"   r"   r#   !_deduce_default_shuffle_algorithm  s   
r\   c                       s  e Zd ZU dZeZeed< eZ	eed< e
Zeed< eZeed< eZeed< eZeed< eedZeed	< eZeed
< e Zeed< dZeed< eZeZe e ed< e!Z"eed< e#Z$eed< dZ%e e ed< dZ&e'ed< dZ(e'ed< dZ)e'ed< e*Z+e,ed< e-Z.e,ed< e/Z0eed< e1Z2eed< e3Z4eed< e5Z6eed< e7Z8eed< eZ9eed< e:Z;eed< e<Z=eed< e>Z?eed < dZ@e e ed!< eAZBeed"< eCZDeed#< eEZFeed$< eeGdZHd%ed&< eIZJeed'< eKZLeed(< dZMeed)< eNZOeed*< ePZQeed+< eRZSeTeU ed,< eVZWeed-< eXZYeZeeTe[ f ed.< e\Z]eed/< e^Z_e'ed0< e`Zaeed1< ebZceed2< edZeeed3< dZfeed4< egZheed5< eiZjeed6< ekZle e ed7< ed8d9 dZmeTeU ed:< enZoeed;< dZpe'ed<< d=Zqe e' ed>< dZre eU ed?< d@ZseedA< dBdC ZtdDeUdEeudFdf fdGdHZvewd^dIdJZxewd_dLdMZyezdFefdNdOZ{e{j|dEedFdfdPdOZ{d`dQeUdReudFeufdSdTZ}dQeUdEeudFdfdUdVZ~dQeUdFdfdWdXZd^dYdZZd[eUdFdfd\d]Z  ZS )aDataContexta  Global settings for Ray Data.

    Configure this class to enable advanced features and tune performance.

    .. warning::
        Apply changes before creating a :class:`~ray.data.Dataset`. Changes made after
        won't take effect.

    .. note::
        This object is automatically propagated to workers. Access it from the driver
        and remote workers with :meth:`DataContext.get_current()`.

    Examples:
        >>> from ray.data import DataContext
        >>> DataContext.get_current().enable_progress_bars = False

    Args:
        target_max_block_size: The max target block size in bytes for reads and
            transformations.
        target_shuffle_max_block_size: The max target block size in bytes for shuffle
            ops like ``random_shuffle``, ``sort``, and ``repartition``.
        target_min_block_size: Ray Data avoids creating blocks smaller than this
            size in bytes on read. This takes precedence over
            ``read_op_min_num_blocks``.
        streaming_read_buffer_size: Buffer size when doing streaming reads from local or
            remote storage.
        enable_pandas_block: Whether pandas block format is enabled.
        actor_prefetcher_enabled: Whether to use actor based block prefetcher.
        autoscaling_config: Autoscaling configuration.
        use_push_based_shuffle: Whether to use push-based shuffle.
        pipeline_push_based_shuffle_reduce_tasks:
        scheduling_strategy: The global scheduling strategy. For tasks with large args,
            ``scheduling_strategy_large_args`` takes precedence.
        scheduling_strategy_large_args: Scheduling strategy for tasks with large args.
        large_args_threshold: Size in bytes after which point task arguments are
            considered large. Choose a value so that the data transfer overhead is
            significant in comparison to task scheduling (i.e., low tens of ms).
        use_polars: Whether to use Polars for tabular dataset sorts, groupbys, and
            aggregations.
        eager_free: Whether to eagerly free memory.
        decoding_size_estimation: Whether to estimate in-memory decoding data size for
            data source.
        min_parallelism: This setting is deprecated. Use ``read_op_min_num_blocks``
            instead.
        read_op_min_num_blocks: Minimum number of read output blocks for a dataset.
        enable_tensor_extension_casting: Whether to automatically cast NumPy ndarray
            columns in Pandas DataFrames to tensor extension columns.
        use_arrow_tensor_v2: Config enabling V2 version of ArrowTensorArray supporting
            tensors > 2Gb in size (off by default)
        enable_fallback_to_arrow_object_ext_type: Enables fallback to serialize column
            values not suppported by Arrow natively (like user-defined custom Python
            classes for ex, etc) using `ArrowPythonObjectType` (simply serializing
            these as bytes)
        enable_auto_log_stats: Whether to automatically log stats after execution. If
            disabled, you can still manually print stats with ``Dataset.stats()``.
        verbose_stats_logs: Whether stats logs should be verbose. This includes fields
            such as `extra_metrics` in the stats output, which are excluded by default.
        trace_allocations: Whether to trace allocations / eager free. This adds
            significant performance overheads and should only be used for debugging.
        execution_options: The
            :class:`~ray.data._internal.execution.interfaces.execution_options.ExecutionOptions`
            to use.
        use_ray_tqdm: Whether to enable distributed tqdm.
        enable_progress_bars: Whether to enable progress bars.
        enable_progress_bar_name_truncation: If True, the name of the progress bar
            (often the operator name) will be truncated if it exceeds
            `ProgressBar.MAX_NAME_LENGTH`. Otherwise, the full operator name is shown.
        enable_get_object_locations_for_metrics: Whether to enable
            ``get_object_locations`` for metrics.
        write_file_retry_on_errors: A list of substrings of error messages that should
            trigger a retry when writing files. This is useful for handling transient
            errors when writing to remote storage systems.
        warn_on_driver_memory_usage_bytes: If driver memory exceeds this threshold,
            Ray Data warns you. For now, this only applies to shuffle ops because most
            other ops are unlikely to use as much driver memory.
        actor_task_retry_on_errors: The application-level errors that actor task should
            retry. This follows same format as :ref:`retry_exceptions <task-retries>` in
            Ray Core. Default to `False` to not retry on any errors. Set to `True` to
            retry all errors, or set to a list of errors to retry.
        enable_op_resource_reservation: Whether to reserve resources for each operator.
        op_resource_reservation_ratio: The ratio of the total resources to reserve for
            each operator.
        max_errored_blocks: Max number of blocks that are allowed to have errors,
            unlimited if negative. This option allows application-level exceptions in
            block processing tasks. These exceptions may be caused by UDFs (e.g., due to
            corrupted data samples) or IO errors. Data in the failed blocks are dropped.
            This option can be useful to prevent a long-running job from failing due to
            a small number of bad blocks.
        log_internal_stack_trace_to_stdout: Whether to include internal Ray Data/Ray
            Core code stack frames when logging to stdout. The full stack trace is
            always written to the Ray Data log file.
        raise_original_map_exception: Whether to raise the original exception
            encountered in map UDF instead of wrapping it in a `UserCodeException`.
        print_on_execution_start: If ``True``, print execution information when
            execution starts.
        s3_try_create_dir: If ``True``, try to create directories on S3 when a write
            call is made with a S3 URI.
        wait_for_min_actors_s: The default time to wait for minimum requested
            actors to start before raising a timeout, in seconds.
        max_tasks_in_flight_per_actor: Max number of tasks that could be submitted
            for execution to individual actor at the same time. Note that only up to
            `max_concurrency` number of these tasks will be executing concurrently
            while remaining ones will be waiting in the Actor's queue. Buffering
            tasks in the queue allows us to overlap pulling of the blocks (which are
            tasks arguments) with the execution of the prior tasks maximizing
            individual Actor's utilization
        retried_io_errors: A list of substrings of error messages that should
            trigger a retry when reading or writing files. This is useful for handling
            transient errors when reading from remote storage systems.
        enable_per_node_metrics: Enable per node metrics reporting for Ray Data,
            disabled by default.
        memory_usage_poll_interval_s: The interval to poll the USS of map tasks. If `None`,
            map tasks won't record memory stats.
    target_max_block_sizetarget_shuffle_max_block_sizetarget_min_block_sizestreaming_read_buffer_sizeenable_pandas_blockactor_prefetcher_enabled)default_factoryautoscaling_configuse_push_based_shuffle_shuffle_strategyT(pipeline_push_based_shuffle_reduce_tasksmax_hash_shuffle_aggregators*min_hash_shuffle_aggregator_wait_time_in_s1hash_shuffle_aggregator_health_warning_interval_sN(max_hash_shuffle_finalization_batch_size3join_operator_actor_num_cpus_per_partition_override;hash_shuffle_operator_actor_num_cpus_per_partition_override=hash_aggregate_operator_actor_num_cpus_per_partition_overridescheduling_strategyscheduling_strategy_large_argslarge_args_threshold
use_polarsuse_polars_sort
eager_freedecoding_size_estimationmin_parallelismread_op_min_num_blocksenable_tensor_extension_castinguse_arrow_tensor_v2(enable_fallback_to_arrow_object_ext_typeenable_auto_log_statsverbose_stats_logstrace_allocationsr   execution_optionsuse_ray_tqdmenable_progress_barsenable_operator_progress_bars#enable_progress_bar_name_truncation'enable_get_object_locations_for_metricswrite_file_retry_on_errors!warn_on_driver_memory_usage_bytesactor_task_retry_on_errorsop_resource_reservation_enabledop_resource_reservation_ratiomax_errored_blocks"log_internal_stack_trace_to_stdoutraise_original_map_exceptionprint_on_execution_starts3_try_create_dirwait_for_min_actors_smax_tasks_in_flight_per_actorc                   C   s   t tS N)listDEFAULT_RETRIED_IO_ERRORSr"   r"   r"   r#   <lambda>  s    zDataContext.<lambda>retried_io_errorsenable_per_node_metrics+override_object_store_memory_limit_fraction   memory_usage_poll_interval_sdataset_logger_idF_enable_actor_pool_on_exit_hookc                 C   s`   i | _ i | _t| _tjdd u}|r+t j	j
tk}|r&tdr&td d| _d S d| _d S )N
RAY_JOB_ID3ray_data_disable_operator_progress_bars_in_ray_jobszDisabling operator-level progress bars by default in Ray Jobs. To enable progress bars for all operators, set `ray.data.DataContext.get_current().enable_operator_progress_bars = True`.FT) _task_pool_data_task_remote_args_kv_configs.DEFAULT_MAX_NUM_BLOCKS_IN_STREAMING_GEN_BUFFER'_max_num_blocks_in_streaming_gen_bufferosenvirongetrayget_runtime_contextworkermoder   r   rW   infor   )self
is_ray_job	is_driverr"   r"   r#   __post_init__
  s   


zDataContext.__post_init__namevaluerN   c                    s`   |dkr|t krtdt n|dkrtdt n|dkr'tdt || _t || d S )Nr   zR`write_file_retry_on_errors` is deprecated. Configure `retried_io_errors` instead.rf   T`use_push_based_shuffle` is deprecated, please configure `shuffle_strategy` instead.rs   zH`use_polars` is deprecated, please configure `use_polars_sort`  instead.)"DEFAULT_WRITE_FILE_RETRY_ON_ERRORSwarningswarnDeprecationWarningrt   super__setattr__)r   r   r   	__class__r"   r#   r   /  s$   zDataContext.__setattr__c                   C   s:   t  tdu r
t atW  d   S 1 sw   Y  dS )a  Get or create the current DataContext.

        When a Dataset is created, the current DataContext will be sealed.
        Changes to `DataContext.get_current()` will not impact existing Datasets.

        Examples:

            .. testcode::
                import ray

                context = ray.data.DataContext.get_current()

                context.target_max_block_size = 100 * 1024 ** 2
                ds1 = ray.data.range(1)
                context.target_max_block_size = 1 * 1024 ** 2
                ds2 = ray.data.range(1)

                # ds1's target_max_block_size will be 100MB
                ds1.take_all()
                # ds2's target_max_block_size will be 1MB
                ds2.take_all()

        Developer notes: Avoid using `DataContext.get_current()` in data
        internal components, use the DataContext object captured in the
        Dataset and pass it around as arguments.
        N)_context_lockr   r]   r"   r"   r"   r#   get_currentJ  s
   $zDataContext.get_currentcontextc                 C   s"   t rt j| jkrt| j | a dS )zSet the current context in a remote worker.

        This is used internally by Dataset to propagate the driver context to
        remote workers used for parallelization.
        N)r   r   r   )r   r"   r"   r#   _set_currento  s
   	
zDataContext._set_currentc                 C   s   | j rtd tjS | jS )Nr   )rf   rW   rX   r   r    rg   r   r"   r"   r#   shuffle_strategy~  s   zDataContext.shuffle_strategyc                 C   s
   || _ d S r   )rg   )r   r   r"   r"   r#   r        
keydefaultc                 C   s   | j ||S )a  Get the value for a key-value style config.

        Args:
            key: The key of the config.
            default: The default value to return if the key is not found.
        Returns: The value for the key, or the default value if the key is not found.
        )r   r   )r   r   r   r"   r"   r#   
get_config  s   zDataContext.get_configc                 C   s   || j |< dS )zSet the value for a key-value style config.

        Args:
            key: The key of the config.
            value: The value of the config.
        N)r   )r   r   r   r"   r"   r#   
set_config  s   zDataContext.set_configc                 C   s   | j |d dS )z`Remove a key-value style config.

        Args:
            key: The key of the config.
        N)r   pop)r   r   r"   r"   r#   remove_config  s   zDataContext.remove_configc                 C   s
   t | S )z)Create a copy of the current DataContext.)copydeepcopyr   r"   r"   r#   r     r   zDataContext.copy
dataset_idc                 C   s
   || _ dS )zSet the current dataset logger id.

        This is used internally to propagate the current dataset logger id to remote
        workers.
        N)r   )r   r   r"   r"   r#   set_dataset_logger_id  s   
z!DataContext.set_dataset_logger_id)rN   r]   )r   r]   rN   Nr   )r   r   r   r   DEFAULT_TARGET_MAX_BLOCK_SIZEr^   intrM   %DEFAULT_SHUFFLE_TARGET_MAX_BLOCK_SIZEr_   DEFAULT_TARGET_MIN_BLOCK_SIZEr`   "DEFAULT_STREAMING_READ_BUFFER_SIZEra   DEFAULT_ENABLE_PANDAS_BLOCKrb   bool DEFAULT_ACTOR_PREFETCHER_ENABLEDrc   r   rI   re   rV   rf   r\   rg   r   rh   DEFAULT_MIN_PARALLELISM default_hash_shuffle_parallelism$DEFAULT_MAX_HASH_SHUFFLE_AGGREGATORSri   r   2DEFAULT_MIN_HASH_SHUFFLE_AGGREGATOR_WAIT_TIME_IN_Srj   9DEFAULT_HASH_SHUFFLE_AGGREGATOR_HEALTH_WARNING_INTERVAL_Srk   rl   rm   rL   rn   ro   DEFAULT_SCHEDULING_STRATEGYrp   r   &DEFAULT_SCHEDULING_STRATEGY_LARGE_ARGSrq   DEFAULT_LARGE_ARGS_THRESHOLDrr   DEFAULT_USE_POLARSrs   DEFAULT_USE_POLARS_SORTrt   DEFAULT_EAGER_FREEru   (DEFAULT_DECODING_SIZE_ESTIMATION_ENABLEDrv   rw   DEFAULT_READ_OP_MIN_NUM_BLOCKSrx   'DEFAULT_ENABLE_TENSOR_EXTENSION_CASTINGry   DEFAULT_USE_ARROW_TENSOR_V2rz   r{   DEFAULT_AUTO_LOG_STATSr|   DEFAULT_VERBOSE_STATS_LOGr}   DEFAULT_TRACE_ALLOCATIONSr~   rP   r   DEFAULT_USE_RAY_TQDMr   DEFAULT_ENABLE_PROGRESS_BARSr   r   +DEFAULT_ENABLE_PROGRESS_BAR_NAME_TRUNCATIONr   /DEFAULT_ENABLE_GET_OBJECT_LOCATIONS_FOR_METRICSr   r   r   r   str)DEFAULT_WARN_ON_DRIVER_MEMORY_USAGE_BYTESr   "DEFAULT_ACTOR_TASK_RETRY_ON_ERRORSr   r	   BaseException&DEFAULT_ENABLE_OP_RESOURCE_RESERVATIONr   %DEFAULT_OP_RESOURCE_RESERVATION_RATIOr   DEFAULT_MAX_ERRORED_BLOCKSr   *DEFAULT_LOG_INTERNAL_STACK_TRACE_TO_STDOUTr   -DEFAULT_RAY_DATA_RAISE_ORIGINAL_MAP_EXCEPTIONr   r   DEFAULT_S3_TRY_CREATE_DIRr   DEFAULT_WAIT_FOR_MIN_ACTORS_Sr   %DEFAULT_MAX_TASKS_IN_FLIGHT_PER_ACTORr   r   DEFAULT_ENABLE_PER_NODE_METRICSr   r   r   r   r   r   r   r   staticmethodr   r   propertyr   setterr   r   r   r   r   __classcell__r"   r"   r   r#   r]     s   
 s







%$
	
r]   )rN   r   )hr   enumloggingr   	threadingr   dataclassesr   r   typingr   r   r   r   r   r	   r   ray._private.ray_constantsr
   r   r   ray._private.workerr   ray.data._internal.loggingr   ray.util.annotationsr   ray.util.debugr   ray.util.scheduling_strategiesr   rO   r   	getLoggerr   rW   r   rM   Lockr   r   Enumr   r   r   MAX_SAFE_BLOCK_SIZE_FACTORMAX_SAFE_ROWS_PER_BLOCK_FACTORr   r   r   r   r   r   r   r   rV   r   rY   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rL   r   r   WARN_PREFIX	OK_PREFIXLEGACY_DEFAULT_BATCH_SIZEr   r   r   r   r   r   r   rF   rH   rI   rP   r\   r]   DatasetContextr"   r"   r"   r#   <module>   s   
  


		
   