o
    ߥi4                     @   s  d dl Z d dlZd dlmZ d dlZd dlZd dlmZ	 d dl
m  mZ d dlmZ g dZdd ZdLddZdLd	d
ZdLddZdd ZdLddZdLddZe	jjdfddZe	jjdfddZdMddZdNddZe dOddZe   dd  Z!d!d" Z"d#d$ Z#dLd%d&Z$dMd'd(Z%dPd)d*Z&e	jjdfd+d,Z'dLd-d.Z(dQd/d0Z)dLd1d2Z*dQd3d4Z+dLd5d6Z,d7d8 Z-d9d: Z.d;d< Z/G d=d> d>eZ0G d?d@ d@eZ1G dAdB dBeZ2G dCdD dDeZ3e0j4Z5e1j4Z6e2j4Z7e3j4Z8e dRdFdGZ9e dSdJdKZ:dS )T    N)OrderedDict)Function)is_dist_initializedget_world_sizeget_rank	new_groupdestroy_process_groupbarrier	broadcast
all_reducereducegather
all_gatherreduce_dictget_global_gloo_groupgeneralized_all_gathergeneralized_gatherscatterreduce_scattersendrecvisendirecvshared_random_seeddiff_all_gatherdiff_all_reducediff_scatter	diff_copyspherical_kmeanssinkhornc                   C   s   t  ot  S N)distis_availableis_initialized r$   r$   o/home/ubuntu/.local/lib/python3.10/site-packages/modelscope/models/multi_modal/videocomposer/ops/distributed.pyr      s   r   c                 C      t  rt| S dS N   )r   r!   r   groupr$   r$   r%   r         r   c                 C   r&   )Nr   )r   r!   r   r)   r$   r$   r%   r       r+   r   c                 K   s   t  rtj| fi |S d S r    )r   r!   r   )rankskwargsr$   r$   r%   r   $   s   r   c                   C   s   t  r	t  d S d S r    )r   r!   r   r$   r$   r$   r%   r   *   s   r   c                 K   s&   t | dkrtj| fi | d S d S r'   )r   r!   r	   )r*   r-   r$   r$   r%   r	   /      r	   c                 K   &   t |dkrtj| ||fi |S d S r'   )r   r!   r
   tensorsrcr*   r-   r$   r$   r%   r
   4   r.   r
   c                 K   r/   r'   )r   r!   r   )r1   opr*   r-   r$   r$   r%   r   9   r.   r   c                 K   (   t |dkrtj| |||fi |S d S r'   )r   r!   r   )r1   dstr3   r*   r-   r$   r$   r%   r   >   s   r   c                    sZ   t  }t|}|dkr gS ||kr fddt|D nd }tj |||fi | |S )Nr(   c                       g | ]}t  qS r$   torch
empty_like.0_r1   r$   r%   
<listcomp>H   s    zgather.<locals>.<listcomp>)r   r   ranger!   r   )r1   r5   r*   r-   rank
world_sizetensor_listr$   r=   r%   r   C   s   
r   Tc                    s
  t |}|dkr gS   sJ d|r- fddt|D }tj| |fi | |S t j}t||} d t	t
|}dd |D }	t|	}
||
krb |
| }tj |gdd  fd	dt|D }tj| |fi | d
d t||	|D }|S )Nr(   z5ops.all_gather requires the tensor to be contiguous()c                    r6   r$   r7   r:   r=   r$   r%   r>   V       zall_gather.<locals>.<listcomp>c                 S   s   g | ]	}t t|qS r$   )intnpprod)r;   ur$   r$   r%   r>   a   s    r   dimc                    r6   r$   r7   r:   r=   r$   r%   r>   j   rC   c                 S   s$   g | ]\}}}|d |  |qS r    )view)r;   tnsr$   r$   r%   r>   n   s    )r   is_contiguousr?   r!   r   tupleshaper   reshaperE   rF   rG   max	new_zerosr8   catzip)r1   uniform_sizer*   r-   rA   rB   rQ   
shape_listsize	size_listmax_sizepaddingr$   r=   r%   r   N   s2   




