o
    wiA                     @   sj   d Z ddlZddlZddlZddlZddlZddlZeja	e
 ZG dd dZdd ZG dd dZdS )	zAn alternative to DataLoader using ZMQ.

This implements MultiLoader, an alternative to DataLoader when torch
is not available. Subprocesses communicate with the loader through
ZMQ, provided for high performance multithreaded queueing.
    Nc                   @   s   e Zd ZdZdd ZdS )EOFzIndicate that a data stream is finished.

    This class is used to signal the end of a data stream in the MultiLoader.

    Args:
        **kw: Arbitrary keyword arguments to be set as instance variables.
    c                 K   s   | j | dS )zInitialize the EOF instance with keyword arguments.

        Args:
            **kw: Arbitrary keyword arguments to be set as instance variables.
        N)__dict__update)selfkw r   M/home/ubuntu/sommelier/.venv/lib/python3.10/site-packages/webdataset/multi.py__init__$   s   zEOF.__init__N)__name__
__module____qualname____doc__r	   r   r   r   r   r      s    r   c                 C   s   t |tjd< t |tjd< tj }|tj}|| | D ]}t	j
|td}|| q |t	
t|d |  dS )a  Read samples from the dataset and send them over the socket.

    This function is run in a separate process to read data from the dataset
    and send it to the main process through a ZMQ socket.

    Args:
        dataset: The source dataset to read samples from.
        sockname (str): The name of the ZMQ socket to send data to.
        index (int): The index of this reader process.
        num_workers (int): The total number of worker processes.

    Returns:
        None
    WORKERNUM_WORKERS)protocol)indexN)strosenvironzmqContextinstancesocketPUSHconnectpickledumpsthe_protocolsendr   close)datasetsocknamer   num_workersctxsocksampledatar   r   r   reader-   s   

r'   c                   @   s*   e Zd ZdZdddZdd Zd	d
 ZdS )MultiLoadera[  Alternative to PyTorch DataLoader based on ZMQ.

    This class creates a multi-process data loader using ZMQ for inter-process
    communication, providing an alternative to PyTorch's DataLoader.

    Args:
        dataset: The source dataset to load data from.
        workers (int): Number of worker processes to spawn. Defaults to 4.
        verbose (bool): Whether to report progress verbosely. Defaults to False.
        nokill (bool): If True, don't kill old processes when restarting. Defaults to False.
        prefix (str): Directory prefix for the ZMQ socket. Defaults to "/tmp/_multi-".
       F/tmp/_multi-c                 C   s:   || _ || _|| _g | _d| _tj | _|| _	|| _
dS )a  Initialize the MultiLoader instance.

        Args:
            dataset: The source dataset to load data from.
            workers (int): Number of worker processes to spawn. Defaults to 4.
            verbose (bool): Whether to report progress verbosely. Defaults to False.
            nokill (bool): If True, don't kill old processes when restarting. Defaults to False.
            prefix (str): Directory prefix for the ZMQ socket. Defaults to "/tmp/_multi-".
        N)r    workersverbosepidsr   r   r   r   r#   nokillprefix)r   r    r+   r,   r.   r/   r   r   r   r	   W   s   

zMultiLoader.__init__c                 C   sb   | j D ]}|du r
qtd| |  |d qg | _ | jdur,td| j | j  d| _dS )z3Kill all worker processes and close the ZMQ socket.Nkilling      ?closing)r-   printkilljoinr   r   )r   pidr   r   r   r4   j   s   




zMultiLoader.killc                 c   sR   | j s|   d| j tt  | _| jt	j
| _| j| j | jr,td| j dg| j | _t| jD ]}| j| j|| jf}tjt|d| j|< q8t| j | jD ]}|  qWd}| jdt| jk r| j }t|}t|tr| jrtd|j | j|j  d d| j|j< n|V  |d7 }| jdt| jk skdS dS )	a  Return an iterator over this dataloader.

        This method sets up the ZMQ socket, spawns worker processes, and yields
        samples from the dataset.

        Yields:
            Sample: A sample from the dataset.

        Raises:
            None
        zipc://#N)targetargsr   z# subprocess finishedr1      )!r.   r4   r/   r   uuiduuid4r!   r#   r   r   PULLbindr,   r3   r+   r-   ranger    mpProcessr'   all_pidsr   startcountlenrecvr   loads
isinstancer   r   r5   )r   r   r9   r6   rD   r&   r%   r   r   r   __iter__x   s6   




zMultiLoader.__iter__N)r)   FFr*   )r
   r   r   r   r	   r4   rI   r   r   r   r   r(   I   s
    
r(   )r   multiprocessingr@   r   r   r;   weakrefr   HIGHEST_PROTOCOLr   WeakSetrB   r   r'   r(   r   r   r   r   <module>   s   