o
    oit                  
   @   s  d dl Z d dlmZmZ d dlmZ d dlmZ d dlm	Z	m
Z
mZmZmZmZmZmZmZmZmZmZ d dlZd dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlm Z  d dl!m"Z" d dl#m$Z$ d dl%m&Z& d dl'm(Z( d dl)m*Z*m+Z+ d dl,m-Z- d dl.m/Z/ d dl0m1Z1m2Z2m3Z3m4Z4 d dl5m6Z6 d dl7m8Z8 d dl9m:Z:m;Z; d dl<m=Z=m>Z>m?Z? e	rd dl@mAZA eee  ZBeeBeeeCeDgeCf f ZEG dd de*e3ZFded dedefdd ZGdeBd!ed"e
de
def
d#d$ZHdeeB dedefd%d&ZIG d'd( d(e2ZJdS ))    N)	ExitStacknullcontext)partial)Path)TYPE_CHECKINGAnyCallableContextManagerDictListLiteralOptionalSetTupleTypeUnion)Tensor)Module)	Optimizer)
DataLoader)override)Accelerator)_XLA_AVAILABLE)XLAPrecision)XLAEnvironment)XLACheckpointIO)ParallelStrategy_StrategyRegistry)_apply_filter)_XLALauncher)
TBroadcast_BackwardSyncControl_Sharded!_validate_keys_for_strict_loading)get_filesystem)
_EmptyInit)rank_zero_onlyrank_zero_warn)_PATHOptimizableReduceOpMpDeviceLoaderc                       sx  e Zd ZdZ								dkdee deeej  dee	 dee
 d	ee d
ee ded dededdf fddZeedejfddZedefddZeede	fddZejedee	 ddfddZeede
fddZejedee
 ddfddZeedef fddZeedef fdd Zeedef fd!d"Zeedef fd#d$Zedld%d&Zedl fd'd(Zed)ed*ee  de!eee  f fd+d,Z"ed)edefd-d.Z#ed)eddfd/d0Z$dmd1ee de%fd2d3Z&ede%fd4d5Z'ed6e(dd7fd8d9Z)ed:e de fd;d<Z*ed:e+dedefd=d>Z,e	?	@dnd)ed:e dAe-e.ef dBe-e.ef dCede/fdDdEZ0ed)ed:e dFe-e.ef ddfdGdHZ1edodIe/dJee dKede/fdLdMZ2e	dpdNe-e/ef dJee dOee-e3e4f  de/fdPdQZ5edmdRee4 dSededdfdTdUZ6edqdWe7dXede7fdYdZZ8e		dpd[e9d\e:e4e-ee ef f d]ee d^ee:e4e;e4egef f  ddf
