-
Notifications
You must be signed in to change notification settings - Fork 32
/
Copy pathstacked_ae.py
148 lines (136 loc) · 6.12 KB
/
stacked_ae.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import keras
from keras.models import Model,Sequential
from keras.layers import Input,Dense, Dropout, Activation, Flatten, Reshape, Conv2D, MaxPooling2D, AveragePooling2D
from keras import regularizers
from keras.losses import mean_squared_error
from keras import losses
import matplotlib.patches as patches
import numpy as np
import dicom
import cv2
from utils import *
from train_cnn import run
def open_data_AE(X_fullsize, y_pred, contour_mask):
"""
Open dataset from the output of the CNN and
unroll it as 64*64 = vector of 4096 elements
:param X_fullsize: full size training set (256x256)
:param y_pred: CNN output
:return: input AE, output
"""
input_AE = []
contour_experts = []
for j in range(y_pred.shape[0]):
in_AE = cv2.resize(compute_roi_pred(X_fullsize, y_pred, contour_mask, j, roi_shape=32)[0],(64 , 64))
contour = cv2.resize(compute_roi_pred(X_fullsize, y_pred, contour_mask, j)[2], (64,64), interpolation = cv2.INTERSECT_NONE)
input_AE.append(in_AE)
contour_experts.append(contour)
return np.array(input_AE).reshape((-1, 64*64)), np.array(contour_experts).reshape((-1, 64*64))
def customized_loss(y_true, y_pred, alpha=0.0001, beta=3):
"""
Create a customized loss for the stacked AE.
Linear combination of MSE and KL divergence.
"""
#customize your own loss components
loss1 = losses.mean_absolute_error(y_true, y_pred)
loss2 = losses.kullback_leibler_divergence(y_true, y_pred)
#adjust the weight between loss components
return (alpha/2) * loss1 + beta * loss2
def model1(X_train, param_reg, get_history=False, verbose=0,loss = "customized_loss"):
"""
First part of the stacked AE.
Train the AE on the ROI input images.
:param X_train: ROI input image
:param get_history: boolean to return the loss history
:param loss: if "customized_loss" -> customized_loss
:return: encoded ROI image
"""
autoencoder_0 = Sequential()
encoder_0 = Dense(input_dim=4096, units=100, kernel_regularizer=regularizers.l2(param_reg))
decoder_0 = Dense(input_dim=100, units=4096, kernel_regularizer=regularizers.l2(param_reg))
autoencoder_0.add(encoder_0)
autoencoder_0.add(decoder_0)
if (loss == "customized_loss"):
loss = customized_loss
autoencoder_0.compile(loss = loss,optimizer='adam', metrics=['accuracy'])
h = autoencoder_0.fit(X_train, X_train, epochs=100, verbose=verbose)
temp_0 = Sequential()
temp_0.add(encoder_0)
temp_0.compile(loss=loss, optimizer='adam', metrics=['accuracy'])
encoded_X = temp_0.predict(X_train, verbose=0)
if get_history:
return h.history['loss'], encoded_X, encoder_0
else:
return encoded_X, encoder_0
def model2(encoder_0, encoded_X, X_train, param_reg, get_history=False, verbose=0,loss = "customized_loss"):
"""
Second part of the stacked AE.
:param X_train: encoder ROI image
:param get_history: boolean to return the loss history
:return: encoding layer
"""
autoencoder_1 = Sequential()
encoder_1 = Dense(input_dim=100, units=100, kernel_regularizer=regularizers.l2(param_reg))
decoder_1 = Dense(input_dim=100, units=100, kernel_regularizer=regularizers.l2(param_reg))
autoencoder_1.add(encoder_1)
autoencoder_1.add(decoder_1)
if (loss == "customized_loss"):
loss = customized_loss
autoencoder_1.compile(loss= loss, optimizer='adam', metrics=['accuracy'])
h = autoencoder_1.fit(encoded_X, encoded_X, epochs=100, verbose=verbose)
temp_0 = Sequential()
temp_0.add(encoder_0)
temp_0.compile(loss= loss, optimizer='adam', metrics=['accuracy'])
encoded_X = temp_0.predict(X_train, verbose=0)
if get_history:
return h.history['loss'], encoder_1
else:
return encoder_1
def model3(X_train, Y_train, encoder_0, encoder_1, init, param_reg,
get_history=False, verbose=0,loss = "MSE"):
"""
Last part of the stacked AE.
:param X_train: ROI input image
:param init: set the initial kernel weights (None for uniform)
:param get_history: boolean to return the loss history
:return: final model
"""
model = Sequential()
model.add(encoder_0)
model.add(encoder_1)
model.add(Dense(input_dim=100, units=4096, kernel_initializer=init, kernel_regularizer=regularizers.l2(param_reg)))
if (loss == "customized_loss"):
loss = customized_loss
model.compile(optimizer = 'adam', loss = loss, metrics=['accuracy'])
h = model.fit(X_train, Y_train, epochs=20, verbose=verbose)
if get_history:
return h.history['loss'], model
else:
return model
def run(X_fullsize, y_pred, contour_mask, X_to_pred=None, verbose=0, param_reg=3*0.001, init_3='zero',history=False ,loss1 = "customized_loss",loss2 = "customized_loss",loss3 = "MSE"):
"""
Full pipeline for CNN: load the dataset, train the model and predict ROIs
:param X_fullsize: full size training set (256x256)
:param y_pred: CNN output for training
:param X_to_pred: input for predictions after training (X_train if not specified)
:param verbose: int for verbose
:return: X_train, Y_train, contours_pred
"""
X_train, Y_train = open_data_AE(X_fullsize, y_pred, contour_mask)
encoded_X, encoder_0 = model1(X_train, param_reg=param_reg,loss = loss1)
encoder_1 = model2(encoder_0, encoded_X, X_train, param_reg=param_reg,loss = loss2)
h, model = model3(X_train, Y_train, encoder_0, encoder_1, param_reg=param_reg,
init=init_3, get_history=True, verbose=verbose,loss = loss3)
if not X_to_pred:
X_to_pred = X_train
contours = model.predict(X_to_pred)
binarys = np.array([cv2.threshold(contour, 0, 1, cv2.INTERSECT_NONE)[1].reshape((64,64)) for contour in contours])
if history:
return X_train, Y_train, binarys, model, h
else:
return X_train, Y_train, binarys, model
def inference(X_fullsize, y_pred, contour_mask, model):
X_test, Y_test = open_data_AE(X_fullsize, y_pred, contour_mask)
contours = model.predict(X_test)
binarys = np.array([cv2.threshold(contour, 0, 1, cv2.INTERSECT_NONE)[1].reshape((64,64)) for contour in contours])
return X_test, Y_test, binarys