o
    ei2                     @   s   d Z ddlZddlmZ ddlZddlmZ ddlmZ ddlm	Z	 dZ
e	jG dd dejZd	d
 ZdejdedejfddZG dd dejZG dd dejZdS )zThe SpeechBrain implementation of various pre-trained model adapters e.g.
LoRA, Houlsby

Authors
 * Titouan Parcollet 2024
 * Peter Plantinga 2024
    Nfnmatch)Swish)checkpointsz
Torch's native multi-head attention is not adaptable since it accesses layer
weights directly to pass to highly optimized fused kernels. We are excluding
all native Torch MHA layers from the list of layers to adapt.
c                       s   e Zd ZdZddg g i dfdejdejdedededed	ed
ef fddZ	dd Z
dd Zejdd Zejdd Zejdd Z fddZ  ZS )AdaptedModela
  Given any torch model, e.g. asr_brain.modules.Transformer, and an adapter
    class, e.g. HoulsbyAdapter, this class will replace the target layers
    with this new adapter class (while preserving the parameters).

    Arguments
    ---------
    model_to_adapt: nn.Module
        The base PyTorch model to add adapters to.
    adapter_class: class
        An (uninitialized) adapter of this SpeechBrain library.
    all_linear: bool
        Whether to add the adapter to all linear layers (default: False)
    all_conv: bool
        Whether to add the adapter to all conv layers (default: False)
    target_layers: list of str
        A list of module names in the given model that should be replaced.
        Supports Unix shell-style wildcards `(*, ?, [seq], [!seq])` with `fnmatch`.
    unfrozen_layers: list of str
        List of layers to be unfrozen during training.
        Supports Unix shell-style wildcards `(*, ?, [seq], [!seq])` with `fnmatch`.
    adapter_kwargs: dict
        Ensemble of parameters that should be given to the adapter.
    manual_adapter_insertion: bool
        The default value (`False`) leads to the adapters being inserted at
        the time of initialization. However, in some cases, it is preferable
        to wait to insert the adapters, e.g. when pretrained parameters need to
        be loaded. In this case, one can set this to `True` and call
        `insert_adapters` manually after the parameters have been loaded.

    Example
    -------
    >>> from collections import OrderedDict
    >>> model = torch.nn.Sequential(
    ...   OrderedDict([
    ...     ("layer1", torch.nn.Linear(10, 20)),
    ...     ("layer2", torch.nn.Linear(20, 20)),
    ...     ("layer3", torch.nn.Linear(20, 10)),
    ...   ])
    ... )
    >>> lora_model = AdaptedModel(
    ...   model_to_adapt=model,
    ...   adapter_class=LoRA,
    ...   target_layers=["layer[13]"],
    ...   unfrozen_layers=["layer2"],
    ...   adapter_kwargs={"rank": 2},
    ... )
    >>> lora_model
    AdaptedModel(
      (adapted_model): Sequential(
        (layer1): LoRA(
          (pretrained_module): Linear(in_features=10, out_features=20, bias=True)
          (adapter_down_proj): Linear(in_features=10, out_features=2, bias=False)
          (adapter_up_proj): Linear(in_features=2, out_features=20, bias=False)
        )
        (layer2): Linear(in_features=20, out_features=20, bias=True)
        (layer3): LoRA(
          (pretrained_module): Linear(in_features=20, out_features=10, bias=True)
          (adapter_down_proj): Linear(in_features=20, out_features=2, bias=False)
          (adapter_up_proj): Linear(in_features=2, out_features=10, bias=False)
        )
      )
    )
    Fmodel_to_adaptadapter_class
all_linearall_convtarget_layersunfrozen_layersadapter_kwargsmanual_adapter_insertionc	                    s   t    || _|| _|| _| D ]}	d|	_qg | _| D ]F\ }
t	 |
|||rPd
 dd d }||}t|tjjrItt q| j  qt fdd|D re|
 D ]}	d|	_q_q|sn|   d S d S )NF.c                 3       | ]}t  |V  qd S Nr   .0layername W/home/ubuntu/transcripts/venv/lib/python3.10/site-packages/speechbrain/nnet/adapters.py	<genexpr>}       z(AdaptedModel.__init__.<locals>.<genexpr>T)super__init__adapted_modelr   r   
