o
    iDD                     @   s   d Z ddlZddlZddlZddlZddlZddlZddlmZ ddl	m
Z
 ddlmZ ddlmZmZmZmZ ddlmZmZmZmZmZ G d	d
 d
ZdS )a	  
Dirty Worker Process

Asyncio-based worker that loads dirty apps and handles requests
from the DirtyArbiter.

Threading Model
---------------
Each dirty worker runs an asyncio event loop in the main thread for:
- Handling connections from the arbiter
- Managing heartbeat updates
- Coordinating task execution

Actual app execution runs in a ThreadPoolExecutor (separate threads):
- The number of threads is controlled by ``dirty_threads`` config (default: 1)
- Each thread can execute one app action at a time
- The asyncio event loop is NOT blocked by task execution

State and Global Objects
------------------------
Apps can maintain persistent state because:

1. Apps are loaded ONCE when the worker starts (in ``load_apps()``)
2. The same app instances are reused for ALL requests
3. App state (instance variables, loaded models, etc.) persists

Example::

    class MLApp(DirtyApp):
        def init(self):
            self.model = load_heavy_model()  # Loaded once, reused
            self.cache = {}                   # Persistent cache

        def predict(self, data):
            return self.model.predict(data)  # Uses loaded model

Thread Safety:
- With ``dirty_threads=1`` (default): No concurrent access, thread-safe by design
- With ``dirty_threads > 1``: Multiple threads share the same app instances,
  apps MUST be thread-safe (use locks, thread-local storage, etc.)

Heartbeat and Liveness
----------------------
The worker sends heartbeat updates to prove it's alive:

1. A dedicated asyncio task (``_heartbeat_loop``) runs independently
2. It updates the heartbeat file every ``dirty_timeout / 2`` seconds
3. Since tasks run in executor threads, they do NOT block heartbeats
4. The arbiter kills workers that miss heartbeat updates

Timeout Control
---------------
Execution timeout is enforced at two levels:

1. **Worker level**: Each task execution has a timeout (``dirty_timeout``).
   If exceeded, the worker returns a timeout error but the thread may
   continue running (Python threads cannot be cancelled).

2. **Arbiter level**: The arbiter also enforces timeout when waiting
   for worker response. Workers that don't respond are killed via SIGABRT.

Note: Since Python threads cannot be forcibly cancelled, a truly stuck
operation will continue until the worker is killed by the arbiter.
    N)util)	WorkerTmp   )load_dirty_apps)DirtyAppErrorDirtyAppNotFoundErrorDirtyTimeoutErrorDirtyWorkerError)DirtyProtocolmake_responsemake_error_responsemake_chunk_messagemake_end_messagec                   @   s   e Zd ZdZdd d D Zdd Zdd Zd	d
 Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'S )(DirtyWorkerz
    Dirty worker process that loads dirty apps and handles requests.

    Each worker runs its own asyncio event loop and listens on a
    worker-specific Unix socket for requests from the DirtyArbiter.
    c                 C   s   g | ]	}t td | qS )zSIG%s)getattrsignal).0x r   I/home/ubuntu/.local/lib/python3.10/site-packages/gunicorn/dirty/worker.py
<listcomp>h   s    zDirtyWorker.<listcomp>zABRT HUP QUIT INT TERM USR1c                 C   sb   || _ d| _|| _|| _|| _|| _|| _d| _d| _d| _	t
|| _i | _d| _d| _d| _dS )a?  
        Initialize a dirty worker.

        Args:
            age: Worker age (for identifying workers)
            ppid: Parent process ID
            app_paths: List of dirty app import paths
            cfg: Gunicorn config
            log: Logger
            socket_path: Path to this worker's Unix socket
        z	[booting]FTN)agepidppid	app_pathscfglogsocket_pathbootedabortedaliver   tmpapps_server_loop	_executor)selfr   r   r   r   r   r   r   r   r   __init__k   s   

zDirtyWorker.__init__c                 C   s   d| j  dS )Nz<DirtyWorker >)r   r&   r   r   r   __str__   s   zDirtyWorker.__str__c                 C   s   | j   dS )zUpdate heartbeat timestamp.N)r!   notifyr)   r   r   r   r+      s   zDirtyWorker.notifyc                 C   s   | j jr| j j D ]	\}}|tj|< q
tj| j j| j j| j j	d t
  t| j  | j  |   |   t | _| j |  d| _|   dS )z
        Initialize the worker process after fork.

        This is called in the child process after fork. It sets up
        the environment, loads apps, and starts the main run loop.
        )
