o
    oi!                     @   s   d dl Z d dlZd dlmZmZmZmZ d dlZd dlm	Z
 d dlmZ d dlmZmZ d dlmZ d dlmZmZmZ e
 rGd dlmZ ne jdd	ZG d
d deZdS )    N)AnyListOptionalUnion)Tensor)Selfoverride)
Collective)CollectibleGroup	RedOpTypeReduceOp)default_pg_timeouti  )secondsc                       s  e Zd ZdZdZdX fddZeedef fddZ	eede
fd	d
Zeede
fddZedede
defddZedYdedeeeef defddZedYdede
deeeef defddZedee dedee fddZedZdedee de
dee fddZedZdedee de
defd d!Ze	dYd"ed#ee deeeef defd$d%Zed&ee d'ee dee fd(d)ZedZdede
d*e
ddfd+d,Zed[dedee
 d*e
defd-d.Zd/ee d0edee fd1d2Z	d\d/ee de
d3ee j! dee fd4d5Z"dZd0ed6ee de
dee fd7d8Z#	dZd9ee d:ee de
dee fd;d<Z$ed\d=eee
  ddfd>d?Z%d]d@ee&j' dAe(ddfdBdCZ)ed^dDee dEee dFede*f fdGdHZ+ede*f fdIdJZ,e-ede(fdKdLZ.e-ede(fdMdNZ/e-edFeddfdOdPZ0e-edFedefdQdRZ1e-edSeddfdTdUZ2e-edeeeef deeef fdVdWZ3  Z4S )_TorchCollectivezCollective operations using `torch.distributed <https://pytorch.org/docs/stable/distributed.html>`__.

    .. warning:: This is an :ref:`experimental <versioning:Experimental API>` feature which is still in development.

    FreturnNc                    s   t  stdt   d S )Nz#Torch distributed is not available.)distis_availableRuntimeErrorsuper__init__self	__class__ i/home/ubuntu/.local/lib/python3.10/site-packages/lightning/fabric/plugins/collectives/torch_collective.pyr      s   zTorchCollective.__init__c                    s   | j d u r
tjj| _ t jS N)_groupr   GroupMemberWORLDr   groupr   r   r   r   r    !   s   

