o
    zi=                     @   s  d dl Z d dlmZmZ d dlmZ d dlmZ d dlm	Z	m
Z
mZmZmZmZmZmZmZ d dlZd dlmZ d dlmZ d dlmZ d d	lmZ d dlZd d
lmZ d dlm Z m!Z!m"Z"m#Z# d dl$m%Z%m&Z&m'Z'm(Z( d dl$m)Z* d dl+m,Z, d dl-m.Z. d dl/m0Z0 d dl1m2Z2 d dl3m4Z4 d dl5m6Z6m7Z7 d dl8m9Z9 d dl:m;Z; d dl<m=Z= d dl>m?Z? d dl@mAZA d dlBmCZC d dlDmZ e	rd dlEmFZF G dd de=ZGdS )    N)contextmanagernullcontext)	timedelta)Path)	TYPE_CHECKINGAnyDict	GeneratorListLiteralMappingOptionalUnion)rank_zero_only)Tensor)	Optimizer)override)default_pg_timeout)_distributed_checkpoint_save_is_sharded_checkpoint_load_checkpoint_setup_device_mesh)_distributed_is_initialized-_get_default_process_group_backend_for_device_init_dist_connection_sync_ddp_if_availablegroup)_TORCH_GREATER_EQUAL_2_4)_materialize_distributed_module)_METADATA_FILENAME)_optimizers_to_device)
reset_seed)_PATHReduceOp)LightningOptimizer)_SubprocessScriptLauncher)ParallelStrategy)
TBroadcast)	TrainerFn)is_overridden)
DeviceMeshc                       s  e Zd ZdZddddefdeed ef deed ef dede	e
 d	e	e d
df fddZedVddZeed
ejfddZed
efddZeed
ee
ef fddZed
e	e
 fddZeed
efddZeed
efddZedWddZedW fddZedXd"d#ZedX fd$d%ZedWd&d'ZeedYd(e	e d
e d) fd*d+Z!edYd,e	e
 d
dfd-d.Z"edZd0e#d1ed
e#fd2d3Z$e		4d[d5ee%ef d6e	e d7e	ee&e
f  d
e%fd8d9Z'd
e(e fd:d;Z)edWd<d=Z*ed
ee
ef fd>d?Z+ed\d@e,e
ef dAed
dfdBdCZ-edDe.d
ee
ef fdEdFZ/ed@e,e
ef d
dfdGdHZ0e	dYd@ee
ef dIe1dJe	e d
df fdKdLZ2edMe1d
ee
ef fdNdOZ3dW fdPdQZ4d
e
fdRdSZ5dWdTdUZ6  Z7S )]ModelParallelStrategya  Enables user-defined parallelism applied to a model.

    .. warning::  This is an :ref:`experimental <versioning:Experimental API>` feature.

    Currently supports up to 2D parallelism. Specifically, it supports the combination of
    Fully Sharded Data-Parallel 2 (FSDP2) with Tensor Parallelism (DTensor). These PyTorch APIs are currently still
    experimental in PyTorch (see https://pytorch.org/docs/stable/distributed.tensor.parallel.html).
    Requires PyTorch 2.4 or newer.

    Arguments:
        data_parallel_size: The number of devices within a data-parallel group. Defaults to ``"auto"``, which
            sets this size to the number of nodes in the cluster.
        tensor_parallel_size: The number of devices within a tensor-parallel group. Defaults to ``"auto"``, which
            sets this size to the number of GPUs in a single node.
        save_distributed_checkpoint: If ``True``, each rank saves its shard of weights and optimizer states to a file.
            The checkpoint is a folder with as many files as the world size.
            If ``False``, the full weights and optimizer states get assembled on rank 0 and saved to a single file.

    autoTNdata_parallel_sizetensor_parallel_sizesave_distributed_checkpointprocess_group_backendtimeoutreturnc                    sP   t    tstt| j d|| _|| _|| _|| _	|| _
d | _d| _d S )Nz  requires PyTorch 2.4 or higher.   )super__init__r   ImportErrortype__name___data_parallel_size_tensor_parallel_size_save_distributed_checkpoint_process_group_backend_timeout_device_mesh	num_nodes)selfr.   r/   r0   r1   r2   	__class__ _/home/ubuntu/.local/lib/python3.10/site-packages/pytorch_lightning/strategies/model_parallel.pyr6   P   s   

