o
    Gi@                     @   s.  d dl Z d dlZd dlZd dlmZ d dlmZ d dlZd dlm	Z
 ej	 r0d dlm	  mZ ddlmZmZmZmZmZ ddlmZ ddlmZmZ dd	lmZmZ eeZd
ZdZ eG dd dZ!dej"j#dede$e%ef ddfddZ&dej"j#de$e%ef ddfddZ'G dd deZ(G dd deZ)G dd dej*j+Z,G dd dZ-G dd dej*j+Z.G d d! d!Z/ej0d"d#d$e1e2 d%e1e2 d&e2d'e2de3e3e2  f
d(d)Z4ed*ej5d&e2d+e
j6j7dej5fd,d-Z8d.ej"j#d/e%dej"j#e3ej"j# B fd0d1Z9d.ej"j#d/e%dej"j#e3ej"j# B fd2d3Z:dS )4    N)	dataclass)Type   )ContextParallelConfigContextParallelInputContextParallelModelPlanContextParallelOutputgather_size_by_comm)
get_logger)maybe_allow_in_graphunwrap_module   )HookRegistry	ModelHookzcp_input---{}zcp_output---{}c                   @   s>   e Zd ZU dZeeef ed< dZe	ed< ddefddZ
dS )	ModuleForwardMetadataNcached_parameter_indices_cls 
identifierc                 C   s   |pi }||v r|| dd fS | j d ur.| j |d }|d u r'td| d|| d|fS | jd u r7tdtt| jjj	 }|dd  }dd t
|D | _ || j vr`td| d	| j | }|t|krxtd
| dt| d|| d|fS )NTzParameter 'z' not found in cached indices.Fz$Model class is not set for metadata.r   c                 S   s   i | ]\}}||qS r   r   ).0iparamr   r   T/home/ubuntu/.local/lib/python3.10/site-packages/diffusers/hooks/context_parallel.py
<dictcomp>D   s    zIModuleForwardMetadata._get_parameter_from_args_kwargs.<locals>.<dictcomp>z4' not found in function signature but was requested.z	Expected z arguments but got .)r   get
ValueErrorr   listinspect	signatureforward
parameterskeys	enumeratelen)selfr   argskwargsindexr!   r   r   r   _get_parameter_from_args_kwargs3   s&   



z5ModuleForwardMetadata._get_parameter_from_args_kwargs)r   N)__name__
__module____qualname__r   dictstrint__annotations__r   r   r)   r   r   r   r   r   .   s   
 r   moduleparallel_configplanreturnc           
      C   s  t d|j d|  | D ]s\}}t| |}t|ts!|g}t d|dt| d |D ]Q}t|trCt	||}t
|}n4t|tttfrnt|trS|g}tdd |D sctd| t||}t|}n	td	t| t|}	|	|| q1qd
S )z"Apply context parallel on a model.z(Applying context parallel with CP mesh: z and plan: z*Applying ContextParallelHook to module_id=z identifying a total of z modulesc                 s   s    | ]}t |tV  qd S N)
isinstancer   r   xr   r   r   	<genexpr>g   s    z)apply_context_parallel.<locals>.<genexpr>z?Expected all elements of cp_model_plan to be CPOutput, but got .Unsupported context parallel model plan type: N)loggerdebug_meshitems_get_submodule_by_namer6   r   r$   r-   ContextParallelSplitHook%_CONTEXT_PARALLEL_INPUT_HOOK_TEMPLATEformatr   tupleallr   ContextParallelGatherHook&_CONTEXT_PARALLEL_OUTPUT_HOOK_TEMPLATEtyper   check_if_exists_or_initializeregister_hook)
r1   r2   r3   	module_idcp_model_plan	submodulemhook	hook_nameregistryr   r   r   apply_context_parallelQ   s,   






rQ   c                 C   s   |  D ]B\}}t| |}t|ts|g}|D ].}t|}t|tr)t|}nt|t	tt
fr7t|}n	tdt| || qqd S )Nr:   )r>   r?   r6   r   r   rH   r-   rA   rB   r   rC   rF   r   rG   remove_hook)r1   r3   rJ   rK   rL   rM   rP   rO   r   r   r   remove_context_parallelq   s   



rS   c                       s\   e Zd Zdededdf fddZdd Zd	d
 Zdd Zde	j
dede	j
fddZ  ZS )r@   metadatar2   r4   Nc                    s    t    || _|| _d | _d S r5   )super__init__rT   r2   module_forward_metadatar%   rT   r2   	__class__r   r   rV      s   

z!ContextParallelSplitHook.__init__c                 C   s   t |j}t|d| _|S )N)r   )r   rZ   r   rW   )r%   r1   clsr   r   r   initialize_hook   s   
z(ContextParallelSplitHook.initialize_hookc                 O   sF  t |}| j D ]\}}t|tr|jrq	| j|||\}}}	|d u r&q	t|tj	r3| 
||}nLt|t tfrvt|t|krQtdt| dt| dg }
t|D ]\}}t|rm|| jsm| 
||| }|
| qW|
}n	tdt| |r|||< q	|	d ur|	t|k r|||	< q	td| dt||fS )Nz"Expected input model plan to have  elements, but got r   zUnsupported input type: z9An unexpected error occurred while processing the input 'z'. Please open an issue at https://github.com/huggingface/diffusers/issues and provide a minimal reproducible example along with the full stack trace.)r   rT   r>   r6   r   split_outputrW   r)   torchTensor_prepare_cp_inputrC   r$   r   r#   	is_tensorappendrG   )r%   r1   r&   r'   	args_listnamecpm	input_valis_kwargr(   sharded_input_valr   r8   r   r   r   pre_forward   s>   



z$ContextParallelSplitHook.pre_forwardc                 C   s   t |tj}t |ttfotdd |D }|s$|s$tdt| d|r)|gnt|}| j	 D ].\}}t |t
r>|js?q2|t|krRtd| dt| d|| }| ||}|||< q2|rg|d S t|S )Nc                 s       | ]	}t |tjV  qd S r5   r6   r_   r`   r7   r   r   r   r9          z8ContextParallelSplitHook.post_forward.<locals>.<genexpr>CExpected output to be a tensor or a list/tuple of tensors, but got r   zIndex z$ out of bounds for output of length r   )r6   r_   r`   r   rC   rD   r   rG   rT   r>   r   r^   r$   ra   )r%   r1   outputrb   is_tensor_listr(   rf   current_outputr   r   r   post_forward   s    
z%ContextParallelSplitHook.post_forwardr8   cp_inputc                 C   sj   |j d ur| |j krtd|j  d|  d |S | jjr+t||j| jj	S t
||j| jj	S )NzExpected input tensor to have z dimensions, but got z' dimensions, split will not be applied.)expected_dimsdimr;   warning_oncer2   ulysses_anythingPartitionAnythingShardershard_anything	split_dim_flattened_meshEquipartitionShardershard)r%   r8   rs   r   r   r   ra      s   z*ContextParallelSplitHook._prepare_cp_input)r*   r+   r,   r   r   rV   r\   rj   rr   r_   r`   r   ra   __classcell__r   r   rY   r   r@      s    ."r@   c                       s2   e Zd Zdededdf fddZdd Z  ZS )	rE   rT   r2   r4   Nc                    s   t    || _|| _d S r5   )rU   rV   rT   r2   rX   rY   r   r   rV      s   