d_d`Z<d[e=d\e:e4e-ee ef f d]ee d^ee:e4e;e4egef f  ddf
dadbZ>e		@drd[e9d\ee-ee e:e4e-ee ef f f  dcede:e4ef fdddeZ?e@edfeAddfdgdhZBde:fdidjZC  ZDS )sXLAFSDPStrategya  Strategy for training multiple XLA devices using the
    :func:`torch_xla.distributed.xla_fully_sharded_data_parallel.XlaFullyShardedDataParallel` method.

    .. warning::  This is an :ref:`experimental <versioning:Experimental API>` feature.

    For more information check out https://github.com/pytorch/xla/blob/master/docs/fsdp.md

    Args:
        auto_wrap_policy: Same as ``auto_wrap_policy`` parameter in
            :class:`torch_xla.distributed.fsdp.XlaFullyShardedDataParallel`.
            For convenience, this also accepts a set of the layer classes to wrap.
        activation_checkpointing_policy: Used when selecting the modules for
            which you want to enable activation checkpointing. Enabling this can free up a significant amount of memory
            at the cost of speed since activations in these layers need to be recomputed during backpropagation.
            This accepts a set of the layer classes to wrap.

        state_dict_type: The format in which the state of the model and optimizers gets saved into the checkpoint.

            - ``"full"``: The full weights and optimizer states get assembled on rank 0 and saved to a single file.
            - ``"sharded"``: Each rank saves its shard of weights and optimizer states to a file. The checkpoint is
              a folder with files for each shard in the host. Note that TPU VM multihost does not have a shared
              filesystem.

        sequential_save: With this enabled, individual ranks consecutively save their state dictionary shards, reducing
            peak system RAM usage, although it elongates the saving process.
        \**kwargs: See available parameters in :class:`torch_xla.distributed.fsdp.XlaFullyShardedDataParallel`.

    NshardedFacceleratorparallel_devicescheckpoint_io	precisionauto_wrap_policyactivation_checkpointing_policystate_dict_type)fullr.   sequential_savekwargsreturnc	           
         sX   t sttt t j||t ||d t | _|| _|| _	|	| _
|| _|| _d| _d S )N)r/   r0   cluster_environmentr1   r2   F)r   ModuleNotFoundErrorstrsuper__init__r   _XLAFSDPBackwardSyncControl_backward_sync_control_auto_wrap_policy _activation_checkpointing_policy_fsdp_kwargs_state_dict_type_sequential_save	_launched)
selfr/   r0   r1   r2   r3   r4   r5   r7   r8   	__class__ X/home/ubuntu/.local/lib/python3.10/site-packages/lightning/fabric/strategies/xla_fsdp.pyr>   S   s    
zXLAFSDPStrategy.__init__c                 C   s(   | j stddd lm  m} | S )NzFAccessing the XLA device before processes have spawned is not allowed.r   )rF   RuntimeErrortorch_xla.core.xla_modelcore	xla_model
xla_device)rG   xmrJ   rJ   rK   root_deviceq   s   zXLAFSDPStrategy.root_devicec                 C   s   | j d ur
t| j S dS Nr   )r0   lenrG   rJ   rJ   rK   num_processesz   s   zXLAFSDPStrategy.num_processesc                 C   s&   | j }|d urt|tsJ |S t S N)_checkpoint_io
isinstancer   rG   pluginrJ   rJ   rK   r1   ~   s
   zXLAFSDPStrategy.checkpoint_ioioc                 C   *   |d urt |tstd| || _d S )NzHThe XLA strategy can only work with the `XLACheckpointIO` plugin, found )rY   r   	TypeErrorrX   )rG   r\   rJ   rJ   rK   r1         
c                 C   s(   | j }|d urt|tsJ |S tdS )Nz32-true)
_precisionrY   r   rZ   rJ   rJ   rK   r2      s
   zXLAFSDPStrategy.precisionc                 C   r]   )NzJThe XLA FSDP strategy can only work with the `XLAPrecision` plugin, found )rY   r   r^   r`   )rG   r2   rJ   rJ   rK   r2      r_   c                       | j rt jS dS rS   )rF   r=   global_rankrU   rH   rJ   rK   rb         zXLAFSDPStrategy.global_rankc                    ra   rS   )rF   r=   
local_rankrU   rH   rJ   rK   rd      rc   zXLAFSDPStrategy.local_rankc                    ra   rS   )rF   r=   	node_rankrU   rH   rJ   rK   re      rc   zXLAFSDPStrategy.node_rankc                    ra   )N   )rF   r=   
world_sizerU   rH   rJ   rK   rg      rc   zXLAFSDPStrategy.world_sizec                 C   s   t | | _d S rW   )r   	_launcherrU   rJ   rJ   rK   _configure_launcher   s   z#XLAFSDPStrategy._configure_launcherc                    sN   | j d usJ t| j dkrtdt| j dd| _| jt_t	 
  d S )Nrf   zThe z does not support running on a single device with the PjRT runtime. Try using all devices or the `SingleDeviceXLAStrategy` strategyT)r0   rT   NotImplementedErrortype__name__rF   rb   r&   rankr=   setup_environmentrU   rH   rJ   rK   rn      s   z!XLAFSDPStrategy.setup_environmentmodule
