o
    Ti                     @   s\   d Z ddlZddlmZ ddlmZ ddlmZm	Z	 ddl
mZ dZdZG d	d
 d
eZdS )zC
Functionality of swapping tensors to/from (NVMe) storage devices.
    N)comm)logger)swap_out_tensors
SwapBuffer)get_acceleratorasync_swap_gradient_waitc                   @   s   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Zd+d(d)Zd*S ),AsyncTensorSwapperc                 C   sT   g | _ g | _g | _t| _g | _|| _|| _d| _d| _	|| _
t | _d| _d | _d S Nr   )free_buffer_indexswapping_buffer_indexready_buffer_indexINVALID_BUFFER_INDEXcurrent_buffer_indexall_buffers
aio_handlenumel_alignment	max_numelnum_pending_swapstimerssettimer_namesnum_elements_swappeddtype)selfr   r   r    r   _/home/ubuntu/.local/lib/python3.10/site-packages/deepspeed/runtime/swap_tensor/async_swapper.py__init__   s   
zAsyncTensorSwapper.__init__c                 C   s   t | jdkS r
   )lenr   r   r   r   r   has_buffers$   s   zAsyncTensorSwapper.has_buffersc                    s   t | jdks	J tdd |D sJ |d j t fdd|D s&J  | _dd |D | _|  jdd tt | jD 7  _tdd |D | _t | _	d S )Nr   c                 S   s   g | ]}t  |qS r   )r   	is_pinned.0bufferr   r   r   
<listcomp>)   s    z2AsyncTensorSwapper.add_buffers.<locals>.<listcomp>c                    s   g | ]}|j  kqS r   r   r"   r&   r   r   r%   +   s    c                 S   s   g | ]}t |qS r   )r   r"   r   r   r   r%   .       c                 S   s   g | ]}|qS r   r   )r#   ir   r   r   r%   /   s    c                 S   s   g | ]}|  qS r   )numelr"   r   r   r   r%   0   r'   )
r   r   allr   r   rangemaxr   r   r   )r   buffer_listr   r&   r   add_buffers'   s   
"zAsyncTensorSwapper.add_buffersc                 C   s
   t | jS N)listr   r   r   r   r   get_timer_names3   s   
z"AsyncTensorSwapper.get_timer_namesc                 C   sN   |  d |   |  d dd | jD }g | _g | _t| _d| _d | _|S )NzSwapped out[Before flush]zSwapped out[After flush]c                 S   s   g | ]}|j qS r   )r$   )r#   bufr   r   r   r%   ;   s    z6AsyncTensorSwapper.release_buffers.<locals>.<listcomp>r   )_report_statistics_flush_buffers_until_completer   r   r   r   r   r   )r   pinned_buffersr   r   r   release_buffers6   s   

z"AsyncTensorSwapper.release_buffersc                 C   s$   t ||D ]
\}}| || qd S r/   )zip_swap_out_tensor)r   tensor_list	path_listtensor	swap_pathr   r   r   r   D   s   z#AsyncTensorSwapper.swap_out_tensorsc                 C   sV   t  dkr)tjg | jd }| j| d }t| d| j d|dd d S d S )Nr   r&   i   @z num_elems = z, z5.2fz GB)	distget_ranktorchr;   r   element_sizer   r   debug)r   messager@   
swapped_GBr   r   r   r3   H   s
   $z%AsyncTensorSwapper._report_statisticsc                 C   s`   t | jdks	J | | }|| jksJ | | | jtks#J |  }|	||| d S r
   )
r   r   _io_aligned_numelr)   r   _make_swap_spacer   r   _get_current_bufferinsert_tensor)r   r;   r<   aligned_numelswap_bufferr   r   r   r8   N   s   
z#AsyncTensorSwapper._swap_out_tensorc                 C   sT   | j tkr|   d S |  |s(t| jdkr|   n|   |   d S d S r
   )	r   r   _allocate_bufferrF   	has_spacer   r   _flush_ready_buffersr4   )r   r)   r   r   r   rE   Z   s   

z#AsyncTensorSwapper._make_swap_spacec                 C   s$   || j  }|dkr|S || j  | S r
   )r   )r   r)   	remainderr   r   r   rD   f   s   
z$AsyncTensorSwapper._io_aligned_numelc                 C   sR   | j tksJ t| jdksJ t| jdksJ | jd | _ | jd d | _d S )Nr   r   )r   r   r   r   r   r   r   r   r   rJ   j   s
   z#AsyncTensorSwapper._allocate_bufferc                 C   s*   | j tkr| j| j  t| _ |   d S r/   )r   r   r   append_swap_out_ready_buffersr   r   r   r   rL   q   s   
z'AsyncTensorSwapper._flush_ready_buffersc                 C   sP   |    t| jdksJ |   t| jdksJ t| jt| jks&J d S r
   )rL   r   r   _wait_for_swap_completer   r   r   r   r   r   r   r4   x   s
   z0AsyncTensorSwapper._flush_buffers_until_completec                 C   s`   | j D ]}| |}| }| }|  jt|7  _t| j|| q|  j| j 7  _g | _ d S r/   )	r   _get_bufferget_swap_tensorsget_swap_pathsr   r   r   r   r   )r   buffer_indexr$   swap_tensors
swap_pathsr   r   r   rO      s   


z*AsyncTensorSwapper._swap_out_ready_buffersc                 C   s   t | jdks	J | t | j | jksJ | t | j	t d| _| jD ]}| 
|}|  j| 7  _|  q)|  j| j7  _t | jt | jksRJ g | _d S r
   )r   r   _start_timerASYNC_SWAPPER_WAIT_TIMERr   waitr   _stop_timerr   addrQ   r   get_num_elemresetr   r   )r   rT   r$   r   r   r   rP      s   





z*AsyncTensorSwapper._wait_for_swap_completec                 C   s   |t ksJ | j| S r/   )r   r   )r   indexr   r   r   rQ      s   
zAsyncTensorSwapper._get_bufferc                 C   s   |  | jS r/   )rQ   r   r   r   r   r   rF      s   z&AsyncTensorSwapper._get_current_bufferc                 C      | j r|  |  d S d S r/   )r   startr   namer   r   r   rW         zAsyncTensorSwapper._start_timerc                 C   r_   r/   )r   stopra   r   r   r   rZ      rc   zAsyncTensorSwapper._stop_timerFc                 C   s"   | j r|r| j | d S d S d S r/   )r   log)r   	name_listforcer   r   r   _log_timers   s   
zAsyncTensorSwapper._log_timersN)F)__name__
__module____qualname__r   r    r.   r1   r6   r   r3   r8   rE   rD   rJ   rL   r4   rO   rP   rQ   rF   rW   rZ   rh   r   r   r   r   r	      s*    r	   )__doc__r?   	deepspeedr   r=   deepspeed.utils.loggingr   #deepspeed.runtime.swap_tensor.utilsr   r   deepspeed.acceleratorr   r   rX   objectr	   r   r   r   r   <module>   s   