o
    8wic                     @   st  d dl Z d dlmZmZ d dlmZmZ d dlmZ d dl	m
Z
mZmZmZmZ d dlZd dlmZ d dlmZ d dlmZ d dlZd d	lmZ d d
lmZ d dlmZ d dlmZ d dlm Z  d dl!m"Z"m#Z# d dl$m%Z% d dl&m'Z'm(Z( d dl)m*Z* d dl+m,Z, d dl-m.Z. d dl/m0Z0 d dl1m2Z2 d dl3m4Z4m5Z5 edZ6edZ7e 8e9Z:G dd deZ;G dd dZ<dS )    N)ABCabstractmethod)	GeneratorMapping)contextmanager)AnyCallableOptionalTypeVarUnion)Tensor)Module)	Optimizer)CheckpointIO)_StrategyRegistry)move_data_to_device)ReduceOp)
_EmptyInit)_optimizer_to_device_optimizers_to_device)_PATH)LightningOptimizer"_init_optimizers_and_lr_schedulers)TorchCheckpointIO)_WrappingCheckpointIO)	Precision)	_Launcher)	TrainerFn)STEP_OUTPUTLRSchedulerConfig
TBroadcastTReducec                   @   s  e Zd ZdZ			dded dee dee ddfdd	Zedee	 fd
dZ
eded fddZejdddZedefddZejdeddfddZedefddZejdee ddfddZedee fddZejdee ddfddZdddZdddZdd d!Zdd$d%Zdd&d'Zdd(d)Zd*edeeef fd+d,Zd-ed*ee d.ed/edef
d0d1Z	dd*ed2eg ef deede f  d/edef
d3d4Z!de dee de"e ee f fd5d6Z#de de fd7d8Z$d*edefd9d:Z%dd<ed=ee&j' d>e(defd?d@Z)ee*de&j'fdAdBZ+e*ddCdDZ,ee*de-fdEdFZ.e*		GddHeeef dIee dJeee/ef  deeef fdKdLZ0e*ddMee ddfdNdOZ1e*ddPe2dQe(de2fdRdSZ3e*ddHedIee dUe-defdVdWZ4ddYe-dZe-de-fd[d\Z5d-eddfd]d^Z6d-eddfd_d`Z7edee  fdadbZ8e8jdcee  ddfdddbZ8eded fdedfZ9dge:deeef fdhdiZ;ddje<eef dke-ddfdldmZ=dje<eef ddfdndoZ>d.ed/ede?fdpdqZ@ddrdsZAd.ed/ede?fdtduZBd.ed/ede?fdvdwZCd.ed/edefdxdyZDdzeEdeEfd{d|ZFede-fd}d~ZGede-fddZHede-fddZIdeeef fddZJ	ddjeeef de:dee ddfddZKde:ddfddZLeMddee- deNd fddZOeMdeNd fddZPdddZQeRdeSddfddZTdddZUdddZVdddZWdddZXdddZYdddZZdddZ[dddZ\d<ede(ddfddZ]de^ddfddZ_dddZ`defddZadeddfddZbdS )StrategyzcBase class for all strategies that change the behaviour of the training, validation and test- loop.Nacceleratorpl.accelerators.Acceleratorcheckpoint_ioprecision_pluginreturnc                 C   sH   || _ || _d | _|| _d | _d | _d | _t | _g | _	g | _
g | _d S N)_accelerator_checkpoint_io_precision_pluginr&   _lightning_module_model	_launcher_ForwardRedirection_forward_redirection_optimizers_lightning_optimizerslr_scheduler_configs)selfr#   r%   r&    r5   b/home/ubuntu/sommelier/.venv/lib/python3.10/site-packages/pytorch_lightning/strategies/strategy.py__init__2   s   
zStrategy.__init__c                 C      | j S r(   )r.   r4   r5   r5   r6   launcherE      zStrategy.launcherc                 C   r8   r(   r)   r9   r5   r5   r6   r#   I   r;   zStrategy.acceleratorc                 C   
   || _ d S r(   r<   )r4   r#   r5   r5   r6   r#   M      
