-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrecorder.py
162 lines (138 loc) · 5.81 KB
/
recorder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# Part of the code has been updated from
# http://www.swharden.com/wp/2013-05-09-realtime-fft-audio-visualization-with-python/
# Author: Scott W Harden: neuroscientist, dentist, molecular biologist, code monkey (2013)
import matplotlib
matplotlib.use('TkAgg') # <-- THIS MAKES IT FAST!
import numpy
import scipy
import struct
import pyaudio
import threading
import pylab
import struct
class SwhRecorder:
"""Simple, cross-platform class to record from the microphone."""
def __init__(self):
"""minimal garb is executed when class is loaded."""
self.BUFFERSIZE=2**13 #1024 is a good buffer size
self.secToRecord=.1
self.threadsDieNow=False
self.newAudio=False
# Set sampling rate (in Hz).
# Typical rates: {8000, 11025, 22050, 44100, 48100} Hz
self.RATE = 44100;
# Choose response type.
# Note: {'fast' = ~125 ms, 'slow' = ~1.0 s}
self.responseType = 'fast';
# Set calibration constant.
# Note: A quite location will be ~55 dBA.
self.C = 50;
def setup(self):
"""initialize sound card."""
#TODO - windows detection vs. alsa or something for linux
#TODO - try/except for sound card selection/initiation
if self.responseType=='slow':
self.secToRecord=1.0
else:
self.secToRecord= 0.125
self.buffersToRecord=int(self.RATE*self.secToRecord/self.BUFFERSIZE)
if self.buffersToRecord==0: self.buffersToRecord=1
self.samplesToRecord=int(self.BUFFERSIZE*self.buffersToRecord)
self.chunksToRecord=int(self.samplesToRecord/self.BUFFERSIZE)
self.secPerPoint=1.0/self.RATE
self.p = pyaudio.PyAudio()
self.inStream = self.p.open(format=pyaudio.paFloat32,channels=1,rate=self.RATE,input=True,frames_per_buffer=self.BUFFERSIZE)
self.xsBuffer=numpy.arange(self.BUFFERSIZE)*self.secPerPoint
self.xs=numpy.arange(self.chunksToRecord*self.BUFFERSIZE)*self.secPerPoint
self.audio=numpy.empty((self.chunksToRecord*self.BUFFERSIZE),dtype=numpy.float32)
def close(self):
"""cleanly back out and release sound card."""
self.p.close(self.inStream)
### RECORDING AUDIO ###
def getAudio(self):
"""get a single buffer size worth of audio."""
audioString=self.inStream.read(self.BUFFERSIZE)
return numpy.fromstring(audioString,dtype=numpy.float32)
def record(self,forever=True):
"""record secToRecord seconds of audio."""
while True:
if self.threadsDieNow: break
for i in range(self.chunksToRecord):
self.audio[i*self.BUFFERSIZE:(i+1)*self.BUFFERSIZE]=self.getAudio()
self.newAudio=True
if forever==False: break
def getRecord(self):
"""record secToRecord seconds of audio."""
for i in range(self.chunksToRecord):
self.audio[i*self.BUFFERSIZE:(i+1)*self.BUFFERSIZE]=self.getAudio()
rec=self.audio
return rec
def continuousStart(self):
"""CALL THIS to start running forever."""
self.t = threading.Thread(target=self.record)
self.t.start()
def continuousEnd(self):
"""shut down continuous recording."""
self.threadsDieNow=True
### MATH ###
def downsample(self,data,mult):
"""Given 1D data, return the binned average."""
overhang=len(data)%mult
if overhang: data=data[:-overhang]
data=numpy.reshape(data,(len(data)/mult,mult))
data=numpy.average(data,1)
return data
def fft(self,data=None):
if data==None:
data=self.audio.flatten()
left,right=numpy.split(numpy.abs(numpy.fft.fft(data)),2)
ys=numpy.add(left,right[::-1])
# Determine FFT frequencies.
xs = (1.*self.RATE/len(ys))*numpy.arange(0,len(ys))
return xs,ys
def level(self,data=None):
if data==None:
data=self.audio.flatten()
# Calculate magnitude of FFT.
X = numpy.abs(numpy.fft.fft(data))
# Add offset to prevent taking the log of zero.
X[X == 0] = 1e-17
# Retain frequencies below Nyquist rate.
f = (1.*self.RATE/len(X))*numpy.arange(0,len(X))
ind = f<self.RATE/2.; f = f[ind]; X = X[ind]
# Apply A-weighting filter.
A = self.filterA(f)
X = A*X
# Estimate dBA value using Parseval's relation.
totalEnergy = numpy.sum(X**2)/len(X)
meanEnergy = totalEnergy/((1./self.RATE)*len(X))
dBA = 10.*numpy.log10(meanEnergy)+self.C
# Estimate decibel level (for visualization).
X = 20.*numpy.log10(numpy.abs(X))
return f, X, dBA
def filterA(self,f):
# FILTERA Generates an A-weighting filter.
# FILTERA Uses a closed-form expression to generate
# an A-weighting filter for arbitary frequencies.
#
# Author: Douglas R. Lanman, 11/21/05
# Define filter coefficients.
# See: http://www.beis.de/Elektronik/AudioMeasure/
# WeightingFilters.html#A-Weighting
c1 = 3.5041384e16
c2 = 20.598997**2
c3 = 107.65265**2
c4 = 737.86223**2
c5 = 12194.217**2
# Evaluate A-weighting filter.
f[f == 0] = 1e-17;
f = f**2
num = c1*f**4
den = ((c2+f)**2)*(c3+f)*(c4+f)*((c5+f)**2)
A = num/den
return A
### VISUALIZATION ###
def plotAudio(self):
"""open a matplotlib popup window showing audio data."""
pylab.plot(self.audio.flatten())
pylab.show()