o
    S۷i"                     @   s   d dl Z d dlmZ d dlmZmZ d dlmZ er4d dlmZ d dlm	Z	 d dl
mZmZ d dlmZ G d	d
 d
eZd#ddZd$ddZd$ddZd$ddZd#ddZd#ddZG dd dZG dd deZd%d!d"ZdS )&    N)Integration)capture_internal_exceptionsensure_integration_enabled)TYPE_CHECKING)Any)Optional)EventHintSparkContextc                   @   s   e Zd ZdZedddZdS )SparkIntegrationsparkreturnNc                   C   s
   t   d S N)_setup_sentry_tracing r   r   `/home/ubuntu/vllm_env/lib/python3.10/site-packages/sentry_sdk/integrations/spark/spark_driver.py
setup_once   s   
zSparkIntegration.setup_oncer   N)__name__
__module____qualname__
identifierstaticmethodr   r   r   r   r   r      s    r   r   c                  C   s:   ddl m}  | j}|r|d|j |d|j dS dS )z
    Set properties in driver that propagate to worker processes, allowing for workers to have access to those properties.
    This allows worker integration to have access to app_name and application_id.
    r   r
   sentry_app_namesentry_application_idN)pysparkr   _active_spark_contextsetLocalPropertyappNameapplicationId)r   spark_contextr   r   r   _set_app_properties   s   r"   scr   c                 C   s4   ddl m} | j}|| t }| j | dS )zA
    Start java gateway server to add custom `SparkListener`
    r   )ensure_callback_server_startedN)pyspark.java_gatewayr$   _gatewaySentryListener_jscr#   addSparkListener)r#   r$   gwlistenerr   r   r   _start_sentry_listener*   s
   r,   c                    s    t  }|jd	 fdd}d S )
Neventr   hintr	   r   Optional[Event]c                    sX  t   t td u r| W  d    S  jd u r$| W  d    S | di d   | di d j	d | d d j	d | d d j	d	 | d d
 j	d | d d j
 | d d j | d d j | d d j | d d j | di d j W d    | S 1 sw   Y  | S )Nuseridtagszexecutor.idzspark.executor.idzspark-submit.deployModezspark.submit.deployModezdriver.hostzspark.driver.hostzdriver.portzspark.driver.portspark_versionapp_nameapplication_idmaster
spark_homeextraweb_url)r   
sentry_sdk
get_clientget_integrationr   r   
setdefault	sparkUser_confgetversionr   r    r6   	sparkHomeuiWebUrl)r-   r.   r#   r   r   process_event9   s6   


z+_add_event_processor.<locals>.process_event)r-   r   r.   r	   r   r/   )r:   get_isolation_scopeadd_event_processor)r#   scoperE   r   rD   r   _add_event_processor6   s   rI   c                 C   s   t |  t  t|  d S r   )r,   r"   rI   rD   r   r   r   _activate_integrationX   s   rJ   c                     sD   ddl m}  | j tt 									d fd
d}|| _d S )Nr   r
   selfr   argsr   kwargsr   Optional[Any]c                    s"    | g|R i |}t |  |S r   )rJ   )rK   rL   rM   rvspark_context_initr   r   "_sentry_patched_spark_context_initc   s   zE_patch_spark_context_init.<locals>._sentry_patched_spark_context_init)rK   r   rL   r   rM   r   r   rN   )r   r   _do_initr   r   )r   rR   r   rP   r   _patch_spark_context_init^   s   
rT   c                  C   s.   ddl m}  | jd urt| j d S t  d S )Nr   r
   )r   r   r   rJ   rT   r
   r   r   r   r   n   s
   


r   c                   @   s  e Zd ZdNddZdOdd	ZdPddZdQddZdRddZdSddZdTddZ	dUddZ
				dVddZdWd d!ZdXd#d$ZdYd&d'ZdZd)d*Zd[d,d-Zd\d/d0Zd]d2d3Zd^d5d6Zd_d8d9Zd`d;d<Zdad>d?ZdbdAdBZdcdDdEZdddGdHZdedJdKZG dLdM dMZdS )fSparkListenerapplicationEndr   r   Nc                 C      d S r   r   )rK   rV   r   r   r   onApplicationEndx      zSparkListener.onApplicationEndapplicationStartc                 C   rW   r   r   )rK   rZ   r   r   r   onApplicationStart{   rY   z SparkListener.onApplicationStartblockManagerAddedc                 C   rW   r   r   )rK   r\   r   r   r   onBlockManagerAdded~   rY   z!SparkListener.onBlockManagerAddedblockManagerRemovedc                 C   rW   r   r   )rK   r^   r   r   r   onBlockManagerRemoved   rY   z#SparkListener.onBlockManagerRemovedblockUpdatedc                 C   rW   r   r   )rK   r`   r   r   r   onBlockUpdated   rY   zSparkListener.onBlockUpdatedenvironmentUpdatec                 C   rW   r   r   )rK   rb   r   r   r   onEnvironmentUpdate   rY   z!SparkListener.onEnvironmentUpdateexecutorAddedc                 C   rW   r   r   )rK   rd   r   r   r   onExecutorAdded   rY   zSparkListener.onExecutorAddedexecutorBlacklistedc                 C   rW   r   r   )rK   rf   r   r   r   onExecutorBlacklisted   rY   z#SparkListener.onExecutorBlacklistedexecutorBlacklistedForStagec                 C   rW   r   r   )rK   rh   r   r   r   onExecutorBlacklistedForStage   s   z+SparkListener.onExecutorBlacklistedForStageexecutorMetricsUpdatec                 C   rW   r   r   )rK   rj   r   r   r   onExecutorMetricsUpdate   rY   z%SparkListener.onExecutorMetricsUpdateexecutorRemovedc                 C   rW   r   r   )rK   rl   r   r   r   onExecutorRemoved   rY   zSparkListener.onExecutorRemovedjobEndc                 C   rW   r   r   )rK   rn   r   r   r   onJobEnd   rY   zSparkListener.onJobEndjobStartc                 C   rW   r   r   )rK   rp   r   r   r   
