o
    ziw                     @   s  d dl Z d dlZd dlmZmZ d dlmZ d dlmZ d dl	m
Z
mZmZmZmZmZmZmZmZmZmZmZmZ d dlZd dlmZ d dlmZ d dlmZ d d	lmZ d d
l m!Z! d dl"Z#d dl$m%Z%m&Z& d dl'm(Z( d dl)m*Z* d dl+m,Z,m-Z-m.Z.m/Z/m0Z0m1Z1m2Z2m3Z3m4Z4m5Z5m6Z6m7Z7m8Z8m9Z9 d dl:m;Z; d dl<m=Z=m>Z>m?Z?m@Z@ d dl<mAZB d dlCmDZD d dlEmFZF d dlGmHZHmIZI d dlJmKZK d dlLmMZM d dlNmOZOmPZP d dlQmRZR d dlSmTZT d dlUmVZV d dlWmXZX d dlYmZZZ d dl[m\Z\ d dl]m^Z^ d dl_m`Z` d d lambZbmZmcZc e
rAd d!ldmeZe d d"lfmgZgmhZhmiZi d d#ljmkZk eeee  eeelemgelf ekf Zneeied$ f Zoe peqZrG d%d& d&eZZsdS )'    N)contextmanagernullcontext)	timedelta)Path)TYPE_CHECKINGAnyCallableDict	GeneratorListLiteralMappingOptionalSetTupleTypeUnion)rank_zero_only)Tensor)Module)	Optimizer)override)CheckpointIOClusterEnvironment)default_pg_timeout)_StrategyRegistry)_METADATA_FILENAME _activation_checkpointing_kwargs_auto_wrap_policy_kwargs_distributed_checkpoint_load_distributed_checkpoint_save_get_full_state_dict_context_get_sharded_state_dict_context_init_cpu_offload_init_sharding_strategy_is_full_checkpoint_is_sharded_checkpoint_move_torchmetrics_to_device_optimizer_has_flat_params_setup_activation_checkpointing)_load_raw_module_state)_distributed_is_initialized-_get_default_process_group_backend_for_device_init_dist_connection_sync_ddp_if_availablegroup)_TORCH_GREATER_EQUAL_2_2)&_has_meta_device_parameters_or_buffers)
_lazy_load_materialize_tensors)_optimizers_to_device)
reset_seed)_PATHReduceOp)LightningOptimizer)	Precision)FSDPPrecision)_SubprocessScriptLauncher)ParallelStrategy)
TBroadcast)	TrainerFn)is_overridden)rank_zero_infor   rank_zero_warn)
DeviceMesh)
CPUOffloadMixedPrecisionShardingStrategy)ModuleWrapPolicy)
FULL_SHARDSHARD_GRAD_OPNO_SHARDHYBRID_SHARDc                #       s  e Zd ZU dZdZg Zee ed< dddddde	ddddddddfde
d d	e
eej  d
e
e de
e de
e de
e de
e deeddf de
d de
d de
eee eee  f  de
d ddded de
eee df  deddf" fdd Zeedejfd!d"Zedefd#d$Zede
e fd%d&Zede
d fd'd(Zeede fd)d*Z!e!j"ede
e  ddfd+d*Z!eede#fd,d-Z$eedefd.d/Z%eedefd0d1Z&edu fd2d3Z'defd4d5Z(dud6d7Z)edud8d9Z*ed:edefd;d<Z+edvd?d@Z,edv fdAdBZ-edudCdDZ.e/edwdEe
e de0dF fdGdHZ1e/ede0dF fdIdJZ2edwdKe
e ddfdLdMZ3edxdOe4dPede4fdQdRZ5e		SdydTee6ef dUe
e dVe
ee7ef  de6fdWdXZ8dee fdYdZZ9edud[d\Z:e;dee fd]d^Z<e;ed_e=ddfd`daZ>ede#eef fdbdcZ?edzdee@eef dfeddfdgdhZAedieBde#ee6f fdjdkZCedee@eef ddfdldmZDe	dwdee#eef dneEdoe
e ddf fdpdqZFedreEde#eef fdsdtZG  ZHS ){FSDPStrategyae  Strategy for Fully Sharded Data Parallel provided by torch.distributed.

    Fully Sharded Training shards the entire model across all available GPUs, allowing you to scale model
    size, whilst using efficient communication to reduce overhead. In practice, this means we can remain
    at parity with PyTorch DDP, whilst scaling our model sizes dramatically. The technique is similar
    to ZeRO-Stage 3.

    For more information check out
    `this blogpost <https://pytorch.org/blog/introducing-pytorch-fully-sharded-data-parallel-api>`__.

    Defaults have been set and options have been exposed, but may require configuration
    based on your level of memory/speed efficiency. We suggest having a look at
    `this tutorial <https://pytorch.org/tutorials/intermediate/FSDP_tutorial.html>`__ for more information.

    Arguments:
        cpu_offload: See ``cpu_offload`` parameter in :class:`torch.distributed.fsdp.FullyShardedDataParallel`.
        mixed_precision: See ``mixed_precision`` parameter in :class:`torch.distributed.fsdp.FullyShardedDataParallel`.
        auto_wrap_policy: Same as ``auto_wrap_policy`` parameter in
            :class:`torch.distributed.fsdp.FullyShardedDataParallel`. For convenience, this also accepts a set of the
            layer classes to wrap.
        activation_checkpointing: Deprecated. Use ``activation_checkpointing_policy``.
        activation_checkpointing_policy: Same as ``auto_wrap_policy`` parameter in
            :class:`torch.distributed.fsdp.FullyShardedDataParallel` but used when selecting the modules for which you
            want to enable activation checkpointing. Enabling this can free up a significant amount of memory at the
            cost of speed since activations in these layers need to be recomputed during backpropagation. For
            convenience, this also accepts a set of the layer classes to wrap.
        sharding_strategy: Select whether to shard model parameters, gradients, optimizer states, or a combination of
            them. Available values are:

            - ``"FULL_SHARD"``: Shards model parameters, gradients, and optimizer states (default).
            - ``"SHARD_GRAD_OP"``: Shards gradients and optimizer states only. Model parameters get replicated.
            - ``"NO_SHARD"``: No sharding (identical to regular DDP).
            - ``"HYBRID_SHARD"``: Shards model parameters, gradients, and optimizer states within a single machine, but
              replicates across machines. See also the `device_mesh` parameter below.

            Also accepts a :class:`torch.distributed.fsdp.ShardingStrategy` enum value.

        device_mesh: A tuple `(replication size, sharding size)` that defines over how many devices to shard and
            replicate the model. The product of the two numbers must equal the world size. Only valid in combination
            with the `HYBRID_SHARD` sharding strategy.

        state_dict_type: The format in which the state of the model and optimizers gets saved into the checkpoint.

            - ``"full"``: The full weights and optimizer states get assembled on rank 0 and saved to a single file.
            - ``"sharded"``: Each rank saves its shard of weights and optimizer states to a file. The checkpoint is
              a folder with as many files as the world size.

        \**kwargs: See available parameters in :class:`torch.distributed.fsdp.FullyShardedDataParallel`.

    fsdp_registered_strategiesNrH   fullacceleratorzpl.accelerators.Acceleratorparallel_devicescluster_environmentcheckpoint_ioprecision_pluginprocess_group_backendtimeoutcpu_offloadrD   mixed_precisionrE   auto_wrap_policy_POLICYactivation_checkpointingactivation_checkpointing_policysharding_strategy_SHARDING_STRATEGYstate_dict_type)rO   shardeddevice_meshrC   kwargsreturnc                    s   t  j|||||d d| _|| _|| _t|| _|	| _t|
