o
    ۷i{                     @   s   d Z ddlZddlZddlZddlZddlZddlmZ ddl	m
Z
 ddlmZmZmZ eeZdZdZG d	d
 d
ZdedefddZdS )aG  
Tensor Parallelism worker for distributed inference.

This module implements a simple broadcast-based mechanism for TP serving:
- Rank 0 receives HTTP requests and broadcasts them to all ranks
- All ranks execute inference synchronously
- Rank 0 collects and returns the result

Inspired by SGLang's distributed architecture.
    N   )current_platform   )GenerateRequestGenerateResponseModelManageri,  c                   @   s\   e Zd ZdZdededefddZedd Zd	d
 Z	dd Z
dd ZdedefddZdS )TPCoordinatorzv
    Coordinator for Tensor Parallelism inference.

    Runs on rank 0 and broadcasts requests to all TP workers.
    model_managerrank
world_sizec                 C   sT   || _ || _|| _t | _t | _d| _d | _	t
d| d|  |   d S )NFz TPCoordinator initialized: rank=z, world_size=)r
   r   r   time_last_broadcast_time	threadingLock_heartbeat_lock_stop_heartbeat_heartbeat_threadloggerinfo_start_heartbeat)selfr
   r   r    r   O/home/ubuntu/vllm_env/lib/python3.10/site-packages/cache_dit/serve/tp_worker.py__init__$   s   

zTPCoordinator.__init__c                 C   s   | j jS )z=Expose the underlying model_manager's pipe for compatibility.)r
   piper   r   r   r   r   /   s   zTPCoordinator.pipec                 C   s
   | j  S )z8Get model information from the underlying model manager.)r
   get_model_infor   r   r   r   r   4   s   
zTPCoordinator.get_model_infoc                    s<    fdd}t j|dd _ j  tdt d d S )Nc                     s    j sbtt  jJ t  j tkrNztjtgtj	t
jd} tj| dd t  _td W n tyM } ztd|  W Y d }~nd }~ww W d    n1 sXw   Y   j rd S d S )Ndtypedevicer   srczHeartbeat sent to workerszHeartbeat failed: )r   r   sleepHEARTBEAT_INTERVALr   r   torchtensorHEARTBEAT_SIZElongr   device_typedist	broadcastr   debug	Exceptionerror)size_tensorer   r   r   heartbeat_loop9   s(   

z6TPCoordinator._start_heartbeat.<locals>.heartbeat_loopT)targetdaemonz#Heartbeat thread started (interval=zs))r   Threadr   startr   r   r$   )r   r1   r   r   r   r   8   s   
zTPCoordinator._start_heartbeatc                 C   s"   d| _ | jr| jjdd d S d S )NTr   )timeout)r   r   joinr   r   r   r   stopN   s   zTPCoordinator.stoprequestreturnc           
      C   s   | j Y t  t|}t|}tj|gtjtj	d}t
j|dd || j d | j | j }tj|tjtj	d}|d| tj|tjd t
j|dd t | _W d   n1 s_w   Y  |d|    }t|}| j|}	|	S )z}
        Generate images using TP.

        This method broadcasts the request to all ranks and collects the result.
        r   r   r!   r   N)r   )r   r   synchronizepickledumpslenr%   r&   r(   r)   r*   r+   r   zerosuint8copy_
frombufferr   r   cpunumpytobytesloadsr
   generate)
r   r9   request_datarequest_sizer/   padded_sizerequest_tensorbroadcasted_request_databroadcasted_requestresponser   r   r   rG   S   s,   


zTPCoordinator.generateN)__name__
__module____qualname____doc__r   intr   propertyr   r   r   r8   r   r   rG   r   r   r   r   r	      s    
r	   r
   r   c           
      C   s  t d| d 	 ztt  tjdgtjtjd}tj	|dd |
 }|tkr4t d| d W q	|t  d	 t  t  }tj|tjtjd}tj	|dd |d
|    }t|}t d| d | |}t d| d W n ty   t d| d Y d
S  ty }	 z@dt|	v sdt|	 v rt d| d|	  t  W Y d
}	~	d
S t d| dt|	j d|	  t d W Y d
}	~	n-d
}	~	w t!y }	 zt d| dt|	j d|	  t d W Y d
}	~	nd
}	~	ww q
)zb
    Worker loop for TP ranks > 0.

    Receives requests from rank 0 and executes inference.
    z
TP worker z! started, waiting for requests...Tr   r   r!   zRank z received heartbeatr   Nz executing inference...z inference completedz shutting down...NCCLr6   z NCCL error: z runtime error: z: g?z error: )"r   r   r   r;   r%   r&   r(   r)   r*   r+   itemr'   r,   get_world_sizer?   r@   rC   rD   rE   r<   rF   rG   KeyboardInterruptRuntimeErrorstrlowerr.   destroy_process_group	exceptiontyperO   r   r#   r-   )
r
   r   r/   rI   rJ   rK   rH   r9   _r0   r   r   r   run_tp_worker{   sN   


""r`   )rR   loggingr<   r   r   r%   torch.distributeddistributedr*   	platformsr   r
   r   r   r   	getLoggerrO   r   r$   r'   r	   rS   r`   r   r   r   r   <module>   s    
^