o
    ziG                  
   @   s  d dl Z d dlmZmZ d dlmZ d dlmZmZm	Z	m
Z
mZmZmZmZmZmZ d dlZd dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlm Z  d dl!m"Z" d dl#m$Z$ d dl%m&Z& d dl'm(Z( d dl)m*Z*m+Z+m,Z,m-Z- edZ.edZ/e 0e1Z2G dd deZ3G dd deZ4G dd deZ5dee6 dee6 de7ddfddZ8d e6d!e
e6ee6ege7f f d"e9d#e
e6ef ddf
d$d%Z:dS )&    N)ABCabstractmethod)	ExitStack)
AnyCallableContextManagerDictIterableListOptionalTupleTypeVarUnion)Tensor)Module)	Optimizer)
DataLoader)Accelerator)CheckpointIO)TorchCheckpointIO)	Precision)	_Launcher)_StrategyRegistry)move_data_to_device)
_EmptyInit)_PATHOptimizableReduceOp	_Stateful
TBroadcastTReducec                   @   s  e Zd ZdZ			dsdee dee dee ddfddZe	e
dejfd	d
Ze	e
defddZe	dee fddZe	dee fddZejdeddfddZe	defddZejdeddfddZe	defddZejdee ddfddZdtddZdtddZdedefddZdefd d!Zdud"ee defd#d$Zd%ed&ee de eee f fd'd(Z!d%edefd)d*Z"d+edefd,d-Z#e
d%eddfd.d/Z$dud0e%d1eej de%fd2d3Z&d4e'd%ee d5e%d6e%ddf
d7d8Z(d+e)d6e%de%fd9d:Z*e
dvd4e'd<ee% d=ede'fd>d?Z+e
		@dwd4e,e'e%f d<ee% dAee,e-e.f  de,e'e%f fdBdCZ/e
dudDee. ddfdEdFZ0e
dxdHe1dIe2de1fdJdKZ3dydMedNedefdOdPZ4		dzdQe5dRe6e.e,eee%f f dSee% dTee6e.e7e.e%gef f  ddf
dUdVZ8d%ede6e.e,e%e'f f fdWdXZ9	Ldyd%edYe6e.e,e%e'f f dZeddfd[d\Z:d+ede6e.e'f fd]d^Z;		Ld{dQe5dRee,eee6e.e,eee%f f f  dZede6e.e%f fd_d`Z<dtdadbZ=	c	Ld|d%ej>jd+edde,e?e2f dee,e?e2f dfedej'fdgdhZ@d%ej>jd+edie,e?e2f ddfdjdkZAeBdleCddfdmdnZDde.fdodpZEdRe6e.e,eee%f f dTe6e.e7e.e%gef f de6e.e%f fdqdrZFdS )}StrategyzcBase class for all strategies that change the behaviour of the training, validation and test- loop.Nacceleratorcheckpoint_io	precisionreturnc                 C   s(   || _ || _d | _|| _d | _d | _d S N)_accelerator_checkpoint_io
_precisionr$   	_launcher_backward_sync_control)selfr"   r#   r$    r-   X/home/ubuntu/.local/lib/python3.10/site-packages/lightning_fabric/strategies/strategy.py__init__,   s   
zStrategy.__init__c                 C      dS )zReturns the root device.Nr-   r,   r-   r-   r.   root_device:       zStrategy.root_devicec                 C   r0   )zcWhether the current process is the rank zero process not only on the local node, but for all nodes.Nr-   r1   r-   r-   r.   is_global_zero?   r3   zStrategy.is_global_zeroc                 C      | j S r&   )r*   r1   r-   r-   r.   launcherD      zStrategy.launcherc                 C   r5   r&   r'   r1   r-   r-   r.   r"   H   r7   zStrategy.acceleratorc                 C   
   || _ d S r&   r8   )r,   r"   r-   r-   r.   r"   L      
c                 C   s   | j d u r	t | _ | j S r&   )r(   r   r1   r-   r-   r.   r#   P   s   
zStrategy.checkpoint_ioioc                 C   r9   r&   )r(   )r,   r;   r-   r-   r.   r#   V   r:   c                 C   s   | j d ur| j S t S r&   )r)   r   r1   r-   r-   r.   r$   Z   s   zStrategy.precisionc                 C   r9   r&   )r)   )r,   r$   r-   r-   r.   r$   ^   r:   c                 C   r0   )z&Attach the launcher based on Strategy.Nr-   r1   r-   r-   r.   _configure_launcherb   r3   zStrategy._configure_launcherc                 C   s    | j dusJ | j | j dS )zSetup any processes or distributed connections.

        This must be called by the framework at the beginning of every process, before any distributed communication
        takes place.

        N)r"   setup_devicer2   r1   r-   r-   r.   setup_environmente   s   zStrategy.setup_environment
