o
    `۷i                  	   @   s
  d dl Z d dlZd dlmZ d dlmZmZ d dlmZ d dl	m
Z
mZ d dlmZ d dlmZ d dlmZmZ d d	lmZ eeZe
fd
ededefddZeddd
edefddZeddd
efddZedd	ddddee dee defddZdS )    N)wraps)CallableOptional)import_attr)DEFAULT_CONSUMER_CONCURRENCYSERVE_LOGGER_NAME)TaskConsumerWrapper)copy_class_metadata)TaskProcessorAdapterTaskProcessorConfig)	PublicAPItask_processor_configconsumer_concurrencyreturnc              
   C   s   | j }t|trt|}nt|r|}ntdt|j d| z|| }W n ty> } zt	d|j d| d }~ww t|t
sQt|j dt|j z|| W |S  typ } zt	d|j d| d }~ww )Nz>Adapter must be either a string path or a callable class, got z: zFailed to instantiate z- must inherit from TaskProcessorAdapter, got zFailed to initialize )adapter
isinstancestrr   callable	TypeErrortype__name__	ExceptionRuntimeErrorr
   
initialize)r   r   r   adapter_classadapter_instancee r   M/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/serve/task_consumer.py_instantiate_adapter   s2   


r   alpha)	stabilityc                 C   s   t | S )a  
    Create a TaskProcessorAdapter instance from the provided configuration and call .initialize(). This function supports two ways to specify an adapter:

    1. String path: A fully qualified module path to an adapter class
       Example: "ray.serve.task_processor.CeleryTaskProcessorAdapter"

    2. Class reference: A direct reference to an adapter class
       Example: CeleryTaskProcessorAdapter

    Args:
        task_processor_config: Configuration object containing adapter specification.
    Returns:
        An initialized TaskProcessorAdapter instance ready for use.

    Raises:
        ValueError: If the adapter string path is malformed or cannot be imported.
        TypeError: If the adapter is not a string or callable class.

    Example:
        .. code-block:: python

            config = TaskProcessorConfig(
                adapter="my.module.CustomAdapter",
                adapter_config={"param": "value"},
                queue_name="my_queue"
            )
            adapter = instantiate_adapter_from_config(config)
    )r   r   r   r   r   instantiate_adapter_from_config:   s   !r#   c                    s    fdd}|S )a  
    Decorator to mark a class as a TaskConsumer.

    Args:
        task_processor_config: Configuration for the task processor (required)

    Note:
        This decorator must be used with parentheses:
        @task_consumer(task_processor_config=config)

    Returns:
        A wrapper class that inherits from the target class and implements the task consumer functionality.

    Example:
        .. code-block:: python

            from ray import serve
            from ray.serve.task_consumer import task_consumer, task_handler

            @serve.deployment
            @task_consumer(task_processor_config=config)
            class MyTaskConsumer:

                @task_handler(name="my_task")
                def my_task(self, *args, **kwargs):
                    pass

    c                    s&   G  fddd t }t|  |S )Nc                       sB   e Zd ZU eed<  fddZdef fddZ fddZd	S )
z>task_consumer.<locals>.decorator.<locals>._TaskConsumerWrapper_adapterc                    s    j | g|R i | d S N)__init__)selfargskwargs
target_clsr   r   r&      s   zGtask_consumer.<locals>.decorator.<locals>._TaskConsumerWrapper.__init__r   c              
      s   t || _tj tjdD ]\}}t|ddr*t|d|}t| |}| j|| qz| j  t	d W d S  t
yM } z	td|   d }~ww )N)	predicate_is_task_handlerF
_task_namez"task consumer started successfullyzFailed to start task consumer: )r   r$   inspect
getmembers
isfunctiongetattrregister_task_handlestart_consumerloggerinfor   error)r'   r   namemethod	task_namebound_methodr   r+   r   r   r   initialize_callable   s&   

zRtask_consumer.<locals>.decorator.<locals>._TaskConsumerWrapper.initialize_callablec                    s&   | j   t dr |  d S d S )N__del__)r$   stop_consumerhasattrr>   )r'   r*   r   r   r>      s   

zFtask_consumer.<locals>.decorator.<locals>._TaskConsumerWrapper.__del__N)	r   
__module____qualname__r
   __annotations__r&   intr=   r>   r   r<   r   r   _TaskConsumerWrapper~   s
   
 rE   )r   r	   )r+   rE   r"   r*   r   	decorator}   s   
#z task_consumer.<locals>.decoratorr   )r   rF   r   r"   r   task_consumer^   s   (rG   r8   _funcr8   c                   sH    durt  tr  std   fdd}| dur"|| S |S )a  
    Decorator to mark a method as a task handler.
    Optionally specify a task name. Default is the method name.

    Arguments:
        _func: The function to decorate.
        name: The name of the task. Default is the method name.

    Returns:
        A wrapper function that is marked as a task handler.

    Example:
        .. code-block:: python

            from ray import serve
            from ray.serve.task_consumer import task_consumer, task_handler

            @serve.deployment
            @task_consumer(task_processor_config=config)
            class MyTaskConsumer:

                @task_handler(name="my_task")
                def my_task(self, *args, **kwargs):
                    pass

    Nz*Task name must be a non-empty string, got c                    s<   t  st  fdd}d|_p j|_|S td)Nc                     s    | i |S r%   r   )r(   r)   fr   r   wrapper   s   z0task_handler.<locals>.decorator.<locals>.wrapperTz)Async task handlers are not supported yet)r/   iscoroutinefunctionr   r-   r   r.   NotImplementedError)rK   rL   rH   rJ   r   rF      s   
ztask_handler.<locals>.decorator)r   r   strip
ValueError)rI   r8   rF   r   rH   r   task_handler   s    rQ   r%   )r/   logging	functoolsr   typingr   r   ray._common.utilsr   ray.serve._private.constantsr   r    ray.serve._private.task_consumerr   ray.serve._private.utilsr	   ray.serve.schemar
   r   ray.util.annotationsr   	getLoggerr5   rD   r   r#   rG   r   rQ   r   r   r   r   <module>   sL    

$#I