o
    -iE                     @   s0  d Z ddlZddlZddlZddlZddlZddlmZ ddlZddl	Z	ddl
mZ ddlmZmZmZ ddlmZ ddlmZ ddlmZmZ dd	lmZ dd
lmZ ddlmZmZ ddlmZ ddl m!Z! ddl"m#Z# ddl$m%Z& ee'Z(G dd dej)Z)dej*fddZ+dd Z,e'dkre,  dS dS )a"  
vLLM gRPC Server

Starts a gRPC server for vLLM using the VllmEngine protocol.

Usage:
    python -m vllm.entrypoints.grpc_server --model <model_path>

Example:
    python -m vllm.entrypoints.grpc_server         --model meta-llama/Llama-2-7b-hf         --host 0.0.0.0         --port 50051
    N)AsyncGenerator)
reflection)SamplingParams
TextPromptTokensPrompt)AsyncEngineArgs)log_version_and_model)vllm_engine_pb2vllm_engine_pb2_grpc)init_logger)RequestOutput)RequestOutputKindStructuredOutputsParams)UsageContext)FlexibleArgumentParser)AsyncLLM)__version__c                   @   s.  e Zd ZdZdedefddZdejde	j
jdeejd	f fd
dZdejde	j
jdejfddZdejde	j
jdejfddZdejde	j
jdejfddZdejde	j
jdejfddZdejde	j
jdejfddZe	d dejde defddZ!ede"dejfddZ#ede"dejfddZ$d	S )!VllmEngineServicera(  
    gRPC servicer implementing the VllmEngine service.

    Handles 6 RPCs:
    - Generate: Streaming text generation
    - Embed: Embeddings (TODO)
    - HealthCheck: Health probe
    - Abort: Cancel requests out-of-band
    - GetModelInfo: Model metadata
    - GetServerInfo: Server state
    	async_llm
start_timec                 C   s   || _ || _td dS )z
        Initialize the servicer.

        Args:
            async_llm: The AsyncLLM instance
            start_time: The server start time, in seconds since epoch
        zVllmEngineServicer initializedN)r   r   loggerinfo)selfr   r    r   Y/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/vllm/entrypoints/grpc_server.py__init__;   s   zVllmEngineServicer.__init__requestcontextreturnNc              
   C  s<  |j }td| zN|ddkr%dt|jji}|jjr$|jj|d< nd|ji}| j	|j
|jd}| jj|||d2 z3 dH W }|jrK| |V  |jrT| |V  q<6 W dS  tyx } z|tjjt|I dH  W Y d}~dS d}~w ty } ztd	| |tjjt|I dH  W Y d}~dS d}~ww )
z
        Handle streaming generation requests.

        Args:
            request: The GenerateRequest protobuf
            context: gRPC context

        Yields:
            GenerateResponse protobuf messages (streaming)
        zGenerate request %s received.input	tokenizedprompt_token_idsprompt)stream)r"   sampling_params
request_idNz Error in Generate for request %s)r%   r   debug
WhichOneoflistr    	input_idsoriginal_texttext_sampling_params_from_protor$   r#   r   generate_chunk_responsefinished_complete_response
ValueErrorabortgrpc
StatusCodeINVALID_ARGUMENTstr	Exception	exceptionINTERNAL)r   r   r   r%   r"   r$   outputer   r   r   GenerateG   sB   
((zVllmEngineServicer.Generatec                    s&   t d |tjjdI dH  dS )z
        Handle embedding requests.

        TODO: Implement in Phase 4

        Args:
            request: The EmbedRequest protobuf
            context: gRPC context

        Returns:
            EmbedResponse protobuf
        zEmbed RPC not yet implementedN)r   warningr2   r3   r4   UNIMPLEMENTED)r   r   r   r   r   r   Embed~   s
   
zVllmEngineServicer.Embedc                    s4   | j j }|r
dnd}td|| tj||dS )z
        Handle health check requests.

        Args:
            request: The HealthCheckRequest protobuf
            context: gRPC context

        Returns:
            HealthCheckResponse protobuf
        HealthzEngine is not alivez+HealthCheck request: healthy=%s, message=%s)healthymessage)r   erroredr   r&   r	   HealthCheckResponse)r   r   r   
