o
    ٷi7                     @   s   d dl Z d dlmZ d dlmZ d dlmZ d dlmZ ddl	m
Z
 e
eZded	ed
edededededeeeef gdf dedefddZdeeef deeef d	edeeeeef dB f fddZdee defddZdS )    N)Callable)Any)OmniStageTaskType)OrchestratorAggregator   )get_connector_logger	connectorstage_idnext_stage_idreq_idnext_inputssampling_paramsoriginal_promptnext_stage_queue_submit_fnmetricsreturnc	              
   C   s   z_t   }	|||| d| t   dd}
| t|t|t||
\}}}|r]tj||dt|t|t   d}|r@||d< || t   }||	 d }|||||t|d W dS W dS  tyy } zt	d	|| W Y d
}~dS d
}~ww )z
    Attempts to send data via OmniConnector.
    Returns True if successful, False otherwise.
    Encapsulates the logic of preparing payload, sending via connector,
    sending notification, and recording metrics.
    z->)r   stage_transition	timestamp)engine_inputsr   metadataT)type
request_idr   from_connector
from_stageto_stagesent_tsconnector_metadata     @@FzI[Orchestrator] OmniConnector failed for req %s: %s; falling back to queueN)
timeputstrr   GENERATE
on_forwardfloat	Exceptionloggerwarning)r   r	   r
   r   r   r   r   r   r   t0payload_datasuccessserialized_sizer   notify_payloadt1tx_mse r/   a/home/ubuntu/.local/lib/python3.10/site-packages/vllm_omni/distributed/omni_connectors/adapter.pytry_send_via_connector   sV   "
r1   task
connectorsc              
   C   s  | d }|  dr|  d}t|}|std|| dS ||f}| |}|rzXt }|  d}	|j ||t||	d}
t }|
rWt|
trM|
\}}n|
}t||}nd}d	}|rwt|t	rw| d
}|| d }||d}||fW S td|| W dS  t
y } ztd||| W Y d}~dS d}~ww td|||| dS d	dlm} z|| d
d\}}||fW S  t
y   Y dS w )z
    Attempts to resolve input data from either connector or IPC.
    Returns (engine_inputs, rx_metrics) or (None, None) if failed/skipped.
    r   r   r   zN[Stage-%s] 'from_connector' is true but 'from_stage' is missing for request %s)NNr   )r   Nr   r   r   )rx_decode_time_msrx_transfer_byteszO[Stage-%s] Failed to get data from connector for request %s or payload is emptyzB[Stage-%s] Error retrieving data from connector for request %s: %sz>[Stage-%s] No connector found for edge %s -> %s for request %s) maybe_load_from_ipc_with_metricsengine_inputs_shm)getr    r%   errorr   
isinstancetuplelenserialize_objdictr$   !vllm_omni.entrypoints.stage_utilsr6   )r2   r3   r	   ridr   r   connector_keyr   _t_startr   payload_t_endr(   r*   ein	decode_ms
rx_metricsr.   r6   r   r/   r/   r0   try_recv_via_connector^   sb   	










rH   
prompt_idsc                    s   d d}d}d} fddt tD }|t d}d}t t|d D ]4}|| }||d  }	|d  }
|
|kr@q)|
|krK||	| 7 }q)|
|kr\|t|d	 kr\|d
7 }q)	 q)|| S )zCompute the length of the talker prompt ids.

    Args:
        prompt_ids: The prompt ids tensor.

    Returns:
        The length of the talker prompt ids.
    i\P i"  ih  i#- c                    s   g | ]
}|  kr|qS r/   r/   ).0iim_start_token_idrI   r/   r0   
<listcomp>   s    z4compute_talker_prompt_ids_length.<locals>.<listcomp>r   r      	   )ranger<   append)rI   system_token_iduser_token_idassistant_token_idim_start_indexessum_user_lenassistant_lenrK   sr.   roler/   rL   r0    compute_talker_prompt_ids_length   s(   	
r[   )r   collections.abcr   typingr   r?   r   vllm_omni.metricsr   utils.loggingr   __name__r%   intr    r>   boolr1   r;   rH   listr[   r/   r/   r/   r0   <module>   sL   	

L


R