|| _	|d ur1t
s,td|| j	d< t|| j	| _| j	dd t||| _|| _d S )N)rP   rQ   rR   rS   rT      z=The `device_mesh` argument is only supported in torch >= 2.2.ra   use_orig_paramsT)super__init__	num_nodes_process_group_backend_timeoutr#   rW   rX   r   rb   r1   
ValueErrorr$   r]   
setdefaultr   _state_dict_type)selfrP   rQ   rR   rS   rT   rU   rV   rW   rX   rY   r[   r\   r]   r_   ra   rb   	__class__ U/home/ubuntu/.local/lib/python3.10/site-packages/pytorch_lightning/strategies/fsdp.pyrg      s.   


zFSDPStrategy.__init__c                 C   s   | j d usJ | j | j S N)rQ   
local_rankrn   rq   rq   rr   root_device   s   zFSDPStrategy.root_devicec                 C   s   | j d ur
t| j S dS )Nr   )rQ   lenru   rq   rq   rr   num_processes   s   zFSDPStrategy.num_processesc                 C      | j S rs   )ri   ru   rq   rq   rr   rU         z"FSDPStrategy.process_group_backendc                 C   s&   | j r| j S | j}t|tr|jS d S rs   )rX   rT   
isinstancer;   mixed_precision_configrn   pluginrq   rq   rr   r|      s   
z#FSDPStrategy.mixed_precision_configc                 C   s(   | j }|d urt|tsJ |S tdS )Nz32-true)_precision_pluginr{   r;   r}   rq   rq   rr   rT      s
   zFSDPStrategy.precision_pluginc                 C   s*   |d urt |tstd| || _d S )NzGThe FSDP strategy can only work with the `FSDPPrecision` plugin, found )r{   r;   	TypeErrorr   )rn   rT   rq   rq   rr   rT      s
   