zModelParallelStrategy.__init__r+   c                 C   s   | j d u r	td| j S )NzKAccessing the device mesh before processes have initialized is not allowed.)r?   RuntimeErrorrA   rD   rD   rE   device_meshc   s   
z!ModelParallelStrategy.device_meshc                 C   s   | j d usJ | j | j S N)parallel_devices
local_rankrG   rD   rD   rE   root_devicei   s   z!ModelParallelStrategy.root_devicec                 C   s   | j d ur
t| j S dS )Nr   )rJ   lenrG   rD   rD   rE   num_processeso   s   z#ModelParallelStrategy.num_processesc                 C   s*   | j d usJ | j d }| | dS )Ndata_parallel)num_replicasrank)rH   sizeget_local_rank)rA   data_parallel_meshrD   rD   rE   distributed_sampler_kwargss   s   
z0ModelParallelStrategy.distributed_sampler_kwargsc                 C   s   | j S rI   )r=   rG   rD   rD   rE   r1   z   s   z+ModelParallelStrategy.process_group_backendc                 C      dS )NTrD   rG   rD   rD   rE   restore_checkpoint_after_setup~      z4ModelParallelStrategy.restore_checkpoint_after_setupc                 C   rV   )NFrD   rG   rD   rD   rE   lightning_restore_optimizer   rX   z1ModelParallelStrategy.lightning_restore_optimizerc                 C   s2   | j d usJ | j jst| j | j| j| _d S d S rI   )cluster_environmentcreates_processes_externallyr&   rN   r@   	_launcherrG   rD   rD   rE   _configure_launcher   s   z)ModelParallelStrategy._configure_launcherc                    sj   t    |   | jdkr| j| _| jdkr| j| _t| j| j| j| j	| _
| jd us.J | j
| j_
d S )Nr-   )r5   setup_environment_setup_distributedr:   r@   r;   rN   r   
world_sizerL   r?   lightning_modulerG   rB   rD   rE   r^      s   


z'ModelParallelStrategy.setup_environmenttrainer
pl.Trainerc                    s   ddl m  | jd usJ | jd usJ | j| td| js+tdt| j	 dt
 fdd| j D rCtd| jj	 d	t| j| j | j| j| _|   |   |jjtjkrf| | |   |jjtjkrzt| j| j d S d S )
Nr   FullyShardedDataParallelconfigure_modelzWhen using the zs, you are required to override the `configure_model()` hook in the LightningModule and apply parallelization there.c                 3   s    | ]}t | V  qd S rI   )
isinstance).0modrd   rD   rE   	<genexpr>   s    z.ModelParallelStrategy.setup.<locals>.<genexpr>z\Found modules that are wrapped with `torch.distributed.fsdp.FullyShardedDataParallel`. The `z5` only supports the new FSDP2 APIs in PyTorch >= 2.4.)torch.distributed.fsdpre   modelacceleratorsetupr*   ra   	TypeErrorr8   r9   anymodulesrC   r   rL   precision_pluginconvert_modulemodel_to_devicebarrierstatefnr)   FITTINGsetup_optimizerssetup_precision_pluginr!   
optimizersrA   rb   rD   rd   rE   rn      s0   
zModelParallelStrategy.setupc                    s   |    t |S rI   ) _reset_optimizers_and_schedulersr5   ry   r|   rB   rD   rE   ry      s   z&ModelParallelStrategy.setup_optimizersc                 C   s    | j d usJ | j | j d S rI   )rl   torL   rG   rD   rD   rE   rt      s   z%ModelParallelStrategy.model_to_device
empty_init)NNNc              	   c   s    |rt dnt }|) | j  d V  W d    n1 s!w   Y  W d    d S W d    d S 1 s9w   Y  d S )Nmeta)torchdevicer   rr   tensor_init_context)rA   r   empty_init_contextrD   rD   rE   r      s
   Pz)ModelParallelStrategy.tensor_init_contextnamec                 C   s<   t  sd S tj dkrtjj|  d d S tj  d S )Nnccl)
device_ids)r   r   distributedget_backendru   _determine_device_ids)rA   r   rD   rD   rE   ru      s
   zModelParallelStrategy.barrierr   objsrcc                 C   s,   t  s|S |g}tjj||tjd |d S )Nr   r   )r   r   r   broadcast_object_list_groupWORLD)rA   r   r   rD   rD   rE   	broadcast   s
   zModelParallelStrategy.broadcastmeantensorr   	reduce_opc                 C   s   t |trt|||dS |S )N)r   )rg   r   r   )rA   r   r   r   rD   rD   rE   reduce   s   
zModelParallelStrategy.reducec                 C   s
   | j jgS rI   )rL   indexrG   rD   rD   rE   r      s   
z+ModelParallelStrategy._determine_device_idsc                 C   s>   | j d usJ | jd usJ | j   | j  | j  d S rI   )rZ   rm   teardownrr   rG   rD   rD   rE   r      s
   

zModelParallelStrategy.teardownc                 C   s<   ddl m}m} || j dd}| jdusJ || j|dS )zCollects the state dict of the model.

        Only returns a non-empty state dict on rank 0 if ``save_distributed_checkpoint=False``.

        r   )StateDictOptionsget_model_state_dictTfull_state_dictcpu_offloadNoptions)'torch.distributed.checkpoint.state_dictr   r   r<   rl   )rA   r   r   state_dict_optionsrD   rD   rE   lightning_module_state_dict   s   z1ModelParallelStrategy.lightning_module_state_dict