initgroupsTN)r   envitemsosenvironr   set_owner_processuidgidr,   seedclose_on_execr!   filenor   init_signals	load_appsgetpidr   dirty_worker_initr   run)r&   kvr   r   r   init_process   s   

zDirtyWorker.init_processc                 C   sn   | j D ]	}t|tj qttj| j ttj| j ttj| j ttj| j ttj| j dS )zSet up signal handlers.N)	SIGNALSr   SIG_DFLSIGTERM_signal_handlerSIGQUITSIGINTSIGABRTSIGUSR1)r&   sigr   r   r   r7      s   
zDirtyWorker.init_signalsc                 C   s:   |t jkr| j  dS d| _| jr| j| j dS dS )z(Handle signals by setting alive = False.NF)r   rF   r   reopen_filesr    r$   call_soon_threadsafe	_shutdown)r&   rG   framer   r   r   rB      s   

zDirtyWorker._signal_handlerc                 C   s   | j r
| j   dS dS )zInitiate async shutdown.N)r#   closer)   r   r   r   rJ      s   zDirtyWorker._shutdownc                 C   s   z<t | j| _| j D ]-\}}| jd| z|  | jd| W q ty9 } z	| j	d||  d}~ww W dS  tyP } z| j	d|  d}~ww )zLoad all configured dirty apps.zLoaded dirty app: %szInitialized dirty app: %sz%Failed to initialize dirty app %s: %sNzFailed to load dirty apps: %s)
r   r   r"   r.   r   debuginitinfo	Exceptionerrorr&   pathapper   r   r   r8      s(   	zDirtyWorker.load_appsc              
   C   s   ddl m} | jj}||d| j dd| _| jd| z=zt	 | _
t| j
 | j
|   W n tyL } z| jd| W Y d}~nd}~ww W |   dS W |   dS |   w )	z Run the main asyncio event loop.r   )ThreadPoolExecutorzdirty-worker--)max_workersthread_name_prefixz#Created thread pool with %d threadszWorker error: %sN)concurrent.futuresrV   r   dirty_threadsr   r%   r   rM   asyncionew_event_loopr$   set_event_looprun_until_complete
_run_asyncrP   rQ   _cleanup)r&   rV   num_threadsrU   r   r   r   r;      s&   
zDirtyWorker.runc                    s*  t j| jrt | j tj| j| jdI dH | _t 	| jd | j
d| j| j t|  }zKz'| j4 I dH  | j I dH  W d  I dH  n1 I dH sUw   Y  W n
 tjye   Y nw W |  z|I dH  W dS  tjy~   Y dS w |  z|I dH  W w  tjy   Y w w )z6Main async loop - start server and handle connections.)rS   Ni  zDirty worker %s listening on %s)r/   rS   existsr   unlinkr\   start_unix_serverhandle_connectionr#   chmodr   rO   r   create_task_heartbeat_loopserve_foreverCancelledErrorcancel)r&   heartbeat_taskr   r   r   r`      sB   (zDirtyWorker._run_asyncc                    s6   | j r|   t| jjd I dH  | j sdS dS )zPeriodically update heartbeat.g       @N)r    r+   r\   sleepr   dirty_timeoutr)   r   r   r   ri     s
   zDirtyWorker._heartbeat_loopc                    s   | j d zZz&| jr-z
t|I dH }W n
 tjy    Y nw | ||I dH  | jsW n tyG } z| j 	d| W Y d}~nd}~ww W |
  z
| I dH  W dS  tya   Y dS w |
  z	| I dH  W w  tyx   Y w w )zl
        Handle a connection from the arbiter.

        Each connection can send multiple requests.
        zNew connection from arbiterNzConnection error: %s)r   rM   r    r
   read_message_asyncr\   IncompleteReadErrorhandle_requestrP   rQ   rL   wait_closed)r&   readerwritermessagerU   r   r   r   rf      s:   	zDirtyWorker.handle_connectionc                    sx  | dtt }| d}|tjkr*t|td| }t||I dH  dS | d}| d}| dg }| di }	| 	  z@| 
||||	I dH }
t|
rb| ||
|I dH  W dS t|
rt| ||
|I dH  W dS t||
}t||I dH  W dS  ty } z+t }| jd	|||| t|tt||||d
}t||I dH  W Y d}~dS d}~ww )ai  
        Handle a single request message.

        Supports both regular (non-streaming) and streaming responses.
        For streaming, detects if the result is a generator and sends
        chunk messages followed by an end message.

        Args:
            message: Request dict from protocol
            writer: StreamWriter for sending responses
        idtypezUnknown message type: Napp_pathactionargskwargszError executing %s.%s: %s
