# Copyright 2017 Johns Hopkins University (Shinji Watanabe)
#  Apache 2.0  (http://www.apache.org/licenses/LICENSE-2.0)

"""Training/decoding definition for the speech recognition task."""

import copy
import itertools
import json
import logging
import math
import os

import numpy as np
import torch
import torch.distributed as dist
from chainer import reporter as reporter_module
from chainer import training
from chainer.training import extensions
from chainer.training.updater import StandardUpdater
from packaging.version import parse as V
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.nn.parallel import data_parallel
from torch.utils.data.distributed import DistributedSampler

import espnet.lm.pytorch_backend.extlm as extlm_pytorch
import espnet.nets.pytorch_backend.lm.default as lm_pytorch
from espnet.asr.asr_utils import (
    CompareValueTrigger,
    adadelta_eps_decay,
    add_results_to_json,
    format_mulenc_args,
    get_model_conf,
    plot_spectrogram,
    restore_snapshot,
    snapshot_object,
    torch_load,
    torch_resume,
    torch_snapshot,
)
from espnet.asr.pytorch_backend.asr_init import (
    freeze_modules,
    load_trained_model,
    load_trained_modules,
)
from espnet.nets.asr_interface import ASRInterface
from espnet.nets.beam_search_transducer import BeamSearchTransducer
from espnet.nets.pytorch_backend.e2e_asr import pad_list
from espnet.nets.pytorch_backend.streaming.segment import SegmentStreamingE2E
from espnet.nets.pytorch_backend.streaming.window import WindowStreamingE2E
from espnet.transform.spectrogram import IStft
from espnet.transform.transformation import Transformation
from espnet.utils.cli_writers import file_writer_helper
from espnet.utils.dataset import ChainerDataLoader, Transform, TransformDataset
from espnet.utils.deterministic_utils import set_deterministic_pytorch
from espnet.utils.dynamic_import import dynamic_import
from espnet.utils.io_utils import LoadInputsAndTargets
from espnet.utils.training.batchfy import make_batchset
from espnet.utils.training.evaluator import BaseEvaluator
from espnet.utils.training.iterators import ShufflingEnabler
from espnet.utils.training.tensorboard_logger import TensorboardLogger
from espnet.utils.training.train_utils import check_early_stop, set_early_stop


def _recursive_to(xs, device):
    if torch.is_tensor(xs):
        return xs.to(device)
    if isinstance(xs, tuple):
        return tuple(_recursive_to(x, device) for x in xs)
    return xs


class DistributedDictSummary:
    """Distributed version of DictSummary.

    This implementation is based on an official implementation below.
    https://github.com/chainer/chainer/blob/v6.7.0/chainer/reporter.py

    To gather stats information from all processes and calculate exact mean values,
    this class is running AllReduce operation in compute_mean().
    """

    def __init__(self, device=None):
        self._local_summary = reporter_module.DictSummary()
        self._summary_names = None
        self._device = device

    def add(self, d):
        if self._summary_names is None:
            # This assumes that `d` always includes the same name list,
            # and the name list is identical accross all processes.
            self._summary_names = frozenset(d.keys())
        return self._local_summary.add(d)

    def compute_mean(self):
        # Even if `self._local_summary` doesn't have a few keys
        # due to invalid observations like NaN, zero, etc,
        # `raw_values` can properly these entries
        # thanks to zero as an initial value.
        raw_values = {name: [0.0, 0] for name in self._summary_names}
        for name, summary in self._local_summary._summaries.items():
            raw_values[name][0] += summary._x
            raw_values[name][1] += summary._n

        sum_list = []
        count_list = []
        for name in sorted(self._summary_names):
            sum_list.append(raw_values[name][0])
            count_list.append(raw_values[name][1])
        sum_tensor = torch.tensor(sum_list, device=self._device)
        count_tensor = torch.tensor(count_list, device=self._device)

        # AllReduce both of sum and count in parallel.
        sum_handle = dist.all_reduce(sum_tensor, async_op=True)
        count_handle = dist.all_reduce(count_tensor, async_op=True)
        sum_handle.wait()
        count_handle.wait()

        # Once both ops are enqueued, putting an op to calculate actual average value.
        mean_tensor = sum_tensor / count_tensor
        result_dict = {}
        for idx, name in enumerate(sorted(self._summary_names)):
            if name not in self._local_summary._summaries:
                # If an entry with a target name doesn't exist in `self._local_summary`,
                # this entry must be removed from `result_dict`.
                # This behavior is the same with original DictSummary.
                continue
            result_dict[name] = mean_tensor[idx].item()
        return result_dict


class CustomEvaluator(BaseEvaluator):
    """Custom Evaluator for Pytorch.

    Args:
        model (torch.nn.Module): The model to evaluate.
        iterator (chainer.dataset.Iterator) : The train iterator.

        target (link | dict[str, link]) :Link object or a dictionary of
            links to evaluate. If this is just a link object, the link is
            registered by the name ``'main'``.

        device (torch.device): The device used.
        ngpu (int): The number of GPUs.
        use_ddp (bool): The flag to use DDP.

    """

    def __init__(self, model, iterator, target, device, ngpu=None, use_ddp=False):
        super(CustomEvaluator, self).__init__(iterator, target)
        self.model = model
        self.device = device
        if ngpu is not None:
            self.ngpu = ngpu
        elif device.type == "cpu":
            self.ngpu = 0
        else:
            self.ngpu = 1
        self.use_ddp = use_ddp

    # The core part of the update routine can be customized by overriding
    def evaluate(self):
        """Main evaluate routine for CustomEvaluator."""
        iterator = self._iterators["main"]

        if self.eval_hook:
            self.eval_hook(self)

        if hasattr(iterator, "reset"):
            iterator.reset()
            it = iterator
        else:
            it = copy.copy(iterator)

        if self.use_ddp:
            summary = DistributedDictSummary(self.device)
        else:
            summary = reporter_module.DictSummary()

        self.model.eval()
        with torch.no_grad():
            for batch in it:
                x = _recursive_to(batch, self.device)
                observation = {}
                with reporter_module.report_scope(observation):
                    # read scp files
                    # x: original json with loaded features
                    #    will be converted to chainer variable later
                    if self.ngpu == 0 or self.use_ddp:
                        self.model(*x)
                    else:
                        # apex does not support torch.nn.DataParallel
                        data_parallel(self.model, x, range(self.ngpu))

                summary.add(observation)
        self.model.train()

        return summary.compute_mean()


