o
    oi@                     @   s   d dl mZmZmZmZmZ d dlZd dlmZ d dlm	Z	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZmZ d dlmZ d dlmZ G dd deZdS )    )AnyDictListOptionalUnionN)Tensor)DataParallelModule)override)Accelerator)CheckpointIO)	Precision)ParallelStrategy)_StrategyRegistry)
TBroadcastTReduce)apply_to_collection)ReduceOpc                       s  e Zd ZdZ				d4dee deeej  dee	 dee
 f fddZeed	ejfd
dZeed5ddZeded	efddZeded	dfddZed6dedeej d	efddZe	d7dedee deeeef  d	efddZededed	dfdd Zed8d"ed#ed	efd$d%Zed9d'ed(ed	efd)d*Z eded	e!eeee"f f f fd+d,Z#e	&d9ded-e!eeee"f f d.ed	df fd/d0Z$e%ed1e&d	dfd2d3Z'  Z(S ):DataParallelStrategyzImplements data-parallel training in a single process, i.e., the model gets replicated to each device and each
    gets a split of the data.Nacceleratorparallel_devicescheckpoint_io	precisionc                    s   t  j||d ||d d S )N)r   r   cluster_environmentr   r   )super__init__)selfr   r   r   r   	__class__ R/home/ubuntu/.local/lib/python3.10/site-packages/lightning/fabric/strategies/dp.pyr   #   s   
zDataParallelStrategy.__init__returnc                 C   s   | j d usJ | j d S )Nr   )r   r   r   r   r    root_device2   s   
z DataParallelStrategy.root_devicec                 C      d S Nr   r"   r   r   r    distributed_sampler_kwargs8      z/DataParallelStrategy.distributed_sampler_kwargsmodulec                 C   s   t || jdS )zDWraps the given model into a :class:`~torch.nn.DataParallel` module.)r(   
device_ids)r   r   r   r(   r   r   r    setup_module=   s   z!DataParallelStrategy.setup_modulec                 C   s   | | j d S r%   )tor#   r*   r   r   r    module_to_deviceB   s   z%DataParallelStrategy.module_to_devicebatchdevicec                 C      |S r%   r   )r   r.   r/   r   r   r    batch_to_deviceF   r'   z$DataParallelStrategy.batch_to_devicemean
collectiongroup	reduce_opc                 C   s   dt dt fdd}t|t |S )Ntr!   c                 S   s   | j }|   |S r%   )dtypefloatr2   r,   )r6   original_dtyper   r   r    r2   O   s   z-DataParallelStrategy.all_reduce.<locals>.mean)r   r   )r   r3   r4   r5   r2   r   r   r    
all_reduceK   s   zDataParallelStrategy.all_reduceargskwargsc                 O   r$   r%   r   )r   r;   r<   r   r   r    barrierU      zDataParallelStrategy.barrierr   objsrcc                 C   r0   r%   r   )r   r?   r@   r   r   r    	broadcastY   r>   zDataParallelStrategy.broadcastTdecisionallc                 C   r0   r%   r   )r   rB   rC   r   r   r    reduce_boolean_decision]   r>   z,DataParallelStrategy.reduce_boolean_decisionc                    s   t |tr|j}t |S r%   )
isinstancer   r(   r   get_module_state_dictr*   r   r   r    rF   a   s   
z*DataParallelStrategy.get_module_state_dict
state_dictstrictc                    s&   t |tr|j}t j|||d d S )N)r(   rG   rH   )rE   r   r(   r   load_module_state_dict)r   r(   rG   rH   r   r   r    rI   g   s   
z+DataParallelStrategy.load_module_state_dictstrategy_registryc                 C   s   |j d| | jd d S )Ndp)description)register__name__)clsrJ   r   r   r    register_strategieso   s   z(DataParallelStrategy.register_strategies)NNNN)r!   Nr%   )Nr2   )r   )T))rN   
__module____qualname____doc__r   r   r   torchr/   r   r   r   propertyr
   r#   r&   r	   r   r+   r-   r   r1   r   r   r   strr:   r=   r   intrA   boolrD   r   r   rF   rI   classmethodr   rP   __classcell__r   r   r   r    r      sx     	(r   )typingr   r   r   r   r   rT   r   torch.nnr   r	   typing_extensionsr
   lightning.fabric.acceleratorsr   )lightning.fabric.plugins.io.checkpoint_ior   "lightning.fabric.plugins.precisionr   $lightning.fabric.strategies.parallelr   $lightning.fabric.strategies.registryr   $lightning.fabric.strategies.strategyr   r   %lightning.fabric.utilities.apply_funcr   &lightning.fabric.utilities.distributedr   r   r   r   r   r    <module>   s   