import io
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union

import torch
from torch import Tensor
from torch.nn import Module
from torch.optim import Optimizer
from torch.utils.data import DataLoader
from typing_extensions import override

from lightning_fabric.accelerators import Accelerator
from lightning_fabric.accelerators.xla import _XLA_GREATER_EQUAL_2_1
from lightning_fabric.plugins import XLAPrecision
from lightning_fabric.plugins.environments import XLAEnvironment
from lightning_fabric.plugins.io.xla import XLACheckpointIO
from lightning_fabric.strategies import ParallelStrategy, _StrategyRegistry
from lightning_fabric.strategies.launchers.xla import _XLALauncher
from lightning_fabric.strategies.strategy import TBroadcast
from lightning_fabric.utilities.rank_zero import rank_zero_only
from lightning_fabric.utilities.types import _PATH, ReduceOp

if TYPE_CHECKING:
    from torch_xla.distributed.parallel_loader import MpDeviceLoader


class XLAStrategy(ParallelStrategy):
    """Strategy for training multiple TPU devices using the :func:`torch_xla.distributed.xla_multiprocessing.spawn`
    method."""

    def __init__(
        self,
        accelerator: Optional[Accelerator] = None,
        parallel_devices: Optional[List[torch.device]] = None,
        checkpoint_io: Optional[XLACheckpointIO] = None,
        precision: Optional[XLAPrecision] = None,
        sync_module_states: bool = True,
    ) -> None:
        super().__init__(
            accelerator=accelerator,
            parallel_devices=parallel_devices,
            cluster_environment=XLAEnvironment(),
            checkpoint_io=checkpoint_io,
            precision=precision,
        )
        self._backward_sync_control = None
        self._launched = False
        self._sync_module_states = sync_module_states

    @property
    @override
    def root_device(self) -> torch.device:
        if not self._launched:
            raise RuntimeError("Accessing the XLA device before processes have spawned is not allowed.")
        import torch_xla.core.xla_model as xm

        return xm.xla_device()

    @property
    def num_processes(self) -> int:
        return len(self.parallel_devices) if self.parallel_devices is not None else 0

    @property
    @override
    def checkpoint_io(self) -> XLACheckpointIO:
        plugin = self._checkpoint_io
        if plugin is not None:
            assert isinstance(plugin, XLACheckpointIO)
            return plugin
        return XLACheckpointIO()

    @checkpoint_io.setter
    @override
    def checkpoint_io(self, io: Optional[XLACheckpointIO]) -> None:
        if io is not None and not isinstance(io, XLACheckpointIO):
            raise TypeError(f"The XLA strategy can only work with the `XLACheckpointIO` plugin, found {io}")
        self._checkpoint_io = io

    @property
    @override
    def precision(self) -> XLAPrecision:
        plugin = self._precision
        if plugin is not None:
            assert isinstance(plugin, XLAPrecision)
            return plugin
        return XLAPrecision("32-true")

    @precision.setter
    @override
    def precision(self, precision: Optional[XLAPrecision]) -> None:
        if precision is not None and not isinstance(precision, XLAPrecision):
            raise TypeError(f"The XLA strategy can only work with the `XLAPrecision` plugin, found {precision}")
        self._precision = precision

    @property
    @override
    def global_rank(self) -> int:
        return super().global_rank if self._launched else 0

    @property
    @override
    def local_rank(self) -> int:
        return super().local_rank if self._launched else 0

    @property
    @override
    def node_rank(self) -> int:
        return super().node_rank if self._launched else 0

    @property
    @override
    def world_size(self) -> int:
        return super().world_size if self._launched else 1

    @override
    def _configure_launcher(self) -> None:
        self._launcher = _XLALauncher(self)

    @override
    def setup_environment(self) -> None:
        assert self.parallel_devices is not None
        if len(self.parallel_devices) == 1:
            raise NotImplementedError(
                f"The {type(self).__name__} does not support running on a single device with the PjRT runtime."
                " Try using all devices or the `SingleDeviceXLAStrategy` strategy"
            )
        self._launched = True
        rank_zero_only.rank = self.global_rank
        super().setup_environment()

    @override
    def setup_module(self, module: Module) -> Module:
        if self._sync_module_states:
            if _XLA_GREATER_EQUAL_2_1:
                from torch_xla.core.xla_model import broadcast_master_param
            else:
                from torch_xla.experimental.pjrt import broadcast_master_param

            broadcast_master_param(module)
        return module

    @override
    def module_to_device(self, module: Module) -> None:
        module.to(self.root_device)

    @override
    def process_dataloader(self, dataloader: DataLoader) -> "MpDeviceLoader":
        from torch_xla.distributed.parallel_loader import MpDeviceLoader

        if isinstance(dataloader, MpDeviceLoader):
            # the dataloader is already wrapped
            return dataloader
        dataloader = MpDeviceLoader(dataloader, self.root_device)
        # mimic the interface of `torch.utils.data.DataLoader`
        dataloader.dataset = dataloader._loader.dataset
        dataloader.batch_sampler = getattr(dataloader._loader, "batch_sampler", None)
        return dataloader

    @override
    def all_gather(self, tensor: Tensor, group: Optional[Any] = None, sync_grads: bool = False) -> Tensor:
        """Function to gather a tensor from several distributed processes.

        Args:
            tensor: tensor to all-gather.
            group: unused.
            sync_grads: flag that allows users to synchronize gradients for the all-gather operation.
        Return:
            A tensor of shape (world_size, ...)

        """
        if not self._launched:
            return tensor
        if not isinstance(tensor, Tensor):
            raise NotImplementedError(
                f"`{type(self).__name__}.all_gather` is only implemented for tensors. Given {tensor}"
            )
        if tensor.dim() == 0:
            tensor = tensor.unsqueeze(0)
        original_device = tensor.device
        tensor = tensor.to(self.root_device)

        import torch_xla.core.functions as xf
        import torch_xla.core.xla_model as xm

        tensor = xf.all_gather(tensor) if sync_grads else xm.all_gather(tensor)
        tensor = tensor.to(original_device)
        return tensor

    @override
    def all_reduce(
        self, output: Union[Tensor, Any], group: Optional[Any] = None, reduce_op: Optional[Union[ReduceOp, str]] = None
    ) -> Tensor:
        if not isinstance(output, Tensor):
            output = torch.tensor(output, device=self.root_device)

        invalid_reduce_op = isinstance(reduce_op, ReduceOp) and reduce_op != ReduceOp.SUM
        invalid_reduce_op_str = isinstance(reduce_op, str) and reduce_op.lower() not in ("sum", "mean", "avg")
        if invalid_reduce_op or invalid_reduce_op_str:
            raise ValueError(
                "Currently, the XLAStrategy only supports `sum`, `mean`, `avg` for the reduce operation, got:"
                f" {reduce_op}"
            )
        import torch_xla.core.xla_model as xm

        output = xm.mesh_reduce("reduce", output, sum)
        if isinstance(reduce_op, str) and reduce_op.lower() in ("avg", "mean"):
            output = output / self.world_size
        return output

    @override
    def barrier(self, name: Optional[str] = None, *args: Any, **kwargs: Any) -> None:
        if not self._launched:
            return
        import torch_xla.core.xla_model as xm

        if name is None:
            # `None` is not a valid rendezvous tag
            name = ""
        xm.rendezvous(name)

    @override
    def broadcast(self, obj: TBroadcast, src: int = 0) -> TBroadcast:
        if not self._launched:
            return obj
        import torch_xla.core.xla_model as xm

        is_tensor = isinstance(obj, Tensor)
        if is_tensor:
            if obj.dim() == 0:
                obj = obj.unsqueeze(0)
            original_device = obj.device
            # the tensor must live on the XLA device for the collective to work
            obj = obj.to(self.root_device)
        else:
            # support arbitrary pickle-able objects by serializing them into a byte tensor
            buffer = io.BytesIO()
            torch.save(obj, buffer)
            obj = torch.tensor(bytearray(buffer.getbuffer()), device=self.root_device, dtype=torch.float)

        obj = [obj]
        xm.collective_broadcast(obj, root_ordinal=src)
        obj = obj[0]

        if not is_tensor:
            # deserialize; this preserves the dtype and device of any contained tensors
            buffer = io.BytesIO(obj.cpu().byte().numpy())
            obj = torch.load(buffer)
        else:
            obj = obj.to(original_device)

        return obj

    @override
    def save_checkpoint(
        self,
        path: _PATH,
        state: Dict[str, Union[Module, Optimizer, Any]],
        storage_options: Optional[Any] = None,
        filter: Optional[Dict[str, Callable[[str, Any], bool]]] = None,
    ) -> None:
        """Save model, optimizer, and other state as a checkpoint file.

        Args:
            path: A path to where the file(s) should be saved
            state: A dictionary with contents to be saved. If the dict contains modules or optimizers, their
                state-dict will be retrieved and converted automatically.
            storage_options: Additional options for the ``CheckpointIO`` plugin
            filter: An optional dictionary of the same format as ``state`` mapping keys to callables that return a
                boolean indicating whether the given parameter should be saved (``True``) or filtered out (``False``).

        """
        import torch_xla.core.xla_model as xm

        # flush any pending lazy XLA operations on all ranks before saving
        xm.mark_step()
        super().save_checkpoint(path, state, storage_options=storage_options, filter=filter)

    @classmethod
    @override
    def register_strategies(cls, strategy_registry: _StrategyRegistry) -> None:
        strategy_registry.register("xla", cls, description=cls.__name__)
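
# --------------------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of this module). The strategy is typically
# selected through ``Fabric`` rather than instantiated directly; ``devices=8`` is an
# assumption for a TPU VM and should match the number of available XLA devices.
#
#     from lightning_fabric import Fabric
#     from lightning_fabric.strategies import XLAStrategy
#
#     def train(fabric: Fabric) -> None:
#         ...  # build the model/optimizer here and wrap them with `fabric.setup(...)`
#
#     fabric = Fabric(accelerator="tpu", devices=8, strategy=XLAStrategy())
#     fabric.launch(train)  # spawns one process per XLA device via `_XLALauncher`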