o
    biS(                     @   sB  d Z ddlZddlZddlZddlmZmZ ddlmZm	Z	 ddl
mZmZmZmZmZmZmZ ddlZddlmZmZmZ ddlmZ erRddlmZ ddlmZ eeZd	ZeG d
d dZ eG dd dZ!eG dd dZ"eG dd dZ#dd Z$de#defddZ%dddZ&G dd deZ'G dd de'Z(dS )z,Metadata exporter API for Ray Data datasets.    N)ABCabstractmethod)	dataclassfield)TYPE_CHECKINGAnyDictListMappingOptionalSequence)EventLogTypecheck_export_api_enabledget_export_event_logger)DataContext)PhysicalOperatorunknownc                   @   s"   e Zd ZU dZeed< eed< dS )SubStagezRepresents a sub-stage within an operator in the DAG.

    Attributes:
        name: The name of the sub-stage.
        id: The unique identifier of the sub-stage.
    nameidN)__name__
__module____qualname____doc__str__annotations__ r   r   X/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/metadata_exporter.pyr      s   
 r   c                   @   sp   e Zd ZU dZeed< eed< eed< eedZe	e ed< eedZ
e	e ed< eedZeeef ed< d	S )
Operatora  Represents a data processing operator in the DAG.

    Attributes:
        name: The name of the operator.
        id: The unique identifier of the operator within the DAG structure, typically
            incorporating a position or index (e.g., "ReadParquet_0"). This is used for
            referencing operators within the DAG topology.
        uuid: The system-generated UUID of the physical operator instance. This is the
            internal unique identifier created when the operator instance is initialized
            and remains consistent throughout its lifetime.
        input_dependencies: List of operator IDs that this operator depends on for input.
        sub_stages: List of sub-stages contained within this operator.
        args: User-specified arguments associated with the operator, which may
            include configuration settings, options, or other relevant data for the operator.
    r   r   uuiddefault_factoryinput_dependencies
sub_stagesargsN)r   r   r   r   r   r   r   listr"   r	   r#   r   dictr$   r   r   r   r   r   r   r   +   s   
 r   c                   @   sJ   e Zd ZU dZeedZee e	d< e
dddedef dd fdd	Zd
S )TopologyzRepresents the complete structure of the operator DAG.

    Attributes:
        operators: List of all operators in the DAG.
    r    	operatorsdagr   op_to_idreturnc           	   	      s   t  }|  D ]F} | }t|j||j fdd|jD t| d}t|drG|j	rGt
|j	D ]\}}| d| }|jt||d q1|j| q|S )zCreate a Topology structure from the physical operator DAG.

        Args:
            dag: The operator DAG to analyze.

        Returns:
            A Topology object representing the operator DAG structure.
        c                    s   g | ]
}| v r | qS r   r   ).0depr*   r   r   
<listcomp>g   s    z5Topology.create_topology_metadata.<locals>.<listcomp>)r   r   r   r"   r$   _sub_progress_bar_names_sub_r   r   )r'   post_order_iterr   r   r   r"   sanitize_for_struct_get_logical_argshasattrr0   	enumerater#   appendr   r(   )	r)   r*   resultopop_idoperatorjsub_namesub_stage_idr   r.   r   create_topology_metadataO   s$   

z!Topology.create_topology_metadataN)r   r   r   r   r   r%   r(   r	   r   r   staticmethodr   r   r@   r   r   r   r   r'   E   s   
 
r'   c                   @   s:   e Zd ZU dZeed< eed< eed< eed< eed< dS )DatasetMetadataa  Metadata about a Ray Data dataset.

    This class represents the metadata associated with a dataset, including its provenance
    information and execution details.

    Attributes:
        job_id: The ID of the job running this dataset.
        topology: The structure of the dataset's operator DAG.
        dataset_id: The unique ID of the dataset.
        start_time: The timestamp when the dataset execution started.
        data_context: The DataContext attached to the dataset.
    job_idtopology
dataset_id
start_timedata_contextN)	r   r   r   r   r   r   r'   floatr   r   r   r   r   rB   w   s   
 rB   c                 C   s   t | trdd |  D S t | ttttfs| d u r| S t | tr)dd | D S zt	| W S  t
tfyM   zt| W  Y S  tyL   t Y  Y S w w )Nc                 S   s   i | ]	\}}|t |qS r   r4   )r,   kvr   r   r   
<dictcomp>   s    z'sanitize_for_struct.<locals>.<dictcomp>c                 S   s   g | ]}t |qS r   rI   )r,   rK   r   r   r   r/      s    z'sanitize_for_struct.<locals>.<listcomp>)
isinstancer
   itemsr   intrH   boolr   jsondumps	TypeErrorOverflowError	ExceptionUNKNOWN)objr   r   r   r4      s   

