o
    ̳iT                     @   s  d dl Z d dlZd dlZd dlmZmZmZmZmZm	Z	m
Z
mZmZmZmZmZ d dlmZ d dlmZmZ d dlmZmZ d dlmZmZ d dlmZmZ ddlmZ dd	l m Z  dd
l!m"Z" dZ#G dd deZ$edZ%dee% dee%gef ddfddZ&ee% Z'ee Z(G dd dee%ef Z)dee j*ej*f de j*dej+fddZ,G dd dee Z-G dd dee Z.G d d! d!ee Z/G d"d# d#ee Z0eee j*ee1ej2ej+gdf Z3G d$d% d%ee Z4dS )&    N)AnyCallableDictGenericIteratorListLiteralOptionalProtocolSequenceTypeVarUnion)BaseNodeT)Batcher	Unbatcher)ExceptionWrapperStartupExceptionWrapper)QueueSnapshotStoreSnapshotStore   )
_apply_udf)_populate_queue)QUEUE_TIMEOUTi,  c                   @   s$   e Zd Zdd Zdd Zdd ZdS )_MultiprocessContextc                 O      d S N selfargskwargsr   r   G/home/ubuntu/.local/lib/python3.10/site-packages/torchdata/nodes/map.pyProcess      z_MultiprocessContext.Processc                 O   r   r   r   r   r   r   r"   Event    r$   z_MultiprocessContext.Eventc                 O   r   r   r   r   r   r   r"   Queue#   r$   z_MultiprocessContext.QueueN)__name__
__module____qualname__r#   r%   r&   r   r   r   r"   r      s    r   Xsourcemap_fnreturnzParallelMapper[T]c                 C   s   t | |ddS )a  Returns a :class:`ParallelMapper` node with num_workers=0, which will execute map_fn in the current process/thread.

    Args:
        source (BaseNode[X]): The source node to map over.
        map_fn (Callable[[X], T]): The function to apply to each item from the source node.
    r   )r+   r,   num_workers)ParallelMapper)r+   r,   r   r   r"   Mapper*   s
   r0   c                   @   s>   e Zd Zdeegef fddZdee dee fddZdS )	MapOverBatchr,   c                 C   s
   || _ d S r   r,   )r   r,   r   r   r"   __init__=      
zMapOverBatch.__init__xlistr-   c                    s    fdd|D S )Nc                    s   g | ]}  |qS r   r2   ).0xr   r   r"   
<listcomp>A   s    z)MapOverBatch.__call__.<locals>.<listcomp>r   )r   r5   r   r8   r"   __call__@      zMapOverBatch.__call__N)	r'   r(   r)   r   r*   r   r3   r   r:   r   r   r   r"   r1   <   s    r1   in_qout_q
stop_eventc                 C   s  i }d}|  sz| jdtd\}}W n
 tjy   Y qw ||kr0|j||fdd |d7 }n1||v r]ztd|d| d	| tyQ   t	d
d}Y nw |j||fdd d S |||< ||v ry|j|
||fdd |d7 }||v se|  rd S d S )Nr   TblocktimeoutF)r@   r   zDuplicate index idx=z, buffer.keys()=z, item=zin _sort_worker)where)is_setgetr   queueEmptyput
ValueErrorkeys	Exceptionr   pop)r<   r=   r>   buffercur_idxitemidxr   r   r"   _sort_workerD   s2   
rP   c                
   @   sn   e Zd ZdZdZ	ddee deegef de	e
eef  fddZdd	 Zd
e
eef fddZdd ZdS )_InlineMapperIterz%Non-Parallel implementation of Mapperr+   Nr,   initial_statec                 C   s8   || _ || _|d ur| j || j  d S | j   d S r   )r+   r,   reset
SOURCE_KEY)r   r+   r,   rR   r   r   r"   r3   c   s
   z_InlineMapperIter.__init__c                 C   s   |  t| jS r   )r,   nextr+   r8   r   r   r"   __next__p      z_InlineMapperIter.__next__r-   c                 C      | j | j iS r   )rT   r+   
