o
    8wi.M                     @   s~   d Z ddlZddlmZ ddlmZ eG dd dZG dd dZG d	d
 d
eZdd Z	e	Z
dd ZeZG dd dZdS )aq  A pipeline for data transformations.

Example
-------
>>> from hyperpyyaml import load_hyperpyyaml
>>> yamlstring = '''
... pipeline: !new:speechbrain.utils.data_pipeline.DataPipeline
...     static_data_keys: [a, b]
...     dynamic_items:
...         -   func: !name:operator.add
...             takes: ["a", "b"]
...             provides: foo
...         -   func: !name:operator.sub
...             takes: ["foo", "b"]
...             provides: bar
...     output_keys: ["foo", "bar"]
... '''
>>> hparams = load_hyperpyyaml(yamlstring)
>>> hparams["pipeline"]({"a":1, "b":2})
{'foo': 3, 'bar': 1}

Author:
    * Aku Rouhe
    N)	dataclass)DependencyGraphc                   @   s   e Zd ZU dZeed< dS )
StaticItemzData class that represents a static item.

    Static items are in-memory items so they don't need to be computed
    dynamically.
    keyN)__name__
__module____qualname____doc__str__annotations__ r   r   \/home/ubuntu/sommelier/.venv/lib/python3.10/site-packages/speechbrain/utils/data_pipeline.pyr       s   
 r   c                   @   sH   e Zd ZdZg dg fddZdd Zdd Zd	d
 Zdd Zdd Z	dS )DynamicItema#  Essentially represents a data transformation function.

    A DynamicItem takes some arguments and computes its value dynamically when
    called. A straight-forward use-case is to load something from disk
    dynamically; take the path and provide the loaded data.

    Instances of this class are often created implicitly via the
    @takes and @provides decorators or otherwise from specifying the taken and
    provided arguments and the function.

    A counterpart is the GeneratorDynamicItem, which should be used for
    generator functions.

    Arguments
    ---------
    takes : list
        The keys of the items that this needs to compute its output.
    func : callable
        The function that is used to compute the output.
    provides : list
        The keys that this provides.
    Nc                 C   s   || _ || _|| _d S N)takesfuncprovides)selfr   r   r   r   r   r   __init__C   s   
zDynamicItem.__init__c                 G   s
   | j | S r   )r   )r   argsr   r   r   __call__H      
zDynamicItem.__call__c                 C      | j S z1The next argkeys to provide to this, when called.)r   r   r   r   r   
next_takesL      zDynamicItem.next_takesc                 C   r   z.The next keys that this provides, when called.r   r   r   r   r   next_providesQ   r   zDynamicItem.next_providesc                 C   s   | j gS zAssuming that this may need to be called multiple times; which keys
        does it provide at that call. Returns a list, with len equal to the
        number of times that this may be called.
        r   r   r   r   r   provided_in_orderV   s   zDynamicItem.provided_in_orderc                 C   s   dS )[Signals that this will not be called any more times on this pipeline
        call.
        Nr   r   r   r   r   reset^   s   zDynamicItem.reset)
r   r   r   r	   r   r   r   r   r!   r#   r   r   r   r   r   +   s    r   c                       sH   e Zd ZdZ fddZdd Zdd Zdd	 Zd
d Zdd Z	  Z
S )GeneratorDynamicItema  Essentially represents a multi-step data transformation.

    This is the generator function counterpart for DynamicItem (which should be
    used for regular functions).

    A GeneratorDynamicItem first takes some arguments and then uses those in
    multiple steps to incrementally compute some values when called.

    A typical use-case is a pipeline of transformations on data: e.g. taking in
    text as a string, and first a tokenized version, and then on the second
    call providing an integer-encoded version. This can be used even though the
    integer-encoder needs to be trained on the first outputs.

    The main benefit is to be able to define the pipeline in a clear function,
    even if parts of the pipeline depend on others for their initialization.

    Arguments
    ---------
    *args : tuple
        Forwarded to parent class
    **kwargs : tuple
        Forwarded to parent class

    Example
    -------
    >>> lab2ind = {}
    >>> def text_pipeline(text):
    ...     text = text.lower().strip()
    ...     text = "".join(c for c in text if c.isalpha() or c == " ")
    ...     words = text.split()
    ...     yield words
    ...     encoded = [lab2ind[word] for word in words]
    ...     yield encoded
    >>> item = GeneratorDynamicItem(
    ...         func=text_pipeline,
    ...         takes=["text"],
    ...         provides=["words", "words_encoded"])
    >>> # First create the integer-encoding:
    >>> ind = 1
    >>> for token in item("Is this it? - This is it."):
    ...     if token not in lab2ind:
    ...         lab2ind[token] = ind
    ...         ind += 1
    >>> # Now the integers can be encoded!
    >>> item()
    [1, 2, 3, 2, 1, 3]
    c                    s"   t  j|i | d | _d| _d S )Nr   )superr   current_generatornum_provided_items)r   r   kwargs	__class__r   r   r      s   
