import atexit
import contextlib
import logging
import multiprocessing as mp
import os
import random
import shutil
import signal
import socket
import subprocess
import sys
import tempfile
from dataclasses import asdict, dataclass
from functools import lru_cache, partial, reduce
from itertools import chain
from typing import List, Optional, Tuple, Union

import torch
import xformers.ops
from torch import distributed as dist
from torch.distributed import ReduceOp
from torch.distributed._composable.fsdp import MixedPrecisionPolicy, fully_shard
from torch.distributed._tensor import DTensor
from torch.distributed.algorithms._checkpoint.checkpoint_wrapper import (
    checkpoint_wrapper,
)
from torch.distributed.device_mesh import DeviceMesh, init_device_mesh
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.checkpoint import (
    CheckpointPolicy,
    create_selective_checkpoint_contexts,
)

logger = logging.getLogger()

# Ops that selective activation checkpointing should never recompute:
# matmuls, scaled-dot-product attention kernels, and comm-fused reductions.
default_no_recompute_ops = {
    torch.ops.aten.mm.default,
    torch.ops.aten._scaled_mm.default,
    torch.ops.aten._scaled_dot_product_efficient_attention.default,
    torch.ops.aten._scaled_dot_product_flash_attention.default,
    torch.ops.c10d_functional.reduce_scatter_tensor.default,
    torch.ops.xformers_flash.flash_fwd.default,
}


@dataclass
class DistributedArgs:
    dp_shard: int = 1
    dp_replicate: int = 1
    tp_size: int = 1
    selective_activation_checkpointing: bool = False
    full_activation_checkpointing: bool = False
    compile: bool = False
    fsdp_type: str = "no_shard"
    model_dtype: str = "bf16"
    matmul_allow_tf32: bool = False
    allow_bf16_reduced_precision_reduction: bool = True
    detect_anomaly: bool = False
    compile_cache_size_limit: int = 8
    spawn_method: str = "forkserver"

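# Usage sketch (hypothetical values; assumes a single 8-GPU node):
#
#   dist_args = DistributedArgs(
#       dp_replicate=2, dp_shard=4, tp_size=1, fsdp_type="full_shard"
#   )  # 2-way replication x 4-way sharding = 8 ranks total
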
@dataclass
class EnvironmentArgs:
    # Defaults applied to os.environ by setup_env() below.
    MKL_SERVICE_FORCE_INTEL: str = "GNU"
    OMP_NUM_THREADS: str = "1"
    MKL_NUM_THREADS: str = "1"
    ENABLE_INTRA_NODE_COMM: str = "0"
    TORCH_NCCL_AVOID_RECORD_STREAMS: str = "1"
    NCCL_IB_TIMEOUT: str = "22"
    NCCL_DEBUG: str = "WARN"
    TORCH_NCCL_ASYNC_ERROR_HANDLING: str = "1"
    PYTORCH_CUDA_ALLOC_CONF: str = "expandable_segments:True"

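# Sketch of overriding a single knob (hypothetical value; any field of the
# dataclass can be overridden before it is applied by setup_env() below):
#
#   setup_env(EnvironmentArgs(NCCL_DEBUG="INFO"))
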
def get_device_mesh(distributed_args: DistributedArgs) -> DeviceMesh:
    tp_size = distributed_args.tp_size
    dp_replicate = distributed_args.dp_replicate
    dp_shard = distributed_args.dp_shard

    assert dp_replicate * dp_shard * tp_size == get_world_size(), (
        f"dp_replicate * dp_shard * tp_size ({dp_replicate} * {dp_shard} * {tp_size})"
        f" != world_size ({get_world_size()})"
    )

    dims = []
    names = []
    if dp_replicate >= 1:
        dims.append(dp_replicate)
        names.append("dp_replicate")
    if dp_shard > 1 or distributed_args.fsdp_type == "no_shard":
        dims.append(dp_shard)
        names.append("dp_shard")
    if tp_size > 1:
        dims.append(tp_size)
        names.append("tp")
    dims = tuple(dims)
    names = tuple(names)
    return init_device_mesh("cuda", mesh_shape=dims, mesh_dim_names=names)

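# Usage sketch (hypothetical 8-rank job): with dp_replicate=2, dp_shard=4,
# tp_size=1 this returns a 2x4 mesh with dims ("dp_replicate", "dp_shard").
#
#   mesh = get_device_mesh(dist_args)
#   dp_group = mesh["dp_replicate"].get_group()
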
def dist_max(x: Union[int, float], mesh: DeviceMesh = None):
    tensor = torch.tensor(x).cuda()
    dist.all_reduce(
        tensor, op=ReduceOp.MAX, group=mesh.get_group() if mesh is not None else None
    )
    return tensor


def dist_mean(x: Union[int, float], mesh: DeviceMesh = None):
    tensor = torch.tensor(x).cuda()
    dist.all_reduce(
        tensor, op=ReduceOp.AVG, group=mesh.get_group() if mesh is not None else None
    )
    return tensor


def dist_mean_dict(x):
    r = dict()
    for k in x:
        r[k] = dist_mean(x[k])
        r[k] = r[k].item() if r[k].dim() == 0 else r[k].tolist()
    return r

@lru_cache()
def get_is_torch_run() -> bool:
    return os.environ.get("LOCAL_RANK") is not None


@lru_cache()
def get_is_slurm_job() -> bool:
    return "SLURM_JOB_ID" in os.environ and not get_is_torch_run()


@lru_cache()
def get_global_rank() -> int:
    if get_is_torch_run():
        return int(os.environ["RANK"])
    if get_is_slurm_job():
        return int(os.environ["SLURM_PROCID"])
    return 0


@lru_cache()
def get_local_rank() -> int:
    if get_is_torch_run():
        return int(os.environ["LOCAL_RANK"])
    if get_is_slurm_job():
        return int(os.environ["SLURM_LOCALID"])
    return 0


@lru_cache()
def get_world_size() -> int:
    if get_is_torch_run():
        return int(os.environ["WORLD_SIZE"])
    if get_is_slurm_job():
        return int(os.environ["SLURM_NTASKS"])
    return 1


@lru_cache()
def get_is_master() -> bool:
    return get_global_rank() == 0


@lru_cache()
def get_master_port(job_id: int) -> int:
    if get_is_torch_run():
        return int(os.environ["MASTER_PORT"])
    MIN_MASTER_PORT, MAX_MASTER_PORT = (20000, 60000)
    # Derive a stable port from the job id so all ranks agree without talking.
    rng = random.Random(job_id)
    return rng.randint(MIN_MASTER_PORT, MAX_MASTER_PORT)


@lru_cache()
def get_master_addr() -> str:
    if get_is_torch_run():
        return os.environ["MASTER_ADDR"]
    if get_is_slurm_job():
        hostnames = subprocess.check_output(
            ["scontrol", "show", "hostnames", os.environ["SLURM_JOB_NODELIST"]]
        )
        return hostnames.split()[0].decode("utf-8")
    return "127.0.0.1"

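# Resolution order sketch: under torchrun these helpers read
# LOCAL_RANK/RANK/WORLD_SIZE; under SLURM they read
# SLURM_LOCALID/SLURM_PROCID/SLURM_NTASKS; otherwise they fall back to a
# single-process default (rank 0, world size 1):
#
#   if get_is_master():
#       logger.info(f"running on {get_world_size()} ranks")
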
def setup_env(env_args):
    env_vars = asdict(env_args)

    # Use a private, per-process Triton cache dir: a cache shared over NFS (or
    # owned by another user on the node) can fail with stale-file-handle and
    # permission errors, so we create a temp dir and remove it at exit.
    triton_cache_dir = tempfile.mkdtemp()
    atexit.register(shutil.rmtree, triton_cache_dir, ignore_errors=True)
    env_vars["TRITON_CACHE_DIR"] = triton_cache_dir

    # On SLURM, prefer the job-local scratch tmpdir over the node's tmpfs.
    if get_is_slurm_job():
        new_tmp = f"/scratch/slurm_tmpdir/{os.environ['SLURM_JOB_ID']}"
        if os.path.exists(new_tmp):
            env_vars["TMP_DIR"] = new_tmp

    for name, value in env_vars.items():
        if os.environ.get(name) != str(value):
            os.environ[name] = str(value)
            logger.warning(f"WARNING: Setting {name} to {value}")


def setup_torch_distributed(dist_args):
    """
    Handle single and multi-GPU / multi-node / SLURM jobs.
    Initialize the following variables:
        - global_rank
        - world_size
    """
    mp.set_start_method(dist_args.spawn_method)
    with mp.Manager():
        pass

    local_rank = get_local_rank()

    os.environ["RANK"] = str(get_global_rank())
    os.environ["WORLD_SIZE"] = str(get_world_size())
    os.environ["MASTER_ADDR"] = get_master_addr()
    os.environ["MASTER_PORT"] = str(
        get_master_port(job_id=int(os.environ.get("SLURM_JOB_ID", -1)))
    )

    if get_is_torch_run():
        logger.info(f"Run launched with torchrun, local rank: {local_rank}")
    elif get_is_slurm_job():
        logger.info(f"Run launched with slurm, local rank: {local_rank}")
    else:
        logger.info("Single GPU job")
    logger.info(f"ENV: {os.environ}")

    # Sanity check on the local rank (at most 8 GPUs per node).
    assert 0 <= local_rank < 8

    if dist_args.matmul_allow_tf32:
        torch.backends.cuda.matmul.allow_tf32 = True
        logger.warning(
            "WARNING: Setting torch.backends.matmul.allow_tf32 to True. "
            "This is faster but less accurate."
        )
    torch.backends.cuda.matmul.allow_bf16_reduced_precision_reduction = (
        dist_args.allow_bf16_reduced_precision_reduction
    )

    if torch.cuda.device_count() > 1:
        torch.cuda.set_device(local_rank)
    torch.distributed.init_process_group(init_method="env://", backend="nccl")
    torch.autograd.set_detect_anomaly(dist_args.detect_anomaly)

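# Typical startup sequence sketch (hypothetical arguments; order matters:
# env vars first, then process group, then the mesh):
#
#   dist_args = DistributedArgs(dp_shard=get_world_size(), fsdp_type="full_shard")
#   setup_env(EnvironmentArgs())
#   setup_torch_distributed(dist_args)
#   mesh = get_device_mesh(dist_args)
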
def get_module(module, access_string):
    names = access_string.split(sep=".")
    return reduce(getattr, names, module)


def set_module(module, access_string, value):
    names = access_string.split(sep=".")
    parent = reduce(getattr, names[:-1], module)
    setattr(parent, names[-1], value)


def default_fsdp_grouping_plan(n_layers: int) -> List[Tuple[str, bool]]:
    return [("vision_model", True)] + [
        (f"layers.{i}", i < n_layers - 1) for i in range(n_layers)
    ]

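# Each (path, reshard_after_forward) pair becomes one FSDP group; the last
# layer keeps its parameters gathered, since backward starts there. Sketch:
#
#   default_fsdp_grouping_plan(2)
#   # -> [("vision_model", True), ("layers.0", True), ("layers.1", False)]
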
def get_default_policy(no_recompute_ops=None):
    no_recompute_ops = no_recompute_ops or default_no_recompute_ops

    def default_policy(ctx, func, *args, **kwargs):
        return (
            CheckpointPolicy.MUST_SAVE
            if func in no_recompute_ops
            else CheckpointPolicy.PREFER_RECOMPUTE
        )

    return default_policy

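# The policy feeds torch.utils.checkpoint's selective checkpointing: outputs
# of the ops above are saved, everything else is recomputed in backward.
# Sketch of how parallelize_model() below wires it up:
#
#   context_fn = partial(create_selective_checkpoint_contexts, get_default_policy())
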
@torch.no_grad()
def check_model_value_range(
    model: torch.nn.Module, range: float = 1e3, std: float = 1e3
):
    for name, param in model.named_parameters():
        if isinstance(param, DTensor):
            param = param.to_local()

        if param.numel() == 0:
            logger.warning(
                f"Model parameter {name} is empty, probably because of FSDP sharding"
            )
            continue

        if torch.isnan(param).any() or torch.isinf(param).any():
            logger.warning(f"Model parameter {name} contains NaN or Inf")

        param_range = param.max() - param.min()
        param_std = param.std()

        if param_range > range:
            logger.warning(
                f"Model parameter {name} has a suspiciously large range"
                f" ({param_range}): please check initialization and init_weights"
                " is defined and called"
            )
        if param_std > std:
            logger.warning(
                f"Model parameter {name} has a suspiciously large standard deviation"
                f" ({param_std}): please check initialization and init_weights"
                " is defined and called"
            )
        if (param == 0).all():
            logger.warning(
                f"Model parameter {name} is all zeros: it might be because of a"
                " missing initialization"
            )

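# Usage sketch (hypothetical thresholds): call right after weight init and
# before the first optimizer step to catch bad initializations early.
#
#   check_model_value_range(model, range=10.0, std=1.0)
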
def init_signal_handler(callable):
    """
    Handle signals sent by SLURM for time limit / pre-emption.
    """
    signal.signal(signal.SIGUSR2, callable)
    logger.warning("Signal handler installed.")

def requeue_slurm_job():
    prod_id = int(os.environ["SLURM_PROCID"])
    logger.warning("Host: %s - Global rank: %i" % (socket.gethostname(), prod_id))
    if prod_id == 0 and os.environ.get("LAUNCH_WITH") == "DORA":
        logger.warning(f"Requeuing job {os.environ['SLURM_JOB_ID']}")
        os.system("scontrol requeue " + os.environ["SLURM_JOB_ID"])
    else:
        logger.warning("Not the master process, no need to requeue.")
    sys.exit(0)


@contextlib.contextmanager
def clean_env():
    distrib_names = (
        "MASTER_ADDR",
        "MASTER_PORT",
        "RANK",
        "WORLD_SIZE",
        "LOCAL_RANK",
        "LOCAL_WORLD_SIZE",
        "TORCHELASTIC_RUN_ID",
        "DORA_FORCE_DISTRIB",
    )
    # Snapshot and remove scheduler/distributed variables, restoring them on exit.
    cluster_env = {
        x: os.environ.pop(x)
        for x in os.environ.copy()
        if x.startswith(
            ("SLURM_", "SLURMD_", "SRUN_", "SBATCH_", "SUBMITIT_", "WANDB_")
        )
        or x in distrib_names
    }
    try:
        yield
    finally:
        os.environ.update(cluster_env)

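# Usage sketch: temporarily hide scheduler/distributed env vars, e.g. when
# spawning a helper process that must not inherit the job's rank layout
# (tools/eval.py is a hypothetical script):
#
#   with clean_env():
#       subprocess.run(["python", "tools/eval.py"], check=True)
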
|d 
 dksYJ d
tt|tjd|j	dksj|jd	krn|d n|d d}|d u rtt| j}|D ]\}	}
t| |	}t| |	t|fi |d|
i qt| fi |ddi} ntd|j |jr|jrJ dt| ttt|d} |jr|jrJ dtd | j D ]\}}t|dd}| j|| qt| dr| jjj  D ]\}}t|dd}| jjj || q|jr|j!tj"j#_$t| } | S )Nr   
full_shardz/Only full shard is supported for TP parallelismz&TP plan is required for TP parallelismFz+Compile is not supported for TP parallelismrD   )fp32fp16r!   r   z)dp_shard must be 1 for no_shard fsdp_typer   )param_dtypereduce_dtype)r   r   r   )	mp_policyrO   reshard_after_forwardTzInvalid fsdp_type: zUSelective activation checkpointing is incompatible with full activation checkpointing)
context_fnzUFull activation checkpointing is incompatible with selective activation checkpointingz7FULL ACTIVATION CHECKPOINTING on all transformer blocks)preserve_rng_stater   )%r   r    r   r`   rT   float32float16bfloat16r"   r   sizer   r   lenlayersr   r   r   
ValueErrorr   r   r   r   r   r   r   debugnamed_childrenregister_modulehasattrr   transformer	resblocksr&   _dynamoconfigcache_size_limit)r   device_mesh
model_argsrB   r  tp_parallelizer   r  fsdp_configr   r
  r   layer_idtransformer_blockresblockr1   r1   r2   parallelize_model  s   
	








r$  r   )r   r   )NNN)ar   
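# End-to-end sketch (hypothetical model with a .layers ModuleList and a
# model_args.n_layers field):
#
#   mesh = get_device_mesh(dist_args)
#   model = parallelize_model(
#       model, mesh, model_args, dist_args,
#       fsdp_grouping_plan=default_fsdp_grouping_plan(model_args.n_layers),
#   )
#   check_model_value_range(model)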