c                 C   s   | j | j | jdS )N)num_replicasrank)rh   rx   global_rankru   rq   rq   rr   distributed_sampler_kwargs   s   z'FSDPStrategy.distributed_sampler_kwargsc                 C      dS )NTrq   ru   rq   rq   rr   restore_checkpoint_after_setup      z+FSDPStrategy.restore_checkpoint_after_setupc                 C   r   )NFrq   ru   rq   rq   rr   lightning_restore_optimizer   r   z(FSDPStrategy.lightning_restore_optimizerc                    s   t    t| jj d t  |   |  | _	| j
d us"J t| j
| j	| jd t| jdtrHddlm} |d| jd | jd< d S d S )Nz: setting up distributed...)rV   ra   r   )init_device_meshcuda)rf   setup_environmentlogdebugrp   __name__r6   set_world_ranks_get_process_group_backendri   rR   r-   rj   r{   rb   gettupletorch.distributed.device_meshr   )rn   r   ro   rq   rr   r     s   

zFSDPStrategy.setup_environmentc                 C   s   | j pt| jS rs   )ri   r,   rv   ru   rq   rq   rr   r     s   z'FSDPStrategy._get_process_group_backendc                 C   sJ   | j d ur| j | j| j | j  | j | j| j  | j t_	t
_	d S rs   )rR   set_global_rank	node_rankrx   rt   set_world_sizerh   r   r   r   utils_rank_zero_onlyru   rq   rq   rr   r     s   
zFSDPStrategy.set_world_ranksc                 C   s2   | j d usJ | j jst| j | j| j| _d S d S rs   )rR   creates_processes_externallyr<   rx   rh   	_launcherru   rq   rq   rr   _configure_launcher   s   z FSDPStrategy._configure_launchermodelc                    s   ddl m  t fdd| D r)t|rtd d| jv r(td | jd= n!td| j	j
 d	| j   d|| j| j| j| j	j
d
| j}t|| j	 t|| j |S )z|Wraps the model into a :class:`~torch.distributed.fsdp.fully_sharded_data_parallel.FullyShardedDataParallel`
        module.r   FullyShardedDataParallelc                 3   s    | ]}t | V  qd S rs   )r{   ).0modr   rq   rr   	<genexpr>,      z,FSDPStrategy._setup_model.<locals>.<genexpr>zYThe model is already wrapped in `FSDP` but there are still parameters on the meta device.rY   z_A FSDP `auto_wrap_policy` is set, but the model is already wrapped. The policy will be ignored.z&setting up FSDP model with device id: z