is_healthyrB   r   r   r   HealthCheck   s
   
zVllmEngineServicer.HealthCheckc                    s.   |j }td| | j|I dH  t S )z
        Out-of-band abort requests.

        Args:
            request: The AbortRequest protobuf
            context: gRPC context

        Returns:
            AbortResponse protobuf
        zAbort requests: %sN)request_idsr   r&   r   r2   r	   AbortResponse)r   r   r   rG   r   r   r   Abort   s
   zVllmEngineServicer.Abortc                    s.   | j j}tj|j|jdk|j| |jdS )z
        Handle model info requests.

        Args:
            request: The GetModelInfoRequest protobuf
            context: gRPC context

        Returns:
            GetModelInfoResponse protobuf
        r-   )
model_pathis_generationmax_context_length
vocab_sizesupports_vision)	r   model_configr	   GetModelInfoResponsemodelrunner_typemax_model_lenget_vocab_sizeis_multimodal_model)r   r   r   rO   r   r   r   GetModelInfo   s   zVllmEngineServicer.GetModelInfoc                    s0   | j j }tj|dt t | j ddS )z
        Handle server info requests.

        Args:
            request: The GetServerInfoRequest protobuf
            context: gRPC context

        Returns:
            GetServerInfoResponse protobuf
        Fz	vllm-grpc)active_requests	is_pausedlast_receive_timestampuptime_secondsserver_type)r   output_processorget_num_unfinished_requestsr	   GetServerInfoResponsetimer   )r   r   r   num_requestsr   r   r   GetServerInfo   s   z VllmEngineServicer.GetServerInfoTparamsr#   c                 C   s$  | j rt| j nd}| jrt| jnd}d}| d}|ra|dkr(t| jd}n9|dkr3t| jd}n.|dkr>t| jd}n#|d	krIt| jd
}n|dkrTt| j	d}n|dkratt| j
jd}td+i d| drm| jndd| jdkrx| jndd| jd| jd| jd| jd| jdkr| jndd| dr| jndd| jd|d|d| jd| jd| jd| jd kr| jnd!d"| d"r| jndd#| d#r| jndd$| d$r| jndd%| jd&| jrt | jndd'| d'r| j!ndd(|d)t"|d*|rt#j$S t#j%S ),a  
        Convert protobuf SamplingParams to vLLM SamplingParams.

        Args:
            params: Protobuf SamplingParams message
            stream: Whether streaming is enabled

        Returns:
            vLLM SamplingParams with detokenize=False and structured_outputs
        N
constraintjson_schema)jsonregex)rf   grammar)rg   structural_tag)rh   json_object)ri   choice)rj   temperatureg      ?top_pg        top_kmin_pfrequency_penaltypresence_penaltyrepetition_penalty
max_tokens
min_tokensstopstop_token_idsskip_special_tokensspaces_between_special_tokens
ignore_eosnr      logprobsprompt_logprobsseedinclude_stop_str_in_output
logit_biastruncate_prompt_tokensstructured_outputs
detokenizeoutput_kindr   )&rt   r(   ru   r'   r   rd   rf   rg   rh   ri   rj   choicesr   HasFieldrk   rl   rm   rn   ro   rp   rq   rr   rs   rv   rw   rx   ry   r{   r|   r}   r~   r   dictr   boolr   DELTA
FINAL_ONLY)rb   r#   rt   ru   r   constraint_fieldr   r   r   r,      s   



 
!z.VllmEngineServicer._sampling_params_from_protor:   c                 C   sj   | j r| j d nd}|du rtjtjg dddddS tjtj|j| jr)t| jndt|j| jddS )a5  
        Build a streaming chunk response from vLLM output.
        When output_kind=DELTA, vLLM returns only new tokens automatically.

        Args:
            output: vLLM RequestOutput (with delta tokens when output_kind=DELTA)

        Returns:
            GenerateResponse with chunk field set
        r   N)	token_idsprompt_tokenscompletion_tokenscached_tokens)chunk)outputsr	   GenerateResponseGenerateStreamChunkr   r!   lennum_cached_tokensr:   