dataloaderc                 C      |S )zWraps the dataloader if necessary.

        Args:
            dataloader: iterable. Ideally of type: :class:`torch.utils.data.DataLoader`

        r-   )r,   r?   r-   r-   r.   process_dataloadero   s   zStrategy.process_dataloaderc                 C   s*   | j  }t }|| j || |S )z1Controls how tensors get created (device, dtype).)r$   tensor_init_contextr   enter_contextr2   )r,   precision_init_ctxstackr-   r-   r.   rB   x   s
   

zStrategy.tensor_init_context
empty_initc                 C   s>   | j  }t }|| j |tt|d || |S )a  A context manager wrapping the model instantiation.

        Here, the strategy can control how the parameters of the model get created (device, dtype) and or apply other
        patches to the model.

        Args:
            empty_init: Whether to initialize the model with empty weights (uninitialized memory).
                If ``None``, the strategy will decide. Some strategies may not support all options.

        )enabled)r$   module_init_contextr   rC   r2   r   bool)r,   rF   precision_module_ctxrE   r-   r-   r.   rH      s   

zStrategy.module_init_contextmodule
optimizersc                    s$     |} fdd|D }||fS )zSet up a model and multiple optimizers together.

        The returned objects are expected to be in the same order they were passed in. The default implementation will
        call :meth:`setup_module` and :meth:`setup_optimizer` on the inputs.

        c                    s   g | ]}  |qS r-   )setup_optimizer).0	optimizerr1   r-   r.   
<listcomp>   s    z8Strategy.setup_module_and_optimizers.<locals>.<listcomp>)setup_module)r,   rK   rL   r-   r1   r.   setup_module_and_optimizers   s   
	z$Strategy.setup_module_and_optimizersc                 C   r@   )zDPerforms setup for the model, e.g., by wrapping it by another class.r-   r,   rK   r-   r-   r.   rQ         zStrategy.setup_modulerO   c                 C   r@   )zHPerforms setup for the optimizer, e.g., by wrapping it by another class.r-   r,   rO   r-   r-   r.   rM      rT   zStrategy.setup_optimizerc                 C   r0   )z&Moves the model to the correct device.Nr-   rS   r-   r-   r.   module_to_device   r3   zStrategy.module_to_devicebatchdevicec                 C   s   |p| j }t||S )a&  Moves the batch to the correct device.

        The returned batch is of the same type as the input batch, just
        having all tensors on the correct device.

        Args:
            batch: The batch of samples to move to the correct device
            device: The target device

        )r2   r   )r,   rW   rX   r-   r-   r.   batch_to_device   s   