class CustomUpdater(StandardUpdater):
    """Custom Updater for Pytorch.

    Args:
        model (torch.nn.Module): The model to update.
        grad_clip_threshold (float): The gradient clipping value to use.
        train_iter (chainer.dataset.Iterator): The training iterator.
        optimizer (torch.optim.optimizer): The training optimizer.

        device (torch.device): The device to use.
        ngpu (int): The number of gpus to use.
        use_apex (bool): The flag to use Apex in backprop.
        use_ddp (bool): The flag to use DDP for multi-GPU training.

    """

    def __init__(
        self,
        model,
        grad_clip_threshold,
        train_iter,
        optimizer,
        device,
        ngpu,
        grad_noise=False,
        accum_grad=1,
        use_apex=False,
        use_ddp=False,
    ):
        super(CustomUpdater, self).__init__(train_iter, optimizer)
        self.model = model
        self.grad_clip_threshold = grad_clip_threshold
        self.device = device
        self.ngpu = ngpu
        self.accum_grad = accum_grad
        self.forward_count = 0
        self.grad_noise = grad_noise
        self.iteration = 0
        self.use_apex = use_apex
        self.use_ddp = use_ddp

    # The core part of the update routine can be customized by overriding.
    def update_core(self):
        """Main update routine of the CustomUpdater."""
        # When we pass one iterator and optimizer to StandardUpdater.__init__,
        # they are automatically named 'main'.
        train_iter = self.get_iterator("main")
        optimizer = self.get_optimizer("main")
        epoch = train_iter.epoch

        # Get the next batch (a list of json files)
        batch = train_iter.next()
        # self.iteration += 1 # Increase may result in early report,
        # which is done in other place automatically.
        x = _recursive_to(batch, self.device)
        is_new_epoch = train_iter.epoch != epoch
        # When the last minibatch in the current epoch is given,
        # gradient accumulation is turned off in order to evaluate the model
        # on the validation set in every epoch.
        # see details in https://github.com/espnet/espnet/pull/1388

        # Compute the loss at this time step and accumulate it
        if self.ngpu == 0 or self.use_ddp:
            loss = self.model(*x).mean() / self.accum_grad
        else:
            # apex does not support torch.nn.DataParallel
            loss = (
                data_parallel(self.model, x, range(self.ngpu)).mean() / self.accum_grad
            )
        if self.use_apex:
            from apex import amp

            # NOTE: for a compatibility with noam optimizer
            opt = optimizer.optimizer if hasattr(optimizer, "optimizer") else optimizer
            with amp.scale_loss(loss, opt) as scaled_loss:
                scaled_loss.backward()
        else:
            loss.backward()
        # gradient noise injection
        if self.grad_noise:
            from espnet.asr.asr_utils import add_gradient_noise

            add_gradient_noise(
                self.model, self.iteration, duration=100, eta=1.0, scale_factor=0.55
            )

        # update parameters
        self.forward_count += 1
        if not is_new_epoch and self.forward_count != self.accum_grad:
            return
        self.forward_count = 0
        # compute the gradient norm to check if it is normal or not
        grad_norm = torch.nn.utils.clip_grad_norm_(
            self.model.parameters(), self.grad_clip_threshold
        )
        if self.use_ddp:
            # NOTE: assuming gradients have not been reduced yet here.
            # Try to gather the norm of gradients from all workers,
            # and calculate average grad norm.
            dist.all_reduce(grad_norm)
        logging.info("grad norm={}".format(grad_norm))
        if math.isnan(grad_norm):
            logging.warning("grad norm is nan. Do not update model.")
        else:
            optimizer.step()
        optimizer.zero_grad()

    def update(self):
        self.update_core()
        # #iterations with accum_grad > 1
        # Ref.: https://github.com/espnet/espnet/issues/777
        if self.forward_count == 0:
            self.iteration += 1


class CustomConverter(object):
    """Custom batch converter for Pytorch.

    Args:
        subsampling_factor (int): The subsampling factor.
        dtype (torch.dtype): Data type to convert.

    """

    def __init__(self, subsampling_factor=1, dtype=torch.float32):
        """Construct a CustomConverter object."""
        self.subsampling_factor = subsampling_factor
        self.ignore_id = -1
        self.dtype = dtype

    def __call__(self, batch, device=torch.device("cpu")):
        """Transform a batch and send it to a device.

        Args:
            batch (list): The batch to transform.
            device (torch.device): The device to send to.

        Returns:
            tuple(torch.Tensor, torch.Tensor, torch.Tensor)

        """
        # batch should be located in list
        assert len(batch) == 1
        xs, ys = batch[0]

        # perform subsampling
        if self.subsampling_factor > 1:
            xs = [x[:: self.subsampling_factor, :] for x in xs]

        # get batch of lengths of input sequences
        ilens = np.array([x.shape[0] for x in xs])

        # perform padding and convert to tensor
        # currently only support real number
        if xs[0].dtype.kind == "c":
            xs_pad_real = pad_list(
                [torch.from_numpy(x.real).float() for x in xs], 0
            ).to(device, dtype=self.dtype)
            xs_pad_imag = pad_list(
                [torch.from_numpy(x.imag).float() for x in xs], 0
            ).to(device, dtype=self.dtype)
            # Note(kamo):
            # {'real': ..., 'imag': ...} will be changed to ComplexTensor in E2E.
            # Don't create ComplexTensor and give it E2E here
            # because torch.nn.DataParellel can't handle it.
            xs_pad = {"real": xs_pad_real, "imag": xs_pad_imag}
        else:
            xs_pad = pad_list([torch.from_numpy(x).float() for x in xs], 0).to(
                device, dtype=self.dtype
            )

        ilens = torch.from_numpy(ilens).to(device)
        # NOTE: this is for multi-output (e.g., speech translation)
        ys_pad = pad_list(
            [
                torch.from_numpy(
                    np.array(y[0][:]) if isinstance(y, tuple) else y
                ).long()
                for y in ys
            ],
            self.ignore_id,
        ).to(device)

        return xs_pad, ilens, ys_pad


class CustomConverterMulEnc(object):
    """Custom batch converter for Pytorch in multi-encoder case.

    Args:
        subsampling_factors (list): List of subsampling factors for each encoder.
        dtype (torch.dtype): Data type to convert.

    """

    def __init__(self, subsampling_factors=[1, 1], dtype=torch.float32):
        """Initialize the converter."""
        self.subsampling_factors = subsampling_factors
        self.ignore_id = -1
        self.dtype = dtype
        self.num_encs = len(subsampling_factors)

    def __call__(self, batch, device=torch.device("cpu")):
        """Transform a batch and send it to a device.

        Args:
            batch (list): The batch to transform.
            device (torch.device): The device to send to.

        Returns:
            tuple( list(torch.Tensor), list(torch.Tensor), torch.Tensor)

        """
        # batch should be located in list
        assert len(batch) == 1
        xs_list = batch[0][: self.num_encs]
        ys = batch[0][-1]

        # perform subsampling
        if np.sum(self.subsampling_factors) > self.num_encs:
            xs_list = [
                [x[:: self.subsampling_factors[i], :] for x in xs_list[i]]
                for i in range(self.num_encs)
            ]

        # get batch of lengths of input sequences
        ilens_list = [
            np.array([x.shape[0] for x in xs_list[i]]) for i in range(self.num_encs)
        ]

        # perform padding and convert to tensor
        # currently only support real number
        xs_list_pad = [
            pad_list([torch.from_numpy(x).float() for x in xs_list[i]], 0).to(
                device, dtype=self.dtype
            )
            for i in range(self.num_encs)
        ]

        ilens_list = [
            torch.from_numpy(ilens_list[i]).to(device) for i in range(self.num_encs)
        ]
        # NOTE: this is for multi-task learning (e.g., speech translation)
        ys_pad = pad_list(
            [
                torch.from_numpy(np.array(y[0]) if isinstance(y, tuple) else y).long()
                for y in ys
            ],
            self.ignore_id,
        ).to(device)

        return xs_list_pad, ilens_list, ys_pad