state_dictr8   r   r   r"   	get_states   rW   z_InlineMapperIter.get_statec                 C   r   r   r   r8   r   r   r"   	_shutdownv   r$   z_InlineMapperIter._shutdownr   )r'   r(   r)   __doc__rT   r   r*   r   r   r	   r   strr   r3   rV   rZ   r[   r   r   r   r"   rQ   ^   s    
rQ   c                   @   s   e Zd ZdZdee deegef dede	de
d ded	ee d
edeeeef  fddZdee fddZdefddZdeeef fddZdefddZdd Zdd ZdS )_ParallelMapperItera  _ParallelMapperIter will start at least two threads, one running
    _populate_queue, and one for _apply_udf. If in_order == True, a
    third thread will be started to read from _apply_udf's result q
    and block the output_q until the appropriate in_order element is available,
    buffering outputs as needed.

    A BoundedSemaphore with initial value max_concurrent will limit the number
    of items in flight, and in all of the queues.
    r+   r,   r.   in_ordermethodthreadprocess
mp_contextmax_concurrentsnapshot_frequencyrR   c
              
   C   sT  || _ || _|| _|| _|| _|| _|| _|dkrt n| | _	|dkr*t n| | _
|d u r8d| j n|| _tj| jd| _d| _t | _| | _d| _d}
|	d urj|	d | _|	d }
| j | j nd | _| j   t | _tjt| j | j	| j| j| j| jfdd	| _g | _t| jD ].}|| j	| j
| j| jdkr| jn| jf}| j| jdkrtjt|dd	n|jt|dd	 qt | _ tjt!| j
| j | jfdd	| _"| j
| _#| jr| j | _#| j$  | jD ]}|$  q| jr| j"$  t%&d
 | jj'| jt(d| _t|
D ]}zt)|  W q t*y'   t+d|
 d| dw d S )Nrb      valueFr   snapshotsteps_since_snapshotTtargetr    daemong{Gz?rb   rA   Tried to fast-forward / items during init but hit StopIteration after 4 items, this is likely a bug or malformed state_dict),r+   r,   r.   r_   r`   rd   rf   rE   r&   _in_q_intermed_q
_max_tasks	threadingBoundedSemaphore_sem_doner%   _stop_mp_stop_steps_since_snapshot	_snapshotrS   r   _snapshot_storeThreadr   _read_thread_workersrangeappendr   r#   _sort_qrP   _sort_thread_out_qstarttimesleepget_initial_snapshotACK_TIMEOUTrU   StopIterationrH   )r   r+   r,   r.   r_   r`   rd   re   rf   rR   fast_forward	worker_idr    tir   r   r"   r3      s   










z_ParallelMapperIter.__init__r-   c                 C      | S r   r   r8   r   r   r"   __iter__   r$   z_ParallelMapperIter.__iter__c                 C   s   	 | j  r	t | jr | jj| jkr | j   | j  t z| j	j
dtd\}}W n
 tjy6   Y q w t|trEd| _| j  q t|trXt|tsT| j  |  |  jd7  _| j  | | |S NTr?   r   )rz   rC   r   ry   rx   _valueru   setr{   r   rD   r   rE   rF   
isinstancereleaser   r   reraiser|   _maybe_update_snapshotr   rN   rO   r   r   r"   rV      s0   









z_ParallelMapperIter.__next__c                 C      | j | jdS N)rj   rk   r}   r|   r8   r   r   r"   rZ        z_ParallelMapperIter.get_staterO   c                 C   (   | j | }d ur|| _d| _d S d S Nr   r~   pop_versionr}   r|   r   rO   rj   r   r   r"   r        
z*_ParallelMapperIter._maybe_update_snapshotc                 C      |    d S r   r[   r8   r   r   r"   __del__     z_ParallelMapperIter.__del__c                 C   s   | j   | j  t| dr| j r| jjtd d t| dr0| j r0| jjtd d t| drG| j	D ]}| rF|jtd d q8d S d S )Nr      rA   r   r   )
rz   r   r{   hasattrr   is_alivejoinr   r   r   )r   r   r   r   r"   r[     s   



z_ParallelMapperIter._shutdownN)r'   r(   r)   r\   r   r*   r   r   intboolr   r   r	   r   r]   r   r3   r   r   rV   rZ   r   r   r[   r   r   r   r"   r^   z   s6    
	

`r^   c                       s   e Zd ZdZ					ddee deegef ded	e	d
e
d dee dee def fddZddeeeef  f fddZdeeeef  fddZdeeeef  fddZdefddZdeeef fddZ  ZS )_ParallelMapperImplaL  This class implements _ParallelMapperIter and _InlineMapperIter as a BaseNode,
    allowing them to be composed with other BaseNodes.

    TODO: In the future, this class may go away once we implement reset() on
    _ParallelMapperIter and _InlineMapperIter themselves so we don't need this
    additional level of abstraction.
    Trb   Nr   r+   r,   r.   r_   r`   ra   multiprocessing_contextre   rf   c	           	         s   t    |dv sJ || _|| _|| _|| _|| _|| _t| _	| jdkr1| jd ur1t