optimizersc                 C   s   t dt| j d)z]Returns NotImplementedError since for XLAFSDP optimizer setup must happen after module setup.zThe `z` does not support the joint setup of module and optimizer(s). Please do it in this order: Create the model, call `setup_module`, create the optimizer, call `setup_optimizer`.)rj   rk   rl   )rG   ro   rp   rJ   rJ   rK   setup_module_and_optimizers   s   z+XLAFSDPStrategy.setup_module_and_optimizersc                    sd   ddl m  |  }t fdd| D r"d|v r"td |d= t| s0 dd|i|}|S )	Nr   XlaFullyShardedDataParallelc                 3   s    | ]}t | V  qd S rW   rY   ).0modXLAFSDPrJ   rK   	<genexpr>   s    z/XLAFSDPStrategy.setup_module.<locals>.<genexpr>r3   zoA XLAFSDP `auto_wrap_policy` is set, but at least one submodule is already wrapped. The policy will be ignored.ro   rJ   )torch_xla.distributed.fsdprs   _parse_fsdp_kwargsanymodulesr'   rY   )rG   ro   r8   rJ   rw   rK   setup_module   s   "
zXLAFSDPStrategy.setup_modulec                 C   s   d S rW   rJ   )rG   ro   rJ   rJ   rK   module_to_device   s   z XLAFSDPStrategy.module_to_device
empty_initc                 C   sD   | j  }|  }t }|tt|d || || |S )N)enabled)r2   module_init_contextmodule_sharded_contextr   enter_contextr%   bool)rG   r   precision_init_ctxmodule_sharded_ctxstackrJ   rJ   rK   r      s   


z#XLAFSDPStrategy.module_init_contextc                 C   s   t  S rW   )r   rU   rJ   rJ   rK   r      s   z&XLAFSDPStrategy.module_sharded_context
dataloaderr,   c                 C   sD   ddl m} t||r|S ||| j}|jj|_t|jdd |_|S )Nr   r+   batch_sampler)%torch_xla.distributed.parallel_loaderr,   rY   rR   _loaderdatasetgetattrr   )rG   r   r,   rJ   rJ   rK   process_dataloader   s   

z"XLAFSDPStrategy.process_dataloader	optimizerc                 C   s    t dd |jD r|S td)aL  Set up an optimizer for a model wrapped with XLAFSDP.

        This setup method doesn't modify the optimizer or wrap the optimizer. The only thing it currently does is verify
        that the optimizer was created after the model was wrapped with :meth:`setup_module` with a reference to the
        flattened parameters.

        c                 s   s*    | ]}|d  D ]	}t |ddV  qqdS )params_is_shardedFN)r   )ru   groupprJ   rJ   rK   ry     s   ( z2XLAFSDPStrategy.setup_optimizer.<locals>.<genexpr>zThe optimizer does not seem to reference any XLAFSDP parameters. HINT: Make sure to create the optimizer after setting up the model.)r|   param_groups
ValueError)rG   r   rJ   rJ   rK   setup_optimizer   s
   	zXLAFSDPStrategy.setup_optimizerc                 K   s.   |j di |}ddlm  m} |  |S )a(  Overrides default tpu optimizer_step since FSDP should not call `torch_xla.core.xla_model.optimizer_step`.
        Performs the actual optimizer step.

        Args:
            optimizer: the optimizer performing the step
            **kwargs: Any extra arguments to ``optimizer.step``

        r   NrJ   )steprM   rN   rO   	mark_step)rG   r   r8   lossrQ   rJ   rJ   rK   optimizer_step  s   
zXLAFSDPStrategy.optimizer_step       @Tmax_norm	norm_typeerror_if_nonfinitec                 C   s   | j | |j||dS )zClip gradients by norm.)r   r   )r2   unscale_gradientsclip_grad_norm_)rG   ro   r   r   r   r   rJ   rJ   rK   clip_gradients_norm  s   
z#XLAFSDPStrategy.clip_gradients_normclip_valc                 C   s   t d)zClip gradients by value.z~XLA's FSDP strategy does not support to clip gradients by value. Consider clipping by norm instead or choose another strategy!)rj   )rG   ro   r   r   rJ   rJ   rK   clip_gradients_value,  s   z$XLAFSDPStrategy.clip_gradients_valuetensorr   
sync_gradsc                 C   s   | j s|S t|tstdt| j d| | dkr"|d}|j}|	| j
}ddlm  m} ddlm  m} |rD||n||}|	|}|S )aC  Function to gather a tensor from several distributed processes.

        Args:
            tensor: tensor to all-gather.
            group: unused.
            sync_grads: flag that allows users to synchronize gradients for the all-gather operation.
        Return:
            A tensor of shape (world_size, ...)

        `z4.all_gather` is only implemented for tensors. Given r   N)rF   rY   r   rj   rk   rl   dim	unsqueezedevicetorR   torch_xla.core.functionsrN   	functionsrM   rO   