, kwargs: )modulerW   rX   r]   	device_idNrq   )torch.distributed.fsdpr   anymodulesr2   rB   rb   r   r   rv   indexrW   r|   r]   r'   r)   r   )rn   r   rq   r   rr   _setup_model&  s2   
	zFSDPStrategy._setup_modeltrainer
pl.Trainerc                 C   s   | j d usJ | j | | jd usJ |jjtjkr&| jr&| j| j| _| j	
| j| _td| jr9td n| | j| _|   |jjtjkrP| | |   |jjtjkrdt| j| j d S d S )Nconfigure_sharded_modelzYou have overridden `LightningModule.configure_sharded_model` hook. It will assume that all the layers are already wrapped for sharding and won't wrap the entire model using `FullyShardedDataParallel`.)rP   setupr   statefnr?   FITTING_layer_syncapplyrT   convert_moduler@   lightning_modulerA   r   barriersetup_optimizerssetup_precision_pluginr5   
optimizersrv   )rn   r   rq   rq   rr   r   I  s$   
zFSDPStrategy.setupc              
      s   |    | jdrt |S d}zt | W n ty5 } zdt|vr) d}W Y d }~nd }~ww |sBtdd | jD rFtdd S )Nre   Fz%optimizer got an empty parameter listTc                 s   s    | ]}t | V  qd S rs   )r(   )r   	optimizerrq   rq   rr   r   x  r   z0FSDPStrategy.setup_optimizers.<locals>.<genexpr>zThe optimizer does not seem to reference any FSDP parameters. HINT: Make sure to create the optimizer after setting up the model by referencing `self.trainer.model.parameters()` in the `configure_optimizers()` hook.)	 _reset_optimizers_and_schedulersrb   r   rf   r   rk   strr   r   )rn   r   invalid_params_errorexro   rq   rr   r   d  s"   zFSDPStrategy.setup_optimizersc                 C      d S rs   rq   ru   rq   rq   rr   model_to_device  r   zFSDPStrategy.model_to_device
empty_init)NNNc              	   c   s    |rt dnt }|) | j  d V  W d    n1 s!w   Y  W d    d S W d    d S 1 s9w   Y  d S )Nmeta)torchdevicer   rT   tensor_init_context)rn   r   empty_init_contextrq   rq   rr   r     s
   Pz FSDPStrategy.tensor_init_contextc                 c   s    t | jj d ddlm} ddlm} |d|| j| j	| j
| jjd| j d V  W d    d S 1 s9w   Y  d S )Nz : entered model_sharded_context.r   r   )enable_wrap)wrapper_clsrW   rX   r]   r   rq   )r   r   rp   r   2torch.distributed.fsdp.fully_sharded_data_parallelr   torch.distributed.fsdp.wrapr   rW   r|   r]   rv   r   rb   )rn   r   r   rq   rq   rr   model_sharded_context  s   "z"FSDPStrategy.model_sharded_contextnamec                 C   s<   t  sd S tj dkrtjj|  d d S tj  d S )Nnccl)
device_ids)r+   r   distributedget_backendr   _determine_device_ids)rn   r   rq   rq   rr   r     s
   zFSDPStrategy.barrierr   objsrcc                 C   s,   t  s|S |g}tjj||tjd |d S )Nr/   r   )r+   r   r   broadcast_object_list_groupWORLD)rn   r   r   rq   rq   rr   	broadcast  s
   zFSDPStrategy.broadcastmeantensorr0   	reduce_opc                 C   s   t |trt|||dS |S )a  Reduces a tensor from several distributed processes to one aggregated tensor.

        Args:
            tensor: the tensor to sync and reduce
            group: the process group to gather results from. Defaults to all processes (world)
            reduce_op: the reduction operation. Defaults to 'mean'/'avg'.
                Can also be a string 'sum' to calculate the sum during reduction.

        Return:
            reduced value, except when the input was not a tensor the output remains is unchanged

        )r   )r{   r   r.   )rn   r   r0   r   rq   rq   rr   reduce  s   