c                 C   s4   | j d u rt | _ | j S t| j trt | j _| j S r(   )r*   r   
isinstancer   r%   r9   r5   r5   r6   r%   Q   s   

zStrategy.checkpoint_ioioc                 C   r=   r(   )r*   )r4   r@   r5   r5   r6   r%   Z   r>   c                 C   s   | j d ur| j S t S r(   )r+   r   r9   r5   r5   r6   r&   ^   s   zStrategy.precision_pluginc                 C   r=   r(   )r+   )r4   r&   r5   r5   r6   r&   b   r>   c                 C   r8   r(   )r1   r9   r5   r5   r6   
optimizersf   r;   zStrategy.optimizersrA   c                    s   | _  fdd|D  _d S )Nc                    s   g | ]}t | qS r5   )r   _to_lightning_optimizer).0optr9   r5   r6   
<listcomp>m   s    z'Strategy.optimizers.<locals>.<listcomp>)r1   r2   )r4   rA   r5   r9   r6   rA   j   s   modelpl.LightningModulec                 C   s   || _ || _dS )z=Called by the Trainer to connect the strategy with the model.N)r,   rF   r4   rF   r5   r5   r6   connecto   s   
zStrategy.connectc                 C      dS )z&Attach the launcher based on Strategy.Nr5   r9   r5   r5   r6   _configure_launcherv       zStrategy._configure_launcherc                 C   s    | j dusJ | j | j dS )zSetup any processes or distributed connections.

        This is called before the LightningModule/DataModule setup hook which allows the user to access the accelerator
        environment before setup is complete.

        N)r#   setup_deviceroot_devicer9   r5   r5   r6   setup_environmenty   s   zStrategy.setup_environmenttrainer
pl.Trainerc                 C   s$   | j dusJ t| j \| _| _dS )zCreates optimizers and schedulers.

        Args:
            trainer: the Trainer, these optimizers should be connected to

        N)lightning_moduler   rA   r3   r4   rP   r5   r5   r6   setup_optimizers   s   zStrategy.setup_optimizersc                 C   s   | j dusJ | j | | jdusJ | j| j| _|   | | j| _|jjt	j
kr3| | |   |jjt	j
krGt| j| j dS dS )zSets up the accelerator, plugins and initializes the optimizers (if needed).

        Args:
            trainer: the trainer instance

        N)r#   setuprF   r&   convert_modulemodel_to_device_setup_modelstatefnr   FITTINGrT   setup_precision_pluginr   rA   rN   rS   r5   r5   r6   rU      s   
zStrategy.setupc                 C   s@   | j dusJ | j| j | j| j\}}}|| _ || _|| _dS )z.Attaches the precision plugin to the strategy.N)rF   r&   rI   rA   r3   )r4   rF   rA   r3   r5   r5   r6   r\      s   

zStrategy.setup_precision_plugin	optimizerc                 C   s<   t |tr|j}t|dr|  | jr| S i S | S )zReturns state of an optimizer.

        Allows for syncing/collating optimizer state from processes in custom strategies.

        consolidate_state_dict)r?   r   
_optimizerhasattrr^   is_global_zero
state_dictr4   r]   r5   r5   r6   optimizer_state   s   

zStrategy.optimizer_stateclosure_lossargskwargsc                 O   sf   |  | | jdusJ | j || j}| jj|| j|g|R i | | j|| j}| | |S )a'  Forwards backward-calls to the precision plugin.

        Args:
            closure_loss: a tensor holding the loss value to backpropagate
            optimizer: An optional optimizer that gets passed down to the precision plugin's backward
            \*args: Positional arguments that get passed down to the precision plugin's backward, intended as arguments
                for the actual function that performs the backward, like :meth:`~torch.Tensor.backward`.
            \**kwargs: Keyword arguments for the same purpose as ``*args``.

        N)pre_backwardrR   r&   backwardpost_backward)r4   re   r]   rf   rg   r5   r5   r6   ri      s   
 
