import shutil
from collections.abc import Generator, Mapping
from contextlib import contextmanager, nullcontext
from datetime import timedelta
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, Optional, Union

import torch
from lightning_utilities.core.rank_zero import rank_zero_only as utils_rank_zero_only
from torch import Tensor
from torch.optim import Optimizer
from typing_extensions import override

import pytorch_lightning as pl
from lightning_fabric.plugins.collectives.torch_collective import default_pg_timeout
from lightning_fabric.strategies.model_parallel import (
    _distributed_checkpoint_save,
    _is_sharded_checkpoint,
    _load_checkpoint,
    _setup_device_mesh,
)
from lightning_fabric.utilities.distributed import (
    _distributed_is_initialized,
    _get_default_process_group_backend_for_device,
    _init_dist_connection,
    _sync_ddp_if_available,
)
from lightning_fabric.utilities.distributed import group as _group
from lightning_fabric.utilities.imports import _TORCH_GREATER_EQUAL_2_4
from lightning_fabric.utilities.init import _materialize_distributed_module
from lightning_fabric.utilities.load import _METADATA_FILENAME
from lightning_fabric.utilities.optimizer import _optimizers_to_device
from lightning_fabric.utilities.seed import reset_seed
from lightning_fabric.utilities.types import _PATH, ReduceOp
from pytorch_lightning.core.optimizer import LightningOptimizer
from pytorch_lightning.strategies.launchers.subprocess_script import _SubprocessScriptLauncher
from pytorch_lightning.strategies.parallel import ParallelStrategy
from pytorch_lightning.strategies.strategy import TBroadcast
from pytorch_lightning.trainer.states import TrainerFn
from pytorch_lightning.utilities.model_helpers import is_overridden
from pytorch_lightning.utilities.rank_zero import rank_zero_only

if TYPE_CHECKING:
    from torch.distributed.device_mesh import DeviceMesh


class ModelParallelStrategy(ParallelStrategy):
    """Enables user-defined parallelism applied to a model.

    .. warning::  This is an :ref:`experimental <versioning:Experimental API>` feature.

    Currently supports up to 2D parallelism. Specifically, it supports the combination of
    Fully Sharded Data-Parallel 2 (FSDP2) with Tensor Parallelism (DTensor). These PyTorch APIs are currently still
    experimental in PyTorch (see https://pytorch.org/docs/stable/distributed.tensor.parallel.html).
    Requires PyTorch 2.4 or newer.

    Arguments:
        data_parallel_size: The number of devices within a data-parallel group. Defaults to ``"auto"``, which
            sets this size to the number of nodes in the cluster.
        tensor_parallel_size: The number of devices within a tensor-parallel group. Defaults to ``"auto"``, which
            sets this size to the number of GPUs in a single node.
        save_distributed_checkpoint: If ``True``, each rank saves its shard of weights and optimizer states to a file.
            The checkpoint is a folder with as many files as the world size.
            If ``False``, the full weights and optimizer states get assembled on rank 0 and saved to a single file.
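
    Example of applying a parallelization plan in ``configure_model()`` (a minimal sketch; the
    ``self.model`` attribute and the ``"w1"`` entry in the plan are illustrative placeholders, not
    part of this strategy)::

        from torch.distributed.tensor.parallel import ColwiseParallel, parallelize_module

        class MyModel(pl.LightningModule):
            def configure_model(self) -> None:
                # the strategy populates `self.device_mesh` before this hook runs
                tp_mesh = self.device_mesh["tensor_parallel"]
                parallelize_module(self.model, tp_mesh, {"w1": ColwiseParallel()})

        trainer = pl.Trainer(
            strategy=ModelParallelStrategy(data_parallel_size=1, tensor_parallel_size=2),
            accelerator="gpu",
            devices=2,
        )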

    """

    def __init__(
        self,
        data_parallel_size: Union[Literal["auto"], int] = "auto",
        tensor_parallel_size: Union[Literal["auto"], int] = "auto",
        save_distributed_checkpoint: bool = True,
        process_group_backend: Optional[str] = None,
        timeout: Optional[timedelta] = default_pg_timeout,
    ) -> None:
        super().__init__()
        if not _TORCH_GREATER_EQUAL_2_4:
            raise ImportError(f"{type(self).__name__} requires PyTorch 2.4 or higher.")
        self._data_parallel_size = data_parallel_size
        self._tensor_parallel_size = tensor_parallel_size
        self._save_distributed_checkpoint = save_distributed_checkpoint
        self._process_group_backend: Optional[str] = process_group_backend
        self._timeout: Optional[timedelta] = timeout
        self._device_mesh: Optional["DeviceMesh"] = None
        self.num_nodes = 1

    @property
    def device_mesh(self) -> "DeviceMesh":
        if self._device_mesh is None:
            raise RuntimeError("Accessing the device mesh before processes have initialized is not allowed.")
        return self._device_mesh

    @property
    @override
    def root_device(self) -> torch.device:
        assert self.parallel_devices is not None
        return self.parallel_devices[self.local_rank]

    @property
    def num_processes(self) -> int:
        return len(self.parallel_devices) if self.parallel_devices is not None else 0

    @property
    @override
    def distributed_sampler_kwargs(self) -> dict[str, Any]:
        assert self.device_mesh is not None
        data_parallel_mesh = self.device_mesh["data_parallel"]
        return {"num_replicas": data_parallel_mesh.size(), "rank": data_parallel_mesh.get_local_rank()}

    @property
    def process_group_backend(self) -> Optional[str]:
        return self._process_group_backend

    @property
    @override
    def restore_checkpoint_after_setup(self) -> bool:
        return True

    @property
    @override
    def lightning_restore_optimizer(self) -> bool:
        return False

    @override
    def _configure_launcher(self) -> None:
        assert self.cluster_environment is not None
        if not self.cluster_environment.creates_processes_externally:
            self._launcher = _SubprocessScriptLauncher(self.cluster_environment, self.num_processes, self.num_nodes)

    @override
    def setup_environment(self) -> None:
        super().setup_environment()
        self._setup_distributed()

        if self._data_parallel_size == "auto":
            self._data_parallel_size = self.num_nodes
        if self._tensor_parallel_size == "auto":
            self._tensor_parallel_size = self.num_processes
        self._device_mesh = _setup_device_mesh(
            self._data_parallel_size, self._tensor_parallel_size, self.world_size, self.root_device
        )
        # make the mesh available on the LightningModule for use in `configure_model()`
        assert self.lightning_module is not None
        self.lightning_module._device_mesh = self._device_mesh

    @override
    def setup(self, trainer: "pl.Trainer") -> None:
        from torch.distributed.fsdp import FullyShardedDataParallel

        assert self.model is not None
        assert self.accelerator is not None
        self.accelerator.setup(trainer)

        if not is_overridden("configure_model", self.lightning_module):
            raise TypeError(
                f"When using the {type(self).__name__}, you are required to override the `configure_model()` hook in"
                f" the LightningModule and apply parallelization there."
            )
        if any(isinstance(mod, FullyShardedDataParallel) for mod in self.model.modules()):
            raise TypeError(
                "Found modules that are wrapped with `torch.distributed.fsdp.FullyShardedDataParallel`."
                f" The `{type(self).__name__}` only supports the new FSDP2 APIs in PyTorch >= 2.4."
            )

        # materialize any modules that were instantiated on the meta device
        _materialize_distributed_module(self.model, self.root_device)

        self.model = self.precision_plugin.convert_module(self.model)
        self.model_to_device()
        self.barrier()

        if trainer.state.fn == TrainerFn.FITTING:
            self.setup_optimizers(trainer)
        self.setup_precision_plugin()
        if trainer.state.fn == TrainerFn.FITTING:
            _optimizers_to_device(self.optimizers, self.root_device)

    @override
    def setup_optimizers(self, trainer: "pl.Trainer") -> None:
        # discard optimizers from a previous run; their parameter references are no longer valid
        self._reset_optimizers_and_schedulers()
        return super().setup_optimizers(trainer)

    @override
    def model_to_device(self) -> None:
        assert self.model is not None
        self.model.to(self.root_device)

    @contextmanager
    @override
    def tensor_init_context(self, empty_init: Optional[bool] = None) -> Generator[None, None, None]:
        # modules created on the meta device get materialized later, in `setup()`
        empty_init_context = torch.device("meta") if empty_init else nullcontext()
        with empty_init_context, self.precision_plugin.tensor_init_context():
            yield

    @override
    def barrier(self, name: Optional[str] = None) -> None:
        if not _distributed_is_initialized():
            return
        if torch.distributed.get_backend() == "nccl":
            torch.distributed.barrier(device_ids=self._determine_device_ids())
        else:
            torch.distributed.barrier()

    @override
    def broadcast(self, obj: TBroadcast, src: int = 0) -> TBroadcast:
        if not _distributed_is_initialized():
            return obj
        obj = [obj]
        torch.distributed.broadcast_object_list(obj, src, group=_group.WORLD)
        return obj[0]

    @override
    def reduce(
        self,
        tensor: Union[Tensor, Any],
        group: Optional[Any] = None,
        reduce_op: Optional[Union[ReduceOp, str]] = "mean",
    ) -> Tensor:
        if isinstance(tensor, Tensor):
            return _sync_ddp_if_available(tensor, group, reduce_op=reduce_op)
        return tensor

    def _determine_device_ids(self) -> list[int]:
        return [self.root_device.index]

    @override
    def teardown(self) -> None:
        assert self.cluster_environment is not None
        assert self.accelerator is not None
        self.cluster_environment.teardown()
        self.precision_plugin.teardown()
        self.accelerator.teardown()

    @override
    def lightning_module_state_dict(self) -> dict[str, Any]:
        """Collects the state dict of the model.

        Only returns a non-empty state dict on rank 0 if ``save_distributed_checkpoint=False``.

        """
        from torch.distributed.checkpoint.state_dict import StateDictOptions, get_model_state_dict

        state_dict_options = StateDictOptions(
            full_state_dict=(not self._save_distributed_checkpoint), cpu_offload=True
        )
        assert self.model is not None
        return get_model_state_dict(self.model, options=state_dict_options)

    @override
    def load_model_state_dict(self, checkpoint: Mapping[str, Any], strict: bool = True) -> None:
        # no-op: the module states are loaded in `load_checkpoint()`
        pass

    @override
    def optimizer_state(self, optimizer: Optimizer) -> dict[str, Any]:
        """Collects the state of the given optimizer.

        Only returns a non-empty state dict on rank 0 if ``save_distributed_checkpoint=False``.

        """
        from torch.distributed.checkpoint.state_dict import StateDictOptions, get_optimizer_state_dict
        from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
        from torch.distributed.fsdp import OptimStateKeyType

        state_dict_options = StateDictOptions(
            full_state_dict=(not self._save_distributed_checkpoint), cpu_offload=True
        )
        if isinstance(optimizer, LightningOptimizer):
            optimizer = optimizer._optimizer
        assert self.model is not None

        state_dict = get_optimizer_state_dict(self.model, optimizer, options=state_dict_options)
        if not self._save_distributed_checkpoint and self.global_rank == 0:
            # rekey the states to the standard, parameter-id based format
            state_dict = FSDP.rekey_optim_state_dict(state_dict, OptimStateKeyType.PARAM_ID, self.model)
        return state_dict

    @override
    def load_optimizer_state_dict(self, checkpoint: Mapping[str, Any]) -> None:
        # no-op: the optimizer states are loaded in `load_checkpoint()`
        pass

    @override
    def save_checkpoint(
        self, checkpoint: dict[str, Any], filepath: _PATH, storage_options: Optional[Any] = None
    ) -> None:
        if storage_options is not None:
            raise TypeError(
                f"`{type(self).__name__}.save_checkpoint(..., storage_options=...)` is not supported because"
                f" `{type(self).__name__}` does not use the `CheckpointIO`."
            )
        # broadcast the path from rank 0 so that all ranks save to a common location
        path = Path(self.broadcast(filepath))
        if path.is_dir() and not self._save_distributed_checkpoint and not _is_sharded_checkpoint(path):
            raise IsADirectoryError(f"The checkpoint path exists and is a directory: {path}")

        if self._save_distributed_checkpoint:
            if path.is_file():
                path.unlink()
            path.mkdir(parents=True, exist_ok=True)

            converted_state = {"state_dict": checkpoint.pop("state_dict")}
            converted_state.update({
                f"optimizer_{idx}": optim_state
                for idx, optim_state in enumerate(checkpoint.pop("optimizer_states", []))
            })
            _distributed_checkpoint_save(converted_state, path)

            if self.global_rank == 0:
                torch.save(checkpoint, path / _METADATA_FILENAME)
        else:
            if _is_sharded_checkpoint(path):
                shutil.rmtree(path)
            return super().save_checkpoint(checkpoint=checkpoint, filepath=path)

    @override
    def load_checkpoint(self, checkpoint_path: _PATH) -> dict[str, Any]:
        # broadcast the path from rank 0 so that all ranks load from a common location
        path = Path(self.broadcast(checkpoint_path))
        state = {
            "state_dict": self.model,
            **{f"optimizer_{idx}": optimizer for idx, optimizer in enumerate(self.optimizers)},
        }
        assert self.lightning_module is not None
        return _load_checkpoint(
            path=path,
            state=state,
            strict=self.lightning_module.strict_loading,
            optimizer_states_from_list=True,
        )

    def _setup_distributed(self) -> None:
        super().setup_environment()
        reset_seed()
        self.set_world_ranks()
        self._process_group_backend = self._get_process_group_backend()
        assert self.cluster_environment is not None
        _init_dist_connection(self.cluster_environment, self._process_group_backend, timeout=self._timeout)

    def _get_process_group_backend(self) -> str:
        return self._process_group_backend or _get_default_process_group_backend_for_device(self.root_device)

    def set_world_ranks(self) -> None:
        if self.cluster_environment is not None:
            self.cluster_environment.set_global_rank(self.node_rank * self.num_processes + self.local_rank)
            self.cluster_environment.set_world_size(self.num_nodes * self.num_processes)
        # keep the rank-zero-only utilities in sync with the resolved global rank
        rank_zero_only.rank = utils_rank_zero_only.rank = self.global_rank