o
    ٷiD                     @  s   d Z ddlmZ ddlZddlmZ ddlmZ ddlZddl	m
Z ddlmZ eeZd0ddZd1ddZd2ddZd0ddZd3ddZd4ddZd5d#d$Zd6d&d'ZG d(d) d)Zd7d.d/ZdS )8z1Distributed VAE patch/tile parallelism utilities.    )annotationsN)Callable)Any)init_loggervaer   returnintc                 C  sF   zt t | dd dd }|rdt|d  W S W dS  ty"   Y dS w )Nconfigblock_out_channels         )getattrlen	Exception)r   r
    r   f/home/ubuntu/.local/lib/python3.10/site-packages/vllm_omni/diffusion/distributed/vae_patch_parallel.py_get_vae_spatial_scale_factor   s   r   pp_sizetuple[int, int]c                 C  sP   | dkrdS t t| }t|ddD ]}| | dkr#|| | f  S qd| fS )z8Pick a (rows, cols) grid whose product equals `pp_size`.r   )r   r   r   )r   mathsqrtrange)r   rootrowsr   r   r   _factor_pp_grid   s   r   groupdist.ProcessGroupvae_patch_parallel_sizetuple[int, int, int]c                 C  s0   t | }t | }tt|t|}|||fS N)distget_world_sizeget_rankminr   )r   r   
world_sizerankr   r   r   r   _get_world_rank_pp_size(   s   


r(   c                 C  s   t tt| dd ddS )Nr	   out_channels   )r   r   )r   r   r   r   _get_vae_out_channels2   s   r+   tuple[int, float] | Nonec                 C  s<   t | dd }t | dd }|d u s|d u rd S t|t|fS )Ntile_latent_min_sizetile_overlap_factor)r   r   float)r   r-   r.   r   r   r   _get_vae_tile_params6   s
   r0   tuple[int, float, int] | Nonec                 C  s>   t | dd }t| }|d u s|d u rd S |\}}||t|fS )Ntile_sample_min_size)r   r0   r   )r   r2   tile_paramsr-   r.   r   r   r   _get_vae_tiling_params>   s   r4   orig_decodeCallable[..., Any]ztorch.Tensorc           2   
     s<  t ||\}}}|dkr||ddd S t| }|du r$||ddd S |\}	}
}t|	d|
  }|dkr=||ddd S ttd|jd |}ttd|jd |}t|}t|}|| }|dk rk||ddd S t||
 }t|| }||k }g }g }d}|D ]V}|D ]Q}|d | }|r||kr|dddd|||	 |||	 f }tt| ddd	dr| |}| 	|}|
| |
|t|jd
 t|jd f |d7 }qqtjt|g|jtjd |dkr fddt|D }nd}tj |d|d d}|dkrdd |D }|rt|nd}tj|g|jtjd} tj| d|d t|  }t| }!tj|dfd|jtjdtj||jd |!||f|j|jdt|D ]-\}"\}}#}$||"df< |#|"df< |$|"df< ||" |"ddddd|#d|$f< qY|dkrfddt|D }%fddt|D }&nd}%d}&tj|%d|d tj|&d|d |dkrtjd|j|jdS i }'t|D ]K}(|%|( })|&|( }*t|D ];}"t|)|"df  }+|+dk rqt|)|"df  }#t|)|"df  }$|*|"ddddd|#d|$f |'|+< qܐqg },t|D ]}-g }.t|D ]}/|-| |/ }+|.
|'|+  q(|,
|. q g }0t|,D ]P\}}.g }1t|.D ]:\}}|dkrf| |,|d  | ||}|dkrv| |.|d  ||}|1
|ddddd|d|f  qP|0
tj|1dd qFtj|0ddS )a&  Distributed version of diffusers AutoencoderKL.tiled_decode (decode only).

    Each rank decodes a subset of tiles; rank0 gathers all tiles and performs the
    original blend + stitch logic. Non-rank0 ranks return an empty tensor; callers
    can broadcast the stitched result if needed.
    r   Freturn_dictr   Nr   r*   r	   use_post_quant_convr   devicedtypec                      g | ]}t  qS r   torch