z"ContextParallelGatherHook.__init__c                 C   s   t |tj}|r|g}nt |ttfrtdd |D s&tdt| dt|}t|t| j	krCtdt| j	 dt| dt
| j	D ]*\}}|d u rQqH| jjrdt|| |j| jj||< qHt|| |j| jj||< qH|ry|d S t|S )Nc                 s   rk   r5   rl   r7   r   r   r   r9      rm   z9ContextParallelGatherHook.post_forward.<locals>.<genexpr>rn   r   zExpected output to have r]   r   )r6   r_   r`   r   rC   rD   r   rG   r$   rT   r#   r2   rw   rx   unshard_anything
gather_dimr{   r|   unshard)r%   r1   ro   rb   r   rf   r   r   r   rr      s&     

z&ContextParallelGatherHook.post_forward)r*   r+   r,   r   r   rV   rr   r~   r   r   rY   r   rE      s    rE   c                   @   s$   e Zd Zedd Zedd ZdS )AllGatherFunctionc                 C   s8   || _ || _tj|| _tj|| _tj	|||dS )Ngroup)
ru   r   r_   distributedget_world_size
world_sizeget_rankrankfuncolall_gather_tensor)ctxtensorru   r   r   r   r   r       s
   zAllGatherFunction.forwardc                 C   $   t j|| j| jd}|| j d d fS Nru   )r_   chunkr   ru   r   )r   grad_outputgrad_chunksr   r   r   backward  s   zAllGatherFunction.backwardN)r*   r+   r,   staticmethodr    r   r   r   r   r   r      s
    
r   c                	   @   \   e Zd Zedejdedejjj	dejfddZ
edejdedejjj	dejfddZd	S )
r|   r   ru   meshr4   c                 C   sB   |  | |   dksJ d|j|  |dtj|  S )Nr   zHTensor size along dimension to be sharded must be divisible by mesh sizer   )sizer   r_   r   r   	get_groupr[   r   ru   r   r   r   r   r}     s   "zEquipartitionSharder.shardc                 C      |  }t||| }|S r5   )
contiguousr   applyr   r   r   r   r   r     s   zEquipartitionSharder.unshardN)r*   r+   r,   classmethodr_   r`   r/   r   device_mesh
DeviceMeshr}   r   r   r   r   r   r|     s
    &*r|   c                   @   s8   e Zd Zedejdedejj	fddZ