def is_writable_process(args, worldsize, rank, localrank):
    return not args.use_ddp or rank == 0


def train(args):
    """Train with the given args.

    Args:
        args (namespace): The program arguments.

    """
    if args.use_ddp:
        # initialize distributed environment.
        # NOTE: current implementation supports
        # only single-node training.

        # get process information.
        worldsize = os.environ.get("WORLD_SIZE", None)
        assert worldsize is not None
        worldsize = int(worldsize)
        assert worldsize == args.ngpu

        rank = os.environ.get("RANK", None)
        assert rank is not None
        rank = int(rank)

        localrank = os.environ.get("LOCAL_RANK", None)
        assert localrank is not None
        localrank = int(localrank)

        dist.init_process_group(
            backend="nccl",
            init_method="env://",
            rank=rank,
            world_size=worldsize,
        )

        if rank != 0:
            # Disable all logs in non-master process.
            logging.disable()
    else:
        worldsize = 1
        rank = 0
        localrank = 0

    set_deterministic_pytorch(args)
    if args.num_encs > 1:
        args = format_mulenc_args(args)

    # check cuda availability
    if not torch.cuda.is_available():
        logging.warning("cuda is not available")

    # get input and output dimension info
    with open(args.valid_json, "rb") as f:
        valid_json = json.load(f)["utts"]
    utts = list(valid_json.keys())
    idim_list = [
        int(valid_json[utts[0]]["input"][i]["shape"][-1]) for i in range(args.num_encs)
    ]
    odim = int(valid_json[utts[0]]["output"][0]["shape"][-1])
    for i in range(args.num_encs):
        logging.info("stream{}: input dims : {}".format(i + 1, idim_list[i]))
    logging.info("#output dims: " + str(odim))

    # specify attention, CTC, hybrid mode
    if "transducer" in args.model_module:
        if (
            getattr(args, "etype", False) == "custom"
            or getattr(args, "dtype", False) == "custom"
        ):
            mtl_mode = "custom_transducer"
        else:
            mtl_mode = "transducer"
        logging.info("Pure transducer mode")
    elif args.mtlalpha == 1.0:
        mtl_mode = "ctc"
        logging.info("Pure CTC mode")
    elif args.mtlalpha == 0.0:
        mtl_mode = "att"
        logging.info("Pure attention mode")
    else:
        mtl_mode = "mtl"
        logging.info("Multitask learning mode")

    if (args.enc_init is not None or args.dec_init is not None) and args.num_encs == 1:
        model = load_trained_modules(idim_list[0], odim, args)
    else:
        model_class = dynamic_import(args.model_module)
        model = model_class(
            idim_list[0] if args.num_encs == 1 else idim_list, odim, args
        )
    assert isinstance(model, ASRInterface)
    total_subsampling_factor = model.get_total_subsampling_factor()

    logging.info(
        " Total parameter of the model = "
        + str(sum(p.numel() for p in model.parameters()))
    )

    if args.rnnlm is not None:
        rnnlm_args = get_model_conf(args.rnnlm, args.rnnlm_conf)
        rnnlm = lm_pytorch.ClassifierWithState(
            lm_pytorch.RNNLM(len(args.char_list), rnnlm_args.layer, rnnlm_args.unit)
        )
        torch_load(args.rnnlm, rnnlm)
        model.rnnlm = rnnlm

    if is_writable_process(args, worldsize, rank, localrank):
        # write model config
        if not os.path.exists(args.outdir):
            os.makedirs(args.outdir)
        model_conf = args.outdir + "/model.json"
        with open(model_conf, "wb") as f:
            logging.info("writing a model config file to " + model_conf)
            f.write(
                json.dumps(
                    (
                        idim_list[0] if args.num_encs == 1 else idim_list,
                        odim,
                        vars(args),
                    ),
                    indent=4,
                    ensure_ascii=False,
                    sort_keys=True,
                ).encode("utf_8")
            )
    for key in sorted(vars(args).keys()):
        logging.info("ARGS: " + key + ": " + str(vars(args)[key]))

    reporter = model.reporter

    if args.use_ddp:
        if args.num_encs > 1:
            # TODO(ruizhili): implement data parallel for multi-encoder setup.
            raise NotImplementedError(
                "Data parallel is not supported for multi-encoder setup."
            )
    else:
        # check the use of multi-gpu
        if args.ngpu > 1:
            if args.batch_size != 0:
                logging.warning(
                    "batch size is automatically increased (%d -> %d)"
                    % (args.batch_size, args.batch_size * args.ngpu)
                )
                args.batch_size *= args.ngpu
            if args.num_encs > 1:
                # TODO(ruizhili): implement data parallel for multi-encoder setup.
                raise NotImplementedError(
                    "Data parallel is not supported for multi-encoder setup."
                )

    # set torch device
    if args.use_ddp:
        device = torch.device(f"cuda:{localrank}")
    else:
        device = torch.device("cuda" if args.ngpu > 0 else "cpu")
    if args.train_dtype in ("float16", "float32", "float64"):
        dtype = getattr(torch, args.train_dtype)
    else:
        dtype = torch.float32
    model = model.to(device=device, dtype=dtype)

    if args.freeze_mods:
        model, model_params = freeze_modules(model, args.freeze_mods)
    else:
        model_params = model.parameters()

    logging.warning(
        "num. model params: {:,} (num. trained: {:,} ({:.1f}%))".format(
            sum(p.numel() for p in model.parameters()),
            sum(p.numel() for p in model.parameters() if p.requires_grad),
            sum(p.numel() for p in model.parameters() if p.requires_grad)
            * 100.0
            / sum(p.numel() for p in model.parameters()),
        )
    )

    # Setup an optimizer
    if args.opt == "adadelta":
        optimizer = torch.optim.Adadelta(
            model_params, rho=0.95, eps=args.eps, weight_decay=args.weight_decay
        )
    elif args.opt == "adam":
        optimizer = torch.optim.Adam(model_params, weight_decay=args.weight_decay)
    elif args.opt == "noam":
        from espnet.nets.pytorch_backend.transformer.optimizer import get_std_opt

        if "transducer" in mtl_mode:
            if args.noam_adim > 0:
                optimizer = get_std_opt(
                    model_params,
                    args.noam_adim,
                    args.optimizer_warmup_steps,
                    args.noam_lr,
                )
            else:
                raise ValueError("noam-adim option should be set to use Noam scheduler")
        else:
            optimizer = get_std_opt(
                model_params,
                args.adim,
                args.transformer_warmup_steps,
                args.transformer_lr,
            )
    else:
        raise NotImplementedError("unknown optimizer: " + args.opt)

    # setup apex.amp
    if args.train_dtype in ("O0", "O1", "O2", "O3"):
        try:
            from apex import amp
        except ImportError as e:
            logging.error(
                f"You need to install apex for --train-dtype {args.train_dtype}. "
                "See https://github.com/NVIDIA/apex#linux"
            )
            raise e
        if args.opt == "noam":
            model, optimizer.optimizer = amp.initialize(
                model, optimizer.optimizer, opt_level=args.train_dtype
            )
        else:
            model, optimizer = amp.initialize(
                model, optimizer, opt_level=args.train_dtype
            )
        use_apex = True

        from espnet.nets.pytorch_backend.ctc import CTC

        amp.register_float_function(CTC, "loss_fn")
        amp.init()
        logging.warning("register ctc as float function")
    else:
        use_apex = False

    # FIXME: TOO DIRTY HACK
    setattr(optimizer, "target", reporter)
    setattr(optimizer, "serialize", lambda s: reporter.serialize(s))

    # Setup a converter
    if args.num_encs == 1:
        converter = CustomConverter(subsampling_factor=model.subsample[0], dtype=dtype)
    else:
        converter = CustomConverterMulEnc(
            [i[0] for i in model.subsample_list], dtype=dtype
        )

    # read json data
    with open(args.train_json, "rb") as f:
        train_json = json.load(f)["utts"]
    with open(args.valid_json, "rb") as f:
        valid_json = json.load(f)["utts"]

    use_sortagrad = args.sortagrad == -1 or args.sortagrad > 0
    # make minibatch list (variable length)
    if args.use_ddp:
        # When using DDP, minimum batch size for each process is 1.
        min_batch_size = 1
    else:
        min_batch_size = args.ngpu if args.ngpu > 1 else 1
    train = make_batchset(
        train_json,
        args.batch_size,
        args.maxlen_in,
        args.maxlen_out,
        args.minibatches,
        min_batch_size=min_batch_size,
        shortest_first=use_sortagrad,
        count=args.batch_count,
        batch_bins=args.batch_bins,
        batch_frames_in=args.batch_frames_in,
        batch_frames_out=args.batch_frames_out,
        batch_frames_inout=args.batch_frames_inout,
        iaxis=0,
        oaxis=0,
    )
    valid = make_batchset(
        valid_json,
        args.batch_size,
        args.maxlen_in,
        args.maxlen_out,
        args.minibatches,
        min_batch_size=min_batch_size,
        count=args.batch_count,
        batch_bins=args.batch_bins,
        batch_frames_in=args.batch_frames_in,
        batch_frames_out=args.batch_frames_out,
        batch_frames_inout=args.batch_frames_inout,
        iaxis=0,
        oaxis=0,
    )

    load_tr = LoadInputsAndTargets(
        mode="asr",
        load_output=True,
        preprocess_conf=args.preprocess_conf,
        preprocess_args={"train": True},  # Switch the mode of preprocessing
    )
    load_cv = LoadInputsAndTargets(
        mode="asr",
        load_output=True,
        preprocess_conf=args.preprocess_conf,
        preprocess_args={"train": False},  # Switch the mode of preprocessing
    )
    # hack to make batchsize argument as 1
    # actual bathsize is included in a list
    # default collate function converts numpy array to pytorch tensor
    # we used an empty collate function instead which returns list
    train_ds = TransformDataset(train, Transform(converter, load_tr))
    val_ds = TransformDataset(valid, Transform(converter, load_cv))
    train_sampler = None
    val_sampler = None
    shuffle = not use_sortagrad
    if args.use_ddp:
        train_sampler = DistributedSampler(train_ds)
        val_sampler = DistributedSampler(val_ds)
        shuffle = False

    train_iter = ChainerDataLoader(
        dataset=train_ds,
        batch_size=1,
        num_workers=args.n_iter_processes,
        shuffle=shuffle,
        sampler=train_sampler,
        collate_fn=ChainerDataLoader.get_first_element,
    )
    valid_iter = ChainerDataLoader(
        dataset=val_ds,
        batch_size=1,
        shuffle=False,
        sampler=val_sampler,
        collate_fn=ChainerDataLoader.get_first_element,
        num_workers=args.n_iter_processes,
    )

    # Set up a trainer
    if args.use_ddp:
        model = DDP(model, device_ids=[localrank])
    updater = CustomUpdater(
        model,
        args.grad_clip,
        {"main": train_iter},
        optimizer,
        device,
        args.ngpu,
        args.grad_noise,
        args.accum_grad,
        use_apex=use_apex,
        use_ddp=args.use_ddp,
    )
    trainer = training.Trainer(updater, (args.epochs, "epoch"), out=args.outdir)

    # call DistributedSampler.set_epoch at begining of each epoch.
    if args.use_ddp:

        @training.make_extension(trigger=(1, "epoch"))
        def set_epoch_to_distributed_sampler(trainer):
            # NOTE: at the first time when this fuction is called,
            # `sampler.epoch` should be 0, and a given trainer object
            # has 1 as a `trainer.updater.epoch`.
            # This means that, in the first epoch,
            # dataset is shuffled with random seed and a value 0,
            # and, in the second epoch, dataset is shuffled
            # with the same random seed and a value 1.
            #
            # See a link below for more details.
            # https://pytorch.org/docs/stable/data.html#torch.utils.data.distributed.DistributedSampler
            train_sampler.set_epoch(trainer.updater.epoch)
            val_sampler.set_epoch(trainer.updater.epoch)

        trainer.extend(set_epoch_to_distributed_sampler)

    if use_sortagrad:
        trainer.extend(
            ShufflingEnabler([train_iter]),
            trigger=(args.sortagrad if args.sortagrad != -1 else args.epochs, "epoch"),
        )

    # Resume from a snapshot
    if args.resume:
        logging.info("resumed from %s" % args.resume)
        torch_resume(args.resume, trainer)

    # Evaluate the model with the test dataset for each epoch
    if args.save_interval_iters > 0:
        trainer.extend(
            CustomEvaluator(
                model, {"main": valid_iter}, reporter, device, args.ngpu, args.use_ddp
            ),
            trigger=(args.save_interval_iters, "iteration"),
        )
    else:
        trainer.extend(
            CustomEvaluator(
                model, {"main": valid_iter}, reporter, device, args.ngpu, args.use_ddp
            )
        )

    if is_writable_process(args, worldsize, rank, localrank):
        # Save attention weight each epoch
        is_attn_plot = (
            "transformer" in args.model_module
            or "conformer" in args.model_module
            or mtl_mode in ["att", "mtl", "custom_transducer"]
        )

        if args.num_save_attention > 0 and is_attn_plot:
            data = sorted(
                list(valid_json.items())[: args.num_save_attention],
                key=lambda x: int(x[1]["input"][0]["shape"][1]),
                reverse=True,
            )
            if hasattr(model, "module"):
                att_vis_fn = model.module.calculate_all_attentions
                plot_class = model.module.attention_plot_class
            else:
                att_vis_fn = model.calculate_all_attentions
                plot_class = model.attention_plot_class
            att_reporter = plot_class(
                att_vis_fn,
                data,
                args.outdir + "/att_ws",
                converter=converter,
                transform=load_cv,
                device=device,
                subsampling_factor=total_subsampling_factor,
            )
            trainer.extend(att_reporter, trigger=(1, "epoch"))
        else:
            att_reporter = None

        # Save CTC prob at each epoch
        if mtl_mode in ["ctc", "mtl"] and args.num_save_ctc > 0:
            # NOTE: sort it by output lengths
            data = sorted(
                list(valid_json.items())[: args.num_save_ctc],
                key=lambda x: int(x[1]["output"][0]["shape"][0]),
                reverse=True,
            )
            if hasattr(model, "module"):
                ctc_vis_fn = model.module.calculate_all_ctc_probs
                plot_class = model.module.ctc_plot_class
            else:
                ctc_vis_fn = model.calculate_all_ctc_probs
                plot_class = model.ctc_plot_class
            ctc_reporter = plot_class(
                ctc_vis_fn,
                data,
                args.outdir + "/ctc_prob",
                converter=converter,
                transform=load_cv,
                device=device,
                subsampling_factor=total_subsampling_factor,
            )
            trainer.extend(ctc_reporter, trigger=(1, "epoch"))
        else:
            ctc_reporter = None

        # Make a plot for training and validation values
        if args.num_encs > 1:
            report_keys_loss_ctc = [
                "main/loss_ctc{}".format(i + 1) for i in range(model.num_encs)
            ] + [
                "validation/main/loss_ctc{}".format(i + 1)
                for i in range(model.num_encs)
            ]
            report_keys_cer_ctc = [
                "main/cer_ctc{}".format(i + 1) for i in range(model.num_encs)
            ] + [
                "validation/main/cer_ctc{}".format(i + 1) for i in range(model.num_encs)
            ]

        if hasattr(model, "is_transducer"):
            trans_keys = [
                "main/loss",
                "validation/main/loss",
                "main/loss_trans",
                "validation/main/loss_trans",
            ]

            ctc_keys = (
                ["main/loss_ctc", "validation/main/loss_ctc"]
                if args.use_ctc_loss
                else []
            )

            aux_trans_keys = (
                [
                    "main/loss_aux_trans",
                    "validation/main/loss_aux_trans",
                ]
                if args.use_aux_transducer_loss
                else []
            )

            symm_kl_div_keys = (
                [
                    "main/loss_symm_kl_div",
                    "validation/main/loss_symm_kl_div",
                ]
                if args.use_symm_kl_div_loss
                else []
            )

            lm_keys = (
                [
                    "main/loss_lm",
                    "validation/main/loss_lm",
                ]
                if args.use_lm_loss
                else []
            )

            transducer_keys = (
                trans_keys + ctc_keys + aux_trans_keys + symm_kl_div_keys + lm_keys
            )

            trainer.extend(
                extensions.PlotReport(
                    transducer_keys,
                    "epoch",
                    file_name="loss.png",
                )
            )
        else:
            trainer.extend(
                extensions.PlotReport(
                    [
                        "main/loss",
                        "validation/main/loss",
                        "main/loss_ctc",
                        "validation/main/loss_ctc",
                        "main/loss_att",
                        "validation/main/loss_att",
                    ]
                    + ([] if args.num_encs == 1 else report_keys_loss_ctc),
                    "epoch",
                    file_name="loss.png",
                )
            )

        trainer.extend(
            extensions.PlotReport(
                ["main/acc", "validation/main/acc"], "epoch", file_name="acc.png"
            )
        )
        trainer.extend(
            extensions.PlotReport(
                ["main/cer_ctc", "validation/main/cer_ctc"]
                + ([] if args.num_encs == 1 else report_keys_loss_ctc),
                "epoch",
                file_name="cer.png",
            )
        )

        # Save best models
        trainer.extend(
            snapshot_object(model, "model.loss.best"),
            trigger=training.triggers.MinValueTrigger("validation/main/loss"),
        )
        if mtl_mode not in ["ctc", "transducer", "custom_transducer"]:
            trainer.extend(
                snapshot_object(model, "model.acc.best"),
                trigger=training.triggers.MaxValueTrigger("validation/main/acc"),
            )

        # save snapshot which contains model and optimizer states
        if args.save_interval_iters > 0:
            trainer.extend(
                torch_snapshot(filename="snapshot.iter.{.updater.iteration}"),
                trigger=(args.save_interval_iters, "iteration"),
            )

        # save snapshot at every epoch - for model averaging
        trainer.extend(torch_snapshot(), trigger=(1, "epoch"))

    # epsilon decay in the optimizer
    if args.opt == "adadelta":
        if args.criterion == "acc" and mtl_mode != "ctc":
            trainer.extend(
                restore_snapshot(
                    model, args.outdir + "/model.acc.best", load_fn=torch_load
                ),
                trigger=CompareValueTrigger(
                    "validation/main/acc",
                    lambda best_value, current_value: best_value > current_value,
                ),
            )
            trainer.extend(
                adadelta_eps_decay(args.eps_decay),
                trigger=CompareValueTrigger(
                    "validation/main/acc",
                    lambda best_value, current_value: best_value > current_value,
                ),
            )
        elif args.criterion == "loss":
            trainer.extend(
                restore_snapshot(
                    model, args.outdir + "/model.loss.best", load_fn=torch_load
                ),
                trigger=CompareValueTrigger(
                    "validation/main/loss",
                    lambda best_value, current_value: best_value < current_value,
                ),
            )
            trainer.extend(
                adadelta_eps_decay(args.eps_decay),
                trigger=CompareValueTrigger(
                    "validation/main/loss",
                    lambda best_value, current_value: best_value < current_value,
                ),
            )
        # NOTE: In some cases, it may take more than one epoch for the model's loss
        # to escape from a local minimum.
        # Thus, restore_snapshot extension is not used here.
        # see details in https://github.com/espnet/espnet/pull/2171
        elif args.criterion == "loss_eps_decay_only":
            trainer.extend(
                adadelta_eps_decay(args.eps_decay),
                trigger=CompareValueTrigger(
                    "validation/main/loss",
                    lambda best_value, current_value: best_value < current_value,
                ),
            )

    if is_writable_process(args, worldsize, rank, localrank):
        # Write a log of evaluation statistics for each epoch
        trainer.extend(
            extensions.LogReport(trigger=(args.report_interval_iters, "iteration"))
        )

        if hasattr(model, "is_transducer"):
            report_keys = (
                [
                    "epoch",
                    "iteration",
                ]
                + transducer_keys
                + ["elapsed_time"]
            )
        else:
            report_keys = [
                "epoch",
                "iteration",
                "main/loss",
                "main/loss_ctc",
                "main/loss_att",
                "validation/main/loss",
                "validation/main/loss_ctc",
                "validation/main/loss_att",
                "main/acc",
                "validation/main/acc",
                "main/cer_ctc",
                "validation/main/cer_ctc",
                "elapsed_time",
            ] + (
                [] if args.num_encs == 1 else report_keys_cer_ctc + report_keys_loss_ctc
            )

        if args.opt == "adadelta":
            trainer.extend(
                extensions.observe_value(
                    "eps",
                    lambda trainer: trainer.updater.get_optimizer("main").param_groups[
                        0
                    ]["eps"],
                ),
                trigger=(args.report_interval_iters, "iteration"),
            )
            report_keys.append("eps")
        if args.report_cer:
            report_keys.append("validation/main/cer")
        if args.report_wer:
            report_keys.append("validation/main/wer")
        trainer.extend(
            extensions.PrintReport(report_keys),
            trigger=(args.report_interval_iters, "iteration"),
        )

        trainer.extend(
            extensions.ProgressBar(update_interval=args.report_interval_iters)
        )
    set_early_stop(trainer, args)

    if is_writable_process(args, worldsize, rank, localrank):
        if args.tensorboard_dir is not None and args.tensorboard_dir != "":
            from torch.utils.tensorboard import SummaryWriter

            trainer.extend(
                TensorboardLogger(
                    SummaryWriter(args.tensorboard_dir),
                    att_reporter=att_reporter,
                    ctc_reporter=ctc_reporter,
                ),
                trigger=(args.report_interval_iters, "iteration"),
            )

    if args.use_ddp:
        # To avoid busy wait on non-main processes
        # during a main process is writing plot, logs, etc,
        # one additional extension must be added at the last.
        # Within this additional extension,
        # a main process will send a notification tensor
        # to other processes when the main process finishes
        # all operations like writing plot, log, etc.
        src_rank = 0  # TODO(lazykyama): removing hard-coded value.

        @training.make_extension(trigger=(1, "epoch"))
        def barrier_extension_per_epoch(trainer):
            notification = torch.zeros(1, device=device)
            dist.broadcast(notification, src=src_rank)
            torch.cuda.synchronize(device=device)

        trainer.extend(barrier_extension_per_epoch)

    # Run the training
    trainer.run()
    if is_writable_process(args, worldsize, rank, localrank):
        check_early_stop(trainer, args.epochs)