r4   dataset_metadatar+   c                 C   s   ddl m} ddlm} ddlm}m}m}m} | }| }| j	j
D ]<}	| }
|
|	j ||	j|	j|	j|
d}|	jD ]}|j| q;|	jD ]}||j|jd}|j| qG|j
| q"| }|t|| j || j| j| j|d}|j	| |S )a  Convert the dataset metadata to a protobuf message.

    Args:
        dataset_metadata: DatasetMetadata object containing the dataset's
            information and DAG structure.

    Returns:
        The protobuf message representing the dataset metadata.
    r   )asdict)Struct)ExportDatasetMetadatar   r   r'   )r   r   r   r$   r2   )rE   rC   rF   rG   )dataclassesrY   google.protobuf.struct_pb2rZ   .ray.core.generated.export_dataset_metadata_pb2r[   r   r   r'   rD   r(   updater$   r   r   r   r"   r8   r#   r4   rG   rE   rC   rF   CopyFrom)rX   rY   rZ   ProtoDatasetMetadataProtoOperatorProtoSubStageProtoTopologyproto_dataset_metadataproto_topologyr:   r$   proto_operatordep_id	sub_stageproto_sub_stagerG   r   r   r   dataset_metadata_to_proto   sB   


rk   DatasetMetadataExporterc                   C   s   t  S )znGet the dataset metadata exporter instance.

    Returns:
        The dataset metadata exporter instance.
    )LoggerDatasetMetadataExportercreate_if_enabledr   r   r   r   get_dataset_metadata_exporter   s   ro   c                   @   s@   e Zd ZdZededdfddZeeded  fddZ	dS )	rl   zAbstract base class for dataset metadata exporters.

    Implementations of this interface can export Ray Data metadata to various destinations
    like log files, databases, or monitoring systems.
    rX   r+   Nc                 C      dS )zExport dataset metadata to the destination.

        Args:
            dataset_metadata: DatasetMetadata object containing dataset information.
        Nr   )selfrX   r   r   r   export_dataset_metadata   s   z/DatasetMetadataExporter.export_dataset_metadatac                 C   rp   )zCreate an exporter instance if the export functionality is enabled.

        Returns:
            An exporter instance if enabled, None otherwise.
        Nr   )clsr   r   r   rn      s   z)DatasetMetadataExporter.create_if_enabled)
r   r   r   r   r   rB   rr   classmethodr   rn   r   r   r   r   rl      s    c                   @   sH   e Zd ZdZdejfddZdeddfdd	Ze	de
d  fd
dZdS )rm   zDataset metadata exporter implementation that uses the Ray export event logger.

    This exporter writes dataset metadata to log files using Ray's export event system.
    loggerc                 C   s
   || _ dS )zInitialize with a configured export event logger.

        Args:
            logger: The export event logger to use for writing events.
        N)_export_logger)rq   ru   r   r   r   __init__
  s   
z&LoggerDatasetMetadataExporter.__init__rX   r+   Nc                 C   s   t |}| j| dS )zExport dataset metadata using the export event logger.

        Args:
            dataset_metadata: DatasetMetadata object containing dataset information.
        N)rk   rv   
send_event)rq   rX   data_metadata_protor   r   r   rr     s   z5LoggerDatasetMetadataExporter.export_dataset_metadatac                 C   sn   ddl m} t|jj}|sdS tjtj	j
j d}zttj|}t|W S  ty6   |d Y dS w )zCreate a logger-based exporter if the export API is enabled.

        Returns:
            A LoggerDatasetMetadataExporter instance if enabled, None otherwise.
        r   )ExportEventNlogszcUnable to initialize the export event logger, so no Dataset Metadata export events will be written.)#ray.core.generated.export_event_pb2rz   r   
SourceTypeEXPORT_DATASET_METADATAospathjoinray_privateworker_global_nodeget_session_dir_pathr   r   DATASET_METADATArm   rU   	exception)rs   rz   &is_dataset_metadata_export_api_enabledlog_directoryru   r   r   r   rn     s*   
z/LoggerDatasetMetadataExporter.create_if_enabled)r   r   r   r   loggingLoggerrw   rB   rr   rt   r   rn   r   r   r   r   rm     s    	rm   )r+   rl   ))r   rQ   r   r   abcr   r   r\   r   r   typingr   r   r   r	   r
   r   r   r   &ray._private.event.export_event_loggerr   r   r   ray.data.contextr   ray.data9ray.data._internal.execution.interfaces.physical_operatorr   	getLoggerr   ru   rV   r   r   r'   rB   r4   rk   ro   rl   rm   r   r   r   r   <module>   s8    $
1
A	