Skip to content

Commit

Permalink
Removed biquad and reworked peq2geq.py to use PEQ. Added rudimentatar…
Browse files Browse the repository at this point in the history
…y unit tests for autoeq.py.
  • Loading branch information
jaakkopasanen committed Sep 11, 2022
1 parent dc68eef commit 6c7abce
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 35 deletions.
138 changes: 114 additions & 24 deletions autoeq.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
# -*- coding: utf-8 -*-

import os
import re
import shutil
import tempfile
import unittest
from glob import glob
import argparse
import multiprocessing
from pathlib import Path
import pandas as pd
import soundfile as sf
from time import time
import numpy as np
Expand All @@ -18,7 +24,7 @@


def batch_processing(input_dir=None, output_dir=None, new_only=False, standardize_input=False, compensation=None,
equalize=False, parametric_eq=False, fixed_band_eq=False, rockbox=False, fc=None, q=None,
equalize=False, parametric_eq=False, fixed_band_eq=False, rockbox=False,
ten_band_eq=False, parametric_eq_config=None, fixed_band_eq_config=None, convolution_eq=False,
fs=DEFAULT_FS, bit_depth=DEFAULT_BIT_DEPTH, phase=DEFAULT_PHASE, f_res=DEFAULT_F_RES,
bass_boost_gain=DEFAULT_BASS_BOOST_GAIN, bass_boost_fc=DEFAULT_BASS_BOOST_FC,
Expand Down Expand Up @@ -50,7 +56,7 @@ def batch_processing(input_dir=None, output_dir=None, new_only=False, standardiz
elif bit_depth == 32:
bit_depth = "PCM_32"
else:
raise ValueError('Invalid bit depth. Accepted values are 16, 24 e 32.')
raise ValueError('Invalid bit depth. Accepted values are 16, 24 and 32.')

if sound_signature is not None:
sound_signature = FrequencyResponse.read_from_csv(sound_signature)
Expand All @@ -68,7 +74,8 @@ def batch_processing(input_dir=None, output_dir=None, new_only=False, standardiz
else:
if type(parametric_eq_config) is str:
parametric_eq_config = [parametric_eq_config]
parametric_eq_config = [PEQ_CONFIGS[config_name] for config_name in parametric_eq_config]
parametric_eq_config = [
PEQ_CONFIGS[config] if type(config) is str else config for config in parametric_eq_config]

if fixed_band_eq_config is not None and os.path.isfile(fixed_band_eq_config):
# Parametric EQ config is a file path
Expand All @@ -88,9 +95,9 @@ def batch_processing(input_dir=None, output_dir=None, new_only=False, standardiz
file_paths.append((input_file_path, output_file_path))
n_total += 1
args = (input_file_path, output_file_path, bass_boost_fc, bass_boost_gain, bass_boost_q, bit_depth,
compensation, convolution_eq, equalize, f_res, fc, fixed_band_eq, fs, parametric_eq_config,
compensation, convolution_eq, equalize, f_res, fixed_band_eq, fs, parametric_eq_config,
fixed_band_eq_config, max_gain, window_size, treble_window_size,
parametric_eq, phase, q, rockbox, show_plot, sound_signature, standardize_input,
parametric_eq, phase, rockbox, show_plot, sound_signature, standardize_input,
ten_band_eq, tilt, treble_f_lower, treble_f_upper, treble_gain_k)
args_list.append(args)

Expand All @@ -102,19 +109,16 @@ def batch_processing(input_dir=None, output_dir=None, new_only=False, standardiz
for result in tqdm.tqdm(
pool.imap_unordered(process_file_wrapper, args_list, chunksize=1), total=len(args_list)):
results.append(result)

print('Updated results:\n')
for result in results:
print(result)
return results


def process_file_wrapper(params):
return process_file(*params)


def process_file(input_file_path, output_file_path, bass_boost_fc, bass_boost_gain, bass_boost_q, bit_depth,
compensation, convolution_eq, equalize, f_res, fc, fixed_band_eq, fs, parametric_eq_config,
fixed_band_eq_config, max_gain, window_size, treble_window_size, parametric_eq, phase, q, rockbox,
compensation, convolution_eq, equalize, f_res, fixed_band_eq, fs, parametric_eq_config,
fixed_band_eq_config, max_gain, window_size, treble_window_size, parametric_eq, phase, rockbox,
show_plot, sound_signature, standardize_input, ten_band_eq, tilt, treble_f_lower, treble_f_upper,
treble_gain_k):
start_time = time()
Expand All @@ -137,8 +141,6 @@ def process_file(input_file_path, output_file_path, bass_boost_fc, bass_boost_ga
equalize=equalize,
parametric_eq=parametric_eq,
fixed_band_eq=fixed_band_eq,
fc=fc,
q=q,
ten_band_eq=ten_band_eq,
parametric_eq_config=parametric_eq_config,
fixed_band_eq_config=fixed_band_eq_config,
Expand Down Expand Up @@ -223,7 +225,7 @@ def process_file(input_file_path, output_file_path, bass_boost_fc, bass_boost_ga
elif show_plot:
fr.plot_graph(show=True, close=False)

return f'{time() - start_time:.0f}s: {fr.name} done.'
return fr


def cli_args():
Expand Down Expand Up @@ -252,11 +254,6 @@ def cli_args():
arg_parser.add_argument('--rockbox', action='store_true',
help='Will produce a Rockbox .cfg file with 10 band eq settings if this parameter exists,'
'no value needed.')
arg_parser.add_argument('--fc', type=str, help='Comma separated list of center frequencies for fixed band eq.')
arg_parser.add_argument('--q', type=str,
help='Comma separated list of Q values for fixed band eq. If only one '
'value is passed it is used for all bands. Q value can be '
'calculated from bandwidth in N octaves by Q = 2^(N/2)/(2^N-1).')
arg_parser.add_argument('--ten_band_eq', action='store_true',
help='Shortcut parameter for activating standard ten band eq optimization.')
arg_parser.add_argument('--parametric_eq_config', type=str, default='4_PEAKING_WITH_LOW_SHELF,4_PEAKING_WITH_HIGH_SHELF',
Expand Down Expand Up @@ -372,11 +369,6 @@ def cli_args():
# Named configurations, split by commas
args['parametric_eq_config'] = args['parametric_eq_config'].split(',')

if 'fc' in args and args['fc'] is not None:
args['fc'] = [float(x) for x in args['fc'].split(',')]
if 'q' in args and args['q'] is not None:
args['q'] = [float(x) for x in args['q'].split(',')]

if 'fs' in args and args['fs'] is not None:
args['fs'] = [int(x) for x in args['fs'].split(',')]

Expand All @@ -396,3 +388,101 @@ def cli_args():

if __name__ == '__main__':
batch_processing(**cli_args())


class TestAutoEq(unittest.TestCase):
def setUp(self):
self._root = Path(tempfile.gettempdir()).joinpath(os.urandom(24).hex())
self._input = self._root.joinpath('input')
self._output = self._root.joinpath('output')
for i in range(1, 3):
path = self._input.joinpath(f'Headphone {i}', f'Headphone {i}.csv')
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, 'w') as fh:
fh.write('frequency,raw\n20,2\n50,2\n200,0\n1000,1\n3000,10\n10000,0\n20000,-15')
self._compensation = self._root.joinpath('compensation.csv')
with open(self._compensation, 'w') as fh:
fr = FrequencyResponse(
name='compensation',
frequency=[20, 50, 200, 1000, 3000, 10000, 20000],
raw=[6, 6, -1, 0, 8, 1, -10])
fr.interpolate(pol_order=2)
fr.smoothen_fractional_octave(window_size=2, treble_window_size=2)
fr.center()
fr.write_to_csv(self._compensation)
self._sound_signature = self._root.joinpath('sound_signature.csv')
with open(self._sound_signature, 'w') as fh:
fh.write('frequency,raw\n20.0,0\n10000,0.0\n20000,3')

def tearDown(self):
shutil.rmtree(self._root)

def test_batch_processing(self):
self.assertTrue(self._input.joinpath('Headphone 1', 'Headphone 1.csv').exists())
self.assertTrue(self._input.joinpath('Headphone 2', 'Headphone 2.csv').exists())
frs = batch_processing(
input_dir=self._input, output_dir=self._output, standardize_input=True, compensation=self._compensation,
equalize=True, parametric_eq=True, fixed_band_eq=True, rockbox=True,
ten_band_eq=True,
parametric_eq_config=['4_PEAKING_WITH_LOW_SHELF', PEQ_CONFIGS['4_PEAKING_WITH_HIGH_SHELF']],
fixed_band_eq_config=None, convolution_eq=True,
fs=[44100, 48000], bit_depth=DEFAULT_BIT_DEPTH, phase='both', f_res=DEFAULT_F_RES,
bass_boost_gain=DEFAULT_BASS_BOOST_GAIN, bass_boost_fc=DEFAULT_BASS_BOOST_FC,
bass_boost_q=DEFAULT_BASS_BOOST_Q, tilt=-0.2, sound_signature=self._sound_signature,
max_gain=DEFAULT_MAX_GAIN,
window_size=DEFAULT_SMOOTHING_WINDOW_SIZE, treble_window_size=DEFAULT_TREBLE_SMOOTHING_WINDOW_SIZE,
treble_f_lower=DEFAULT_TREBLE_F_LOWER, treble_f_upper=DEFAULT_TREBLE_F_UPPER,
treble_gain_k=DEFAULT_TREBLE_GAIN_K, show_plot=False, thread_count=1
)
self.assertEqual(len(frs), 2)

self.assertTrue(self._output.joinpath('Headphone 1', 'Headphone 1.png').exists())

# CSV file
self.assertTrue(self._output.joinpath('Headphone 1', 'Headphone 1.csv').exists())
df = pd.read_csv(self._output.joinpath('Headphone 1', 'Headphone 1.csv'))
columns = 'frequency,raw,error,smoothed,error_smoothed,equalization,parametric_eq,fixed_band_eq,' \
'equalized_raw,equalized_smoothed,target'.split(',')
self.assertEqual(list(df.columns), columns)
self.assertEqual(df.size, 695 * len(columns))

# Graphic equalizer
self.assertTrue(self._output.joinpath('Headphone 1', 'Headphone 1 GraphicEQ.txt').exists())
with open(self._output.joinpath('Headphone 1', 'Headphone 1 GraphicEQ.txt')) as fh:
self.assertRegexpMatches(fh.read().strip() + '; ', r'GraphicEQ: \d{2,5} (-?\d(\.\d+)?; )+')

# Fixed band equalizer
self.assertTrue(self._output.joinpath('Headphone 1', 'Headphone 1 FixedBandEq.txt').exists())
with open(self._output.joinpath('Headphone 1', 'Headphone 1 FixedBandEq.txt')) as fh:
lines = fh.read().strip().split('\n')
self.assertTrue(re.match(r'Preamp: -?\d+(\.\d+)? dB', lines[0]))
for line in lines[1:]:
self.assertRegexpMatches(line, r'Filter \d{1,2}: ON PK Fc \d{2,5} Hz Gain -?\d(\.\d+)? dB Q 1.41')

# Parametric equalizer
self.assertTrue(self._output.joinpath('Headphone 1', 'Headphone 1 ParametricEq.txt').exists())
with open(self._output.joinpath('Headphone 1', 'Headphone 1 ParametricEq.txt')) as fh:
lines = fh.read().strip().split('\n')
self.assertTrue(re.match(r'Preamp: -?\d+(\.\d+)? dB', lines[0]))
for line in lines[1:]:
self.assertRegexpMatches(
line, r'Filter \d{1,2}: ON (PK|LS|HS) Fc \d{2,5} Hz Gain -?\d(\.\d+)? dB Q \d(\.\d+)?')

# Convolution (FIR) filters
for phase in ['minimum', 'linear']:
for fs in [44100, 48000]:
fp = self._output.joinpath('Headphone 1', f'Headphone 1 {phase} phase {fs}Hz.wav')
self.assertTrue(fp.exists())
# Frequency resolution is 10, 2 channels, 16 bits per sample, 8 bits per byte
# Real file size has headers
min_size = fs / 10 * 2 * 16 / 8
self.assertGreater(os.stat(fp).st_size, min_size)

# README
self.assertTrue(self._output.joinpath('Headphone 1', 'README.md').exists())
with open(self._output.joinpath('Headphone 1', 'README.md')) as fh:
s = fh.read().strip()
self.assertTrue('# Headphone 1' in s)
self.assertTrue('### Parametric EQs' in s)
self.assertTrue('### Fixed Band EQs' in s)
self.assertTrue('### Graphs' in s)
6 changes: 1 addition & 5 deletions frequency_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def read_from_csv(cls, file_path):

# Regex for AutoEq style CSV
header_pattern = r'frequency(,(raw|smoothed|error|error_smoothed|equalization|parametric_eq|fixed_band_eq|equalized_raw|equalized_smoothed|target))+'
float_pattern = r'-?\d+\.?\d+'
float_pattern = r'-?\d+(\.\d+)?'
data_2_pattern = r'{fl}[ ,;:\t]+{fl}?'.format(fl=float_pattern)
data_n_pattern = r'{fl}([ ,;:\t]+{fl})+?'.format(fl=float_pattern)
autoeq_pattern = r'^{header}(\n{data})+\n*$'.format(header=header_pattern, data=data_n_pattern)
Expand Down Expand Up @@ -1252,8 +1252,6 @@ def process(self,
equalize=False,
parametric_eq=False,
fixed_band_eq=False,
fc=None,
q=None,
ten_band_eq=None,
parametric_eq_config=None,
fixed_band_eq_config=None,
Expand All @@ -1280,8 +1278,6 @@ def process(self,
equalize: Run equalization?
parametric_eq: Optimize peaking filters for parametric eq?
fixed_band_eq: Optimize peaking filters for fixed band (graphic) eq?
fc: List of center frequencies for fixed band eq
q: List of Q values for fixed band eq
ten_band_eq: Optimize filters for standard ten band eq?
parametric_eq_config: List of PEQ config dicts
fixed_band_eq_config: PEQ config dict for fixed band equalizer
Expand Down
File renamed without changes.
28 changes: 22 additions & 6 deletions research/peq2geq.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import sys
import tempfile
import unittest
from pathlib import Path
from argparse import ArgumentParser, SUPPRESS
Expand All @@ -12,7 +13,6 @@
from constants import DEFAULT_FS
from peq import PEQ, Peaking, LowShelf, HighShelf


classes = {'PK': Peaking, 'LS': LowShelf, 'HS': HighShelf}


Expand All @@ -33,19 +33,19 @@ def read_eqapo(file_path):
fcs = []
qs = []
gains = []
filts = []
types = []
for line in lines:
if line[0] == '#': # Comment line
continue
tokens = line.split()
if tokens[0] == 'Filter:':
if tokens[0] == 'Filter':
fcs.append(float(tokens[tokens.index('Fc') + 1]))
qs.append(float(tokens[tokens.index('Q') + 1]))
gains.append(float(tokens[tokens.index('Gain') + 1]))
filts.append(re.search(r'(PK|LS|HS)', line)[0])
types.append(re.search(r'(PK|LS|HS)', line)[0])
else:
print(f'Unsupported EqualizerAPO control type "{line}"')
return fcs, qs, gains, filts
return fcs, qs, gains, types


def main():
Expand Down Expand Up @@ -85,7 +85,7 @@ def main():


class TestPeq2Geq(unittest.TestCase):
def test(self):
def test_peq2geq(self):
f = FrequencyResponse.generate_frequencies()
peq = PEQ(f, DEFAULT_FS, filters=[
Peaking(f, DEFAULT_FS, fc=500, q=1.41, gain=2),
Expand All @@ -103,3 +103,19 @@ def test(self):
fr_geq.interpolate()
fr_geq.raw -= np.mean(peq.fr - fr_geq.raw)
self.assertLess(np.mean(np.abs(peq.fr - fr_geq.raw)), 0.1)

def test_read_eqapo(self):
s = 'Filter 1: ON LS Fc 105 Hz Gain -0.5 dB Q 0.70\n' \
'Filter 3: ON PK Fc 1773 Hz Gain 3.3 dB Q 1.83\n' \
'Filter 10: ON HS Fc 10000 Hz Gain -1.7 dB Q 0.70\n'
fp = Path('test_read_eqapo.txt')
with open(fp, 'w', encoding='utf-8') as fh:
fh.write(s)
fcs, qs, gains, types = read_eqapo(fp)
fp.unlink(missing_ok=True)
self.assertEqual(fcs, [105, 1773, 10000])
self.assertEqual(qs, [0.7, 1.83, 0.7])
self.assertEqual(gains, [-0.5, 3.3, -1.7])
self.assertEqual(types, ['LS', 'PK', 'HS'])


0 comments on commit 6c7abce

Please sign in to comment.