o
    ̳i                     @   s   d dl Z d dlZd dlZd dlmZmZmZmZ d dlZd dl	Zd dl
mZ d dlmZmZ d dlmZmZ d dlmZ d dlmZmZ ded	ejd
ededejdejdeeef dee fddZG dd dee ZdS )    N)AnyDictOptionalUnion)
pin_memory)BaseNodeT)ExceptionWrapperStartupExceptionWrapper)_SingleThreadedMapper)MonotonicIndexSnapshotStoresourceqsnapshot_storesnapshot_frequency	semaphore
stop_event	device_iddevicec              
      s  t   		ddtdtttttf tf  f fdd}zOt	d tj
d |d	kr4tj| n|d
kr?tj| n|tj krStttj }	|	| t|tr\|dkscJ d| j|  d W n ty   td| d}
j|
d Y dS w d}| s|jdddsqz&t| }t||}|d7 }d}|dkr|| dkr|  }||d|d W n2 ty }
 z|
}||dd W Y d}
~
dS d}
~
w ty   td| d}||dd Y dS w | rdS dS )zThis is fork of from torch.utils.data._utils.pin_memory import _pin_memory_loop
    to remove the index tuples.

    This setting is thread local, and prevents the copy in pin_memory from
    consuming all CPU cores.
    TNblocksnapshotc                    s0      }|rj||d j| |f|d d S )N)r   versionr   )getappendput)itemr   r   _idxidxr   r    N/home/ubuntu/.local/lib/python3.10/site-packages/torchdata/nodes/pin_memory.py_put+   s   z_pin_memory_loop.<locals>._put   pt_data_pincudaxpur   z5snapshot_frequency must be non-negative integer! Got )r   z'in _pin_memory_loop startup for device )whereg?)blockingtimeoutF)r   r   r   zin _pin_memory_loop for device )TN)r   boolr   r   r   strr   r
   torchset_num_threadsmultiprocessing_set_thread_namer&   
set_devicer'   _C_get_privateuse1_backend_namegetattr
isinstanceintappend_initial_snapshot
state_dict	Exceptionis_setacquirenextr   StopIterationr	   )r   r   r   r   r   r   r   r   r#   custom_device_modeyieldedr   r   r!   r   r"   _pin_memory_loop   sh   




rA   c                       sv   e Zd ZdZ		ddee dedef fddZdd
e	e
eef  f fddZdd Zde
eef fddZ  ZS )	PinMemoryau  Pins the data of the underlying node to a device. This is backed by torch.utils.data._utils.pin_memory._pin_memory_loop.

    Args:
        source (BaseNode[T]): The source node to pin the data from.
        pin_memory_device (str): The device to pin the data to. Default is "".
        snapshot_frequency (int): The frequency at which to snapshot the state of the source node. Default is
            1, which means that the state of the source node will be snapshotted after every item. If set
            to a higher value, the state of the source node will be snapshotted after every snapshot_frequency
            items.
     r$   r   pin_memory_devicer   c                    s   t    || _|| _t|dkrd | _n|| _| jdkr$tj | _	n| jtj
 kr:tttj
 }| | _	ntj | _	d | _d S )Nr   r'   )super__init__r   r   len_pin_memory_devicer-   r'   current_device_current_devicer2   r3   r4   r&   _it)selfr   rD   r   r>   	__class__r!   r"   rF   m   s   


zPinMemory.__init__Ninitial_statec                    sP   t  | | jd ur| j  | `t| jdtjt| j	| j
d| j|d| _d S )Nr$   )r   r   )r   prefetch_factorworkerr   rO   )rE   resetrK   	_shutdownr   r   	functoolspartialrA   rJ   rH   r   )rL   rO   rM   r!   r"   rR      s   

zPinMemory.resetc                 C   s
   t | jS N)r<   rK   rL   r!   r!   r"   r<         
zPinMemory.nextreturnc                 C   s
   | j  S rV   )rK   	get_staterW   r!   r!   r"   rZ      rX   zPinMemory.get_state)rC   r$   rV   )__name__
__module____qualname____doc__r   r   r,   r6   rF   r   r   r   rR   r<   rZ   __classcell__r!   r!   rM   r"   rB   a   s     rB   )rT   queue	threadingtypingr   r   r   r   r-   torch.multiprocessing"torch.utils.data._utils.pin_memoryr   torchdata.nodes.base_noder   r   !torchdata.nodes.exception_wrapperr	   r
   torchdata.nodes.mapr   torchdata.nodes.snapshot_storer   r   Queuer6   BoundedSemaphoreEventr,   rA   rB   r!   r!   r!   r"   <module>   s:   

I