parametersrequires_gradreplace_layersnamed_modulesis_layer_adaptablejoinsplitget_submodule
isinstancetorchnnMultiheadAttentionwarningswarnMHA_WARNINGappendanyinsert_adapters)selfr   r   r	   r
   r   r   r   r   parammoduleparent_nameparent	__class__r   r   r   [   s.   


zAdaptedModel.__init__c                 C   s>   | j D ]}| j|}| j|fi | j}t| j|| qdS )zIf this is in `__init__` it conflicts with `Pretrainer`.
        Ensure this function is called exactly once before training.
        See ``__init__.manual_adapter_insertion``
        N)r!   r   r&   r   r   replace_module)r1   r   r3   
new_moduler   r   r   r0      s
   
zAdaptedModel.insert_adaptersc                 O   s   | j |i |S )z Pass arguments to adapted model.)r   )r1   argskwargsr   r   r   forward   s   zAdaptedModel.forwardc                 C   s*   dd | j dd D }t|| dS )z$Saves only the trainable parameters.c                 S   s    i | ]\}}|j r|| qS r   )r    detach)r   r   r2   r   r   r   
<dictcomp>   s    z&AdaptedModel.saver.<locals>.<dictcomp>T)	keep_varsN)
state_dictitemsr(   save)r1   pathr@   r   r   r   saver   s   zAdaptedModel.saverc                 C   s"   ~t j|dd}| j|dd dS )z)Loads the base model plus trained params.cpu)map_locationF)strictN)r(   loadload_state_dict)r1   rC   end_of_epochr@   r   r   r   loader   s   zAdaptedModel.loaderc                 C   s   |  |d dS )z3Avoids warnings due to only loading trained params.TN)rK   )r1   rC   r   r   r   parameter_transfer   s   zAdaptedModel.parameter_transferc                    s,   t  d}t||rt||S t  |S )z<Override getattr to pass item accesses to pre-adapted model.r   )r   __getattr__hasattrgetattr)r1   itemmodelr6   r   r   rM      s   

zAdaptedModel.__getattr__)__name__
__module____qualname____doc__r)   Moduleboollistdictr   r0   r<   r   mark_as_saverrD   mark_as_loaderrK   mark_as_transferrL   rM   __classcell__r   r   r6   r   r      sD    D	*



r   c                    sD   |rt |tjp!|ot |tjtjtjfp! o!t fdd|D S )a  Check if layer is among list of layers to be adapted.

    Arguments
    ---------
    name: str
        The name of the module to check.
    module: torch.nn.Module
        The module to check.
    all_linear: bool
        Whether all linear layers should be adapted.
    all_conv: bool
        Whether all conv layers should be adapted.
    target_layers: str or list of str
        See `add_adapters_to_model`

    Returns
    -------
    bool
        Whether the layer is to be adapted or not.
    c                 3   r   r   r   r   r   r   r   r      r   z%is_layer_adaptable.<locals>.<genexpr>)r'   r)   LinearConv1dConv2dConv3dr/   )r   r3   r	   r
   r   r   r   r   r#      s   
r#   rQ   r   r9   c                 C   sJ   z| dd\}}| |}W n ty   | }|}Y nw t||| dS )a  Replace layer with a new module based on a parent assignation.
    This is used to replace layers with an Adapter layer wrapped around
    the original layer. Hence, old parameters are preserved and new ones are
    added.

    Arguments
    ---------
    model: nn.Module
        Model containing the module to be replaced.
    name: str
        Name of the target module to replace.
    new_module: nn.Module
        New module made of the old plus the new parameters.
    r      N)rsplitr&   
