o
    X۷i"                     @  s   d dl mZ d dlZd dlmZ d dlmZ d dlmZ d dl	Z	d dl
mZ d dlm  mZ d dlm  mZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dlmZ d dlmZ G dd dZ G dd dZ!d!dd Z"dS )"    )annotationsN)chain)Any)Iterator)ndarray)Device)Event)Stream)get_current_stream)_modes)_index_arith)_data_transfer)_Communicatorc                   @  s<   e Zd ZU ded< ded< dddZdd
dZdddZdS )_ArrayPlaceholdertuple[int, ...]shaper   devicereturnNonec                 C  s   || _ || _d S N)r   r   )selfr   r    r   T/home/ubuntu/vllm_env/lib/python3.10/site-packages/cupyx/distributed/array/_chunk.py__init__   s   
z_ArrayPlaceholder.__init__	new_shapec                 C  s   t || jS r   )r   r   )r   r   r   r   r   reshape#   s   z_ArrayPlaceholder.reshapemode_modes.Modedtypenumpy.dtyper   c                 C  sh   | j ' |tju rt| j|}n||}t| j||}t	|W  d    S 1 s-w   Y  d S r   )
r   r   REPLICA_creation_basicemptyr   identity_offull_manipulation_dims
atleast_1d)r   r   r   datavaluer   r   r   
to_ndarray&   s   

$z_ArrayPlaceholder.to_ndarrayN)r   r   r   r   r   r   )r   r   r   r   )r   r   r   r   r   r   )__name__
__module____qualname____annotations__r   r   r)   r   r   r   r   r      s   
 

r   c                   @  s   e Zd ZU ded< ded< ded< ded< d	Zd
ed< 				d1d2ddZe		d3d4ddZej	d5ddZ
d6ddZd7d d!Zd8d$d%Zd9d+d,Zd:d-d.Zd;d/d0Zd	S )<_Chunkndarray | _ArrayPlaceholderarrayr   readytuple[slice, ...]indexz#list[_data_transfer._PartialUpdate]updatesNr   
prevent_gcr'   *list[_data_transfer._PartialUpdate] | Noner   r   c                 C  s.   || _ || _|| _|d ur|ng | _|| _d S r   )r0   r1   r3   r4   r5   )r   r'   r1   r3   r4   r5   r   r   r   r   <   s
   
z_Chunk.__init__r   r   r   int | Devicec                 C  s`   t |tr	t|}t||}| t }W d    n1 sw   Y  |d u r)g }t||||S r   )
isinstanceintr   r   r   r.   )clsr   r   r3   r4   r'   r1   r   r   r   create_placeholderH   s   

z_Chunk.create_placeholderIterator[Stream]c                 c  sH    | j j t }|| j |V  W d    d S 1 sw   Y  d S r   )r0   r   r
   
wait_eventr1   )r   streamr   r   r   on_readyY   s   
"z_Chunk.on_readyupdate_data_transfer._AsyncDataidxc                 C  s   | j ||f d S r   )r4   append)r   r@   rB   r   r   r   
add_update`   s   z_Chunk.add_updatec                 C  sp   t | jtr| j}| j}n|  }| j }| }W d    n1 s%w   Y  t||| jt	| j
| jdS )N)r5   )r8   r0   r   r1   r?   copyrecordr.   r3   listr4   r5   )r   r'   r1   r>   r   r   r   rE   e   s   


z_Chunk.copyr   r   c                 C  s   t | jdkr	dS t| jtr | jd d jj}| j||| _|  @}| jD ]#\}}||j	 |t
ju r>|j| j|< q(|| j| |j| j|< q(|| j	 | j| jf| _g | _W d   dS 1 sgw   Y  dS )zApply all updates in-place.r   N)lenr4   r8   r0   r   r   r)   r?   r=   r1   r   r    funcrF   r5   )r   r   r   r>   update_datarB   r   r   r   flushr   s"   

"z_Chunk.flushtargetcomms'dict[int, _data_transfer._Communicator]streamsdict[int, Stream]c                 C  sJ  | }|}t |jdksJ t|jtsJ |jjj}|jjj}	|j}
|j}t	|
||}|d u r2d S t
|
||}t
|||}t|j| |j|j}|tjurX|jsX| }t|| || |||	 ||	 |	}||| |tjur|js|jj}| }|||j|< ||j W d    d S 1 sw   Y  d S d S d S )Nr   )rH   r4   r8   r0   r   r   idr3   r   _index_intersection_index_for_subindexr   
_AsyncDatar1   r5   r   r    
idempotentrE   	_transferrD   r   r?   r#   rF   )r   rL   r   r   rM   rO   	src_chunk	dst_chunksrc_devdst_devsrc_idxdst_idxintersectionsrc_new_idxdst_new_idxdata_to_transferr@   r   r>   r   r   r   apply_to   sJ   


"z_Chunk.apply_toc                 C  s   t | jtsJ t| j||}|d u rd S t| j||}|  }|| j|< || j	 W d    d S 1 s9w   Y  d S r   )
r8   r0   r   r   rR   r3   rS   r?   rF   r1   )r   rB   r   identityr]   self_new_idxr>   r   r   r   set_identity_on_intersection   s   

"z#_Chunk.set_identity_on_intersectionc                 C  sd   t | jtrd S |  }| jD ]	\}}|| j|< q|| j W d    d S 1 s+w   Y  d S r   )r8   r0   r   r?   r4   rF   r1   )r   rb   r>   _rB   r   r   r   #set_identity_on_overwritten_entries   s   
"z*_Chunk.set_identity_on_overwritten_entries)NN)r'   r/   r1   r   r3   r2   r4   r6   r5   r   r   r   r   )
r   r   r   r7   r3   r2   r4   r6   r   r.   )r   r<   )r@   rA   rB   r2   r   r   )r   r.   )r   r   r   r   )rL   r.   r   r   r   r   rM   rN   rO   rP   r   r   )rB   r2   r   r   r   r   )r   r   )r*   r+   r,   r-   r5   r   classmethodr;   
contextlibcontextmanagerr?   rD   rE   rK   ra   rd   rf   r   r   r   r   r.   3   s(   
 




0r.   op_mode_modes._OpModer   r   	chunk_mapdict[int, list[_Chunk]]rM   dict[int, _Communicator]rO   rP   r   r   c           
   	   C  s   t t| }tt|D ]$}|| }||  t|d t|D ]}|| }	||	| ||| q#qtt|d ddD ]!}|| }|tj	 t|D ]}|| }	||	tj	||| qNq>d S )N   )
rG   r   from_iterablevaluesrangerH   rK   ra   r   r    )
rj   r   rl   rM   rO   chunks_listirW   jrX   r   r   r   _all_reduce_intersections   s$   
rw   )rj   rk   r   r   rl   rm   rM   rn   rO   rP   r   r   )#
__future__r   rh   	itertoolsr   typingr   collections.abcr   numpycupy._core.corer   cupy._creation.basic	_creationbasicr!   cupy._manipulation.dims_manipulationdimsr%   cupy.cuda.devicer   cupy.cuda.streamr   r	   r
   cupyx.distributed.arrayr   r   r   &cupyx.distributed.array._data_transferr   r   r.   rw   r   r   r   r   <module>   s*     