o
    
iD                     @   sr   d Z ddlZddlZddlZddlmZ ddlmZ e	e
ZG dd deZG dd deZejdd
dZdS )z\Common functionality for concurrent processing. The main entry point is :func:`create_pool`.    N)deque)ThreadPoolExecutorc                   @   s    e Zd ZdZdddddZdS )r   z+Subclass with a lazy consuming imap method.N   )timeoutqueued_tasks_per_workerc                '   s    t  | j|d  }}|j|j| j }} fdd}	t| D ]}
|||g|
R   t||kr8|	 V  q"|rC|	 V  |s;dS dS )a  Ordered imap that consumes iterables just-in-time.

        References:
            https://gist.github.com/ddelange/c98b05437f80e4b16bf4fc20fde9c999

        Args:
            fn: Function to apply.
            iterables: One (or more) iterable(s) to pass to fn (using zip) as positional argument(s).
            timeout: Per-future result retrieval timeout in seconds.
            queued_tasks_per_worker: Amount of additional items per worker to fetch from iterables to
                    fill the queue: this determines the total queue size.
                Setting 0 will result in a true just-in-time behaviour: when a worker finishes a task,
                    it waits until a result is consumed from the imap generator, at which point next()
                    is called on the input iterable(s) and a new task is submitted.
                Default 2 ensures there is always some work to pick up. Note that at imap startup,
                    the queue will fill up before the first yield occurs.

        Example:
            long_generator = itertools.count()
            with ThreadPoolExecutor(42) as pool:
                result_generator = pool.imap(fn, long_generator)
                for result in result_generator:
                    print(result)
           c                      s      S )z8Block until the next task is done and return the result.)result popleftr   r	   J/home/ubuntu/.local/lib/python3.10/site-packages/smart_open/concurrency.pyget3   s   z$ThreadPoolExecutor.imap.<locals>.getN)r   _max_workersr   appendsubmitziplen)selffnr   r   	iterablesfuturesmaxlenr   r   r   argsr	   r
   r   imap   s   zThreadPoolExecutor.imap)__name__
__module____qualname____doc__r   r	   r	   r	   r   r      s    r   c                   @   s(   e Zd ZdZdd Zdd Zdd ZdS )	ConcurrentFuturesPoolz_A class that mimics multiprocessing.pool.Pool but uses concurrent futures instead of processes.c                 C   s   t |d| _d S )Nmax_workers)r   executor)r   r    r	   r	   r   __init__F   s   zConcurrentFuturesPool.__init__c                 #   s6     fdd|D }t j|D ]}| V  qd S )Nc                    s   g | ]	}j  |qS r	   )r!   r   ).0itemfunctionr   r	   r   
<listcomp>J   s    z8ConcurrentFuturesPool.imap_unordered.<locals>.<listcomp>)
concurrentr   as_completedr   )r   r&   itemsr   futurer	   r%   r   imap_unorderedI   s
   z$ConcurrentFuturesPool.imap_unorderedc                 C   s   | j jdd d S )NT)wait)r!   shutdown)r   r	   r	   r   	terminateN   s   zConcurrentFuturesPool.terminateN)r   r   r   r   r"   r,   r/   r	   r	   r	   r   r   D   s
    r   r   c                 c   s*    t d|  t| d}|V  |  d S )Nz0creating concurrent futures pool with %i workersr   )loggerinfor   r/   )	processespoolr	   r	   r   create_poolR   s
   
r4   )r   )r   concurrent.futuresr(   
contextlibloggingcollectionsr   r   _ThreadPoolExecutor	getLoggerr   r0   objectr   contextmanagerr4   r	   r	   r	   r   <module>   s   
0