o
    TiA                     @   s~   d dl mZmZ d dlmZ d dlmZ d dlmZ d dl	m
Z
mZmZ d dlmZmZmZ G dd dZG d	d
 d
eZdS )    )
DataLoaderRandomSampler)DistributedSampler)get_accelerator)DeepSpeedDataSampler)CURRICULUM_LEARNINGDATA_EFFICIENCYDATA_SAMPLING_NUM_WORKERS)GRADIENT_ACCUMULATION_STEPSDATA_PARALLEL_GROUPGLOBAL_RANKc                   @   s$   e Zd Zdd Zdd Zdd ZdS )RepeatingLoaderc                 C   s   || _ t| j | _dS )zWraps an iterator to allow for infinite iteration. This is especially useful
        for DataLoader types that we wish to automatically restart upon completion.

        Args:
            loader (iterator): The data loader to repeat.
        N)loaderiter	data_iter)selfr    r   P/home/ubuntu/.local/lib/python3.10/site-packages/deepspeed/runtime/dataloader.py__init__   s   zRepeatingLoader.__init__c                 C   s   | S Nr   r   r   r   r   __iter__   s   zRepeatingLoader.__iter__c                 C   s<   zt | j}W |S  ty   t| j| _t | j}Y |S w r   )nextr   StopIterationr   r   )r   batchr   r   r   __next__    s   zRepeatingLoader.__next__N)__name__
__module____qualname__r   r   r   r   r   r   r   r      s    
r   c                   @   sD   e Zd Zddddddi fddZdd Zdd Zd	d
 Zdd ZdS )DeepSpeedDataLoaderNFc                 C   sR  || _ || _|| _d| _t|v r|t | _| jr?t| j t t|| j|
|	| j t | j t	 | j t
 |d	}t  }| j t }n+|dkrQ|d u rNt||	|
d}d}n|d u rYt|}t  }||9 }|d u rjd| }|| _|| _|| _|| _|| _|| _|| _d | _|| _d | _| jrt| j| j | _d S ddlm} |t| j| j | _d S )NF)	drop_lastr   )datasetnum_replicasrank      )ceil)deepspeed_dataloader_config
tput_timer
batch_sizecurriculum_learning_enabledr   r   r   lenr   r
   r   r   device_countr	   r   r   num_local_io_workersdata_samplerr!   
collate_fn
pin_memorydatadataloader_drop_lastpost_process_funcmathr&   )r   r!   r)   r0   
local_rankr(   r/   r-   r.   data_parallel_world_sizedata_parallel_rankr2   r'   r,   r&   r   r   r   r   +   s\   


	
zDeepSpeedDataLoader.__init__c                 C   s   |    | S r   )_create_dataloaderr   r   r   r   r   l   s   zDeepSpeedDataLoader.__iter__c                 C   s   | j S r   )r+   r   r   r   r   __len__p   s   zDeepSpeedDataLoader.__len__c                 C   sJ   | j r| j   | jr t| j}| jd ur| || j }|S t| jS r   )	r(   startr*   r   data_iteratorr3   r.   
state_dictr1   )r   r1   r   r   r   r   s   s   



zDeepSpeedDataLoader.__next__c              	   C   s   | j r.| jd u rt| j| j| j| jd| _nt| j| j| j| j| jd| _t| j| _	| jS | jd u rEt| j| j
| j| j| j| jd| _nt| j| j
| j| j| j| j| jd| _dd | jD | _| jS )N)r0   batch_samplernum_workers)r0   r=   r/   r>   )r)   r0   samplerr>   r    )r)   r0   r?   r/   r>   r    c                 s   s    | ]}|V  qd S r   r   ).0xr   r   r   	<genexpr>   s    z9DeepSpeedDataLoader._create_dataloader.<locals>.<genexpr>)r*   r/   r   r!   r0   r.   r-   
dataloaderr   r;   r)   r2   r1   r   r   r   r   r8   ~   sB   



z&DeepSpeedDataLoader._create_dataloader)r   r   r   r   r   r9   r   r8   r   r   r   r   r   )   s    
Ar   N)torch.utils.datar   r   torch.utils.data.distributedr   deepspeed.acceleratorr   :deepspeed.runtime.data_pipeline.data_sampling.data_samplerr   )deepspeed.runtime.data_pipeline.constantsr   r   r	   deepspeed.runtime.constantsr
   r   r   r   objectr   r   r   r   r   <module>   s   