o
    zic                     @   sx  d dl Z d dlmZmZ d dlmZ d dlmZmZm	Z	m
Z
mZmZmZmZmZmZ d dlZd dlmZ d dlmZ d dlmZ d dlZd dlmZ d d	lmZ d d
lmZ d dlm Z  d dl!m"Z" d dl#m$Z$m%Z% d dl&m'Z' d dl(m)Z)m*Z* d dl+m,Z, d dl-m.Z. d dl/m0Z0 d dl1m2Z2 d dl3m4Z4 d dl5m6Z6m7Z7 edZ8edZ9e :e;Z<G dd deZ=G dd dZ>dS )    N)ABCabstractmethod)contextmanager)
AnyCallableDict	GeneratorListMappingOptionalTupleTypeVarUnion)Tensor)Module)	Optimizer)CheckpointIO)_StrategyRegistry)move_data_to_device)ReduceOp)
_EmptyInit)_optimizer_to_device_optimizers_to_device)_PATH)LightningOptimizer"_init_optimizers_and_lr_schedulers)TorchCheckpointIO)_WrappingCheckpointIO)	Precision)	_Launcher)	TrainerFn)STEP_OUTPUTLRSchedulerConfig
TBroadcastTReducec                   @   s  e Zd ZdZ			dded dee dee ddfdd	Zedee	 fd
dZ
eded fddZejdddZedefddZejdeddfddZedefddZejdee ddfddZedee fddZejdee ddfddZdddZdddZdd d!Zdd$d%Zdd&d'Zdd(d)Zd*edeeef fd+d,Zd-ed*ee d.ed/edef
d0d1Z	dd*ed2eg ef deede f  d/edef
d3d4Z!de dee de"e ee f fd5d6Z#de de fd7d8Z$d*edefd9d:Z%dd<ed=ee&j' d>e(defd?d@Z)ee*de&j'fdAdBZ+e*ddCdDZ,ee*de-fdEdFZ.e*		GddHeeef dIee dJeee/ef  deeef fdKdLZ0e*ddMee ddfdNdOZ1e*ddPe2dQe(de2fdRdSZ3e*ddHedIee dUe-defdVdWZ4ddYe-dZe-de-fd[d\Z5d-eddfd]d^Z6d-eddfd_d`Z7edee  fdadbZ8e8jdcee  ddfdddbZ8eded fdedfZ9dge:deeef fdhdiZ;ddje<eef dke-ddfdldmZ=dje<eef ddfdndoZ>d.ed/ede?fdpdqZ@ddrdsZAd.ed/ede?fdtduZBd.ed/ede?fdvdwZCd.ed/edefdxdyZDdzeEdeEfd{d|ZFede-fd}d~ZGede-fddZHede-fddZIdeeef fddZJ	ddjeeef de:dee ddfddZKde:ddfddZLeMddee- deNd fddZOeMdeNd fddZPdddZQeRdeSddfddZTdddZUdddZVdddZWdddZXdddZYdddZZdddZ[dddZ\d<ede(ddfddZ]de^ddfddZ_dddZ`defddZadeddfddZbdS )StrategyzcBase class for all strategies that change the behaviour of the training, validation and test- loop.Nacceleratorpl.accelerators.Acceleratorcheckpoint_ioprecision_pluginreturnc                 C   sH   || _ || _d | _|| _d | _d | _d | _t | _g | _	g | _
g | _d S N)_accelerator_checkpoint_io_precision_pluginr)   _lightning_module_model	_launcher_ForwardRedirection_forward_redirection_optimizers_lightning_optimizerslr_scheduler_configs)selfr&   r(   r)    r8   Y/home/ubuntu/.local/lib/python3.10/site-packages/pytorch_lightning/strategies/strategy.py__init__1   s   
zStrategy.__init__c                 C      | j S r+   )r1   r7   r8   r8   r9   launcherD      zStrategy.launcherc                 C   r;   r+   r,   r<   r8   r8   r9   r&   H   r>   zStrategy.acceleratorc                 C   
   || _ d S r+   r?   )r7   r&   r8   r8   r9   r&   L      
