o
    `۷ir#                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlmZmZm	Z	m
Z
mZmZmZmZmZmZ d dlZd dlmZ d dlmZ d dlmZ eeZedZded	efd
dZ	d3deeg ef ee
eef gef f dee
eef  de	dee d	eg ef f
ddZG dd dee Z d4de!fddZ"dd Z#dd Z$deeee f d	eeee f fddZ%e j&d ee	 d	ed! fd"d#Z'd$e(d	efd%d&Z)d'ed	efd(d)Z*	 d5d*e+d+e,d	efd,d-Z-d	e!fd.d/Z.d4d0e!d	efd1d2Z/dS )6    N)datetime)
AnyCallableContextManagerDict	GeneratorGenericListOptionalTypeVarUnion)count_required_parameters)UserExceptionWithTraceback)	ObjectRefTbundlereturnc                 C   s<   |   } | dd| dd| ddd}| r| |d< |S )zConvert a bundle of resources to Ray actor/task arguments.

    >>> bundle_to_remote_args({"GPU": 1, "memory": 1, "custom": 0.1})
    {'num_cpus': 0, 'num_gpus': 1, 'memory': 1, 'resources': {'custom': 0.1}}
    CPUr   GPUmemory)num_cpusnum_gpusr   	resources)copypop)r   args r   Q/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/train/v2/_internal/util.pybundle_to_remote_args   s   


r   train_loop_per_worker
train_funcconfigtrain_func_contextfn_arg_namec                    sr   t }|dkr| d| d}t||dkr+ pi  t fdd}|S tfdd}|S )a  Validates and constructs the training function to execute.
    Args:
        train_func: The training function to execute.
            This can either take in no arguments or a ``config`` dict.
        config (Optional[Dict]): Configurations to pass into
            ``train_func``. If None then an empty Dict will be created.
        train_func_context: Context manager for user's `train_func`, which executes
            backend-specific logic before and after the training function.
        fn_arg_name (Optional[str]): The name of training function to use for error
            messages.
    Returns:
        A valid training function.
    Raises:
        ValueError: if the input ``train_func`` is invalid.
       z: should take in 0 or 1 required arguments, but it accepts z required arguments instead.c                      s2      W  d    S 1 sw   Y  d S Nr   r   r!   r    r"   r   r   train_fnQ   s   $z&construct_train_func.<locals>.train_fnc                      s0       W  d    S 1 sw   Y  d S r%   r   r   )r    r"   r   r   r'   X   s   $)r   
ValueError	functoolswraps)r    r!   r"   r#   num_required_paramserr_msgr'   r   r&   r   construct_train_func0   s   r-   c                   @   s,   e Zd ZdZdefddZdefddZdS )	ObjectRefWrapperz>Thin wrapper around ray.put to manually control dereferencing.objc                 C   s   t || _d S r%   )rayput_ref)selfr/   r   r   r   __init__c   s   zObjectRefWrapper.__init__r   c                 C   s   t | jS r%   )r0   getr2   )r3   r   r   r   r5   f   s   zObjectRefWrapper.getN)__name__
__module____qualname____doc__r   r4   r5   r   r   r   r   r.   `   s    r.   F
include_msc                 C   s   d}| r|d7 }t  |S )Nz%Y-%m-%d_%H-%M-%Sz.%f)r   todaystrftime)r:   patternr   r   r   date_strj   s   r>   c                   C   s   t  S r%   )time	monotonicr   r   r   r   time_monotonicq   s   rA   c                    s    fdd}|S )Nc                    s    j | _ | S r%   )r9   )func	copy_funcr   r   wrappedv   s   z_copy_doc.<locals>.wrappedr   )rD   rE   r   rC   r   	_copy_docu   s   rF   object_refsc                    s   t | t}|r	| n| g} | }i  |r0tj|dd\}}|r.t|t|D ]\}}| |< q%|st t| ks:J  fdd| D }|rG|S |d S )a  This is a safe version of `ray.get` that raises an exception immediately
    if an input task dies, while the others are still running.

    TODO(ml-team, core-team): This is NOT a long-term solution,
    and we should not maintain this function indefinitely.
    This is a mitigation for a Ray Core bug, and should be removed when
    that is fixed.
    See here: https://github.com/ray-project/ray/issues/47204

    Args:
        object_refs: A single or list of object refs to wait on.

    Returns:
        task_outputs: The outputs of the tasks.

    Raises:
        `RayTaskError`/`RayActorError`: if any of the tasks encounter a runtime error
            or fail due to actor/task death (ex: node failure).
    r$   )num_returnsc                    s   g | ]} | qS r   r   ).0tasktask_to_outputr   r   
