o
    X۷i*                     @  s   d dl mZ d dlZd dlmZ d dlmZ d dlZd dlm	  m
Z d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dlmZ d.ddZd/ddZd0d d!Zd1d#d$Zd1d%d&Zd2d(d)Zd3d,d-ZdS )4    )annotationsN)Sequence)chain)ndarray)Device)Stream)get_current_stream)_array)_chunk)_data_transfer)_index_arith)_modesargs!Sequence[_array.DistributedArray]kwargs"dict[str, _array.DistributedArray]devintchunk_ireturn#list[_data_transfer._PartialUpdate]c           	      C  s   g }d}t | | D ]}|j| | j}|r|rd} n|}q|r$|S t | | D ]}t |j D ]}||j q5q+g S NTF)r   values_chunks_mapupdatesfrom_iterableflush_mode)	r   r   r   r   r   at_most_one_updateargupdates_nowchunk r"   Z/home/ubuntu/vllm_env/lib/python3.10/site-packages/cupyx/distributed/array/_elementwise.py_find_updates   s"   
r$   streamr   (tuple[list[ndarray], dict[str, ndarray]]c                   s@   fdd  fdd|D } fdd|  D }||fS )Nc                   s    | j    }|j |jS N)r   
wait_eventreadyarray)d_arrayr!   )r   r   r%   r"   r#   access_array9   s   z+_prepare_chunks_array.<locals>.access_arrayc                   s   g | ]} |qS r"   r"   .0r   r,   r"   r#   
<listcomp>>   s    z)_prepare_chunks_array.<locals>.<listcomp>c                   s   i | ]	\}}| |qS r"   r"   )r.   keyr   r/   r"   r#   
<dictcomp>?   s    z)_prepare_chunks_array.<locals>.<dictcomp>)items)r%   r   r   r   r   
arg_arrayskwarg_arraysr"   )r,   r   r   r%   r#   _prepare_chunks_array3   s   r6   list[_array.DistributedArray]Nonec                 C  s2   dd | D | d d < | dd | D  d S )Nc                 S  s   g | ]}| tjqS r"   _to_op_moder   REPLICAr-   r"   r"   r#   r0   G   s    z/_change_all_to_replica_mode.<locals>.<listcomp>c                 s  s$    | ]\}}|| tjfV  qd S r'   r9   )r.   kr   r"   r"   r#   	<genexpr>H   s    
z._change_all_to_replica_mode.<locals>.<genexpr>)updater3   )r   r   r"   r"   r#   _change_all_to_replica_modeD   s   
r?   _array.DistributedArrayc              	   C  s  t |}t|| d }i }|p| D ]}|j} | D ]\}}g ||< t| t }	t|D ]\}
}t||||
}t	|	||||
\}}d }t
|| D ]}t|tjrg|d u s]J tj|j|j|}qO|d u r| |i |}|j}tj||	 |||fd}|| | |sq1d gt| }i }|D ]^\}}t|D ]\}}t|tjr|j||< q|| ||< q| D ]\}}t|tjr|j||< q|| ||< q|	|j | |i |}|j}|	 }tj||||fd}||| qq1W d    n	1 sw   Y  qt
| D ]}t|jttjfs tdqd  }}|p,| D ]}|j}|j} |d us=J t !|||t"j#|S )N
prevent_gc;Kernels returning other than single array are not supported)$listr?   r   	index_mapr3   r   r   	enumerater$   r6   r   
isinstancer
   _ArrayPlaceholder_Chunkcreate_placeholdershapedevicedtyperecordappendlenr*   r(   r)   r   
_AsyncData
add_updater   r   RuntimeError_commsr	   DistributedArrayr   r;   )kernelr   r   	out_dtypeout_chunks_mapr   rE   r   idxsr%   r   idxr   r4   r5   	out_chunkdata	out_array
arg_sliceskwarg_slicesr>   ir<   out_update_arrayr)   
out_updater!   rK   commsr"   r"   r#   _execute_kernelM   s   




<rd   c              
   C  s  t |dksJ t |dkrtd|rtdt|}t|D ]\}}|tj||< t|| j	
 D ]}|tj q6q |\}}t| tjjjro| j|j|jf}|du rktd| j d|jj|jjf |j}	nt| tjjjsyJ | |jj|jjfd\}
}	}
t |	dkrt|	 td	|	d
 }|j}|j}i }t|j	
 D ]}|jjj}| }t !|jj|}t|j	
 D ]i}t"#|j$|j$|}|du rq|jjj}tj%j&'||dkrt()|||j$|j|j}n	tjj*|j| |+|j, t"-|j$||}t"-|j$||}| j.dksJ | t/0t1|j| t/0t1|j| ||  qt2j3||4 |j$|j	d}|5|g 6| W d   n	1 sQw   Y  qt(7|||tj|S )z&Arguments must be in the replica mode.   zsElement-wise operation over more than two distributed arrays is not supported unless they share the same index_map.zLKeyword argument is not supported unless arguments share the same index_map.Nz#Could not guess the return type of z with arguments of type r"      rC   r   rA   )8rP   rS   rD   rF   r:   r   r;   r   r   r   r   r   rG   cupy_core_kernelufunc_ops_guess_routine_from_in_typesrM   nametype	out_typesElementwiseKernel_decide_params_typeprintrK   rT   r*   rL   idon_ready_creation_basicemptyr   _index_intersectionindexcudaruntimedeviceCanAccessPeerr	   _make_chunk_async_check_peer_accessr(   r)   _index_for_subindexnintypingcastr   r
   rI   rN   
setdefaultrO   rU   )rV   r   r   r`   r   r!   abopro   _rM   rK   rc   rX   a_chunka_devr%   r]   b_chunkintersectionb_dev	a_new_idx	b_new_idxr[   r"   r"   r#   _execute_peer_access   s   





#r   boolc                 C  s<   d }t | | D ]}|d u r|j}q	|j|kr dS q	dS r   )r   r   rE   )r   r   rE   r   r"   r"   r#   _is_peer_access_needed  s   
r   tupledictc                 C  sN   t || D ]}t|tjstdqt||}|r!t| ||S t| ||S )NzFMixing a distributed array with a non-distributed one is not supported)	r   r   rG   r	   rU   rS   r   r   rd   )rV   r   r   r   needs_peer_accessr"   r"   r#   _execute  s   
r   )
r   r   r   r   r   r   r   r   r   r   )r%   r   r   r   r   r   r   r   r   r   r   r&   )r   r7   r   r   r   r8   )r   r   r   r   r   r@   )r   r   r   r   r   r   )r   r   r   r   ) 
__future__r   r   collections.abcr   	itertoolsr   rg   cupy._creation.basic	_creationbasicru   cupy._core.corer   cupy.cuda.devicer   cupy.cuda.streamr   r   cupyx.distributed.arrayr	   r
   r   r   r   r$   r6   r?   rd   r   r   r   r"   r"   r"   r#   <module>   s,    



	
a
X