o
    zi3                     @   s<  d dl Z d dlZd dlmZmZmZmZmZmZ d dl	Z	d dl	m
Z
 d dlmZ d dlmZ d dlZd dlmZmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZmZ d dlm Z  d dl!m"Z" d dl#m$Z$ d dl%m&Z& d dl'm(Z( d dl)m*Z* d dl+m,Z,m-Z- d dl.m/Z/ erd dl0m1Z1 G dd de$Z2dS )    N)TYPE_CHECKINGAnyDictListOptionalUnion)Tensor)Module)override)_XLA_AVAILABLE_XLA_GREATER_EQUAL_2_1)XLACheckpointIO)XLAEnvironment)_StrategyRegistry)_optimizers_to_device)_PATHReduceOp)XLAPrecision)_WrappingCheckpointIO)DDPStrategy)_XLALauncher)
TBroadcast)	TrainerFn)find_shared_parametersset_shared_parameters)rank_zero_onlyMpDeviceLoaderc                       s0  e Zd ZdZdZ						dZded deeej  d	ee	e
ef  d
ee dedededdf fddZeede	e
ef fddZejedee	e
ef  ddfddZeedefddZejed
ee ddfddZeedejfddZeedef fddZeedef fddZeedef fddZeedef fd d!Zed[d"d#Zed\d&d'Zed(edefd)d*Zeedee ef fd+d,Z!ed-e"dd.fd/d0Z#ed[d1d2Z$ed[d3d4Z%ed]d5ee  d6ed7eddfd8d9Z&ed^d;e'd<ede'fd=d>Z(e	d_d?e	e)ef d@ee dAee	e*e f  de)fdBdCZ+ed[ fdDdEZ,ed[dFdGZ-ed[dHdIZ.e	d]dJee ef dKe/dLee ddf fdMdNZ0edKe/ddfdOdPZ1ed`dQe)d@ee dRede)fdSdTZ2ed[ fdUdVZ3e4edWe5ddfdXdYZ6  Z7S )aXLAStrategyzxStrategy for training multiple TPU devices using the :func:`torch_xla.distributed.xla_multiprocessing.spawn`
    method.xlaNFTacceleratorzpl.accelerators.Acceleratorparallel_devicescheckpoint_ioprecision_plugindebugsync_module_states_returnc                    s@   t sttt t j||t ||dd || _d| _|| _d S )Nfork)r    r!   cluster_environmentr"   r#   start_methodF)	r   ModuleNotFoundErrorstrsuper__init__r   r$   	_launched_sync_module_states)selfr    r!   r"   r#   r$   r%   r&   	__class__ T/home/ubuntu/.local/lib/python3.10/site-packages/pytorch_lightning/strategies/xla.pyr.   1   s   

zXLAStrategy.__init__c                 C   s*   | j }|d urt|ttfsJ |S t S N)_checkpoint_io
isinstancer   r   r1   pluginr4   r4   r5   r"   I   s
   zXLAStrategy.checkpoint_ioioc                 C   s.   |d urt |ttfstd| || _d S )NzHThe XLA strategy can only work with the `XLACheckpointIO` plugin, found )r8   r   r   	TypeErrorr7   )r1   r;   r4   r4   r5   r"   R   s   
c                 C   s&   | j }|d urt|tsJ |S t S r6   )_precision_pluginr8   r   r9   r4   r4   r5   r#   Y   s
   zXLAStrategy.precision_pluginc                 C   s*   |d urt |tstd| || _d S )NzEThe XLA strategy can only work with the `XLAPrecision` plugin, found )r8   r   r<   r=   )r1   r#   r4   r4   r5   r#   b   s   