ValueErrorsetattr)rQ   r   r9   r4   target_nameparent_moduler   r   r   r8      s   r8   c                       s6   e Zd ZdZedf fdd	ZdejfddZ  Z	S )HoulsbyAdapterLineara  This class implements the Houlsby Adapter as described in:
    'Parameter-Efficient Transfer Learning for NLP'
    https://arxiv.org/abs/1902.00751

    Arguments
    ---------
    target_linear: nn.Module
        Module corresponding to the pretrained Linear that will be wrapped with
        this adapter.
    projection_size: int
        Size of the projection layer (usually smaller).
    activation: nn.Module
        The activation function. Default is Swish.
    bias: bool
        Whether to use biases in the linear projections.

    Example
    -------
    >>> import torch
    >>> x = torch.rand((8, 60, 64))
    >>> base_linear = nn.Linear(64, 64)
    >>> adapt = HoulsbyAdapterLinear(base_linear, 8)
    >>> output = adapt(x)
    >>> output.shape
    torch.Size([8, 60, 64])
    Tc                    s   t    t|tjstdt| d|jjj	d }|jj
}|| _d| j_tj||||d| _tj||||d| _| | _|rS| jjjd | jjjd d S d S )NzEHoulsbyLinear currently only supports linear layers, but instead got r   r   Fbiasdevice        )r   r   r'   r)   r^   rd   typeweightdatashaperk   pretrained_linearr    adapter_down_projadapter_up_proj
activationrj   fill_)r1   target_linearprojection_sizert   rj   output_sizerk   r6   r   r   r     s,   
zHoulsbyAdapterLinear.__init__xc                 C   s$   |  |}| | | || S )zApplies the HoulsbyAdapter to an input tensor `x`.

        Arguments
        ---------
        x: torch.Tensor
            Input tensor to the adapter module. Shape: [B, Time, X]

        Returns
        -------
        The linear outputs
        )rq   rs   rt   rr   )r1   ry   x_pretrainedr   r   r   r<   -  s   
zHoulsbyAdapterLinear.forward)
rR   rS   rT   rU   r   r   r(   Tensorr<   r]   r   r   r6   r   rh      s     rh   c                       s2   e Zd ZdZd	 fdd	ZdejfddZ  ZS )
LoRAa  This class implements the LoRA Adapter as described in:
    'LoRA: Low-Rank Adaptation of Large Language Models'
    https://arxiv.org/abs/2106.09685

    Arguments
    ---------
    target_module: nn.Module
        Module corresponding to the pretrained layer that will be wrapped with
        this adapter. Works with nn.Linear and nn.Conv
    rank: int
        Size of the projection layer or rank (usually smaller).
    alpha : float
        Value used to control the scaling in LoRA. Default is one.

    Example
    -------
    >>> import torch
    >>> x = torch.rand((8, 60, 64))
    >>> base_linear = nn.Linear(64, 64)
    >>> adapt = LoRA(base_linear, 64, 4)
    >>> output = adapt(x)
    >>> output.shape
    torch.Size([8, 60, 64])
             ?c                    s   t    |jjjd }|jjjd }|| _| j D ]}d|_q|jj}t	j
||d|d| _t	j
||d|d| _| jjjd || | _d S )Nrb   r   Fri   rl   )r   r   rn   ro   rp   pretrained_moduler   r    rk   r)   r^   rr   rs   ru   scaling)r1   target_modulerankalpha
input_sizerx   r2   rk   r6   r   r   r   ^  s   
zLoRA.__init__ry   c                 C   s(   |  |}| | || j }|| S )zApplies the LoRA Adapter.

        Arguments
        ---------
        x: torch.Tensor
            Input tensor to the adapter module.

        Returns
        -------
        The linear outputs
        )r   rs   rr   r   )r1   ry   rz   x_lorar   r   r   r<   t  s   
zLoRA.forward)r}   r~   )	rR   rS   rT   rU   r   r(   r{   r<   r]   r   r   r6   r   r|   D  s    r|   )rU   r+   r   r(   torch.nnr)   speechbrain.nnet.activationsr   speechbrain.utilsr   r-   register_checkpoint_hooksrV   r   r#   strr8   rh   r|   r   r   r   r   <module>   s     S