| j| _	|d urM|dkrMt|trM||krMtd|d|d|| _|| _d | _d S )Nra   rc   r   max_concurrent= should be <= num_workers=!)superr3   r+   r,   r.   r_   r`   r   mp_mp_contextget_contextr   r   rH   re   rf   _it)	r   r+   r,   r.   r_   r`   r   re   rf   	__class__r   r"   r3   )  s"   

z_ParallelMapperImpl.__init__rR   c                    sD   t  | | jd ur| `| jdkr| || _d S | || _d S r   )r   rS   r   r.   _parallel_reset_inline_resetr   rR   r   r   r"   rS   G  s   

z_ParallelMapperImpl.resetc                 C   s   t | j| j|dS )N)r+   r,   rR   )rQ   r+   r,   r   r   r   r"   r   Q  r;   z!_ParallelMapperImpl._inline_resetc                 C   s*   t | j| j| j| j| j| j| j| j|d	S )N)	r+   r,   r.   r_   r`   rd   re   rf   rR   )	r^   r+   r,   r.   r_   r`   r   re   rf   r   r   r   r"   r   T  s   z#_ParallelMapperImpl._parallel_resetr-   c                 C   
   t | jS r   rU   r   r8   r   r   r"   rU   a  r4   z_ParallelMapperImpl.nextc                 C   s
   | j  S r   )r   rZ   r8   r   r   r"   rZ   d  r4   z_ParallelMapperImpl.get_state)Trb   NNr   r   )r'   r(   r)   r\   r   r*   r   r   r   r   r   r	   r]   r3   r   r   rS   r   r   rU   rZ   __classcell__r   r   r   r"   r      s:    	 
r   c                       s   e Zd ZdZdZ						ddee deegef d	e	d
e
ded dee dee	 de	dee	 f fddZddeeeef  f fddZdefddZdeeef fddZ  ZS )r/   a  ParallelMapper executes map_fn in parallel either in num_workers threads or
    processes. For processes, multiprocessing_context can be spawn, forkserver, fork,
    or None (chooses OS default). At most max_concurrent items will be either processed
    or in the iterator's output queue, to limit CPU and Memory utilization. If None
    (default) the value will be 2 * num_workers.

    At most one iter() is created from source, and at most one thread will call
    next() on it at once.

    If in_order is true, the iterator will return items in the order from which they arrive
    from source's iterator, potentially blocking even if other items are available.

    Args:
        source (BaseNode[X]): The source node to map over.
        map_fn (Callable[[X], T]): The function to apply to each item from the source node.
        num_workers (int): The number of workers to use for parallel processing.
        in_order (bool): Whether to return items in the order from which they arrive from. Default is True.
        method (Literal["thread", "process"]): The method to use for parallel processing. Default is "thread".
        multiprocessing_context (Optional[str]): The multiprocessing context to use for parallel processing. Default is None.
        max_concurrent (Optional[int]): The maximum number of items to process at once. Default is None.
        snapshot_frequency (int): The frequency at which to snapshot the state of the source node. Default is 1.
        prebatch (Optional[int]): Optionally perform pre-batching of items from source before mapping.
          For small items, this may improve throughput at the expense of peak memory.
    it_stateTrb   Nr   r+   r,   r.   r_   r`   ra   r   re   rf   prebatchc
              
      s  t    |dv sJ || _|| _|| _|| _|d ur3|dkr3t|tr3||kr3td|d|d|| _	|| _