r   meanc                    s   |dv sJ t |}|dkr S t trt j}nt  } fdd|D }tj|dd}tj	|fd|d| t
|dkrL|dkrL|| }tj|fd|d	| t d
d t||D }|S )N)r]   sumr(   c                    s   g | ]} | qS r$   r$   )r;   key
input_dictr$   r%   r>      s    zreduce_dict.<locals>.<listcomp>r   rI   r5   r*   r]   )r2   r*   c                 S   s   g | ]\}}||fqS r$   r$   )r;   r_   valr$   r$   r%   r>      s    )r   
isinstancer   listkeyssortedr8   stackr!   r   r   r
   typerV   )ra   r*   	reductionr-   rA   rf   valsreduced_dictr$   r`   r%   r   u   s"   
r   c                  C   s0   t  } | dv s
J | dkrt jddS t jjS )Nglooncclro   rn   backend)r!   get_backendr   r*   WORLDrp   r$   r$   r%   r      s
   r   c                 C   s   t |}|dv sJ t|dkrdnd}t| }t|dkr5tt	}|
dt t|d | tj|}t|j|d}|S )Nrm   rn   cpucudai   @z:Rank {} trying to all-gather {:.2f} GB of data on device{})device)r!   rr   r8   rv   pickledumpslenlogging	getLogger__name__warningformatr   ByteStoragefrom_buffer
ByteTensorto)datar*   rq   rv   bufferloggerstorager1   r$   r$   r%   _serialize_to_tensor   s   


r   c                    s   t j|d}|dksJ dtj  gtj jd} fddt|D }t j|||d dd |D }t	|}||krStj
|| ftj jd}tj |fdd	 | fS )
Nr)   r(   zAgather/all_gather must be called from ranks withinthe give group!dtyperv   c                    s"   g | ]}t jd gt j jdqS )r(   r   )r8   zerosint64rv   r:   r=   r$   r%   r>          z*_pad_to_largest_tensor.<locals>.<listcomp>c                 S   s   g | ]}t | qS r$   )rE   item)r;   rY   r$   r$   r%   r>      s    r   rI   )r!   r   r8   r1   numelr   rv   r?   r   rS   r   uint8rU   )r1   r*   rA   
local_sizerZ   r[   r\   r$   r=   r%   _pad_to_largest_tensor   s*   

r   c                    s   t |dkr	| gS |d u rt }t| |t|\}t|  fdd|D }tj||d g }t||D ]\} 	 
 d | }|t| q9|S )Nr(   c                    "   g | ]}t j ft jjd qS r   r8   emptyr   rv   r:   r[   r1   r$   r%   r>      r   z*generalized_all_gather.<locals>.<listcomp>r)   )r   r   r   r   rS   r!   r   rV   rt   numpytobytesappendrw   loads)r   r*   rZ   rB   	data_listrY   r   r$   r   r%   r      s    
r   c           
         s   t |}|dkr| gS |d u rt }t }t| |t|\}||kr_t|  fdd|D }tj|||d g }t||D ]\}	 
  d | }	|t|	 qD|S tjg ||d g S )Nr(   c                    r   r   r   r:   r   r$   r%   r>      r   z&generalized_gather.<locals>.<listcomp>rb   )r   r   r!   r   r   r   rS   r   rV   rt   r   r   r   rw   r   )
r   r5   r*   rA   r@   rZ   rB   r   rY   r   r$   r   r%   r      s*   
r   c                 K   s(   t |dkrtj| |||fi |S dS )z2NOTE: only supports CPU tensor communication.
    r(   N)r   r!   r   )r   scatter_listr2   r*   r-   r$   r$   r%   r      s   r   c                 K   r4   r'   )r   r!   r   )output
input_listr3   r*   r-   r$   r$   r%   r      s   r   c                 K   6   t |dkr|  sJ dtj| ||fi |S d S )Nr(   z/ops.send requires the tensor to be contiguous())r   rO   r!   r   r1   r5   r*   r-   r$   r$   r%   r        
r   c                 K   r   )Nr(   z/ops.recv requires the tensor to be contiguous())r   rO   r!   r   r0   r$   r$   r%   r     r   r   c                 K   r   )Nr(   z0ops.isend requires the tensor to be contiguous())r   rO   r!   r   r   r$   r$   r%   r     r   r   c                 K   r   )Nr(   z0ops.irecv requires the tensor to be contiguous())r   rO   r!   r   r0   r$   r$   r%   r     r   r   c                 C   s   t jd}t|| }|d S )Nl        r   )rF   randomrandintr   )r*   seed	all_seedsr$   r$   r%   r   $  s   
r   c                    sl   t  rt  rt  dkr S t  }t  } fddt|D } ||< t |  tj|dd	 S )Nr(   c                    r6   r$   r7   r:   xr$   r%   r>   0  rC   z_all_gather.<locals>.<listcomp>r   rI   )
r!   r"   r#   r   r   r?   r   r8   rU   
contiguous)r   r@   rA   tensorsr$   r   r%   _all_gather*  s   r   c                 C   s.   t  rt  rt  dkr| S t |  | S r'   )r!   r"   r#   r   r   r   r$   r$   r%   _all_reduce6  s   
r   c                 C   sF   t  rt  rt  dkr| S t  }t  }| j|dd|  S )Nr(   r   rI   )r!   r"   r#   r   r   chunkr   )r   r@   rA   r$   r$   r%   _split>  s   r   c                   @   4   e Zd ZdZedd Zedd Zedd ZdS )	DiffAllGatherzDifferentiable all-gather.
    c                 C      t |S r    r   graphinputr$   r$   r%   symbolicK     zDiffAllGather.symbolicc                 C   r   r    r   ctxr   r$   r$   r%   forwardO  r   zDiffAllGather.forwardc                 C   r   r    r   r   grad_outputr$   r$   r%   backwardS  r   zDiffAllGather.backwardNr|   
__module____qualname____doc__staticmethodr   r   r   r$   r$   r$   r%   r   G      

r   c                   @   r   )	DiffAllReducezDifferentiable all-reducd.
    c                 C   r   r    r   r   r$   r$   r%   r   \  r   zDiffAllReduce.symbolicc                 C   r   r    r   r   r$   r$   r%   r   `  r   zDiffAllReduce.forwardc                 C      |S r    r$   r   r$   r$   r%   r   d     zDiffAllReduce.backwardNr   r$   r$   r$   r%   r   X  r   r   c                   @   s(   e Zd ZdZedd Zedd ZdS )DiffScatterzDifferentiable scatter.
    c                 C   r   r    r   r   r$   r$   r%   r   m  r   zDiffScatter.symbolicc                 C   r   r    r   r   r$   r$   r%   r   q  r   zDiffScatter.backwardN)r|   r   r   r   r   r   r   r$   r$   r$   r%   r   i  s    