empty_like.0_)count_tensorr   r   
<listcomp>       z-_distributed_tiled_decode.<locals>.<listcomp>gather_listdstr   c                 S  s   g | ]}t | qS r   r   itemrE   tr   r   r   rH      s    srcr   c                   r@   r   rA   rD   )meta_tensorr   r   rH      rI   c                   r@   r   rA   rD   )tile_tensorr   r   rH      rI   )dim)r(   r4   r   listr   shaper   r   post_quant_convdecoderappendrB   tensorr>   int64r"   gathermax	broadcastrN   r+   fullzerosr?   	enumerateemptyblend_vblend_hcat)2r   r5   r7   r   r   r&   r'   r   tiling_paramsr-   r.   r2   overlap_sizeh_startsw_startsnum_rowsnum_cols	num_tilesblend_extent	row_limitactivelocal_tiles
local_metatile_idij	tile_ranktiledecodedcount_gather	max_countcountsmax_count_tensorr)   idxhwmeta_gathertile_gathertile_mapsrc_rankmeta_src	tiles_srctidr   rrowcresult_rows
result_rowr   )rG   rS   rT   r   _distributed_tiled_decodeG   s   
(


$

*


*

*r   vae_scale_factorc           :   	     s(  t ||\}}}|dkr||ddd S t| }	|	du r$||ddd S |	\}
}t|
t| }td|d }||k }|j\}}}}t|}|| }|| }t| }tjd|j	|j
d}d}d}t|\}}|r>|}|| }|| }|| | }|d | | } || | }!|d | | }"td| | }#td|"|! }$|#dks|$dkrtjd|j	|j
d}nrt|t|#|$d }%td||% }&t|| |% }'td|!|% }(t||"|% })|dddd|&|'|(|)f }*tt| ddd	dr| |*}*| |*}+||& | },|!|( | }-|,|#|  }.|-|$|  }/|+dddd|,|.|-|/f }| r.t|jd
 nd}| r<t|jd nd}tj||g|j	tjd|dkr[fddt|D }0nd}0tj|0d|d d}1d}2|dkrdd |0D }3tdd |3D dd}1tdd |3D dd}2tj|1|2g|j	tjd}4tj|4d|d t|4d  }1t|4d  }2|1dks|2dkrtjd|j	|j
d n$tj|||1|2f|j	|j
d |r|r| ddddd|d|f< |dkr fddt|D }5nd}5tj |5d|d |dkrtjd|j	|j
dS tj||||f|j	|j
d}6t|D ]k}|}7|| }|| }|| | }|d | | } || | }!|d | | }"| | | }8|"|! | }9|8dksd|9dkrfq&|5|7 }*|*ddddd|8d|9f |6dddd|| | | |!| |"| f< q&|6S )ae  Decode one spatial block per rank, then stitch RGB blocks on rank0.

    Intended for sizes where diffusers tiling would not kick in, so we can still
    reduce the per-rank VAE decode activation peak. Each rank decodes a core
    block with a small latent-space halo, then crops to the core and gathers the
    RGB blocks to rank0 for final stitching.
    r   Fr9   r   Nr   r=   r	   r;   r<   r   c                   r@   r   rA   rD   )shape_tensorr   r   rH   "  rI   z-_distributed_patch_decode.<locals>.<listcomp>rJ   c                 S  s   g | ]}t d d |D qS )c                 s  s    | ]	}t | V  qd S r!   rM   rE   xr   r   r   	<genexpr>*  s    z7_distributed_patch_decode.<locals>.<listcomp>.<genexpr>)tuplerO   r   r   r   rH   *  s    c                 s  s    | ]\}}|V  qd S r!   r   )rE   r~   rF   r   r   r   r   +      z,_distributed_patch_decode.<locals>.<genexpr>)defaultc                 s  s    | ]\}}|V  qd S r!   r   )rE   rF   r   r   r   r   r   ,  r   rQ   c                   r@   r   rA   rD   )paddedr   r   rH   <  rI   )r(   r0   r   r/   r^   rW   r+   rB   rc   r>   r?   r   r%   r   rX   rY   numelr[   r\   r   r"   r]   r_   rN   ra   ):r   r5   r7   r   r   r   r&   r'   r   r3   r-   r.   overlap_latent	halo_baserp   bszrF   latent_hlatent_wscaleout_hout_wr)   
