o
    bi+                     @   s   d dl mZmZmZmZmZ d dlmZ d dlm	Z	 d dl
mZ d dlmZ dZeG dd deZeG d	d
 d
eZeG dd dZdS )    )AnyDictListUnionOptional)DAGNodeget_dag_node_str)type_to_string)DeveloperAPI__in_context_manager__c                
       s   e Zd ZdZddddeeeeeee	f ef f  f fddZ
dee dee	ef d	ee	ef d
ee	ef fddZdd ZdefddZde	defddZde	fddZde	fddZdeee	f defddZdd Zdd Zde	fd d!Z  ZS )"	InputNodeao  Ray dag node used in DAG building API to mark entrypoints of a DAG.

    Should only be function or class method. A DAG can have multiple
    entrypoints, but only one instance of InputNode exists per DAG, shared
    among all DAGNodes.

    Example:

    .. code-block::

                m1.forward
                /       \
        dag_input     ensemble -> dag_output
                \       /
                m2.forward

    In this pipeline, each user input is broadcasted to both m1.forward and
    m2.forward as first stop of the DAG, and authored like

    .. code-block:: python

        import ray

        @ray.remote
        class Model:
            def __init__(self, val):
                self.val = val
            def forward(self, input):
                return self.val * input

        @ray.remote
        def combine(a, b):
            return a + b

        with InputNode() as dag_input:
            m1 = Model.bind(1)
            m2 = Model.bind(2)
            m1_output = m1.forward.bind(dag_input[0])
            m2_output = m2.forward.bind(dag_input.x)
            ray_dag = combine.bind(m1_output, m2_output)

        # Pass mix of args and kwargs as input.
        ray_dag.execute(1, x=2) # 1 sent to m1, 2 sent to m2

        # Alternatively user can also pass single data object, list or dict
        # and access them via list index, object attribute or dict key str.
        ray_dag.execute(UserDataObject(m1=1, m2=2))
        # dag_input.m1, dag_input.m2
        ray_dag.execute([1, 2])
        # dag_input[0], dag_input[1]
        ray_dag.execute({"m1": 1, "m2": 2})
        # dag_input["m1"], dag_input["m2"]
    N)
input_type_other_args_to_resolver   c                   sn   t |dkst |dkrtdi | _|| _|dur+t|tr+|du r%i }t||d< t jg i i |d dS )a  InputNode should only take attributes of validating and converting
        input data rather than the input data itself. User input should be
        provided via `ray_dag.execute(user_input)`.

        Args:
            input_type: Describes the data type of inputs user will be giving.
                - if given through singular InputNode: type of InputNode
                - if given through InputAttributeNodes: map of key -> type
                Used when deciding what Gradio block to represent the input nodes with.
            _other_args_to_resolve: Internal only to keep InputNode's execution
                context throughput pickling, replacement and serialization.
                User should not use or pass this field.
        r   z-InputNode should not take any args or kwargs.Nresult_type_string)other_args_to_resolve)	len
ValueErrorinput_attribute_nodesr   
isinstancetyper
   super__init__)selfr   r   argskwargs	__class__ F/home/ubuntu/.local/lib/python3.10/site-packages/ray/dag/input_node.pyr   C   s   zInputNode.__init__new_args
new_kwargsnew_optionsnew_other_args_to_resolvec                 C   s
   t |dS )N)r   )r   r   r    r!   r"   r#   r   r   r   
_copy_impld   s   
zInputNode._copy_implc                 O   s>   |   sJ dt|dkrt|dkr|d S t|i |S )zExecutor of InputNode.zInputNode is a singleton instance that should be only used in context manager for dag building and execution. See the docstring of class InputNode for examples.   r   )_in_context_managerr   DAGInputDatar   r   r   r   r   r   _execute_implm   s   
zInputNode._execute_implreturnc                 C   s   | j rt| j vr
dS | j t S )z2Return if InputNode is created in context manager.F)_bound_other_args_to_resolveIN_CONTEXT_MANAGERr   r   r   r   r'   {   s
   

zInputNode._in_context_managerkeyvalc                 C   s   || j |< dS )zqSet field in parent DAGNode attribute that can be resolved in both
        pickle and JSON serialization
        Nr,   )r   r/   r0   r   r   r   set_context   s   zInputNode.set_contextc                 C   s
   t | dS )N__InputNode__r   r.   r   r   r   __str__   s   
zInputNode.__str__c                 C   s8   t |ts	J d|| jvrt| |d| j|< | j| S )Nz5Please only access dag input attributes with str key.__getattr__)r   strr   InputAttributeNoder   r/   r   r   r   r5      s   


