o
    oi+                     @   s^  d dl Z d dlmZmZmZmZmZmZmZm	Z	m
Z
mZ d dlZd dlmZ d dlmZ d dlmZmZ d dlmZmZ d dlmZ d dlmZmZ d d	lmZ d
e
eeeeef de
ee e j f fddZ!dededdfddZ"			d dedee# dee dee ddf
ddZ$dej%j&ddfddZ'G dd deZ(G dd de(Z)G dd dZ*dS )!    N)
AnyCallableDictIterableIteratorListOptionalSizedUnioncast)Tensor)DistributedDataParallel)DistributedSamplerSampler)Selfoverride)_DatasetSamplerWrapper)rank_zero_debugrank_zero_info)_SizedIterableobjreturnc                 C   sP   t | tr| gS t | ttfrtjtt|  S t | tr&tjtt| 	  S g S )z?Recursively find all tensors contained in the specified object.)

isinstancer   listtuple	itertoolschainmap_find_tensorsdictvalues)r    r!   [/home/ubuntu/.local/lib/python3.10/site-packages/lightning/pytorch/overrides/distributed.pyr      s   

r   modeloutputc                 C   sb   t  r,| jr,d| _| jr| jstt|ng }tt j	j
j| j}|  || d S d| _d S )NTF)torchis_grad_enabledrequire_backward_grad_syncrequire_forward_param_syncfind_unused_parametersstatic_graphr   r   r   _C_distributed_c10dReducerreducer_rebuild_bucketsprepare_for_backward)r#   r$   argsr.   r!   r!   r"   r0   ,   s   
r0   ddp_comm_stateddp_comm_hookddp_comm_wrapperc                 C   s^   |du rdS |}|durt d|j d|j d ||}td|j d | j||d dS )a  Function to register communication hook for DDP model https://pytorch.org/docs/master/ddp_comm_hooks.html.

    Args:
        model:
            DDP model
        ddp_comm_state:
            state is passed to the hook and can be used to maintain
            and update any state information that users would like to
            maintain as part of the training process. Examples: error
            feedback in gradient compression, peers to communicate with
            next in GossipGrad etc.
        ddp_comm_hook:
            hook(state: object, bucket: dist._GradBucket) -> torch.futures.Future

            This callable function is called once the bucket is ready. The
            hook can perform whatever processing is needed and return
            a Future indicating completion of any async work (ex: allreduce).
            If the hook doesn't perform any communication, it can also
            just return a completed Future. The Future should hold the
            new value of grad bucket's tensors. Once a bucket is ready,
            c10d reducer would call this hook and use the tensors returned
            by the Future and copy grads to individual parameters.
        ddp_comm_wrapper:
            communication hook wrapper to support a communication hook such
            as FP16 compression as wrapper, which could be combined with
            ddp_comm_hook

    Examples::

        from torch.distributed.algorithms.ddp_comm_hooks import (
            default_hooks as default,
            powerSGD_hook as powerSGD,
            post_localSGD_hook as post_localSGD,
        )

        # fp16_compress_hook for compress gradients
        ddp_model = ...
        _register_ddp_comm_hook(
            model=ddp_model,
            ddp_comm_hook=default.fp16_compress_hook,
        )

        # powerSGD_hook
        ddp_model = ...
        _register_ddp_comm_hook(
            model=ddp_model,
            ddp_comm_state=powerSGD.PowerSGDState(
                process_group=None,
                matrix_approximation_rank=1,
                start_powerSGD_iter=5000,
            ),
            ddp_comm_hook=powerSGD.powerSGD_hook,
        )

        # post_localSGD_hook
        subgroup, _ = torch.distributed.new_subgroups()
        ddp_model = ...
        _register_ddp_comm_hook(
            model=ddp_model,
            state=post_localSGD.PostLocalSGDState(
                process_group=None,
                subgroup=subgroup,
                start_localSGD_iter=1_000,
            ),
            ddp_comm_hook=post_localSGD.post_localSGD_hook,
        )

        # fp16_compress_wrapper combined with other communication hook
        ddp_model = ...
        _register_ddp_comm_hook(
            model=ddp_model,
            ddp_comm_state=powerSGD.PowerSGDState(
                process_group=None,
                matrix_approximation_rank=1,
                start_powerSGD_iter=5000,
            ),
            ddp_comm_hook=powerSGD.powerSGD_hook,
            ddp_comm_wrapper=default.fp16_compress_wrapper,
        )

    Nz$DDP comm wrapper is provided, apply (z).zRegistering DDP comm hook: .)statehook)r   __qualname__r   register_comm_hook)r#   r2   r3   r4   r!   r!   r"   _register_ddp_comm_hook=   s   Wr;   modulec                 C   sJ   t | dr
t| jnt }ddlm} ddlm} || | dd|d dS )zeTaken from https://github.com/pytorch/pytorch/blob/v2.0.0/torch/nn/parallel/distributed.py#L675-L682.!_ddp_params_and_buffers_to_ignorer   )_get_default_group)_sync_module_statesi  )srcparams_and_buffers_to_ignoreN)hasattrsetr=   "torch.distributed.distributed_c10dr>   torch.distributed.utilsr?   )r<   parameters_to_ignorer>   torch_sync_module_statesr!   r!   r"   r?      s   
r?   c                       sH   e Zd ZdZdededdf fddZedeee	  fdd	Z
  ZS )