zStrategy.backwardclosurec                 K   s4   |p| j }t|tjsJ | jj|f||d|S )aH  Performs the actual optimizer step.

        Args:
            optimizer: the optimizer performing the step
            closure: closure calculating the loss value
            model: reference to the model, optionally defining optimizer step related hooks
            \**kwargs: Keyword arguments to ``optimizer.step``

        )rF   rk   )rR   r?   plLightningModuler&   optimizer_step)r4   r]   rk   rF   rg   r5   r5   r6   rn      s   
zStrategy.optimizer_stepc                    s$     |} fdd|D }||fS )zSetup a model and multiple optimizers together.

        The returned objects are expected to be in the same order they were passed in. The default implementation will
        call :meth:`_setup_model` and :meth:`_setup_optimizer` on the inputs.

        c                    s   g | ]}  |qS r5   )_setup_optimizer)rC   r]   r9   r5   r6   rE      s    z8Strategy._setup_model_and_optimizers.<locals>.<listcomp>)rX   )r4   rF   rA   r5   r9   r6   _setup_model_and_optimizers   s   
z$Strategy._setup_model_and_optimizersc                 C      |S )zDPerforms setup for the model, e.g., by wrapping it by another class.r5   rH   r5   r5   r6   rX         zStrategy._setup_modelc                 C   rq   )zHPerforms setup for the optimizer, e.g., by wrapping it by another class.r5   rc   r5   r5   r6   ro     rr   zStrategy._setup_optimizerr   batchdevicedataloader_idxc                 C   s2   | j }|p| j}|dur|j|||dS t||S )az  Moves the batch to the correct device.

        The returned batch is of the same type as the input batch, just
        having all tensors on the correct device.

        Args:
            batch: The batch of samples to move to the correct device
            device: The target device
            dataloader_idx: The index of the dataloader to which the batch belongs.

        N)rt   ru   )rR   rN   _apply_batch_transfer_handlerr   )r4   rs   rt   ru   rF   r5   r5   r6   batch_to_device  s
   

zStrategy.batch_to_devicec                 C   rJ   )zReturns the root device.Nr5   r9   r5   r5   r6   rN     rL   zStrategy.root_devicec                 C   rJ   )z&Moves the model to the correct device.Nr5   r9   r5   r5   r6   rW     rL   zStrategy.model_to_devicec                 C   rJ   )zcWhether the current process is the rank zero process not only on the local node, but for all nodes.Nr5   r9   r5   r5   r6   ra   "  rL   zStrategy.is_global_zeromeantensorgroup	reduce_opc                 C   rJ   )a,  Reduces the given tensor (e.g. across GPUs/processes).

        Args:
            tensor: the tensor to sync and reduce
            group: the process group to reduce
            reduce_op: the reduction operation. Defaults to 'mean'.
                Can also be a string 'sum' or ReduceOp.

        Nr5   )r4   ry   rz   r{   r5   r5   r6   reduce'  rL   zStrategy.reducenamec                 C   rJ   )zSynchronizes all processes which blocks processes until the whole group enters this function.

        Args:
            name: an optional name to pass into barrier.

        Nr5   )r4   r}   r5   r5   r6   barrier8  rL   zStrategy.barrierobjsrcc                 C   rJ   )zBroadcasts an object to all processes.

        Args:
            obj: the object to broadcast
            src: source rank

        Nr5   )r4   r   r   r5   r5   r6   	broadcastA  rL   zStrategy.broadcastF
sync_gradsc                 C   rJ   )a  Perform an all_gather on all processes.

        Args:
            tensor: the tensor to all_gather
            group: the process group to gather results from
            sync_grads: flag that allows users to synchronize gradients for all_gather op

        Nr5   )r4   ry   rz   r   r5   r5   r6   
