o
    ciL                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlmZ d dl	m	Z	 d dl
mZ d dlmZ d dlmZmZmZmZmZmZmZmZ d dlZd dlZd dlmZmZmZmZmZmZ d dl m!Z! d d	l"m#Z#m$Z$ d d
l%m&Z&m'Z' d dl(Z(e)e*Z+dd Z,e Z-e&G dd deZ.e&e/dddfdeg ef deee/ eee/  f de0de0dee de1fddZ2e&G dd dZ3e&G dd de4Z5e&dd Z6dd  Z7d!d" Z8e&d#efd$d%Z9d&ed'e:d(e:d)e:fd*d+Z;d,e:d-e:dee fd.d/Z<e'd0d1		2	3	4	dJd5eee0e:f  d6e=d7e0d8e0d9ee= f
d:d;Z>e&		 dKd<ed=ee d>e0fd?d@Z?dAdB Z@e' 	CdLdDee: dEeeeef  dFedGe1fdHdIZAdS )M    N)defaultdict)datetime)Number)Thread)AnyCallableDictListOptionalSequenceTypeUnion)deep_updateflatten_dictmerge_dictsunflatten_dictunflatten_list_dictunflattened_lookup)SafeFallbackEncoder)is_nanis_nan_or_inf)DeveloperAPI	PublicAPIc                  C   s(   zdd l } W | S  ty   d } Y | S w Nr   )GPUtilImportError)r    r   G/home/ubuntu/.local/lib/python3.10/site-packages/ray/tune/utils/util.py_import_gputil"   s   
r   c                       sB   e Zd ZdZd fdd	Zdd Zdd	 Zd
d Zdd Z  Z	S )UtilMonitora4  Class for system usage utilization monitoring.

    It keeps track of CPU, RAM, GPU, VRAM usage (each gpu separately) by
    pinging for information every x seconds in a separate thread.

    Requires psutil and GPUtil to be installed. Can be enabled with
    Tuner(param_space={"log_sys_usage": True}).
    Tffffff?c                    s   d| _ t }|| _|d u r|rtd td u r|rtd |d u r)td u r)d S tt|   || _	t
t| _t | _d| _|rH|   d S d S )NTz)Install gputil for GPU system monitoring.z-Install psutil to monitor system performance.)stoppedr   r   loggerwarningpsutilsuperr   __init__delayr   listvalues	threadingLocklockdaemonstart)selfr.   r'   r   	__class__r   r   r&   8   s"   



zUtilMonitor.__init__c              	   C   s   | j r td ur#| jd ttjd d | jd tt j | jd ureg }z| j	 }W n t
y?   td Y nw |D ]*}| jdt|j  t|j | jdt|j  t|j qBW d    d S W d    d S 1 sxw   Y  d S )Ncpu_util_percent)intervalram_util_percentzGPUtil failed to retrieve GPUs.gpu_util_percentvram_util_percent)r,   r$   r)   appendfloatcpu_percentvirtual_memorypercentr   getGPUs	Exceptionr"   debugstridload
memoryUtil)r/   gpu_listgpur   r   r   _read_utilizationM   s4   


"zUtilMonitor._read_utilizationc                 C   sp   | j ri S | j t| j}| j D ]	\}}|d d = qW d    n1 s(w   Y  ddd | D iS )Nperfc                 S   s(   i | ]\}}t |d kr|t|qS )r   )lennpmean).0kvr   r   r   
<dictcomp>l   s   ( z(UtilMonitor.get_data.<locals>.<dictcomp>)r!   r,   copydeepcopyr)   items)r/   
ret_valueskeyvalr   r   r   get_datad   s   zUtilMonitor.get_datac                 C   s.   d| _ | j s|   t| j | j rd S d S NF)r!   rE   timesleepr'   r/   r   r   r   runn   s
   zUtilMonitor.runc                 C   s
   d| _ d S )NT)r!   rX   r   r   r   stopt   s   
zUtilMonitor.stop)Tr    )
__name__
__module____qualname____doc__r&   rE   rT   rY   rZ   __classcell__r   r   r0   r   r   -   s    	
r         fnexception_typenum_retries
sleep_timetimeoutreturnc                    s   t    fdd}t|D ]>}   t j|d}d|_|  |j|d | rBt	
d|d  d| d	td
d   n  sI dS t| qdS )Nc               
      sF   z  W d S  y" }  zt |     W Y d } ~ d S d } ~ ww N)r"   r#   set)eerroredrc   rb   r   r   _try_fn   s   
