o
    Ti                     @   s   d Z ddlZddlZddlZddlmZ ddlmZmZ ddl	m
Z
mZmZ dd Zd	d
 Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd ZdS )zM
Functionality of swapping optimizer tensors to/from (NVMe) storage devices.
    N)AsyncIOBuilder)PoolBarrier   )report_resultstask_logtask_barrierc              	   C   s   |rdnd}|rt j| jn| j}|r| jn| j d| }t|d| d tj|tj	dd
 }t|| d| d	| d
|j  i }||d< ||d< ||d< d|d< |S )NReadWrite.zAllocate tensor of size z bytescpu)dtypedevicez file z	 of size z bytes from buffer on device file	num_bytesbufferr   elapsed_sec)ospathgetsize	read_file
write_size
write_filer   torchemptyuint8
pin_memoryr   )argstidread_op	io_stringr   r   r   ctxt r"   O/home/ubuntu/.local/lib/python3.10/site-packages/deepspeed/nvme/ds_aio_basic.py	pre_basic   s   "r$   c                 C      | \}}t ||d}|S )NTr$   pool_paramsr   r   r!   r"   r"   r#   pre_basic_read#      r)   c                 C   r%   )NFr&   r'   r"   r"   r#   pre_basic_write)   r*   r+   c                 C   s"   | \}}}|d    d |d< |S )Nr   )detach)r(   _r!   r"   r"   r#   
post_basic/   s   
r.   c              	   C   b   | \}}}t   }t  |d |d |j|j|j|j |j t   }|d  || 7  < |S Nr   r   r   )	timer   loadaio_read
block_sizequeue_depthsingle_submitsequential_requestsvalidater(   r   r   r!   
start_timeend_timer"   r"   r#   main_basic_read6      
r<   c              	   C   r/   r0   )	r1   r   r2   	aio_writer4   r5   r6   r7   r8   r9   r"   r"   r#   main_basic_writeA   r=   r?   c                 C   s@   i }|rt |d< t|d< t|d< |S t|d< t|d< t|d< |S )Nprepostmain)r)   r.   r<   r+   r?   )r   r   scheduler"   r"   r#   get_scheduleL   s   rD   c           
      C   s  | \}}}t |j}t||}t|d|  tt| t|d |d ||f}tt| d|d< t|jD ]*}t|d|  t }|d |||f}tt| t }	|d  |	| 7  < q7t|d |d	 |||f}tt| |d |d
 |d |j fS )Nzschedule = zrunning pre-taskr@   r   main_task_seczrunning main task rB   zrunning post-taskrA   r   r   )	lenmapping_dictrD   r   r   aio_barrierrangeloopsr1   )
r(   r   r   r   num_processesrC   r!   ir:   	stop_timer"   r"   r#   _aio_handle_taskletZ   s(   








rN   c                 C   s   | a d S )N)rH   )br"   r"   r#   _init_taskletz   s   rP   c                    sv   t  j}t|} fddt|D }t|t|fd}|t|}W d    n1 s.w   Y  t | d S )Nc                    s   g | ]} |fqS r"   r"   ).0pr   r   r"   r#   
<listcomp>   s    z-aio_basic_multiprocessing.<locals>.<listcomp>)	processesinitializerinitargs)	rF   rG   r   rI   r   rP   maprN   r   )r   r   rK   rO   r(   rR   pool_resultsr"   rS   r#   aio_basic_multiprocessing   s   
rZ   )__doc__r   r   r1   deepspeed.ops.aior   multiprocessingr   r   test_ds_aio_utilsr   r   r   r$   r)   r+   r.   r<   r?   rD   rN   rP   rZ   r"   r"   r"   r#   <module>   s"    