onJobStart   rY   zSparkListener.onJobStartnodeBlacklistedc                 C   rW   r   r   )rK   rr   r   r   r   onNodeBlacklisted   rY   zSparkListener.onNodeBlacklistednodeBlacklistedForStagec                 C   rW   r   r   )rK   rt   r   r   r   onNodeBlacklistedForStage   rY   z'SparkListener.onNodeBlacklistedForStagenodeUnblacklistedc                 C   rW   r   r   )rK   rv   r   r   r   onNodeUnblacklisted   rY   z!SparkListener.onNodeUnblacklistedr-   c                 C   rW   r   r   )rK   r-   r   r   r   onOtherEvent   rY   zSparkListener.onOtherEventspeculativeTaskc                 C   rW   r   r   )rK   ry   r   r   r   onSpeculativeTaskSubmitted   rY   z(SparkListener.onSpeculativeTaskSubmittedstageCompletedc                 C   rW   r   r   )rK   r{   r   r   r   onStageCompleted   rY   zSparkListener.onStageCompletedstageSubmittedc                 C   rW   r   r   )rK   r}   r   r   r   onStageSubmitted   rY   zSparkListener.onStageSubmittedtaskEndc                 C   rW   r   r   )rK   r   r   r   r   	onTaskEnd   rY   zSparkListener.onTaskEndtaskGettingResultc                 C   rW   r   r   )rK   r   r   r   r   onTaskGettingResult   rY   z!SparkListener.onTaskGettingResult	taskStartc                 C   rW   r   r   )rK   r   r   r   r   onTaskStart   rY   zSparkListener.onTaskStartunpersistRDDc                 C   rW   r   r   )rK   r   r   r   r   onUnpersistRDD   rY   zSparkListener.onUnpersistRDDc                   @   s   e Zd ZdgZdS )zSparkListener.Javaz1org.apache.spark.scheduler.SparkListenerInterfaceN)r   r   r   
implementsr   r   r   r   Java   s    
r   )rV   r   r   N)rZ   r   r   N)r\   r   r   N)r^   r   r   N)r`   r   r   N)rb   r   r   N)rd   r   r   N)rf   r   r   N)rh   r   r   N)rj   r   r   N)rl   r   r   Nrn   r   r   Nrp   r   r   N)rr   r   r   N)rt   r   r   N)rv   r   r   N)r-   r   r   N)ry   r   r   Nr{   r   r   Nr}   r   r   N)r   r   r   N)r   r   r   N)r   r   r   N)r   r   r   N)r   r   r   rX   r[   r]   r_   ra   rc   re   rg   ri   rk   rm   ro   rq   rs   ru   rw   rx   rz   r|   r~   r   r   r   r   r   r   r   r   r   rU   w   s<    























rU   c                	   @   sR   e Zd Z	ddededdddfddZdddZdddZdddZdddZdS )r'   NlevelmessagedatazOptional[dict[str, Any]]r   c                 C   s   t  j|||d d S )Nr   r   r   )r:   rF   add_breadcrumb)rK   r   r   r   r   r   r   _add_breadcrumb   s   
zSentryListener._add_breadcrumbrp   r   c                 C   s2   t    d| }| jd|d t  d S )NzJob {} Startedinfo)r   r   )r:   rF   clear_breadcrumbsformatjobIdr   r"   )rK   rp   r   r   r   r   rq      s   
zSentryListener.onJobStartrn   c                 C   sb   d}d}d|   i}|   dkrd}d| }n	d}d| }| j|||d d S )	N resultJobSucceededr   zJob {} EndedwarningzJob {} Failedr   )	jobResulttoStringr   r   r   )rK   rn   r   r   r   r   r   r   ro      s   zSentryListener.onJobEndr}   c                 C   sT   |  }d| }d| i}t|}|d ur||d< | jd||d t  d S )NzStage {} Submittedname	attemptIdr   r   )	stageInfor   stageIdr   _get_attempt_idr   r"   )rK   r}   
stage_infor   r   
attempt_idr   r   r   r~      s   
zSentryListener.onStageSubmittedr{   c                 C   s   ddl m} | }d}d}d| i}t|}|d ur ||d< z|  |d< d| }d}W n |yE   d	| }d
}Y nw | j	|||d d S )Nr   )Py4JJavaErrorr   r   r   reasonzStage {} Failedr   zStage {} Completedr   r   )
py4j.protocolr   r   r   r   failureReasonr@   r   r   r   )rK   r{   r   r   r   r   r   r   r   r   r   r|      s"   zSentryListener.onStageCompletedr   r   r   r   r   )	r   r   r   strr   rq   ro   r~   r|   r   r   r   r   r'      s    




r'   r   r   Optional[int]c                 C   s>   z|   W S  ty   Y nw z|  W S  ty   Y d S w r   )r   	ExceptionattemptNumber)r   r   r   r   r     s   

r   r   )r#   r   r   N)r   r   r   r   )r:   sentry_sdk.integrationsr   sentry_sdk.utilsr   r   typingr   r   r   sentry_sdk._typesr   r	   r   r   r   r"   r,   rI   rJ   rT   r   rU   r'   r   r   r   r   r   <module>   s&    



"

	PD