completionr   r   r   r.   B  s(   z"VllmEngineServicer._chunk_responsec                 C   st   | j r| j d nd}|du rtjtjg ddddddS tjtj|j|jp&d| jr.t| jndt|j| jddS )z
        Build a final completion response from vLLM output.

        Args:
            output: vLLM RequestOutput (finished=True)

        Returns:
            GenerateResponse with complete field set
        r   Nerror)
output_idsfinish_reasonr   r   r   )completert   )	r   r	   r   GenerateCompleter   r   r!   r   r   r   r   r   r   r0   j  s,   z%VllmEngineServicer._complete_response)T)%__name__
__module____qualname____doc__r   floatr   r	   GenerateRequestr3   aioServicerContextr   r   r<   EmbedRequestEmbedResponser?   HealthCheckRequestrD   rF   AbortRequestrH   rI   GetModelInfoRequestrP   rV   GetServerInfoRequestr^   ra   staticmethodr   r   r,   r   r.   r0   r   r   r   r   r   .   st    
7




N'r   argsc              	      s  t tt| j td|  t }t| }|jt	j
d}tj|t	j
| j| jd}t||}tjjddgd}t|| tjjd jtjf}t|| | j d| j }|| | I d	H  td
| td t ! }	t "   fdd}
t#j$t#j%fD ]}|	&||
 qz`z	 ' I d	H  W n t(y   td Y n%w W td |j)ddI d	H  td |*  td td d	S W td |j)ddI d	H  td |*  td td d	S td |j)ddI d	H  td |*  td td w )zW
    Main serving function.

    Args:
        args: Parsed command line arguments
    zvLLM gRPC server args: %s)usage_context)vllm_configr   enable_log_requestsdisable_log_stats)zgrpc.max_send_message_length)zgrpc.max_receive_message_lengthr   )options
VllmEngine:NzvLLM gRPC server started on %sz"Server is ready to accept requestsc                      s   t d    d S )NzReceived shutdown signal)r   r   setr   
stop_eventr   r   signal_handler  s   
z"serve_grpc.<locals>.signal_handlerzInterrupted by userz!Shutting down vLLM gRPC server...g      @)gracezgRPC server stoppedzAsyncLLM engine stoppedzShutdown complete)+r   r   VLLM_VERSIONrQ   r   r_   r   from_cli_argscreate_engine_configr   OPENAI_API_SERVERr   from_vllm_configr   disable_log_stats_serverr   r3   r   serverr
    add_VllmEngineServicer_to_serverr	   
DESCRIPTORservices_by_name	full_namer   SERVICE_NAMEenable_server_reflectionhostportadd_insecure_portstartasyncioget_running_loopEventsignalSIGTERMSIGINTadd_signal_handlerwaitKeyboardInterruptrt   shutdown)r   r   engine_argsr   r   servicerr   service_namesaddressloopr   sigr   r   r   
serve_grpc  sx   












r   c               
   C   s   t dd} | jdtddd | jdtdd	d | jd
ddd t| } |  }z
tt	| W dS  t
yP } ztd| td W Y d}~dS d}~ww )zMain entry point.zvLLM gRPC Server)descriptionz--hostz0.0.0.0zHost to bind gRPC server to)typedefaulthelpz--porti  zPort to bind gRPC server toz--disable-log-stats-server
store_truez$Disable stats logging on server side)actionr   zServer failed: %srz   N)r   add_argumentr6   intr   add_cli_args
parse_argsuvlooprunr   r7   r   r8   sysexit)parserr   r;   r   r   r   main  s:   
r   __main__)-r   argparser   r   r   r_   collections.abcr   r3   r   grpc_reflection.v1alphar   vllmr   r   r   vllm.engine.arg_utilsr   vllm.entrypoints.utilsr   	vllm.grpcr	   r
   vllm.loggerr   vllm.outputsr   vllm.sampling_paramsr   r   vllm.usage.usage_libr   vllm.utils.argparse_utilsr   vllm.v1.engine.async_llmr   vllm.versionr   r   r   r   r   	Namespacer   r   r   r   r   r   <module>   s<     iX&