zFSDPStrategy.reducec                 C   s
   | j jgS rs   )rv   r   ru   rq   rq   rr   r     s   
z"FSDPStrategy._determine_device_idsc                 C   s   t | jj d | j}|d ur0|jd ur0|jjjtj	kr0| j
r0| jd us(J | j
| j| _| jd us7J | jd us>J | j  | j  | j  d S )Nz: tearing down strategy...)r   r   rp   r   r   _trainerr   r   r?   r   r   r   revertrR   rP   teardownrT   )rn   	pl_modulerq   rq   rr   r     s   


zFSDPStrategy.teardownc                 C   ry   rs   )rN   )clsrq   rq   rr   get_registered_strategies  rz   z&FSDPStrategy.get_registered_strategiesstrategy_registryc                 C   sL   t j sd S |jd| dd | jd |jd| ddd | jd d S )NrM   z+Fully Sharded Data Parallel (FSDP) training)descriptionfsdp_cpu_offloadzQFully Sharded Data Parallel (FSDP) training with Full Sharding and CPU OffloadingT)r   rW   )r   r   is_availableregisterrN   append)r   r   rq   rq   rr   register_strategies  s   
z FSDPStrategy.register_strategiesc                 C   s   | j d usJ | jdkrt| j }n| jdkr t| j | jd}ntd| j | | j  W  d    S 1 s:w   Y  d S )Nr`   rO   
world_sizeUnknown state_dict_type: )r   rm   r"   r!   r   rk   
state_dict)rn   state_dict_ctxrq   rq   rr   lightning_module_state_dict  s   

$z(FSDPStrategy.lightning_module_state_dictT
checkpointstrictc                 C   r   rs   rq   )rn   r   r   rq   rq   rr   load_model_state_dict  r   z"FSDPStrategy.load_model_state_dictr   c                 C   s   ddl m} ddl m} t|tr|j}| jd usJ | jdkr=t| j |	| j|W  d    S 1 s7w   Y  n4| jdkrqt
| j| jd |	| j|}| jdkr`|||j| j}|W  d    S 1 slw   Y  td| j )Nr   r   OptimStateKeyTyper`   rO   r   r   )r   r   r   r{   r9   
_optimizerr   rm   r"   optim_state_dictr!   r   r   rekey_optim_state_dictPARAM_IDrk   )rn   r   FSDPr   r   rq   rq   rr   optimizer_state  s"   

"

 zFSDPStrategy.optimizer_statec                 C   r   rs   rq   )rn   r   rq   rq   rr   load_optimizer_state_dict#  r   z&FSDPStrategy.load_optimizer_state_dictfilepathstorage_optionsc                    s  |d urt dt| |}| r#| jdkr#t|s#td| | jdkrd| r0|  |j	ddd d|
di}|d	d
 t|
dg D  t|| | jdkrbt||t  d S d S | jdkrzt|rrt| t j||dS td| j )Nz`FSDPStrategy.save_checkpoint(..., storage_options=...)` is not supported because `FSDPStrategy` does not use the `CheckpointIO`.rO   z/The checkpoint path exists and is a directory: r`   T)parentsexist_okr   r   c                 S   s   i | ]
\}}d | |qS )
optimizer_rq   )r   idxoptim_staterq   rq   rr   
<dictcomp><  s    
z0FSDPStrategy.save_checkpoint.<locals>.<dictcomp>optimizer_statesr   )r   r   r   )r   r   r   is_dirrm   r&   IsADirectoryErroris_fileunlinkmkdirpopupdate	enumerater    r   r   saver   shutilrmtreerf   save_checkpointrk   )rn   r   r   r   pathconverted_statero   rq   rr   r  (  s0   





zFSDPStrategy.save_checkpointcheckpoint_pathc                 C   s  t | |}ddlm} | jd usJ | jd usJ t|rddlm} t	| j}|_ d| j
 i}t|| | jj|d | jjd | jjjjtjkr| jrddlm} ||d}t| jD ]"\}	}
