-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathgenerate_anomaly_model.py
108 lines (81 loc) · 3.01 KB
/
generate_anomaly_model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
from keras.callbacks import ModelCheckpoint
from keras.layers import Activation, Dropout, Flatten, Dense
from keras.layers import Conv2D, MaxPooling2D
from keras.models import Sequential
import matplotlib.pyplot as plt
from skimage.io import imread
import numpy as np
import random
import os
# Silence TensorFlow's C++ backend logging (3 = errors only); must be set
# before TF initializes.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
# Directories of labeled .jpg training images: anomalies are the positive
# class (label 1), noise the negative class (label 0).
ANOMALY_PATH = os.path.join('training', 'anomaly')
NOISE_PATH = os.path.join('training', 'noise')
# Destination for the best checkpointed model (note: already includes 'models').
MODEL_PATH = os.path.join('models', 'anomaly_model.h5')
# Images loaded into memory per training round (bounds peak RAM).
CHUNK_SIZE = 500
BATCH_SIZE = 6
EPOCHS = 18
# Expected image dimensions: 720p RGB (height, width, channels).
input_shape = (720, 1280, 3)
def chunks(l, n):
    """Yield successive slices of *l*, each at most *n* items long."""
    start = 0
    while start < len(l):
        yield l[start:start + n]
        start += n
def main():
    """Build, train, and checkpoint the anomaly-detection CNN, then plot losses.

    Collects .jpg paths from ANOMALY_PATH (label 1) and NOISE_PATH (label 0),
    trains the model chunk-by-chunk to bound memory use, checkpoints the best
    weights (by validation loss) to MODEL_PATH, and finally plots the last
    chunk's training/validation loss curves.
    """
    # Three conv/pool stages feeding a small dense head with a sigmoid
    # output for binary (anomaly vs. noise) classification.
    model = Sequential()
    model.add(Conv2D(32, (3, 3), input_shape=input_shape))
    model.add(Activation('relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Conv2D(32, (3, 3)))
    model.add(Activation('relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Conv2D(64, (3, 3)))
    model.add(Activation('relu'))
    model.add(Dropout(0.2))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Flatten())
    model.add(Dense(64))
    model.add(Activation('relu'))
    model.add(Dropout(0.4))
    model.add(Dense(1))
    model.add(Activation('sigmoid'))
    model.compile(loss='binary_crossentropy',
                  optimizer='rmsprop',
                  metrics=['accuracy'])
    # Map image path -> label (1 = anomaly, 0 = noise).
    train_images = {}
    for fn in os.listdir(ANOMALY_PATH):
        if fn.endswith('.jpg'):
            train_images[os.path.join(ANOMALY_PATH, fn)] = 1
    for fn in os.listdir(NOISE_PATH):
        if fn.endswith('.jpg'):
            train_images[os.path.join(NOISE_PATH, fn)] = 0
    filenames = list(train_images.keys())
    random.shuffle(filenames)  # Randomize Data

    def generate_data(chunk_size):
        """
        Loads data into memory chunk by chunk.
        Parameters
        ----------
        chunk_size : int
            The size of each chunk to generate
        Yields
        ------
        Tuple
            A tuple (X, y) containing a single chunk of training data
        """
        for chunk in chunks(filenames, chunk_size):
            # Normalize pixel values from [0, 255] to [0, 1].
            X = np.array([imread(fn) for fn in chunk]) / 255
            y = np.array([train_images[fn] for fn in chunk])
            yield (X, y)

    # Ensure the checkpoint directory exists before Keras tries to write to it.
    os.makedirs(os.path.dirname(MODEL_PATH), exist_ok=True)
    # BUGFIX: was os.path.join('models', MODEL_PATH), which double-joined to
    # 'models/models/anomaly_model.h5' since MODEL_PATH already has the prefix.
    # The callback is loop-invariant, so build it once.
    checkpoint = ModelCheckpoint(MODEL_PATH,
                                 monitor='val_loss',
                                 verbose=0,
                                 save_best_only=True)
    last_history = None
    for X, y in generate_data(CHUNK_SIZE):  # Fit the model w/each chunk and save
        history = model.fit(X, y, batch_size=BATCH_SIZE, epochs=EPOCHS,
                            validation_split=.2, shuffle=True,
                            callbacks=[checkpoint])
        last_history = (history.history['loss'], history.history['val_loss'])
    # Guard against an empty training set (no chunks were yielded).
    if last_history is None:
        print('No training images found; nothing to plot.')
        return
    # BUGFIX: plot before calling legend — a legend created with no plotted
    # artists has nothing to attach its labels to.
    plt.plot(last_history[0])
    plt.plot(last_history[1])
    plt.legend(['TrainLoss', 'TestLoss'])
    plt.show()
# Run training only when executed as a script, not when imported.
if __name__ == '__main__':
    main()