From 090fd22bf60e89c34740ca356dae61eed19db873 Mon Sep 17 00:00:00 2001 From: Steven Atkinson Date: Sun, 14 Jan 2024 13:41:37 -0800 Subject: [PATCH] [BREAKING] Remove parametric modeling code (#367) --- bin/export/main.py | 73 --- bin/train/inputs/models/catlstm.json | 42 -- bin/train/main.py | 15 +- nam/data.py | 110 +--- nam/models/base.py | 5 - nam/models/parametric/__init__.py | 3 - nam/models/parametric/catnets.py | 215 ------- nam/models/parametric/hyper_net.py | 559 ------------------ nam/models/parametric/params.py | 71 --- .../test_models/test_parametric/__init__.py | 3 - .../test_parametric/test_catnets.py | 50 -- .../test_parametric/test_hyper_net.py | 86 --- 12 files changed, 10 insertions(+), 1222 deletions(-) delete mode 100644 bin/export/main.py delete mode 100644 bin/train/inputs/models/catlstm.json delete mode 100644 nam/models/parametric/__init__.py delete mode 100644 nam/models/parametric/catnets.py delete mode 100644 nam/models/parametric/hyper_net.py delete mode 100644 nam/models/parametric/params.py delete mode 100644 tests/test_nam/test_models/test_parametric/__init__.py delete mode 100644 tests/test_nam/test_models/test_parametric/test_catnets.py delete mode 100644 tests/test_nam/test_models/test_parametric/test_hyper_net.py diff --git a/bin/export/main.py b/bin/export/main.py deleted file mode 100644 index 2b764a0d..00000000 --- a/bin/export/main.py +++ /dev/null @@ -1,73 +0,0 @@ -# File: export.py -# Created Date: Sunday February 6th 2022 -# Author: Steven Atkinson (steven@atkinson.mn) - -""" -Export a model to TorchScript -""" - -import json -from argparse import ArgumentParser -from pathlib import Path - -import torch - -from nam.models import Model -from nam.models._base import ParametricBaseNet -from nam.models.parametric.catnets import Param - - -class Dummy(torch.nn.Module): - def forward(self, x): - return x[8191:] - - -def main(args): - outdir = Path(args.outdir) - with open(args.model_config_path, "r") as fp: - net = Model.load_from_checkpoint( - args.checkpoint, **Model.parse_config(json.load(fp)) - ).net - if not isinstance(net, ParametricBaseNet): - export_args = (outdir,) - else: - if args.param_config is None: - raise ValueError("Require param config for parametric model") - with open(Path(args.param_config), "r") as fp: - param_config = { - k: Param.init_from_config(v) for k, v in json.load(fp).items() - } - export_args = (outdir, param_config) - net.cpu() - net.eval() - outdir.mkdir(parents=True, exist_ok=True) - net.export(*export_args, include_snapshot=args.snapshot) - if args.cpp: - net.export_cpp_header( - Path(export_args[0], "HardCodedModel.h"), *export_args[1:] - ) - if args.onnx: - net.export_onnx(Path(outdir, "model.onnx")) - - -if __name__ == "__main__": - parser = ArgumentParser() - parser.add_argument("model_config_path", type=str) - parser.add_argument("checkpoint", type=str) - parser.add_argument("outdir") - parser.add_argument( - "--param-config", type=str, help="Configuration for a parametric model" - ) - parser.add_argument("--onnx", action="store_true", help="Export an ONNX model") - parser.add_argument( - "--cpp", action="store_true", help="Export a CPP header for hard-coding a model" - ) - parser.add_argument( - "--snapshot", - "-s", - action="store_true", - help="Computes an example input-output pair for the model for debugging " - "purposes", - ) - - main(parser.parse_args()) diff --git a/bin/train/inputs/models/catlstm.json b/bin/train/inputs/models/catlstm.json deleted file mode 100644 index 346ddcbb..00000000 --- 
a/bin/train/inputs/models/catlstm.json +++ /dev/null @@ -1,42 +0,0 @@ -{ - "_comments": [ - "Parametric extension of the LSTM model. All LSTM tips apply plus:", - " * Make sure that `input_size` is the number of knobs plus one. I've set it", - " up like we're modeling a tube screamer (drive/tone/level), so 1+3=4.", - " * Doesn't seem like the model needs to be all that bigger than the", - " non-parametric version, even if you're modeling a fair number of knobs.", - " * You'll probably have a much larger dataset, so validating every so often ", - " in steps instead of epochs helps. Make sure to also set val_check_interval", - " under the trainer dict in your learning config JSON.", - "", - "Dev note: Ensure that tests/test_bin/test_train/test_main.py's data is ", - "representative of this!" - ], - "net": { - "name": "CatLSTM", - "config": { - "num_layers": 4, - "hidden_size": 32, - "train_burn_in": 4096, - "train_truncate": 512, - "input_size": 4 - } - }, - "loss": { - "val_loss": "mse", - "mask_first": 4096, - "pre_emph_weight": 1.0, - "pre_emph_coef": 0.85 - }, - "optimizer": { - "lr": 0.01 - }, - "lr_scheduler": { - "class": "ExponentialLR", - "kwargs": { - "gamma": 0.995 - }, - "interval": "step", - "frequency": 100 - } -} \ No newline at end of file diff --git a/bin/train/main.py b/bin/train/main.py index aed72a2f..270e6c30 100644 --- a/bin/train/main.py +++ b/bin/train/main.py @@ -31,9 +31,8 @@ def _ensure_graceful_shutdowns(): import torch from torch.utils.data import DataLoader -from nam.data import ConcatDataset, ParametricDataset, Split, init_dataset +from nam.data import ConcatDataset, Split, init_dataset from nam.models import Model -from nam.models._base import BaseNet # HACK access from nam.util import filter_warnings, timestamp torch.manual_seed(0) @@ -86,8 +85,7 @@ def extend_savefig(i, savefig): tx = len(ds.x) / 48_000 print(f"Run (t={tx:.2f})") t0 = time() - args = (ds.vals, ds.x) if isinstance(ds, ParametricDataset) else (ds.x,) - output = model(*args).flatten().cpu().numpy() + output = model(ds.x).flatten().cpu().numpy() t1 = time() try: rt = f"{tx / (t1 - t0):.2f}" @@ -96,12 +94,8 @@ def extend_savefig(i, savefig): print(f"Took {t1 - t0:.2f} ({rt}x)") plt.figure(figsize=(16, 5)) - # plt.plot(ds.x[window_start:window_end], label="Input") plt.plot(output[window_start:window_end], label="Prediction") plt.plot(ds.y[window_start:window_end], linestyle="--", label="Target") - # plt.plot( - # ds.y[window_start:window_end] - output[window_start:window_end], label="Error" - # ) nrmse = _rms(torch.Tensor(output) - ds.y) / _rms(ds.y) esr = nrmse**2 plt.title(f"ESR={esr:.3f}") @@ -227,9 +221,8 @@ def main_inner( show=False, ) plot(model, dataset_validation, show=not no_show) - # Convenient export for snapshot models: - if isinstance(model.net, BaseNet): - model.net.export(outdir) + # Export! + model.net.export(outdir) if __name__ == "__main__": diff --git a/nam/data.py b/nam/data.py index deb0b699..a56ab4da 100644 --- a/nam/data.py +++ b/nam/data.py @@ -150,16 +150,10 @@ def np_to_wav( class AbstractDataset(_Dataset, abc.ABC): @abc.abstractmethod - def __getitem__( - self, idx: int - ) -> Union[ - Tuple[torch.Tensor, torch.Tensor], - Tuple[torch.Tensor, torch.Tensor, torch.Tensor], - ]: + def __getitem__(self, idx: int): """ + Get input and output audio segment for training / evaluation. 
:return: - Case 1: Input (N1,), Output (N2,) - Case 2: Parameters (D,), Input (N1,), Output (N2,) """ pass @@ -226,8 +220,6 @@ class StopError(StartStopError): class Dataset(AbstractDataset, InitializableFromConfig): """ Take a pair of matched audio files and serve input + output pairs. - - No conditioning parameters associated w/ the data. """ def __init__( @@ -666,75 +658,6 @@ def _validate_preceding_silence( ) -class ParametricDataset(Dataset): - """ - Additionally tracks some conditioning parameters - """ - - def __init__(self, params: Dict[str, Union[bool, float, int]], *args, **kwargs): - super().__init__(*args, **kwargs) - self._keys = sorted(tuple(k for k in params.keys())) - self._vals = torch.Tensor([float(params[k]) for k in self._keys]) - - @classmethod - def init_from_config(cls, config): - if "slices" not in config: - return super().init_from_config(config) - else: - return cls.init_from_config_with_slices(config) - - @classmethod - def init_from_config_with_slices(cls, config): - config, x, y, slices = cls.parse_config_with_slices(config) - datasets = [] - for s in tqdm(slices, desc="Slices..."): - c = deepcopy(config) - start, stop, params = [s[k] for k in ("start", "stop", "params")] - c.update(x=x[start:stop], y=y[start:stop], params=params) - if "delay" in s: - c["delay"] = s["delay"] - datasets.append(ParametricDataset(**c)) - return ConcatDataset(datasets) - - @classmethod - def parse_config(cls, config): - assert "slices" not in config - params = config["params"] - return { - "params": params, - "id": config.get("id"), - "common_params": config.get("common_params"), - "param_map": config.get("param_map"), - **super().parse_config(config), - } - - @classmethod - def parse_config_with_slices(cls, config): - slices = config["slices"] - config = super().parse_config(config) - x, y = [config.pop(k) for k in "xy"] - return config, x, y, slices - - def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: - """ - :return: - Parameter values (D,) - Input (NX+NY-1,) - Output (NY,) - """ - # FIXME don't override signature - x, y = super().__getitem__(idx) - return self.vals, x, y - - @property - def keys(self) -> Tuple[str]: - return self._keys - - @property - def vals(self): - return self._vals - - class ConcatDataset(AbstractDataset, InitializableFromConfig): def __init__(self, datasets: Sequence[Dataset], flatten=True): if flatten: @@ -815,35 +738,23 @@ def _validate_datasets(cls, datasets: Sequence[Dataset]): raise ValueError( f"Mismatch between ny of datasets {ref_ny.index} ({ref_ny.val}) and {i} ({d.ny})" ) - if isinstance(d, ParametricDataset): - val = d.keys - if ref_keys is None: - ref_keys = Reference(i, val) - if val != ref_keys.val: - raise ValueError( - f"Mismatch between keys of datasets {ref_keys.index} " - f"({ref_keys.val}) and {i} ({val})" - ) -_dataset_init_registry = { - "dataset": Dataset.init_from_config, - "parametric": ParametricDataset.init_from_config, # To be removed in v0.8 -} +_dataset_init_registry = {"dataset": Dataset.init_from_config} def register_dataset_initializer( name: str, constructor: Callable[[Any], AbstractDataset], overwrite=False ): """ - If you have otehr data set types, you can register their initializer by name using + If you have other data set types, you can register their initializer by name using this. 
For example, the basic NAM is registered by default under the name "default", but if it weren't, you could register it like this: >>> from nam import data - >>> data.register_dataset_initializer("parametric", data.Dataset.init_from_config) + >>> data.register_dataset_initializer("parametric", MyParametricDataset.init_from_config) :param name: The name that'll be used in the config to ask for the data set type :param constructor: The constructor that'll be fed the config. @@ -856,16 +767,7 @@ def register_dataset_initializer( def init_dataset(config, split: Split) -> AbstractDataset: - if "parametric" in config: - logger.warning( - "Using the 'parametric' keyword is deprecated and will be removed in next " - "version. Instead, register the parametric dataset type using " - "`nam.data.register_dataset_initializer()` and then specify " - '`"type": "name"` in the config, using the name you registered.' - ) - name = "parametric" if config["parametric"] else "dataset" - else: - name = config.get("type", "dataset") + name = config.get("type", "dataset") base_config = config[split.value] common = config.get("common", {}) if isinstance(base_config, dict): diff --git a/nam/models/base.py b/nam/models/base.py index 11dfc75e..a448e4f6 100644 --- a/nam/models/base.py +++ b/nam/models/base.py @@ -24,8 +24,6 @@ from .conv_net import ConvNet from .linear import Linear from .losses import apply_pre_emphasis_filter, esr, multi_resolution_stft_loss, mse_fft -from .parametric.catnets import CatLSTM, CatWaveNet -from .parametric.hyper_net import HyperConvNet from .recurrent import LSTM from .wavenet import WaveNet @@ -120,10 +118,7 @@ class _LossItem(NamedTuple): _model_net_init_registry = { - "CatLSTM": CatLSTM.init_from_config, - "CatWaveNet": CatWaveNet.init_from_config, "ConvNet": ConvNet.init_from_config, - "HyperConvNet": HyperConvNet.init_from_config, "Linear": Linear.init_from_config, "LSTM": LSTM.init_from_config, "WaveNet": WaveNet.init_from_config, diff --git a/nam/models/parametric/__init__.py b/nam/models/parametric/__init__.py deleted file mode 100644 index ab3731ed..00000000 --- a/nam/models/parametric/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -# File: __init__.py -# Created Date: Sunday July 17th 2022 -# Author: Steven Atkinson (steven@atkinson.mn) diff --git a/nam/models/parametric/catnets.py b/nam/models/parametric/catnets.py deleted file mode 100644 index ac02e264..00000000 --- a/nam/models/parametric/catnets.py +++ /dev/null @@ -1,215 +0,0 @@ -# File: catnets.py -# Created Date: Wednesday June 22nd 2022 -# Author: Steven Atkinson (steven@atkinson.mn) - -""" -"Cat nets" -- parametric models where the parametric input is concatenated to the -input samples -""" - -import abc -import logging -from enum import Enum -from contextlib import contextmanager -from pathlib import Path -from typing import Any, Dict, Tuple - -import numpy as np -import torch - -from .._base import ParametricBaseNet -from ..recurrent import LSTM -from ..wavenet import WaveNet -from .params import Param - -logger = logging.getLogger(__name__) - - -class _ShapeType(Enum): - CONV = "conv" # (B,C,L) - RNN = "rnn" # (B,L,D) - - -class _CatMixin(ParametricBaseNet): - """ - Parameteric nets that concatenate the params with the input at each time point - Mix in with a non-parametric class like - - ``` - class CatLSTM(LSTM, _CatMixin): - pass - ``` - """ - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - # Hacky, see .export() - self._sidedoor_parametric_config = None - - @abc.abstractproperty - def 
_shape_type(self) -> _ShapeType: - pass - - @abc.abstractproperty - def _single_class(self): - """ " - The class for the non-parametric model that this is extending - """ - # TODO verify that single class satisfies requirements - # ._export_weights() - # ._export_input_output() - pass # HACK - - def export(self, outdir: Path, parametric_config: Dict[str, Param], **kwargs): - """ - Interface for exporting. - You should create at least a `config.json` containing the fields: - * "version" (str) - * "architecture" (str) - * "config": (dict w/ other necessary data like tensor shapes etc) - - :param outdir: Assumed to exist. Can be edited inside at will. - """ - with self._use_parametric_config(parametric_config): - return super().export(outdir, **kwargs) - - def export_cpp_header(self, filename: Path, parametric_config: Dict[str, Param]): - with self._use_parametric_config(parametric_config): - return super().export_cpp_header(filename) - - def _export_config(self): - """ - Adds in the sidedoored parametric pieces - - :paramtric_config: the dict of parameter info (name, type, etc) - """ - config = super()._export_config() - if not isinstance(config, dict): - raise TypeError( - f"Parameteric models' base configs must be a dict; got {type(config)}" - ) - parametric_key = "parametric" - if parametric_key in config: - raise ValueError( - f'Already found parametric key "{parametric_key}" in base config dict.' - ) - # Yucky sidedoor - config[parametric_key] = { - k: v.to_json() for k, v in self._sidedoor_parametric_config.items() - } - return config - - def _export_cpp_header_parametric(self, config): - if config is None: - return self._single_class._export_cpp_head_parametric(self, config) - s_parametric = [ - 'nlohmann::json PARAMETRIC = nlohmann::json::parse(R"(\n', - " {\n", - ] - for i, (key, val) in enumerate(config.items(), 1): - s_parametric.append(f' "{key}": ' "{\n") - for j, (k2, v2) in enumerate(val.items(), 1): - v_str = f'"{v2}"' if isinstance(v2, str) else str(v2) - s_parametric.append( - f' "{k2}": {v_str}' + (",\n" if j < len(val) else "\n") - ) - s_parametric.append(" }" f"{',' if i < len(config) else ''}\n") - s_parametric.append(" }\n") - s_parametric.append(')");\n') - return tuple(s_parametric) - - def _export_input_output_args(self) -> Tuple[torch.Tensor]: - return (self._sidedoor_params_to_tensor(),) - - def _forward(self, params, x): - """ - :param params: (N,D) - :param x: (N,L1) - - :return: (N,L2) - """ - sequence_length = x.shape[1] - x_augmented = ( - torch.cat( - [ - x[..., None], - torch.tile(params[:, None, :], (1, sequence_length, 1)), - ], - dim=2, - ) - if self._shape_type == _ShapeType.RNN - else torch.cat( - [x[:, None, :], torch.tile(params[..., None], (1, 1, sequence_length))], - dim=1, - ) - ) - return self._single_class._forward(self, x_augmented) - - def _sidedoor_params_to_tensor(self) -> torch.Tensor: - param_names = sorted([k for k in self._sidedoor_parametric_config.keys()]) - params = torch.Tensor( - [self._sidedoor_parametric_config[k].default_value for k in param_names] - ) - return params - - @contextmanager - def _use_parametric_config(self, c): - """ - Sneaks in the parametric config while exporting - """ - try: - self._sidedoor_parametric_config = c - yield None - finally: - self._sidedoor_parametric_config = None - - -class CatLSTM(_CatMixin, LSTM): - @property - def _shape_type(self) -> _ShapeType: - return _ShapeType.RNN - - @property - def _single_class(self): - return LSTM - - def _append_default_params(self, x: torch.Tensor) -> torch.Tensor: 
- """ - Requires sidedoor'd params - - :param x: (B,L) - :return: (B,L,1+D) - """ - assert x.ndim == 2 - params = self._sidedoor_params_to_tensor() - sequence_length = x.shape[1] - return torch.cat( - [ - x[:, :, None], - torch.tile(params[None, None, :], (1, sequence_length, 1)), - ], - dim=2, - ) - - def _at_nominal_settings(self, x: torch.Tensor) -> torch.Tensor: - if self._input_size != 1: - logger.warning( - "Nominal settings aren't defined for parametric models; outputting unity" - ) - return x - params = torch.zeros(()).to(x.device) - return self(params, x) - - def _get_initial_state(self) -> Tuple[torch.Tensor, torch.Tensor]: - inputs = self._append_default_params(torch.zeros((1, 48_000))) - return super()._get_initial_state(inputs=inputs) - - -class CatWaveNet(_CatMixin, WaveNet): - @property - def _shape_type(self) -> _ShapeType: - return _ShapeType.CONV - - @property - def _single_class(self): - return WaveNet diff --git a/nam/models/parametric/hyper_net.py b/nam/models/parametric/hyper_net.py deleted file mode 100644 index 73c9ea30..00000000 --- a/nam/models/parametric/hyper_net.py +++ /dev/null @@ -1,559 +0,0 @@ -# File: hyper_net.py -# Created Date: Sunday May 29th 2022 -# Author: Steven Atkinson (steven@atkinson.mn) - -import abc -import json -import math -from dataclasses import dataclass -from enum import Enum -from pathlib import Path -from tempfile import TemporaryDirectory -from typing import Any, Callable, List, Optional, Tuple - -import numpy as np -import torch -import torch.nn as nn -import torch.nn.functional as F -from torch.nn.init import ( - calculate_gain, - _calculate_correct_fan, - _calculate_fan_in_and_fan_out, -) - -from ..._version import __version__ -from .._base import ParametricBaseNet - - -class SpecialLayers(Enum): - CONV = "conv" - BATCHNORM = "batchnorm" - - -@dataclass -class LayerSpec: - """ - Helps the hypernet - """ - - special_type: Optional[str] - shapes: Tuple[Tuple[int]] - norms: Tuple[float] - biases: Tuple[float] - - -class _NetLayer(nn.Module, abc.ABC): - @abc.abstractproperty - def num_tensors(self) -> int: - pass - - @abc.abstractmethod - def get_spec(self) -> LayerSpec: - pass - - -class _Conv(nn.Conv1d, _NetLayer): - @property - def num_tensors(self): - return 2 if self.bias is not None else 1 - - def forward(self, params, inputs): - # Use depthwise convolution trick to process the convolutions together - cout, cin, kernel_size = self.weight.shape - n = len(params[0]) - weight = params[0].reshape((n * cout, cin, kernel_size)) # (N, CinCout) - bias = params[1].flatten() if self.bias is not None else None - groups = n - return F.conv1d( - inputs.reshape((1, n * cin, -1)), - weight, - bias=bias, - stride=self.stride, - padding=self.padding, - dilation=self.dilation, - groups=groups, - ).reshape((n, cout, -1)) - - def get_spec(self): - shapes = ( - (self.weight.shape,) - if self.bias is None - else (self.weight.shape, self.bias.shape) - ) - norms = ( - (self._weight_norm(),) - if self.bias is None - else (self._weight_norm(), self._bias_norm()) - ) - biases = (0,) if self.bias is None else (0, 0) - return LayerSpec(SpecialLayers("conv"), shapes, norms, biases) - - def _bias_norm(self): - # https://pytorch.org/docs/stable/_modules/torch/nn/modules/conv.html#Conv1d - fan = _calculate_fan_in_and_fan_out(self.weight.data)[0] - bound = 1.0 / math.sqrt(fan) - std = math.sqrt(1.0 / 12.0) * (2 * bound) - # LayerNorm handles division by number of dimensions... 
- return std - - def _weight_norm(self): - """ - Std of the unfiorm distribution used in initialization - """ - # https://pytorch.org/docs/stable/_modules/torch/nn/modules/conv.html#Conv1d - fan = _calculate_correct_fan(self.weight.data, "fan_in") - # Kaiming uniform w/ a=sqrt(5) - gain = calculate_gain("leaky_relu", 5.0) - std = gain / math.sqrt(fan) - # LayerNorm handles division by number of dimensions... - return std - - -class _BatchNorm(nn.BatchNorm1d, _NetLayer): - def __init__(self, num_features, *args, affine=True, **kwargs): - # Handle affine params outside of parent class - super().__init__(num_features, *args, affine=False, **kwargs) - self._num_features = num_features - assert affine - self._affine = affine - - @property - def num_tensors(self) -> int: - return 2 - - def get_spec(self) -> LayerSpec: - return LayerSpec( - SpecialLayers.BATCHNORM, - ((self._num_features,), (self._num_features,)), - (1.0e-5, 1.0e-5), - (1.0, 0.0), - ) - - def forward(self, params, inputs): - """ - Only change is we need to provide *params into F.batch_norm instead of - self.weight, self.bias - """ - # Also use "inputs" instead of "input" to not collide w/ builtin (ew) - weight, bias = [z[:, :, None] for z in params] - pre_affine = super().forward(inputs) - return weight * pre_affine + bias - - -class _Affine(nn.Module): - def __init__(self, scale: torch.Tensor, bias: torch.Tensor): - super().__init__() - self._weight = nn.Parameter(scale) - self._bias = nn.Parameter(bias) - - @property - def bias(self) -> nn.Parameter: - return self._bias - - @property - def weight(self) -> nn.Parameter: - return self._weight - - def forward(self, inputs): - return self._weight * inputs + self._bias - - -class HyperNet(nn.Module): - """ - MLP followed by layer norms on split-up dims - """ - - def __init__(self, d_in, net, numels, norms, biases): - super().__init__() - self._net = net - # Figure out the scale factor empirically - norm0 = net(torch.randn((10_000, d_in))).std(dim=0).mean().item() - self._cum_numel = torch.cat( - [torch.LongTensor([0]), torch.cumsum(torch.LongTensor(numels), dim=0)] - ) - affine_scale = torch.cat( - [torch.full((numel,), norm / norm0) for numel, norm in zip(numels, norms)] - ) - affine_bias = torch.cat( - [ - torch.full((numel,), bias, dtype=torch.float) - for numel, bias in zip(numels, biases) - ] - ) - self._affine = _Affine(affine_scale, affine_bias) - - @property - def batchnorm(self) -> bool: - """ - Does the hypernet use batchnorm layers - """ - return any(isinstance(m, nn.BatchNorm1d) for m in self.modules()) - - @property - def input_dim(self) -> int: - return self._net[0][0].weight.shape[1] - - @property - def num_layers(self) -> int: - return len([layer for layer in self._net if isinstance(layer, _HyperNetBlock)]) - - @property - def num_units(self) -> int: - return self._net[0][0].weight.shape[0] - - def forward(self, x) -> Tuple[torch.Tensor]: - """ - Just return a flat array of param tensors for now - """ - y = self._affine(self._net(x)) - return tuple( - y[:, i:j] for i, j in zip(self._cum_numel[:-1], self._cum_numel[1:]) - ) - - def get_export_params(self) -> np.ndarray: - params = [] - for block in self._net[:-1]: - linear = block[0] - params.append(linear.weight.flatten()) - params.append(linear.bias.flatten()) - if self.batchnorm: - bn = block[1] - params.append(bn.running_mean.flatten()) - params.append(bn.running_var.flatten()) - params.append(bn.weight.flatten()) - params.append(bn.bias.flatten()) - params.append(torch.Tensor([bn.eps]).to(bn.weight.device)) - 
assert len(block) <= 3, "Linear-(BN)-activation" - assert ( - len([p for p in block[-1].parameters()]) == 0 - ), "No params in activation" - head = self._net[-1] - params.append(head.weight.flatten()) - params.append(head.bias.flatten()) - affine = self._affine - params.append(affine.weight.flatten()) - params.append(affine.bias.flatten()) - return torch.cat(params).detach().cpu().numpy() - - -class _Activation(abc.ABC): - """ - Indicate that a module is an activation within the main net - """ - - @abc.abstractproperty - def name(self) -> str: - """ - What to call the layer by when making a config w/ it - """ - pass - - -def _extend_activation(C, name: str) -> _Activation: - class Activation(C, _NetLayer, _Activation): - @property - def name(self): - return name - - @property - def num_tensors(self) -> int: - return 0 - - def get_spec(self) -> LayerSpec: - return LayerSpec(None, (), (), ()) - - def forward(self, params, inputs): - return super().forward(inputs) - - return Activation - - -_Tanh = _extend_activation(nn.Tanh, "Tanh") -_ReLU = _extend_activation(nn.ReLU, "ReLU") -_Flatten = _extend_activation(nn.Flatten, "Flatten") # Hah, works - - -def _get_activation(name): - return {"Tanh": _Tanh, "ReLU": _ReLU}[name]() - - -class _HyperNetBlock(nn.Sequential): - """ - For IDing blocks of the hypernet - """ - - pass - - -class HyperConvNet(ParametricBaseNet): - """ - For parameteric data - - Conditioning is input to a hypernetwork that outputs the parameters of the conv net. - """ - - def __init__( - self, hyper_net: HyperNet, net: Callable[[Any, torch.Tensor], torch.Tensor] - ): - super().__init__() - self._hyper_net = hyper_net - self._net = net - - @classmethod - def parse_config(cls, config): - config = super().parse_config(config) - net, specs = cls._get_net(config["net"]) - hyper_net = cls._get_hyper_net(config["hyper_net"], specs) - return {"hyper_net": hyper_net, "net": net} - - @property - def pad_start_default(self) -> bool: - return True - - @property - def receptive_field(self) -> int: - # Last conv is the collapser--compensate w/ a minus 1 - return sum([m.dilation[0] for m in self._net if isinstance(m, _Conv)]) + 1 - 1 - - def export(self, outdir: Path, include_snapshot: bool=False): - """ - Files created: - * config.json - * weights.npy - * test_signal_params.npy - * test_signal_input.npy - * test_signal_output.npy - - weights are serialized to weights.npy in the following order: - * Hypernet - * Loop layers: - * Linear - * weights (din*dout) - * biases (dout) - * BN - * running_mean (d) - * running_var (d) - * weight (d) - * bias (d) - * eps () - * activation - * (Assume no params bc Tanh) - * Linear out - * weights (units*dy) - * bias (dy) - * affine - * weights (dy) - * bias (dy) - * Main net - (Layers are conv-BN-act) - (All params are outputted by the hypernet except the BatchNorm buffers!) - * Loop layers - * BN - * running_mean - * running_var - * eps () - * (flatten: no params) - - A test input & output are also provided, input.npy and output.npy - """ - training = self.training - self.eval() - with open(Path(outdir, "config.json"), "w") as fp: - json.dump(self._export_config(), fp, indent=4) - - # Hope I don't regret using np.save... 
- np.save(Path(outdir, "weights.npy"), self._export_weights()) - - # And an input/output to verify correct computation: - if include_snapshot: - params, x, y = self._export_input_output() - np.save(Path(outdir, "test_signal_params.npy"), params.detach().cpu().numpy()) - np.save(Path(outdir, "test_signal_input.npy"), x.detach().cpu().numpy()) - np.save(Path(outdir, "test_signal_output.npy"), y.detach().cpu().numpy()) - - # And resume training state - self.train(training) - - def export_cpp_header(self, filename: Path): - with TemporaryDirectory() as tmpdir: - tmpdir = Path(tmpdir) - self.export(Path(tmpdir)) - with open(Path(tmpdir, "config.json"), "r") as fp: - _c = json.load(fp) - version = _c["version"] - config = _c["config"] - params = np.load(Path(tmpdir, "weights.npy")) - with open(filename, "w") as f: - f.writelines( - ( - "#pragma once\n", - "// Automatically-generated model file\n", - "// HyperConvNet model\n" "#include \n", - f'#define PYTHON_MODEL_VERSION "{version}"\n', - f'const int HYPER_NET_INPUT_DIM = {config["hyper_net"]["input_dim"]};\n', - f'const int HYPER_NET_NUM_LAYERS = {config["hyper_net"]["num_layers"]};\n', - f'const int HYPER_NET_NUM_UNITS = {config["hyper_net"]["num_units"]};\n', - f'const int HYPER_NET_BATCHNORM = {"true" if config["hyper_net"]["batchnorm"] else "false"};\n', - f"const int CHANNELS = {config['net']['channels']};\n", - f"const bool BATCHNORM = {'true' if config['net']['batchnorm'] else 'false'};\n", - "std::vector DILATIONS{" - + ",".join([str(d) for d in config["net"]["dilations"]]) - + "};\n", - f"const std::string ACTIVATION = \"{config['net']['activation']}\";\n", - "std::vector PARAMS{" - + ",".join([f"{w:.16f}" for w in params]) - + "};\n", - ) - ) - - @classmethod - def _get_net(cls, config): - channels = config["channels"] - dilations = config["dilations"] - batchnorm = config["batchnorm"] - activation = config["activation"] - - layers = [] - layer_specs = [] - cin = 1 - for dilation in dilations: - layer = _Conv(cin, channels, 2, dilation=dilation, bias=not batchnorm) - layers.append(layer) - layer_specs.append(layer.get_spec()) - if batchnorm: - # Slow momentum on main net bc it's wild - layer = _BatchNorm(channels, momentum=0.01) - layers.append(layer) - layer_specs.append(layer.get_spec()) - layer = _get_activation(activation) - layers.append(layer) - layer_specs.append(layer.get_spec()) - cin = channels - layer = _Conv(cin, 1, 1) - layers.append(layer) - layer_specs.append(layer.get_spec()) - layer = _Flatten() - layers.append(layer) - layer_specs.append(layer.get_spec()) - - return nn.ModuleList(layers), layer_specs - - @classmethod - def _get_hyper_net(cls, config, specs) -> HyperNet: - def block(dx, dy, batchnorm) -> _HyperNetBlock: - layer_list = [nn.Linear(dx, dy)] - if batchnorm: - layer_list.append(nn.BatchNorm1d(dy)) - layer_list.append(nn.ReLU()) - return _HyperNetBlock(*layer_list) - - num_inputs = config["num_inputs"] - num_layers = config["num_layers"] - num_units = config["num_units"] - batchnorm = config["batchnorm"] - # Flatten specs - numels = [np.prod(np.array(shape)) for spec in specs for shape in spec.shapes] - norms = [norm for spec in specs for norm in spec.norms] - biases = [bias for spec in specs for bias in spec.biases] - num_outputs = sum(numels) - - din, layer_list = num_inputs, [] - for _ in range(num_layers): - layer_list.append(block(din, num_units, batchnorm)) - din = num_units - layer_list.append(nn.Linear(din, num_outputs)) - net = nn.Sequential(*layer_list) - - return HyperNet(num_inputs, net, 
numels, norms, biases) - - @property - def _activation(self) -> str: - """ - What activation does the main net use - """ - for m in self._net.modules(): - if isinstance(m, _Activation): - return m.name - - @property - def _batchnorm(self) -> bool: - return any(isinstance(x, _BatchNorm) for x in self._net) - - @property - def _channels(self) -> int: - return self._net[0].weight.shape[0] - - @property - def _net_no_head(self): - return self._net[:-2] - - def _forward(self, params: torch.Tensor, x: torch.Tensor) -> torch.Tensor: - net_params = self._hyper_net(params) - i = 0 - for m in self._net: - j = i + m.num_tensors - x = m(net_params[i:j], x) - i = j - assert j == len(net_params) - return x - - def _get_dilations(self) -> List[int]: - dilations = [] - for ( - layer - ) in self._net_no_head: # Last two layers are a 1D conv head and flatten - if isinstance(layer, _Conv): - dilations.append(layer.dilation[0]) - return dilations - - def _export_config(self): - return { - "version": __version__, - "architecture": "HyperConvNet", - "config": { - "hyper_net": { - "input_dim": self._hyper_net.input_dim, - "num_layers": self._hyper_net.num_layers, - "num_units": self._hyper_net.num_units, - "batchnorm": self._hyper_net.batchnorm, - }, - "net": { - "channels": self._channels, - "dilations": self._get_dilations(), - "batchnorm": self._batchnorm, - "activation": self._activation, - }, - }, - } - - def _export_weights(self) -> np.ndarray: - """ - Flatten the parameters of the network to be exported. - See doctsring for .export() for ensured layout. - """ - return np.concatenate( - [self._hyper_net.get_export_params(), self._export_net_weights()] - ) - - def _export_net_weights(self) -> np.ndarray: - """ - Only the buffers--parameters are outputted by the hypernet! 
- """ - params = [] - for bn in self._net_no_head: - if isinstance(bn, _BatchNorm): - params.append(bn.running_mean.flatten()) - params.append(bn.running_var.flatten()) - params.append(torch.Tensor([bn.eps]).to(bn.running_mean.device)) - return ( - np.array([]) - if len(params) == 0 - else torch.cat(params).detach().cpu().numpy() - ) - - def _export_input_output(self) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: - params = torch.randn((self._hyper_net.input_dim,)) - x = torch.randn((2 * self.receptive_field,)) - x = 0.5 * x / x.abs().max() - y = self(params, x) - return params, x, y diff --git a/nam/models/parametric/params.py b/nam/models/parametric/params.py deleted file mode 100644 index f7ffcd8e..00000000 --- a/nam/models/parametric/params.py +++ /dev/null @@ -1,71 +0,0 @@ -# File: params.py -# Created Date: Sunday July 17th 2022 -# Author: Steven Atkinson (steven@atkinson.mn) - -""" -Handling parametric inputs -""" - -import abc -import inspect -from dataclasses import dataclass, fields -from enum import Enum -from typing import Any - -from ..._core import InitializableFromConfig - - -# class ParamType(Enum): -# CONTINUOUS = "continuous" -# BOOLEAN = "boolean" - - -@dataclass -class Param(InitializableFromConfig): - default_value: Any - - @classmethod - def init_from_config(cls, config): - C, kwargs = cls.parse_config(config) - return C(**kwargs) - - @classmethod - def parse_config(cls, config): - for C in [ - _C - for _C in globals().values() - if inspect.isclass(_C) and _C is not Param and issubclass(_C, Param) - ]: - if C.typestr() == config["type"]: - config.pop("type") - break - else: - raise ValueError(f"Unrecognized aprameter type {config['type']}") - return C, config - - @abc.abstractclassmethod - def typestr(cls) -> str: - pass - - def to_json(self): - return { - "type": self.typestr(), - **{f.name: getattr(self, f.name) for f in fields(self)}, - } - - -@dataclass -class BooleanParam(Param): - @classmethod - def typestr(cls) -> str: - return "boolean" - - -@dataclass -class ContinuousParam(Param): - minval: float - maxval: float - - @classmethod - def typestr(self) -> str: - return "continuous" diff --git a/tests/test_nam/test_models/test_parametric/__init__.py b/tests/test_nam/test_models/test_parametric/__init__.py deleted file mode 100644 index ab3731ed..00000000 --- a/tests/test_nam/test_models/test_parametric/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -# File: __init__.py -# Created Date: Sunday July 17th 2022 -# Author: Steven Atkinson (steven@atkinson.mn) diff --git a/tests/test_nam/test_models/test_parametric/test_catnets.py b/tests/test_nam/test_models/test_parametric/test_catnets.py deleted file mode 100644 index 05367a4a..00000000 --- a/tests/test_nam/test_models/test_parametric/test_catnets.py +++ /dev/null @@ -1,50 +0,0 @@ -# File: test_catnets.py -# Created Date: Sunday July 17th 2022 -# Author: Steven Atkinson (steven@atkinson.mn) - -from pathlib import Path -from tempfile import TemporaryDirectory - -import pytest - -from ..base import Base - -from nam.models.parametric import catnets, params - - -_mock_params = { - "gain": params.ContinuousParam(0.5, 0.0, 1.0), - "tone": params.ContinuousParam(0.5, 0.0, 1.0), - "level": params.ContinuousParam(0.5, 0.0, 1.0), -} - - -class _ParametricBase(Base): - pass - - -class TestCatLSTM(_ParametricBase): - @classmethod - def setup_class(cls): - # Using init_from_config - return super().setup_class( - catnets.CatLSTM, - args=(), - kwargs={ - "num_layers": 1, - "hidden_size": 2, - "train_truncate": 11, - 
"train_burn_in": 7, - "input_size": 1 + len(_mock_params), - }, - ) - - def test_export(self, args=None, kwargs=None): - # Override to provide params info - model = self._construct(args=args, kwargs=kwargs) - with TemporaryDirectory() as tmpdir: - model.export(Path(tmpdir), _mock_params) - - -if __name__ == "__main__": - pytest.main() diff --git a/tests/test_nam/test_models/test_parametric/test_hyper_net.py b/tests/test_nam/test_models/test_parametric/test_hyper_net.py deleted file mode 100644 index 7a28b055..00000000 --- a/tests/test_nam/test_models/test_parametric/test_hyper_net.py +++ /dev/null @@ -1,86 +0,0 @@ -# File: test_hyper_net.py -# Created Date: Saturday June 4th 2022 -# Author: Steven Atkinson (steven@atkinson.mn) - -from pathlib import Path -from tempfile import TemporaryDirectory - -import pytest - -from nam.models.parametric import hyper_net - -from ..base import Base - - -class TestHyperConvNet(Base): - @classmethod - def setup_class(cls): - return super().setup_class(hyper_net.HyperConvNet, (), {}) - - @pytest.mark.parametrize( - ("batchnorm,activation"), ((False, "ReLU"), (True, "Tanh")) - ) - def test_init(self, batchnorm, activation): - # TODO refactor - channels = 3 - dilations = [1, 2, 4] - assert isinstance( - self._construct( - self._config( - batchnorm=batchnorm, - activation=activation, - dilations=dilations, - channels=channels, - ) - ), - hyper_net.HyperConvNet, - ) - - @pytest.mark.parametrize( - ("batchnorm,activation"), ((False, "ReLU"), (True, "Tanh")) - ) - def test_export(self, batchnorm, activation): - # TODO refactor - channels = 3 - dilations = [1, 2, 4] - model = self._construct( - self._config( - batchnorm=batchnorm, - activation=activation, - dilations=dilations, - channels=channels, - ) - ) - with TemporaryDirectory() as tmpdir: - model.export(Path(tmpdir)) - - def test_export_cpp_header(self): - # TODO refactor - with TemporaryDirectory() as tmpdir: - self._construct().export_cpp_header(Path(tmpdir, "model.h")) - - def _config(self, batchnorm=True, activation="Tanh", dilations=None, channels=7): - dilations = [1, 2, 4] if dilations is None else dilations - return { - "net": { - "channels": channels, - "dilations": dilations, - "batchnorm": batchnorm, - "activation": activation, - }, - "hyper_net": { - "num_inputs": 3, - "num_layers": 2, - "num_units": 11, - "batchnorm": True, - }, - } - - def _construct(self, config=None): - # Override for simplicity... - config = self._config() if config is None else config - return self._C.init_from_config(config) - - -if __name__ == "__main__": - pytest.main()