o
    ॵi=[                     @   s  d dl Z d dlmZ d dlZd dlmZmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZmZmZmZmZmZ d dlZd dlmZ d dlmZ d d	lmZ d d
lmZmZ d dl m!Z!m"Z" d dl#m$Z$ d dl%m&Z& d dl'm(Z(m)Z)m*Z* d dl+m,Z,m-Z-m.Z. d dl/m0Z0m1Z1 d dl2m3Z3m4Z4 d dl5m6Z6 d dl7m8Z8 ddl9m:Z:m;Z; e4 rd dl<Z<e3 r	 ed Z=ee>e?eddf Z@ee>edf ZAe6 ZBG dd deZCG dd deCZDdd ZEdS )     N)ABCabstractmethod)partial)Pool)Lock)AnyDict	GeneratorListMappingUnion)version)Model)	MsDataset)TASK_OUTPUTSModelOutputBase)TASK_INPUTScheck_input_type)Preprocessor)Config)
FrameworksInvoke	ModelFile)create_devicedevice_placementverify_device)read_configsnapshot_download)is_tf_availableis_torch_available)
get_logger)compile_model   )is_modelis_official_hub_path)ztorch.Tensorz	tf.TensorzImage.Imageznumpy.ndarrayztorch.nn.Modulec                
   @   sz  e Zd ZdZdd Zdee fddZ							d/d
ede	eee f de	e
ee
 f defddZdd ZdefddZde	eee f de	eeef ef fddZdd ZdefddZdd Zdedeeef fddZd d! Zdee deeef fd"d#Zd$d% Zd&d' Zd(edeeef fd)d*Zd(eeef deeef fd+d,Zd(eeef deeef fd-d.ZdS )0PipelinezPipeline base.
    c                 K   sp   t |trtd|  t |tr6t|r6td| d t|r4tj|f| jdt	j
| jd|S |S |S )Nzinitiate model from zinitiate model from location .T)devicemodel_prefetched
invoked_by
device_map)
isinstancestrloggerinfor$   r#   r   from_pretraineddevice_namer   PIPELINEr*   )selfmodelkwargs r5   M/home/ubuntu/.local/lib/python3.10/site-packages/modelscope/pipelines/base.pyinitiate_single_model/   s$   
zPipeline.initiate_single_modelinput_modelsc                 C   s"   g }|D ]
}| | | q|S N)appendr7   )r2   r8   modelsr3   r5   r5   r6   initiate_multiple_models?   s   z!Pipeline.initiate_multiple_modelsNgpuTconfig_filer3   preprocessorr'   c           	      K   s\  |dur|dksJ d|| _ t| || _t|ts+| j|fi || _| jg| _n	d| _| || _t	| jdk| _
|durMt|| _tj|}n| j
sct| jtrZ| j}n| jj}t|| _|du rq| j
sqt|| _n|| _| js| j
r| jd r|  | _nd| _| jtjkrt| j| _d| _t | _|| _ |!dd| _"|!di | _#dS )	aH   Base class for pipeline.

        If config_file is provided, model and preprocessor will be
        instantiated from corresponding config. Otherwise, model
        and preprocessor will be constructed separately.

        Args:
            config_file(str, optional): Filepath to configuration file.
            model: (list of) Model name or model object
            preprocessor: (list of) Preprocessor object
            device (str): device str, should be either cpu, cuda, gpu, gpu:X or cuda:X
            auto_collate (bool): automatically to convert data to tensor or not.
            compile (bool, optional): Compile the model with torch 2.0, default False
            compile_options (dict, optional): The compile options if compile=True,
                default None to use the default params of 'TorchModel.compile'.
        Nr=   z;`device` and `device_map` cannot be input at the same time!r"   r   Fcompilecompile_options)$r*   r   r0   r+   r
   r7   r3   r;   r<   lenhas_multiple_modelsr   	from_filecfgospathdirnamer,   	model_dirr   r   r/   r?   _get_framework	frameworkr   torchr   r'   _model_preparer   _model_prepare_lock_auto_collateget_compile_compile_options)	r2   r>   r3   r?   r'   auto_collater*   r4   rI   r5   r5   r6   __init__E   s@   