zGeneratorDynamicItem.__init__c                 G   sF   | j t| jkrtd| js| j| | _t| j}|  j d7  _ |S )Nz*DynamicItemPipeline called too many times!   )r'   lenr   RuntimeErrorr&   r   next)r   r   outr   r   r   r      s   
zGeneratorDynamicItem.__call__c                 C   s   | j s| jS g S r   )r&   r   r   r   r   r   r      s   zGeneratorDynamicItem.next_takesc                 C   s    | j | j }t|tr|gS |S r   )r   r'   
isinstancer
   r   keysr   r   r   r      s   
z"GeneratorDynamicItem.next_providesc                 C   s6   g }| j D ]}t|tr||g q|| q|S r    )r   r0   r
   append)r   in_orderr2   r   r   r   r!      s   

z&GeneratorDynamicItem.provided_in_orderc                 C   s$   | j dur
| j   d| _ d| _dS )r"   Nr   )r&   closer'   r   r   r   r   r#      s   


zGeneratorDynamicItem.reset)r   r   r   r	   r   r   r   r   r!   r#   __classcell__r   r   r)   r   r$   f   s    0

r$   c                         fdd}|S )a|  Decorator which makes a DynamicItem and specifies its argkeys.

    If the wrapped object is a generator function (has a yield statement),
    Creates a GeneratorDynamicItem. If the object is already a DynamicItem,
    just specifies the argkeys for that. Otherwise creates a new regular
    DynamicItem, with argkeys specified.

    The args are always passed to the function at the start. Generators could
    support sending new arguments, but for such use cases, simply create a new
    dynamic item. The GeneratorDynamicItem class is meant for pipelines which
    take in an input and transform it in multiple ways, where the intermediate
    representations may be needed for e.g. fitting a BPE segmenter.

    Arguments
    ---------
    *argkeys : tuple
        The data keys expected as input

    Returns
    -------
    The decorated function, with input argkeys specified

    Example
    -------
    >>> @takes("text")
    ... def tokenize(text):
    ...     return text.strip().lower().split()
    >>> tokenize.provides = ["tokenized"]
    >>> tokenize('	This Example gets tokenized')
    ['this', 'example', 'gets', 'tokenized']
    c                    sD   t | tr| jrtd | _| S t| rt | dS t | dS )Decorator definition.z!Can't overwrite DynamicItem.takes)r   r   )r0   r   r   
ValueErrorinspectisgeneratorfunctionr$   objargkeysr   r   	decorator      

ztakes.<locals>.decoratorr   )r?   r@   r   r>   r   r      s   !r   c                     r7   )a  Decorator which makes a DynamicItem and specifies what keys it provides.

    If the wrapped object is a generator function (has a yield statement),
    Creates a GeneratorDynamicItem. If the object is already a DynamicItem,
    just specifies the provided keys for that. Otherwise creates a new regular
    DynamicItem, with provided keys specified.

    Arguments
    ---------
    *output_keys : tuple
        The data keys to be produced by this function

    Returns
    -------
    The decorated function, with output keys specified

    NOTE
    ----
    The behavior is slightly different for generators and regular functions, if
    many output keys are specified, e.g. @provides("signal", "mfcc"). Regular
    functions should return a tuple with len equal to len(output_keys), while
    generators should yield the items one by one.

    >>> @provides("signal", "feat")
    ... def read_feat():
    ...     wav = [.1,.2,-.1]
    ...     feat = [s**2 for s in wav]
    ...     return wav, feat
    >>> @provides("signal", "feat")
    ... def read_feat():
    ...     wav = [.1,.2,-.1]
    ...     yield wav
    ...     feat = [s**2 for s in wav]
    ...     yield feat

    If multiple keys are yielded at once, write e.g.,

    >>> @provides("wav_read", ["left_channel", "right_channel"])
    ... def read_multi_channel():
    ...     wav = [[.1,.2,-.1],[.2,.1,-.1]]
    ...     yield wav
    ...     yield wav[0], wav[1]

    c                    sD   t | tr| jrtd | _| S t| rt|  dS t|  dS )r8   z*Can't overwrite DynamicItem provides-list.)r   r   )r0   r   r   r9   r:   r;   r$   r<   output_keysr   r   r@   3  rA   zprovides.<locals>.decoratorr   )rC   r@   r   rB   r   r     s   .r   c                   @   s   e Zd ZdZg g fddZdd Zdd Zdd	d