zretry_fn.<locals>._try_fn)targetT)rf   zProcess timed out (try ra   /z): r[   F)r*   Eventrangeclearr   r-   r.   joinis_aliver"   r>   getattris_setrV   rW   )rb   rc   rd   re   rf   rm   iprocr   rk   r   retry_fnx   s$   
ry   c                	   @   s`   e Zd ZdZeejddZdZ				dde
dee d	ee
 d
efddZdd Zdd ZdS )warn_if_slowa)  Prints a warning if a given operation is slower than 500ms.

    Example:
        >>> from ray.tune.utils.util import warn_if_slow
        >>> something = ... # doctest: +SKIP
        >>> with warn_if_slow("some_operation"): # doctest: +SKIP
        ...    ray.get(something) # doctest: +SKIP
    TUNE_WARN_THRESHOLD_Sg      ?zTThe `{name}` operation took {duration:.3f} s, which may be a performance bottleneck.NFname	thresholdmessagedisablec                 C   s.   || _ |p| j| _|p| j| _d| _|| _d S rU   )r|   DEFAULT_THRESHOLDr}   DEFAULT_MESSAGEr~   too_slowr   )r/   r|   r}   r~   r   r   r   r   r&      s
   
zwarn_if_slow.__init__c                 C   s   t   | _| S rh   )rV   r.   rX   r   r   r   	__enter__   s   
zwarn_if_slow.__enter__c                 C   sb   t   }| jr	d S || j | jkr-|t dkr/d| _|| j }t| jj	| j
|d d S d S d S )Ng      N@T)r|   duration)rV   r   r.   r}   START_OF_TIMEr   r"   r#   r~   formatr|   )r/   typevalue	tracebacknowr   r   r   r   __exit__   s   
zwarn_if_slow.__exit__)NNF)r[   r\   r]   r^   r8   osenvirongetr   r   r?   r
   boolr&   r   r   r   r   r   r   rz      s&    	
rz   c                   @   sx   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zedd Z	edd Z
edd Zdd Zdd Zdd Zdd ZdS )Teec                 C   s   || _ || _d| _d S rU   )stream1stream2_handling_warning)r/   r   r   r   r   r   r&      s   
zTee.__init__c                 C   sJ   | j rd S d| d| d}|d| d| 7 }d| _ t| d| _ d S )NzValueError when calling 'z' on stream (z). zargs: z	 kwargs: TF)r   r"   r#   )r/   opsargskwargsmsgr   r   r   _warn   s   

z	Tee._warnc              
   O   L   | j | jfD ]}z
|j|i | W q ty#   | d||| Y qw d S )Nseek)r   r   r   
ValueErrorr   r/   r   r   r   r   r   r   r         zTee.seekc              
   O   r   )Nwrite)r   r   r   r   r   r   r   r   r   r      r   z	Tee.writec              
   O   r   )Nflush)r   r   r   r   r   r   r   r   r   r      r   z	Tee.flushc                 C      t | jdr
| jjS | jjS )Nencoding)hasattrr   r   r   rX   r   r   r   r         zTee.encodingc                 C   r   )Nerror)r   r   r   r   rX   r   r   r   r      r   z	Tee.errorc                 C   r   )Nnewlines)r   r   r   r   rX   r   r   r   r     r   zTee.newlinesc                 C      t rh   NotImplementedErrorrX   r   r   r   detach     z
Tee.detachc                 O   r   rh   r   r/   r   r   r   r   r   read  r   zTee.readc                 O   r   rh   r   r   r   r   r   readline  r   zTee.readlinec                 O   r   rh   r   r   r   r   r   tell  r   zTee.tellN)r[   r\   r]   r&   r   r   r   r   propertyr   r   r   r   r   r   r   r   r   r   r   r      s     


r   c                   C   s   t  dS )Nz%Y-%m-%d_%H-%M-%S)r   todaystrftimer   r   r   r   date_str  s   r   c                 C   s   | t dfS )zConverts obj to a form that can be pinned in object store memory.

    Currently only numpy arrays are pinned in memory, if you have a strong
    reference to the array value.
    ra   )rH   zerosobjr   r   r   _to_pinnable  s   r   c                 C   s   | d S )z"Retrieve from _to_pinnable format.r   r   r   r   r   r   _from_pinnable(  s   r   	trainablec              
      s(  ddl m m}  fdd}td|  d z|dt|  | dd	 td
 W dS  tyA } ztd|  W Y d}~nd}~ww tdt|  d t| }t }|j	rktdt
|j	 d ||j	|dd  |jrtdt
|j d ||j|dd  |std |S td| d |S )a  Utility for detecting why your trainable function isn't serializing.

    Args:
        trainable: The trainable object passed to
            tune.Tuner(trainable). Currently only supports
            Function API.

    Returns:
        bool | set of unserializable objects.

    Example:

    .. code-block:: python

        import threading
        # this is not serializable
        e = threading.Event()

        def test():
            print(e)

        diagnose_serialization(test)
        # should help identify that 'e' should be moved into
        # the `test` scope.

        # correct implementation
        def test():
            e = threading.Event()
            print(e)

        assert diagnose_serialization(test) is True

    r   )_check_serializabilityregister_trainablec                    s   |   D ]H\}}d }z	 || d}W n$ ty7 } zd}|jj dt| }|| W Y d }~nd }~ww |t| d| d|  |rL|| qd S )NPASSEDFAILEDz: z[name='z'']... )rP   r=   r1   r[   r?   add)objectsfailure_setprintervar_namevariabler   statusrj   r   r   r   check_variablesS  s    
z/diagnose_serialization.<locals>.check_variableszTrying to serialize z...z__test:F)warnzSerialization succeeded!TzSerialization failed: NzIInspecting the scope of the trainable by running `inspect.getclosurevars(z)`...z	Detected z. global variables. Checking serializability...c                 S      t d|  S Nz   printr   r   r   r   <lambda>t      z(diagnose_serialization.<locals>.<lambda>z0 nonlocal variables. Checking serializability...c                 S   r   r   r   r   r   r   r   r   {  r   zNothing was found to have failed the diagnostic test, though serialization did not succeed. Feel free to raise an issue on github.zVariable(s) z was found to be non-serializable. Consider either removing the instantiation/imports of these objects or moving them into the scope of the trainable. )ray.tune.registryr   r   r   r?   r=   inspectgetclosurevarsri   globalsrG   	nonlocals)r   r   r   rj   closurer   r   r   r   diagnose_serialization.  sJ   #

r   statecheckpoint_dir	file_nametmp_file_namec                 C   sj   ddl m} tj||}t|d}|| | W d   n1 s#w   Y  t|tj|| dS )aS  Atomically saves the state object to the checkpoint directory.

    This is automatically used by Tuner().fit during a Tune job.

    Args:
        state: Object state to be serialized.
        checkpoint_dir: Directory location for the checkpoint.
        file_name: Final name of file.
        tmp_file_name: Temporary name of file.
    r   Nwb)ray.cloudpicklecloudpickler   pathrs   opendumpreplace)r   r   r   r   r   tmp_search_ckpt_pathfr   r   r   _atomic_save  s   r   dirpathckpt_patternc                 C   sj   ddl m} ttj| |}|sdS t|}t|d}||}W d   |S 1 s.w   Y  |S )ah  Returns the most recently modified checkpoint.

    Assumes files are saved with an ordered name, most likely by
    :obj:atomic_save.

    Args:
        dirpath: Directory in which to look for the checkpoint file.
        ckpt_pattern: File name pattern to match to find checkpoint
            files.

    Returns:
        (dict) Deserialized state dict.
    r   Nrb)	r   r   globr   r   rs   maxr   rA   )r   r   r   
full_pathsmost_recent_checkpointr   checkpoint_stater   r   r   _load_newest_checkpoint  s   
r   beta)	stability{Gz?      gpu_idtarget_utilretrydelay_sgpu_memory_limitc           
         s&  t  }|du rtddu rt }|std|d d ttr0 r-tnd ntts?tdt	 d fd	d
fdd|
 D }|vr^t d| dtt|D ]*}tfdd|
 D }	|	j|krtd| d|	jd t| qd dS td)aO  Checks if a given GPU has freed memory.

    Requires ``gputil`` to be installed: ``pip install gputil``.

    Args:
        gpu_id: GPU id or uuid to check.
            Must be found within GPUtil.getGPUs(). If none, resorts to
            the first item returned from `ray.get_gpu_ids()`.
        target_util: The utilization threshold to reach to unblock.
            Set this to 0 to block until the GPU is completely free.
        retry: Number of times to check GPU limit. Sleeps `delay_s`
            seconds between checks.
        delay_s: Seconds to wait before check.

    Returns:
        bool: True if free.

    Raises:
        RuntimeError: If GPUtil is not found, if no GPUs are detected
            or if the check fails.

    Example:

    .. code-block:: python

        def tune_func(config):
            tune.utils.wait_for_gpu()
            train()

        tuner = tune.Tuner(
            tune.with_resources(
                tune_func,
                resources={"gpu": 1}
            ),
            tune_config=tune.TuneConfig(num_samples=10)
        )
        tuner.fit()

    Nz3GPUtil must be installed if calling `wait_for_gpu`.zPNo GPU ids found from `ray.get_gpu_ids()`. Did you set Tune resources correctly?r   r@   uuidzgpu_id (z) must be type str/int.c                    s
   t |  S rh   )ru   )g)gpu_attrr   r   	gpu_id_fn  s   
zwait_for_gpu.<locals>.gpu_id_fnc                    s   h | ]} |qS r   r   rJ   r   )r   r   r   	<setcomp>  s    zwait_for_gpu.<locals>.<setcomp>z% not found in set of available GPUs: zt. `wait_for_gpu` takes either GPU ordinal ID (e.g., '0') or UUID (e.g., 'GPU-04546190-b68d-65ac-101b-035f8faed77d').c                 3   s     | ]}| kr|V  qd S rh   r   r   )r   r   r   r   	<genexpr>  s    zwait_for_gpu.<locals>.<genexpr>zWaiting for GPU util to reach z. Util: z0.3fTzGPU memory was not freed.)r   RuntimeErrorrayget_gpu_ids
isinstancer?   isdigitintr   r   r<   rq   nextrB   r"   inforV   rW   )
r   r   r   r   r   r   gpu_id_listgpu_idsrw   
gpu_objectr   )r   r   r   r   wait_for_gpu  sD   /



r	  trainable_clsconfignum_gpusc           	      C   s   t  sJ dt j|d| }|j|d}|j|d}ddlm} tdD ]
}t |j }q&||s:J dt |j|j	  t |j }|| dksVJ t |j }|| d	ksfJ d
S )a  Helper method to check if your Trainable class will resume correctly.

    Args:
        trainable_cls: Trainable class for evaluation.
        config: Config to pass to Trainable when testing.
        num_gpus: GPU resources to allocate when testing.
        use_object_store: Whether to save and restore to Ray's object
            store. Recommended to set this to True if planning to use
            algorithms that pause training (i.e., PBT, HyperBand).
    zNeed Ray to be initialized.)r  )r  r   )TRAINING_ITERATIONr`   zQValidation will not pass because it requires `training_iteration` to be returned.   r   T)