c                 C   s(   | j stddd lm  m} | S )NzFAccessing the XLA device before processes have spawned is not allowed.r   )r/   RuntimeErrortorch_xla.core.xla_modelcore	xla_model
xla_device)r1   xmr4   r4   r5   root_devicei   s   zXLAStrategy.root_devicec                       | j rt jS dS Nr   )r/   r-   global_rankr1   r2   r4   r5   rG   r      zXLAStrategy.global_rankc                    rE   rF   )r/   r-   
local_rankrH   r2   r4   r5   rJ   w   rI   zXLAStrategy.local_rankc                    rE   rF   )r/   r-   	node_rankrH   r2   r4   r5   rK   |   rI   zXLAStrategy.node_rankc                    rE   )N   )r/   r-   
world_sizerH   r2   r4   r5   rM      rI   zXLAStrategy.world_sizec                 C   s   t | | _d S r6   )r   	_launcherrH   r4   r4   r5   _configure_launcher   s   zXLAStrategy._configure_launchertrainer
pl.Trainerc                 C   s   | j d usJ | j | | jrdtjd< | jd usJ | j| j t| j}| 	  t
| j| | | j| _| jrPtrEddlm} nddlm} || j |jjtjkr\| | |   |jjtjkrpt| j| j d S d S )N1PT_XLA_DEBUGr   )broadcast_master_param)r    setupr$   osenvironmodelr#   convert_moduler   model_to_devicer   _setup_modelr0   r   r?   rT   torch_xla.experimental.pjrtstatefnr   FITTINGsetup_optimizerssetup_precision_pluginr   
optimizersrD   )r1   rP   shared_paramsrT   r4   r4   r5   rU      s*   



zXLAStrategy.setuprX   c                 C   s   |S r6   r4   )r1   rX   r4   r4   r5   r[         zXLAStrategy._setup_modelc                 C   s   | j | jdS )N)num_replicasrank)rM   rG   rH   r4   r4   r5   distributed_sampler_kwargs   s   z&XLAStrategy.distributed_sampler_kwargs
dataloaderr   c                 C   sD   ddl m} t||r|S ||| j}|jj|_t|jdd |_|S )Nr   r   batch_sampler)%torch_xla.distributed.parallel_loaderr   r8   rD   _loaderdatasetgetattrri   )r1   rh   r   r4   r4   r5   process_dataloader   s   

zXLAStrategy.process_dataloaderc                 C      d S r6   r4   rH   r4   r4   r5   configure_ddp   rd   zXLAStrategy.configure_ddpc                 C   s"   | j d usJ | j | j| _ d S r6   )rX   torD   rH   r4   r4   r5   rZ      s   zXLAStrategy.model_to_devicenameargskwargsc                 O   s6   | j sd S dd lm  m} |d u rd}|| d S )Nr    )r/   r?   r@   rA   
rendezvous)r1   rr   rs   rt   rC   r4   r4   r5   barrier   s   zXLAStrategy.barrierr   objsrcc                 C   s   | j s|S dd lm  m} t|t}|r*| dkr |d}|j}|	| j
}nt }t|| tjt| | j
tjd}|g}|j||d |d }|sdt|   }t|}|S |	|}|S )Nr   )devicedtype)root_ordinal)r/   r?   r@   rA   r8   r   dim	unsqueezerz   rq   rD   r;   BytesIOtorchsavetensor	bytearray	getbufferfloatcollective_broadcastcpubytenumpyload)r1   rx   ry   rC   	is_tensororiginal_devicebufferr4   r4   r5   	broadcast   s.   



zXLAStrategy.broadcastoutputgroup	reduce_opc                 C   s   t |tstj|| jd}t |to|tjk}t |to!| dv}|s&|r-t	d| dd l
m  m} |d|t}t |trM| dv rM|| j }|S )N)rz   )summeanavgz]Currently, the XLAStrategy only supports `sum`, `mean`, `avg` for the reduce operation, got: r   reduce)r   r   )r8   r   r   r   rD   r   SUMr,   lower
ValueErrorr?   r@   rA   mesh_reducer   rM   )r1   r   r   r   invalid_reduce_opinvalid_reduce_op_strrC   r4   r4   r5   r      s   