all_gather)rG   r   r   r   original_devicexfrQ   rJ   rJ   rK   r   4  s   


zXLAFSDPStrategy.all_gatheroutput	reduce_opc                 C   s   t |tstj|| jd}t |to|tjk}t |to!| dv}|s&|r-t	d| dd l
m  m} |d|t}t |trM| dv rM|| j }|S )N)r   )summeanavgzaCurrently, the XLAFSDPStrategy only supports `sum`, `mean`, `avg` for the reduce operation, got: r   reduce)r   r   )rY   r   torchr   rR   r*   SUMr<   lowerr   rM   rN   rO   mesh_reducer   rg   )rG   r   r   r   invalid_reduce_opinvalid_reduce_op_strrQ   rJ   rJ   rK   
all_reduceR  s   

zXLAFSDPStrategy.all_reducenameargsc                 O   s6   | j sd S dd lm  m} |d u rd}|| d S )Nr    )rF   rM   rN   rO   
rendezvous)rG   r   r   r8   rQ   rJ   rJ   rK   barrieri  s   zXLAFSDPStrategy.barrierr   objsrcc                 C   s   | j s|S dd lm  m} t|t}|r*| dkr |d}|j}|	| j
}nt }t|| tjt| | j
tjd}|g}|j||d |d }|sdt|   }t|}|S |	|}|S )Nr   )r   dtype)root_ordinal)rF   rM   rN   rO   rY   r   r   r   r   r   rR   r\   BytesIOr   saver   	bytearray	getbufferfloatcollective_broadcastcpubytenumpyload)rG   r   r   rQ   	is_tensorr   bufferrJ   rJ   rK   	broadcastt  s.   



zXLAFSDPStrategy.broadcastpathstatestorage_optionsfilterc                    s  t | |}| rt| rtd| ddlm   fdd| D }t	|dkr3t
dt	|dkr=t
ddd	lm  m} |  | j}|d	usSJ | jrvtt	|D ]}|| jkrk| |||| | d
| d q\n| |||| | jdkrt|d }	d}
t	|| jkrtd|	d|
dddlm} | d | jr|jd }||	|
t| | j| t|t|t| | d d	S d	S )a  Save model, optimizer, and other state in the provided checkpoint directory.

        If the user specifies sharded checkpointing, the directory will contain one file per process, with model- and
        optimizer shards stored per file. If the user specifies full checkpointing, the directory will contain a
        consolidated checkpoint combining all of the sharded checkpoints.

        z:The checkpoint directory already exists and is not empty: r   rr   c                    s   g | ]	}t | r|qS rJ   rt   )ru   ro   rw   rJ   rK   
<listcomp>  s    z3XLAFSDPStrategy.save_checkpoint.<locals>.<listcomp>a  Could not find a XLAFSDP model in the provided checkpoint state. Please provide the model as part of the state like so: `save_checkpoint(..., state={'model': model, ...})`. Make sure you set up the model (and optimizers if any) through the strategy before saving the checkpoint.rf   zFound multiple XLAFSDP modules in the given state. Saving checkpoints with FSDP is currently limited to a single model per checkpoint. To save multiple models, call the save method for each model separately with a different path.Nz	wait-for-z-saver6   
checkpointz_rank-*-of-*.ptha  Multihost setups do not have a shared filesystem, so the checkpoint shards cannot be consolidated into a single checkpoint after saving them. Please switch to `XLAFSDPStrategy(state_dict_type='sharded')`. TIP: You can consolidate them manually by getting them together into a single directory and running `python -m torch_xla.distributed.fsdp.consolidate_sharded_ckpts --ckpt_prefix z --ckpt_suffix z* --save_path 'path/to/consolidated.ckpt'`.)%consolidate_sharded_model_checkpointsbefore_ckpt_consolidationzconsolidated.ckptafter_ckpt_consolidation) r   r   is_dirr|   iterdirFileExistsErrorrz   rs   valuesrT   r   rM   rN   rO   r   r0   rE   rangerd   _save_checkpoint_shardr   rD   r<   rg   OSErrorr   is_global_zeroparentr1   remove_checkpointr$   mv)rG   r   r   r   r   r}   rQ   r0   rm   ckpt_prefixckpt_suffixr   	save_pathrJ   rw   rK   save_checkpoint  sX   

	

