o
    ci(                     @   s  d dl Z d dlZd dlZd dlZd dlmZ d dlmZmZ d dl	Z	d dl
mZ d dlZd dlmZ d dlmZmZ d dlmZ d dlmZ d	d
 Ze ZeejedkZeejedk Zee	jedkZeone	j Zzd dlmZ W n e y   d dl!mZ Y nw erej"j#Z#nej"j$Z#erd dl%m&Z&m'Z'm(Z( e )e*Z+dZ,eddG dd dej"j-Z.eddG dd de#Z/eddG dd dej"j0Z1eddG dd deZ2edddej3dej3fddZ4eddG dd  d ej5j6Z7dS )!    N)Path)AnyDict)Version)train)TagKeyrecord_extra_usage_tag)
Checkpoint)	PublicAPIc                  C   s0   z	dd l m}  W | S  ty   dd l} Y | S w )Nr   )lightning.pytorchpytorchModuleNotFoundErrorpytorch_lightning)pl r   X/home/ubuntu/.local/lib/python3.10/site-packages/ray/train/lightning/_lightning_utils.pyimport_lightning   s   
r   z2.0.0z2.1.0z1.12.0)LightningEnvironment)FullStateDictConfigFullyShardedDataParallelStateDictType
_report_onbeta)	stabilityc                       N   e Zd ZdZ fddZedejfddZede	e
ef fddZ  ZS )	RayDDPStrategya  Subclass of DDPStrategy to ensure compatibility with Ray orchestration.

    For a full list of initialization arguments, please refer to:
    https://lightning.ai/docs/pytorch/stable/api/lightning.pytorch.strategies.DDPStrategy.html

    Note that `process_group_backend`, `timeout`, and `start_method` are disabled here,
    please specify these arguments in :class:`~ray.train.torch.TorchConfig` instead.
    c                    "   t  j|i | ttjd d S N1)super__init__r   r   TRAIN_LIGHTNING_RAYDDPSTRATEGYselfargskwargs	__class__r   r   r    C      zRayDDPStrategy.__init__returnc                 C      t jj S Nrayr   torch
get_devicer#   r   r   r   root_deviceG      zRayDDPStrategy.root_devicec                 C      t | j| jdS N)num_replicasrankdict
world_sizeglobal_rankr0   r   r   r   distributed_sampler_kwargsK      z)RayDDPStrategy.distributed_sampler_kwargs__name__
__module____qualname____doc__r    propertyr.   devicer1   r   strr   r;   __classcell__r   r   r&   r   r   8   s    	 r   c                       sh   e Zd ZdZ fddZedejfddZede	e
ef fddZde	e
ef f fd	d
Z  ZS )RayFSDPStrategya  Subclass of FSDPStrategy to ensure compatibility with Ray orchestration.

    For a full list of initialization arguments, please refer to:
    https://lightning.ai/docs/pytorch/stable/api/lightning.pytorch.strategies.FSDPStrategy.html

    .. note::
        It is recommended to upgrade `lightning>=2.1` or above when using FSDP
        with Lightning, since Lightning starts to natively support `state_dict_type`,
        `sharding_strategy`, `auto_wrap_policy` and other FSDP configurations from 2.1.
    c                    r   r   )r   r    r   r   TRAIN_LIGHTNING_RAYFSDPSTRATEGYr"   r&   r   r   r    `   r(   zRayFSDPStrategy.__init__r)   c                 C   r*   r+   r,   r0   r   r   r   r1   d   r2   zRayFSDPStrategy.root_devicec                 C   r3   r4   r7   r0   r   r   r   r;   h   r<   z*RayFSDPStrategy.distributed_sampler_kwargsc                    s   | j dus	J dtrYtrYtrYtj| j tjtdddd2 | j 	 }i }t
d}| D ]\}}|drA||d }|||< q-|||< q-|W  d   S 1 sRw   Y  dS t  S )a  Gathers the full state dict to rank 0 on CPU.

        FSDP checkpointing is broken in Lightning 2.0.x. This subclass patches the
        behavior to perform a full state dict checkpointing, gathering the checkpoint
        shards on rank 0 CPU. Upgrade to `lightning>=2.1` to do sharded state dict
        checkpointing.

        See the note in the class docstring for more details.
        Nz.Failed to get the state dict for a None model!T)offload_to_cpu
rank0_only)modulestate_dict_typestate_dict_configz_forward_module.)model_TORCH_FSDP_AVAILABLE_LIGHTNING_GREATER_EQUAL_2_0_LIGHTNING_LESS_THAN_2_1r   rK   r   FULL_STATE_DICTr   
state_dictlenitems
startswithr   lightning_module_state_dict)r#   rR   ckpt_state_dict
prefix_lenkvnon_prefixed_keyr&   r   r   rV   o   s2   



$
z+RayFSDPStrategy.lightning_module_state_dict)r>   r?   r@   rA   r    rB   r.   rC   r1   r   rD   r   r;   rV   rE   r   r   r&   r   rF   S   s    "rF   c                       r   )	RayDeepSpeedStrategyzSubclass of DeepSpeedStrategy to ensure compatibility with Ray orchestration.

    For a full list of initialization arguments, please refer to:
    https://lightning.ai/docs/pytorch/stable/api/lightning.pytorch.strategies.DeepSpeedStrategy.html
    c                    r   r   )r   r    r   r   $TRAIN_LIGHTNING_RAYDEEPSPEEDSTRATEGYr"   r&   r   r   r       r(   zRayDeepSpeedStrategy.__init__r)   c                 C   r*   r+   r,   r0   r   r   r   r1      r2   z RayDeepSpeedStrategy.root_devicec                 C   r3   r4   r7   r0   r   r   r   r;      r<   z/RayDeepSpeedStrategy.distributed_sampler_kwargsr=   r   r   r&   r   r\      s     r\   c                       s   e Zd ZdZ fddZdefddZdefddZdefd	d
