o
    mi3                     @   s  d dl Z d dlZd dlZd dlmZmZmZmZ d dlZz5d dl	m
Z d dlmZ d dlmZ d dlmZ d dlmZ dZeeeek rPed	e d
e  W n ey_   ed Y nw ddlmZ e eZde dZdd Zdd Zdd Zdd Z dd Z!		d*ddZdede"fdd Z#				d+ded!ee" d"ee" d#eee"  d$eee"e"f  f
d%d&Z$d'e"de"fd(d)Z%dS ),    N)CallableListMappingOptional)__version__)
structures)(_create_task_factory_from_component_spec)_func_to_component_spec)parse_versionz1.6.1zYour version of kfp z/ may not work.  This integration requires kfp>=z(kfp not found!  Please `pip install kfp`   )	wandb_logz
import typing
from typing import NamedTuple

import collections
from collections import namedtuple

import kfp
from kfp import components
from kfp.components import InputPath, OutputPath

import wandb


c                 C   sL   dd }|| D ]\}}t j|}|r t||r t||d u r# dS qdS )Nc                 S   sh   |  d}g g }}t|d d dD ]\}}d|d | }|| }|| || qt||S )N.r   )split	enumeratejoinappendzip)	full_func
componentsparentschildreni_parentchild r   ]/home/ubuntu/SoloSpeech/.venv/lib/python3.10/site-packages/wandb/integration/kfp/kfp_patch.pyget_parent_child_pairs/   s   



z0full_path_exists.<locals>.get_parent_child_pairsFT)wandbutil
get_modulehasattrgetattr)r   r   r   r   moduler   r   r   full_path_exists.   s   
r&   c                 C   s   t j| }d}|  d|j }t|s#t d|  d|j d |S t j|jg  ||jgt j|j vrWt|d|j t	||j t||j| t j|j 
||jg d}|S )NFr   zFailed to patch z4!  Please check if this package/module is installed!orig_T)r    r!   r"   __name__r&   	termerrorpatched
setdefaultsetattrr$   r   )module_namefuncr%   successr   r   r   r   patch@   s   r0   c                 C   sH   | t jv r"t j|  D ]\}}t||t|d|  q
g t j| < d S d S )Nr'   )r    r*   r,   r$   )r-   r%   r.   r   r   r   unpatchU   s
   
r1   c                   C   s   t d t d t d d S )Nkfp.componentskfp.components._python_opwandb.integration.kfp)r1   r   r   r   r   unpatch_kfp\   s   r5   c                  C   sf   dt fdt fdtfdtfg} g }| D ]\}}t||}|| qt|s1td tdt d S d S )Nr2   r3   zSFailed to patch one or more kfp functions.  Patching @wandb_log decorator to no-op.r4   )	create_component_from_func_get_function_source_definitionstrip_type_hintsr0   r   allr    r)   r   )to_patch	successesr-   r.   r/   r   r   r   	patch_kfpb   s*   
r<   Tc                    s,   ddl m   fdd}| du r|S || S )zsWrap a standard python function and log to W&B.

    NOTE: Because patching failed, this decorator is a no-op.
    r   wrapsc                    s     fdd}|S )Nc                     s    | i |S )Nr   )argskwargsr.   r   r   wrapper   s   z-wandb_log.<locals>.decorator.<locals>.wrapperr   )r.   rB   r=   rA   r   	decorator   s   zwandb_log.<locals>.decoratorN)	functoolsr>   )r.   log_component_filerC   r   r=   r   r   ~   s
   	r   r.   returnc                 C   sN   t | }t|}|d}tdd |}|s"td| j dd	|S )zGet the source code of a function.

    This function is modified from KFP.  The original source is below:
    https://github.com/kubeflow/pipelines/blob/b6406b02f45cdb195c7b99e2f6d22bf85b12268b/sdk/python/kfp/components/_python_op.py#L300-L319.
    r   c                 S   s   |  d S )N)defz
@wandb_log)
startswith)xr   r   r   <lambda>   s    z1_get_function_source_definition.<locals>.<lambda>z6Failed to dedent and clean up the source of function "z(". It is probably not properly indented.)
inspect	getsourcetextwrapdedentr   	itertools	dropwhile
ValueErrorr(   r   )r.   	func_codefunc_code_linesr   r   r   r7      s   



