-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathModel.py
151 lines (129 loc) · 6.2 KB
/
Model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import os
import tensorflow as tf
class Autoencoder:
    """Fully-connected autoencoder built on the TensorFlow 1.x graph API.

    ``encoderDims`` lists layer sizes from the input layer down to the code
    layer; the decoder mirrors them in reverse.  Options:

    * ``sparseInput``  -- the loss (RMSE) is computed only over the non-zero
      entries of the input (e.g. for sparse ratings matrices).
    * ``tiedWeights``  -- decoder weights are the transposes of the encoder
      weights; no separate decoder weight variables are created.
    * ``denoise``      -- a random ~16% of input values are saturated to 255
      before encoding (denoising autoencoder).  Only meaningful for 0-255
      greyscale image data.
    """

    def __init__(self, encoderDims, sparseInput=False, tiedWeights=False, denoise=False):
        """Build the full graph, open a session and initialise variables.

        encoderDims : list[int] -- layer widths, input first, code last.
        """
        self.encoderDims = encoderDims
        self.decoderDims = list(reversed(encoderDims))
        self.sparseInput = sparseInput
        self.tiedWeights = tiedWeights
        self.denoise = denoise  # Only works for greyscale image data
        self.input = tf.placeholder(tf.float32, [None, encoderDims[0]])
        self.learningRate = tf.placeholder(tf.float32, [])
        # TODO: allow activation, loss and optimiser to be user-specified.
        self.activationFunction = tf.nn.sigmoid
        self.lossFunction = tf.losses.mean_squared_error
        self.SGD = tf.train.AdamOptimizer(self.learningRate)
        if self.denoise:
            self.__addNoise()
        self.__buildNetwork()          # Creates encoder & decoder variables
        self.__buildTensorFlowGraph()  # Wires encode -> decode -> loss -> train
        self.session = tf.Session()
        self.session.run(tf.global_variables_initializer())  # Initialise weights & biases
        self.saver = tf.train.Saver()
        self.session.graph.finalize()  # Avoids memory leaks through duplicating graph nodes

    def __addNoise(self):
        """Create ``self.noisyInput``: the input with random values whited out.

        A standard-normal sample is drawn per input element; elements whose
        sample exceeds 1.0 -- the upper tail beyond one standard deviation,
        i.e. roughly 16% of elements -- are replaced with 255 (white,
        assuming 0-255 greyscale pixels).  All other elements pass through
        unchanged.
        """
        random = tf.random_normal(tf.shape(self.input))
        mask = tf.greater(random, 1.0)
        self.noisyInput = tf.where(mask, tf.ones_like(self.input) * 255, self.input)

    def __buildNetwork(self):
        """Create per-layer weight and bias variables for encoder and decoder.

        Weights are drawn from a standard normal; biases start at zero.
        With tied weights the decoder reuses the encoder weights transposed
        (in reverse layer order) instead of creating its own variables.
        """
        self.encoderWeights, self.encoderBiases = [], []
        self.decoderWeights, self.decoderBiases = [], []
        for layer in range(len(self.encoderDims) - 1):
            self.encoderWeights.append(
                tf.Variable(tf.random_normal([self.encoderDims[layer], self.encoderDims[layer + 1]]))
            )
            self.encoderBiases.append(
                tf.Variable(tf.zeros([self.encoderDims[layer + 1]]))
            )
            self.decoderBiases.append(
                tf.Variable(tf.zeros([self.decoderDims[layer + 1]]))
            )
            if not self.tiedWeights:
                self.decoderWeights.append(
                    tf.Variable(tf.random_normal([self.decoderDims[layer], self.decoderDims[layer + 1]]))
                )
        if self.tiedWeights:
            self.decoderWeights = [tf.transpose(i) for i in reversed(self.encoderWeights)]

    def __buildTensorFlowGraph(self):
        """Chain the forward pass, loss and training op into the graph."""
        self.encoded = self.encode()  # Encoded/compressed data
        self.decoded = self.decode()  # Decoded/reconstructed data
        self.loss = self.__calculateLoss()
        self.train = self.SGD.minimize(self.loss)

    def encode(self):
        """Return the encoder output (code layer) op.

        Starts from the noisy input when denoising is enabled.  The
        activation is applied after every layer except the last (the code
        layer stays linear).
        """
        encoded = self.noisyInput if self.denoise else self.input
        for layer in range(len(self.encoderDims) - 1):
            encoded = tf.matmul(encoded, self.encoderWeights[layer])
            encoded = tf.add(encoded, self.encoderBiases[layer])
            if layer != len(self.encoderDims) - 2:
                encoded = self.activationFunction(encoded)
        return encoded

    def decode(self):
        """Return the decoder (reconstruction) op, mirroring ``encode``.

        The output layer is kept linear so reconstructions are not squashed
        into the activation's range.
        """
        decoded = self.encoded
        for layer in range(len(self.decoderDims) - 1):
            decoded = tf.matmul(decoded, self.decoderWeights[layer])
            decoded = tf.add(decoded, self.decoderBiases[layer])
            if layer != len(self.decoderDims) - 2:  # Keep output layer linear
                decoded = self.activationFunction(decoded)
        return decoded

    def __calculateLoss(self):
        """Return the reconstruction-loss op.

        With ``sparseInput`` the RMSE is taken only over positions where the
        input is strictly positive.  ``tf.where`` on a boolean tensor yields
        an (N, rank) matrix of coordinates, so ``tf.gather_nd`` is required
        to pick out the scalar elements at those coordinates --
        ``tf.gather`` would instead gather whole rows per coordinate pair.
        """
        if self.sparseInput:
            nonZeros = tf.where(tf.greater(self.input, 0))
            return tf.sqrt(
                self.lossFunction(
                    labels=tf.gather_nd(self.input, nonZeros),
                    predictions=tf.gather_nd(self.decoded, nonZeros)
                )
            )
        return self.lossFunction(labels=self.input, predictions=self.decoded)

    def setBatch(self, input, learningRate=0.0):
        """Store the feed dict used by subsequent ``run`` calls.

        input        : array of shape (batchSize, encoderDims[0]).
        learningRate : only needed when the next ``run`` trains.
        """
        self.batchDict = {
            self.input: input,
            self.learningRate: learningRate
        }

    def run(self, operations=None, train=False):
        """Evaluate named graph tensors on the current batch.

        operations : a name or list of names among 'input', 'noisyInput',
                     'encoded', 'decoded', 'loss' (unknown names are
                     silently ignored, matching the original behaviour).
        train      : if True, also run one optimisation step.

        Returns a single value when exactly one tensor was requested,
        otherwise a list of values (the training op's result is never
        included).  ``setBatch`` must have been called first.
        """
        ops = [self.train] if train else []
        if operations is not None:
            # Check for None BEFORE list-wrapping, otherwise None becomes
            # [None] and the guard below can never fire.
            if not isinstance(operations, list):
                operations = [operations]
            for op in operations:
                # Requested names map one-to-one onto tensor attributes;
                # getattr is deferred so e.g. noisyInput is only touched
                # when actually requested (it exists only when denoising).
                if op in ('input', 'noisyInput', 'encoded', 'decoded', 'loss'):
                    ops.append(getattr(self, op))
        if (train and len(ops) == 2) or (not train and len(ops) == 1):
            return self.session.run(ops, self.batchDict)[-1]
        elif train:
            return self.session.run(ops, self.batchDict)[1:]  # Drop the train op's result
        else:
            return self.session.run(ops, self.batchDict)

    def save(self, modelName="Autoencoder"):
        """Checkpoint all variables to SavedModels/<modelName>.ckpt."""
        directory = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'SavedModels')
        self.saver.save(self.session, os.path.join(directory, modelName + '.ckpt'))

    def restore(self, modelName="Autoencoder"):
        """Restore all variables from SavedModels/<modelName>.ckpt."""
        directory = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'SavedModels')
        self.saver.restore(self.session, os.path.join(directory, modelName + '.ckpt'))

    def kill(self):
        """Close the TensorFlow session and release its resources."""
        self.session.close()