zXLAFSDPStrategy.save_checkpointc           
      C   s   ddl m} i }| D ].\}}t|tr%t||r%| }	| |d< nt|tr/| }	n|}	t||p6i |	| q| j	j
||d| jdd| jdd |d d S )	Nr   rr   shard_metadatacheckpoint_rank-08d-of-.pth)r   )rz   rs   itemsrY   r   
state_dictget_shard_metadatar   r   r1   r   rb   rg   )
rG   r   r   r   r   rx   converted_statekeyr   	convertedrJ   rJ   rK   r     s   


z&XLAFSDPStrategy._save_checkpoint_shardstrictc                    s  |s
t d|dt| |}t|ttfrtdddlm   fdd|	 D }dd |	 D }| j
d	kr|d
| jdd| jdd }| sXt dt|dt|dkrbt dt|dkrlt dt|	 d \}}t|}	|j|	d |d |	 D ]\}
}||	|
  q|	 |  |  }| |  |  }t|||d |D ]}||v r|	| ||< || qi }t|r|D ]}|	| ||< qd|v r|d |S | j
dkrD| st dt|dt|dkst| |  |  dkrtd t|dkrt dd|vs/t|d  }tjjs3tdt|}|j|d|d |S t d| j
 )a  Given a folder, load the contents from a checkpoint and restore the state of the given objects.

        The strategy currently only supports saving and loading sharded checkpoints which are stored in form of a
        directory of multiple files rather than a single file.

        z0Got `XLAFSDPStrategy.load_checkpoint(..., state=z)` but a state with at least  a model instance to reload is required. Pass it in like so: `FSDPStrategy.load_checkpoint(..., state={'model': model, ...})`zmLoading a single module or optimizer object from a checkpoint is not supported yet with the XLAFSDP strategy.r   rr   c                    s    i | ]\}}t | r||qS rJ   rt   )ru   r   ro   rw   rJ   rK   
<dictcomp>       z3XLAFSDPStrategy.load_checkpoint.<locals>.<dictcomp>c                 S   s    i | ]\}}t |tr||qS rJ   )rY   r   )ru   r   optimrJ   rJ   rK   r     r   r.   r   r   r   r   z	The path zv does not point to valid sharded checkpoints. Make sure the path points to a directory with XLAFSDP checkpoint shards.a  Could not find a XLAFSDP model in the provided checkpoint state. Please provide the model as part of the state like so: `load_checkpoint(..., state={'model': model, ...})`. Make sure you set up the model (and optimizers if any) through the strategy before loading the checkpoint.rf   zFound multiple XLAFSDP modules in the given state. Loading checkpoints with FSDP is currently limited to a single model per checkpoint. To load multiple models, call the load method for each model separately with a different path.model)r   r   r6   zt does not point to a valid full checkpoint. Make sure the path points to a directory with a full XLAFSDP checkpoint.ztLoading a full checkpoint will only load the full model. The optimizer and any additional metadata are not included.zmFound a XLAFSDP model in the provided checkpoint state. Please provide the model without any XLAFSDP wrapper.zFXLAFSDP only supports a single model instance with 'model' as the key.zUnknown state_dict_type: )r   r   r   rY   r   r   rj   rz   rs   r   rD   rb   rg   is_filer<   rT   listr   r   load_state_dictkeysr#   removepopr'   nn)rG   r   r   r   r}   rp   file_ro   sharded_ckptopt_keyoptloaded_metadata_keysrequested_metadata_keysr   metadatar   	full_ckptrJ   rw   rK   load_checkpoint  s   