def recog(args):
    """Decode with the given args.

    Args:
        args (namespace): The program arguments.

    """
    set_deterministic_pytorch(args)
    model, train_args = load_trained_model(args.model, training=False)
    assert isinstance(model, ASRInterface)
    model.recog_args = args

    if args.quantize_config is not None:
        q_config = set([getattr(torch.nn, q) for q in args.quantize_config])
    else:
        q_config = {torch.nn.Linear}

    if args.quantize_asr_model:
        logging.info("Use a quantized ASR model for decoding.")

        # It seems quantized LSTM only supports non-packed sequence before torch 1.4.0.
        # Reference issue: https://github.com/pytorch/pytorch/issues/27963
        if (
            V(torch.__version__) < V("1.4.0")
            and "lstm" in train_args.etype
            and torch.nn.LSTM in q_config
        ):
            raise ValueError(
                "Quantized LSTM in ESPnet is only supported with torch 1.4+."
            )

        # Dunno why but weight_observer from dynamic quantized module must have
        # dtype=torch.qint8 with torch < 1.5 although dtype=torch.float16 is supported.
        if args.quantize_dtype == "float16" and V(torch.__version__) < V("1.5.0"):
            raise ValueError(
                "float16 dtype for dynamic quantization is not supported with torch "
                "version < 1.5.0. Switching to qint8 dtype instead."
            )

        dtype = getattr(torch, args.quantize_dtype)

        model = torch.quantization.quantize_dynamic(model, q_config, dtype=dtype)

    if args.streaming_mode and "transformer" in train_args.model_module:
        raise NotImplementedError("streaming mode for transformer is not implemented")
    logging.info(
        " Total parameter of the model = "
        + str(sum(p.numel() for p in model.parameters()))
    )

    # read rnnlm
    if args.rnnlm:
        rnnlm_args = get_model_conf(args.rnnlm, args.rnnlm_conf)
        if getattr(rnnlm_args, "model_module", "default") != "default":
            raise ValueError(
                "use '--api v2' option to decode with non-default language model"
            )
        rnnlm = lm_pytorch.ClassifierWithState(
            lm_pytorch.RNNLM(
                len(train_args.char_list),
                rnnlm_args.layer,
                rnnlm_args.unit,
                getattr(rnnlm_args, "embed_unit", None),  # for backward compatibility
            )
        )
        torch_load(args.rnnlm, rnnlm)
        if args.quantize_lm_model:
            dtype = getattr(torch, args.quantize_dtype)
            rnnlm = torch.quantization.quantize_dynamic(rnnlm, q_config, dtype=dtype)
        rnnlm.eval()
    else:
        rnnlm = None

    if args.word_rnnlm:
        rnnlm_args = get_model_conf(args.word_rnnlm, args.word_rnnlm_conf)
        word_dict = rnnlm_args.char_list_dict
        char_dict = {x: i for i, x in enumerate(train_args.char_list)}
        word_rnnlm = lm_pytorch.ClassifierWithState(
            lm_pytorch.RNNLM(
                len(word_dict),
                rnnlm_args.layer,
                rnnlm_args.unit,
                getattr(rnnlm_args, "embed_unit", None),  # for backward compatibility
            )
        )
        torch_load(args.word_rnnlm, word_rnnlm)
        word_rnnlm.eval()

        if rnnlm is not None:
            rnnlm = lm_pytorch.ClassifierWithState(
                extlm_pytorch.MultiLevelLM(
                    word_rnnlm.predictor, rnnlm.predictor, word_dict, char_dict
                )
            )
        else:
            rnnlm = lm_pytorch.ClassifierWithState(
                extlm_pytorch.LookAheadWordLM(
                    word_rnnlm.predictor, word_dict, char_dict
                )
            )

    # gpu
    if args.ngpu == 1:
        gpu_id = list(range(args.ngpu))
        logging.info("gpu id: " + str(gpu_id))
        model.cuda()
        if rnnlm:
            rnnlm.cuda()

    # read json data
    with open(args.recog_json, "rb") as f:
        js = json.load(f)["utts"]
    new_js = {}

    load_inputs_and_targets = LoadInputsAndTargets(
        mode="asr",
        load_output=False,
        sort_in_input_length=False,
        preprocess_conf=train_args.preprocess_conf
        if args.preprocess_conf is None
        else args.preprocess_conf,
        preprocess_args={"train": False},
    )

    # load transducer beam search
    if hasattr(model, "is_transducer"):
        if hasattr(model, "dec"):
            trans_decoder = model.dec
        else:
            trans_decoder = model.decoder
        joint_network = model.transducer_tasks.joint_network

        beam_search_transducer = BeamSearchTransducer(
            decoder=trans_decoder,
            joint_network=joint_network,
            beam_size=args.beam_size,
            lm=rnnlm,
            lm_weight=args.lm_weight,
            search_type=args.search_type,
            max_sym_exp=args.max_sym_exp,
            u_max=args.u_max,
            nstep=args.nstep,
            prefix_alpha=args.prefix_alpha,
            expansion_gamma=args.expansion_gamma,
            expansion_beta=args.expansion_beta,
            score_norm=args.score_norm,
            softmax_temperature=args.softmax_temperature,
            nbest=args.nbest,
            quantization=args.quantize_asr_model,
        )

    if args.batchsize == 0:
        with torch.no_grad():
            for idx, name in enumerate(js.keys(), 1):
                logging.info("(%d/%d) decoding " + name, idx, len(js.keys()))
                batch = [(name, js[name])]
                feat = load_inputs_and_targets(batch)
                feat = (
                    feat[0][0]
                    if args.num_encs == 1
                    else [feat[idx][0] for idx in range(model.num_encs)]
                )
                if args.streaming_mode == "window" and args.num_encs == 1:
                    logging.info(
                        "Using streaming recognizer with window size %d frames",
                        args.streaming_window,
                    )
                    se2e = WindowStreamingE2E(e2e=model, recog_args=args, rnnlm=rnnlm)
                    for i in range(0, feat.shape[0], args.streaming_window):
                        logging.info(
                            "Feeding frames %d - %d", i, i + args.streaming_window
                        )
                        se2e.accept_input(feat[i : i + args.streaming_window])
                    logging.info("Running offline attention decoder")
                    se2e.decode_with_attention_offline()
                    logging.info("Offline attention decoder finished")
                    nbest_hyps = se2e.retrieve_recognition()
                elif args.streaming_mode == "segment" and args.num_encs == 1:
                    logging.info(
                        "Using streaming recognizer with threshold value %d",
                        args.streaming_min_blank_dur,
                    )
                    nbest_hyps = []
                    for n in range(args.nbest):
                        nbest_hyps.append({"yseq": [], "score": 0.0})
                    se2e = SegmentStreamingE2E(e2e=model, recog_args=args, rnnlm=rnnlm)
                    r = np.prod(model.subsample)
                    for i in range(0, feat.shape[0], r):
                        hyps = se2e.accept_input(feat[i : i + r])
                        if hyps is not None:
                            text = "".join(
                                [
                                    train_args.char_list[int(x)]
                                    for x in hyps[0]["yseq"][1:-1]
                                    if int(x) != -1
                                ]
                            )
                            text = text.replace(
                                "\u2581", " "
                            ).strip()  # for SentencePiece
                            text = text.replace(model.space, " ")
                            text = text.replace(model.blank, "")
                            logging.info(text)
                            for n in range(args.nbest):
                                nbest_hyps[n]["yseq"].extend(hyps[n]["yseq"])
                                nbest_hyps[n]["score"] += hyps[n]["score"]
                elif hasattr(model, "is_transducer"):
                    nbest_hyps = model.recognize(feat, beam_search_transducer)
                else:
                    nbest_hyps = model.recognize(
                        feat, args, train_args.char_list, rnnlm
                    )
                new_js[name] = add_results_to_json(
                    js[name], nbest_hyps, train_args.char_list
                )

    else:

        def grouper(n, iterable, fillvalue=None):
            kargs = [iter(iterable)] * n
            return itertools.zip_longest(*kargs, fillvalue=fillvalue)

        # sort data if batchsize > 1
        keys = list(js.keys())
        if args.batchsize > 1:
            feat_lens = [js[key]["input"][0]["shape"][0] for key in keys]
            sorted_index = sorted(range(len(feat_lens)), key=lambda i: -feat_lens[i])
            keys = [keys[i] for i in sorted_index]

        with torch.no_grad():
            for names in grouper(args.batchsize, keys, None):
                names = [name for name in names if name]
                batch = [(name, js[name]) for name in names]
                feats = (
                    load_inputs_and_targets(batch)[0]
                    if args.num_encs == 1
                    else load_inputs_and_targets(batch)
                )
                if args.streaming_mode == "window" and args.num_encs == 1:
                    raise NotImplementedError
                elif args.streaming_mode == "segment" and args.num_encs == 1:
                    if args.batchsize > 1:
                        raise NotImplementedError
                    feat = feats[0]
                    nbest_hyps = []
                    for n in range(args.nbest):
                        nbest_hyps.append({"yseq": [], "score": 0.0})
                    se2e = SegmentStreamingE2E(e2e=model, recog_args=args, rnnlm=rnnlm)
                    r = np.prod(model.subsample)
                    for i in range(0, feat.shape[0], r):
                        hyps = se2e.accept_input(feat[i : i + r])
                        if hyps is not None:
                            text = "".join(
                                [
                                    train_args.char_list[int(x)]
                                    for x in hyps[0]["yseq"][1:-1]
                                    if int(x) != -1
                                ]
                            )
                            text = text.replace(
                                "\u2581", " "
                            ).strip()  # for SentencePiece
                            text = text.replace(model.space, " ")
                            text = text.replace(model.blank, "")
                            logging.info(text)
                            for n in range(args.nbest):
                                nbest_hyps[n]["yseq"].extend(hyps[n]["yseq"])
                                nbest_hyps[n]["score"] += hyps[n]["score"]
                    nbest_hyps = [nbest_hyps]
                else:
                    nbest_hyps = model.recognize_batch(
                        feats, args, train_args.char_list, rnnlm=rnnlm
                    )

                for i, nbest_hyp in enumerate(nbest_hyps):
                    name = names[i]
                    new_js[name] = add_results_to_json(
                        js[name], nbest_hyp, train_args.char_list
                    )

    with open(args.result_label, "wb") as f:
        f.write(
            json.dumps(
                {"utts": new_js}, indent=4, ensure_ascii=False, sort_keys=True
            ).encode("utf_8")
        )


