o
    	Ti;                     @   sP  d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlmZm	Z	 d dl
mZ d dlZd dlmZ ddlmZmZmZ e rId dlZd dlmZ e rad d	lmZ d d
lmZ e rad dlmZ eeZG dd dZedkrd dlmZ e Z e j!dd e j"ddgdde dZ#e$de# d dl%m&Z& e&'d(dZ)e *e) dS dS )    N)BytesIO)OptionalUnion)urlparse)nn   )is_requests_availableis_vllm_ascend_availableis_vllm_available)ConnectionError)PyNcclCommunicator)StatelessProcessGroup)PyHcclCommunicatorc                   @   s
  e Zd ZdZ					d4dee ded	ed
edef
ddZd5dedefddZ											d6de
e dee
 dededededededed ee d!ee d"e
e
e  fd#d$Zd7d&eejeef fd'd(Zd)ed*ejfd+d,Zd-ejfd.d/Zd0d1 Zd2d3 ZdS )8
VLLMClienta	  
    A client class to interact with a vLLM server.

    This class provides methods to generate completions, initialize and manage weight update groups, and update model
    weights in a distributed setting. Before using it, start the vLLM server with `trl vllm-serve`.

    Args:
        base_url (`str` or `None`, *optional*, defaults to `None`):
            Base URL for the vLLM server (e.g., `"http://localhost:8000"`). If provided, `host` and `server_port` are
            ignored.
        host (`str`, *optional*, defaults to `"0.0.0.0"`):
            IP address of the vLLM server. Ignored if `base_url` is provided.
        server_port (`int`, *optional*, defaults to `8000`):
            Port number of the vLLM server. Ignored if `base_url` is provided.
        group_port (`int`, *optional*, defaults to `51216`):
            Port number for the weight update group.
        connection_timeout (`float`, *optional*, defaults to `0.0`):
            Total timeout duration in seconds to wait for the server to be up. If the server is not up after the
            timeout, a `ConnectionError` is raised.

    Examples:
        Run the vLLM server with the model `Qwen/Qwen2.5-7B`:

        ```
        $ trl vllm-serve --model Qwen/Qwen2.5-7B
        ...
        INFO:     Application startup complete.
        INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
        ```

        Use the client to generate completions and update model weights:

        ```python
        >>> from trl.extras.vllm_client import VLLMClient

        >>> client = VLLMClient()
        >>> client.generate(["Hello, AI!", "Tell me a joke"])
        [[2980, 498, 1492, 752, 448, 264, 13027, 8645, 30, 358, 2776, 4460, 311, 3270, 264, 2025],
         [911, 7988, 1251, 382, 3838, 653, 498, 1618, 4325, 879, 2581, 20027, 264, 21428, 30, 362]]

        >>> from transformers import AutoModelForCausalLM

        >>> model = AutoModelForCausalLM.from_pretrained("Qwen/Qwen2.5-7B", device_map="cuda")
        >>> client.init_communicator(device="cuda")
        >>> client.update_model_params(model)
        ```

        There are several ways to initialize the client:

        ```python
        VLLMClient(base_url="http://localhost:8000")
        VLLMClient(base_url="http://192.168.1.100:8000")
        VLLMClient(host="localhost", server_port=8000)
        VLLMClient(host="192.168.1.100", server_port=8000)
        ```
    N0.0.0.0@            base_urlhostserver_port
group_portconnection_timeoutc                 C   s   t  stdt stdt | _|d ur4t|}t|j	| _
|jp&d}| d|j |j | _n|| _
|| _d| j
 d| j | _|| _| | d S )NzIrequests is not installed. Please install it with `pip install requests`.zAvLLM is not installed. Please install it with `pip install vllm`.httpz://zhttp://:)r   ImportErrorr
   requestsSessionsessionr   socketgethostbynamehostnamer   schemenetlocpathr   r   r   check_server)selfr   r   r   r   r   