edd ZdS )	AllGatherAnythingFunctionr   ru   r   c                 C   s4   || _ || _t|| _t|| _t|||}|S r5   )ru   r   distr   r   r   r   _all_gather_anything)r   r   ru   r   gathered_tensorr   r   r   r    $  s   z!AllGatherAnythingFunction.forwardc                 C   r   r   )r_   tensor_splitr   ru   r   )r   r   grad_splitsr   r   r   r   -  s   z"AllGatherAnythingFunction.backwardN)r*   r+   r,   r   r_   r`   r/   r   r   r   r    r   r   r   r   r   r   #  s
    r   c                	   @   r   )
rx   r   ru   r   r4   c                 C   sX   |  | |  ksJ d|   d| d|   d|j|  |dt|  S )NzCannot shard tensor of size z along dim z across mesh of size r   r   )r   r   r   r   r   r   r   r   r   ry   6  s    z'PartitionAnythingSharder.shard_anythingc                 C   r   r5   )r   r   r   r   r   r   r   r   r   A  s   z)PartitionAnythingSharder.unshard_anythingN)r*   r+   r,   r   r_   r`   r/   r   r   r   ry   r   r   r   r   r   rx   5  s*    
rx   @   )maxsizeshapegather_dimsru   r   c                 C   s:   g }t |D ]}tt| }|| ||< || q|S r5   )ranger   copydeepcopyrc   )r   r   ru   r   gather_shapesr   
rank_shaper   r   r   _fill_gather_shapesJ  s   r   r   r   c           
         sv   t j|d}    j}|| }t||}tt|t|||} fdd|D }t j| |d tj	||d}	|	S )Nr   c                    s    g | ]}t j| j jd qS ))devicedtype)r_   emptyr   r   )r   r   r   r   r   
<listcomp>_  s     z(_all_gather_anything.<locals>.<listcomp>r   )
r   r   r   r   r	   r   rC   
all_gatherr_   cat)
r   ru   r   r   r   rank_dimr   r   gathered_tensorsr   r   r   r   r   T  s   
r   modelre   c                 C   s    | ddkrtdt| |S )N*r   z.Wildcard '*' can only be used once in the name)countr   _find_submodule_by_name)r   re   r   r   r   r?   f  s   
r?   c                 C   s   |dkr| S d|v r| ddn|df\}}|dkr@t| tjjs%tdg }| D ]}t||}t|ts8|g}|| q)|S t	| |rOt
| |}t||S td| d| jj d)N r   r   r   z-Wildcard '*' can only be used with ModuleList'z' is not a submodule of ')splitr6   r_   nn
ModuleListr   r   r   extendhasattrgetattrrZ   r*   )r   re   
first_atomremaining_name
submodulesrL   subsubmodulesr   r   r   r   l  s"    




r   );r   	functoolsr   dataclassesr   typingr   r_   torch.distributedr   r   is_available)torch.distributed._functional_collectives_functional_collectivesr   models._modeling_parallelr   r   r   r   r	   utilsr
   utils.torch_utilsr   r   hooksr   r   r*   r;   rA   rF   r   r   Moduler-   r.   rQ   rS   r@   rE   autogradFunctionr   r|   r   rx   	lru_cacherC   r/   r   r   r`   r   r   r   r?   r   r   r   r   r   <module>   sP   
"

" ["
0	$*.