r   c                   @   r   )	DiffCopyzDDifferentiable copy that reduces all gradients during backward.
    c                 C   r   r    r$   r   r$   r$   r%   r   z  r   zDiffCopy.symbolicc                 C   r   r    r$   r   r$   r$   r%   r   ~  r   zDiffCopy.forwardc                 C   r   r    r   r   r$   r$   r%   r     r   zDiffCopy.backwardNr   r$   r$   r$   r%   r   v  r   r   
   c                 C   sH  |g|   R \}}}| j|tjd}t }t|d tt||  }tj	t
| | ddd | }	| ||}
| j|tjd}t|d D ]U}t| |	 }|jdd\}}||kra n>|
 d|dd||  t|
 |  |d|| t| |dk}|
| || dd |	|< tj|	ddd}	qI|	||fS )N)r   r   rI   r(   rD      )prJ   )rY   new_onesr8   longr   randpermrE   rF   ceilrU   r   rT   r?   mmrL   rS   zero_scatter_add_	unsqueezerepeatr   
index_add_rK   F	normalize)featsnum_clusters	num_iterskrM   conesrA   	rand_indsclustersnew_clusterscountsstepsimmatscoresassignsmaskr$   r$   r%   r     s0    

r         ?   c                 C   s   t | |  } |  }t| | | } |  \}}| |}| || }| ||t   }| jdd}	t|	 t	|D ]$}
|	}| || 
d9 } | || jdd 
d9 } | jdd}	t|	 q?| | jddd   S )Nr(   rI   r   T)rJ   keepdim)r8   exprL   r^   r   rY   rT   r   r   r?   r   float)Qepsr   sum_QrM   mrH   rr   cur_sumir$   r$   r%   r     s"   

r   r    )r   N)TN)Nr]   )Nr   N)NN)r   )r   r   );	functoolsrw   collectionsr   r   rF   r8   torch.distributeddistributedr!   torch.nn.functionalnn
functionalr   torch.autogradr   __all__r   r   r   r   r   r	   r
   ReduceOpSUMr   r   r   r   no_gradr   	lru_cacher   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   applyr   r   r   r   r   r   r$   r$   r$   r%   <module>   sh   






'
	


	

	



	'