o
    Ti                     @   s   d Z ddlZddlZddlZddlmZmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZmZmZmZmZ d	Zd
Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd  Zd!d" Z dS )#zM
Functionality of swapping optimizer tensors to/from (NVMe) storage devices.
    N)PoolBarrier)AsyncIOBuilder)
GDSBuilder)get_accelerator   )report_resultstask_logtask_barriercreate_filenamecreate_filebufferbounce_bufferc              	   C   s  |rdnd}| j rdnd}| j| \}}t|| j| j|}| jr5tj|r/tj|| jks5t	|| j t
|d| j d d }| jrjt |}	tjd| jftj|	d}
| jsi|sitjd| jftjd	d }ntjd| jftjd	d }
t
|| d
| d| j d|
j dd | jr| jnd}|rt  | j| j| j| j |}||
 nt  | j| j| j| j |}t
|d i }||d< | j|d< ||d< ||d< |
|t< ||t < d|d< |S )NReadWriteTFzAllocate tensor of size z bytes   )highsizedtypedevicecpuz file z	 of size z bytes from buffer on device )forcer   zcreated deepspeed aio handlefile	num_byteshandlegdsr   elapsed_sec)!use_gdsmapping_listr   readio_sizeospathisfilegetsizer   r	   gpur   device_nametorchrandintuint8slow_bounce_buffer
pin_memoryr   io_parallelr   load
gds_handle
block_sizequeue_depthsingle_submitsequential_requestspin_device_tensorr   
aio_handleBUFFERBOUNCE_BUFFER)argstidread_op	io_stringr   	device_idfolderfilenamer   r&   r   r,   r   ctxt r?   P/home/ubuntu/.local/lib/python3.10/site-packages/deepspeed/nvme/ds_aio_handle.py
pre_handle   sT   $


rA   c                 C      | \}}t ||d}|S )NTrA   pool_paramsr7   r8   r>   r?   r?   r@   pre_handle_readB      rF   c                 C   rB   )NFrC   rD   r?   r?   r@   pre_handle_writeH   rG   rH   c                 C   sV   | \}}}t tfD ]}|| d ur(|d r|d ||  ||   d ||< q	|S )Nr   r   )r5   r6   unpin_device_tensordetach)rE   _r>   bufr?   r?   r@   post_handleN   s   
rM   c           	      C   s   | \}}}|d }t   }|t d urtnt}||| |d |jd}|dks*J |  |tkr=|t j|t j t   }|d  || 7  < |S )Nr   r   Tr   )timer6   r5   preadvalidatewaitdatacopy_)	rE   r7   r8   r>   r   
start_timedest_bufferretend_timer?   r?   r@   main_parallel_readY   s   
rY   c           	      C   s   | \}}}t j|d rt |d  |d }t }|t d ur0t}|t j|t j nt}|	|| |d |j
d}|dksEJ |  t }|d  || 7  < |S )Nr   r   TrN   r   )r!   r"   r#   removerO   r6   rS   rT   r5   pwriterQ   rR   )	rE   r7   r8   r>   r   rU   source_bufferrW   rX   r?   r?   r@   main_parallel_writei   s   
r]   c           	      C   s   | \}}}|d }t   }|t d urtnt}||| |d |j}|dks)J |tkr8|t j|t j t   }|d  || 7  < |S )Nr   r   rN   r   )rO   r6   r5   r   rQ   rS   rT   )	
pool_parmsr7   r8   r>   r   rU   rV   rW   rX   r?   r?   r@   main_handle_read   s   
r_   c           	      C   s   | \}}}t j|d rt |d  |d }t }|t d ur0t}|t j|t j nt}|	|| |d |j
}|dksDJ t }|d  || 7  < |S )Nr   r   rN   r   )r!   r"   r#   rZ   rO   r6   rS   rT   r5   writerQ   )	r^   r7   r8   r>   r   rU   r\   rW   rX   r?   r?   r@   main_handle_write   s   
ra   c                 C   s@   i }|rt |d< t|d< t|d< |S t|d< t|d< t|d< |S )Nprepostmain)rF   rM   rY   rH   r]   )r7   r9   scheduler?   r?   r@   get_schedule   s   rf   c           
      C   s  | \}}}t |j}t||}t|d|  tt| t|d |d ||f}tt| d|d< t|jD ]*}t|d|  t }|d |||f}tt| t }	|d  |	| 7  < q7t|d |d	 |||f}tt| |d |d
 |d |j fS )Nzschedule = zrunning pre-taskrb   r   main_task_seczrunning main task rd   zrunning post-taskrc   r   r   )	lenmapping_dictrf   r	   r
   aio_barrierrangeloopsrO   )
rE   r7   r8   r9   num_processesre   r>   irU   	stop_timer?   r?   r@   _aio_handle_tasklet   s(   








rp   c                 C   s   | a d S )N)rj   )br?   r?   r@   _init_tasklet   s   rr   c                    sv   t  j}t|} fddt|D }t|t|fd}|t|}W d    n1 s.w   Y  t | d S )Nc                    s   g | ]} |fqS r?   r?   ).0pr7   r9   r?   r@   
<listcomp>   s    z.aio_handle_multiprocessing.<locals>.<listcomp>)	processesinitializerinitargs)	rh   ri   r   rk   r   rr   maprp   r   )r7   r9   rm   rq   rE   rt   pool_resultsr?   ru   r@   aio_handle_multiprocessing   s   
r|   )!__doc__r'   r!   rO   multiprocessingr   r   deepspeed.ops.aior   deepspeed.ops.op_builderr   deepspeed.acceleratorr   test_ds_aio_utilsr   r	   r
   r   r   r5   r6   rA   rF   rH   rM   rY   r]   r_   ra   rf   rp   rr   r|   r?   r?   r?   r@   <module>   s.   , 