def enhance(args):
    """Dumping enhanced speech and mask.

    Args:
        args (namespace): The program arguments.
    """
    set_deterministic_pytorch(args)
    # read training config
    idim, odim, train_args = get_model_conf(args.model, args.model_conf)

    # TODO(ruizhili): implement enhance for multi-encoder model
    assert args.num_encs == 1, "number of encoder should be 1 ({} is given)".format(
        args.num_encs
    )

    # load trained model parameters
    logging.info("reading model parameters from " + args.model)
    model_class = dynamic_import(train_args.model_module)
    model = model_class(idim, odim, train_args)
    assert isinstance(model, ASRInterface)
    torch_load(args.model, model)
    model.recog_args = args

    # gpu
    if args.ngpu == 1:
        gpu_id = list(range(args.ngpu))
        logging.info("gpu id: " + str(gpu_id))
        model.cuda()

    # read json data
    with open(args.recog_json, "rb") as f:
        js = json.load(f)["utts"]

    load_inputs_and_targets = LoadInputsAndTargets(
        mode="asr",
        load_output=False,
        sort_in_input_length=False,
        preprocess_conf=None,  # Apply pre_process in outer func
    )
    if args.batchsize == 0:
        args.batchsize = 1

    # Creates writers for outputs from the network
    if args.enh_wspecifier is not None:
        enh_writer = file_writer_helper(args.enh_wspecifier, filetype=args.enh_filetype)
    else:
        enh_writer = None

    # Creates a Transformation instance
    preprocess_conf = (
        train_args.preprocess_conf
        if args.preprocess_conf is None
        else args.preprocess_conf
    )
    if preprocess_conf is not None:
        logging.info(f"Use preprocessing: {preprocess_conf}")
        transform = Transformation(preprocess_conf)
    else:
        transform = None

    # Creates a IStft instance
    istft = None
    frame_shift = args.istft_n_shift  # Used for plot the spectrogram
    if args.apply_istft:
        if preprocess_conf is not None:
            # Read the conffile and find stft setting
            with open(preprocess_conf) as f:
                # Json format: e.g.
                #    {"process": [{"type": "stft",
                #                  "win_length": 400,
                #                  "n_fft": 512, "n_shift": 160,
                #                  "window": "han"},
                #                 {"type": "foo", ...}, ...]}
                conf = json.load(f)
                assert "process" in conf, conf
                # Find stft setting
                for p in conf["process"]:
                    if p["type"] == "stft":
                        istft = IStft(
                            win_length=p["win_length"],
                            n_shift=p["n_shift"],
                            window=p.get("window", "hann"),
                        )
                        logging.info(
                            "stft is found in {}. "
                            "Setting istft config from it\n{}".format(
                                preprocess_conf, istft
                            )
                        )
                        frame_shift = p["n_shift"]
                        break
        if istft is None:
            # Set from command line arguments
            istft = IStft(
                win_length=args.istft_win_length,
                n_shift=args.istft_n_shift,
                window=args.istft_window,
            )
            logging.info(
                "Setting istft config from the command line args\n{}".format(istft)
            )

    # sort data
    keys = list(js.keys())
    feat_lens = [js[key]["input"][0]["shape"][0] for key in keys]
    sorted_index = sorted(range(len(feat_lens)), key=lambda i: -feat_lens[i])
    keys = [keys[i] for i in sorted_index]

    def grouper(n, iterable, fillvalue=None):
        kargs = [iter(iterable)] * n
        return itertools.zip_longest(*kargs, fillvalue=fillvalue)

    num_images = 0
    if not os.path.exists(args.image_dir):
        os.makedirs(args.image_dir)

    for names in grouper(args.batchsize, keys, None):
        batch = [(name, js[name]) for name in names]

        # May be in time region: (Batch, [Time, Channel])
        org_feats = load_inputs_and_targets(batch)[0]
        if transform is not None:
            # May be in time-freq region: : (Batch, [Time, Channel, Freq])
            feats = transform(org_feats, train=False)
        else:
            feats = org_feats

        with torch.no_grad():
            enhanced, mask, ilens = model.enhance(feats)

        for idx, name in enumerate(names):
            # Assuming mask, feats : [Batch, Time, Channel. Freq]
            #          enhanced    : [Batch, Time, Freq]
            enh = enhanced[idx][: ilens[idx]]
            mas = mask[idx][: ilens[idx]]
            feat = feats[idx]

            # Plot spectrogram
            if args.image_dir is not None and num_images < args.num_images:
                import matplotlib

                matplotlib.use("Agg")
                import matplotlib.pyplot as plt

                num_images += 1
                ref_ch = 0

                plt.figure(figsize=(20, 10))
                plt.subplot(4, 1, 1)
                plt.title("Mask [ref={}ch]".format(ref_ch))
                plot_spectrogram(
                    plt,
                    mas[:, ref_ch].T,
                    fs=args.fs,
                    mode="linear",
                    frame_shift=frame_shift,
                    bottom=False,
                    labelbottom=False,
                )

                plt.subplot(4, 1, 2)
                plt.title("Noisy speech [ref={}ch]".format(ref_ch))
                plot_spectrogram(
                    plt,
                    feat[:, ref_ch].T,
                    fs=args.fs,
                    mode="db",
                    frame_shift=frame_shift,
                    bottom=False,
                    labelbottom=False,
                )

                plt.subplot(4, 1, 3)
                plt.title("Masked speech [ref={}ch]".format(ref_ch))
                plot_spectrogram(
                    plt,
                    (feat[:, ref_ch] * mas[:, ref_ch]).T,
                    frame_shift=frame_shift,
                    fs=args.fs,
                    mode="db",
                    bottom=False,
                    labelbottom=False,
                )

                plt.subplot(4, 1, 4)
                plt.title("Enhanced speech")
                plot_spectrogram(
                    plt, enh.T, fs=args.fs, mode="db", frame_shift=frame_shift
                )

                plt.savefig(os.path.join(args.image_dir, name + ".png"))
                plt.clf()

            # Write enhanced wave files
            if enh_writer is not None:
                if istft is not None:
                    enh = istft(enh)
                else:
                    enh = enh

                if args.keep_length:
                    if len(org_feats[idx]) < len(enh):
                        # Truncate the frames added by stft padding
                        enh = enh[: len(org_feats[idx])]
                    elif len(org_feats) > len(enh):
                        padwidth = [(0, (len(org_feats[idx]) - len(enh)))] + [
                            (0, 0)
                        ] * (enh.ndim - 1)
                        enh = np.pad(enh, padwidth, mode="constant")

                if args.enh_filetype in ("sound", "sound.hdf5"):
                    enh_writer[name] = (args.fs, enh)
                else:
                    # Hint: To dump stft_signal, mask or etc,
                    # enh_filetype='hdf5' might be convenient.
                    enh_writer[name] = enh

            if num_images >= args.num_images and enh_writer is None:
                logging.info("Breaking the process.")
                break