c                 C   s4   | j d u rt | _ | j S t| j trt | j _| j S r+   )r-   r   
isinstancer   r(   r<   r8   r8   r9   r(   P   s   

zStrategy.checkpoint_ioioc                 C   r@   r+   )r-   )r7   rC   r8   r8   r9   r(   Y   rA   c                 C   s   | j d ur| j S t S r+   )r.   r   r<   r8   r8   r9   r)   ]   s   zStrategy.precision_pluginc                 C   r@   r+   )r.   )r7   r)   r8   r8   r9   r)   a   rA   c                 C   r;   r+   )r4   r<   r8   r8   r9   
optimizerse   r>   zStrategy.optimizersrD   c                    s   | _  fdd|D  _d S )Nc                    s   g | ]}t | qS r8   )r   _to_lightning_optimizer).0optr<   r8   r9   
<listcomp>l   s    z'Strategy.optimizers.<locals>.<listcomp>)r4   r5   )r7   rD   r8   r<   r9   rD   i   s   modelpl.LightningModulec                 C   s   || _ || _dS )z=Called by the Trainer to connect the strategy with the model.N)r/   rI   r7   rI   r8   r8   r9   connectn   s   
zStrategy.connectc                 C      dS )z&Attach the launcher based on Strategy.Nr8   r<   r8   r8   r9   _configure_launcheru       zStrategy._configure_launcherc                 C   s    | j dusJ | j | j dS )zSetup any processes or distributed connections.

        This is called before the LightningModule/DataModule setup hook which allows the user to access the accelerator
        environment before setup is complete.

        N)r&   setup_deviceroot_devicer<   r8   r8   r9   setup_environmentx   s   zStrategy.setup_environmenttrainer
pl.Trainerc                 C   s$   | j dusJ t| j \| _| _dS )zCreates optimizers and schedulers.

        Args:
            trainer: the Trainer, these optimizers should be connected to

        N)lightning_moduler   rD   r6   r7   rS   r8   r8   r9   setup_optimizers   s   zStrategy.setup_optimizersc                 C   s   | j dusJ | j | | jdusJ | j| j| _|   | | j| _|jjt	j
kr3| | |   |jjt	j
krGt| j| j dS dS )zSets up the accelerator, plugins and initializes the optimizers (if needed).

        Args:
            trainer: the trainer instance

        N)r&   setuprI   r)   convert_modulemodel_to_device_setup_modelstatefnr    FITTINGrW   setup_precision_pluginr   rD   rQ   rV   r8   r8   r9   rX      s   
zStrategy.setupc                 C   s@   | j dusJ | j| j | j| j\}}}|| _ || _|| _dS )z.Attaches the precision plugin to the strategy.N)rI   r)   rL   rD   r6   )r7   rI   rD   r6   r8   r8   r9   r_      s   

zStrategy.setup_precision_plugin	optimizerc                 C   s<   t |tr|j}t|dr|  | jr| S i S | S )zReturns state of an optimizer.

        Allows for syncing/collating optimizer state from processes in custom strategies.

        consolidate_state_dict)rB   r   
_optimizerhasattrra   is_global_zero
state_dictr7   r`   r8   r8   r9   optimizer_state   s   

zStrategy.optimizer_stateclosure_lossargskwargsc                 O   sf   |  | | jdusJ | j || j}| jj|| j|g|R i | | j|| j}| | |S )a'  Forwards backward-calls to the precision plugin.

        Args:
            closure_loss: a tensor holding the loss value to backpropagate
            optimizer: An optional optimizer that gets passed down to the precision plugin's backward
            \*args: Positional arguments that get passed down to the precision plugin's backward, intended as arguments
                for the actual function that performs the backward, like :meth:`~torch.Tensor.backward`.
            \**kwargs: Keyword arguments for the same purpose as ``*args``.

        N)pre_backwardrU   r)   backwardpost_backward)r7   rh   r`   ri   rj   r8   r8   r9   rl      s   
 
