o
    %ݫi8%                     @   s   d Z ddlZddlZddlmZ ddlmZmZ ddlm	Z	 ddl
mZmZmZmZ ddlmZ dd	 ZG d
d dZG dd dZe ddddddifdeegef dee dedededee dedefddZdS )zuParallel processing tools to help speed up certain tasks like data
preprocessing.

Authors
 * Sylvain de Langen 2023
    N)deque)ExecutorProcessPoolExecutor)	Condition)AnyCallableIterableOptional)tqdmc                 C   s   t t| |S N)listmap)fnchunk r   N/home/ubuntu/.local/lib/python3.10/site-packages/speechbrain/utils/parallel.py_chunk_process_wrapper   s   r   c                   @   s(   e Zd ZdZdd Zdd Zdd ZdS )	CancelFuturesOnExitzContext manager that .cancel()s all elements of a list upon exit.
    This is used to abort futures faster when raising an exception.c                 C   s
   || _ d S r   )future_list)selfr   r   r   r   __init__   s   
zCancelFuturesOnExit.__init__c                 C   s   d S r   r   )r   r   r   r   	__enter__   s   zCancelFuturesOnExit.__enter__c                 C   s   | j D ]}|  qd S r   )r   cancel)r   _type_value
_tracebackfuturer   r   r   __exit__    s   

zCancelFuturesOnExit.__exit__N)__name__
__module____qualname____doc__r   r   r   r   r   r   r   r      s
    r   c                   @   sl   e Zd ZdZdeegef dee dedededee	 de
d	efd
dZdd Zdd Zdd Zdd ZdS )_ParallelMapperzEInternal class for `parallel_map`, arguments match the constructor's.r   sourceprocess_count
chunk_size
queue_sizeexecutorprogress_barprogress_bar_kwargsc	           
      C   s   t  | _t | _d| _	 d | _	 || _|| _|| _|| _	|| _
|| _t|dr+t|nd | _t|| _d| _|rLd| ji}	|	| tdi |	| _d S d | _d S )Nr   __len__Ftotalr   )r   future_chunksr   cvjust_finished_countremote_exceptionr   r#   r$   r%   r&   r'   hasattrlen	known_leniter	source_itdepleted_sourceupdater
   pbar)
r   r   r#   r$   r%   r&   r'   r(   r)   tqdm_final_kwargsr   r   r   r   (   s(   



z_ParallelMapper.__init__c              	   c   s    t | jF | jdur|  E dH  n(t| jd}|| _|  E dH  W d   n1 s/w   Y  W d   dS W d   dS W d   dS 1 sOw   Y  dS )z`Spins up an executor (if none were provided), then yields all
        processed chunks in order.N)max_workers)r   r,   r'   _map_allr   r$   )r   poolr   r   r   runN   s    
"z_ParallelMapper.runc                 C   s   |  rdS | }| j |dur|| _|  jd7  _| j  W d   n1 s+w   Y  |du rE| jdurG| jt|	  dS dS dS )ao  Notifies the main thread of the finished job, bumping the number of
        jobs it should requeue. Updates the progress bar based on the returned
        chunk length.

        Arguments
        ---------
        future: concurrent.futures.Future
            A future holding a processed chunk (of type `list`).

        Returns
        -------
        None
        N   )
	cancelled	exceptionr-   r/   r.   notifyr7   r6   r1   result)r   r   future_exceptionr   r   r   _bump_processed_count^   s   
z%_ParallelMapper._bump_processed_countc                 C   sX   t t| j| j}t|dkrd| _dS | jt	| j
|}|| j | j| dS )a  Pulls a chunk from the source iterable and submits it to the
        pool; must be run from the main thread.

        Returns
        -------
        `True` if any job was submitted (that is, if there was any chunk
        left to process), `False` otherwise.
        r   TF)r   	itertoolsislicer4   r%   r1   r5   r'   submitr   r   add_done_callbackrC   r,   append)r   r   r   r   r   r   _enqueue_job   s   
z_ParallelMapper._enqueue_jobc                 c   s(   t | jD ]}|  s nq| jrt| jdkr| j% | jdkr,| j  | jdks"| j	dur4| j	| j}d| _W d   n1 sDw   Y  t |D ]}|  sU nqMt| jdkr|| jd 
 r|| j  E dH  t| jdkr|| jd 
 sd| jrt| jdks| jdur| j  dS dS )z~Performs all the parallel mapping logic.

        Yields
        ------
        The items from source processed by fn
        r   N)ranger&   rI   r5   r1   r,   r-   r.   waitr/   donepopleftrA   r7   close)r   _to_queue_countr   r   r   r:      s4   	




 z_ParallelMapper._map_allN)r   r   r    r!   r   r   r   intr	   r   booldictr   r<   rC   rI   r:   r   r   r   r   r"   %   s.    	
&#r"         T	smoothingg{Gz?r   r#   r$   r%   r&   r'   r(   r)   c           	   	   c   s*    t | |||||||}| E dH  dS )a  Maps iterable items with a function, processing chunks of items in
    parallel with multiple processes and displaying progress with tqdm.

    Processed elements will always be returned in the original, correct order.
    Unlike `ProcessPoolExecutor.map`, elements are produced AND consumed lazily.

    Arguments
    ---------
    fn: Callable
        The function that is called for every element in the source list.
        The output is an iterator over the source list after fn(elem) is called.

    source: Iterable
        Iterator whose elements are passed through the mapping function.

    process_count: int
        The number of processes to spawn. Ignored if a custom executor is
        provided.
        For CPU-bound tasks, it is generally not useful to exceed logical core
        count.
        For IO-bound tasks, it may make sense to as to limit the amount of time
        spent in iowait.

    chunk_size: int
        How many elements are fed to the worker processes at once. A value of 8
        is generally fine. Low values may increase overhead and reduce CPU
        occupancy.

    queue_size: int
        Number of chunks to be waited for on the main process at a time.
        Low values increase the chance of the queue being starved, forcing
        workers to idle.
        Very high values may cause high memory usage, especially if the source
        iterable yields large objects.

    executor: Optional[Executor]
        Allows providing an existing executor (preferably a
        ProcessPoolExecutor). If None (the default), a process pool will be
        spawned for this mapping task and will be shut down after.

    progress_bar: bool
        Whether to show a tqdm progress bar.

    progress_bar_kwargs: dict
        A dict of keyword arguments that is forwarded to tqdm when
        `progress_bar == True`. Allows overriding the defaults or e.g.
        specifying `total` when it cannot be inferred from the source iterable.

    Yields
    ------
    The items from source processed by fn
    N)r"   r<   )	r   r#   r$   r%   r&   r'   r(   r)   mapperr   r   r   parallel_map   s   >
rX   )r!   rD   multiprocessingcollectionsr   concurrent.futuresr   r   	threadingr   typingr   r   r   r	   	tqdm.autor
   r   r   r"   	cpu_countrQ   rR   rS   rX   r   r   r   r   <module>   sF     )