0"
zXLAFSDPStrategy.load_checkpointstrategy_registryc                 C   s   |j d| | jd d S )Nxla_fsdp)description)registerrl   )clsr  rJ   rJ   rK   register_strategiesg  s   z#XLAFSDPStrategy.register_strategiesc                 C   s@   | j  }| j}t|tr|d|j t| j|}t	| j
|S )Ncompute_dtype)rC   copyr2   rY   r   
setdefault_desired_dtype_auto_wrap_policy_kwargsrA    _activation_checkpointing_kwargsrB   )rG   r8   r2   rJ   rJ   rK   r{   l  s   

z"XLAFSDPStrategy._parse_fsdp_kwargs)NNNNNNr.   F)r9   NrW   )r   T)NF)NN)r   )NT)Erl   
__module____qualname____doc__r   r   r   r   r   r   r   _POLICY_POLICY_SETr   r   r   r>   propertyr   rR   intrV   r1   setterr2   rb   rd   re   rg   ri   rn   r   r   r   rq   r~   r   r	   r   r   r   r   r   r)   r   r   r   r   r   r   r   r*   r<   r   r   r    r   r(   r
   r   r   r   r   r  classmethodr   r  r{   __classcell__rJ   rJ   rH   rK   r-   5   sB   	

	

$"
"
"K
"
gr-   policyr  r8   r9   c                 C   s:   | d u r|S t | trddlm} t|| d} | |d< |S )Nr   )transformer_auto_wrap_policy)transformer_layer_clsr3   )rY   settorch_xla.distributed.fsdp.wrapr$  r   )r#  r8   r$  rJ   rJ   rK   r  x  s   
r  ro   r   c                 O   sH   ddl m} ddl m} t|t| r||n|}||g|R i |S )Nr   rr   )checkpoint_module)rz   rs   r(  rY   tuple)r#  ro   r   r8   rx   r(  rJ   rJ   rK   &_activation_checkpointing_auto_wrapper  s   r*  c                 C   sH   | s|S d|v rt dt| tstd|  dtt| }||d< |S )Nauto_wrapper_callablez]You cannot set both `auto_wrapper_callable` and `activation_checkpointing_policy`. Choose onez7`activation_checkpointing_policy` must be a set, found zC. You can try defining and passing `auto_wrapper_callable` instead.)r   rY   r&  r^   r   r*  )r#  r8   r+  rJ   rJ   rK   r    s   


r  c                   @   s&   e Zd ZedededefddZdS )r?   ro   r   r9   c                 C   sF   |st  S ddlm} t||std| jj d|jj d| S )z|Blocks gradient synchronization inside the :class:`~torch_xla.distributed.fsdp.XlaFullyShardedDataParallel`
        wrapper.r   rr   zABlocking backward sync is only possible if the module passed to `zE.no_backward_sync` is wrapped in `XlaFullyShardedDataParallel`. Got: .)r   rz   rs   rY   r^   rI   rl   no_sync)rG   ro   r   rx   rJ   rJ   rK   no_backward_sync  s   
z,_XLAFSDPBackwardSyncControl.no_backward_syncN)rl   r  r  r   r   r   r	   r.  rJ   rJ   rJ   rK   r?     s    r?   )Kr\   
contextlibr   r   	functoolsr   pathlibr   typingr   r   r   r	   r
   r   r   r   r   r   r   r   r   r   torch.nnr   torch.optimr   torch.utils.datar   typing_extensionsr   lightning.fabric.acceleratorsr   !lightning.fabric.accelerators.xlar   lightning.fabric.pluginsr   %lightning.fabric.plugins.environmentsr   lightning.fabric.plugins.io.xlar   lightning.fabric.strategiesr   r    lightning.fabric.strategies.fsdpr   )lightning.fabric.strategies.launchers.xlar   $lightning.fabric.strategies.strategyr    r!   r"   r#   #lightning.fabric.utilities.cloud_ior$   lightning.fabric.utilities.initr%   $lightning.fabric.utilities.rank_zeror&   r'    lightning.fabric.utilities.typesr(   r)   r*   r   r,   r  r   r  r  r-   r  r*  r  r?   rJ   rJ   rJ   rK   <module>   sJ   8    G