o
    $iT-                     @   s@  d Z ddlZddlZddlmZmZ ddlZddlmZ z
ddlm	Z	m
Z
 W n ey9   edej d Y nw ddlmZmZ dd	lmZmZmZ dd
lmZ ejdkr\dd Zndd Zd(ddZdd Zd)ddZdZdefddZdd Zdd Zdd Z d*dd Z!d!d" Z"dede e!de"e"fd#d$Z#d+d&d'Z$dS ),zu
The following is adapted from Dask release 2021.03.1:
    https://github.com/dask/dask/blob/2021.03.1/dask/local.py
    N)EmptyQueue)config)DataNodeDependenciesMappingzEDask on Ray is available only on dask>=2024.11.0, you are on version .)local_callbacksunpack_callbacks)flattenget_dependenciesreverse_dict)orderntc                 C   s(   	 z| j dddW S  ty   Y nw q)NTg?)blocktimeout)getr   q r   Z/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/util/dask/scheduler_utils.py	queue_get   s   r   c                 C   s   |   S N)r   r   r   r   r   r   &   s   c              
      s2  |du r	t | j}|du rtdd}|du rt }t  |  D ]\}}t|tr4| ||<  | q!| 	 }|
| t| } fdd| D }t|}|D ]}	||	dD ]	}
||
 |	 q[qSdd | D }dd | D }t||d	d
}dd | D }||||||t t t d	}|S )a  Start state from a dask
    Examples
    --------
    >>> dsk = {
        'x': 1,
        'y': 2,
        'z': (inc, 'x'),
        'w': (add, 'z', 'y')}  # doctest: +SKIP
    >>> from pprint import pprint  # doctest: +SKIP
    >>> pprint(start_state_from_dask(dsk))  # doctest: +SKIP
    {'cache': {'x': 1, 'y': 2},
     'dependencies': {'w': {'z', 'y'}, 'x': set(), 'y': set(), 'z': {'x'}},
     'dependents': {'w': set(), 'x': {'z'}, 'y': {'w'}, 'z': {'w'}},
     'finished': set(),
     'ready': ['z'],
     'released': set(),
     'running': set(),
     'waiting': {'w': {'z'}},
     'waiting_data': {'x': {'z'}, 'y': {'w'}, 'z': {'w'}}}
    Ncachec                    s"   i | ]\}}| vr|t |qS r   )set.0kv	data_keysr   r   
<dictcomp>P   s   " z)start_state_from_dask.<locals>.<dictcomp>r   c                 S   s   i | ]\}}|r||  qS r   )copyr   r   r   r   r    V   s    c                 S   s   h | ]\}}|s|qS r   r   r   r   r   r   	<setcomp>X   s    z(start_state_from_dask.<locals>.<setcomp>Tkeyreversec                 S   s   i | ]	\}}|r||qS r   r   r   r   r   r   r    Z       )	dependencies
dependentswaitingwaiting_datar   readyrunningfinishedreleased)r   r   r   dictr   items
isinstancer   addr!   updater   r   removesorted)dskr   sortkeyr   r   dsk2r'   r)   r(   abr*   	ready_setr+   stater   r   r   start_state_from_dask*   sH   




r=   c              
   C   sl   z||\}}||}| }	|||	f}d}
W n t y0 } z|||}d}
W Y d}~nd}~ww | ||
fS )zx
    Compute task and handle all administration
    See Also
    --------
    _execute_task : actually execute task
    FTN)BaseException)r$   	task_infodumpsloadsget_idpack_exceptiontaskdataresultidfaileder   r   r   execute_taskk   s   

rJ   Tc                 C   sJ   | |d v r|d |  rJ |d | = |d  |  |r#|d | = dS dS )zQRemove data from temporary storage
    See Also
    --------
    finish_task
    r*   r.   r   N)r2   )r$   r<   deleter   r   r   release_data~   s   
rL   Fc           
   
   C   s  t |d | |ddD ]}|d | }|| |s&|d |= |d | q|d | D ]J}||d v rj|d | }|| |si||vritrbdd	lm}	 td
||tt|	|d 	 d f  ||||d q-|rw||vrw||||d q-|d 
| |d | |S )zn
    Update execution state after a task finishes
    Mutates.  This should run atomically (with a lock).
    r(   Tr#   r)   r+   r'   r*   r   )nbytesz&Key: %s	Dep: %s	 NBytes: %.2f	 Releaser   g    .A)rK   r-   r,   )r5   r4   appendDEBUG
chest.corerM   printsummapvaluesr2   )
r6   r$   r<   resultsr7   rK   rL   depsrM   r   r   r   finish_task   s6   


rX   c                    s(   t | trt fdd| D S  |  S )zGet nested index from collection
    Examples
    --------
    >>> nested_get(1, 'abc')
    'b'
    >>> nested_get([1, 0], 'abc')
    ('b', 'a')
    >>> nested_get([[1, 0], [0, 1]], 'abc')
    (('b', 'a'), ('a', 'b'))
    c                 3   s    | ]}t | V  qd S r   )