local_corelocal_hlocal_w	grid_rows	grid_cols	patch_idx	patch_row	patch_colh0h1w0w1core_hcore_whaloph0ph1pw0pw1rw   rx   ch0cw0ch1cw1shape_gathermax_hmax_wshapesmax_hw_tensorblock_gatheroutr   phpwr   )r   r   r   _distributed_patch_decode   s    

 

 

Pr   c                   @  s&   e Zd ZdZdd
dZddddZdS )VaePatchParallelismzPatch/tile-parallel VAE decode wrapper.

    This is meant to wrap `vae.decode` as an instance-level override
    so pipelines don't need model-specific code paths.
    r   r   r   r   group_getterCallable[[], dist.ProcessGroup]r   Nonec                C  s,   || _ t|| _|| _t|| _|j| _d S r!   )_vaer   _vae_patch_parallel_size_group_getterr   _vae_scale_factordecode_orig_decode)selfr   r   r   r   r   r   __init__c  s
   

zVaePatchParallelism.__init__Tr7   r8   r:   boolargskwargsc                 O  s  |j dkr| j|g|R d|i|S | jdkst s*| j|g|R d|i|S t| jdds?| j|g|R d|i|S z|  }W n ty\   | j|g|R d|i| Y S w t	|}t
|}tt| jt|}|dkr| j|g|R d|i|S t| jdd }	|	d u rt| j| j|||d}
n)|jd |	kp|jd	 |	k}|rt| j| j|||d}
nt| j| j|||| jd
}
|dkr|
 dkrtd | j|g|R ddi|d }
|dkr|
j|jkr|
j|jd}
|dkr|
 s|
 }
tjd|jtjd}|dkr|tjt|
j|jtjd tj|d|d |dkr>tjtdd |  D |j|jd}
tj|
d|d |sL|
fS ddl!m"} ||
dS )N   r:   r   
use_tilingFr-   )r   r5   r7   r   r   r   r<   )r   r5   r7   r   r   r   r   zUVAE patch parallel decode produced empty output on rank0; falling back to vae.decode.)r?   )r   r=   rQ   c                 s  s    | ]}t |V  qd S r!   )r   r   r   r   r   r     r   z-VaePatchParallelism.decode.<locals>.<genexpr>)DecoderOutput)sample)#ndimr   r   r"   is_initializedr   r   r   r   r#   r$   r%   r   r   rW   r   r   r   loggerwarningr?   tois_contiguous
contiguousrB   rc   r>   r\   copy_r[   r   r_   tolist!diffusers.models.autoencoders.vaer   )r   r7   r:   r   r   r   r&   r'   r   r-   rx   should_tiler   r   r   r   r   r   q  sx   
 

	
 
 
&
zVaePatchParallelism.decodeN)r   r   r   r   r   r   r   r   )T)r7   r8   r:   r   r   r   r   r   )__name__
__module____qualname____doc__r   r   r   r   r   r   r   \  s    
r   pipeliner   r   r   c                C  st   |dkrdS t | dd}|du st|dsdS t|dsdS t |ddr&dS t|||d}d	|_|j|_|j|_dS )
zOWrap a diffusers-style pipeline's `vae.decode` with patch/tile parallel decode.r   Nr   r   rY   "_vllm_vae_patch_parallel_installedF)r   r   T)r   hasattrr   r   r   (_vllm_vae_patch_parallel_original_decode)r   r   r   r   wrapperr   r   r   ,maybe_wrap_vae_decode_with_patch_parallelism  s"   
r   )r   r   r   r   )r   r   r   r   )r   r   r   r   r   r    )r   r   r   r,   )r   r   r   r1   )r   r   r5   r6   r7   r8   r   r   r   r   r   r8   )r   r   r5   r6   r7   r8   r   r   r   r   r   r   r   r8   )r   r   r   r   r   r   r   r   )r   
__future__r   r   collections.abcr   typingr   rB   torch.distributeddistributedr"   vllm.loggerr   r   r   r   r   r(   r+   r0   r4   r   r   r   r   r   r   r   r   <module>   s*   








	 

 b