all_gatherK  rL   zStrategy.all_gatherTdecisionallc                 C   rq   )z/Reduce a boolean decision across all processes.r5   )r4   r   r   r5   r5   r6   reduce_boolean_decisionV     z Strategy.reduce_boolean_decisionc                 C   rJ   )z.Run before precision plugin executes backward.Nr5   r4   re   r5   r5   r6   rh   Z  rL   zStrategy.pre_backwardc                 C   rJ   )z-Run after precision plugin executes backward.Nr5   r   r5   r5   r6   rj   ]  rL   zStrategy.post_backwardc                 C   s   | j dur| j S | jS )z0Returns the potentially wrapped LightningModule.N)r-   r,   r9   r5   r5   r6   rF   `  s   zStrategy.model	new_modelc                 C   r=   r(   )r-   )r4   r   r5   r5   r6   rF   e  r>   c                 C   r8   )z<Returns the pure LightningModule without potential wrappers.)r,   r9   r5   r5   r6   rR   i  s   zStrategy.lightning_modulecheckpoint_pathc                 C   s   t j  | j|S r(   )torchcudaempty_cacher%   load_checkpoint)r4   r   r5   r5   r6   r   n  s   
zStrategy.load_checkpoint
checkpointstrictc                 C   s&   | j d usJ | j j|d |d d S )Nrb   )r   )rR   load_state_dict)r4   r   r   r5   r5   r6   load_model_state_dictr  s   zStrategy.load_model_state_dictc                 C   s8   |d }t | j|D ]\}}|| t|| j q
d S )Noptimizer_states)ziprA   r   r   rN   )r4   r   r   r]   	opt_stater5   r5   r6   load_optimizer_state_dictv  s
   
z"Strategy.load_optimizer_state_dictc                 O      | j dusJ | jdusJ | j . | j| j kr1| j| j| j dg|R i |W  d   S | j j|i |W  d   S 1 sDw   Y  dS )zThe actual training step.

        See :meth:`~pytorch_lightning.core.LightningModule.training_step` for more details

        Ntraining_step)rR   rF   r&   train_step_contextr0   r   r4   rf   rg   r5   r5   r6   r   |     $zStrategy.training_stepc                 C   rJ   )zSThis hook is deprecated.

        Override :meth:`training_step` instead.

        Nr5   r9   r5   r5   r6   post_training_step  s   zStrategy.post_training_stepc                 O   r   )zThe actual validation step.

        See :meth:`~pytorch_lightning.core.LightningModule.validation_step` for more details

        Nvalidation_step)rR   rF   r&   val_step_contextr0   r   r   r5   r5   r6   r     r   zStrategy.validation_stepc                 O   r   )zwThe actual test step.

        See :meth:`~pytorch_lightning.core.LightningModule.test_step` for more details

        N	test_step)rR   rF   r&   test_step_contextr0   r   r   r5   r5   r6   r     r   zStrategy.test_stepc                 O   r   )z}The actual predict step.

        See :meth:`~pytorch_lightning.core.LightningModule.predict_step` for more details

        Npredict_step)rR   rF   r&   predict_step_contextr0   r   r   r5   r5   r6   r     r   zStrategy.predict_step
dataloaderc                 C   rq   )zWraps the dataloader if necessary.

        Args:
            dataloader: iterable. Ideally of type: :class:`torch.utils.data.DataLoader`

        r5   )r4   r   r5   r5   r6   process_dataloader     zStrategy.process_dataloaderc                 C   rJ   )a  Override to delay restoring from checkpoint till after the setup phase has completed. This is useful when
        the strategy requires all the setup hooks to run before loading checkpoint.

        Returns:
            If ``True``, restore checkpoint after strategy setup.

        Fr5   r9   r5   r5   r6   restore_checkpoint_after_setup  s   	z'Strategy.restore_checkpoint_after_setupc                 C   rJ   )zOverride to disable Lightning restoring optimizers/schedulers.

        This is useful for strategies which manage restoring optimizers/schedulers.

        Tr5   r9   r5   r5   r6   lightning_restore_optimizer  r   z$Strategy.lightning_restore_optimizerc                 C   rJ   )z>Whether the strategy handles gradient accumulation internally.Fr5   r9   r5   r5   r6   handles_gradient_accumulation  rr   z&Strategy.handles_gradient_accumulationc                 C   s   | j dusJ | j  S )zReturns model state.N)rR   rb   r9   r5   r5   r6   lightning_module_state_dict  s   
z$Strategy.lightning_module_state_dictfilepathstorage_optionsc                 C   s    | j r| jj|||d dS dS )a?  Save model/training states as a checkpoint file through state-dump and file-write.

        Args:
            checkpoint: dict containing model and trainer state
            filepath: write-target file's path
            storage_options: parameter for how to save to storage, passed to ``CheckpointIO`` plugin

        )r   N)ra   r%   save_checkpoint)r4   r   r   r   r5   r5   r6   r     s   zStrategy.save_checkpointc                 C   s   | j r| j| dS dS )zqRemove checkpoint filepath from the filesystem.

        Args:
            filepath: Path to checkpoint

        N)ra   r%   remove_checkpoint)r4   r   r5   r5   r6   r     s   zStrategy.remove_checkpoint
empty_init)NNNc              
   c   s    t t|d}|< | j  | j  dV  W d   n1 s"w   Y  W d   n1 s1w   Y  W d   dS W d   dS 1 sIw   Y  dS )a  Controls how tensors get created (device, dtype).

        Args:
            empty_init: Whether to initialize the model with empty weights (uninitialized memory).
                If ``None``, the strategy will decide. Some strategies may not support all options.

        )enabledN)r   boolrN   r&   tensor_init_context)r4   r   empty_init_contextr5   r5   r6   r     s   	P zStrategy.tensor_init_contextc                 c   s    dV  dS )a  Provide hook to create modules in a distributed aware context. This is useful for when we'd like to shard
        the model instantly, which is useful for extremely large models which can save memory and initialization time.

        Returns: Model parallel context.

        Nr5   r9   r5   r5   r6   model_sharded_context  s   
zStrategy.model_sharded_contextc                 C   sj   t | jtd | jdurt| jj d | j	  | j
  | jdus)J | j  | j  dS )zThis method is called to teardown the training process.

        It is the right place to release memory and free other resources.

        cpuNz: moving model to CPU)r   rA   r   rt   rR   logdebug	__class____name__r   r&   teardownr#   r%   r9   r5   r5   r6   r     s   



zStrategy.teardownstrategy_registryc                 C      d S r(   r5   )clsr   r5   r5   r6   register_strategies  r   zStrategy.register_strategiesc                 C   rJ   )zCalled when train begins.Nr5   r9   r5   r5   r6   on_train_start"  r   zStrategy.on_train_startc                 C   rJ   )zCalled when validation begins.Nr5   r9   r5   r5   r6   on_validation_start&  r   zStrategy.on_validation_startc                 C   rJ   )zCalled when test begins.Nr5   r9   r5   r5   r6   on_test_start*  r   zStrategy.on_test_startc                 C   rJ   )zCalled when predict begins.Nr5   r9   r5   r5   r6   on_predict_start.  r   zStrategy.on_predict_startc                 C   rJ   )zCalled when train ends.Nr5   r9   r5   r5   r6   on_train_end2  r   zStrategy.on_train_endc                 C   rJ   )zCalled when validation ends.Nr5   r9   r5   r5   r6   on_validation_end6  r   zStrategy.on_validation_endc                 C   rJ   )zCalled when test end.Nr5   r9   r5   r5   r6   on_test_end:  r   zStrategy.on_test_endc                 C   rJ   )zCalled when predict ends.Nr5   r9   r5   r5   r6   on_predict_end>  r   zStrategy.on_predict_end	batch_idxc                 C   rJ   )zCCalled in the training loop before anything happens for that batch.Nr5   )r4   rs   r   r5   r5   r6   on_train_batch_startB  r   zStrategy.on_train_batch_start	exceptionc                 C   rJ   )zACalled when the trainer execution is interrupted by an exception.Nr5   )r4   r   r5   r5   r6   on_exceptionF  r   zStrategy.on_exceptionc                 C   s   g | _ g | _g | _d S r(   )r1   r2   r3   r9   r5   r5   r6    _reset_optimizers_and_schedulersJ  s   
z)Strategy._reset_optimizers_and_schedulersc                 C   s   t t| }g |d< |S )Nr2   )dictvarsr4   rY   r5   r5   r6   __getstate__O  s   zStrategy.__getstate__rY   c                 C   s   || _ | j| _d S r(   )__dict__rA   r   r5   r5   r6   __setstate__U  s   zStrategy.__setstate__)r#   r$   r'   N)rF   rG   r'   N)r'   N)rP   rQ   r'   Nr(   )Nr   )Nrx   )r   )NF)T)cr   
__module____qualname____doc__r	   r   r   r7   propertyr   r:   r#   setterr%   r&   listr   rA   rI   rK   rO   rT   rU   r\   r   strr   rd   r   ri   r   r   r   rn   tuplerp   rX   ro   r   rt   intrw   r   rN   rW   r   ra   r   r|   r~   r    r   r   r   rh   rj   rF   rR   r   r   r   r   r   r   r   r   r   r   r   objectr   r   r   r   r   r   r   r   r   r   r   r   classmethodr   r   r   r   r   r   r   r   r   r   r   BaseExceptionr   r   r   r   r5   r5   r5   r6   r"   /   s(   










 

&"

	"
 
	




	








r"   c                   @   s^   e Zd ZdZdedddedededefd	d
ZdeddddfddZ	deddddfddZ
dS )r/   zImplements the `forward-redirection`.

    A method call to a wrapped module gets rerouted through the wrapper's `forward` method instead.

    wrapper_moduleoriginal_modulerG   method_namerf   rg   r'   c                    sX    dksJ j dtdtdtf fdd}|_ |i |} |S )a  Reroutes a method call through the `wrapper_module`'s `forward` method.

        Args:
            wrapper_module: The module that has `original_module` wrapped.
            original_module: The module that was wrapped inside `wrapper_module`.
            method_name: The name of the method that should be called on the `original_module` after inputs get
                redirected through the `wrapper_module`'s `forward` method.
            *args: The positional arguments to the method `method_name`. They will get passed to a patched
                `forward` method instead.
            **kwargs: The keyword arguments to the method `method_name`. They will get passed to a patched
                `forward` method instead.

        forward_args_kwargsr'   c                     s.   _ t }|| i |} |S r(   )r   getattron_after_inner_forward)r   r   methodoutr   original_forwardr   r4   r   r5   r6   wrapped_forwardt  s
   
z5_ForwardRedirection.__call__.<locals>.wrapped_forward)r   r   on_after_outer_forward)r4   r   r   r   rf   rg   r   wrapper_outputr5   r   r6   __call__a  s   "z_ForwardRedirection.__call__Nc                 C   r   r(   r5   r4   r   r   r5   r5   r6   r        z*_ForwardRedirection.on_after_inner_forwardc                 C   r   r(   r5   r   r5   r5   r6   r     r   z*_ForwardRedirection.on_after_outer_forward)r   r   r   r   r   r   r   r   r   r   r   r5   r5   r5   r6   r/   Z  s"    
$r/   )=loggingabcr   r   collections.abcr   r   
contextlibr   typingr   r   r	   r
   r   r   r   torch.nnr   torch.optimr   pytorch_lightningrl   lightning_fabric.pluginsr   lightning_fabric.strategiesr   lightning_fabric.utilitiesr   &lightning_fabric.utilities.distributedr   lightning_fabric.utilities.initr   $lightning_fabric.utilities.optimizerr   r    lightning_fabric.utilities.typesr    pytorch_lightning.core.optimizerr   r   pytorch_lightning.pluginsr   $pytorch_lightning.plugins.io.wrapperr   #pytorch_lightning.plugins.precisionr   /pytorch_lightning.strategies.launchers.launcherr    pytorch_lightning.trainer.statesr   !pytorch_lightning.utilities.typesr   r   r    r!   	getLoggerr   r   r"   r/   r5   r5   r5   r6   <module>   sB   
    /