UnrepeatedDistributedSampleraG  A fork of the PyTorch DistributedSampler that doesn't repeat data, instead allowing the number of batches per
    process to be off-by-one from each other. This makes this sampler usable for predictions (it's deterministic and
    doesn't require shuffling). It is potentially unsafe to use this sampler for training, because during training the
    DistributedDataParallel syncs buffers on each forward pass, so it could freeze if one of the processes runs one
    fewer batch. During prediction, buffers are only synced on the first batch, so this is safe to use as long as each
    process runs at least one batch. We verify this in an assert.

    Taken from https://github.com/jpuigcerver/PyLaia/blob/v1.0.0/laia/data/unpadded_distributed_sampler.py and
    https://github.com/pytorch/pytorch/issues/25162#issuecomment-634146002

    r1   kwargsr   Nc                    sn   t  j|i | t| jtstdtt| jt| j| j	| _
t| j| _| j
dks3| jdks5J d S d S )N6The given dataset must implement the `__len__` method.   r   )super__init__r   datasetr	   	TypeErrorlenrangeranknum_replicasnum_samples
total_size)selfr1   rI   	__class__r!   r"   rM      s    z%UnrepeatedDistributedSampler.__init__c                 C   s   t | jts
td| jr$t }|| j tj	t
| j|d }n	ttt
| j}t
|| jks6J || j| j| j }t
|| jksIJ t|S )NrJ   )	generator)r   rN   r	   rO   shuffler%   	Generatormanual_seedepochrandpermrP   tolistr   rQ   rU   rR   rS   rT   iter)rV   gindicesr!   r!   r"   __iter__   s   z%UnrepeatedDistributedSampler.__iter__)__name__
__module__r9   __doc__r   rM   r   r   r   intrc   __classcell__r!   r!   rW   r"   rH      s
    
 rH   c                       sP   e Zd ZdZdeeef dededdf fddZe	de
f fd	d
Z  ZS )#UnrepeatedDistributedSamplerWrapperz_Equivalent class to ``DistributedSamplerWrapper`` but for the ``UnrepeatedDistributedSampler``.samplerr1   rI   r   Nc                    s"   t  jt|g|R i | d S N)rL   rM   r   )rV   rj   r1   rI   rW   r!   r"   rM      s   "z,UnrepeatedDistributedSamplerWrapper.__init__c                    s"    j    fddt  D S )Nc                 3   s    | ]} j | V  qd S rk   )rN   ).0indexrV   r!   r"   	<genexpr>   s    z?UnrepeatedDistributedSamplerWrapper.__iter__.<locals>.<genexpr>)rN   resetrL   rc   rn   rW   rn   r"   rc      s   
z,UnrepeatedDistributedSamplerWrapper.__iter__)rd   re   r9   rf   r
   r   r   r   rM   r   r   rc   rh   r!   r!   rW   r"   ri      s
    &ri   c                   @   sf   e Zd ZdZdeddfddZdee fddZde	fd	d
Z
defddZdeeef fddZdS )_IndexBatchSamplerWrapperz\This class is used to wrap a :class:`torch.utils.data.BatchSampler` and capture its indices.batch_samplerr   Nc                 C   s,   g | _ dd |j D | _|| _d | _d S )Nc                 S   s   i | ]\}}|d vr||qS ))__next__rc   __len____getstate__r!   )rl   kvr!   r!   r"   
<dictcomp>   s
    z6_IndexBatchSamplerWrapper.__init__.<locals>.<dictcomp>)seen_batch_indices__dict__items_batch_sampler	_iterator)rV   rr   r!   r!   r"   rM      s   
z"_IndexBatchSamplerWrapper.__init__c                 C   s(   | j d usJ t| j }| j| |S rk   )r}   nextry   append)rV   batchr!   r!   r"   rs      s   
z"_IndexBatchSamplerWrapper.__next__c                 C   s   g | _ t| j| _| S rk   )ry   r`   r|   r}   rn   r!   r!   r"   rc     s   z"_IndexBatchSamplerWrapper.__iter__c                 C   s
   t | jS rk   )rP   r|   rn   r!   r!   r"   rt     s   
z!_IndexBatchSamplerWrapper.__len__c                 C   s   | j  }d |d< |S )Nr}   )rz   copy)rV   r7   r!   r!   r"   ru   	  s   
z&_IndexBatchSamplerWrapper.__getstate__)rd   re   r9   rf   r   rM   r   rg   rs   r   rc   rt   r   strr   ru   r!   r!   r!   r"   rq      s    rq   )NNN)+r   typingr   r   r   r   r   r   r   r	   r
   r   r%   r   torch.nn.parallel.distributedr   torch.utils.datar   r   typing_extensionsr   r   &lightning.fabric.utilities.distributedr   %lightning.pytorch.utilities.rank_zeror   r   !lightning.pytorch.utilities.typesr   r   r   r   r   r   r0   objectr;   nnModuler?   rH   ri   rq   r!   r!   r!   r"   <module>   sD   0

f,