parsed_urlr"    r(   J/home/ubuntu/.local/lib/python3.10/site-packages/trl/extras/vllm_client.py__init__h   s   

zVLLMClient.__init__       @total_timeoutretry_intervalc              
   C   s   | j  d}t }	 zt|}W n+ tjjy= } zt | }||kr3td| j  d| d|W Y d}~nd}~ww |jdkrUd|jv rN|jd | _	t
d	 dS t
d
| d t| q)a  
        Check server availability with retries on failure, within a total timeout duration. If the server is not up
        after the total timeout duration, raise a `ConnectionError`.

        Args:
            retry_interval (`float`, *optional*, defaults to `2.0`):
                Interval in seconds between retries.
            total_timeout (`float`, *optional*, defaults to `0.0`):
                Total timeout duration in seconds.
        z/health/Tz$The vLLM server can't be reached at z after zF seconds. Make sure the server is running by running `trl vllm-serve`.N   zX-Forwarded-ForzServer is up!z"Server is not up yet. Retrying in z seconds...)r   timer   get
exceptionsRequestExceptionr   status_codeheadersr   loggerinfosleep)r&   r,   r-   url
start_timeresponseexcelapsed_timer(   r(   r)   r%      s0   
	


zVLLMClient.check_server         ?   promptsimagesnrepetition_penaltytemperaturetop_ptop_kmin_p
max_tokensguided_decoding_regexgeneration_kwargsreturnc                    s   | j  d}dd  |r fdd|D nd}| jj||||||||||	|
|p(i dd}|jd	kr8| d
 S td|j d|j )a  
        Generates model completions for the provided prompts.

        Args:
            prompts (`list[str]`):
                List of text prompts for which the model will generate completions.
            images (`list[PIL.Image]` or `None`, *optional*, defaults to `None`):
                List of PIL Images to send along with the prompts.
            n (`int`, *optional*, defaults to `1`):
                Number of completions to generate for each prompt.
            repetition_penalty (`float`, *optional*, defaults to `1.0`):
                Parameter for repetition penalty. 1.0 means no penalty.
            temperature (`float`, *optional*, defaults to `1.0`):
                Temperature parameter for sampling. Higher values increase diversity.
            top_p (`float`, *optional*, defaults to `1.0`):
                Top-p sampling parameter.`1.0` means no truncation.
            top_k (`int`, *optional*, defaults to `-1`):
                Top-k sampling parameter. `-1` means no truncation.
            min_p (`float`, *optional*, defaults to `0.0`):
                Minimum probability for sampling.
            max_tokens (`int`, *optional*, defaults to `16`):
                Maximum number of tokens to generate for each prompt.
            guided_decoding_regex (`str` or `None`, *optional*, defaults to `None`):
                Regular expression to guide the decoding process.
            generation_kwargs (`dict` or `None`, *optional*, defaults to `None`):
                Additional generation parameters to pass to the vLLM `SamplingParams`. This can include parameters like
                `seed`, `frequency_penalty`, etc. If it contains keys that conflict with the other parameters, they
                will override them.

        Returns:
            `list[list[int]]`:
                List of lists of token IDs representing the model-generated completions for each prompt.
        z
/generate/c                 S   s,   t  }| j|dd | }t|dS )NPNG)formatzutf-8)r   savegetvaluebase64	b64encodedecode)imagebuffer	img_bytesr(   r(   r)   pil_to_base64   s   z*VLLMClient.generate.<locals>.pil_to_base64c                    s   g | ]} |qS r(   r(   ).0imgrW   r(   r)   
<listcomp>   s    z'VLLMClient.generate.<locals>.<listcomp>N)rA   rB   rC   rD   rE   rF   rG   rH   rI   rJ   rK   jsonr.   completion_idsRequest failed: , )r   r   postr3   r]   	Exceptiontext)r&   rA   rB   rC   rD   rE   rF   rG   rH   rI   rJ   rK   r8   r:   r(   rZ   r)   generate   s*   /
zVLLMClient.generater   devicec                 C   s   | j  d}t|}|jdkr| d }ntd|j d|j |d }|| _| j  d}tt	j
|j}| jj|d| j||d	d
}|jdkrXtd|j d|j td tj| j| j| j|d}t||d| _t| j dS )a  
        Initializes the weight update group in a distributed setup for model synchronization.

        Args:
            device (`torch.device`, `str`, or `int`, *optional*, defaults to `0`):
                Device of trainer main process. It's the device that will be used for the weights synchronization.
                Can be a `torch.device` object, a string like `'cuda:0'`, or an integer device index.
        z/get_world_size/r.   
