o
    Ti-                     @   s"  d Z ddlZddlmZmZ ddlZddlmZ ddlmZ	 ddl
mZmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddedefddZdaee dee di dee fddZee 	ddee di dedee fddZee 	ddee dedee fddZdS )zZ
batched collective operations for overhead amortization and better
bandwidth utilization
    N)ListAny)Tensor)comm)ProcessGroupall_to_all_single)get_accelerator)instrument_w_nvtx)
op_builder)loggerFinput_tensoroutput_tensorc                 C   s   t tj|| |ddS )NF)groupasync_op)r	   distreduce_scatter_fn)r   r   r   r   prof r   `/home/ubuntu/.local/lib/python3.10/site-packages/deepspeed/runtime/comm/coalesced_collectives.py_torch_reduce_scatter_fn   s   r   tensorsgroupsreturnc              
   C   s  t d u r
t  a t  }t }|| }t }t	|| }|| }d gt
|  }t| D ]\}	}
|
 dkrDt|
gd ||	< q0|
 d|  dkrhtd|
  dd|  d t|
gd ||	< q0t|
jd |
jd |}|| }t |
|dt jd||\}}t|}t|}t|||d|  d	 t|||d|  d	 t ||||dt j|\}}t|}t|}t|||d
|  d	 t|||d
|  d	 t ||| dt j}| | dksJ d|  d| tt||| d||	< q0|S )N   r      7qgZ falls back to reduce_scatter because tensor size = / is not divisible by (2 * global_world_size) = 6. Please consider allocating a new world to enable qgZ   local_r   global_final_output.numel()= is not divisible by num_nodes=)quantizer_moduler
   QuantizerBuilderloadr   device_countr   get_world_sizeget_rankintlen	enumeratedimreduce_scatter_coalescednumelr   warningmaxshapeswizzle_quant	Symmetrictorch
empty_liker   quantized_reduction
dequantizesumlistchunkview)r   r   local_world_sizeglobal_world_size	num_nodes	this_rank	intra_idx	inter_idx
output_lstidxtensorintra_quant_groupinter_quant_groupintra_quant_int4intra_q_scaleslocal_outputscale_outputglobal_input_tensorglobal_scalesglobal_outputglobal_scale_outputfinal_outputr   r   r   all_to_all_quant_reduce   s^   





"rR   params
loco_paramc                 C   s  t d u r
t  a t  }t }|| }t }t	|| }|| }d gt
|  }	t| D ]J\}
}|j}| dkrHt|gd |	|
< q0| d|  dkrltd|  dd|  d t|gd |	|
< q0|d }|d }t|d	r}t|krdat|j}tj| | |j|jd
}n*t |jd |jd |jd  dt j}t |jd |jd |jd  dt j}t|jd |jd |}|| }t ||||dt jd||	\}}t |}t |}t!|||d|  d t!|||d|  d t "||dt j|_t #||||||dt j|	\}}t |}t |}t!|||d|  d t!|||d|  d t "||dt j|_t ||| dt j}| | dksgJ d|  d| t$t%|&|| 'd|	|
< td7 aq0|	S )Nr   r   r   r   r   r   err_betareset_Tintra_ef_buf)devicedtype   r   r   r    r!   r"   r#   r$   )(r%   r
   r&   r'   r   r(   r   r)   r*   r+   r,   r-   gradr.   r/   r0   r   r1   hasattrloco_idxr6   
zeros_likezerosrX   rY   r9   rW   r5   inter_ef_bufr2   r3   loco_swizzle_quantr7   r   quantizeloco_quantized_reductionr:   r;   r<   r=   )rS   r   rT   r>   r?   r@   rA   rB   rC   rD   rE   prF   rU   rV   	intra_err	inter_errrG   rH   rI   rJ   rK   rL   rM   rN   rO   rP   rQ   r   r   r   all_to_all_loco_quant_reduceO   s   	





 
rg   r   c              	      s  t |}t |dgt|  }t| D ]!\}}|dt|    fddt	dD ||< qt
fdd| D }t| dkrZ| d   dkrZ| d d}n=g }t	D ]/}	t	t| D ]&}|| |	 }
||
 || |
  }|dkr|tj||
j|
jd	 qhq`ttj|}| t|}t||| |d
 dgt|  }d}t	t| D ]}|| d||| |  ||< ||| 7 }q|S )zsimultaneously reduce-scatter a list of tensors - this can be done more
    efficiently than individual reduce scatter calls
    TODO. see if PyTorch team wants a c++ version of this for ProcessGroupNCCL
    Nr$   c                    s$   g | ]}|  |     qS r   r   ).0rank)chunk_szflattened_tensorr   r   
<listcomp>   s    z,reduce_scatter_coalesced.<locals>.<listcomp>r   c                 3   s"    | ]}t |   V  qd S N)mathceilr0   )rh   t)world_szr   r   	<genexpr>   s     z+reduce_scatter_coalesced.<locals>.<genexpr>r   )rY   rX   r    )r   r*   r)   r,   r-   r=   rn   ro   r0   rangetupleappendr6   emptyrY   rX   r	   catdiv_r<   r   narrow)r   r   rA   partition_lst_for_each_tensor
tensor_idxrF   #padded_partition_sz_for_each_tensortensor_partition_flat_buffer"tensor_partitions_lst_with_paddingri   tensor_chunk
padding_sz%tensor_partition_buffer_for_each_rankrD   offsetr   )rj   rk   rq   r   r/      sN   



 

r/   )NFFrm   )__doc__rn   typingr   r   r6   r   	deepspeedr   r   deepspeed.commr   r   deepspeed.acceleratorr   deepspeed.utilsr	   deepspeed.opsr
   r   r   r%   no_gradrR   rg   r/   r   r   r   r   <module>   sL   "0K