o
    di                   
   @   s  d Z ddlZddlZddlmZmZmZmZ ddlm	Z	m
Z
mZ ddlmZmZ ddlmZmZmZmZ ddd	eee  d
ee dee fddZdddeee  dedee fddZddddee dededee	 fddZ		ddeee  dededee fddZdS )zt
Friendlier version of asyncio standard library.

Provisional library.  Must be imported as `aioitertools.asyncio`.
    N)AsyncGeneratorAsyncIterable	AwaitableIterable)AnycastOptional   )itermaybe_await)AnyIterableAsyncIteratorMaybeAwaitableT)timeoutawsr   returnc                C  s   t  }dd | D }d}|r|dkrt | }nd}|rk|r?|t  }|dkr?|D ]}t|tjr9|  q,	 q,t ttt t	t
  t t	t
  f tj||tjdI dH \}}|D ]}|I dH V  q^|sdS dS )a  
    Run awaitables in `aws` concurrently, and yield results as they complete.

    Unlike `asyncio.as_completed`, this yields actual results, and does not require
    awaiting each item in the iterable.

    Cancels all remaining awaitables if a timeout is given and the timeout threshold
    is reached.

    Example::

        async for value in as_completed(futures):
            ...  # use value immediately

    c                 S   s   h | ]}t |qS  asyncioensure_future).0ar   r   H/home/ubuntu/.local/lib/python3.10/site-packages/aioitertools/asyncio.py	<setcomp>(   s    zas_completed.<locals>.<setcomp>Nr   )r   return_when)settime
isinstancer   FuturecancelTimeoutErrorr   tupler   r   waitFIRST_COMPLETED)r   r   donepending	remaining	thresholdfutitemr   r   r   as_completed   s6   
	
	r+   F)return_exceptions	iterablesr,   c                  s  t   t  dtt ddf fddfdd| D }t|}z~zK|roz  }|r3|V  n|W n
 t jy@   Y nw z	 }|V  W n" t jyl   t|D ]}| ra|	| qVt 
dI dH  Y nw |s(W n t jtfy|   Y nw W |D ]
}| s|  q|D ]}z|I dH  W q t jy   Y qw dS |D ]
}| s|  q|D ]}z|I dH  W q t jy   Y qw w )	ao  
    Yield results from one or more async iterables, in the order they are produced.

    Like :func:`as_completed`, but for async iterators or generators instead of futures.
    Creates a separate task to drain each iterable, and a single queue for results.

    If ``return_exceptions`` is ``False``, then any exception will be raised, and
    pending iterables and tasks will be cancelled, and async generators will be closed.
    If ``return_exceptions`` is ``True``, any exceptions will be yielded as results,
    and execution will continue until all iterables have been fully consumed.

    Example::

        async def generator(x):
            for i in range(x):
                yield i

        gen1 = generator(10)
        gen2 = generator(12)

        async for value in as_generated([gen1, gen2]):
            ...  # intermixed values yielded from gen1 and gen2
    r
   r   Nc              
      s   z| 2 z3 d H W } |I d H  q6 W d S  tjy+   t| tr*|  I d H    tyE } z  |I d H  W Y d }~d S d }~ww N)putr   CancelledErrorr   r   aclose	Exception)r
   r*   e)	exc_queuequeuer   r   tailero   s   
zas_generated.<locals>.tailerc                    s   g | ]	}t  |qS r   r   )r   r
   )r6   r   r   
<listcomp>z   s    z as_generated.<locals>.<listcomp>gMbP?)r   Queuer   r   r   
get_nowait
QueueEmptylistr%   removesleepr0   GeneratorExitr    )r-   r,   tasksr&   excvaluetaskr   )r4   r5   r6   r   as_generatedO   sn   

rC   r,   limitargsrF   c                    s  i }i }dgt | }t }t }d}	 |t |k r`|dks%t ||k r`|| |v r5|||  | nt|| }	||	 |||	< |g||| < |d7 }|t |k r`|dks%t ||k s%|rz+tj|tjdI dH \}}|D ]}
| r|
 r|
 |||
 < qr|
	 |||
 < qrW n tj
y   |D ]}
|
  qtj|ddiI dH   w |s|t |krnq| D ]}tdt |D ]}||d  ||| < qq|S )aR  
    Like asyncio.gather but with a limit on concurrency.

    Note that all results are buffered.

    If gather is cancelled all tasks that were internally created and still pending
    will be cancelled as well.

    Example::

        futures = [some_coro(i) for i in range(10)]

        results = await gather(*futures, limit=2)
    Nr   TrD   r	   )r   r,   )lenr   appendr   r   addr#   r$   	exceptionresultr0   r    gathervaluesrange)r,   rF   rG   	input_mapposretr&   r%   next_argrB   xlstir   r   r   rM      sR    
 
)rM   itrc                    s,   t dd t| 2 I dH ||dI dH S )z
    Wrapper around gather to handle gathering an iterable instead of ``*args``.

    Note that the iterable values don't have to be awaitable.
    c                    s    g | z
3 d H W }t |q6 S r.   )r   )r   rV   r   r   r   r7      s    zgather_iter.<locals>.<listcomp>NrE   )rM   aiter)rW   r,   rF   r   r   r   gather_iter   s   
rY   )FrD   )__doc__r   r   collections.abcr   r   r   r   typingr   r   r   builtinsr
   rX   r   typesr   r   r   r   floatr+   boolrC   intr;   rM   rY   r   r   r   r   <module>   sZ   

?

S
Q