zInputNode.__getattr__c                 C   sd   t |ttfsJ dd }| jd ur|| jv rt| j| }|| jvr-t| |d|| j|< | j| S )NRPlease only use int index or str as first-level key to access fields of dag input.__getitem__)r   r6   intr   r
   r   r7   )r   r/   r   r   r   r   r:      s   


zInputNode.__getitem__c                 C   s   |  td | S )NT)r2   r-   r.   r   r   r   	__enter__   s   zInputNode.__enter__c                 G   s   d S Nr   )r   r   r   r   r   __exit__   s   zInputNode.__exit__c                 C      d| j v r
| j d S dS zvGet type of the output of this DAGNode.

        Generated by ray.experimental.gradio_utils.type_to_string().
        r   Nr1   r.   r   r   r   get_result_type      

zInputNode.get_result_type)__name__
__module____qualname____doc__r   r   r   r   r;   r6   r   r   r   r%   r*   boolr'   r2   r4   r5   r:   r<   r>   rA   __classcell__r   r   r   r   r      s2    9!



	

r   c                
       s   e Zd ZdZ	ddedeeef dedef fddZd	e	e
 d
eee
f deee
f deee
f fddZdd ZdefddZdefddZedeeef fddZ  ZS )r7   a5  Represents partial access of user input based on an index (int),
     object attribute or dict key (str).

    Examples:

        .. code-block:: python

            with InputNode() as dag_input:
                a = dag_input[0]
                b = dag_input.x
                ray_dag = add.bind(a, b)

            # This makes a = 1 and b = 2
            ray_dag.execute(1, x=2)

            with InputNode() as dag_input:
                a = dag_input[0]
                b = dag_input[1]
                ray_dag = add.bind(a, b)

            # This makes a = 2 and b = 3
            ray_dag.execute(2, 3)

            # Alternatively, you can input a single object
            # and the inputs are automatically indexed from the object:
            # This makes a = 2 and b = 3
            ray_dag.execute([2, 3])
    Ndag_input_noder/   accessor_methodr   c              
      s2   || _ || _|| _t g i i ||||d d S )N)rI   r/   rJ   r   )_dag_input_node_key_accessor_methodr   r   )r   rI   r/   rJ   r   r   r   r   r      s   zInputAttributeNode.__init__r    r!   r"   r#   c                 C   s   t |d |d |d |d S )NrI   r/   rJ   r   )r7   r$   r   r   r   r%      s   zInputAttributeNode._copy_implc                 O   sv   t | jtr| j| j S | j}t | jtr,| jdkr|| j S | jdkr*t|| jS dS t | jtr7|| j S td)a0  Executor of InputAttributeNode.

        Args and kwargs are to match base class signature, but not in the
        implementation. All args and kwargs should be resolved and replaced
        with value in bound_args and bound_kwargs via bottom-up recursion when
        current node is executed.
        r:   r5   r9   N)	r   rK   r(   rL   r6   rM   getattrr;   r   )r   r   r   user_input_python_objectr   r   r   r*      s   	



z InputAttributeNode._execute_implr+   c                 C   s   t | d| j dS )Nz["z"])r	   rL   r.   r   r   r   r4     s   zInputAttributeNode.__str__c                 C   r?   r@   r1   r.   r   r   r   rA     rB   z"InputAttributeNode.get_result_typec                 C   s   | j S r=   )rL   r.   r   r   r   r/   &  s   zInputAttributeNode.keyr=   )rC   rD   rE   rF   r   r   r;   r6   r   r   r   r   r%   r*   r4   rA   propertyr/   rH   r   r   r   r   r7      s4    "




 r7   c                   @   s2   e Zd ZdZdd Zdeeef defddZ	dS )	r(   zIf user passed multiple args and kwargs directly to dag.execute(), we
    generate this wrapper for all user inputs as one object, accessible via
    list index or object attribute key.
    c                 O   s   t || _|| _d S r=   )list_args_kwargsr)   r   r   r   r   2  s   

zDAGInputData.__init__r/   r+   c                 C   s0   t |tr
| j| S t |tr| j| S td)Nr9   )r   r;   rR   r6   rS   r   r8   r   r   r   r:   6  s   



zDAGInputData.__getitem__N)
rC   rD   rE   rF   r   r   r;   r6   r   r:   r   r   r   r   r(   +  s    r(   N)typingr   r   r   r   r   ray.dagr   ray.dag.format_utilsr	   ray.experimental.gradio_utilsr
   ray.util.annotationsr   r-   r   r7   r(   r   r   r   r   <module>   s     -r