o
    2wi                     @   s   d dl Z d dlZd dlZd dlmZmZ d dlmZ d dlm	Z	m
Z
mZmZmZmZ d dlmZ dddd	d
e	dededededefddZG dd dejZG dd dZdS )    N)ProcessPoolExecutorThreadPoolExecutor)partial)CallableDict	GeneratorIterableOptionalType)tqdm   i  Fnum_jobs
queue_sizethreadsfn	iterablesr   r   r   returnc                g   s    t | g|R |||d}|  |j}| s| s<z|jddd V  W n
 tjy3   Y qw | s| r|  dS )a  
    Works like Python's ``map``, but parallelizes the execution of ``fn`` over ``num_jobs``
    subprocesses or threads.

    Under the hood, it spawns ``num_jobs`` producer jobs that put their results on a queue.
    The current thread becomes a consumer thread and this generator yields items from the queue
    to the caller, as they become available.

    Example::

        >>> for root in parallel_map(math.sqrt, range(1000), num_jobs=4):
        ...     print(root)

    :param fn: function/callable to execute on each element.
    :param iterables: one of more iterables (one for each parameter of ``fn``).
    :param num_jobs: the number of parallel jobs.
    :param queue_size: max number of result items stored in memory.
        Decreasing this number might save more memory when the downstream processing is slower than
        the producer jobs.
    :param threads: whether to use threads instead of processes for producers (false by default).
    :return: a generator over results from ``fn`` applied to each item of ``iterables``.
    r   Tg?)blocktimeoutN)	SubmitterThreadstartqueueis_aliveemptygetresultEmptyjoin)r   r   r   r   r   threadq r!   L/home/ubuntu/sommelier/.venv/lib/python3.10/site-packages/lhotse/parallel.pyparallel_map   s$   	r#   c                       sF   e Zd Zdddddedededed	d
f
 fddZdddZ  ZS )r   r   i'  Fr   r   r   r   r   r   Nc                   s4   t    || _|| _|| _tj|d| _|| _d S )N)maxsize)super__init__r   r   r   r   Queueuse_threads)selfr   r   r   r   r   	__class__r!   r"   r&   ;   s   

zSubmitterThread.__init__c                 C   sr   | j rtnt}|| j#}t| j D ]}|j| jg|R  }| jj	|dd qW d    d S 1 s2w   Y  d S )NT)r   )
r(   r   r   r   zipr   submitr   r   put)r)   executorexargsfuturer!   r!   r"   runJ   s   "zSubmitterThread.run)r   N)	__name__
__module____qualname__r   intboolr&   r3   __classcell__r!   r!   r*   r"   r   :   s     r   c                	   @   sv   e Zd ZU dZi Zeee ef e	d< 			ddeg ef dede
d	efd
dZdd Zdd ZdedefddZdS )ParallelExecutora  
    A class which uses ProcessPoolExecutor to parallelize the execution of a callable class.
    The instances of the runner class are instantiated separately in each worker process.

    Example::

        >>> class MyRunner:
        ...     def __init__(self):
        ...         pass
        ...     def __call__(self, x):
        ...         return f'processed: {x}'
        ...
        >>> runner = ParallelExecutor(MyRunner, num_jobs=4)
        >>> for item in runner(range(10)):
        ...     print(item)

    If the __init__ method of the callable class accepts parameters except for `self`,
    use `functools.partial` or similar method to obtain a proper initialization function:

        >>> class MyRunner:
        ...     def __init__(self, name):
        ...         self.name = name
        ...     def __call__(self, x):
        ...         return f'{self.name}: {x}'
        ...
        >>> runner = ParallelExecutor(partial(MyRunner, name='my_name'), num_jobs=4)
        >>> for item in runner(range(10)):
        ...     print(item)


    The initialization function will be called separately for each worker process. Steps like loading a
    PyTorch model instance to the selected device should be done inside the initialization function.
    _runnersr   F
Processinginit_fnr   verbosedescriptionc                 C   s   || _ || _|| _|| _dS )a  
        Instantiate a parallel executor.

        :param init_fn: A function which returns a runner object (e.g. a class) that will be instantiated
            in each worker process.
        :param num_jobs: The number of parallel jobs to run. Defaults to 1 (no parallelism).
        :param verbose: Whether to show a progress bar.
        :param description: The description to show in the progress bar.
        N)_make_runnerr   r>   r?   )r)   r=   r   r>   r?   r!   r!   r"   r&   w   s   
zParallelExecutor.__init__c                 C   s   t  j}|  | j|< d S N)multiprocessingcurrent_processpidr@   r;   )r)   rD   r!   r!   r"   _init_runner   s   
zParallelExecutor._init_runnerc                 O   s"   t  j}| j| }||i |S rA   )rB   rC   rD   r;   )r)   r1   kwargsrD   runnerr!   r!   r"   _process   s   

zParallelExecutor._processitemsr   c           	      k   sJ   | j dkr"|  }t|| j| j dD ]}||fi |V  qd S t| j | jtdd}|k}z[z"|	t
| jfi ||}t|| jt|| j dD ]}|V  qMW n1 tyo } z|jdd | jritd |d }~w ty } z|jdd td	|d }~ww W | j  n| j  w W d    d S 1 sw   Y  d S )
Nr   )descdisablespawn)max_workersinitializer
mp_context)rJ   totalrK   F)waitzInterrupted by the user.z5Parallel processing failed. Please report this issue.)r   r@   r   r?   r>   r   rE   rB   get_contextmapr   rH   lenKeyboardInterruptshutdownprint	ExceptionRuntimeErrorr;   clear)	r)   rI   rF   rG   itempoolr/   resexcr!   r!   r"   __call__   sP   

"zParallelExecutor.__call__N)r   Fr<   )r4   r5   r6   __doc__r;   r   r	   r7   r   __annotations__r8   strr&   rE   rH   r   r   r_   r!   r!   r!   r"   r:   R   s$   
 "

r:   )rB   r   	threadingconcurrent.futuresr   r   	functoolsr   typingr   r   r   r   r	   r
   	tqdm.autor   r7   r8   r#   Threadr   r:   r!   r!   r!   r"   <module>   s2     
/