nested_get)r   icollr   r   	<genexpr>   s    znested_get.<locals>.<genexpr>)r1   listtuple)indr\   r   r[   r   rY      s   
rY   c                   C   s   dS )zDefault get_idNr   r   r   r   r   default_get_id   s   ra   c                  C   s    r   r   )rI   r@   r   r   r   default_pack_exception   s   rb   c                 C   s   | j |ur
| || r   )__traceback__with_traceback)exctbr   r   r   reraise   s   

rg   c                 C   s   | S )z<Identity function. Returns x.
    >>> identity(3)
    3
    r   )xr   r   r   identity   s   ri   c           "         s  t  t|trtt|}n|h}t|}tt|	=}	t|	\}}}}g }d}i z|	D ]}|d r?|d  || q3t	}t
||jd|	D ]\}}}}}|ra| qS|du rltdd}d rxd sxtd f	d	d
}d rtd |k r|  d rtd |k sd sd sd r&t\}}}|rو|\}}|rԇfddt|D }| }|| n||| |\}}|d |< t|||j |D ]
} | ||| qd rtd |k r|  d rtd |k s	d sd sd sd}W |D ]\}}}}}!|!r<|!|  q+n|D ]\}}}}}!|!rR|!|  qAw W d   n	1 s`w   Y  t|d S )a  Asynchronous get function
    This is a general version of various asynchronous schedulers for dask.  It
    takes a an apply_async function as found on Pool objects to form a more
    specific ``get`` method that walks through the dask array with parallel
    workers, avoiding repeat computation and minimizing memory use.
    Parameters
    ----------
    apply_async : function
        Asynchronous apply function as found on Pool or ThreadPool
    num_workers : int
        The number of active tasks we should have at any one time
    dsk : dict
        A dask dictionary specifying a workflow
    result : key or list of keys
        Keys corresponding to desired data
    cache : dict-like, optional
        Temporary storage of results
    get_id : callable, optional
        Function to return the worker id, takes no arguments. Examples are
        `threading.current_thread` and `multiprocessing.current_process`.
    rerun_exceptions_locally : bool, optional
        Whether to rerun failing tasks in local process to enable debugging
        (False by default)
    pack_exception : callable, optional
        Function to take an exception and ``dumps`` method, and return a
        serialized tuple of ``(exception, traceback)`` to send back to the
        scheduler. Default is to just raise the exception.
    raise_exception : callable, optional
        Function that takes an exception and a traceback, and raises an error.
    dumps: callable, optional
        Function to serialize task data and results to communicate between
        worker and parent.  Defaults to identity.
    loads: callable, optional
        Inverse function of `dumps`.  Defaults to identity.
    callbacks : tuple or list of tuples, optional
        Callbacks are passed in as tuples of length 5. Multiple sets of
        callbacks may be passed in as a list of tuples. For more information,
        see the dask.diagnostics documentation.
    See Also
    --------
    threaded.get
    Fr   )r   r7   Nrerun_exceptions_locallyr)   r+   z Found no accessible jobs in daskc                     st   d   } d |  D ]}||  qfddt| D } t| |  |ffjd dS )z"Fire off a task to the thread poolr+   r,   c                       i | ]	}| d  | qS r   r   r   rV   r<   r   r   r    C  r&   z0get_async.<locals>.fire_task.<locals>.<dictcomp>)argscallbackN)popr2   r   rJ   put)r$   frE   	apply_asyncr6   r@   rB   rA   rC   pretask_cbsqueuer<   r   r   	fire_task:  s    
zget_async.<locals>.fire_taskr,   c                    rk   rl   r   rm   rn   r   r   r    \  s    zget_async.<locals>.<dictcomp>r   T)r   r1   r^   r   r
   r/   r   r	   rN   r   r=   r   r   
ValueErrorlenr   r   rX   rY   )"ru   num_workersr6   rF   r   rB   rj   rC   raise_exception	callbacksr@   rA   kwargsresult_flatrU   _posttask_cbsstarted_cbs	succeededcbkeyorderstart_staterx   r$   res_inforH   re   rf   rE   rD   res	worker_idrs   finishr   rt   r   	get_async   s|   9




Ur   r   c                 C   s2   |du ri }| |i |}|dur|| dS dS )z*A naive synchronous version of apply_asyncNr   )funcro   kwdsrp   r   r   r   r   
apply_syncw  s   r   )NN)Tr   )r   NN)%__doc__oswarningsrw   r   r   daskr   dask._task_specr   r   ImportErrorwarn__version__dask.callbacksr   r	   	dask.corer
   r   r   
dask.orderr   namer   r=   rJ   rL   rO   rX   rY   ra   rb   rg   ri   r   r   r   r   r   r   <module>   sV    

	
A

$

 