zPipeline.__init__c                    s    j jdd  fdd} jsH jtjkrE jr2 jD ]}|| q jr1 fdd jD  _n| j	  jrEt
 j	fi  j _	d _ j   dS )	zQ Place model on certain device for pytorch models before first inference
        iX  )timeoutc                    s`   t | tjjst| dr| j} t | tjjsd S |   ddlm} || r.| 	 j
 d S d S )Nr3   r   )is_on_same_device)r+   rL   nnModulehasattrr3   evalmodelscope.utils.torch_utilsrV   tor'   )r3   rV   r2   r5   r6   _prepare_single   s   z/Pipeline.prepare_model.<locals>._prepare_singlec                    s   g | ]}t |fi  jqS r5   )r!   rR   ).0mr]   r5   r6   
<listcomp>   s    z*Pipeline.prepare_model.<locals>.<listcomp>TN)rN   acquirerM   rK   r   rL   rC   r;   rQ   r3   r!   rR   release)r2   r^   r`   r5   r]   r6   prepare_model   s&   




zPipeline.prepare_modelreturnc                    s|   g  | j D ]}t|tr|}n|j}t|tj}t	|} 
|j qt fdd D s:td   d S  d S )Nc                 3   s    | ]	}| d  kV  qdS )r   Nr5   )r_   x
frameworksr5   r6   	<genexpr>   s    z*Pipeline._get_framework.<locals>.<genexpr>z:got multiple models, but they are in different frameworks r   )r;   r+   r,   rI   ospjoinr   CONFIGURATIONr   rD   r:   rK   allr-   warning)r2   r`   rI   cfg_filerE   r5   rg   r6   rJ      s   


zPipeline._get_frameworkinputc           
      O   s   | j s| jr| jd r| js|   |dd }| jdi |\}}}||d< ||d< ||d< t|tr\|d u rPg }|D ]}	|	| j
|	g|R i | q<|S | j||fi |}|S t|trm| j|g|R i |S | j
|g|R i |}|S )Nr   
batch_sizepreprocess_paramsforward_paramspostprocess_paramsr5   )r3   rC   r;   rM   rd   pop_sanitize_parametersr+   listr:   _process_single_process_batchr   _process_iterator)
r2   rp   argsr4   rq   rr   rs   rt   outputeler5   r5   r6   __call__   s,   
 	
zPipeline.__call__c                 K   s
   i i |fS )a  
        this method should sanitize the keyword args to preprocessor params,
        forward params and postprocess params on '__call__' or '_process_single' method
        considered to be a normal classmethod with default implementation / output

        Default Returns:
            Dict[str, str]:  preprocess_params = {}
            Dict[str, str]:  forward_params = {}
            Dict[str, str]:  postprocess_params = pipeline_parameters
        r5   )r2   pipeline_parametersr5   r5   r6   rv      s   
zPipeline._sanitize_parametersc                 o   s*    |D ]}| j |g|R i |V  qd S r9   )rx   )r2   rp   r{   r4   r}   r5   r5   r6   rz      s   zPipeline._process_iteratorc                 C   s   t || jS r9   )
collate_fnr'   )r2   datar5   r5   r6   _collate_fn   s   zPipeline._collate_fnc              	   O   s   | di }| di }| di }| | | j|fi |}t| j| j= | jtjkrTt  | j	r;| 
|}| j|fi |}W d    n1 sNw   Y  n	| j|fi |}W d    n1 sgw   Y  | j|fi |}| | |S )Nrr   rs   rt   )rP   _check_input
preprocessr   rK   r0   r   rL   no_gradrO   r   forwardpostprocess_check_output)r2   rp   r{   r4   rr   rs   rt   outr5   r5   r6   rx      s&   


	
zPipeline._process_singlec                 C   sv   i }|D ]}|  D ]\}}||g }|| |||< q
q| D ]}t|| d tjr8t|| ||< q#|S )Nr   )itemsrP   r:   keysr+   rL   Tensorcat)r2   	data_list
batch_datasample_preprocessedkv
value_listr5   r5   r6   _batch  s   

zPipeline._batchc              
      s  | d| d}| d}g }tdt||D ]}t|| t|}|| }	fdd||| D }
tjjG jtjkrot	  
|
}jrV|}j|fi |}W d    n1 siw   Y  n
|
}j|fi |}W d    n1 sw   Y  t|	D ]T i }| D ]8\}}|d urt|ttfrt|d tjrt| fdd|D ||< q|  ||< q|  d	  ||< qj|fi |}| || qq|S )
Nrr   rs   rt   r   c                    s   g | ]}j |fi  qS r5   )r   )r_   i)rr   r2   r5   r6   ra     s    z+Pipeline._process_batch.<locals>.<listcomp>c                 3   s     | ]}|  d   V  qdS )r"   Nr5   )r_   e)	batch_idxr5   r6   ri   4  s
    
z*Pipeline._process_batch.<locals>.<genexpr>r"   )rP   rangerB   minr   rK   r0   r   rL   r   r   rO   r   r   r   r+   tuplerw   r   typer   r   r:   )r2   rp   rq   r4   rs   rt   output_listr   endreal_batch_sizepreprocessed_listbatched_outr   r   elementr5   )r   rr   r2   r6   ry     sT   









zPipeline._process_batchc           	      C   sh  | j }|tv rt| }t|trLd }|D ]}t|ttfr*t|t|kr)|} nqt|tr3|} nq|d u rJd}|D ]	}|| d7 }q<t||}t|trXt	|| d S t|trwt|tsfJ dt
||D ]	\}}t	|| qkd S t|tr| D ]}t|tr||v rt	|| ||  qd S td| t| ddstd| d d	| _d S d S )
NzDinput data format for current pipeline should be one of following: 

zinput should be a tuplezinvalid input_type definition _input_has_warnedFtask z input definition is missingT)	group_keyr   r+   rw   dictr   r   r,   
ValueErrorr   zipr   getattrr-   rn   r   )	r2   rp   	task_name
input_typematched_typeterr_msg	input_eler   r5   r5   r6   r   B  sN   





zPipeline._check_inputc                 C   s   | j }|tvrt| ddstd| d d| _d S t| }g }t|ttfr,|	 n|}|D ]}t|ttfrB||vrB|
| q0t|dkrTtd| d| d	d S )
N_output_has_warnedFr   z output keys are missingTr   zexpected output keys are z, those z are missing)r   r   r   r-   rn   r   r+   r   r   r   r:   rB   r   )r2   rp   r   output_keysmissing_keysr   r5   r5   r6   r   l  s,   


zPipeline._check_outputinputsc                 K   s8   | j dus	J dt| j trJ d| j |fi |S )z\ Provide default implementation based on preprocess_cfg and user can reimplement it
        Nz'preprocess method should be implementedzEdefault implementation does not support using multiple preprocessors.)r?   r+   r
   )r2   r   rr   r5   r5   r6   r     s
   zPipeline.preprocessc                 K   s2   | j dus	J d| jrJ d| j |fi |S )zU Provide default implementation using self.model and user can reimplement it
        Nz$forward method should be implementedzFdefault implementation does not support multiple models in a pipeline.)r3   rC   )r2   r   rs   r5   r5   r6   r     s   zPipeline.forwardc                 K   s   t d)ac   If current pipeline support model reuse, common postprocess
            code should be write here.

        Args:
            inputs:  input data
            post_params:   post process parameters

        Return:
            dict of results:  a dict containing outputs of model, each
                output should have the standard output name.
        r   )NotImplementedError)r2   r   post_paramsr5   r5   r6   r     s   zPipeline.postprocess)NNNr=   TN)__name__
__module____qualname____doc__r7   r
   
InputModelr<   r,   r   r   rT   rd   rJ   Inputr   r   r	   r~   rv   rz   r   rx   r   ry   r   r   r   r   r   r5   r5   r5   r6   r%   +   sR    
C#
"


/*


r%   c                   @   s   e Zd ZdZ			ddedeeee f fddZdd	 Z	d
d Z
edd Zdeeef deeef fddZedd ZdedefddZdS )DistributedPipelinea  This pipeline is used to load multi gpu models.

    What will this class do:
    1. Read the global config from the configuration.json
    2. Set the multiprocessing method to spawn
    3. Open a multiprocessing pool of the world_size to instantiate model pieces.
    4. Set the master port and ip
    5. Call _instantiate_one to instantiate one model piece,
    This method should be implemented by the derived class.
    6. After the forward method is called, do preprocess in main process and
    call _forward_one to collect results, and do post process in main process.

    NOTE: _instantiate_one and _forward_one are class methods, any derived class should implement them and
    store the model handler in the class field.
    NTr3   r?   c           	      K   sP  || _ d| _t | _|| _tj|r|| _nt	|| _t
| j| _| | j| _d | _d| _t| j| _d| _| jj| _tjjddd tt| j}t| j| _d|vrZd|d< d|v rdt|d ntd	d
}ddlm}m} ||sy| }t ||d< |d tj!d< |d tj!d< | j"t#| j$j%fd| ji| jj&|| g | _'d S )NFcpuspawnT)force	master_ipz	127.0.0.1master_porti<s  iL  r   )_find_free_port_is_free_portMASTER_ADDRMASTER_PORTrI   )(r?   rM   r   rN   rO   rF   rG   existsrI   r   r   rE   _get_world_size
world_size
model_poolr0   r   r'   rC   rK   rL   multiprocessingset_start_methodrw   r   r   intrandomrandintr[   r   r   r,   environmapr   	__class___instantiate_oner3   r;   )	r2   r3   r?   rS   r4   ranksr   r   r   r5   r5   r6   rT     sX   


zDistributedPipeline.__init__c                 C   s*   t | dr| jd ur| j  d S d S d S )Nr   )rY   r   	terminater]   r5   r5   r6   __del__  s   zDistributedPipeline.__del__c                 C   s    | j  }|d= |d= |d= |S )Nr   r?   rN   )__dict__copy)r2   	self_dictr5   r5   r6   __getstate__  s
   
z DistributedPipeline.__getstate__c                 K      dS )a  Instantiate one model piece.

        Args:
            rank: The model rank.
            model_dir: The model_dir in the node.
            kwargs: Any extra args.

        Returns:
            None. The model handler should be kept in the class field.
        Nr5   )clsrankrI   r4   r5   r5   r6   r        z$DistributedPipeline._instantiate_oner   re   c                 K   s,   ||d}| j | jj|g| j }|d S )N)r   rs   r   )r   r   r   _forward_oner   )r2   r   rs   resr5   r5   r6   r     s   
zDistributedPipeline.forwardc                 C   r   )zForward the inputs to one model piece.

        Use the model handler kept in the class field to forward.

        Args:
            inputs: The inputs after the preprocessing.

        Returns:
            The forward results.
        Nr5   )r   r   r5   r5   r6   r     r   z DistributedPipeline._forward_onerE   c                 C   s    | d}|d u r| dS |S )Nzmegatron.world_sizezmodel.world_size)safe_get)r2   rE   m_world_sizer5   r5   r6   r     s   

z#DistributedPipeline._get_world_size)NNT)r   r   r   r   r,   r   r   r
   rT   r   r   classmethodr   r   r   r   r   r   r   r   r5   r5   r5   r6   r     s(    
1




r   c              	      s8  ddl m} dd }t| tst| tr#t|  fdd|  D S t| ttfrRdt	| kr5t
g S t| d ttfrE||  S t|  fdd| D S t| tjri| jjtju ra| S tt
|  S t| t
jrt|  S t| ttttttd	fr| S || d
kr| S || dkr| S tdt|  )a3  Prepare the input just before the forward function.
    This method will move the tensors to the right device.
    Usually this method does not need to be overridden.

    Args:
        data: The data out of the dataloader.
        device: The device to move data to.

    Returns: The processed data.

    r   )default_collatec                 S   s   | j jS r9   )r   r   )objr5   r5   r6   get_class_name(  s   z"collate_fn.<locals>.get_class_namec                    s(   i | ]\}}||d krt | n|qS )	img_metasr   )r_   r   r   r'   r5   r6   
<dictcomp>-  s    zcollate_fn.<locals>.<dictcomp>c                 3   s    | ]}t | V  qd S r9   r   )r_   r   r   r5   r6   ri   7  s    zcollate_fn.<locals>.<genexpr>NInputFeaturesDataContainerzUnsupported data type )torch.utils.data.dataloaderr   r+   r   r   r   r   r   rw   rB   rL   r   r   floatr\   npndarraydtypestr_r   
from_numpybytesr,   boolr   )r   r'   r   r   r5   r   r6   r     s2   

r   )FrF   os.pathrG   rj   r   abcr   r   	functoolsr   r   r   	threadingr   typingr   r   r	   r
   r   r   numpyr   	packagingr   modelscope.models.baser   modelscope.msdatasetsr   modelscope.outputsr   r   modelscope.pipeline_inputsr   r   modelscope.preprocessorsr   modelscope.utils.configr   modelscope.utils.constantr   r   r   modelscope.utils.devicer   r   r   modelscope.utils.hubr   r   modelscope.utils.import_utilsr   r   modelscope.utils.loggerr    r[   r!   utilr#   r$   rL   r   r,   r   r   r   r-   r%   r   r   r5   r5   r5   r6   <module>   sH      wz