checkpointstrictc                 C      d S rI   rD   )rA   r   r   rD   rD   rE   load_model_state_dict  rX   z+ModelParallelStrategy.load_model_state_dict	optimizerc                 C   s   ddl m}m} ddlm} ddlm} || j dd}t|tr$|j	}| j
dus+J || j
||d}| jsD| jdkrD|||j| j
}|S )	zCollects the state of the given optimizer.

        Only returns a non-empty state dict on rank 0 if ``save_distributed_checkpoint=False``.

        r   )r   get_optimizer_state_dictrd   )OptimStateKeyTypeTr   Nr   )r   r   r   rk   re   r   r<   rg   r%   
_optimizerrl   global_rankrekey_optim_state_dictPARAM_ID)rA   r   r   r   FSDPr   r   
state_dictrD   rD   rE   optimizer_state  s   
z%ModelParallelStrategy.optimizer_statec                 C   r   rI   rD   )rA   r   rD   rD   rE   load_optimizer_state_dict$  rX   z/ModelParallelStrategy.load_optimizer_state_dictfilepathstorage_optionsc                    s   |d urt dt| j dt| j dt| |}| r.| js.t|s.td| | jrm|	 r9|
  |jddd d|di}|dd	 t|d
g D  t|| | jdkrkt||t  d S d S t|rvt| t j||dS )N`zF.save_checkpoint(..., storage_options=...)` is not supported because `z"` does not use the `CheckpointIO`.z/The checkpoint path exists and is a directory: T)parentsexist_okr   c                 S      i | ]
\}}d | |qS 
optimizer_rD   )rh   idxoptim_staterD   rD   rE   
<dictcomp>=  s    
z9ModelParallelStrategy.save_checkpoint.<locals>.<dictcomp>optimizer_statesr   )r   r   )ro   r8   r9   r   r   is_dirr<   r   IsADirectoryErroris_fileunlinkmkdirpopupdate	enumerater   r   r   saver    shutilrmtreer5   save_checkpoint)rA   r   r   r   pathconverted_staterB   rD   rE   r   )  s0   



z%ModelParallelStrategy.save_checkpointcheckpoint_pathc                 C   sN   t | |}d| jidd t| jD }| jd usJ t||| jjddS )Nr   c                 S   r   r   rD   )rh   r   r   rD   rD   rE   r   P  s    z9ModelParallelStrategy.load_checkpoint.<locals>.<dictcomp>T)r   rv   r   optimizer_states_from_list)r   r   rl   r   r{   ra   r   strict_loading)rA   r   r   rv   rD   rD   rE   load_checkpointJ  s   z%ModelParallelStrategy.load_checkpointc                    sH   t    t  |   |  | _| jd usJ t| j| j| jd d S )N)r2   )	r5   r^   r"   set_world_ranks_get_process_group_backendr=   rZ   r   r>   rG   rB   rD   rE   r_   Z  s   

z(ModelParallelStrategy._setup_distributedc                 C   s   | j pt| jS rI   )r=   r   rL   rG   rD   rD   rE   r   b  s   z0ModelParallelStrategy._get_process_group_backendc                 C   sJ   | j d ur| j | j| j | j  | j | j| j  | j t_	t
_	d S rI   )rZ   set_global_rank	node_rankrN   rK   set_world_sizer@   r   r   rQ   utils_rank_zero_onlyrG   rD   rD   rE   r   e  s   
z%ModelParallelStrategy.set_world_ranks)r3   r+   )r3   N)rb   rc   r3   NrI   )r   )Nr   )T)8r9   
__module____qualname____doc__r   r   r   intboolr   strr   r6   propertyrH   r   r   r   rL   rN   r   r   rU   r1   rW   rY   r]   r^   rn   ry   rt   r   r	   r   ru   r(   r   r   r$   r   r
   r   r   r   r   r   r   r   r   r#   r   r   r_   r   r   __classcell__rD   rD   rB   rE   r,   ;   s     

"
 r,   )Hr   
contextlibr   r   datetimer   pathlibr   typingr   r   r   r	   r
   r   r   r   r   r   "lightning_utilities.core.rank_zeror   r   r   torch.optimr   typing_extensionsr   pytorch_lightningpl5lightning_fabric.plugins.collectives.torch_collectiver   *lightning_fabric.strategies.model_parallelr   r   r   r   &lightning_fabric.utilities.distributedr   r   r   r   r   r   "lightning_fabric.utilities.importsr   lightning_fabric.utilities.initr   lightning_fabric.utilities.loadr    $lightning_fabric.utilities.optimizerr!   lightning_fabric.utilities.seedr"    lightning_fabric.utilities.typesr#   r$    pytorch_lightning.core.optimizerr%   8pytorch_lightning.strategies.launchers.subprocess_scriptr&   %pytorch_lightning.strategies.parallelr'   %pytorch_lightning.strategies.strategyr(    pytorch_lightning.trainer.statesr)   )pytorch_lightning.utilities.model_helpersr*   %pytorch_lightning.utilities.rank_zerotorch.distributed.device_meshr+   r,   rD   rD   rD   rE   <module>   s>   ,