r7   output_component_file
base_imagepackages_to_installannotationsc                 C   sR   ddg}|s	|}n||7 }t | t||d}|rtj|d|_|r%|| t|S )a  Convert a Python function to a component and returns a task factory.

    The returned task factory accepts arguments and returns a task object.

    This function is modified from KFP.  The original source is below:
    https://github.com/kubeflow/pipelines/blob/b6406b02f45cdb195c7b99e2f6d22bf85b12268b/sdk/python/kfp/components/_python_op.py#L998-L1110.

    Args:
        func: The python function to convert
        base_image: Optional. Specify a custom Docker container image to use in the component. For lightweight components, the image needs to have python 3.5+. Default is the python image corresponding to the current python environment.
        output_component_file: Optional. Write a component definition to a local file. The produced component file can be loaded back by calling :code:`load_component_from_file` or :code:`load_component_from_uri`.
        packages_to_install: Optional. List of [versioned] python packages to pip install before executing the user function.
        annotations: Optional. Allows adding arbitrary key-value data to the component specification.

    Returns:
        A factory function with a strongly-typed signature taken from the python function.
        Once called with the required arguments, the factory constructs a task instance that can run the original function in a container.

    Examples:
        The function name and docstring are used as component name and description. Argument and return annotations are used as component input/output types::

            def add(a: float, b: float) -> float:
                """Return sum of two arguments"""
                return a + b


            # add_op is a task factory function that creates a task object when given arguments
            add_op = create_component_from_func(
                func=add,
                base_image="python:3.7",  # Optional
                output_component_file="add.component.yaml",  # Optional
                packages_to_install=["pandas==0.24"],  # Optional
            )

            # The component spec can be accessed through the .component_spec attribute:
            add_op.component_spec.save("add.component.yaml")

            # The component function can be called with arguments to create a task:
            add_task = add_op(1, 3)

            # The resulting task has output references, corresponding to the component outputs.
            # When the function only has a single anonymous return value, the output name is "Output":
            sum_output_ref = add_task.outputs["Output"]

            # These task output references can be passed to other component functions, constructing a computation graph:
            task2 = add_op(sum_output_ref, 5)


        :code:`create_component_from_func` function can also be used as decorator::

            @create_component_from_func
            def add_op(a: float, b: float) -> float:
                """Return sum of two arguments"""
                return a + b

        To declare a function with multiple return values, use the :code:`NamedTuple` return annotation syntax::

            from typing import NamedTuple


            def add_multiply_two_numbers(a: float, b: float) -> NamedTuple(
                "Outputs", [("sum", float), ("product", float)]
            ):
                """Return sum and product of two arguments"""
                return (a + b, a * b)


            add_multiply_op = create_component_from_func(add_multiply_two_numbers)

            # The component function can be called with arguments to create a task:
            add_multiply_task = add_multiply_op(1, 3)

            # The resulting task has output references, corresponding to the component outputs:
            sum_output_ref = add_multiply_task.outputs["sum"]

            # These task output references can be passed to other component functions, constructing a computation graph:
            task2 = add_multiply_op(sum_output_ref, 5)

        Bigger data should be read from files and written to files.
        Use the :py:class:`kfp.components.InputPath` parameter annotation to tell the system that the function wants to consume the corresponding input data as a file. The system will download the data, write it to a local file and then pass the **path** of that file to the function.
        Use the :py:class:`kfp.components.OutputPath` parameter annotation to tell the system that the function wants to produce the corresponding output data as a file. The system will prepare and pass the **path** of a file where the function should write the output data. After the function exits, the system will upload the data to the storage system so that it can be passed to downstream components.

        You can specify the type of the consumed/produced data by specifying the type argument to :py:class:`kfp.components.InputPath` and :py:class:`kfp.components.OutputPath`. The type can be a python type or an arbitrary type name string. :code:`OutputPath('CatBoostModel')` means that the function states that the data it has written to a file has type :code:`CatBoostModel`. :code:`InputPath('CatBoostModel')` means that the function states that it expect the data it reads from a file to have type 'CatBoostModel'. When the pipeline author connects inputs to outputs the system checks whether the types match.
        Every kind of data can be consumed as a file input. Conversely, bigger data should not be consumed by value as all value inputs pass through the command line.

        Example of a component function declaring file input and output::

            def catboost_train_classifier(
                training_data_path: InputPath("CSV"),  # Path to input data file of type "CSV"
                trained_model_path: OutputPath(
                    "CatBoostModel"
                ),  # Path to output data file of type "CatBoostModel"
                number_of_trees: int = 100,  # Small output of type "Integer"
            ) -> NamedTuple(
                "Outputs",
                [
                    ("Accuracy", float),  # Small output of type "Float"
                    ("Precision", float),  # Small output of type "Float"
                    ("JobUri", "URI"),  # Small output of type "URI"
                ],
            ):
                """Train CatBoost classification model"""
                ...

                return (accuracy, precision, recall)
    r    kfp)r.   
extra_coderU   rV   )rW   )r	   wandb_logging_extrasr   MetadataSpecmetadatasaver   )r.   rT   rU   rV   rW   core_packagescomponent_specr   r   r   r6      s"   q
r6   source_codec                 C   s   | S )zStrip type hints from source code.

    This function is modified from KFP.  The original source is below:
    https://github.com/kubeflow/pipelines/blob/b6406b02f45cdb195c7b99e2f6d22bf85b12268b/sdk/python/kfp/components/_python_op.py#L237-L248.
    r   )r`   r   r   r   r8   <  s   r8   )NT)NNNN)&rK   rO   rM   typingr   r   r   r   r    rX   r   kfp_versionkfp.componentsr   kfp.components._componentsr   kfp.components._python_opr	   
wandb.utilr
   MIN_KFP_VERSIONtermwarnImportErrorr)   wandb_loggingr   rL   decorator_coderZ   r&   r0   r1   r5   r<   strr7   r6   r8   r   r   r   r   <module>   sf    



 
