From 6c7abcef18cc985270b304af96e45f3345760eaa Mon Sep 17 00:00:00 2001 From: Jaakko Pasanen Date: Sun, 11 Sep 2022 11:28:32 +0300 Subject: [PATCH] Removed biquad and reworked peq2geq.py to use PEQ. Added rudimentatary unit tests for autoeq.py. --- autoeq.py | 138 +++++++++++++++++++----- frequency_response.py | 6 +- biquad.py => research/neo_peq/biquad.py | 0 research/peq2geq.py | 28 +++-- 4 files changed, 137 insertions(+), 35 deletions(-) rename biquad.py => research/neo_peq/biquad.py (100%) diff --git a/autoeq.py b/autoeq.py index 431b6086728..38ff3ece849 100644 --- a/autoeq.py +++ b/autoeq.py @@ -1,9 +1,15 @@ # -*- coding: utf-8 -*- import os +import re +import shutil +import tempfile +import unittest from glob import glob import argparse import multiprocessing +from pathlib import Path +import pandas as pd import soundfile as sf from time import time import numpy as np @@ -18,7 +24,7 @@ def batch_processing(input_dir=None, output_dir=None, new_only=False, standardize_input=False, compensation=None, - equalize=False, parametric_eq=False, fixed_band_eq=False, rockbox=False, fc=None, q=None, + equalize=False, parametric_eq=False, fixed_band_eq=False, rockbox=False, ten_band_eq=False, parametric_eq_config=None, fixed_band_eq_config=None, convolution_eq=False, fs=DEFAULT_FS, bit_depth=DEFAULT_BIT_DEPTH, phase=DEFAULT_PHASE, f_res=DEFAULT_F_RES, bass_boost_gain=DEFAULT_BASS_BOOST_GAIN, bass_boost_fc=DEFAULT_BASS_BOOST_FC, @@ -50,7 +56,7 @@ def batch_processing(input_dir=None, output_dir=None, new_only=False, standardiz elif bit_depth == 32: bit_depth = "PCM_32" else: - raise ValueError('Invalid bit depth. Accepted values are 16, 24 e 32.') + raise ValueError('Invalid bit depth. Accepted values are 16, 24 and 32.') if sound_signature is not None: sound_signature = FrequencyResponse.read_from_csv(sound_signature) @@ -68,7 +74,8 @@ def batch_processing(input_dir=None, output_dir=None, new_only=False, standardiz else: if type(parametric_eq_config) is str: parametric_eq_config = [parametric_eq_config] - parametric_eq_config = [PEQ_CONFIGS[config_name] for config_name in parametric_eq_config] + parametric_eq_config = [ + PEQ_CONFIGS[config] if type(config) is str else config for config in parametric_eq_config] if fixed_band_eq_config is not None and os.path.isfile(fixed_band_eq_config): # Parametric EQ config is a file path @@ -88,9 +95,9 @@ def batch_processing(input_dir=None, output_dir=None, new_only=False, standardiz file_paths.append((input_file_path, output_file_path)) n_total += 1 args = (input_file_path, output_file_path, bass_boost_fc, bass_boost_gain, bass_boost_q, bit_depth, - compensation, convolution_eq, equalize, f_res, fc, fixed_band_eq, fs, parametric_eq_config, + compensation, convolution_eq, equalize, f_res, fixed_band_eq, fs, parametric_eq_config, fixed_band_eq_config, max_gain, window_size, treble_window_size, - parametric_eq, phase, q, rockbox, show_plot, sound_signature, standardize_input, + parametric_eq, phase, rockbox, show_plot, sound_signature, standardize_input, ten_band_eq, tilt, treble_f_lower, treble_f_upper, treble_gain_k) args_list.append(args) @@ -102,10 +109,7 @@ def batch_processing(input_dir=None, output_dir=None, new_only=False, standardiz for result in tqdm.tqdm( pool.imap_unordered(process_file_wrapper, args_list, chunksize=1), total=len(args_list)): results.append(result) - - print('Updated results:\n') - for result in results: - print(result) + return results def process_file_wrapper(params): @@ -113,8 +117,8 @@ def process_file_wrapper(params): def process_file(input_file_path, output_file_path, bass_boost_fc, bass_boost_gain, bass_boost_q, bit_depth, - compensation, convolution_eq, equalize, f_res, fc, fixed_band_eq, fs, parametric_eq_config, - fixed_band_eq_config, max_gain, window_size, treble_window_size, parametric_eq, phase, q, rockbox, + compensation, convolution_eq, equalize, f_res, fixed_band_eq, fs, parametric_eq_config, + fixed_band_eq_config, max_gain, window_size, treble_window_size, parametric_eq, phase, rockbox, show_plot, sound_signature, standardize_input, ten_band_eq, tilt, treble_f_lower, treble_f_upper, treble_gain_k): start_time = time() @@ -137,8 +141,6 @@ def process_file(input_file_path, output_file_path, bass_boost_fc, bass_boost_ga equalize=equalize, parametric_eq=parametric_eq, fixed_band_eq=fixed_band_eq, - fc=fc, - q=q, ten_band_eq=ten_band_eq, parametric_eq_config=parametric_eq_config, fixed_band_eq_config=fixed_band_eq_config, @@ -223,7 +225,7 @@ def process_file(input_file_path, output_file_path, bass_boost_fc, bass_boost_ga elif show_plot: fr.plot_graph(show=True, close=False) - return f'{time() - start_time:.0f}s: {fr.name} done.' + return fr def cli_args(): @@ -252,11 +254,6 @@ def cli_args(): arg_parser.add_argument('--rockbox', action='store_true', help='Will produce a Rockbox .cfg file with 10 band eq settings if this parameter exists,' 'no value needed.') - arg_parser.add_argument('--fc', type=str, help='Comma separated list of center frequencies for fixed band eq.') - arg_parser.add_argument('--q', type=str, - help='Comma separated list of Q values for fixed band eq. If only one ' - 'value is passed it is used for all bands. Q value can be ' - 'calculated from bandwidth in N octaves by Q = 2^(N/2)/(2^N-1).') arg_parser.add_argument('--ten_band_eq', action='store_true', help='Shortcut parameter for activating standard ten band eq optimization.') arg_parser.add_argument('--parametric_eq_config', type=str, default='4_PEAKING_WITH_LOW_SHELF,4_PEAKING_WITH_HIGH_SHELF', @@ -372,11 +369,6 @@ def cli_args(): # Named configurations, split by commas args['parametric_eq_config'] = args['parametric_eq_config'].split(',') - if 'fc' in args and args['fc'] is not None: - args['fc'] = [float(x) for x in args['fc'].split(',')] - if 'q' in args and args['q'] is not None: - args['q'] = [float(x) for x in args['q'].split(',')] - if 'fs' in args and args['fs'] is not None: args['fs'] = [int(x) for x in args['fs'].split(',')] @@ -396,3 +388,101 @@ def cli_args(): if __name__ == '__main__': batch_processing(**cli_args()) + + +class TestAutoEq(unittest.TestCase): + def setUp(self): + self._root = Path(tempfile.gettempdir()).joinpath(os.urandom(24).hex()) + self._input = self._root.joinpath('input') + self._output = self._root.joinpath('output') + for i in range(1, 3): + path = self._input.joinpath(f'Headphone {i}', f'Headphone {i}.csv') + path.parent.mkdir(parents=True, exist_ok=True) + with open(path, 'w') as fh: + fh.write('frequency,raw\n20,2\n50,2\n200,0\n1000,1\n3000,10\n10000,0\n20000,-15') + self._compensation = self._root.joinpath('compensation.csv') + with open(self._compensation, 'w') as fh: + fr = FrequencyResponse( + name='compensation', + frequency=[20, 50, 200, 1000, 3000, 10000, 20000], + raw=[6, 6, -1, 0, 8, 1, -10]) + fr.interpolate(pol_order=2) + fr.smoothen_fractional_octave(window_size=2, treble_window_size=2) + fr.center() + fr.write_to_csv(self._compensation) + self._sound_signature = self._root.joinpath('sound_signature.csv') + with open(self._sound_signature, 'w') as fh: + fh.write('frequency,raw\n20.0,0\n10000,0.0\n20000,3') + + def tearDown(self): + shutil.rmtree(self._root) + + def test_batch_processing(self): + self.assertTrue(self._input.joinpath('Headphone 1', 'Headphone 1.csv').exists()) + self.assertTrue(self._input.joinpath('Headphone 2', 'Headphone 2.csv').exists()) + frs = batch_processing( + input_dir=self._input, output_dir=self._output, standardize_input=True, compensation=self._compensation, + equalize=True, parametric_eq=True, fixed_band_eq=True, rockbox=True, + ten_band_eq=True, + parametric_eq_config=['4_PEAKING_WITH_LOW_SHELF', PEQ_CONFIGS['4_PEAKING_WITH_HIGH_SHELF']], + fixed_band_eq_config=None, convolution_eq=True, + fs=[44100, 48000], bit_depth=DEFAULT_BIT_DEPTH, phase='both', f_res=DEFAULT_F_RES, + bass_boost_gain=DEFAULT_BASS_BOOST_GAIN, bass_boost_fc=DEFAULT_BASS_BOOST_FC, + bass_boost_q=DEFAULT_BASS_BOOST_Q, tilt=-0.2, sound_signature=self._sound_signature, + max_gain=DEFAULT_MAX_GAIN, + window_size=DEFAULT_SMOOTHING_WINDOW_SIZE, treble_window_size=DEFAULT_TREBLE_SMOOTHING_WINDOW_SIZE, + treble_f_lower=DEFAULT_TREBLE_F_LOWER, treble_f_upper=DEFAULT_TREBLE_F_UPPER, + treble_gain_k=DEFAULT_TREBLE_GAIN_K, show_plot=False, thread_count=1 + ) + self.assertEqual(len(frs), 2) + + self.assertTrue(self._output.joinpath('Headphone 1', 'Headphone 1.png').exists()) + + # CSV file + self.assertTrue(self._output.joinpath('Headphone 1', 'Headphone 1.csv').exists()) + df = pd.read_csv(self._output.joinpath('Headphone 1', 'Headphone 1.csv')) + columns = 'frequency,raw,error,smoothed,error_smoothed,equalization,parametric_eq,fixed_band_eq,' \ + 'equalized_raw,equalized_smoothed,target'.split(',') + self.assertEqual(list(df.columns), columns) + self.assertEqual(df.size, 695 * len(columns)) + + # Graphic equalizer + self.assertTrue(self._output.joinpath('Headphone 1', 'Headphone 1 GraphicEQ.txt').exists()) + with open(self._output.joinpath('Headphone 1', 'Headphone 1 GraphicEQ.txt')) as fh: + self.assertRegexpMatches(fh.read().strip() + '; ', r'GraphicEQ: \d{2,5} (-?\d(\.\d+)?; )+') + + # Fixed band equalizer + self.assertTrue(self._output.joinpath('Headphone 1', 'Headphone 1 FixedBandEq.txt').exists()) + with open(self._output.joinpath('Headphone 1', 'Headphone 1 FixedBandEq.txt')) as fh: + lines = fh.read().strip().split('\n') + self.assertTrue(re.match(r'Preamp: -?\d+(\.\d+)? dB', lines[0])) + for line in lines[1:]: + self.assertRegexpMatches(line, r'Filter \d{1,2}: ON PK Fc \d{2,5} Hz Gain -?\d(\.\d+)? dB Q 1.41') + + # Parametric equalizer + self.assertTrue(self._output.joinpath('Headphone 1', 'Headphone 1 ParametricEq.txt').exists()) + with open(self._output.joinpath('Headphone 1', 'Headphone 1 ParametricEq.txt')) as fh: + lines = fh.read().strip().split('\n') + self.assertTrue(re.match(r'Preamp: -?\d+(\.\d+)? dB', lines[0])) + for line in lines[1:]: + self.assertRegexpMatches( + line, r'Filter \d{1,2}: ON (PK|LS|HS) Fc \d{2,5} Hz Gain -?\d(\.\d+)? dB Q \d(\.\d+)?') + + # Convolution (FIR) filters + for phase in ['minimum', 'linear']: + for fs in [44100, 48000]: + fp = self._output.joinpath('Headphone 1', f'Headphone 1 {phase} phase {fs}Hz.wav') + self.assertTrue(fp.exists()) + # Frequency resolution is 10, 2 channels, 16 bits per sample, 8 bits per byte + # Real file size has headers + min_size = fs / 10 * 2 * 16 / 8 + self.assertGreater(os.stat(fp).st_size, min_size) + + # README + self.assertTrue(self._output.joinpath('Headphone 1', 'README.md').exists()) + with open(self._output.joinpath('Headphone 1', 'README.md')) as fh: + s = fh.read().strip() + self.assertTrue('# Headphone 1' in s) + self.assertTrue('### Parametric EQs' in s) + self.assertTrue('### Fixed Band EQs' in s) + self.assertTrue('### Graphs' in s) diff --git a/frequency_response.py b/frequency_response.py index 02c337ce407..5a878772e8b 100644 --- a/frequency_response.py +++ b/frequency_response.py @@ -166,7 +166,7 @@ def read_from_csv(cls, file_path): # Regex for AutoEq style CSV header_pattern = r'frequency(,(raw|smoothed|error|error_smoothed|equalization|parametric_eq|fixed_band_eq|equalized_raw|equalized_smoothed|target))+' - float_pattern = r'-?\d+\.?\d+' + float_pattern = r'-?\d+(\.\d+)?' data_2_pattern = r'{fl}[ ,;:\t]+{fl}?'.format(fl=float_pattern) data_n_pattern = r'{fl}([ ,;:\t]+{fl})+?'.format(fl=float_pattern) autoeq_pattern = r'^{header}(\n{data})+\n*$'.format(header=header_pattern, data=data_n_pattern) @@ -1252,8 +1252,6 @@ def process(self, equalize=False, parametric_eq=False, fixed_band_eq=False, - fc=None, - q=None, ten_band_eq=None, parametric_eq_config=None, fixed_band_eq_config=None, @@ -1280,8 +1278,6 @@ def process(self, equalize: Run equalization? parametric_eq: Optimize peaking filters for parametric eq? fixed_band_eq: Optimize peaking filters for fixed band (graphic) eq? - fc: List of center frequencies for fixed band eq - q: List of Q values for fixed band eq ten_band_eq: Optimize filters for standard ten band eq? parametric_eq_config: List of PEQ config dicts fixed_band_eq_config: PEQ config dict for fixed band equalizer diff --git a/biquad.py b/research/neo_peq/biquad.py similarity index 100% rename from biquad.py rename to research/neo_peq/biquad.py diff --git a/research/peq2geq.py b/research/peq2geq.py index f9fbbb6aee3..3bb1117bba5 100644 --- a/research/peq2geq.py +++ b/research/peq2geq.py @@ -1,5 +1,6 @@ import os import sys +import tempfile import unittest from pathlib import Path from argparse import ArgumentParser, SUPPRESS @@ -12,7 +13,6 @@ from constants import DEFAULT_FS from peq import PEQ, Peaking, LowShelf, HighShelf - classes = {'PK': Peaking, 'LS': LowShelf, 'HS': HighShelf} @@ -33,19 +33,19 @@ def read_eqapo(file_path): fcs = [] qs = [] gains = [] - filts = [] + types = [] for line in lines: if line[0] == '#': # Comment line continue tokens = line.split() - if tokens[0] == 'Filter:': + if tokens[0] == 'Filter': fcs.append(float(tokens[tokens.index('Fc') + 1])) qs.append(float(tokens[tokens.index('Q') + 1])) gains.append(float(tokens[tokens.index('Gain') + 1])) - filts.append(re.search(r'(PK|LS|HS)', line)[0]) + types.append(re.search(r'(PK|LS|HS)', line)[0]) else: print(f'Unsupported EqualizerAPO control type "{line}"') - return fcs, qs, gains, filts + return fcs, qs, gains, types def main(): @@ -85,7 +85,7 @@ def main(): class TestPeq2Geq(unittest.TestCase): - def test(self): + def test_peq2geq(self): f = FrequencyResponse.generate_frequencies() peq = PEQ(f, DEFAULT_FS, filters=[ Peaking(f, DEFAULT_FS, fc=500, q=1.41, gain=2), @@ -103,3 +103,19 @@ def test(self): fr_geq.interpolate() fr_geq.raw -= np.mean(peq.fr - fr_geq.raw) self.assertLess(np.mean(np.abs(peq.fr - fr_geq.raw)), 0.1) + + def test_read_eqapo(self): + s = 'Filter 1: ON LS Fc 105 Hz Gain -0.5 dB Q 0.70\n' \ + 'Filter 3: ON PK Fc 1773 Hz Gain 3.3 dB Q 1.83\n' \ + 'Filter 10: ON HS Fc 10000 Hz Gain -1.7 dB Q 0.70\n' + fp = Path('test_read_eqapo.txt') + with open(fp, 'w', encoding='utf-8') as fh: + fh.write(s) + fcs, qs, gains, types = read_eqapo(fp) + fp.unlink(missing_ok=True) + self.assertEqual(fcs, [105, 1773, 10000]) + self.assertEqual(qs, [0.7, 1.83, 0.7]) + self.assertEqual(gains, [-0.5, 3.3, -1.7]) + self.assertEqual(types, ['LS', 'PK', 'HS']) + +