-
Notifications
You must be signed in to change notification settings - Fork 19
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
1084635
commit 4d2e16d
Showing
18 changed files
with
1,516 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
|
||
import numpy as np | ||
|
||
|
||
def allp_delayfilt(tau): | ||
''' | ||
produces a Fractional-delay All-pass Filter | ||
Arguments:tau = fractional delay in samples. When 'tau' is a float - sinc fu | ||
nction. When 'tau' is an integer - just impulse. | ||
type of tau: float or int | ||
:return: | ||
a: Denumerator of the transfer function | ||
b: Numerator of the transfer function | ||
''' | ||
#L = max(1,int(tau)+1) with the +1 the max doesn't make sense anymore | ||
L = int(tau)+1 | ||
n = np.arange(0,L) | ||
# print("n", n) | ||
|
||
a_0 = np.array([1.0]) | ||
a = np.array(np.cumprod( np.divide(np.multiply((L - n), (L - n - tau)) , (np | ||
.multiply((n + 1), (n + 1 + tau))) ) )) | ||
a = np.append(a_0, a) # Denumerator of the transfer function | ||
# print("Denumerator of the transfer function a:", a) | ||
|
||
b = np.flipud(a) # Numerator of the transfer function | ||
# print("Numerator of the transfer function b:", b) | ||
|
||
return a, b | ||
|
||
if __name__ == '__main__': | ||
#testing the fractional delay allpass filter | ||
import matplotlib.pyplot as plt | ||
import scipy.signal as sp | ||
#fractional delay of 5.5 samples: | ||
a,b=allp_delayfilt(5.5) | ||
print("a=",a,"b=",b) | ||
x=np.hstack((np.arange(4),np.zeros(8))) | ||
y=sp.lfilter(b,a,x) #applying the allpass filter | ||
plt.plot(x) | ||
plt.plot(y) | ||
plt.xlabel('Sample') | ||
plt.title('The IIR Fractional Delay Filter Result') | ||
plt.legend(('Original Signal', 'Delayed Signal')) | ||
plt.show() | ||
#Plot frequency response: | ||
import freqz | ||
freqz.freqz(b,a) #omega=0.5: angle(H(0.5))=-2.8 | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
# Module for show impulse rsponse answer | ||
# Julia Peter, Mathias Kuntze | ||
#Modified, Gerald Schuller, Nov. 2016 | ||
|
||
import matplotlib.pyplot as plt | ||
import numpy as np | ||
from scipy import signal as sp | ||
|
||
def freqz(b, a=1, whole = False, axisFreqz = None, axisPhase = None): | ||
|
||
w, h = sp.freqz(b, a, worN=512, whole=whole) | ||
#w = w/np.pi | ||
fig = plt.figure() | ||
plt.title('Digital filter frequency response') | ||
plt.subplot(2,1,1) | ||
|
||
plt.plot(w, 20 * np.log10(abs(h)), 'b') | ||
plt.ylabel('Amplitude (dB)') | ||
plt.xlabel('Normalized Frequency') | ||
plt.grid() | ||
if axisFreqz is not None: | ||
plt.axis(axisFreqz) | ||
|
||
plt.subplot(2,1,2) | ||
#angles = np.unwrap(np.angle(h)) | ||
angles = np.angle(h) | ||
plt.plot(w, angles, 'g') | ||
plt.ylabel('Angle (radians)') | ||
plt.xlabel('Normalized Frequency') | ||
plt.grid() | ||
|
||
if axisPhase is not None: | ||
plt.axis(axisPhase) | ||
|
||
plt.show() | ||
return h |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
#%% LMS | ||
import numpy as np | ||
from sound import * | ||
import matplotlib.pyplot as plt | ||
|
||
x, fs = wavread('fspeech.wav') | ||
#normalized float, -1<x<1 | ||
x = np.array(x,dtype=float)/2**15 | ||
print(np.size(x)) | ||
e = np.zeros(np.size(x)) | ||
|
||
h = np.zeros(10) | ||
#have same 0 starting values as in decoder: | ||
x[0:10]=0.0 | ||
for n in range(10, len(x)): | ||
if n> 4000 and n< 4010: | ||
print("encoder h: ", h) | ||
#prediction error and filter, using the vector of the time reversed IR: | ||
e[n] = x[n] - np.dot(np.flipud(x[n-10+np.arange(0,10)]), h) | ||
#LMS update rule, according to the definition above: | ||
h = h + 1.0* e[n]*np.flipud(x[n-10+np.arange(0,10)]) | ||
|
||
print("Mean squared prediction error:", np.dot(e, e) /np.max(np.size(e))) | ||
#0.000215852452838 | ||
print("Compare that with the mean squared signal power:", np.dot(x.transpose(),x)/np.max(np.size(x))) | ||
#0.00697569381701 | ||
print("The Signal to Error ratio is:", np.dot(x.transpose(),x)/np.dot(e.transpose(),e)) | ||
#32.316954129056604, half as much as for LPC. | ||
|
||
#listen to it: | ||
sound(2**15*e, fs) | ||
|
||
plt.figure() | ||
plt.plot(x) | ||
#plt.hold(True) | ||
plt.plot(e,'r') | ||
plt.xlabel('Sample') | ||
plt.ylabel('Normalized Sample') | ||
plt.title('Least Mean Squares (LMS) Online Adaptation') | ||
plt.legend(('Original','Prediction Error')) | ||
plt.show() | ||
|
||
|
||
# Decoder | ||
h = np.zeros(10); | ||
xrek = np.zeros(np.size(x)); | ||
for n in range(10, len(x)): | ||
if n> 4000 and n< 4010: | ||
print("decoder h: ", h) | ||
xrek[n] = e[n] + np.dot(np.flipud(xrek[n-10+np.arange(10)]), h) | ||
#LMS update: | ||
h = h + 1.0 * e[n]*np.flipud(xrek[n-10+np.arange(10)]); | ||
|
||
|
||
plt.plot(xrek) | ||
plt.xlabel('Sample') | ||
plt.ylabel('Normalized Sample') | ||
plt.title('Reconstructed Signal') | ||
plt.show() | ||
#Listen to the reconstructed signal: | ||
sound(2**15*xrek,fs) | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
#%% LMS | ||
import numpy as np | ||
from sound import * | ||
import matplotlib.pyplot as plt | ||
|
||
x, fs = wavread('fspeech.wav') | ||
#normalized float, -1<x<1 | ||
x = np.array(x,dtype=float)/2**15 | ||
print(np.size(x)) | ||
e = np.zeros(np.size(x)) | ||
xrek=np.zeros(np.size(x)); | ||
P=0; | ||
L=10 | ||
h = np.zeros(L) | ||
#have same 0 starting values as in decoder: | ||
x[0:L]=0.0 | ||
quantstep=0.01; | ||
for n in range(L, len(x)): | ||
if n> 4000 and n< 4010: | ||
print("encoder h: ", h, "e=", e) | ||
#prediction error and filter, using the vector of the time reversed IR: | ||
#predicted value from past reconstructed values: | ||
P=np.dot(np.flipud(xrek[n-L+np.arange(L)]), h) | ||
#quantize and de-quantize e to step-size 0.05 (mid tread): | ||
e[n]=np.round((x[n]-P)/quantstep)*quantstep; | ||
#Decoder in encoder: | ||
#new reconstructed value: | ||
xrek[n]=e[n]+P; | ||
#LMS update rule: | ||
h = h + 1.0* e[n]*np.flipud(xrek[n-L+np.arange(L)]) | ||
|
||
print("Mean squared prediction error:", np.dot(e, e) /np.max(np.size(e))) | ||
#without quant.: 0.000215852452838 | ||
#with quant. with 0.01 : 0.000244936708861 | ||
#0.00046094397241 | ||
#quant with 0.0005: 0.000215872774695 | ||
print("Compare that with the mean squared signal power:", np.dot(x.transpose(),x)/np.max(np.size(x))) | ||
print("The Signal to Error ratio is:", np.dot(x.transpose(),x)/np.dot(e.transpose(),e)) | ||
#The Signal to Error ratio is: 28.479576824, a little less than without quant. | ||
#listen to it: | ||
sound(2**15*e, fs) | ||
|
||
plt.figure() | ||
plt.plot(x) | ||
#plt.hold(True) | ||
plt.plot(e,'r') | ||
plt.xlabel('Sample') | ||
plt.ylabel('Normalized Sample') | ||
plt.title('Least Mean Squares (LMS) Online Adaptation') | ||
plt.legend(('Original','Prediction Error')) | ||
plt.show() | ||
|
||
# Decoder | ||
h = np.zeros(L); | ||
xrek = np.zeros(np.size(x)); | ||
for n in range(L, len(x)): | ||
if n> 4000 and n< 4010: | ||
print("decoder h: ", h) | ||
P=np.dot(np.flipud(xrek[n-L+np.arange(L)]), h) | ||
xrek[n] = e[n] + P | ||
#LMS update: | ||
h = h + 1.0 * e[n]*np.flipud(xrek[n-L+np.arange(L)]); | ||
|
||
plt.plot(xrek) | ||
plt.xlabel('Sample') | ||
plt.ylabel('Normalized Sample') | ||
plt.title('The Reconstructed Signal') | ||
plt.show() | ||
|
||
#Listen to the reconstructed signal: | ||
sound(2**15*xrek,fs) | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
import numpy as np | ||
from sound import * | ||
import matplotlib.pyplot as plt | ||
import scipy.signal as sp | ||
|
||
x, fs = wavread('fspeech.wav'); | ||
#convert to float array type, normalize to -1<x<1: | ||
x = np.array(x,dtype=float)/2**15 | ||
print(np.size(x)) | ||
sound(2**15*x,fs) | ||
|
||
L=10 #predictor lenth | ||
len0 = np.max(np.size(x)) | ||
e = np.zeros(np.size(x)) #prediction error variable initialization | ||
blocks = np.int(np.floor(len0/640)) #total number of blocks | ||
state = np.zeros(L) #Memory state of prediction filter | ||
#Building our Matrix A from blocks of length 640 samples and process: | ||
h=np.zeros((blocks,L)) #initialize pred. coeff memory | ||
|
||
for m in range(0,blocks): | ||
A = np.zeros((640-L,L)) #trick: up to 630 to avoid zeros in the matrix | ||
for n in range(0,640-L): | ||
A[n,:] = np.flipud(x[m*640+n+np.arange(L)]) | ||
|
||
#Construct our desired target signal d, one sample into the future: | ||
d=x[m*640+np.arange(L,640)]; | ||
#Compute the prediction filter: | ||
h[m,:] = np.dot(np.dot(np.linalg.inv(np.dot(A.transpose(),A)), A.transpose()), d) | ||
hperr = np.hstack([1, -h[m,:]]) | ||
e[m*640+np.arange(0,640)], state = sp.lfilter(hperr,[1],x[m*640+np.arange(0,640)], zi=state) | ||
|
||
|
||
#The mean-squared error now is: | ||
print("The average squared error is:", np.dot(e.transpose(),e)/np.max(np.size(e))) | ||
#The average squared error is: 0.000113347859337 | ||
#We can see that this is only about 1 / 4 of the previous pred. Error! | ||
print("Compare that with the mean squared signal power:", np.dot(x.transpose(),x)/np.max(np.size(x))) | ||
#0.00697569381701 | ||
print("The Signal to Error ratio is:", np.dot(x.transpose(),x)/np.dot(e.transpose(),e)) | ||
#61.5423516403 | ||
#So our LPC pred err energy is more than a factor of 61 smaller than the | ||
#signal energy! | ||
#Listen to the prediction error: | ||
sound(2**15*e,fs) | ||
#Take a look at the signal and it's prediction error: | ||
plt.figure() | ||
plt.plot(x) | ||
#plt.hold(True) | ||
plt.plot(e,'r') | ||
plt.xlabel('Sample') | ||
plt.ylabel('Normalized Value') | ||
plt.legend(('Original','Prediction Error')) | ||
plt.title('LPC Coding') | ||
plt.show() | ||
|
||
#Decoder: | ||
xrek=np.zeros(x.shape) #initialize reconstructed signal memory | ||
state = np.zeros(L) #Initialize Memory state of prediction filter | ||
for m in range(0,blocks): | ||
hperr = np.hstack([1, -h[m,:]]) | ||
#predictive reconstruction filter: hperr from numerator to denominator: | ||
xrek[m*640+np.arange(0,640)] , state = sp.lfilter([1], hperr,e[m*640+np.arange(0,640)], zi=state) | ||
|
||
#Listen to the reconstructed signal: | ||
sound(2**15*xrek,fs) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
""" | ||
PyAudio Example: Filter the audio signal between input and output (i.e., record a | ||
few samples, filter them, and play them back immediately). | ||
Gerald Schuller, November 2014 | ||
""" | ||
|
||
import pyaudio | ||
import struct | ||
import math | ||
#import array | ||
import numpy as np | ||
import scipy.signal | ||
import matplotlib.pyplot as plt | ||
|
||
CHUNK = 1024 #Blocksize | ||
WIDTH = 2 #2 bytes per sample | ||
CHANNELS = 1 #2 | ||
RATE = 8000 #Sampling Rate in Hz | ||
RECORD_SECONDS = 8 | ||
|
||
N=64 | ||
bpass=scipy.signal.remez(N, [0.0, 0.05, 0.1, 0.2, 0.3, 0.5] , [0.0, 1.0, 0.0], weight=[100.0, 1.0, 100.0]) | ||
|
||
#fig = plt.figure() | ||
[freq, response] = scipy.signal.freqz(bpass) | ||
plt.plot(freq, 20*np.log10(np.abs(response)+1e-6)) | ||
plt.xlabel('Normalized Frequency (pi is Nyquist Frequency)') | ||
plt.ylabel("Magnitude of Frequency Response in dB") | ||
plt.title("Magnitude of Frequency Response for our Bandbass Filter") | ||
plt.show() | ||
|
||
plt.plot(bpass) | ||
plt.title('Impulse Response of our Bandpass Filter') | ||
plt.show() | ||
|
||
p = pyaudio.PyAudio() | ||
|
||
stream = p.open(format=p.get_format_from_width(WIDTH), | ||
channels=CHANNELS, | ||
rate=RATE, | ||
input=True, | ||
output=True, | ||
#input_device_index=10, | ||
frames_per_buffer=CHUNK) | ||
|
||
|
||
print("* recording") | ||
#initialize memory for filter: | ||
z=np.zeros(N-1) | ||
|
||
#Loop for the blocks: | ||
for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)): | ||
#Reading from audio input stream into data with block length "CHUNK": | ||
data = stream.read(CHUNK) | ||
#Convert from stream of bytes to a list of short integers (2 bytes here) in "samples": | ||
#shorts = (struct.unpack( "128h", data )) | ||
shorts = (struct.unpack( 'h' * CHUNK, data )); | ||
#samples=list(shorts); | ||
samples=np.array(list(shorts),dtype=float); | ||
#filter function: | ||
[filtered,z]=scipy.signal.lfilter(bpass, [1], samples, zi=z) | ||
filtered=np.clip(filtered, -32000,32000).astype(int) | ||
#converting from short integers to a stream of bytes in data: | ||
#comment this out to bypass filter: | ||
data=struct.pack('h' * len(filtered), *filtered); | ||
#Writing data back to audio output stream: | ||
stream.write(data, CHUNK) | ||
|
||
print("* done") | ||
|
||
stream.stop_stream() | ||
stream.close() | ||
|
||
p.terminate() | ||
|
Oops, something went wrong.