zStrategy.backwardclosurec                 K   s4   |p| j }t|tjsJ | jj|f||d|S )aH  Performs the actual optimizer step.

        Args:
            optimizer: the optimizer performing the step
            closure: closure calculating the loss value
            model: reference to the model, optionally defining optimizer step related hooks
            \**kwargs: Keyword arguments to ``optimizer.step``

        )rI   rn   )rU   rB   plLightningModuler)   optimizer_step)r7   r`   rn   rI   rj   r8   r8   r9   rq      s   
zStrategy.optimizer_stepc                    s$     |} fdd|D }||fS )zSetup a model and multiple optimizers together.

        The returned objects are expected to be in the same order they were passed in. The default implementation will
        call :meth:`_setup_model` and :meth:`_setup_optimizer` on the inputs.

        c                    s   g | ]}  |qS r8   )_setup_optimizer)rF   r`   r<   r8   r9   rH      s    z8Strategy._setup_model_and_optimizers.<locals>.<listcomp>)r[   )r7   rI   rD   r8   r<   r9   _setup_model_and_optimizers   s   
z$Strategy._setup_model_and_optimizersc                 C      |S )zDPerforms setup for the model, e.g., by wrapping it by another class.r8   rK   r8   r8   r9   r[         zStrategy._setup_modelc                 C   rt   )zHPerforms setup for the optimizer, e.g., by wrapping it by another class.r8   rf   r8   r8   r9   rr     ru   zStrategy._setup_optimizerr   batchdevicedataloader_idxc                 C   s2   | j }|p| j}|dur|j|||dS t||S )az  Moves the batch to the correct device.

        The returned batch is of the same type as the input batch, just
        having all tensors on the correct device.

        Args:
            batch: The batch of samples to move to the correct device
            device: The target device
            dataloader_idx: The index of the dataloader to which the batch belongs.

        N)rw   rx   )rU   rQ   _apply_batch_transfer_handlerr   )r7   rv   rw   rx   rI   r8   r8   r9   batch_to_device  s
   

zStrategy.batch_to_devicec                 C   rM   )zReturns the root device.Nr8   r<   r8   r8   r9   rQ     rO   zStrategy.root_devicec                 C   rM   )z&Moves the model to the correct device.Nr8   r<   r8   r8   r9   rZ     rO   zStrategy.model_to_devicec                 C   rM   )zcWhether the current process is the rank zero process not only on the local node, but for all nodes.Nr8   r<   r8   r8   r9   rd   !  rO   zStrategy.is_global_zeromeantensorgroup	reduce_opc                 C   rM   )a,  Reduces the given tensor (e.g. across GPUs/processes).

        Args:
            tensor: the tensor to sync and reduce
            group: the process group to reduce
            reduce_op: the reduction operation. Defaults to 'mean'.
                Can also be a string 'sum' or ReduceOp.

        Nr8   )r7   r|   r}   r~   r8   r8   r9   reduce&  rO   zStrategy.reducenamec                 C   rM   )zSynchronizes all processes which blocks processes until the whole group enters this function.

        Args:
            name: an optional name to pass into barrier.

        Nr8   )r7   r   r8   r8   r9   barrier7  rO   zStrategy.barrierobjsrcc                 C   rM   )zBroadcasts an object to all processes.

        Args:
            obj: the object to broadcast
            src: source rank

        Nr8   )r7   r   r   r8   r8   r9   	broadcast@  rO   zStrategy.broadcastF
sync_gradsc                 C   rM   )a  Perform an all_gather on all processes.

        Args:
            tensor: the tensor to all_gather
            group: the process group to gather results from
            sync_grads: flag that allows users to synchronize gradients for all_gather op

        Nr8   )r7   r|   r}   r   r8   r8   r9   