Zdd Zdd Z	e
dd Zdd Zdd Zdd Zdd Zdd Zdd ZdS )DataPipelinea  Organises data transformations into a pipeline.

    Arguments
    ---------
    static_data_keys: list
        The keys which are provided as data
    dynamic_items: list
        A list of mappings with "func", "takes", and "provides"
    output_keys: list
        The keys to use as outputs

    Example
    -------
    >>> pipeline = DataPipeline(
    ...     static_data_keys=["text"],
    ...     dynamic_items=[
    ...     {"func": lambda x: x.lower(), "takes": "text", "provides": "foo"},
    ...     {"func": lambda x: x[::-1], "takes": "foo", "provides": "bar"},
    ...     ],
    ...     output_keys=["bar"],
    ... )
    >>> pipeline({"text": "Test"})
    {'bar': 'tset'}
    c                 C   sH   t  | _d | _i | _i | _g | _i | _| | | | | 	| d S r   )
r   dg_exec_orderkey_to_nodeunaccounted_keysdynamic_itemsoutput_mappingadd_static_keysadd_dynamic_itemsset_output_keys)r   static_data_keysrI   rC   r   r   r   r   _  s   

zDataPipeline.__init__c                 C   s,   |D ]}| j jt|dd}|| j|< qdS )zrInforms the pipeline about static items.

        Static items are the ones provided to __call__ as data.
        )r   dataN)rE   add_noder   rG   )r   static_keysr   node_idr   r   r   rK   j  s   zDataPipeline.add_static_keysc              	   C   s>   |D ]}z
| j di | W q ty   |  | Y qw dS )z#Add multiple dynamic items at once.Nr   )add_dynamic_item	TypeError)r   rI   itemr   r   r   rL   s  s   zDataPipeline.add_dynamic_itemsNc                 C   sr   t |tr|dus|durtd| | dS t |tr |g}t |tr(|g}t| t| |}| | dS )a	  Adds a dynamic item to the Pipeline.

        Two calling conventions. For DynamicItem objects, just use:
        add_dynamic_item(dynamic_item)
        But otherwise, should use:
        add_dynamic_item(func, takes, provides)

        Arguments
        ---------
        func : callable, DynamicItem
            If a DynamicItem is given, adds that directly. Otherwise a
            DynamicItem is created, and this specifies the callable to use. If
            a generator function is given, then create a GeneratorDynamicItem.
            Otherwise creates a normal DynamicItem.
        takes : list, str
            List of keys. When func is called, each key is resolved to
            either an entry in the data or the output of another dynamic_item.
            The func is then called with these as positional arguments,
            in the same order as specified here.
            A single key can be given as a bare string.
        provides : str, list
            For regular functions, the key or list of keys that it provides.
            If you give a generator function, key or list of keys that it
            yields, in order. Also see the provides decorator.
            A single key can be given as a bare string.

        Returns
        -------
        None
        NzDIf providing a DynamicItem directly, don't specify takes or provides)r0   r   r9   _add_dynamic_item_objectr
   takes_decoratorprovides_decorator)r   r   r   r   dir   r   r   rT   {  s   



zDataPipeline.add_dynamic_itemc           
      C   s   |j stdg }|jD ]}|| jvr"| j|g }||  q|| j|  q|	 D ]?}| j