%s)ry   rz   	traceback)getstruuiduuid4r
   MSG_TYPE_REQUESTr   r	   write_message_asyncr+   executeinspectisgenerator_stream_sync_generator
isasyncgen_stream_async_generatorr   rP   r}   
format_excr   rQ   r   )r&   rv   ru   
request_idmsg_typeresponsery   rz   r{   r|   resultrU   tbr   r   r   rr   ;  sJ   






 zDirtyWorker.handle_requestc           
   
      s  t    fdd}zsz2t }	 || j|I dH }| u r!nt|t||I dH  |   qt|t	|I dH  W n2 t
yp } z&t }| jd|| t|tt||d}	t||	I dH  W Y d}~nd}~ww W   dS W   dS   w )z
        Stream chunks from a synchronous generator.

        Args:
            request_id: Request ID for the messages
            gen: Sync generator to iterate
            writer: StreamWriter for sending messages
        c                      s"   zt W S  ty     Y S w N)nextStopIterationr   
_EXHAUSTEDgenr   r   	_get_next~  s
   
z5DirtyWorker._stream_sync_generator.<locals>._get_nextTNError during streaming: %s
%sr}   )objectr\   get_running_looprun_in_executorr%   r
   r   r   r+   r   rP   r}   r   r   rQ   r   r   r   rL   )
r&   r   r   ru   r   loopchunkrU   r   r   r   r   r   r   q  s>   

z"DirtyWorker._stream_sync_generatorc              
      s   znz'|2 z3 dH W }t |t||I dH  |   q6 t |t|I dH  W n2 ty[ } z&t }| j	d|| t
|tt||d}t ||I dH  W Y d}~nd}~ww W | I dH  dS W | I dH  dS | I dH  w )z
        Stream chunks from an asynchronous generator.

        Args:
            request_id: Request ID for the messages
            gen: Async generator to iterate
            writer: StreamWriter for sending messages
        Nr   r   )r
   r   r   r+   r   rP   r}   r   r   rQ   r   r   r   aclose)r&   r   r   ru   r   rU   r   r   r   r   r   r     s2   	


"z#DirtyWorker._stream_async_generatorc              	      s   || j vr
t|| j | | jjdkr| jjnd}t }ztj|| j fdd|dI dH }|W S  tj	yV   | j
d| | td| d  d	|dw )
a  
        Execute an action on a dirty app.

        The action runs in a thread pool executor to avoid blocking the
        asyncio event loop. Execution timeout is enforced using
        ``dirty_timeout`` config.

        Args:
            app_path: Import path of the dirty app
            action: Action name to execute
            args: Positional arguments
            kwargs: Keyword arguments

        Returns:
            Result from the app action

        Raises:
            DirtyAppNotFoundError: If app is not loaded
            DirtyTimeoutError: If execution exceeds timeout
            DirtyAppError: If execution fails
        r   Nc                      s    gR i S r   r   r   rz   rT   r{   r|   r   r   <lambda>  s    z%DirtyWorker.execute.<locals>.<lambda>)timeoutz%Execution timeout for %s.%s after %dszExecution of .z
 timed out)r"   r   r   ro   r\   r   wait_forr   r%   TimeoutErrorr   warningr   )r&   ry   rz   r{   r|   r   r   r   r   r   r   r     s2   

zDirtyWorker.executec                 C   s   | j r| j jddd d| _ | j D ]+\}}z|  | jd| W q ty> } z| jd|| W Y d}~qd}~ww z| j	  W n	 tyO   Y nw zt
j| jr^t
| j W n	 tyh   Y nw | jd| j dS )zClean up resources on shutdown.FT)waitcancel_futuresNzClosed dirty app: %szError closing dirty app %s: %szDirty worker %s exiting)r%   shutdownr"   r.   rL   r   rM   rP   rQ   r!   r/   rS   rc   r   rd   rO   r   rR   r   r   r   ra     s0   zDirtyWorker._cleanupN)__name__
__module____qualname____doc__splitr?   r'   r*   r+   r>   r7   rB   rJ   r8   r;   r`   ri   rf   rr   r   r   r   ra   r   r   r   r   r   `   s,    $
!60!4r   )r   r\   r   r/   r   r}   r   gunicornr   gunicorn.workers.workertmpr   rT   r   errorsr   r   r   r	   protocolr
   r   r   r   r   r   r   r   r   r   <module>   s   A	