o
    ॵi                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlmZmZmZm	Z	 d dl
Z
d dlZd dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZ d dlmZ d d	lmZmZ d d
lmZ d dl m!Z!m"Z"m#Z#m$Z$ d dl%m&Z& e& Z'dgZ(ej)e$j*ej*dG dd deZ+dS )    N)AnyDictOptionalUnion)	Pipelines)Model)
ControlNet)
OutputKeys)InputPipeline)	PIPELINES)is_modelis_official_hub_path)'ControllableImageGenerationPreprocessor)
FrameworksInvoke	ModelFileTasks)
get_logger#ControllableImageGenerationPipeline)module_namec                       s   e Zd ZdZdd Z			ddeeef f fdd	Zd
d Z	de
eef de
eef fddZde
eef de
eef fddZ  ZS )r   a    controllable image generation Pipeline.

    Examples:

    >>> import cv2
    >>> from modelscope.outputs import OutputKeys
    >>> from modelscope.pipelines import pipeline
    >>> from modelscope.utils.constant import Tasks

    >>> input_location = 'data/test/images/image_inpainting/image_inpainting_mask_1.png'
    >>> prompt = 'hot air balloon'
    >>> output_image_path = './result.png'
    >>> input = {
    >>>     'image': input_location,
    >>>     'prompt': prompt
    >>> }
    >>> controllable_image_generation = pipeline(
    >>>     Tasks.controllable_image_generation,
    >>>     model='damo/cv_controlnet_scribble-to-image_base',
    >>>     control_type='scribble')
    >>> output = controllable_image_generation(input)[OutputKeys.OUTPUT_IMG]
    >>> cv2.imwrite(output_image_path, output)
    >>> print('pipeline: the output image path is {}'.format(output_image_path))
    c                 C   sh   t |trtd|  t |tr2t|r2td| d t|r0tj|| jdt	j
| jdS |S |S )Nzinitiate model from zinitiate model from location .T)devicemodel_prefetched
invoked_bycontrol_type)
isinstancestrloggerinfor   r   r   from_pretraineddevice_namer   PIPELINEinit_control_type)selfmodel r&   r/home/ubuntu/.local/lib/python3.10/site-packages/modelscope/pipelines/cv/controllable_image_generation_pipeline.pyinitiate_single_model;   s"   
z9ControllableImageGenerationPipeline.initiate_single_modelNcudaFr%   c           	         s|   | dd| _|dkrd}|| _| |}tj| d}t| j||d}t	 j
d	||||d| || _td d S )
Nr   hedgpur)   z./ckpt/annotator/)r   
model_pathr   )r%   preprocessorr   auto_collatezload ControlNet doner&   )getr#   r!   r(   ospathjoinget_model_dirr   super__init__r   r   r   )	r$   r%   r-   r   r.   kwargscnetr,   CIGPreprocessor	__class__r&   r'   r5   K   s*   
z,ControllableImageGenerationPipeline.__init__c                 K   sH   | j  |d< | j  |d< | j  |d< | j|d< | j|d< |i i fS )a  
        this method should sanitize the keyword args to preprocessor params,
        forward params and postprocess params on '__call__' or '_process_single' method

        Returns:
            Dict[str, str]:  preprocess_params = {'image_resolution': self.model.get_resolution()}
            Dict[str, str]:  forward_params = pipeline_parameters
            Dict[str, str]:  postprocess_params = {}
        image_resolutionmodelsetting	model_dirr   r   )r%   get_resolution
get_configr3   r#   r   )r$   pipeline_parametersr&   r&   r'   _sanitize_parameterse   s   



z8ControllableImageGenerationPipeline._sanitize_parametersinputsreturnc                 K   s   |  |}|S )N)r%   )r$   rB   forward_paramsresultr&   r&   r'   forwardw   s   
z+ControllableImageGenerationPipeline.forwardc                 K   s|   t |d d }|d }|r-|d }t j||fdd}tj|d d d d d d df iS tj|d d d d d d df iS )NrE   r   
is_cat_imgdetected_map   )axis)nparrayconcatenater	   
OUTPUT_IMG)r$   rB   r6   rE   rG   rH   catr&   r&   r'   postprocess~   s   ""z/ControllableImageGenerationPipeline.postprocess)Nr)   F)__name__
__module____qualname____doc__r(   r   r   r   r5   rA   r   r   rF   rQ   __classcell__r&   r&   r9   r'   r      s    


*),globmathr0   
subprocesstempfiletypingr   r   r   r   cv2numpyrL   torchmodelscope.metainfor   modelscope.models.baser   2modelscope.models.cv.controllable_image_generationr   modelscope.outputsr	   modelscope.pipelines.baser
   r   modelscope.pipelines.builderr   modelscope.pipelines.utilr   r   9modelscope.preprocessors.cv.controllable_image_generationr   modelscope.utils.constantr   r   r   r   modelscope.utils.loggerr   r   __all__register_modulecontrollable_image_generationr   r&   r&   r&   r'   <module>   s4   