zTorchCollective.groupc                 C      t | jS r   )r   get_rankr    r   r   r   r   rank(   s   zTorchCollective.rankc                 C   r!   r   )r   get_world_sizer    r   r   r   r   
world_size.   s   zTorchCollective.world_sizetensorsrcc                 C      t j||| jd |S N)r    )r   	broadcastr    )r   r&   r'   r   r   r   r*   3      zTorchCollective.broadcastsumopc                 C   s    |  |}tj||| jd |S N)r-   r    )_convert_to_native_opr   
all_reducer    )r   r&   r-   r   r   r   r0   8   s   
zTorchCollective.all_reducedstc                 C   "   |  |}tj|||| jd |S r.   )r/   r   reducer    )r   r&   r1   r-   r   r   r   r3   >   s   
zTorchCollective.reducetensor_listc                 C   r(   r)   )r   
all_gatherr    )r   r4   r&   r   r   r   r5   D   r+   zTorchCollective.all_gatherr   gather_listc                 C      t j|||| jd |S r)   )r   gatherr    )r   r&   r6   r1   r   r   r   r8   I      zTorchCollective.gatherscatter_listc                 C      t j|||| jd |S r)   )r   scatterr    )r   r&   r:   r'   r   r   r   r<   N   r9   zTorchCollective.scatteroutput
input_listc                 C   r2   r.   )r/   r   reduce_scatterr    )r   r=   r>   r-   r   r   r   r?   S   s   
zTorchCollective.reduce_scatteroutput_tensor_listinput_tensor_listc                 C   r(   r)   )r   
all_to_allr    )r   r@   rA   r   r   r   rB   [   r+   zTorchCollective.all_to_alltagc                 C   s   t j|||| jd d S N)rC   r    )r   sendr    )r   r&   r1   rC   r   r   r   rE   `   s   zTorchCollective.sendc                 C   r;   rD   )r   recvr    )r   r&   r'   rC   r   r   r   rF   d   r9   zTorchCollective.recvobject_listobjc                 C   r(   r)   )r   all_gather_objectr    )r   rG   rH   r   r   r   rI   i   s   z!TorchCollective.all_gather_objectdevicec                 C   s   t j||| j|d |S )N)r    rJ   )r   broadcast_object_listr    )r   rG   r'   rJ   r   r   r   rK   m      z%TorchCollective.broadcast_object_listobject_gather_listc                 C   r7   r)   )r   gather_objectr    )r   rH   rM   r1   r   r   r   rN   s   s   zTorchCollective.gather_objectscatter_object_output_listscatter_object_input_listc                 C   r;   r)   )r   scatter_object_listr    )r   rO   rP   r'   r   r   r   rQ   w   rL   z#TorchCollective.scatter_object_list
device_idsc                 C   s&   | j tjjkr	d S tj| j |d d S )N)r    rR   )r    r   r   NON_GROUP_MEMBERbarrier)r   rR   r   r   r   rT   }   s   zTorchCollective.barriertimeoutwait_all_ranksc                 C   s   t j| j||d d S )N)r    rU   rV   )r   monitored_barrierr    )r   rU   rV   r   r   r   rW      s   z!TorchCollective.monitored_barriermain_address	main_portkwargsc                    s   |   r| S d}d}|d ur|tjvr|tj|< d}d}d}|d ur0|tjvr0t|tj|< d}t jdi | dt_|rEtjdd  |rNtjdd  | S )NFMASTER_ADDRTMASTER_PORTr   )	is_initializedosenvironstrr   setupr   manages_default_grouppop)r   rX   rY   rZ   set_addraddr_keyset_portport_keyr   r   r   ra      s&   
zTorchCollective.setupc                    sr   | j tjjk}t   |r+tjr+tjj }d ur+t	tj
jdkr+| | dt_| S tjr7tjjd u r7dt_| S )N   F)r    r   r   rS   r   teardownr   rb   r   lendistributed_c10d_pg_mapdestroy_group)r   group_memberdefault_groupr   r   r   ri      s   

zTorchCollective.teardownc                 C   s   t  S r   )r   r   clsr   r   r   r      s   zTorchCollective.is_availablec                 C   s   |   ot S r   )r   r   r]   rp   r   r   r   r]         zTorchCollective.is_initializedc                 K   s   t jdi | d S Nr   )r   init_process_grouprq   rZ   r   r   r   
init_group   s   zTorchCollective.init_groupc                 K   s   t jdi |S rs   )r   	new_groupru   r   r   r   rw      rr   zTorchCollective.new_groupr    c                 C   s   |t jjv rt | d S d S r   )r   rk   rl   destroy_process_group)rq   r    r   r   r   rm      s   zTorchCollective.destroy_groupc                 C   sf   t |ttfr	|S t |tstd|dt|j | }tt|d }|d u r1td|d|S )NzUnsupported op z	 of type zop z is not a member of `ReduceOp`)	
isinstancer   r   r`   
ValueErrortype__name__uppergetattr)rq   r-   valuer   r   r   r/      s   
z%TorchCollective._convert_to_native_op)r   N)r,   )r   )Nr   r   )NF)NN)5r|   
__module____qualname____doc__rb   r   propertyr   r
   r    intr#   r%   r   r*   r   r`   r   r   r0   r3   r   r5   r8   r<   r?   rB   rE   r   rF   r   rI   torchrJ   rK   rN   rQ   rT   datetime	timedeltaboolrW   r   ra   ri   classmethodr   r]   rv   rw   rm   r/   __classcell__r   r   r   r   r      s    $( &"$"
$
*0r   )r   r^   typingr   r   r   r   r   torch.distributeddistributedr   r   typing_extensionsr   r   /lightning.fabric.plugins.collectives.collectiver	    lightning.fabric.utilities.typesr
   r   r   r   torch.distributed.constantsr   r   r   r   r   r   r   <module>   s    