o
    ci                     @   s  d dl Z d dlZd dlZd dlmZ d dlmZmZmZmZm	Z	m
Z
mZmZmZmZ d dlZd dlmZ d dlmZ edZdedefd	d
Z	d)deeg ef eeeef gef f deeeef  dedee deg ef f
ddZG dd de
e Zd*defddZdd Zdd Zdeeee f deeee f fddZe jdee de	d  fd!d"Z d#e!defd$d%Z"d&edefd'd(Z#dS )+    N)datetime)
AnyCallableContextManagerDict	GeneratorGenericListOptionalTypeVarUnion)count_required_parameters)	ObjectRefTbundlereturnc                 C   s<   |   } | dd| dd| ddd}| r| |d< |S )zConvert a bundle of resources to Ray actor/task arguments.

    >>> bundle_to_remote_args({"GPU": 1, "memory": 1, "custom": 0.1})
    {'num_cpus': 0, 'num_gpus': 1, 'memory': 1, 'resources': {'custom': 0.1}}
    CPUr   GPUmemory)num_cpusnum_gpusr   	resources)copypop)r   args r   O/home/ubuntu/.local/lib/python3.10/site-packages/ray/train/v2/_internal/util.pybundle_to_remote_args   s   


r   train_loop_per_worker
train_funcconfigtrain_func_contextfn_arg_namec                    sr   t }|dkr| d| d}t||dkr+ pi  t fdd}|S tfdd}|S )a  Validates and constructs the training function to execute.
    Args:
        train_func: The training function to execute.
            This can either take in no arguments or a ``config`` dict.
        config (Optional[Dict]): Configurations to pass into
            ``train_func``. If None then an empty Dict will be created.
        train_func_context: Context manager for user's `train_func`, which executes
            backend-specific logic before and after the training function.
        fn_arg_name (Optional[str]): The name of training function to use for error
            messages.
    Returns:
        A valid training function.
    Raises:
        ValueError: if the input ``train_func`` is invalid.
       z: should take in 0 or 1 required arguments, but it accepts z required arguments instead.c                      s2      W  d    S 1 sw   Y  d S Nr   r   r    r   r!   r   r   train_fnK   s   $z&construct_train_func.<locals>.train_fnc                      s0       W  d    S 1 sw   Y  d S r$   r   r   )r   r!   r   r   r&   R   s   $)r   
ValueError	functoolswraps)r   r    r!   r"   num_required_paramserr_msgr&   r   r%   r   construct_train_func*   s   r,   c                   @   s,   e Zd ZdZdefddZdefddZdS )	ObjectRefWrapperz>Thin wrapper around ray.put to manually control dereferencing.objc                 C   s   t || _d S r$   )rayput_ref)selfr.   r   r   r   __init__]   s   zObjectRefWrapper.__init__r   c                 C   s   t | jS r$   )r/   getr1   )r2   r   r   r   r4   `   s   zObjectRefWrapper.getN)__name__
__module____qualname____doc__r   r3   r4   r   r   r   r   r-   Z   s    r-   F
include_msc                 C   s   d}| r|d7 }t  |S )Nz%Y-%m-%d_%H-%M-%Sz.%f)r   todaystrftime)r9   patternr   r   r   date_strd   s   r=   c                   C   s   t  S r$   )time	monotonicr   r   r   r   time_monotonick   s   r@   c                    s    fdd}|S )Nc                    s    j | _ | S r$   )r8   )func	copy_funcr   r   wrappedp   s   z_copy_doc.<locals>.wrappedr   )rC   rD   r   rB   r   	_copy_doco   s   rE   object_refsc                    s   t | t}|r	| n| g} | }i  |r0tj|dd\}}|r.t|t|D ]\}}| |< q%|st t| ks:J  fdd| D }|rG|S |d S )a  This is a safe version of `ray.get` that raises an exception immediately
    if an input task dies, while the others are still running.

    TODO(ml-team, core-team): This is NOT a long-term solution,
    and we should not maintain this function indefinitely.
    This is a mitigation for a Ray Core bug, and should be removed when
    that is fixed.
    See here: https://github.com/ray-project/ray/issues/47204

    Args:
        object_refs: A single or list of object refs to wait on.

    Returns:
        task_outputs: The outputs of the tasks.

    Raises:
        `RayTaskError`/`RayActorError`: if any of the tasks encounter a runtime error
            or fail due to actor/task death (ex: node failure).
    r#   )num_returnsc                    s   g | ]} | qS r   r   ).0tasktask_to_outputr   r   
<listcomp>   s    z ray_get_safe.<locals>.<listcomp>r   )
isinstancelistr/   waitzipr4   len)rF   is_listunreadyreadyrI   task_outputordered_outputsr   rJ   r   ray_get_safew   s   

rW   context_managers)NNNc                 c   sL    t  }| D ]}||  qdV  W d   dS 1 sw   Y  dS )z
    Utility to invoke a list of context managers and yield sequentially.

    Args:
        context_managers: List of context managers to invoke.
    N)
contextlib	ExitStackenter_context)rX   stackcontext_managerr   r   r   invoke_context_managers   s   

"r^   r.   c                 C   s   | j  d| j S )zReturns the full module name of the given object, including its qualified name.

    Args:
        obj: The object (class, function, etc.) whose module name is required.

    Returns:
        Full module and qualified name as a string.
    .)r6   r7   )r.   r   r   r   get_module_name   s   	r`   fnc                 C   s.   t | tjrt| jS t| dr| jS | jjS )a  Returns a readable name for any callable.

    Examples:

        >>> get_callable_name(lambda x: x)
        '<lambda>'
        >>> def foo(a, b): pass
        >>> get_callable_name(foo)
        'foo'
        >>> from functools import partial
        >>> bar = partial(partial(foo, a=1), b=2)
        >>> get_callable_name(bar)
        'foo'
        >>> class Dummy:
        ...     def __call__(self, a, b): pass
        >>> get_callable_name(Dummy())
        'Dummy'
    r5   )rM   r(   partialget_callable_namerA   hasattrr5   	__class__)ra   r   r   r   rc      s
   

rc   )r   )F)$rY   r(   r>   r   typingr   r   r   r   r   r   r	   r
   r   r   r/   ray.train._internal.utilsr   	ray.typesr   r   dictr   strr,   r-   boolr=   r@   rE   rW   contextmanagerr^   objectr`   rc   r   r   r   r   <module>   sN    0$

0

&