o
    $iz2                     @   s   d dl Z d dlZd dlZd dlmZmZmZmZ d dlm	Z	 d dl
mZmZ d dlmZ d dlmZmZ d dlmZmZmZmZ d dlmZ e eZd	Zd
ZdZdZdZeeeeegZ eddG dd deZ!dS )    N)AnyDictListOptional)Celery)task_failuretask_unknown)get_replica_context)DEFAULT_CONSUMER_CONCURRENCYSERVE_LOGGER_NAME)CeleryAdapterConfigTaskProcessorAdapterTaskProcessorConfig
TaskResult)	PublicAPIworker_poolworker_concurrencytask_ignore_resulttask_acks_latetask_reject_on_worker_lostalpha)	stabilityc                       s\  e Zd ZU dZeed< eed< dZee	j
 ed< dZee ed< eZeed< def fd	d
ZefdefddZd4ddZ	d5defddZdefddZdd Zd6defddZdd Zdd Zdeeef fdd Zdee fd!d"Z					d7d#ed$ed%ed&ed'ef
d(d)Z 					d7d#ed*ed+ed,ed-ef
d.d/Z!d0ed1ed%e"fd2d3Z#  Z$S )8CeleryTaskProcessorAdapterz
    Celery-based task processor adapter.
    This adapter does NOT support any async operations.
    All operations must be performed synchronously.
    _app_configN_worker_thread_worker_hostname_worker_concurrencyconfigc                    sh   t  j|i | t|jtstd|jjr/t|jj tt	@ }|r/t
dt| d|| _d S )NzMTaskProcessorConfig.adapter_config must be an instance of CeleryAdapterConfigzJThe following configuration keys cannot be changed via app_custom_config: zA. These are managed internally by the CeleryTaskProcessorAdapter.)super__init__
isinstanceadapter_configr   	TypeErrorapp_custom_configsetkeysCELERY_DEFAULT_APP_CONFIG
ValueErrorsortedr   )selfr   argskwargsconflicting_keys	__class__ U/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/serve/task_processor.pyr    6   s    

z#CeleryTaskProcessorAdapter.__init__consumer_concurrencyc              
   C   s6  t | jj| jjj| jjjd| _tdt|t	dt
dtdi}| jjjr)|| jjj | jj| | jj| jjd| jjdi}| jjrP| jjd| jjd|| jj< | jjrb| jjd| jjd|| jj< | jjj|dd| jjiid	 | jjjd ur| jjj| jjjd
 | jjrt| j | jjrt| j d S d S )N)backendbrokerthreadsFTdirect)exchangeexchange_typerouting_key*queue)task_queuestask_routes)broker_transport_options)r   r   
queue_namer"   backend_url
broker_urlr   CELERY_WORKER_POOLCELERY_WORKER_CONCURRENCYCELERY_TASK_IGNORE_RESULTCELERY_TASK_ACKS_LATE!CELERY_TASK_REJECT_ON_WORKER_LOSTr$   updateconffailed_task_queue_nameunprocessable_task_queue_namer>   r   connect_handle_task_failurer   _handle_unknown_task)r*   r2   app_configurationqueue_configr0   r0   r1   
initializeN   sX   
z%CeleryTaskProcessorAdapter.initializec                 C   sp   t fd| jjidddd}| jjjr|| jjj |r+| jjdd|i|| d S | jjdi || d S )Nmax_retriesT<   F)autoretry_forretry_kwargsretry_backoffretry_backoff_maxretry_jitternamer0   )	Exceptionr   rQ   r"   task_custom_configrG   r   task)r*   funcrX   task_optionsr0   r0   r1   register_task_handle   s   