zStrategy.batch_to_devicetensorargskwargsc                 O   s<   | j || | j j||g|R i | | j || dS )z0Forwards backward-calls to the precision plugin.N)r$   pre_backwardbackwardpost_backward)r,   rZ   rK   r[   r\   r-   r-   r.   r^      s   zStrategy.backwardc                 K   s   | j j|fi |S )zPerforms the actual optimizer step.

        Args:
            optimizer: the optimizer performing the step
            **kwargs: Any extra arguments to ``optimizer.step``

        )r$   optimizer_step)r,   rO   r\   r-   r-   r.   r`      s   zStrategy.optimizer_stepFgroup
sync_gradsc                 C   r0   )a  Perform an all_gather on all processes.

        Args:
            tensor: the tensor to all_gather
            group: the process group to gather results from
            sync_grads: flag that allows users to synchronize gradients for all_gather op

        Nr-   )r,   rZ   ra   rb   r-   r-   r.   
all_gather   r3   zStrategy.all_gathermean	reduce_opc                 C   r0   )a,  Reduces the given tensor (e.g. across GPUs/processes).

        Args:
            tensor: the tensor to sync and reduce
            group: the process group to reduce
            reduce_op: the reduction operation. Defaults to 'mean'.
                Can also be a string 'sum' or ReduceOp.

        Nr-   )r,   rZ   ra   re   r-   r-   r.   
all_reduce   r3   zStrategy.all_reducenamec                 C   r0   )zSynchronizes all processes which blocks processes until the whole group enters this function.

        Args:
            name: an optional name to pass into barrier.

        Nr-   )r,   rg   r-   r-   r.   barrier   r3   zStrategy.barrierr   objsrcc                 C   r0   )zBroadcasts an object to all processes.

        Args:
            obj: the object to broadcast
            src: source rank

        Nr-   )r,   ri   rj   r-   r-   r.   	broadcast   r3   zStrategy.broadcastTdecisionallc                 C   r@   )z/Reduce a boolean decision across all processes.r-   )r,   rl   rm   r-   r-   r.   reduce_boolean_decision   rT   z Strategy.reduce_boolean_decisionpathstatestorage_optionsfilterc                 C   s2   | j ||pi d}| jr| jj|||d dS dS )a  Save model, optimizer, and other state as a checkpoint file.

        Args:
            path: A path to where the file(s) should be saved
            state: A dictionary with contents to be saved. If the dict contains modules or optimizers, their
                state-dict will be retrieved and converted automatically.
            storage_options: Additional options for the ``CheckpointIO`` plugin
            filter: An optional dictionary containing filter callables that return a boolean indicating whether the
                given item should be saved (``True``) or filtered out (``False``). Each filter key should match a
                state key, where its filter will be applied to the ``state_dict`` generated.

        )rr   )
checkpointro   rq   N)"_convert_stateful_objects_in_stater4   r#   save_checkpoint)r,   ro   rp   rq   rr   r-   r-   r.   ru      s   zStrategy.save_checkpointc                 C   s   |  S )zReturns model state.)
state_dictrS   r-   r-   r.   get_module_state_dict  s   zStrategy.get_module_state_dictrv   strictc                 C   s   |j ||d dS )z%Loads the given state into the model.rx   N)load_state_dict)r,   rK   rv   rx   r-   r-   r.   load_module_state_dict  s   zStrategy.load_module_state_dictc                 C   s,   t |dr|  | jr| S i S | S )zReturns state of an optimizer.

        Allows for syncing/collating optimizer state from processes in custom plugins.

        consolidate_state_dict)hasattrr|   r4   rv   rU   r-   r-   r.   get_optimizer_state!  s   
zStrategy.get_optimizer_statec                 C   s   t j  | j|}|s|S t|tr| j|||d i S t|tr*|	| i S t
| | |d |  D ]/\}}||vrDq;t|trct|trZ| j||||d q;|	|| q;||||< q;|S )a  Load the contents from a checkpoint and restore the state of the given objects.

        Args:
            path: A path to where the file is located
            state: Can be one of:

                - A dictionary of objects whose state will be restored in-place from the checkpoint path.
                - ``None`` or the empty dict: The loaded checkpoint will be returned in full.
                - A :class:`~torch.nn.Module` instance, if the checkpoint file contains a raw module state dict.
                - A :class:`~torch.optim.Optimizer` instance, if the checkpoint file contains a raw optimizer state.

            strict: Whether to enforce that the keys in `state` match the keys in the checkpoint.

        Returns:
            The remaining items that were not restored into the given state dictionary. If no state dictionary is
            given, the full checkpoint will be returned.

        )rK   rv   rx   ry   )torchcudaempty_cacher#   load_checkpoint
isinstancer   r{   r   rz   !_validate_keys_for_strict_loadingkeyscopyitemsr   pop)r,   ro   rp   rx   rs   rg   ri   r-   r-   r.   r   0  s(   





zStrategy.load_checkpointc                 C   s0   | j   | jdusJ | j  | j  dS )zThis method is called to teardown the training process.

        It is the right place to release memory and free other resources.

        N)r$   teardownr"   r#   r1   r-   r-   r.   r   b  s   

zStrategy.teardown       @max_norm	norm_typeerror_if_nonfinitec                 C   s.   | j | | j |}tjjj||||dS )zClip gradients by norm.)r   r   r   )r$   unscale_gradientsmain_paramsr   nnutilsclip_grad_norm_)r,   rK   rO   r   r   r   
parametersr-   r-   r.   clip_gradients_normm  s
   	zStrategy.clip_gradients_normclip_valc                 C   s*   | j | | j |}tjjj||dS )zClip gradients by value.)
clip_value)r$   r   r   r   r   r   clip_grad_value_)r,   rK   rO   r   r   r-   r-   r.   clip_gradients_value|  s   zStrategy.clip_gradients_valuestrategy_registryc                 C   s   d S r&   r-   )clsr   r-   r-   r.   register_strategies  rT   zStrategy.register_strategiesc                 C   s   dt | j dS )NzThe `z` does not support setting up the module and optimizer(s) independently. Please call `setup_module_and_optimizers(model, [optimizer, ...])` to jointly set them up.)type__name__r1   r-   r-   r.   _err_msg_joint_setup_required  s   z&Strategy._err_msg_joint_setup_requiredc                 C   sp   i }|  D ]/\}}t|tr| j|d}nt|tr"| j|d}nt|tr,| }n|}t|||| q|S )N)rK   )rO   )	r   r   r   rw   r   r~   r   rv   _apply_filter)r,   rp   rr   converted_statekeyri   	convertedr-   r-   r.   rt     s   



z+Strategy._convert_stateful_objects_in_state)NNN)r%   Nr&   )NF)Nrd   )r   )T)NN)NT)r   T)Gr   
__module____qualname____doc__r   r   r   r   r/   propertyr   r   rX   r2   rI   r4   r   r6   r"   setterr#   r$   r<   r>   r   rA   r   rB   rH   r   r
   r   r   rR   rQ   rM   rV   r   rY   r   r^   r   r`   rc   r   r   strrf   rh   r   intrk   rn   r   r   r   ru   rw   r{   r~   r   r   r   floatr   r   classmethodr   r   r   rt   r-   r-   r-   r.   r!   )   s   



	
