From a9f206ee542b0ae2f4a67b30ae4cbde17b8278c5 Mon Sep 17 00:00:00 2001 From: Jesper Brunnstroem Date: Fri, 2 Feb 2024 11:04:13 +0100 Subject: [PATCH] some work on spherical harmonics, and written testsagainst sf_analysis_ueno, a temporary module --- aspcol/sf_analysis_ueno.py | 200 ++++ aspcol/soundfieldestimation.py | 263 +++--- aspcol/sphericalharmonics.py | 979 ++++++++++++++++++++ aspcol/unittests/test_sphericalharmonics.py | 509 ++++++++++ 4 files changed, 1836 insertions(+), 115 deletions(-) create mode 100644 aspcol/sf_analysis_ueno.py create mode 100644 aspcol/sphericalharmonics.py create mode 100644 aspcol/unittests/test_sphericalharmonics.py diff --git a/aspcol/sf_analysis_ueno.py b/aspcol/sf_analysis_ueno.py new file mode 100644 index 0000000..7788605 --- /dev/null +++ b/aspcol/sf_analysis_ueno.py @@ -0,0 +1,200 @@ +import numpy as np +import scipy.spatial.distance as distfuncs +import scipy.special as special + +""" +This module is lifted directly from https://github.com/sh01k/MeshRIR/blob/main/example/sf_func.py +Only temporarily here to check results against. + +""" + + +def coefEstOprGen(posEst, orderEst, posMic, orderMic, coefMic, k, reg=1e-3): + """Generate operator to estimate expansion coefficients of spherical wavefunctions from measurement vectors + - N. Ueno, S. Koyama, and H. Saruwatari, 'Sound Field Recording Using Distributed Microphones Based on + Harmonic Analysis of Infinite Order,' IEEE SPL, DOI: 10.1109/LSP.2017.2775242, 2018. + + Parameters + ------ + posEst: Position of expansion center for estimation + orderEst: Maximum order for estimation + poMic: Microphone positions + orderMic: Maximum order of microphone directivity + coefMic: Expansion coefficients of microphone directivity + Returns + ------ + Operator for estimation + (Expansion coefficeints are estimated by multiplying with measurement vectors) + """ + numMic = posMic.shape[0] + if np.isscalar(k): + numFreq = 1 + k = np.array([k]) + else: + numFreq = k.shape[0] + + Xi = np.zeros((numFreq, (orderEst+1)**2, numMic), dtype=complex) + Psi = np.zeros((numFreq, numMic, numMic), dtype=complex) + for ff in np.arange(numFreq): + print('Frequency: %d/%d' % (ff, numFreq)) + for j in np.arange(numMic): + T = trjmat3d(orderEst, orderMic, posEst[0, 0]-posMic[j, 0], posEst[0, 1]-posMic[j, 1], posEst[0, 2]-posMic[j, 2], k[ff]) + Xi[ff, :, j] = T @ coefMic[:, j] + Psi[ff, j, j] = coefMic[:, j].conj().T @ coefMic[:, j] + for i in np.arange(j, numMic): + T = trjmat3d(orderMic, orderMic, posMic[i, 0]-posMic[j, 0], posMic[i, 1]-posMic[j, 1], posMic[i, 2]-posMic[j, 2], k[ff]) + Psi[ff, i, j] = coefMic[:, i].conj().T @ T @ coefMic[:, j] + Psi[ff, j, i] = Psi[ff, i, j].conj() + eigPsi, _ = np.linalg.eig(Psi) + regPsi = eigPsi[:,0] * reg + Psi_inv = np.linalg.inv(Psi + regPsi[:,None,None] * np.eye(numMic, numMic)[None, :, :]) + coefEstOpr = Xi @ Psi_inv + + return coefEstOpr + + + +def trjmat3d(order1, order2, x, y, z, k): + """Translation operator in 3D + + Taken directly from https://github.com/sh01k/MeshRIR/blob/main/example/sf_func.py + """ + if np.all([x, y, z] == 0): + T = np.eye((order1+1)**2, (order2+1)**2) + return T + else: + order = order1 + order2 + n, m = sph_harm_nmvec(order) + P = sf_int_basis3d(n, m, x, y, z, k) + T = np.zeros(((order1+1)**2, (order2+1)**2), dtype=complex) + + icol = 0 + for n in np.arange(0, order2+1): + for m in np.arange(-n, n+1): + irow = 0 + for nu in np.arange(0, order1+1): + for mu in np.arange(-nu, nu+1): + l = np.arange((n+nu), max( [np.abs(n-nu), np.abs(m-mu)] )-1, -2) + G = np.zeros(l.shape) + for ll in np.arange(0, l.shape[0]): + G[ll] = gaunt_coef(n, m, nu, -mu, l[ll]) + T[irow, icol] = np.sqrt(4.*np.pi) * 1j**(nu-n) * (-1.)**m * np.sum( 1j**(l) * P[l**2 + l - (mu-m)] * G ) + irow = irow + 1 + icol = icol+1 + return T + +def gaunt_coef(l1, m1, l2, m2, l3): + """Gaunt coefficients + + Taken directly from https://github.com/sh01k/MeshRIR/blob/main/example/sf_func.py + """ + m3 = -m1 - m2 + l = int((l1 + l2 + l3) / 2) + + t1 = l2 - m1 - l3 + t2 = l1 + m2 - l3 + t3 = l1 + l2 - l3 + t4 = l1 - m1 + t5 = l2 + m2 + + tmin = max([0, max([t1, t2])]) + tmax = min([t3, min([t4, t5])]) + + t = np.arange(tmin, tmax+1) + gl_tbl = np.array(special.gammaln(np.arange(1, l1+l2+l3+3))) + G = np.sum( (-1.)**t * np.exp( -np.sum( gl_tbl[np.array([t, t-t1, t-t2, t3-t, t4-t, t5-t])] ) \ + +np.sum( gl_tbl[np.array([l1+l2-l3, l1-l2+l3, -l1+l2+l3, l])] ) \ + -np.sum( gl_tbl[np.array([l1+l2+l3+1, l-l1, l-l2, l-l3])] ) \ + +np.sum( gl_tbl[np.array([l1+m1, l1-m1, l2+m2, l2-m2, l3+m3, l3-m3])] ) * 0.5 ) ) \ + * (-1.)**( l + l1 - l2 - m3) * np.sqrt( (2*l1+1) * (2*l2+1) * (2*l3+1) / (4*np.pi) ) + return G + + + + + +def sph_harm_nmvec(order, rep=None): + """Vectors of spherical harmonic orders and degrees + Returns (order+1)**2 size vectors of n and m + n = [0, 1, 1, 1, ..., order, ..., order]^T + m = [0, -1, 0, 1, ..., -order, ..., order]^T + Parameters + ------ + order: Maximum order + rep: Same vectors are copied as [n, .., n] and [m, ..., m] + Returns + ------ + n, m: Vectors of orders and degrees + + Taken directly from https://github.com/sh01k/MeshRIR/blob/main/example/sf_func.py + """ + n = np.array([0]) + m = np.array([0]) + for nn in np.arange(1, order+1): + nn_vec = np.tile([nn], 2*nn+1) + n = np.append(n, nn_vec) + mm = np.arange(-nn, nn+1) + m = np.append(m, mm) + if rep is not None: + n = np.tile(n[:, None], (1, rep)) + m = np.tile(m[:, None], (1, rep)) + return n, m + + +def sf_int_basis3d(n, m, x, y, z, k): + """Spherical wavefunction for interior sound field in 3D + + Parameters + ------ + n, m: orders and degrees + x, y, z: Position in Cartesian coordinates + k: Wavenumber + Returns + ------ + sqrt(4pi) j_n(kr) Y_n^m(phi,theta) + (Normalized so that 0th order coefficient corresponds to pressure) + Taken directly from https://github.com/sh01k/MeshRIR/blob/main/example/sf_func.py + """ + phi, theta, r = cart2sph(x, y, z) + J = special.spherical_jn(n, k * r) + Y = special.sph_harm(m, n, phi, theta) + f = np.sqrt(4*np.pi) * J * Y + return f + + + +def sph2cart(phi, theta, r): + """Conversion from spherical to Cartesian coordinates + Parameters + ------ + phi, theta, r: Azimuth angle, zenith angle, distance + Returns + ------ + x, y, z : Position in Cartesian coordinates + + Taken directly from https://github.com/sh01k/MeshRIR/blob/main/example/sf_func.py + should be replaced with util.spherical2cart + """ + x = r * np.sin(theta) * np.cos(phi) + y = r * np.sin(theta) * np.sin(phi) + z = r * np.cos(theta) + return x, y, z + + +def cart2sph(x, y, z): + """Conversion from Cartesian to spherical coordinates + Parameters + ------ + x, y, z : Position in Cartesian coordinates + Returns + ------ + phi, theta, r: Azimuth angle, zenith angle, distance + + Taken directly from https://github.com/sh01k/MeshRIR/blob/main/example/sf_func.py + should be replaced with util.cart2spherical + """ + r_xy = np.sqrt(x**2 + y**2) + phi = np.arctan2(y, x) + theta = np.arctan2(r_xy, z) + r = np.sqrt(x**2 + y**2 + z**2) + return phi, theta, r \ No newline at end of file diff --git a/aspcol/soundfieldestimation.py b/aspcol/soundfieldestimation.py index 5045c87..98d8ba7 100644 --- a/aspcol/soundfieldestimation.py +++ b/aspcol/soundfieldestimation.py @@ -21,6 +21,106 @@ import aspcol.utilities as util import aspcol.filterdesign as fd import aspcol.pseq as pseq +import aspcol.sphericalharmonics as sph + + + +#============= FREQUENCY DOMAIN METHODS - STATIONARY MICROPHONES ============= + +def est_ki_diffuse_freq(p_freq, pos, pos_eval, k, reg_param): + """ + Estimates the RIR in the frequency domain using kernel interpolation + Uses the frequency domain sound pressure as input + + Parameters + ---------- + p_freq : ndarray of shape (num_real_freqs, num_mics) + sound pressure in frequency domain at num_mic microphone positions + pos : ndarray of shape (num_mic, 3) + positions of the microphones + pos_eval : ndarray of shape (num_eval, 3) + positions of the evaluation points + k : ndarray of shape (num_freq) + wavenumbers + reg_param : float + regularization parameter for kernel interpolation + + Returns + ------- + est_sound_pressure : ndarray of shape (num_real_freqs, num_eval) + estimated RIR per frequency at the evaluation points + """ + est_filt = ki.get_krr_parameters(ki.kernel_helmholtz_3d, reg_param, pos_eval, pos, k) + p_ki = est_filt @ p_freq[:,:,None] + return np.squeeze(p_ki, axis=-1) + + +def nearest_neighbour_freq(p_freq, pos, pos_eval): + """ + Estimates the sound field at the evaluation points by simply selecting the value + associated with the nearest microphone position. + + Parameters + ---------- + p_freq : ndarray of shape (num_real_freqs, num_mics) + sound pressure in frequency domain at num_mic microphone positions + pos : ndarray of shape (num_mic, 3) + positions of the microphones + pos_eval : ndarray of shape (num_eval, 3) + positions of the evaluation points + + Returns + ------- + est_sound_pressure : ndarray of shape (num_real_freqs, num_eval) + estimated RIR per frequency at the evaluation points + """ + dist = spdist.cdist(pos, pos_eval) + min_idx = np.argmin(dist, axis=0) + + num_real_freqs = p_freq.shape[0] + est_sound_pressure = np.zeros((num_real_freqs, pos_eval.shape[0]), dtype=complex) + for pos_idx in range(pos_eval.shape[0]): + est_sound_pressure[:,pos_idx] = p_freq[:,min_idx[pos_idx]] + return est_sound_pressure + + +def inf_dim_shd_analysis(p_freq, pos, pos_eval, wave_num, dir_coeffs, reg_param): + """Estimates the sound field using directional microphones + + Since we reconstruct the sound pressure immediately, without explicitly computing + the spherical harmonic coefficients, no truncation or expansion center must be set. + + Parameters + ---------- + p_freq : ndarray of shape (num_freqs, num_mics) + sound pressure in frequency domain at num_mic microphone positions + pos : ndarray of shape (num_mic, 3) + positions of the microphones + pos_eval : ndarray of shape (num_eval, 3) + positions of the evaluation points + wave_num : ndarray of shape (num_freq) + wavenumbers + dir_coeffs : ndarray of shape (num_freq, num_mic, num_coeffs) + harmonic coefficients of the directionality of the microphones + reg_param : float + regularization parameter. Must be non-negative + + Returns + ------- + est_sound_pressure : ndarray of shape (num_real_freqs, num_eval) + estimated RIR per frequency at the evaluation points + """ + num_mic = pos.shape[0] + + psi = sph.translated_inner_product(pos, pos, dir_coeffs, dir_coeffs, wave_num) + psi_plus_noise_cov = psi + np.eye(num_mic) * reg_param + regression_vec = np.linalg.solve(psi_plus_noise_cov, p_freq)[...,None] + + omni_dir = sph.directivity_omni() + estimator_matrix = sph.translated_inner_product(pos_eval, pos, omni_dir, dir_coeffs, wave_num) + est_sound_pressure = np.squeeze(estimator_matrix @ regression_vec, axis=-1) + return est_sound_pressure +# ============= TIME DOMAIN METHODS - STATIONARY MICROPHONES ============= def pseq_nearest_neighbour(p, seq, pos, pos_eval): @@ -92,37 +192,20 @@ def est_ki_diffuse(p, seq, pos, pos_eval, samplerate, c, reg_param): return est_ki_diffuse_freq(rir_freq, pos, pos_eval, k, reg_param) -def est_ki_diffuse_freq(p_freq, pos, pos_eval, k, reg_param): - """ - Estimates the RIR in the frequency domain using kernel interpolation - Uses the frequency domain sound pressure as input - Parameters - ---------- - p_freq : ndarray of shape (num_real_freqs, num_mics) - sound pressure in frequency domain at num_mic microphone positions - pos : ndarray of shape (num_mic, 3) - positions of the microphones - pos_eval : ndarray of shape (num_eval, 3) - positions of the evaluation points - k : ndarray of shape (num_freq) - wavenumbers - reg_param : float - regularization parameter for kernel interpolation - Returns - ------- - est_sound_pressure : ndarray of shape (num_real_freqs, num_eval) - estimated RIR per frequency at the evaluation points - """ - est_filt = ki.get_krr_parameters(ki.kernel_helmholtz_3d, reg_param, pos_eval, pos, k) - p_ki = est_filt @ p_freq[:,:,None] - return np.squeeze(p_ki, axis=-1) + + + + +#============= MOVING MICROPHONE ESTIMATION ============= + + def est_inf_dimensional_shd_dynamic(p, pos, pos_eval, sequence, samplerate, c, reg_param, verbose=False): """ Estimates the RIR at evaluation positions using data from a moving microphone @@ -196,26 +279,55 @@ def est_inf_dimensional_shd_dynamic(p, pos, pos_eval, sequence, samplerate, c, r psi += 2*np.real(np.sinc(dist_mat * k[f]) * phi_rank1_matrix) noise_cov = reg_param * np.eye(N) - right_side = splin.solve(psi + noise_cov, p, assume_a = "pos") + regressor = splin.solve(psi + noise_cov, p, assume_a = "pos") - right_side = Phi.conj() * right_side[None,:] + regressor = Phi.conj() * regressor[None,:] # ======= Reconstruction of RIR ======= est_sound_pressure = np.zeros((num_real_freqs, pos_eval.shape[0]), dtype=complex) for f in range(num_real_freqs): kernel_val = ki.kernel_helmholtz_3d(pos_eval, pos, k[f:f+1]).astype(complex)[0,:,:] - est_sound_pressure[f, :] = np.sum(kernel_val * right_side[f,None,:], axis=-1) + est_sound_pressure[f, :] = np.sum(kernel_val * regressor[f,None,:], axis=-1) if verbose: - diagnostics = {} - diagnostics["regularization parameter"] = reg_param - diagnostics["condition number"] = np.linalg.cond(psi).tolist() - diagnostics["smallest eigenvalue"] = splin.eigh(psi, subset_by_index=(0,0), eigvals_only=True).tolist() - diagnostics["largest eigenvalue"] = splin.eigh(psi, subset_by_index=(N-1, N-1), eigvals_only=True).tolist() - return est_sound_pressure, diagnostics + #diagnostics = {} + #diagnostics["regularization parameter"] = reg_param + #diagnostics["condition number"] = np.linalg.cond(psi).tolist() + #diagnostics["smallest eigenvalue"] = splin.eigh(psi, subset_by_index=(0,0), eigvals_only=True).tolist() + #diagnostics["largest eigenvalue"] = splin.eigh(psi, subset_by_index=(N-1, N-1), eigvals_only=True).tolist() + return est_sound_pressure, regressor, psi#, diagnostics else: return est_sound_pressure +def reconstruct_inf_dimensional_shd_dynamic(regressor, pos_eval, pos, k): + """ + Reconstructs the sound field at the evaluation points using the regressor matrix + from est_inf_dimensional_shd_dynamic + + Parameters + ---------- + regressor : ndarray of shape (num_real_freqs, N) + regressor matrix from est_inf_dimensional_shd_dynamic + pos_eval : ndarray of shape (num_eval, 3) + positions of the evaluation points + pos : ndarray of shape (N, 3) + positions of the trajectory for each sample + k : ndarray of shape (num_freq) + wavenumbers + + Returns + ------- + est_sound_pressure : ndarray of shape (num_real_freqs, num_eval) + estimated RIR per frequency at the evaluation points + """ + num_real_freqs = regressor.shape[0] + est_sound_pressure = np.zeros((num_real_freqs, pos_eval.shape[0]), dtype=complex) + for f in range(num_real_freqs): + kernel_val = ki.kernel_helmholtz_3d(pos_eval, pos, k[f:f+1]).astype(complex)[0,:,:] + est_sound_pressure[f, :] = np.sum(kernel_val * regressor[f,None,:], axis=-1) + return est_sound_pressure + + def est_spatial_spectrum_dynamic(p, pos, pos_eval, sequence, samplerate, c, r_max, verbose=False): @@ -273,13 +385,13 @@ def est_spatial_spectrum_dynamic(p, pos, pos_eval, sequence, samplerate, c, r_ma # ======= Estimation of spatial spectrum coefficients ======= phi = _sequence_stft_multiperiod(sequence, num_periods) - max_orders = shd_min_order(k, r_max) + max_orders = sph.shd_min_order(k, r_max) r, angles = util.cart2spherical(pos) Sigma = [] for f in range(num_freqs): - order, modes = shd_num_degrees_vector(max_orders[f]) + order, modes = sph.shd_num_degrees_vector(max_orders[f]) Y_f = spspec.sph_harm(modes[None,:], order[None,:], angles[:,0:1], angles[:,1:2]) B_f = spspec.spherical_jn(order[None,:], k[f]*r[:,None]) @@ -299,7 +411,7 @@ def est_spatial_spectrum_dynamic(p, pos, pos_eval, sequence, samplerate, c, r_ma r_eval, angles_eval = util.cart2spherical(pos_eval) ord_idx = 0 for f in range(num_real_freqs): - order, modes = shd_num_degrees_vector(max_orders[f]) + order, modes = sph.shd_num_degrees_vector(max_orders[f]) num_ord = len(order) j_denom = spspec.spherical_jn(order, k[f]*r_max) @@ -376,82 +488,3 @@ def _sequence_stft(sequence): - - -def shd_num_degrees(max_order : int): - """ - Returns a list of mode indices for each order - when order = n, the degrees are only non-zero for -n <= degree <= n - - Parameters - ---------- - max_order : int - is the maximum order that is included - - Returns - ------- - degree : list of ndarrays of shape (2*order+1) - so the ndarrays will grow larger for higher list indices - """ - degree = [] - for n in range(max_order+1): - pos_degrees = np.arange(n+1) - degree_n = np.concatenate((-np.flip(pos_degrees[1:]), pos_degrees)) - degree.append(degree_n) - return degree - -def shd_num_degrees_vector(max_order : int): - """ - Constructs a vector with the index of each order and degree - when order = n, the degrees are only non-zero for -n <= degree <= n - - Parameters - ---------- - max_order : int - is the maximum order that is included - - Returns - ------- - order : ndarray of shape () - degree : ndarray of shape () - """ - order = [] - degree = [] - - for n in range(max_order+1): - pos_degrees = np.arange(n+1) - degree_n = np.concatenate((-np.flip(pos_degrees[1:]), pos_degrees)) - degree.append(degree_n) - - order.append(n*np.ones_like(degree_n)) - degree = np.concatenate(degree) - order = np.concatenate(order) - return order, degree - -def shd_min_order(wavenumber, radius): - """ - Returns the minimum order of the spherical harmonics that should be used - for a given wavenumber and radius - - Here according to the definition in Katzberg et al., Spherical harmonic - representation for dynamic sound-field measurements - - Parameters - ---------- - wavenumber : ndarray of shape (num_freqs) - radius : float - represents r_max in the Katzberg paper - - Returns - ------- - M_f : ndarray of shape (num_freqs) - contains an integer which is the minimum order of the spherical harmonics - """ - return np.ceil(wavenumber * radius).astype(int) - - - - - - - diff --git a/aspcol/sphericalharmonics.py b/aspcol/sphericalharmonics.py new file mode 100644 index 0000000..5d195cc --- /dev/null +++ b/aspcol/sphericalharmonics.py @@ -0,0 +1,979 @@ +import numpy as np +import itertools as it +import scipy.linalg as splin +import scipy.special as special +import scipy.spatial.distance as distance + +import aspcol.kernelinterpolation as ki +import aspcol.utilities as utils +import aspcol.filterdesign as fd + +import aspcol.sf_analysis_ueno as sau + +import wigners + + +# ==================== BASIC SHD FUNCTIONALITY ==================== + +def shd_max_order(num_coeffs): + """Returns the maximum order of the spherical harmonics that is required to represent a given number of coefficients + + Parameters + ---------- + num_coeffs : int + number of coefficients + + Returns + ------- + max_order : int + maximum order of the spherical harmonics that is required to represent the coefficients + """ + order = np.sqrt(num_coeffs) - 1 + assert order == int(order) + return int(order) + +def shd_num_coeffs(max_order): + """Number of coefficients required to represent a spherical harmonic function of a given order + + Parameters + ---------- + max_order : int + maximum order of the spherical harmonics + + Returns + ------- + num_coeffs : int + number of coefficients required to represent a spherical harmonic function of the given order + """ + return (max_order+1)**2 + + +def shd_num_degrees(max_order : int): + """ + Returns a list of mode indices for each order + when order = n, the degrees are only non-zero for -n <= degree <= n + + Parameters + ---------- + max_order : int + is the maximum order that is included + + Returns + ------- + degree : list of ndarrays of shape (2*order+1) + so the ndarrays will grow larger for higher list indices + """ + degree = [] + for n in range(max_order+1): + pos_degrees = np.arange(n+1) + degree_n = np.concatenate((-np.flip(pos_degrees[1:]), pos_degrees)) + degree.append(degree_n) + return degree + +def shd_num_degrees_vector(max_order : int): + """ + Constructs a vector with the index of each order and degree + when order = n, the degrees are only non-zero for -n <= degree <= n + + Parameters + ---------- + max_order : int + is the maximum order that is included + + Returns + ------- + order : ndarray of shape () + degree : ndarray of shape () + """ + order = [] + degree = [] + + for n in range(max_order+1): + pos_degrees = np.arange(n+1) + degree_n = np.concatenate((-np.flip(pos_degrees[1:]), pos_degrees)) + degree.append(degree_n) + + order.append(n*np.ones_like(degree_n)) + degree = np.concatenate(degree) + order = np.concatenate(order) + return order, degree + +def shd_min_order(wavenumber, radius): + """ + Returns the minimum order of the spherical harmonics that should be used + for a given wavenumber and radius + + Here according to the definition in Katzberg et al., Spherical harmonic + representation for dynamic sound-field measurements + + Parameters + ---------- + wavenumber : ndarray of shape (num_freqs) + radius : float + represents r_max in the Katzberg paper + + Returns + ------- + M_f : ndarray of shape (num_freqs) + contains an integer which is the minimum order of the spherical harmonics + """ + return np.ceil(wavenumber * radius).astype(int) + + + + + + + +def shd_basis(pos, order, degree, wavenum): + """Spherical harmonic basis function for sound field in 3D + + Implements: sqrt(4pi) j_order(kr) Y_order^degree(polar angle, zenith angle) + degree and order of the spherical harmonics might be swapped according to some + definitions. + + Parameters + ---------- + pos : ndarray of shape (num_pos, 3) + positions of the evaluation points + order : ndarray of shape (num_coeffs,) + order of the spherical harmonics. Can be any non-negative integer + degree : ndarray of shape (num_coeffs,) + degree of the spherical harmonics. Must satisfy |degree[i]| <= order[i] for all i + wave_num : float or ndarray of shape (num_freq,) + wavenumber, defined as w / c where w is the angular + frequency and c is the speed of sound + + Returns + ------- + f : ndarray of shape (num_freq, num_coeffs, num_pos) + values of the spherical harmonic basis function at the evaluation points + + Notes + ----- + See more detailed derivations in any of: + [1] Natsuki Ueno, Shoichi Koyama, Hiroshi Saruwatai, 'Sound Field Recording Using + Distributed Microphones Based on Harmonic Analysis of Infinite Order' + [2] J. Brunnström, M.B. Moeller, M. Moonen, 'Bayesian sound field estimation using + moving microphones' + """ + #Parse arguments to get correct shapes + radius, angles = utils.cart2spherical(pos) + radius = radius[None, None, ...] + angles = angles[None, None, ...] + + order = order[None, :, None] + degree = degree[None, :, None] + + if isinstance(wavenum, (int, float)): + wavenum = np.array([wavenum]) + assert wavenum.ndim == 1 + wavenum = wavenum[:, None, None] + + # Calculate the function values + f = np.sqrt(4*np.pi) * special.spherical_jn(order, wavenum * radius) * special.sph_harm(degree, order, angles[...,0], angles[...,1]) + return f + + +def reconstruct_pressure(shd_coeffs, pos, expansion_center, wavenum): + """Returns the complex sound pressure at some positions using the provided spherical harmonic coefficients + + Parameters + ---------- + shd_coeffs : ndarray of shape (num_freq, num_coeffs,) + spherical harmonic coefficients of the sound field + pos : ndarray of shape (num_pos, 3) + positions of the evaluation points + expansion_center : ndarray of shape (1, 3) + expansion center of the spherical harmonics + max_order : int + maximum order of the spherical harmonics. Could be removed in a future version of + the function, but must currently equal the max_order of the provided coefficients + wavenum : ndarray of shape (num_freq,) + wavenumber, defined as w / c where w is the angular frequency + and c is the speed of sound. + + Returns + ------- + p_est : ndarray of shape (num_freq, num_pos,) + complex sound pressure at the evaluation points + """ + num_coeffs = shd_coeffs.shape[1] + max_order = shd_max_order(num_coeffs) + + rel_pos = pos - expansion_center + order, degree = shd_num_degrees_vector(max_order) + basis_values = shd_basis(rel_pos, order, degree, wavenum) + p_est = np.sum(shd_coeffs[...,None] * basis_values, axis=1) + return p_est + + + + + + + + + + + + + +# ==================== TRANSLATION OF SHD COEFFICIENTS ==================== + +def translation(pos, wave_num, max_order_input, max_order_output): + """Creates a translation operator for a sequence of spherical harmonic coefficients to another expansion center + + The operator can be applied to a sequence of spherical harmonic coefficients to translate + the expansion center of the sequence. + shd_coeffs(pos) = T(pos - pos_orig) @ shd_coeffs(pos_orig) + + Parameters + ---------- + pos : ndarray of shape (num_pos, 3) + position argument to the translation operator + wave_num : ndarray of shape (num_freq,) + wavenumber, defined as w / c where w is the angular frequency + and c is the speed of sound. + max_order_input : int + maximum order of the coefficients that should be translated + max_order_output : int + maximum order of the translated coefficients + + Returns + ------- + shd_coeffs_translated : ndarray of shape (num_freq, num_pos, num_coeffs_output, num_coeffs_input) + translated coefficients + """ + test_mat = sau.trjmat3d(max_order_output, max_order_input, pos[0,0], pos[0,1], pos[0, 2], wave_num[0]) + num_freqs = wave_num.shape[0] + num_pos = pos.shape[0] + + T_all = np.zeros((num_freqs, num_pos, test_mat.shape[0], test_mat.shape[1]), dtype=complex) + for m in range(num_pos): + for f in range(num_freqs): + T_all[f, m, :, :] = sau.trjmat3d(max_order_output, max_order_input, pos[m,0], pos[m,1], pos[m, 2], wave_num[f]) + return T_all + + +def translate_shd_coeffs(shd_coeffs, pos, wave_num, max_order_input, max_order_output): + """Translates the provided shd_coeffs to another expansion center + + shd_coeffs(pos_orig + pos) = translate(pos) shd_coeffs(pos_orig) + + Parameters + ---------- + shd_coeffs : ndarray of shape (num_freqs, num_coeffs,) + coefficients of the sequence + pos : ndarray of shape (num_pos, 3) + position argument to the translation operator + wave_num : ndarray of shape (num_freq,) + wavenumber, defined as w / c where w is the angular frequency + and c is the speed of sound. + max_order_output : int + maximum order of the coefficients that should be translated + max_order_output : int + maximum order of the translated coefficients + + Returns + ------- + shd_coeffs_translated : ndarray of shape (num_freqs, num_pos, num_coeffs,) + translated coefficients + if num_pos == 1, the returned array will have shape (num_freqs, num_coeffs,) + """ + T_all = translation(pos, wave_num, max_order_input, max_order_output) + translated_coeffs = T_all @ shd_coeffs[:,None,:,None] + + num_pos = pos.shape[0] + if num_pos == 1: + translated_coeffs = np.squeeze(translated_coeffs, axis=1) + return np.squeeze(translated_coeffs, axis=-1) + + +def translation_operator(pos, wave_num, max_order_input, max_order_output): + """Translation operator for harmonic coefficients, such that + shd_coeffs(pos_orig + pos) = T(pos) @ shd_coeffs(pos_orig) + + Parameters + ---------- + pos : ndarray of shape (num_pos, 3) + position argument to the translation operator + wave_num : ndarray of shape (num_freq,) + wavenumber, defined as w / c where w is the angular frequency + and c is the speed of sound. + max_order_input : int + maximum order of the coefficients that should be translated + max_order_output : int + maximum order of the translated coefficients + + Returns + ------- + T : ndarray of shape (num_freqs, num_pos, num_coeffs_output, num_coeffs_input) + translation operator, such that shd_coeffs(pos_orig + pos) = T(pos) @ shd_coeffs(pos_orig) + """ + num_coeffs_input = shd_num_coeffs(max_order_input) + num_coeffs_output = shd_num_coeffs(max_order_output) + num_pos = pos.shape[0] + num_freq = wave_num.shape[0] + + tr_op = np.zeros((num_freq, num_pos, num_coeffs_output, num_coeffs_input), dtype=complex) + + orders_input, degrees_input = shd_num_degrees_vector(max_order_input) + orders_output, degrees_output = shd_num_degrees_vector(max_order_output) + for f in range(num_freq): + for m in range(num_pos): + for out_idx, (l1, m1) in enumerate(zip(orders_output, degrees_output)): + for in_idx, (l2, m2) in enumerate(zip(orders_input, degrees_input)): + sum_val = 0 + for q in range(l1+l2+1): + if np.abs(m2-m1) <= q: + basis_val = np.conj(shd_basis(pos[m:m+1,:], np.array([q]), np.array([m2-m1]), wave_num[f])) + g = gaunt_coefficient(l1, m1, l2, -m2, q) + new_val = (1j)**q * basis_val * g + new_val *= (-1.0)**(m1) * (1j)**(l2-l1) + sum_val += new_val + + tr_op[f, m, out_idx, in_idx] = np.squeeze(sum_val) + tr_op *= np.sqrt(4*np.pi) + return tr_op + +def gaunt_coefficient(l1, m1, l2, m2, l3): + """Gaunt coefficient G(l1, m1, l2, m2, l3) + + As defined by P. A. Martin, 2006, 'Multiple scattering: + Interaction of time-harmonic waves with N obstacles'. + Defined on page 83, equation (3.71). Argument order is the same as in the reference + + Parameters + ---------- + l1 : int + Spherical harmonic order + m1 : int + Spherical harmonic degree + l2 : int + Spherical harmonic order + m2 : int + Spherical harmonic degree + l3 : int + Spherical harmonic order + + Notes + ----- + The relationship between this and triple_harmonic_integral + (the latter is the definition of gaunt coefficient given by Sympy) + is I(l1, l2, l3, m1, m2, m3) = delta(m1+m2+m3,0) * (-1)**(m3) * gaunt(l1, m1, l2, m2, l3) + + Can be seen on the final equation on page 328 in Multiple scattering: Interaction of time-harmonic waves with N obstacles + by P. A. Martin, 2006. + + A recursive algorithm can be found in: + Fast evaluation of Gaunt coefficients: recursive approach - Yu-lin Xu, 1997 + """ + f1 = (-1.0)**(m1+m2) + f2 = np.sqrt((2*l1+1)*(2*l2+1)*(2*l3+1)/(4*np.pi)) + f3 = wigners.wigner_3j(l1,l2,l3,m1,m2,-m1-m2) + f4 = wigners.wigner_3j(l1,l2,l3,0,0,0) + return f1 * f2 * f3 * f4 + +def triple_harmonic_integral(l1, l2, l3, m1, m2, m3): + """Integral of the product of three spherical harmonics + + Defined as int_{\omega} Y_{l1,m1}(\omega) Y_{l2,m2}(\omega) Y_{l3,m3}(\omega) d\omega + where \omega is the angle. It is sometimes (in Sympy for example) called gaunt coefficient. + + Parameters + ---------- + l1, l2, l3 : int + Spherical harmonic orders + m1, m2, m3 : int + Spherical harmonic degrees + + """ + f1 = np.sqrt((2*l1+1)*(2*l2+1)*(2*l3+1)/(4*np.pi)) + f2 = wigners.wigner_3j(l1,l2,l3,m1,m2,m3) + f3 = wigners.wigner_3j(l1,l2,l3,0,0,0) + return f1 * f2 * f3 + +def gaunt_coef_lookup(max_order): + """ + Returns a lookup table for the gaunt coefficients + """ + all_params = np.array([i for i in _all_gaunt_parameters(max_order)]) + + tbl = np.zeros((max_order, 2*max_order+1, max_order, 2*max_order+1, max_order)) + + val_ref = [] + for i in range(all_params.shape[0]): + (l1, m1, l2, m2, l3) = all_params[i] + tbl[l1, m1, l2, m2, l3] = gaunt_coefficient(l1, m1, l2, m2, l3) + val_ref = np.array(val_ref) + return tbl + +def _all_gaunt_parameters(max_order): + """ This might not include all parameters""" + all_orders = it.product(range(max_order), range(max_order), range(max_order)) + for l1, l2, l3 in all_orders: + all_modes = it.product(range(0, l1), range(0, l2)) + for m1, m2 in all_modes: + if abs(l1-l2) <= l3 <= l1+l2: + if abs(m1 + m2) <= l3: + if (l1 + l2 + l3) % 2 == 0: + yield l1, m1, l2, m2, l3 + + + + + + +# =================== DIRECTIVITY DEFINITIONS =================== + +def directivity_omni(max_order = 0): + """Harmonic coefficients of an omnidirectional directivity function + + """ + if max_order == 0: + return np.ones((1,1), complex) + else: + num_coeffs = shd_num_coeffs(max_order) + dir = np.zeros((1, num_coeffs), dtype=complex) + dir[0,0] = 1 + return dir + +def directivity_linear(A, d_mic, max_order = 1): + """Harmonic coefficients of a linear directivity function + + The directivity function is defined as + c(d, omega) = A + A * d^T d_mic + + Omni directivity is obtained by setting A = 0 + Cardoid directivity is obtained by setting A = 1/2 + Figure-8 directivity is obtained by setting A = 1 + + Parameters + ---------- + A : float + directivity parameter. Must be between 0 and 1. + d_mic : ndarray of shape (1, 3) + direction of the microphone. It is the angle at which a cardioid microphone would give the strongest response + It is the outward direction, so from the center of the microphone towards the sources + Must be a unit vector + max_order : int + maximum order of the spherical harmonics. If set higher to 1, + the coefficients of the higher orders will be zero. + + Returns + ------- + dir_coeffs : ndarray of shape (1, num_coeffs) + coefficients of the directivity function + """ + assert max_order >= 1 + assert 0 <= A <= 1 + radius, angles = utils.cart2spherical(-d_mic) + assert np.isclose(radius, 1) + dir_coeffs = np.zeros((1, shd_num_coeffs(max_order)), dtype=complex) + dir_coeffs[0,0] = 1 - A + + harm = special.sph_harm(np.array([-1,0,1]), 1, angles[...,0], angles[...,1]) + + dir_coeffs[0,1:4] = (-1j * A / 3) * np.sqrt(4*np.pi) * harm.conj() + return dir_coeffs + + + + + + + + + + + + + + + + +# ====================== SHD ESTIMATION ====================== + +def measurement_omni(pos, exp_center, max_order, wave_num): + T = translation(pos - exp_center, wave_num, max_order, 0) + T = np.sum(T, axis=-2) # inner product with omni directionality + return T + +def measurement_conj_omni(pos, exp_center, max_order, wave_num): + """Returns the adjoint measurement operator Xi^H + + Can be applied as shd_coeffs(pos) = Xi^H @ p + where p is complex sound pressure at the measurement points + + Parameters + ---------- + pos : ndarray of shape (num_pos, 3) + positions of the measurement points where the sound pressure was measured + exp_center : ndarray of shape (1, 3) + expansion center of the spherical harmonics + max_order = int + maximum order of the spherical harmonics that is output by the operator + wave_num : ndarray of shape (num_freqs,) + wavenumber, defined as w / c where w is the angular frequency + and c is the speed of sound. + + Returns + ------- + operator : ndarray of shape (num_freqs, num_coeffs, num_pos) + adjoint of the measurement operator + """ + T = translation(exp_center - pos, wave_num, 0, max_order) + T = np.sum(T, axis=-1) # inner product with omni directionality + return np.moveaxis(T, 1,2) + +def apply_measurement_omni(shd_coeffs, pos, exp_center, max_order, wave_num): + """Applies the measurement operator to a sequence of spherical harmonic coefficients + + The operator is a function from a complex infinite (l2) sequence to a complex vector + Xi : l^2(C) -> C^M + + Parameters + ---------- + shd_coeffs : ndarray of shape (num_freq, num_coeffs,) + spherical harmonic coefficients + pos : ndarray of shape (num_pos, 3) + positions of the measurement points where the sound pressure should be estimated + exp_center : ndarray of shape (1, 3) + expansion center of the spherical harmonics + max_order = int + maximum order of the spherical harmonics that is input by the operator. Is in theory + redundant information and could be removed in a future version of the function. + wave_num : ndarray of shape (num_freq,) + wavenumber, defined as w / c where w is the angular frequency + and c is the speed of sound. + + Returns + ------- + p_est : ndarray of shape (num_freq, num_mic,) + complex sound pressure at the measurement points + """ + rel_pos = pos - exp_center + shd_translated = translate_shd_coeffs(shd_coeffs, rel_pos, wave_num, max_order, 0) + p_est = np.sum(shd_translated, axis=-1) # must be multiplied by dir.conj() when not omnidirectional + return p_est + + +def apply_measurement_conj_omni(vec, pos, exp_center, max_order, wave_num): + """Applies the conjugate measurement operator to a complex vector + + The operator is a function from a complex vector to a complex infinite (l2) sequence + Xi^H : C^M -> l^2(C) + + Parameters + ---------- + vec : ndarray of shape (num_freqs, num_pos) + complex vector + pos : ndarray of shape (num_pos, 3) + positions of the measurement points where the sound pressure was measured + exp_center : ndarray of shape (1, 3) + expansion center of the spherical harmonics + max_order = int + maximum order of the spherical harmonics that is output by the operator + wave_num : ndarray of shape (num_freqs,) + wavenumber, defined as w / c where w is the angular frequency + and c is the speed of sound. + + Returns + ------- + shd_coeffs : ndarray of shape (num_freqs, num_coeffs,) + coefficients of the infinite sequence + """ + rel_pos = exp_center - pos + directivity = directivity_omni() + dir_translated = translate_shd_coeffs(directivity, rel_pos, wave_num, 0, max_order) + return np.sum(dir_translated * vec[:,:,None], axis=1) + + +def apply_measurement_conj(vec, pos, exp_center, max_order, wave_num, dir_coeffs=None): + """Applies the conjugate measurement operator to a complex vector + + The operator is a function from a complex vector to a complex infinite (l2) sequence + Xi^H : C^M -> l^2(C) + + Parameters + ---------- + vec : ndarray of shape (num_freqs, num_pos) + complex vector + pos : ndarray of shape (num_pos, 3) + positions of the measurement points where the sound pressure was measured + exp_center : ndarray of shape (1, 3) + expansion center of the spherical harmonics + max_order = int + maximum order of the spherical harmonics that is output by the operator + wave_num : ndarray of shape (num_freqs,) + wavenumber, defined as w / c where w is the angular frequency + and c is the speed of sound. + dir_coeffs : ndarray of shape (num_freqs, num_coeffs) + coefficients of the directivity function. If None, the directivity is assumed to be omnidirectional + + Returns + ------- + shd_coeffs : ndarray of shape (num_freqs, num_coeffs,) + coefficients of the infinite sequence + """ + rel_pos = exp_center - pos + if dir_coeffs is None: + directivity = directivity_omni() + else: + directivity = dir_coeffs + max_order_dir = shd_max_order(directivity.shape[1]) + + dir_translated = translate_shd_coeffs(directivity, rel_pos, wave_num, max_order_dir, max_order) + return np.sum(dir_translated * vec[:,:,None], axis=1) + +def translated_inner_product(pos1, pos2, dir_coeffs1, dir_coeffs2, wave_num): + """ + + + + Parameters + ---------- + pos1 : ndarray of shape (num_pos1, 3) + positions of the first set of measurement points + pos2 : ndarray of shape (num_pos2, 3) + positions of the second set of measurement points + dir_coeffs1 : ndarray of shape (num_freqs, num_coeffs1) + coefficients of the directivity function for the first set of measurement points + dir_coeffs2 : ndarray of shape (num_freqs, num_coeffs2) + coefficients of the directivity function for the second set of measurement points + wave_num : ndarray of shape (num_freqs,) + wavenumber, defined as w / c where w is the angular frequency + and c is the speed of sound. + + Returns + ------- + psi : ndarray of shape (num_freqs, num_pos1, num_pos2) + inner product of the translated directivity functions + """ + max_order1 = shd_max_order(dir_coeffs1.shape[1]) + max_order2 = shd_max_order(dir_coeffs2.shape[1]) + num_pos1 = pos1.shape[0] + num_pos2 = pos2.shape[0] + num_freqs = len(wave_num) + + pos_diff = pos1[:,None,:] - pos2[None,:,:] + pos_diff = pos_diff.reshape(-1, 3) + + translated_coeffs2 = translate_shd_coeffs(dir_coeffs2, pos_diff, wave_num, max_order2, max_order1) + translated_coeffs2 = translated_coeffs2.reshape(num_freqs, num_pos1, num_pos2, -1) + + inner_product_matrix = np.sum(translated_coeffs2 * dir_coeffs1.conj()[:,None,None,:], axis=-1) + return inner_product_matrix + +def inf_dimensional_shd(p, pos, exp_center, max_order, wave_num, reg_param, dir_coeffs=None): + """Estimates the spherical harmonic coefficients with Bayesian inference, allows for arbitrary directivity + + Implements the method in Natsuki Ueno, Shoichi Koyama, Hiroshi Saruwatai, + 'Sound Field Recording Using Distributed Microphones Based on Harmonic Analysis of Infinite Order' + + Parameters + ---------- + p : ndarray of shape (num_real_freqs, num_mics) + complex sound pressure for each microphone and frequency + pos : ndarray of shape (num_mics, 3) + positions of the microphones + exp_center : ndarray of shape (1, 3) + expansion center of the spherical harmonics + max_order : int + maximum order of the spherical harmonics + wave_num : float + wavenumber, defined as w / c where w is the angular frequency + and c is the speed of sound + reg_param : float + regularization parameter. Must be non-negative + dir_coeffs : ndarray of shape (num_real_freqs, num_coeffs) + coefficients of the directivity function. If None, the directivity is assumed to be omnidirectional + + Returns + ------- + shd_coeffs : complex ndarray of shape (num_real_freqs, num_coeffs) + harmonic coefficients of the estimated sound field + """ + num_mic = pos.shape[0] + + psi = translated_inner_product(pos, pos, dir_coeffs, dir_coeffs, wave_num) + psi_plus_noise_cov = psi + np.eye(num_mic) * reg_param + regression_vec = np.linalg.solve(psi_plus_noise_cov, p) + + shd_coeffs = apply_measurement_conj(regression_vec, pos, exp_center, max_order, wave_num, dir_coeffs=dir_coeffs) + return shd_coeffs + + +def inf_dimensional_shd_omni(p, pos, exp_center, max_order, wave_num, reg_param): + """Estimates the spherical harmonic coefficients with Bayesian inference + + Implements the method in Natsuki Ueno, Shoichi Koyama, Hiroshi Saruwatai, + 'Sound Field Recording Using Distributed Microphones Based on Harmonic Analysis of Infinite Order' + + Assumes all microphones are omnidirectional + + Parameters + ---------- + p : ndarray of shape (num_real_freqs, num_mics) + complex sound pressure for each microphone and frequency + pos : ndarray of shape (num_mics, 3) + positions of the microphones + exp_center : ndarray of shape (1, 3) + expansion center of the spherical harmonics + max_order : int + maximum order of the spherical harmonics + wave_num : float + wavenumber, defined as w / c where w is the angular frequency + and c is the speed of sound + reg_param : float + regularization parameter. Must be non-negative + + Returns + ------- + shd_coeffs : complex ndarray of shape (num_real_freqs, num_coeffs) + harmonic coefficients of the estimated sound field + """ + num_mic = pos.shape[0] + + psi = ki.kernel_helmholtz_3d(pos, pos, wave_num) + psi_plus_noise_cov = psi + np.eye(num_mic) * reg_param + regression_vec = np.linalg.solve(psi_plus_noise_cov, p) + + shd_coeffs = apply_measurement_conj_omni(regression_vec, pos, exp_center, max_order, wave_num) + return shd_coeffs + +def inf_dimensional_shd_omni_prior(p, pos, exp_center, max_order, wave_num, reg_param, prior_mean): + """Estimates spherical harmonic coefficients with Bayesian inference of an infinite sequence of spherical harmonics + + Implements the method in Natsuki Ueno, Shoichi Koyama, Hiroshi Saruwatai, + 'Sound Field Recording Using Distributed Microphones Based on Harmonic Analysis of Infinite Order' + + Parameters + ---------- + p : ndarray of shape (num_real_freqs, num_mics) + complex sound pressure for each microphone and frequency + pos : ndarray of shape (num_mics, 3) + positions of the microphones + exp_center : ndarray of shape (1, 3) + expansion center of the spherical harmonics + max_order : int + maximum order of the spherical harmonics + reg_param : float + regularization parameter. Must be non-negative + wave_num : ndarray of shape (num_real_freqs,) + wavenumber, defined as w / c where w is the angular frequency + and c is the speed of sound + prior_mean : ndarray of shape (num_real_freqs, num_coeffs,) + mean of the prior distribution for the sherical harmonic coefficients + + Returns + ------- + shd_coeffs : complex ndarray of shape (num_real_freqs, num_coeffs) + harmonic coefficients of the estimated sound field + """ + num_mic = pos.shape[0] + + psi = ki.kernel_helmholtz_3d(pos, pos, wave_num) + psi_plus_noise_cov = psi + np.eye(num_mic) * reg_param + p_prior = apply_measurement_omni(prior_mean, pos, exp_center, max_order, wave_num) + regression_vec = np.linalg.solve(psi_plus_noise_cov, p - p_prior) + + shd_coeffs = prior_mean + apply_measurement_conj_omni(regression_vec, pos, exp_center, max_order, wave_num) + return shd_coeffs + + + + +def posterior_mean_omni_arbitrary_cov(p, pos, exp_center, max_order, wave_num, noise_power, prior_mean, prior_covariance): + num_mic = pos.shape[0] + measure = measurement_omni(pos, exp_center, max_order, wave_num) + measure_conj = measurement_conj_omni(pos, exp_center, max_order, wave_num) + + if prior_mean is not None: + p_prior = measure @ prior_mean[...,None] + p = p - p_prior[...,0] + + psi = measure @ prior_covariance @ measure_conj + psi_plus_noise_cov = psi + np.eye(num_mic) * noise_power + regression_vec = np.linalg.solve(psi_plus_noise_cov, p) + + shd_coeffs = prior_covariance @ measure_conj @ regression_vec[...,None] + shd_coeffs = np.squeeze(shd_coeffs, axis=-1) + + if prior_mean is not None: + shd_coeffs += prior_mean + return shd_coeffs + +def posterior_mean_omni(p, pos, exp_center, max_order, wave_num, prior_variance, noise_power, prior_mean = None): + if prior_mean is not None: + p_prior = measurement_omni(pos, exp_center, max_order, wave_num) @ prior_mean[...,None] + p = p - p_prior[...,0] + + num_mic = pos.shape[0] + + psi = ki.kernel_helmholtz_3d(pos, pos, wave_num) + psi_plus_noise_cov = psi + np.eye(num_mic) * noise_power / prior_variance + regression_vec = np.linalg.solve(psi_plus_noise_cov, p) + + shd_coeffs = measurement_conj_omni(pos, exp_center, max_order, wave_num) @ regression_vec[...,None]#np.linalg.inv(psi_plus_noise_cov) @ p[:,:,None] + shd_coeffs = np.squeeze(shd_coeffs, axis=-1) + + if prior_mean is not None: + shd_coeffs += prior_mean + return shd_coeffs + +def posterior_covariance_omni(pos, exp_center, max_order, wave_num, prior_variance, noise_power): + """ Returns the posterior covariance of the spherical harmonic coefficients + + Given the model in Natsuki Ueno, Shoichi Koyama, Hiroshi Saruwatai, + 'Sound Field Recording Using Distributed Microphones Based on Harmonic Analysis of Infinite Order' + + Parameters + ---------- + pos : ndarray of shape (num_mics, 3) + positions of the microphones + exp_center : ndarray of shape (1, 3) + expansion center of the spherical harmonics + max_order : int + maximum order of the spherical harmonics + wave_num : ndarray of shape (num_real_freqs,) + wavenumber, defined as w / c where w is the angular frequency + and c is the speed of sound + prior_variance : float + variance of the prior distribution for the sherical harmonic coefficients. The prior + covariance matrix is assumed to be an identity matrix scaled by this value. + noise_power : float + variance of the noise in the measurement equation. The noise covariance matrix + is assumed to be a identity matrix scaled by this value + + Returns + ------- + cov : ndarray of shape (num_real_freqs, num_coeffs, num_coeffs) + posterior covariance of the spherical harmonic coefficients + + """ + num_mic = pos.shape[0] + num_coeffs = shd_num_coeffs(max_order) + + psi = ki.kernel_helmholtz_3d(pos, pos, wave_num) + psi_plus_noise_cov = psi + np.eye(num_mic) * noise_power / prior_variance + + cov = measurement_conj_omni(pos, exp_center, max_order, wave_num) @ \ + np.linalg.inv(psi_plus_noise_cov) @ \ + measurement_omni(pos, exp_center, max_order, wave_num) + cov = prior_variance * (np.eye(num_coeffs) - cov) + return cov + +def posterior_covariance_omni_arbitrary_cov(pos, exp_center, max_order, wave_num, noise_power, prior_covariance): + num_mic = pos.shape[0] + measure = measurement_omni(pos, exp_center, max_order, wave_num) + measure_conj = measurement_conj_omni(pos, exp_center, max_order, wave_num) + + psi = measure @ prior_covariance @ measure_conj + psi_plus_noise_cov = psi + np.eye(num_mic) * noise_power + + measure_times_cov = measure @ prior_covariance + cov_times_measure_conj = prior_covariance @ measure_conj + main_mat = cov_times_measure_conj @ np.linalg.solve(psi_plus_noise_cov, measure_times_cov) + + return prior_covariance - main_mat + + + + +def inf_dimensional_shd_dynamic_omni(p, pos, pos_eval, sequence, samplerate, c, reg_param, verbose=False): + """ + Estimates the RIR at evaluation positions using data from a moving microphone + using Bayesian inference of an infinite sequence of spherical harmonics + + Implements the method in J. Brunnström, M.B. Moeller, M. Moonen, + "Bayesian sound field estimation using moving microphones" + + Assumptions: + The microphones are omnidirectional + The noise covariance is a scaled identity matrix + The data is measured over an integer number of periods of the sequence + N = seq_len * M, where M is the number of periods that was measured + The length of sequence is the length of the estimated RIR + + Parameters + ---------- + p : ndarray of shape (N) + sound pressure for each sample of the moving microphone + pos : ndarray of shape (N, 3) + position of the trajectory for each sample + pos_eval : ndarray of shape (num_eval, 3) + positions of the evaluation points + sequence : ndarray of shape (seq_len) or (1, seq_len) + the training signal used for the measurements + samplerate : int + c : float + speed of sound + reg_param : float + regularization parameter + verbose : bool, optional + if True, returns diagnostics, by default False + + Returns + ------- + shd_coeffs : ndarray of shape (num_real_freqs, num_eval) + time-domain harmonic coefficients of the estimated sound field + """ + # ======= Argument parsing ======= + if p.ndim >= 2: + p = np.squeeze(p) + + if sequence.ndim == 2: + sequence = np.squeeze(sequence, axis=0) + assert sequence.ndim == 1 + + N = p.shape[0] + seq_len = sequence.shape[0] + num_periods = N // seq_len + assert N % seq_len == 0 + + k = fd.get_wavenum(seq_len, samplerate, c) + num_real_freqs = len(fd.get_real_freqs(seq_len, samplerate)) + + # ======= Estimation of spherical harmonic coefficients ======= + Phi = _sequence_stft_multiperiod(sequence, num_periods) + + #division by pi is a correction for the sinc function used later + dist_mat = np.sqrt(np.sum((np.expand_dims(pos,1) - np.expand_dims(pos,0))**2, axis=-1)) / np.pi + + psi = np.zeros((N, N), dtype = float) + + # no conjugation required for zeroth frequency and the Nyquist frequency, + # since they will be real already for a real input sequence + psi += np.sinc(dist_mat * k[0]) * np.real_if_close(Phi[0,:,None] * Phi[0,None,:]) + assert seq_len % 2 == 0 #following line is only correct if B is even + psi += np.sinc(dist_mat * k[seq_len//2]) * np.real_if_close(Phi[seq_len//2,:,None] * Phi[seq_len//2,None,:]) + + for f in range(1, num_real_freqs-1): + phi_rank1_matrix = Phi[f,:,None] * Phi[f,None,:].conj() + psi += 2*np.real(np.sinc(dist_mat * k[f]) * phi_rank1_matrix) + + noise_cov = reg_param * np.eye(N) + right_side = splin.solve(psi + noise_cov, p, assume_a = "pos") + + right_side = Phi.conj() * right_side[None,:] + + # ======= Reconstruction of RIR ======= + est_sound_pressure = np.zeros((num_real_freqs, pos_eval.shape[0]), dtype=complex) + for f in range(num_real_freqs): + kernel_val = ki.kernel_helmholtz_3d(pos_eval, pos, k[f:f+1]).astype(complex)[0,:,:] + est_sound_pressure[f, :] = np.sum(kernel_val * right_side[f,None,:], axis=-1) + + if verbose: + diagnostics = {} + diagnostics["regularization parameter"] = reg_param + diagnostics["condition number"] = np.linalg.cond(psi).tolist() + diagnostics["smallest eigenvalue"] = splin.eigh(psi, subset_by_index=(0,0), eigvals_only=True).tolist() + diagnostics["largest eigenvalue"] = splin.eigh(psi, subset_by_index=(N-1, N-1), eigvals_only=True).tolist() + return est_sound_pressure, diagnostics + else: + return est_sound_pressure + + + diff --git a/aspcol/unittests/test_sphericalharmonics.py b/aspcol/unittests/test_sphericalharmonics.py new file mode 100644 index 0000000..88e8aaf --- /dev/null +++ b/aspcol/unittests/test_sphericalharmonics.py @@ -0,0 +1,509 @@ +import numpy as np +import pathlib +import hypothesis as hyp +#from hypothesis import given +import hypothesis.strategies as st +import pytest +import itertools as it +from sympy.physics.wigner import wigner_3j, gaunt + +import scipy.special as special + +import aspcol.sphericalharmonics as sph +import aspcol.utilities as utils +import aspcol.filterdesign as fd + +import wigner +import wigners + +from aspsim.simulator import SimulatorSetup + +import matplotlib.pyplot as plt + + +# ========= GAUNT COEFFICIENTS ========= +def _gaunt_coef_ueno(l1, m1, l2, m2, l3): + """Gaunt coefficients + + Taken directly from https://github.com/sh01k/MeshRIR/blob/main/example/sf_func.py + """ + m3 = -m1 - m2 + l = int((l1 + l2 + l3) / 2) + + t1 = l2 - m1 - l3 + t2 = l1 + m2 - l3 + t3 = l1 + l2 - l3 + t4 = l1 - m1 + t5 = l2 + m2 + + tmin = max([0, max([t1, t2])]) + tmax = min([t3, min([t4, t5])]) + + t = np.arange(tmin, tmax+1) + gl_tbl = np.array(special.gammaln(np.arange(1, l1+l2+l3+3))) + G = np.sum( (-1.)**t * np.exp( -np.sum( gl_tbl[np.array([t, t-t1, t-t2, t3-t, t4-t, t5-t])] ) \ + +np.sum( gl_tbl[np.array([l1+l2-l3, l1-l2+l3, -l1+l2+l3, l])] ) \ + -np.sum( gl_tbl[np.array([l1+l2+l3+1, l-l1, l-l2, l-l3])] ) \ + +np.sum( gl_tbl[np.array([l1+m1, l1-m1, l2+m2, l2-m2, l3+m3, l3-m3])] ) * 0.5 ) ) \ + * (-1.)**( l + l1 - l2 - m3) * np.sqrt( (2*l1+1) * (2*l2+1) * (2*l3+1) / (4*np.pi) ) + return G + + +def gaunt_sympy(l1, l2, l3, m1, m2, m3): + f1 = np.sqrt((2*l1+1)*(2*l2+1)*(2*l3+1)/(4*np.pi)) + f2 = wigner_3j(l1,l2,l3,m1,m2,m3) + f3 = wigner_3j(l1,l2,l3,0,0,0) + return f1*f2*f3 + + +def test_gaunt_coef_dev(): + max_order = 3 + + all_params = np.array([i for i in sph._all_gaunt_parameters(max_order)]) + #all_params = all_params[7:8] + + val = [] + val_ref = [] + for i in range(all_params.shape[0]): + args = all_params[i] + val.append(sph.gaunt_coefficient(*args)) + val_ref.append(_gaunt_coef_ueno(*args)) + + val_ref = np.array(val_ref) + val = np.array(val) + assert np.allclose(val, val_ref) + + +def test_gaunt_coef(): + max_order = 3 + all_params = np.array([i for i in sph._all_gaunt_parameters(max_order)]) + val = sph.gaunt_coefficient(all_params[:, 0], all_params[:, 1], all_params[:, 2], all_params[:, 3], all_params[:, 4]) + + val_ref = [] + for i in range(all_params.shape[0]): + args = all_params[i] + val_ref.append(_gaunt_coef_ueno(*args)) + val_ref = np.array(val_ref) + assert np.allclose(val, val_ref) + + +def test_gaunt_coef_lookup(): + max_order = 6 + all_params = np.array([i for i in sph._all_gaunt_parameters(max_order)]) + + tbl = sph.gaunt_coef_lookup(max_order) + val = tbl[all_params[:, 0], all_params[:, 1], all_params[:, 2], all_params[:, 3], all_params[:, 4]] + + val_ref = [] + for i in range(all_params.shape[0]): + args = all_params[i] + val_ref.append(_gaunt_coef_ueno(*args)) + val_ref = np.array(val_ref) + assert np.allclose(val, val_ref) + + +# ============ GAUNT AND WIGNER TESTS ============ + +def _linearity_formula_lower_limit(l1, l2, m1, m2): + """ Lower limit of the sum in the linearity formula for the Gaunt coefficient + described in (3.74) in Multiple scattering: Interaction of time-harmonic waves with N obstacles. + + func(l1, l2, m1, m2) is equivalent to func(l1, l1, m2, m1) + """ + if np.abs(l1 - l2) >= np.abs(m1 + m2): + return np.abs(l1 - l2) + elif np.abs(l1 - l2) < np.abs(m1 + m2) and ((l1 + l2 + np.abs(m1 + m2)) % 2 == 0): + return np.abs(m1 + m2) + elif np.abs(l1 - l2) < np.abs(m1 + m2) and ((l1 + l2 + np.abs(m1 + m2)) % 2 == 1): + return np.abs(m1 + m2) + 1 + else: + raise ValueError("Something went wrong") + +def test_linearisation_formula_for_spherical_harmonics(): + """ + Equation (3.70) in Multiple scattering: Interaction of time-harmonic waves with N obstacles + Relates the Gaunt coefficient to the product of two spherical harmonics. A simpler expression + for the same thing is given between (3.74) and (3.75) in the same book. + """ + rng = np.random.default_rng() + direction = rng.uniform(-1, 1, size = (1,3)) + radius, angles = utils.cart2spherical(direction) + + max_order = 5 + for l1, l2 in it.product(range(max_order+1), range(max_order+1)): + for m1, m2, in it.product(range(-l1, l1+1), range(-l2, l2+1)): + q0 = _linearity_formula_lower_limit(l1, l2, m1, m2) + Q = (l1 + l2 - q0) / 2 + assert utils.is_integer(Q) + Q = int(Q) + + sum_val = 0 + for l3 in range(Q+1): + g = sph.gaunt_coefficient(l1, m1, l2, m2, q0 + 2*l3) + sum_val += g * special.sph_harm(m1+m2, q0 + 2*l3, angles[0,0], angles[0,1]) + + comparison_val = special.sph_harm(m1, l1, angles[0,0], angles[0,1]) * special.sph_harm(m2, l2, angles[0,0], angles[0,1]) + assert np.allclose(sum_val, comparison_val) + +def test_relationship_between_gaunt_and_triple_harmonic_integral(): + max_order = 5 + all_params = np.array([i for i in sph._all_gaunt_parameters(max_order)]) + num_param_sets = all_params.shape[0] + assert num_param_sets > 0 + + for i in range(num_param_sets): + (l1, m1, l2, m2, l3) = all_params[i] + val1 = sph.triple_harmonic_integral(l1, l2, l3, m1, m2, -m1-m2) + val2 = (-1)**(m1+m2) * sph.gaunt_coefficient(l1, m1, l2, m2, l3) + assert np.allclose(val1, val2) + + +def test_triple_harmonic_integral_against_sympy(): + max_order = 3 + all_params = np.array([i for i in sph._all_gaunt_parameters(max_order)]) + num_param_sets = all_params.shape[0] + assert num_param_sets > 0 + + for i in range(num_param_sets): + (l1, m1, l2, m2, l3) = all_params[i] + val1 = sph.triple_harmonic_integral(l1, l2, l3, m1, m2, -m1-m2) + val2 = float(gaunt(l1, l2, l3, m1, m2, -m1-m2).n(16)) + assert np.allclose(val1, val2) + +def test_all_wigner_3j_implementations_are_equal(): + max_order = 8 + all_params = np.array([i for i in sph._all_gaunt_parameters(max_order)]) + num_param_sets = all_params.shape[0] + assert num_param_sets > 0 + + for i in range(num_param_sets): + (l1, m1, l2, m2, l3) = all_params[i] + + m2_min, m2_max, wigner_val = wigner.wigner_3jm(l1,l2,l3,m1) + val1 = wigner_val[m2 - int(m2_min)] + + val2 = wigners.wigner_3j(l1,l2,l3,m1,m2,-m1-m2) + val3 = float(wigner_3j(l1,l2,l3,m1,m2,-m1-m2).n(16)) + assert np.allclose(val1, val2) + assert np.allclose(val1, val3) + + + +# ============ TRANSLATION TESTS ============ +def test_translated_shd_coeffs_are_similar_to_directly_estimated_shd_coeffs(): + sr = 1000 + reg = 1e-7 + max_order = 3 + center1 = np.zeros((1,3)) + center2 = np.ones((1,3)) * 0.5 + + pos1, p1, freqs, sim_info = _generate_soundfield_data_omni(sr, center1) + pos2, p2, freqs, sim_info = _generate_soundfield_data_omni(sr, center2) + wave_num = 2 * np.pi * freqs / sim_info.c + + shd_coeffs1 = sph.inf_dimensional_shd_omni(p1, pos1, center1, max_order, wave_num, reg) + shd_coeffs2 = sph.inf_dimensional_shd_omni(p2, pos2, center1, max_order, wave_num, reg) + + shd_coeffs2_est = sph.translate_shd_coeffs(shd_coeffs1, center2-center1, wave_num, max_order, max_order) + + fig, axes = plt.subplots(1, 4, figsize=(20,6)) + axes[0].plot(10*np.log10(np.mean(np.abs(shd_coeffs2 - np.squeeze(shd_coeffs2_est))**2, axis=-1))) + axes[0].set_ylabel("Mean square error (dB)") + axes[0].set_xlabel("Frequency (Hz)") + + axes[1].plot(10*np.log10(np.mean(np.abs(shd_coeffs2 - np.squeeze(shd_coeffs2_est))**2, axis=0) / np.mean(np.abs(shd_coeffs2)**2, axis=0))) + axes[1].set_ylabel("Mean square error (dB)") + axes[1].set_xlabel("Frequency (Hz)") + + axes[2].plot(np.real(shd_coeffs2_est[:,0]), label="estimated") + axes[2].plot(np.real(shd_coeffs2[:,0]), label="recorded") + axes[2].set_title("Real part") + # axes[2].plot(np.imag(shd_coeffs2_est), label="estimated") + # axes[2].plot(np.imag(shd_coeffs2), label="recorded") + # axes[2].set_title("Imag part") + axes[3].plot(np.abs(shd_coeffs2_est[:,0]), label="estimated") + axes[3].plot(np.abs(shd_coeffs2[:,0]), label="recorded") + axes[3].set_title("Magnitude") + for ax in axes: + ax.legend() + plt.show() + + + + + +def test_translation_operator_against_uenos_implementation(): + """ + It looks like Ueno's implementation might not be correct. + It does not satisfy the additivity property well. + """ + rng = np.random.default_rng() + pos = rng.normal(size = (1,3)) + wave_num = np.array([2 * np.pi * 1000 / 343]) + max_order_input = 1 + max_order_output = 1 + + T_ref = sph.translation(pos, wave_num, max_order_input, max_order_output) + T = sph.translation_operator(pos, wave_num, max_order_input, max_order_output) + pass + +def show_translation_operator_addition_property(): + rng = np.random.default_rng(123456) + pos1 = rng.normal(size = (1,3)) + pos2 = rng.normal(size = (1,3)) + pos_added = pos1 + pos2 + wave_num = np.array([2 * np.pi * 1000 / 343]) + max_order_input = 2 + max_order_mid = 20 + max_order_output = 2 + + T1 = sph.translation_operator(pos1, wave_num, max_order_input, max_order_mid) + T2 = sph.translation_operator(pos2, wave_num, max_order_mid, max_order_output) + Tadded = sph.translation_operator(pos_added, wave_num, max_order_input, max_order_output) + T = T2 @ T1 + + fig, axes = plt.subplots(1, 3, figsize=(14,4)) + clr = axes[0].matshow(np.squeeze(np.abs(T))) + plt.colorbar(clr, ax = axes[0]) + clr = axes[1].matshow(np.squeeze(np.real(T))) + plt.colorbar(clr, ax = axes[1]) + clr = axes[2].matshow(np.squeeze(np.imag(T))) + plt.colorbar(clr, ax = axes[2]) + + fig, axes = plt.subplots(1, 3, figsize=(14,4)) + clr = axes[0].matshow(np.squeeze(np.abs(Tadded))) + plt.colorbar(clr, ax = axes[0]) + clr = axes[1].matshow(np.squeeze(np.real(Tadded))) + plt.colorbar(clr, ax = axes[1]) + clr = axes[2].matshow(np.squeeze(np.imag(Tadded))) + plt.colorbar(clr, ax = axes[2]) + plt.show() + + +def test_translation_operator_addition_property_as_function_of_max_order_mid(): + rng = np.random.default_rng(123456) + pos1 = rng.normal(size = (1,3)) + pos2 = rng.normal(size = (1,3)) + pos_added = pos1 + pos2 + freq = rng.uniform(100, 1000) + wave_num = np.array([2 * np.pi * freq / 343]) + max_order_input = 2 + max_order_output = 2 + Tadded = sph.translation_operator(pos_added, wave_num, max_order_input, max_order_output) + norm_factor = np.mean(np.abs(Tadded)**2) + + max_order_mid = [i for i in range(2, 21, 2)] + mse = [] + for mom in max_order_mid: + T1 = sph.translation_operator(pos1, wave_num, max_order_input, mom) + T2 = sph.translation_operator(pos2, wave_num, mom, max_order_output) + T = T2 @ T1 + + mse.append(10 * np.log10(np.mean(np.abs(T - Tadded)**2) / norm_factor)) + # Uncomment to see how the mse evolve with increasing max_order_mid + # plt.plot(max_order_mid, mse) + # plt.xlabel("max_order_mid") + # plt.ylabel("MSE (dB)") + # plt.show() + assert all([mse[i] >= mse[i+1] for i in range(len(mse)-1)]) + + +def test_hermitian_translation_operator_is_negative_argument(): + rng = np.random.default_rng() + pos = rng.normal(size = (1,3)) + wave_num = np.array([2 * np.pi * 1000 / 343]) + max_order_input = 5 + max_order_output = 5 + + T = np.squeeze(sph.translation_operator(pos, wave_num, max_order_input, max_order_output)) + T2 = np.squeeze(sph.translation_operator(-pos, wave_num, max_order_input, max_order_output)) + + assert np.allclose(T, T2.conj().T) + +def test_translation_operator_with_zero_arguments_is_identity(): + pos = np.zeros((1,3)) + wave_num = np.array([2 * np.pi * 1000 / 343]) + max_order_input = 5 + max_order_output = 5 + T = np.squeeze(sph.translation_operator(pos, wave_num, max_order_input, max_order_output)) + assert np.allclose(T, np.eye(T.shape[0])) + +def test_translation_operator_with_one_zero_order_is_basis_function(): + """Identity given as (i) on page 88 in P. Martin's Multiple scattering: Interaction of + time-harmonic waves with N obstacles + """ + rng = np.random.default_rng() + pos = rng.normal(size=(1,3)) + freq = rng.uniform(100, 1000) + wave_num = np.array([2 * np.pi * freq / 343]) + max_order_output = 5 + T = np.squeeze(sph.translation_operator(pos, wave_num, 0, max_order_output)) + + orders, degrees = sph.shd_num_degrees_vector(max_order_output) + basis_vals = np.squeeze(sph.shd_basis(pos, orders, degrees, wave_num)) + assert np.allclose(basis_vals, T) + + + +# ============ DIRECTIVITY TESTS ============ +def test_directivity_linear_is_cardioid_for_a_equals_one_half(): + dir_coeffs = sph.directivity_linear(0.5, np.array([[0,1,0]])) + + num_angles = 50 + angles = np.zeros((num_angles, 2)) + angles[:, 0] = np.linspace(0, 2*np.pi, num_angles) + angles[:, 1] = np.pi / 2 + pos = utils.spherical2cart(np.ones((num_angles,)), angles) + wave_num = np.ones((1,)) + + p = sph.reconstruct_pressure(dir_coeffs, pos, np.zeros((1,3)), wave_num) + + fig, ax = plt.subplots(1,1, figsize=(8,6), subplot_kw={'projection': 'polar'}) + ax.plot(angles[:,0], np.abs(p[0,:])**2) + plt.show() + + + +def test_directivity_coefficients_times_harmonic_coefficients_is_microphone_signal(): + dir_dir = np.array([[0,1,0]]) + pos_omni, p_omni, p_dir, freqs, sim_info= _generate_soundfield_data_dir_center(1000, dir_dir) + wave_num = 2 * np.pi * freqs / sim_info.c + + shd_coeffs = sph.inf_dimensional_shd_omni(p_omni, pos_omni, np.zeros((1,3)), 1, wave_num, 1e-9) + + #sph.apply_measurement_omni(shd_coeffs, np.zeros((1,3)), np.zeros((1,3)), 1, freqs) + + dir_coeffs = sph.directivity_linear(0.5, dir_dir) + + p_est = np.sum(np.conj(dir_coeffs) * shd_coeffs, axis=-1) + + # fig, axes = plt.subplots(1, 4, figsize=(20,6)) + # axes[0].plot(10*np.log10(np.abs(p_est - np.squeeze(p_dir))**2)) + # axes[0].set_title("Mean square error (dB)") + # axes[1].plot(np.real(p_est), label="estimated") + # axes[1].plot(np.real(p_dir), label="recorded") + # axes[1].set_title("Real part") + # axes[2].plot(np.imag(p_est), label="estimated") + # axes[2].plot(np.imag(p_dir), label="recorded") + # axes[2].set_title("Imag part") + # axes[3].plot(np.abs(p_est), label="estimated") + # axes[3].plot(np.abs(p_dir), label="recorded") + # axes[3].set_title("Magnitude") + # for ax in axes: + # ax.legend() + # plt.show() + assert 10*np.log10(np.mean(np.abs(p_est - np.squeeze(p_dir))**2)) < -30 + + + +def test_estimated_shd_coeffs_from_omni_and_cardioid_are_similar(): + dir_dir = np.array([[0,1,0]]) + pos_omni, p_omni, p_dir, freqs, sim_info= _generate_soundfield_data_dir_center(1000, dir_dir) + wave_num = 2 * np.pi * freqs / sim_info.c + + shd_coeffs = sph.inf_dimensional_shd_omni(p_omni, pos_omni, np.zeros((1,3)), 1, wave_num, 1e-9) + + #sph.apply_measurement_omni(shd_coeffs, np.zeros((1,3)), np.zeros((1,3)), 1, freqs) + + dir_coeffs = sph.directivity_linear(0.5, dir_dir) + + p_est = np.sum(np.conj(dir_coeffs) * shd_coeffs, axis=-1) + assert False + + + + + + + + +def _generate_soundfield_data_omni(sr, exp_center = np.array([[0,0,0]])): + rng = np.random.default_rng(10) + side_len = 0.4 + num_mic = 100 + + #pos_mic = np.zeros((num_mic, 3)) + pos_mic = rng.uniform(low=-side_len/2, high=side_len/2, size=(num_mic, 3)) + pos_mic += exp_center + + pos_src = np.array([[3,0.05,-0.05]]) + + setup = SimulatorSetup() + setup.sim_info.samplerate = sr + + setup.sim_info.tot_samples = sr + setup.sim_info.export_frequency = setup.sim_info.tot_samples + setup.sim_info.reverb = "ism" + setup.sim_info.room_size = [7, 4, 2] + setup.sim_info.room_center = [2, 0.1, 0.1] + setup.sim_info.rt60 = 0.25 + setup.sim_info.max_room_ir_length = sr // 2 + setup.sim_info.array_update_freq = 1 + setup.sim_info.randomized_ism = False + setup.sim_info.auto_save_load = False + setup.sim_info.sim_buffer = sr // 2 + setup.sim_info.extra_delay = 40 + setup.sim_info.plot_output = "none" + + setup.add_mics("omni", pos_mic) + setup.add_controllable_source("src", pos_src) + + sim = setup.create_simulator() + + num_freqs = 512 + freqs = fd.get_real_freqs(num_freqs, sr) + num_real_freqs = freqs.shape[0] + + fpaths = {} + for src, mic, path in sim.arrays.iter_paths(): + fpaths.setdefault(src.name, {}) + fpaths[src.name][mic.name] = np.moveaxis(np.moveaxis(np.fft.fft(path, n=num_freqs), -1, 0),1,2)[:num_real_freqs,...] + + return sim.arrays["omni"].pos, fpaths["src"]["omni"][...,0], freqs, sim.sim_info + + + + +def _generate_soundfield_data_dir_center(sr, directivity_dir): + rng = np.random.default_rng(10) + side_len = 0.4 + num_mic = 20 + + #pos_mic = np.zeros((num_mic, 3)) + pos_mic = rng.uniform(low=-side_len/2, high=side_len/2, size=(num_mic, 3)) + + pos_src = np.array([[3,0.05,-0.05]]) + + setup = SimulatorSetup() + setup.sim_info.samplerate = sr + + setup.sim_info.tot_samples = sr + setup.sim_info.export_frequency = setup.sim_info.tot_samples + setup.sim_info.reverb = "ism" + setup.sim_info.room_size = [7, 4, 2] + setup.sim_info.room_center = [2, 0.1, 0.1] + setup.sim_info.rt60 = 0.25 + setup.sim_info.max_room_ir_length = sr // 2 + setup.sim_info.array_update_freq = 1 + setup.sim_info.randomized_ism = False + setup.sim_info.auto_save_load = False + setup.sim_info.sim_buffer = sr // 2 + setup.sim_info.extra_delay = 40 + setup.sim_info.plot_output = "none" + + setup.add_mics("omni", pos_mic) + setup.add_mics("dir", np.zeros((1,3)), directivity_type=["cardioid"], directivity_dir=directivity_dir) + setup.add_controllable_source("src", pos_src) + + sim = setup.create_simulator() + + num_freqs = 512 + freqs = fd.get_real_freqs(num_freqs, sr) + num_real_freqs = freqs.shape[0] + + fpaths = {} + for src, mic, path in sim.arrays.iter_paths(): + fpaths.setdefault(src.name, {}) + fpaths[src.name][mic.name] = np.moveaxis(np.moveaxis(np.fft.fft(path, n=num_freqs), -1, 0),1,2)[:num_real_freqs,...] + + return sim.arrays["omni"].pos, fpaths["src"]["omni"][...,0], fpaths["src"]["dir"][...,0], freqs, sim.sim_info