world_sizer_   r`   r=   z/init_communicator/r   )r   portrf   client_device_uuidr\   g?)r   rg   rankrf   re   N)r   r   r0   r3   r]   rb   rc   ri   strtorchcudaget_device_propertiesuuidr   ra   r   r/   r7   r   creater   r   pynccl_commatexitregisterclose_communicator)r&   re   r8   r:   vllm_world_sizerf   rh   pgr(   r(   r)   init_communicator   s.   



	
zVLLMClient.init_communicatornameweightsc                 C   s~   t |jt|j}}| j d}| jj||||dd}|jdkr.td|j d|j	 | j
j|| jd | j
j  dS )	a0  
        Updates a specific named parameter in the model and broadcasts it to other processes.

        Args:
            name (`str`):
                Name of the layer whose weights are being updated.
            weights (`torch.Tensor`):
                Tensor containing the updated weights.
        z/update_named_param/)rx   dtypeshaper\   r.   r_   r`   )srcN)rk   rz   tupler{   r   r   ra   r3   rb   rc   rq   	broadcastri   groupbarrier)r&   rx   ry   rz   r{   r8   r:   r(   r(   r)   update_named_param(  s   

zVLLMClient.update_named_parammodelc                 C   s$   |  D ]\}}| ||j qdS )z
        Updates all parameters of the given model by calling `update_named_param` for each parameter in the model.

        Args:
            model (`nn.Module`):
                Model whose parameters (weights/biases) are to be updated.
        N)named_parametersr   data)r&   r   rx   paramr(   r(   r)   update_model_params<  s   zVLLMClient.update_model_paramsc                 C   s>   | j  d}| j|}|jdkrtd|j d|j dS )z8
        Resets the prefix cache for the model.
        z/reset_prefix_cache/r.   r_   r`   N)r   r   ra   r3   rb   rc   r&   r8   r:   r(   r(   r)   reset_prefix_cacheH  s
   
zVLLMClient.reset_prefix_cachec                 C   sX   | j  d}z| j|}W n
 ty   Y dS w |jdkr*td|j d|j dS )zW
        Closes the weight update group and cleans up the communication group.
        z/close_communicator/r.   r_   r`   N)r   r   ra   r   r3   rb   rc   r   r(   r(   r)   rt   Q  s   
zVLLMClient.close_communicator)Nr   r   r   r   )r   r+   )
Nr=   r>   r>   r>   r?   r   r@   NN)r   )__name__
__module____qualname____doc__r   rk   intfloatr*   r%   listdictrd   r   rl   re   rw   Tensorr   r   Moduler   r   rt   r(   r(   r(   r)   r   .   sv    ;
'	


O1	r   __main__)SamplingParamsrm   rj   z
Hello, AI!zTell me a joke       )rC   rI   sampling_paramsz
Responses:)AutoModelForCausalLMzQwen/Qwen2.5-7B)+rr   rQ   loggingr   r/   ior   typingr   r   urllib.parser   rl   r   import_utilsr   r	   r
   r   r   ,vllm.distributed.device_communicators.pyncclr   vllm.distributed.utilsr   3vllm_ascend.distributed.device_communicators.pyhcclr   	getLoggerr   r5   r   vllmr   clientrw   rd   	responsesprinttransformersr   from_pretrainedtor   r   r(   r(   r(   r)   <module>   sB   
  6