z/CeleryTaskProcessorAdapter.register_task_handlereturnc                 K   s:   | j j|f||| jjd|}t|j|jt |jdS )N)r+   r,   r;   )idstatus
created_atresult)	r   	send_taskr   r?   r   r`   ra   timerc   )r*   	task_namer+   r,   optionstask_responser0   r0   r1   enqueue_task_sync   s   z,CeleryTaskProcessorAdapter.enqueue_task_syncc                 C   s    | j |}t|j|j|jdS )N)r`   rc   ra   )r   AsyncResultr   r`   rc   ra   )r*   task_idtask_detailsr0   r0   r1   get_task_status_sync   s   z/CeleryTaskProcessorAdapter.get_task_status_syncc                 K   s   | j dur| j  rtd dS t j}| jj d| | _dd| j d| j	j
g}tj| jj|fd| _ | j   td| j  dS )	z Starts the Celery worker thread.Nz(Celery worker thread is already running._workerz--hostname=z-Q)targetr+   z,Celery worker thread started with hostname: )r   is_aliveloggerinfor	   replica_tagr   mainr   r   r?   	threadingThreadworker_mainstart)r*   r,   	unique_idworker_argsr0   r0   r1   start_consumer   s$   



z)CeleryTaskProcessorAdapter.start_consumer      $@timeoutc                 C   s   | j du s
| j  std dS td | jjjdd| j gd | j j|d | j  r:t	d| d	 ntd
 d| _ dS )zESignals the Celery worker to shut down and waits for it to terminate.Nz$Celery worker thread is not running.z+Sending shutdown signal to Celery worker...shutdownzcelery@)destination)r~   z&Worker thread did not terminate after z	 seconds.z!Celery worker thread has stopped.)
r   rq   rr   rs   r   control	broadcastr   joinwarning)r*   r~   r0   r0   r1   stop_consumer   s   




z(CeleryTaskProcessorAdapter.stop_consumerc                 C   s$   t d | jj  t d d S )NzShutting down Celery worker...z"Celery worker shutdown complete...)rr   rs   r   r   r   r*   r0   r0   r1   r      s   
z#CeleryTaskProcessorAdapter.shutdownc                 C   s   | j j| dS )z
        Cancels a task synchronously. Only supported for Redis and RabbitMQ brokers by Celery.
        More details can be found here: https://docs.celeryq.dev/en/stable/userguide/workers.html#revoke-revoking-tasks
        N)r   r   revoke)r*   rk   r0   r0   r1   cancel_task_sync   s   z+CeleryTaskProcessorAdapter.cancel_task_syncc                 C   s   | j j  S )z
        Returns the metrics of the Celery worker synchronously.
        More details can be found here: https://docs.celeryq.dev/en/stable/reference/celery.app.control.html#celery.app.control.Inspect.stats
        )r   r   inspectstatsr   r0   r0   r1   get_metrics_sync   s   z+CeleryTaskProcessorAdapter.get_metrics_syncc                 C   s   | j j S )a  
        Checks the health of the Celery worker synchronously.
        Returns a list of dictionaries, each containing the worker name and a dictionary with the health status.
        Example: [{'celery@192.168.1.100': {'ok': 'pong'}}]
        More details can be found here: https://docs.celeryq.dev/en/stable/reference/celery.app.control.html#celery.app.control.Control.ping
        )r   r   pingr   r0   r0   r1   health_check_sync   s   z,CeleryTaskProcessorAdapter.health_check_syncsenderrk   r+   r,   einfoc              	   K   s   t d| dt|  |t|jt|t|t|g}| jjr>| | jj|j| t d| d| d| jj d dS dS )ar  Handle task failures and route them to appropriate dead letter queues.

        This method is called when a task fails after all retry attempts have been
        exhausted. It logs the failure and moves the task to failed_task_queue

        Args:
            sender: The task object that failed
            task_id: Unique identifier of the failed task
            args: Positional arguments passed to the task
            kwargs: Keyword arguments passed to the task
            einfo: Exception info object containing exception details and traceback
            **kw: Additional keyword arguments passed by Celery
        z#Task failure detected for task_id: z	, einfo: zTask z& failed after max retries. Exception: z. Moved it to the z queue.N)	rr   rs   str	exceptionr   rI   _move_task_to_queuerX   error)r*   r   rk   r+   r,   r   kwdlq_argsr0   r0   r1   rL      s&   z/CeleryTaskProcessorAdapter._handle_task_failurerX   r`   messageexcc              
   K   sX   t d| d| dt|  | jjr*| | jj|||t|t|t|g dS dS )a  Handle unknown or unregistered tasks received by Celery.

        This method is called when Celery receives a task that it doesn't recognize
        (i.e., a task that hasn't been registered with the Celery app). These tasks
        are moved to the unprocessable task queue if configured.

        Args:
            sender: The Celery app or worker that detected the unknown task
            name: Name of the unknown task
            id: Task ID of the unknown task
            message: The raw message received for the unknown task
            exc: The exception raised when trying to process the unknown task
            **kwargs: Additional context information from Celery
        z'Unknown task detected by Celery. Name: z, ID: z, Exc: N)rr   rs   r   r   rJ   r   )r*   r   rX   r`   r   r   r,   r0   r0   r1   rM   -  s   z/CeleryTaskProcessorAdapter._handle_unknown_taskr?   rf   c                 C   sn   zt d| d| d|  | jj|||d W dS  ty6 } zt d| d| d|  |d}~ww )z4Helper function to move a task to a specified queue.zMoving task: z to queue: z, args: )rX   r;   r+   zFailed to move task: z	, error: N)rr   rs   r   rd   rY   r   )r*   r?   rf   r+   er0   r0   r1   r   U  s    z.CeleryTaskProcessorAdapter._move_task_to_queue)N)NN)r}   )NNNNN)%__name__
__module____qualname____doc__r   __annotations__r   r   r   rv   rw   r   r   r
   r   intr    rP   r^   r   ri   rm   r|   floatr   r   r   r   r   r   r   r   rL   rM   listr   __classcell__r0   r0   r.   r1   r   (   sl   
 
=

/
(r   )"loggingrv   re   typingr   r   r   r   celeryr   celery.signalsr   r   	ray.server	   ray.serve._private.constantsr
   r   ray.serve.schemar   r   r   r   ray.util.annotationsr   	getLoggerrr   rB   rC   rD   rE   rF   r'   r   r0   r0   r0   r1   <module>   s0    
	