d|	 }||d ||d	}|j|| | j|
d
}|
| qaW d    n1 sw   Y  t|t }|S t|r?t|}t|d| j| j| jjd t |}ddlm} ddlm!} |"d}|d u s| jjjjtjkr|S t#| jt#|krt$dt#| j dt#| dt%| j| jdd: t&| j|D ]*\}}t't(|d ) d t*r|+||j,| j}|j|| j|d
}|| qW d    |S 1 s8w   Y  |S t-dt.|d)Nr   r   )!load_sharded_optimizer_state_dictr   )r   )FileSystemReader)r  r   )model_state_dictoptimizer_keystorage_reader)r   r   optimr   )r   r   r   r   r  zYou have configured z( optimizers but the checkpoint contains z optimizers to load. Please resume training with the same number of optimizers or edit the checkpoint manually to remove states.F)r   
rank0_onlyr   z	The path z does not point to a valid checkpoint. Make sure the path points to either a directory with FSDP checkpoint shards, or a single file with a full checkpoint.)/r   r   r   r   r   r   r&   &torch.distributed.checkpoint.optimizerr  r"   r   r   load_state_dictstrict_loadingr   r   r   r?   r   r   torch.distributed.checkpointr  r	  optim_state_dict_to_loadr   loadr   r%   r3   r*   r  r   r4   r   r   rw   RuntimeErrorr!   zipr{   listkeysintr   
PARAM_NAMErk   r   )rn   r  r  r   r  r   module_stater  readerr   r  	optim_keyr   flattened_osdmetadatar   r   r  r   	opt_staterq   rq   rr   load_checkpointL  s   




	

zFSDPStrategy.load_checkpoint)rc   N)r   r   rc   Nrs   )r   )Nr   )T)Ir   
__module____qualname____doc__strategy_namerN   r   r   __annotations__r   r   r   r   r   r   r:   r   r   boolr   r   r   r   r"  r   rg   propertyr   rv   rx   rU   r|   r;   rT   setterr	   r   r   r   r   r   r   r   r   r   r   r   r   r
   r   r   r   r>   r   r   r8   r   r   r   classmethodr   r   r   r   r   r   r   r   r   r7   r  r*  __classcell__rq   rq   ro   rr   rL   b   s  
 3	
1
" 
"
#$rL   )tloggingr  
contextlibr   r   datetimer   pathlibr   typingr   r   r   r	   r
   r   r   r   r   r   r   r   r   r   "lightning_utilities.core.rank_zeror   r   r   torch.nnr   torch.optimr   typing_extensionsr   pytorch_lightningpllightning_fabric.pluginsr   r   5lightning_fabric.plugins.collectives.torch_collectiver   lightning_fabric.strategiesr    lightning_fabric.strategies.fsdpr   r   r   r   r    r!   r"   r#   r$   r%   r&   r'   r(   r)   *lightning_fabric.strategies.model_parallelr*   &lightning_fabric.utilities.distributedr+   r,   r-   r.   r0   r   "lightning_fabric.utilities.importsr1   lightning_fabric.utilities.initr2   lightning_fabric.utilities.loadr3   r4   $lightning_fabric.utilities.optimizerr5   lightning_fabric.utilities.seedr6    lightning_fabric.utilities.typesr7   r8    pytorch_lightning.core.optimizerr9   #pytorch_lightning.plugins.precisionr:   (pytorch_lightning.plugins.precision.fsdpr;   8pytorch_lightning.strategies.launchers.subprocess_scriptr<   %pytorch_lightning.strategies.parallelr=   %pytorch_lightning.strategies.strategyr>    pytorch_lightning.trainer.statesr?   )pytorch_lightning.utilities.model_helpersr@   %pytorch_lightning.utilities.rank_zerorA   rB   r   rC   r   rD   rE   rF   r   rG   r0  r"  rZ   r^   	getLoggerr   r   rL   rq   rq   rq   rr   <module>   sV   <@$