<listcomp>   s    z ray_get_safe.<locals>.<listcomp>r   )
isinstancelistr0   waitzipr5   len)rG   is_listunreadyreadyrJ   task_outputordered_outputsr   rK   r   ray_get_safe}   s   

rX   context_managers)NNNc                 c   sL    t  }| D ]}||  qdV  W d   dS 1 sw   Y  dS )z
    Utility to invoke a list of context managers and yield sequentially.

    Args:
        context_managers: List of context managers to invoke.
    N)
contextlib	ExitStackenter_context)rY   stackcontext_managerr   r   r   invoke_context_managers   s   

"r_   r/   c                 C   s   | j  d| j S )zReturns the full module name of the given object, including its qualified name.

    Args:
        obj: The object (class, function, etc.) whose module name is required.

    Returns:
        Full module and qualified name as a string.
    .)r7   r8   )r/   r   r   r   get_module_name   s   	ra   fnc                 C   s.   t | tjrt| jS t| dr| jS | jjS )a  Returns a readable name for any callable.

    Examples:

        >>> get_callable_name(lambda x: x)
        '<lambda>'
        >>> def foo(a, b): pass
        >>> get_callable_name(foo)
        'foo'
        >>> from functools import partial
        >>> bar = partial(partial(foo, a=1), b=2)
        >>> get_callable_name(bar)
        'foo'
        >>> class Dummy:
        ...     def __call__(self, a, b): pass
        >>> get_callable_name(Dummy())
        'Dummy'
    r6   )rN   r)   partialget_callable_namerB   hasattrr6   	__class__rb   r   r   r   rd      s
   

rd   eexclude_framesc                 C   s:   t jtt | j|  d}td|  t| |dS )a@  Construct a UserExceptionWithTraceback from a base exception.

    Args:
        e: The base exception to construct a UserExceptionWithTraceback from.
        exclude_frames: The number of frames to exclude from the beginnning of
            the traceback.

    Returns:
        A UserExceptionWithTraceback object.
    )limitzError in training function:
)traceback_str)	traceback
format_excrR   
extract_tb__traceback__loggererrorr   )rh   ri   exc_traceback_strr   r   r   'construct_user_exception_with_traceback   s
   rs   c                  C   s.   ddl m}  z|   W dS  ty   Y dS w )z6Check if the current process is a Ray Train V2 worker.r   get_train_fn_utilsTF)/ray.train.v2._internal.execution.train_fn_utilsru   RuntimeErrorrt   r   r   r   _in_ray_train_worker   s   rx   raise_in_tune_sessionc                    s   dt dt f fdd}|S )a  Check that the caller is a Ray Train worker spawned by Ray Train,
    with access to training function utilities.

    Args:
        raise_in_tune_session: Whether to raise a specific error message if the caller
            is in a Tune session. If True, will raise a DeprecationWarning.

    Returns:
        A decorator that performs this check, which raises an error if the caller
        is not a Ray Train worker.
    rb   r   c                    s   t   fdd}|S )Nc                     sN   ddl m} r| rtd j dt s td j d | i |S )Nr   )_in_tune_sessionz`ray.train.z` is deprecated when running in a function passed to Ray Tune. Please use the equivalent `ray.tune` API instead. See this issue for more context: https://github.com/ray-project/ray/issues/49454`z` cannot be used outside of a Ray Train training function. You are calling this API from the driver or another non-training process. These utilities are only available within a function launched by `trainer.fit()`.)%ray.tune.trainable.trainable_fn_utilsrz   DeprecationWarningr6   rx   rw   )r   kwargsrz   )rb   ry   r   r   _wrapped_fn  s   
z9requires_train_worker.<locals>._wrap.<locals>._wrapped_fn)r)   r*   )rb   r   ry   rg   r   _wrap  s   z$requires_train_worker.<locals>._wrap)r   )ry   r   r   r   r   requires_train_worker   s   r   )r   )F)r   )0rZ   r)   loggingr?   rl   r   typingr   r   r   r   r   r   r	   r
   r   r   r0   ray.train._internal.utilsr   !ray.train.v2._internal.exceptionsr   	ray.typesr   	getLoggerr6   rp   r   dictr   strr-   r.   boolr>   rA   rF   rX   contextmanagerr_   objectra   rd   BaseExceptionintrs   rx   r   r   r   r   r   <module>   sj    0
$

0

&