"
"


	
"
"


2


&
r!   c                   @   s*   e Zd ZdZedededefddZdS )_BackwardSyncControla  Interface for any :class:`Strategy` that wants to offer a functionality to enable or disable gradient
    synchronization during/after back-propagation.

    The most common use-case is gradient accumulation. If a :class:`Strategy` implements this interface, the user can
    implement their gradient accumulation loop very efficiently by disabling redundant gradient synchronization.

    rK   rG   r%   c                 C   r0   )zBlocks the synchronization of gradients during the backward pass.

        This is a context manager. It is only effective if it wraps a call to `.backward()`.

        Nr-   )r,   rK   rG   r-   r-   r.   no_backward_sync  r3   z%_BackwardSyncControl.no_backward_syncN)	r   r   r   r   r   r   rI   r   r   r-   r-   r-   r.   r     s    r   c                   @   s"   e Zd ZdZedefddZdS )_ShardedzkMixin-interface for any :class:`Strategy` that wants to expose functionality for sharding model parameters.r%   c                 C   r0   )a  A context manager that goes over the instantiation of an :class:`torch.nn.Module` and handles sharding of
        parameters on creation.

        By sharding layers directly on instantiation, one can reduce peak memory usage and initialization time.

        Nr-   r1   r-   r-   r.   module_sharded_context  r3   z_Sharded.module_sharded_contextN)r   r   r   r   r   r   r   r-   r-   r-   r.   r     s    r   requested_keyscheckpoint_keysrx   r%   c                    s6    fdd| D }|r|rt d|d  dd S d S )Nc                    s   g | ]}| vr|qS r-   r-   )rN   kr   r-   r.   rP     s    z5_validate_keys_for_strict_loading.<locals>.<listcomp>z$The requested state contains a key 'r   z^' that does not exist in the loaded checkpoint. To disable strict loading, set `strict=False`.)KeyError)r   r   rx   invalid_keysr-   r   r.   r     s   r   r   rr   source_dicttarget_dictc                 C   s^   | |v r)t |tr)||  }| D ]\}}|||r&|| i  |||  |< qd S ||| < d S r&   )r   dictr   
setdefault)r   rr   r   r   	filter_fnr   vr-   r-   r.   r     s   
r   );loggingabcr   r   
contextlibr   typingr   r   r   r   r	   r
   r   r   r   r   r   r   torch.nnr   torch.optimr   torch.utils.datar   lightning_fabric.acceleratorsr   )lightning_fabric.plugins.io.checkpoint_ior   $lightning_fabric.plugins.io.torch_ior   "lightning_fabric.plugins.precisionr   .lightning_fabric.strategies.launchers.launcherr   $lightning_fabric.strategies.registryr   %lightning_fabric.utilities.apply_funcr   lightning_fabric.utilities.initr    lightning_fabric.utilities.typesr   r   r   r   r   r    	getLoggerr   logr!   r   r   r   rI   r   objectr   r-   r-   r-   r.   <module>   s\   0
  w

