import os, sys, errno
import json
import numpy as np
from termcolor import cprint
import datetime
from sklearn.preprocessing import MinMaxScaler
import settings
### Utility functions to manipulate numpy datasets
def normalize_vgg16_data(data):
    """Center the data around the VGG16 per-channel means, clip to [-127.5, 127.5],
    and swap the channel order (RGB -> BGR)."""
    data = data.astype('float64')
    for col_index, mean in enumerate([103.939, 116.779, 123.68]):
        data[:, col_index, :, :] -= mean
    data = data.clip(-127.5, 127.5)
    r, g, b = data[:, 0, :, :], data[:, 1, :, :], data[:, 2, :, :]
    data[:, 0, :, :], data[:, 1, :, :], data[:, 2, :, :] = b, g, r
    return data.astype('float32')
def denormalize_vgg16_data(data):
    """Invert normalize_vgg16_data: swap the channels back (BGR -> RGB) and add back the per-channel means."""
    data = data.astype('float64')
    b, g, r = data[:, 0, :, :], data[:, 1, :, :], data[:, 2, :, :]
    data[:, 0, :, :], data[:, 1, :, :], data[:, 2, :, :] = r, g, b
    for col_index, mean in enumerate([103.939, 116.779, 123.68]):
        data[:, col_index, :, :] += mean
    data = data.clip(0.0, 255.0)
    return data.astype('uint8')
def normalize_data(data):
    """Normalize a uint8 image batch into the range expected by the current model."""
    #scaler = MinMaxScaler(feature_range=(0, 1))
    #return scaler.fit_transform(data.astype('float32').reshape(data.shape[0], -1)).reshape(data.shape)
    if settings.MODEL == "vgg16":
        return normalize_vgg16_data(data)
    return (data.astype('float32') / 256.0).clip(0.0, 1.0)
def denormalize_data(data):
    """Invert normalize_data and return a uint8 image batch."""
    #if data.dtype == 'uint8':
    #    return data
    #unscaler = MinMaxScaler(feature_range=(0, 256))
    #return unscaler.fit_transform(data.reshape(data.shape[0], -1)).reshape(data.shape).astype('uint8')
    if settings.MODEL == "vgg16":
        return denormalize_vgg16_data(data)
    return (data * 256.0).clip(0.0, 255.0).astype('uint8')
def normalize_data_tanh(data):
    """Transform linearly integers within [0, 255] to float32 within [-1, 1]. Data must be a numpy array of type 'uint8'."""
    if data.dtype == 'float32':
        # Already normalized; return it unchanged.
        return data
    M = np.amax(data)
    m = np.amin(data)
    if M > 255 or m < 0:
        print_error("Trying to normalize data that does not fit within the uint8 range of [0, 255], with max = {} and min = {}.".format(M, m))
    return (data.astype('float32') - 127.5) / 127.5
def denormalize_data_tanh(data):
    """Transform linearly floating points within [-1, 1] to uint8 within [0, 255]. Data must be a numpy array of type 'float32'."""
    if data.dtype == 'uint8':
        # Already denormalized; return it unchanged.
        return data
    M = np.amax(data)
    m = np.amin(data)
    if M > 1.0 or m < -1.0:
        print_error("Trying to denormalize data that does not fit within the float32 range of [-1, 1], with max = {} and min = {}.".format(M, m))
    return (data * 127.5 + 127.5).astype('uint8')
def normalize_data_unit_interval(data):
    """Transform linearly integers within [0, 255] to float32 within [0, 1]. Data must be a numpy array of type 'uint8'."""
    if data.dtype == 'float32':
        return data
    return data.astype('float32') / 255.0
def denormalize_data_unit_interval(data):
    """Transform linearly floating points within [0, 1] to uint8 within [0, 255]. Data must be a numpy array of type 'float32'."""
    if data.dtype == 'uint8':
        return data
    return (data * 255.0).astype('uint8')
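# A minimal round-trip sketch of the tanh-style helpers above, assuming a uint8,
# channel-first image batch of shape (N, 3, H, W). The array and shapes below are
# illustrative only and are not part of this module:
#
#     batch = np.random.randint(0, 256, size=(4, 3, 64, 64), dtype='uint8')
#     scaled = normalize_data_tanh(batch)        # float32 in [-1, 1]
#     restored = denormalize_data_tanh(scaled)   # back to uint8 in [0, 255]
#     assert restored.dtype == 'uint8'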
def unflatten_to_4tensor(data, num_rows, width = 64, height = 64, is_colors_channel_first = True):
    """Reshape a flattened batch into a 4-tensor of color images."""
    if is_colors_channel_first:
        return data.reshape(num_rows, 3, width, height)
    else:
        return data.reshape(num_rows, width, height, 3)
def unflatten_to_3tensor(data, width = 64, height = 64, is_colors_channel_first = True):
    """Reshape a flattened image into a 3-tensor."""
    if is_colors_channel_first:
        return data.reshape(3, width, height)
    else:
        return data.reshape(width, height, 3)
def transpose_colors_channel(data, from_first_to_last = True):
    """Move the colors channel of a batch (4-tensor) or single image (3-tensor)
    between the first and the last position."""
    if from_first_to_last:
        ### Convert colors channel from first position to last position
        if len(data.shape) == 4:
            ### We are dealing with a batch (4-tensor)
            if data.shape[1] != 3 and data.shape[3] == 3:
                raise Exception("It appears that your colors channel is located last. Pass argument from_first_to_last=False.")
            num_rows = data.shape[0]
            width = data.shape[2]
            height = data.shape[3]
            return data.transpose(0, 2, 3, 1).reshape(num_rows, width, height, 3)
        elif len(data.shape) == 3:
            ### We are dealing with a single image (3-tensor)
            if data.shape[0] != 3 and data.shape[2] == 3:
                raise Exception("It appears that your colors channel is located last. Pass argument from_first_to_last=False.")
            width = data.shape[1]
            height = data.shape[2]
            return data.transpose(1, 2, 0).reshape(width, height, 3)
        else:
            raise ValueError("Dataset is not a 4-tensor batch or a 3-tensor image, as expected.")
    else:
        ### Convert colors channel from last position to first position
        if len(data.shape) == 4:
            ### We are dealing with a batch (4-tensor)
            if data.shape[3] != 3 and data.shape[1] == 3:
                raise Exception("It appears that your colors channel is located first. Pass argument from_first_to_last=True.")
            num_rows = data.shape[0]
            width = data.shape[1]
            height = data.shape[2]
            return data.transpose(0, 3, 1, 2).reshape(num_rows, 3, width, height)
        elif len(data.shape) == 3:
            ### We are dealing with a single image (3-tensor)
            if data.shape[2] != 3 and data.shape[0] == 3:
                raise Exception("It appears that your colors channel is located first. Pass argument from_first_to_last=True.")
            width = data.shape[0]
            height = data.shape[1]
            return data.transpose(2, 0, 1).reshape(3, width, height)
        else:
            raise ValueError("Dataset is not a 4-tensor batch or a 3-tensor image, as expected.")
### Pretty exception handling and pretty logging messages
def cprint_curtime(msg, fg=None, bg=None, attrs=[]):
    """Print the line in format "[HH:MM:SS] msg", where HH:MM:SS is the current time
    in zero-padded hours, minutes and seconds. The foreground and background colors, as
    well as the text attributes, are only applied to the 'msg' portion."""
    sys.stdout.write("[{0}] ".format(datetime.datetime.now().strftime("%H:%M:%S")))
    sys.stdout.flush()
    cprint(msg, fg, bg, attrs)
    sys.stdout.flush()
def handle_exceptions(msg, e, exception_type, fg=None, bg=None, attrs=[]):
    from settings import VERBOSE, MODULE_HAVE_XTRACEBACK
    cprint_curtime("[EXCEPTION] {0}: {1}".format(exception_type, msg), fg=fg, bg=bg, attrs=attrs)
    logerr("[EXCEPTION] {0}: {1}".format(exception_type, msg))
    logout("[EXCEPTION] {0}: {1}".format(exception_type, msg))
    # Write the exception reason/message in magenta
    cprint_curtime("EXCEPTION CAUSE: " + str(e), fg="magenta")
    logerr("EXCEPTION CAUSE: " + str(e))
    logout("EXCEPTION CAUSE: " + str(e))
    if not MODULE_HAVE_XTRACEBACK:
        raise e
    else:
        from traceback import format_exc, print_exc
        format_str = format_exc()
        logerr("EXCEPTION TRACEBACK: " + format_str)
        logout("EXCEPTION TRACEBACK: " + format_str)
        cprint_curtime("EXCEPTION TRACEBACK:", fg=fg, bg=bg, attrs=attrs)
        print_exc()
def handle_critical(msg, e):
    handle_exceptions(msg, e, exception_type = "CRITICAL", fg = "white", bg = "on_red", attrs=["bold", "underline"])
def print_critical(msg):
    logerr(msg)
    logout(msg)
    cprint_curtime("(!!!) {0}: {1}".format("CRITICAL", msg), fg = "white", bg = "on_red", attrs=["bold", "underline"])
def handle_error(msg, e):
    handle_exceptions(msg, e, exception_type = "ERROR", fg = "red")
def print_error(msg):
    logout(msg)
    logerr(msg)
    cprint_curtime("(!) {0}: {1}".format("ERROR", msg), fg = "red")
def handle_warning(msg, e):
    handle_exceptions(msg, e, exception_type = "WARNING", fg = "yellow")
def print_warning(msg):
    logout(msg)
    cprint_curtime("{0}: {1}".format("WARNING", msg), fg = "yellow")
def print_info(msg):
    logout(msg)
    cprint_curtime("{0}: {1}".format("INFO", msg), fg = "cyan")
def print_positive(msg):
    logout(msg)
    cprint_curtime("{0}: {1}".format("EXCELLENT", msg), "cyan", attrs=["bold"])
def logout(msg):
    """Append a timestamped line to the standard output log file."""
    with open(settings.OUTLOGFILE, 'a') as fd:
        fd.write("[{0}] {1}\n".format(datetime.datetime.now().strftime("%H:%M:%S"), msg))
        fd.flush()
def logerr(msg):
    """Append a timestamped line to the error log file."""
    with open(settings.ERRLOGFILE, 'a') as fd:
        fd.write("[{0}] {1}\n".format(datetime.datetime.now().strftime("%H:%M:%S"), msg))
        fd.flush()
def log(msg):
    cprint_curtime(msg)
    logout(msg)
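# A minimal usage sketch of the logging helpers above, assuming settings.OUTLOGFILE
# and settings.ERRLOGFILE point at writable files (as the rest of this module does);
# the messages are illustrative only:
#
#     print_info("Starting training run")        # console (cyan) + output log
#     print_warning("Falling back to defaults")  # console (yellow) + output log
#     print_error("Could not load checkpoint")   # console (red) + output and error logs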
def force_symlink(src, dst):
    """Create a symlink from 'dst' to 'src', replacing 'dst' if it already exists."""
    try:
        os.symlink(src, dst)
    except OSError as e:
        if e.errno == errno.EEXIST:
            os.remove(dst)
            os.symlink(src, dst)
def get_json_pretty_print(json_object):
    """Return a pretty-printed JSON string with sorted keys and 4-space indentation."""
    return json.dumps(json_object, sort_keys=True, indent=4, separators=(',', ': '))
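# A small sketch of the two helpers above; the paths and dictionary are hypothetical
# and only meant to illustrate the calls:
#
#     force_symlink("/data/experiments/run_42", "/data/experiments/latest")
#     with open("hyperparams.json", "w") as fd:
#         fd.write(get_json_pretty_print({"lr": 0.001, "batch_size": 128}))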
### Saving results in convenient formats
from os.path import join
import PIL.Image as Image
## Your model will be saved in: models/<experiment_name>.hdf5
## A summary of your model architecture will be saved in: models/summary_<experiment_name>.txt
## Your model's performance will be saved in: models/performance_<experiment_name>.txt
## Your predictions will be saved in: predictions/assets/<experiment_name>/Y_pred_<i>.jpg
##                                    predictions/assets/<experiment_name>/Y_<i>.jpg
##                                    predictions/assets/<experiment_name>/X_outer_<i>.jpg
##                                    predictions/assets/<experiment_name>/X_full_<i>.jpg
##                                    predictions/assets/<experiment_name>/X_full_pred_<i>.jpg
def save_keras_predictions(pred, pred_indices, dataset,
                           num_images = 20, use_flattened_datasets = True):
    from settings import touch_dir
    from settings import ASSETS_DIR
    if use_flattened_datasets:
        touch_dir(ASSETS_DIR)
        print_positive("Saving result images (outer frame input, inner frame prediction, true inner frame, and combination of outer frame + prediction and outer frame + true inner frame) within the directory: {}".format(ASSETS_DIR))
        for row in range(num_images):
            idt = pred_indices[row]
            Image.fromarray(dataset.images_outer2d[idt]).save(join(ASSETS_DIR, 'images_outer2d_' + str(row) + '.jpg'))
            Image.fromarray(pred[row]).save(join(ASSETS_DIR, 'images_pred_' + str(row) + '.jpg'))
            Image.fromarray(dataset.images_inner2d[idt]).save(join(ASSETS_DIR, 'images_inner2d_' + str(row) + '.jpg'))
            Image.fromarray(dataset.images[idt]).save(join(ASSETS_DIR, 'fullimages_' + str(row) + '.jpg'))
            fullimg_pred = np.copy(dataset.images[idt])
            center = (int(np.floor(fullimg_pred.shape[0] / 2.)), int(np.floor(fullimg_pred.shape[1] / 2.)))
            fullimg_pred[center[0]-16:center[0]+16, center[1]-16:center[1]+16, :] = pred[row, :, :, :]
            Image.fromarray(fullimg_pred).save(join(ASSETS_DIR, 'fullimages_pred_' + str(row) + '.jpg'))
    else:
        raise NotImplementedError("Haven't implemented save_predictions_info for 2D images (only flattened images).")
def denormalize_and_save_jpg_results(preds, X_test, y_test, X_original_test, num_images):
    assets_dir = settings.ASSETS_DIR
    if not os.path.exists(assets_dir):
        os.makedirs(assets_dir)
    print_info("Saving predictions and associated images as JPG files within directory: {}".format(assets_dir))
    # Denormalize images datasets
    preds = transpose_colors_channel(denormalize_data(preds))
    X_test = transpose_colors_channel(denormalize_data(X_test))
    y_test = transpose_colors_channel(denormalize_data(y_test))
    X_original_test = transpose_colors_channel(denormalize_data(X_original_test))
    # Save the 'num_images' predictions to JPG files within the 'assets' subdirectory
    for index in range(num_images):
        Image.fromarray(X_test[index]).save(os.path.join(assets_dir, 'images_outer2d_' + str(index) + '.jpg'))
        Image.fromarray(preds[index]).save(os.path.join(assets_dir, 'images_pred_' + str(index) + '.jpg'))
        Image.fromarray(y_test[index]).save(os.path.join(assets_dir, 'images_inner2d_' + str(index) + '.jpg'))
        Image.fromarray(X_original_test[index]).save(os.path.join(assets_dir, 'fullimages_' + str(index) + '.jpg'))
        fullimg_pred = np.copy(X_original_test[index])
        center = (int(np.floor(fullimg_pred.shape[0] / 2.)), int(np.floor(fullimg_pred.shape[1] / 2.)))
        fullimg_pred[center[0]-16:center[0]+16, center[1]-16:center[1]+16, :] = preds[index, :, :, :]
        Image.fromarray(fullimg_pred).save(os.path.join(assets_dir, 'fullimages_pred_' + str(index) + '.jpg'))
def create_html_results_page(num_images):
    # Write a file called 'results.html' that displays the image predictions versus the true images in a convenient way
    filename = os.path.join(settings.PRED_DIR, "results.html")
    img_src = "assets/"
    html_file = filename
    print_info("Creating HTML page to visualize results here: {}".format(html_file))
    with open(html_file, 'w') as fd:
        fd.write("""
<table>
<tr>
<th style="width:132px">Input</th>
<th style="width:68px">Model prediction</th>
<th style="width:68px">Correct output</th>
<th style="width:132px">Input + prediction</th>
<th style="width:132px">Input + correct output</th>
</tr>
""")
        for index in range(num_images):
            fd.write(" <tr>\n")
            fd.write(" <td><img src='%s/images_outer2d_%i.jpg' width='128' height='128'></td>\n" % (img_src, index))
            fd.write(" <td><img src='%s/images_pred_%i.jpg' width='64' height='64'></td>\n" % (img_src, index))
            fd.write(" <td><img src='%s/images_inner2d_%i.jpg' width='64' height='64'></td>\n" % (img_src, index))
            fd.write(" <td><img src='%s/fullimages_pred_%i.jpg' width='128' height='128'></td>\n" % (img_src, index))
            fd.write(" <td><img src='%s/fullimages_%i.jpg' width='128' height='128'></td>\n" % (img_src, index))
            fd.write('</tr>\n')
        fd.write('</table>')
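# A hypothetical end-to-end sketch of the result-saving helpers above, assuming
# normalized channel-first arrays ('preds', 'X_test', 'y_test', 'X_original_test')
# produced elsewhere; the variable names are illustrative only:
#
#     denormalize_and_save_jpg_results(preds, X_test, y_test, X_original_test, num_images=20)
#     create_html_results_page(num_images=20)
#     # Open results.html under settings.PRED_DIR in a browser to compare predictions side by side.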