|	| _|	d u rG|| _|| _n|	dkrStd|	dt|d| _t||	d	d
| _t| j| j| j| j| j| j| j	| j
d}
| jd u r|
| _d S t|
| _d S )Nra   r   r   r   r   z	prebatch=z must be a positive integer!r2   F)
batch_size	drop_last)r+   r,   r.   r_   r`   r   re   rf   )r   r3   r.   r_   r`   r   r   r   rH   re   rf   r   r,   r+   r1   r   r   r   r   )r   r+   r,   r.   r_   r`   r   re   rf   r   r   r   r   r"   r3     s@   


zParallelMapper.__init__rR   c                    s8   t  | |d ur| j|| j  d S | j  d S r   )r   rS   r   IT_STATE_KEYr   r   r   r"   rS     s   zParallelMapper.resetr-   c                 C   r   r   r   r8   r   r   r"   rU     r4   zParallelMapper.nextc                 C   rX   r   )r   r   rY   r8   r   r   r"   rZ     rW   zParallelMapper.get_state)Trb   NNr   Nr   )r'   r(   r)   r\   r   r   r*   r   r   r   r   r   r	   r]   r3   r   r   rS   rU   rZ   r   r   r   r   r"   r/   h  s>    	
 1r/   c                   @   s   e Zd ZdZdee dedededee	e
ef  f
ddZd	ee fd
dZd	efddZd	e	e
ef fddZdefddZdd Zdd ZdS )_SingleThreadedMappera  Utility Iterator for performing mapping with a single thread.
    Because only a single thread is used, we don't need an input queue to guard
    against multiple threads reading from the same iterator. This is used for
    Prefetcher and PinMemory.

    A thread is started on __init__ and stopped on __del__/_shutdown.
    The thread runs _populate_queue, which acquires a BoundedSemaphore with initial value
    of `prefetch_factor`.

    When next() is called on this iterator, it will block until an item is available on _q.
    Next will perform the following depending on what is pulled from the q:
    - StopIteration: raise StopIteration. Any subsequent next() calls will also raise StopIteration
    - ExceptionWrapper: call reraise() on the exception wraper
    - any other item: return the item

    A Bounded semaphore is used to limit concurrency and memory utilization.
    If N items have been pulled from the source, and M items have been yielded by this iterator,
    we maintain the invariant that semaphore.value + (N - M) == prefetch_factor (modulo
    non-atomicness of operations).

    _populate_queue calls semaphore.acquire. When we pull an item from the queue, we
    call semaphore.release (unless it's a StartupExceptionWrapper, because _populate_queue
    does not acquire sempahores in this case). All outstanding items are either being
    processed in _populate_queue, in the _q, or about to be returned by an in-flight next() call.
    r+   prefetch_factorworkerrf   rR   c              
   C   s   || _ || _|| _|| _t | _tj|d| _	t
 | _d| _d| _|d ur9|d | _|d | _| j | j nd | _| j   t | _tj| j| j | j| j| j| j	| jfdd| _| j  | jj| jtd| _t| jD ]}zt|  W qo ty   td| j d	| d
w d| _d S )Nrh   r   rj   rk   Trl   ro   rp   rq   rr   )r+   r   r   rf   rE   r&   _qrv   rw   rx   r%   _stop_eventr|   _fast_forwardr}   rS   r   r~   r   _threadr   r   r   r   rU   r   rH   )r   r+   r   r   rf   rR   r   r   r   r"   r3     sP   







z_SingleThreadedMapper.__init__r-   c                 C   r   r   r   r8   r   r   r"   r   !  r$   z_SingleThreadedMapper.__iter__c                 C   s   	 | j  r	t z| jjdtd\}}W n
 tjy   Y q w t|tr1| j	
  | j   |t|trJt|ts@| j	
  | j   |  n| j	
  |  jd7  _| | |S qr   )r   rC   r   r   rD   r   rE   rF   r   rx   r   r   r   r   r   r|   r   r   r   r   r"   rV   $  s,   










z_SingleThreadedMapper.__next__c                 C   r   r   r   r8   r   r   r"   rZ   =  r   z_SingleThreadedMapper.get_staterO   c                 C   r   r   r   r   r   r   r"   r   C  r   z,_SingleThreadedMapper._maybe_update_snapshotc                 C   r   r   r   r8   r   r   r"   r   H  r   z_SingleThreadedMapper.__del__c                 C   s<   | j   t| dr| j r| jjtd d d S d S d S )Nr   r   r   )r   r   r   r   r   r   r   r8   r   r   r"   r[   K  s   
z_SingleThreadedMapper._shutdownN)r'   r(   r)   r\   r   r   r   _WorkerTyper	   r   r]   r   r3   r   r   rV   rZ   r   r   r[   r   r   r   r"   r     s&    
6r   )5rE   rv   r   typingr   r   r   r   r   r   r   r	   r
   r   r   r   torch.multiprocessingmultiprocessingr   torchdata.nodes.base_noder   r   torchdata.nodes.batchr   r   !torchdata.nodes.exception_wrapperr   r   torchdata.nodes.snapshot_storer   r   r   r   	constantsr   r   r   r*   r0   XseqTseqr1   r&   r%   rP   rQ   r^   r   r/   r   rw   r   r   r   r   r   r"   <module>   sJ   8$& 'H[