ZdefddZ	deddfddZ
deddfddZdd Z  ZS )RayLightningEnvironmentz9Setup Lightning DDP training environment for Ray cluster.c                    r   r   )r   r    r   r   'TRAIN_LIGHTNING_RAYLIGHTNINGENVIRONMENTr"   r&   r   r   r       r(   z RayLightningEnvironment.__init__r)   c                 C      t   S r+   )r   get_contextget_world_sizer0   r   r   r   r9         z"RayLightningEnvironment.world_sizec                 C   r`   r+   )r   ra   get_world_rankr0   r   r   r   r:      rc   z#RayLightningEnvironment.global_rankc                 C   r`   r+   )r   ra   get_local_rankr0   r   r   r   
local_rank   rc   z"RayLightningEnvironment.local_rankc                 C   r`   r+   )r   ra   get_node_rankr0   r   r   r   	node_rank   rc   z!RayLightningEnvironment.node_ranksizeNc                 C      d S r+   r   )r#   ri   r   r   r   set_world_size      z&RayLightningEnvironment.set_world_sizer6   c                 C   rj   r+   r   )r#   r6   r   r   r   set_global_rank   rl   z'RayLightningEnvironment.set_global_rankc                 C   rj   r+   r   r0   r   r   r   teardown   s   z RayLightningEnvironment.teardown)r>   r?   r@   rA   r    intr9   r:   rf   rh   rk   rm   rn   rE   r   r   r&   r   r^      s    r^   trainerr)   c                    s   t ttg}t fdd|D s#tdt j ddd |D  dt jdd	}|r;t|t	s;td
t| dt
tjd  S )z@Prepare the PyTorch Lightning Trainer for distributed execution.c                 3   s    | ]	}t  j|V  qd S r+   )
isinstancestrategy.0clsrp   r   r   	<genexpr>   s    z"prepare_trainer.<locals>.<genexpr>zInvalid strategy class: zJ. To use PyTorch Lightning with Ray, the strategy object should be one of c                 S   s   g | ]}|j qS r   )r>   rs   r   r   r   
<listcomp>   s    z#prepare_trainer.<locals>.<listcomp>z class or its subclass.cluster_environmentNzoInvalid cluster environment plugin. The expected class is`ray.train.lightning.RayLightningEnvironment` but got !r   )r   rF   r\   anyRuntimeErrortyperr   getattrrq   r^   r   r   TRAIN_LIGHTNING_PREPARE_TRAINER)rp   valid_strategy_classry   r   rv   r   prepare_trainer   s$   
r   c                       s0   e Zd ZdZdZd	 fddZd	ddZ  ZS )
RayTrainReportCallbacku$  A simple callback that reports checkpoints to Ray on train epoch end.

    This callback is a subclass of `lightning.pytorch.callbacks.Callback
    <https://lightning.ai/docs/pytorch/stable/api/lightning.pytorch.callbacks.Callback.html#lightning.pytorch.callbacks.Callback>`_.

    It fetches the latest `trainer.callback_metrics` and reports together with
    the checkpoint on each training epoch end.

    Checkpoints will be saved in the following structure::

        checkpoint_00000*/      Ray Train Checkpoint
        └─ checkpoint.ckpt      PyTorch Lightning Checkpoint

    For customized reporting and checkpointing logic, implement your own
    `lightning.pytorch.callbacks.Callback` following this user
    guide: :ref:`Saving and Loading Checkpoints <train-dl-saving-checkpoints>`.
    zcheckpoint.ckptr)   Nc                    sp   t    t  | _t  | _tt	
 | j | _tj| jr0| jdkr0t| j ttjd d S )Nr   r   )r   r    r   ra   get_trial_name
trial_namere   rf   r   tempfile
gettempdiras_posixtmpdir_prefixospathisdirshutilrmtreer   r   &TRAIN_LIGHTNING_RAYTRAINREPORTCALLBACKr0   r&   r   r   r      s   
zRayTrainReportCallback.__init__c                 C   s   t | jt|j }tj|dd |j}dd | D }|j|d< |j	|d< t || j
 }|j|dd t|}tj||d	 |j  | jd
krTt| d S d S )NT)exist_okc                 S   s   i | ]	\}}||  qS r   )item)rt   rY   rZ   r   r   r   
<dictcomp>  s    z=RayTrainReportCallback.on_train_epoch_end.<locals>.<dictcomp>epochstepF)weights_only)metrics
checkpointr   )r   r   rD   current_epochr   r   makedirscallback_metricsrT   global_stepCHECKPOINT_NAMEsave_checkpointr	   from_directoryr   reportrr   barrierrf   r   r   )r#   rp   	pl_moduletmpdirr   	ckpt_pathr   r   r   r   on_train_epoch_end  s   




z)RayTrainReportCallback.on_train_epoch_end)r)   N)r>   r?   r@   rA   r   r    r   rE   r   r   r&   r   r      s
    
r   )8loggingr   r   r   pathlibr   typingr   r   r.   packaging.versionr   r-   r   ray._private.usage.usage_libr   r   	ray.trainr	   ray.utilr
   r   r   __version__rO   rP   _TORCH_GREATER_EQUAL_1_12distributedis_availablerN   &lightning.pytorch.plugins.environmentsr   r   &pytorch_lightning.plugins.environments
strategiesFSDPStrategyDDPFullyShardedStrategytorch.distributed.fsdpr   r   r   	getLoggerr>   loggerLIGHTNING_REPORT_STAGE_KEYDDPStrategyr   rF   DeepSpeedStrategyr\   r^   Trainerr   	callbacksCallbackr   r   r   r   r   <module>   sV    

D