zXLAStrategy.reducec                    s   d| _ t   d S )NT)r/   r-   setup_environmentrH   r2   r4   r5   r     s   zXLAStrategy.setup_environmentc                 C   s0   | j d usJ t| j dkrtd| jt_d S )NrL   zThe `XLAStrategy` does not support running on a single device with the PjRT runtime. Try using all devices or the `SingleDeviceXLAStrategy` strategy)r!   lenNotImplementedErrorrG   r   rf   rH   r4   r4   r5   setup_distributed  s   zXLAStrategy.setup_distributedc                 C   ro   r6   r4   rH   r4   r4   r5   set_world_ranks!  s   zXLAStrategy.set_world_ranks
checkpointfilepathstorage_optionsc                    s0   dd l m  m} |  t j|||d d S )Nr   )r   )r?   r@   rA   	mark_stepr-   save_checkpoint)r1   r   r   r   rC   r2   r4   r5   r   (  s   zXLAStrategy.save_checkpointc                 C   s   | j dkr| j| dS dS )zqRemove checkpoint filepath from the filesystem.

        Args:
            filepath: Path to checkpoint

        r   N)rJ   r"   remove_checkpoint)r1   r   r4   r4   r5   r   3  s   
zXLAStrategy.remove_checkpointr   
sync_gradsc                 C   s   | j s|S t|tstdt| j d| | dkr"|d}|j}|	| j
}ddlm  m} ddlm  m} |rD||n||}|	|}|S )aC  Function to gather a tensor from several distributed processes.

        Args:
            tensor: tensor to all-gather.
            group: unused.
            sync_grads: flag that allows users to synchronize gradients for the all-gather operation.
        Return:
            A tensor of shape (world_size, ...)

        `z4.all_gather` is only implemented for tensors. Given r   N)r/   r8   r   r   type__name__r}   r~   rz   rq   rD   torch_xla.core.functionsr@   	functionsr?   rA   
all_gather)r1   r   r   r   r   xfrC   r4   r4   r5   r   >  s   


zXLAStrategy.all_gatherc                    s"   t    d| _tjdd  d S )NFrS   )r-   teardownr/   rV   rW   poprH   r2   r4   r5   r   \  s   
zXLAStrategy.teardownstrategy_registryc                 C   s*   |j d| ddd |j | j| | jd d S )N	xla_debugz!XLA strategy with `debug` as TrueT)descriptionr$   )r   )registerstrategy_namer   )clsr   r4   r4   r5   register_strategiesb  s   
zXLAStrategy.register_strategies)NNNNFT)r'   N)rP   rQ   r'   Nr6   )r   )NN)NF)8r   
__module____qualname____doc__r   r   r   r   rz   r   r   r   r   boolr   r.   propertyr
   r"   setterr#   rD   intrG   rJ   rK   rM   rO   rU   r	   r[   r   r,   rg   objectrn   rp   rZ   rw   r   r   r   r   r   r   r   r   r   r   r   r   r   classmethodr   r   __classcell__r4   r4   r2   r5   r   +   s    	"""



"r   )3r;   rV   typingr   r   r   r   r   r   r   r   torch.nnr	   typing_extensionsr
   pytorch_lightningpl!lightning_fabric.accelerators.xlar   r   lightning_fabric.pluginsr   %lightning_fabric.plugins.environmentsr   lightning_fabric.strategiesr   $lightning_fabric.utilities.optimizerr    lightning_fabric.utilities.typesr   r   pytorch_lightning.pluginsr   $pytorch_lightning.plugins.io.wrapperr    pytorch_lightning.strategies.ddpr   *pytorch_lightning.strategies.launchers.xlar   %pytorch_lightning.strategies.strategyr    pytorch_lightning.trainer.statesr   pytorch_lightning.utilitiesr   r   %pytorch_lightning.utilities.rank_zeror   rj   r   r   r4   r4   r4   r5   <module>   s2    