o
    ̳i3                     @   s   d dl Z d dlZd dlZd dlmZ d dlmZmZmZ d dl	m
Z
 d dlmZ eG dd dZG dd	 d	eZG d
d deZdS )    N)	dataclass)AnyOptionalProtocol)QUEUE_TIMEOUT)ExceptionWrapperc                   @   s0   e Zd ZU dZeed< dd ZdefddZdS )	MonotonicIndexr   initialc                 C   s   | j | _d S N)r	   _idxself r   R/home/ubuntu/.local/lib/python3.10/site-packages/torchdata/nodes/snapshot_store.py__post_init__   s   zMonotonicIndex.__post_init__returnc                 C   s   | j }|  j d7  _ |S )N   )r   )r   idxr   r   r   get   s   zMonotonicIndex.getN)__name__
__module____qualname__r	   int__annotations__r   r   r   r   r   r   r      s   
 r   c                   @   s^   e Zd ZdZdedefddZdedee fddZdefd	d
Z	de
jdedefddZdS )SnapshotStorezHProtocol for passing snapshot state around between threads and processessnapshotversionc                 C      d S r
   r   r   r   r   r   r   r   append      zSnapshotStore.appendr   c                 C   r   r
   r   )r   r   r   r   r   pop_version   r    zSnapshotStore.pop_versionc                 C   r   r
   r   r   r   r   r   r   append_initial_snapshot"   r    z%SnapshotStore.append_initial_snapshotthreadtimeoutc                 C   r   r
   r   )r   r$   r%   r   r   r   get_initial_snapshot%   r    z"SnapshotStore.get_initial_snapshotN)r   r   r   __doc__r   r   r   r   r!   r#   	threadingThreadfloatr&   r   r   r   r   r      s    r   c                   @   sv   e Zd ZdZdZdddZdededdfd	d
Zdede	e fddZ
deddfddZddejdedefddZdS )QueueSnapshotStorez5A snapshot store that uses a queue to store snapshotsr   Nc                 C   s   t  | _t | _d| _d S )Ni)queueQueue_qr(   Lock_lock_max_versionr   r   r   r   __init__.   s   


zQueueSnapshotStore.__init__r   r   c                 C   sb   | j $ || jkrtd|d| j|| _| j||f W d    d S 1 s*w   Y  d S )Nzversion=z0 is not strictly greater than self._max_version=)r1   r2   
ValueErrorr/   putr   r   r   r   r   3   s   
"zQueueSnapshotStore.appendc                 C   s   d\}}| j + | jjr+|| jjd d kr+| j \}}| jjr+|| jjd d ksW d    n1 s5w   Y  ||kr@|S d S )N)NNr   )r1   r/   r-   
get_nowait)r   r   vervalr   r   r   r!   :   s   zQueueSnapshotStore.pop_versionc                 C   s   |  || j d S r
   )r   SNAPSHOT_INIT_VERSIONr"   r   r   r   r#   E   s   z*QueueSnapshotStore.append_initial_snapshot      N@r$   r%   c              	   C   s   d }d }t   }|d u r;t   | |k r;z| jjtd\}}W n
 tjy)   Y nw | s/n|d u r;t   | |k s|d urHt|trH|	  |d u sQ|| j
krgtdt   |  d| d|d||S )N)r%   z%Failed to get initial snapshot after z seconds! thread.is_alive()=z, snapshot=z, ver=)timer/   r   r   r-   Emptyis_alive
isinstancer   reraiser9   RuntimeError)r   r$   r%   r   r7   ack_t0r   r   r   r&   H   s&   
&z'QueueSnapshotStore.get_initial_snapshot)r   N)r:   )r   r   r   r'   r9   r3   r   r   r   r   r!   r#   r(   r)   r*   r&   r   r   r   r   r+   )   s    
r+   )r-   r(   r;   dataclassesr   typingr   r   r   torchdata.nodes.constantsr   !torchdata.nodes.exception_wrapperr   r   r   r+   r   r   r   r   <module>   s    