r   is_initializedremoteray.air.constantsr  rq   r   trainrestoresave)	r
  r  r  
remote_clstrainable_1trainable_2r  _resr   r   r   validate_save_restore   s    r  c              
   C   sX   t | }d}z|i  W |S  ty+ } ztt| d}W Y d}~|S d}~ww )zCheck if func({}) works.TFN)r   	signaturebindr=   r"   r>   r?   )funcfunc_siguse_config_singlerj   r   r   r   _detect_config_singleJ  s   
r   Tparameter_namespoints_to_evaluateevaluated_rewardsvalidate_point_name_lengthsc                 C   s   |r<t |tstdt||D ])}t |ttfs#td| d|r;t|t| ks;td|d|  d q|rd|rft |tsNtdt|t|t|kshtd|d	| d d
S d
S d
S )a5  Generic validation of a Searcher's warm start functionality.
    Raises exceptions in case of type and length mismatches between
    parameters.

    If ``validate_point_name_lengths`` is False, the equality of lengths
    between ``points_to_evaluate`` and ``parameter_names`` will not be
    validated.
    z1points_to_evaluate expected to be a list, got {}.z9points_to_evaluate expected to include list or dict, got .zDim of point {}z and parameter_names {}z do not match.z0evaluated_rewards expected to be a list, got {}.zDim of evaluated_rewards {}z and points_to_evaluate {}N)r  r(   	TypeErrorr   r   dictrG   r   )r!  r"  r#  r$  pointr   r   r   validate_warmstartV  sP   

r)  )Nr   r   r   Nr   )T)BrN   r   r   loggingr   r*   rV   collectionsr   r   numbersr   r   typingr   r   r   r	   r
   r   r   r   numpyrH   r   ray._private.dictr   r   r   r   r   r   ray.air._internal.jsonr   ray.air._internal.utilr   r   ray.util.annotationsr   r   r$   	getLoggerr[   r"   r   r   r   r=   r  r   ry   rz   objectr   r   r   r   r   r?   r   r   r8   r	  r  r   r)  r   r   r   r   <module>   s    ( 
J
'+L

_d)