j|d}|D ]$}|| j|< || jv r^| j| D ]}| j| }| j
|| qK| j|= q:|D ]	}	| j
||	 qa|g}q/| j| dS )a	  Internally adds the object.

        There is a node in the dependency graph for each call of the
        DynamicItem. Each call may return multiple keys and depend on multiple
        keys. An internal dict maps key to the id of the node that produces it.
        z@Won't add redundant dynamic item which doesn't provide anything.rO   N)r   r9   r   rG   rH   
setdefaultextendr   r3   r!   rE   rQ   add_edgerI   )
r   r=   dependedr   dependee_keysprovidedrS   dependee_keydependee_nodedep_idr   r   r   rW     s0   




z%DataPipeline._add_dynamic_item_objectc                 C   s   |  || _d| _dS )a  Use this to change the output keys.

        Also re-evaluates execution order.
        So if you request different outputs, some parts of the
        data pipeline may be skipped.

        Arguments
        ---------
        keys : dict, list, None
            List of keys (str) to produce in output.

            If a dict is given; it is used to map internal keys to output keys.
            From the output_keys dict key:value pairs the key appears outside,
            and value is the internal key.
        N)_output_keys_to_mappingrJ   rF   r1   r   r   r   rM     s   
zDataPipeline.set_output_keysc                 C   s4   | d u ri }|S t | tr| }|S dd | D }|S )Nc                 S   s   i | ]}||qS r   r   .0r   r   r   r   
<dictcomp>  s    z8DataPipeline._output_keys_to_mapping.<locals>.<dictcomp>)r0   dict)r2   rJ   r   r   r   rd     s   
z$DataPipeline._output_keys_to_mappingc                 C   s&   | j du r
| | | || j | jS )z
        Arguments
        ---------
        data : dict
            Dictionary with data entries by key.

        Returns
        -------
        dict
            With the keys that were set.
        N)rF   _prepare_run_computerJ   r   rP   r   r   r   compute_outputs  s   

zDataPipeline.compute_outputsc                 C   s,   |  |}| jj| |d}| |||S )z>Compute output of specific item, without changing output_keys.)selected_keys)rd   rE   get_evaluation_orderget_selected_node_idsrj   )r   r2   rP   rJ   orderr   r   r   compute_specific  s
   
zDataPipeline.compute_specificc              	      s   | j rd}|d| j 7 }t|i |D ]G\}}}t|tr7z |j  W q ty6   td|j dw  fdd| D }| }	|| }
t	|	dkrT|
g}

t|	|
 q| jD ]}|  q` fdd	| D S )
Nz;These keys are still unaccounted for in the data pipeline: z, zExpected key z	 in data!c                    s$   g | ]}| v r | n| qS r   r   )rf   argkeyrP   intermediater   r   
<listcomp>  s    z)DataPipeline._compute.<locals>.<listcomp>r+   c                    s*   i | ]\}}|| v r | n| qS r   r   )rf   outkeyinkeyrs   r   r   rg   #  s    z)DataPipeline._compute.<locals>.<dictcomp>)rH   joinr-   r0   r   r   KeyErrorr   r   r,   updateziprI   r#   items)r   rP   rp   rJ   MSGrS   edgesrV   r   provided_keysvaluesdynamic_itemr   rs   r   rj     s4   



zDataPipeline._computec                    s    fdd|D S )z2Translates selected keys to dependency graph keys.c                    s   g | ]} j | qS r   )rG   re   r   r   r   ru   *  s    z6DataPipeline.get_selected_node_ids.<locals>.<listcomp>r   )r   rm   r   r   r   ro   (  s   z"DataPipeline.get_selected_node_idsc                 C   s
   |  |S r   )rl   rk   r   r   r   r   ,  r   zDataPipeline.__call__c                 C   s"   t | j| | j | _d S r   )listrE   rn   ro   rJ   r   rF   rk   r   r   r   ri   /  s
   
zDataPipeline._prepare_run)NN)r   r   r   r	   r   rK   rL   rT   rW   rM   staticmethodrd   rl   rq   rj   ro   r   ri   r   r   r   r   rD   E  s     	
/%

#rD   )r	   r:   dataclassesr   speechbrain.utils.depgraphr   r   r   r$   r   rX   r   rY   rD   r   r   r   r   <module>   s    
;l0=