all_gatherJ  rO   zStrategy.all_gatherTdecisionallc                 C   rt   )z/Reduce a boolean decision across all processes.r8   )r7   r   r   r8   r8   r9   reduce_boolean_decisionU     z Strategy.reduce_boolean_decisionc                 C   rM   )z.Run before precision plugin executes backward.Nr8   r7   rh   r8   r8   r9   rk   Y  rO   zStrategy.pre_backwardc                 C   rM   )z-Run after precision plugin executes backward.Nr8   r   r8   r8   r9   rm   \  rO   zStrategy.post_backwardc                 C   s   | j dur| j S | jS )z0Returns the potentially wrapped LightningModule.N)r0   r/   r<   r8   r8   r9   rI   _  s   zStrategy.model	new_modelc                 C   r@   r+   )r0   )r7   r   r8   r8   r9   rI   d  rA   c                 C   r;   )z<Returns the pure LightningModule without potential wrappers.)r/   r<   r8   r8   r9   rU   h  s   zStrategy.lightning_modulecheckpoint_pathc                 C   s   t j  | j|S r+   )torchcudaempty_cacher(   load_checkpoint)r7   r   r8   r8   r9   r   m  s   
zStrategy.load_checkpoint
checkpointstrictc                 C   s&   | j d usJ | j j|d |d d S )Nre   )r   )rU   load_state_dict)r7   r   r   r8   r8   r9   load_model_state_dictq  s   zStrategy.load_model_state_dictc                 C   s8   |d }t | j|D ]\}}|| t|| j q
d S )Noptimizer_states)ziprD   r   r   rQ   )r7   r   r   r`   	opt_stater8   r8   r9   load_optimizer_state_dictu  s
   
z"Strategy.load_optimizer_state_dictc                 O      | j dusJ | jdusJ | j . | j| j kr1| j| j| j dg|R i |W  d   S | j j|i |W  d   S 1 sDw   Y  dS )zThe actual training step.

        See :meth:`~pytorch_lightning.core.LightningModule.training_step` for more details

        Ntraining_step)rU   rI   r)   train_step_contextr3   r   r7   ri   rj   r8   r8   r9   r   {     $zStrategy.training_stepc                 C   rM   )zSThis hook is deprecated.

        Override :meth:`training_step` instead.

        Nr8   r<   r8   r8   r9   post_training_step  s   zStrategy.post_training_stepc                 O   r   )zThe actual validation step.

        See :meth:`~pytorch_lightning.core.LightningModule.validation_step` for more details

        Nvalidation_step)rU   rI   r)   val_step_contextr3   r   r   r8   r8   r9   r     r   zStrategy.validation_stepc                 O   r   )zwThe actual test step.

        See :meth:`~pytorch_lightning.core.LightningModule.test_step` for more details

        N	test_step)rU   rI   r)   test_step_contextr3   r   r   r8   r8   r9   r     r   zStrategy.test_stepc                 O   r   )z}The actual predict step.

        See :meth:`~pytorch_lightning.core.LightningModule.predict_step` for more details

        Npredict_step)rU   rI   r)   predict_step_contextr3   r   r   r8   r8   r9   r     r   zStrategy.predict_step
dataloaderc                 C   rt   )zWraps the dataloader if necessary.

        Args:
            dataloader: iterable. Ideally of type: :class:`torch.utils.data.DataLoader`

        r8   )r7   r   r8   r8   r9   process_dataloader     zStrategy.process_dataloaderc                 C   rM   )a  Override to delay restoring from checkpoint till after the setup phase has completed. This is useful when
        the strategy requires all the setup hooks to run before loading checkpoint.

        Returns:
            If ``True``, restore checkpoint after strategy setup.

        Fr8   r<   r8   r8   r9   restore_checkpoint_after_setup  s   	z'Strategy.restore_checkpoint_after_setupc                 C   rM   )zOverride to disable Lightning restoring optimizers/schedulers.

        This is useful for strategies which manage restoring optimizers/schedulers.

        Tr8   r<   r8   r8   r9   lightning_restore_optimizer  r   z$Strategy.lightning_restore_optimizerc                 C   rM   )z>Whether the strategy handles gradient accumulation internally.Fr8   r<   r8   r8   r9   handles_gradient_accumulation  ru   z&Strategy.handles_gradient_accumulationc                 C   s   | j dusJ | j  S )zReturns model state.N)rU   re   r<   r8   r8   r9   lightning_module_state_dict  s   
z$Strategy.lightning_module_state_dictfilepathstorage_optionsc                 C   s    | j r| jj|||d dS dS )a?  Save model/training states as a checkpoint file through state-dump and file-write.

        Args:
            checkpoint: dict containing model and trainer state
            filepath: write-target file's path
            storage_options: parameter for how to save to storage, passed to ``CheckpointIO`` plugin

        )r   N)rd   r(   save_checkpoint)r7   r   r   r   r8   r8   r9   r     s   zStrategy.save_checkpointc                 C   s   | j r| j| dS dS )zqRemove checkpoint filepath from the filesystem.

        Args:
            filepath: Path to checkpoint

        N)rd   r(   remove_checkpoint)r7   r   r8   r8   r9   r     s   zStrategy.remove_checkpoint
empty_init)NNNc              
   c   s    t t|d}|< | j  | j  dV  W d   n1 s"w   Y  W d   n1 s1w   Y  W d   dS W d   dS 1 sIw   Y  dS )a  Controls how tensors get created (device, dtype).

        Args:
            empty_init: Whether to initialize the model with empty weights (uninitialized memory).
                If ``None``, the strategy will decide. Some strategies may not support all options.

        )enabledN)r   boolrQ   r)   tensor_init_context)r7   r   empty_init_contextr8   r8   r9   r     s   	P zStrategy.tensor_init_contextc                 c   s    dV  dS )a  Provide hook to create modules in a distributed aware context. This is useful for when we'd like to shard
        the model instantly, which is useful for extremely large models which can save memory and initialization time.

        Returns: Model parallel context.

        Nr8   r<   r8   r8   r9   model_sharded_context  s   
zStrategy.model_sharded_contextc                 C   sj   t | jtd | jdurt| jj d | j	  | j
  | jdus)J | j  | j  dS )zThis method is called to teardown the training process.

        It is the right place to release memory and free other resources.

        cpuNz: moving model to CPU)r   rD   r   rw   rU   logdebug	__class____name__r   r)   teardownr&   r(   r<   r8   r8   r9   r     s   



zStrategy.teardownstrategy_registryc                 C      d S r+   r8   )clsr   r8   r8   r9   register_strategies  r   zStrategy.register_strategiesc                 C   rM   )zCalled when train begins.Nr8   r<   r8   r8   r9   on_train_start!  r   zStrategy.on_train_startc                 C   rM   )zCalled when validation begins.Nr8   r<   r8   r8   r9   on_validation_start%  r   zStrategy.on_validation_startc                 C   rM   )zCalled when test begins.Nr8   r<   r8   r8   r9   on_test_start)  r   zStrategy.on_test_startc                 C   rM   )zCalled when predict begins.Nr8   r<   r8   r8   r9   on_predict_start-  r   zStrategy.on_predict_startc                 C   rM   )zCalled when train ends.Nr8   r<   r8   r8   r9   on_train_end1  r   zStrategy.on_train_endc                 C   rM   )zCalled when validation ends.Nr8   r<   r8   r8   r9   on_validation_end5  r   zStrategy.on_validation_endc                 C   rM   )zCalled when test end.Nr8   r<   r8   r8   r9   on_test_end9  r   zStrategy.on_test_endc                 C   rM   )zCalled when predict ends.Nr8   r<   r8   r8   r9   on_predict_end=  r   zStrategy.on_predict_end	batch_idxc                 C   rM   )zCCalled in the training loop before anything happens for that batch.Nr8   )r7   rv   r   r8   r8   r9   on_train_batch_startA  r   zStrategy.on_train_batch_start	exceptionc                 C   rM   )zACalled when the trainer execution is interrupted by an exception.Nr8   )r7   r   r8   r8   r9   on_exceptionE  r   zStrategy.on_exceptionc                 C   s   g | _ g | _g | _d S r+   )r4   r5   r6   r<   r8   r8   r9    _reset_optimizers_and_schedulersI  s   
z)Strategy._reset_optimizers_and_schedulersc                 C   s   t t| }g |d< |S )Nr5   )dictvarsr7   r\   r8   r8   r9   __getstate__N  s   zStrategy.__getstate__r\   c                 C   s   || _ | j| _d S r+   )__dict__rD   r   r8   r8   r9   __setstate__T  s   zStrategy.__setstate__)r&   r'   r*   N)rI   rJ   r*   N)r*   N)rS   rT   r*   Nr+   )Nr   )Nr{   )r   )NF)T)cr   
__module____qualname____doc__r   r   r   r:   propertyr   r=   r&   setterr(   r)   r	   r   rD   rL   rN   rR   rW   rX   r_   r   strr   rg   r   rl   r   r   r   rq   r   rs   r[   rr   r   rw   intrz   r   rQ   rZ   r   rd   r   r   r   r#   r   r   r   rk   rm   rI   rU   r   r   r
   r   r   r!   r   r   r   r   r   objectr   r   r   r   r   r   r   r   r   r   r   r   classmethodr   r   r   r   r   r   r   r   r   r   r   BaseExceptionr   r   r   r   r8   r8   r8   r9   r%   .   s(   










 

&"

	"
 
	




	








r%   c                   @   s^   e Zd ZdZdedddedededefd	d
ZdeddddfddZ	deddddfddZ
dS )r2   zImplements the `forward-redirection`.

    A method call to a wrapped module gets rerouted through the wrapper's `forward` method instead.

    wrapper_moduleoriginal_modulerJ   method_nameri   rj   r*   c                    sX    dksJ j dtdtdtf fdd}|_ |i |} |S )a  Reroutes a method call through the `wrapper_module`'s `forward` method.

        Args:
            wrapper_module: The module that has `original_module` wrapped.
            original_module: The module that was wrapped inside `wrapper_module`.
            method_name: The name of the method that should be called on the `original_module` after inputs get
                redirected through the `wrapper_module`'s `forward` method.
            *args: The positional arguments to the method `method_name`. They will get passed to a patched
                `forward` method instead.
            **kwargs: The keyword arguments to the method `method_name`. They will get passed to a patched
                `forward` method instead.

        forward_args_kwargsr*   c                     s.   _ t }|| i |} |S r+   )r   getattron_after_inner_forward)r   r   methodoutr   original_forwardr   r7   r   r8   r9   wrapped_forwards  s
   
z5_ForwardRedirection.__call__.<locals>.wrapped_forward)r   r   on_after_outer_forward)r7   r   r   r   ri   rj   r   wrapper_outputr8   r   r9   __call__`  s   "z_ForwardRedirection.__call__Nc                 C   r   r+   r8   r7   r   r   r8   r8   r9   r        z*_ForwardRedirection.on_after_inner_forwardc                 C   r   r+   r8   r   r8   r8   r9   r     r   z*_ForwardRedirection.on_after_outer_forward)r   r   r   r   r   r   r   r!   r   r   r   r8   r8   r8   r9   r2   Y  s"    
$r2   )?loggingabcr   r   
contextlibr   typingr   r   r   r   r	   r
   r   r   r   r   r   r   torch.nnr   torch.optimr   pytorch_lightningro   lightning_fabric.pluginsr   lightning_fabric.strategiesr   lightning_fabric.utilitiesr   &lightning_fabric.utilities.distributedr   lightning_fabric.utilities.initr   $lightning_fabric.utilities.optimizerr   r    lightning_fabric.utilities.typesr    pytorch_lightning.core.optimizerr   r   pytorch_lightning.pluginsr   $pytorch_lightning.plugins.io.wrapperr   #pytorch_lightning.plugins.precisionr   /pytorch_lightning.strategies.launchers.launcherr    pytorch_lightning.trainer.statesr    !pytorch_lightning.utilities.typesr!   r"   r#   r$   	getLoggerr   r   r%   r2   r8   r8   r8   r9   <module>   s@   0
    /