# -*- coding: utf-8 -*-
"""
Created on Wed Mar 22 12:41:17 2017
Code for connectome-based classification with convolutional networks (CCNN),
partially based on code from the Deep Learning course by Udacity:
https://github.com/tensorflow/tensorflow/blob/master/tensorflow/examples/udacity/4_convolutions.ipynb
@author: mregina
"""
import numpy as np
import tensorflow as tf
from six.moves import cPickle as pickle
# load sample data
pickle_file = 'tensors_medium_noise.pickle'  # or tensors_high_noise.pickle
with open(pickle_file, 'rb') as f:
    save = pickle.load(f)
    data_tensor1 = save['data_tensor1']
    data_tensor2 = save['data_tensor2']
    labels = save['label']
    del save  # hint to help gc free up memory
# we have 150 "independent" subjects, so the subjectIDs are unique; with real data, where one subject may have multiple measurements, the same subjectID should be repeated for those measurements
subjectIDs = range(150)
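# NOTE (added): judging from how the tensors are used below, data_tensor1 and data_tensor2 are
# expected to be 4D arrays of shape (150, numROI, numROI, 1) and save['label'] a 1D integer vector
# of length 150; your own data should follow the same layout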
#%%
# define functions for cross-validation, tensor randomization and normalization, and performance calculation
# create_train_and_test_folds randomly divides the subject IDs stored in subjects into num_folds sets
#INPUT: num_folds: number of folds in cross-validation (integer)
# subjects: list of unique subject IDs
#OUTPUT: IDs: array storing unique subject IDs with num_folds columns: each column contains IDs of test subjects of the given fold
def create_train_and_test_folds(num_folds, subjects):
    n = np.ceil(len(subjects) / num_folds).astype(np.int)
    np.random.shuffle(subjects)
    if len(subjects) != n * num_folds:
        s = np.zeros(n * num_folds)
        s[:len(subjects)] = subjects
        subjects = s
    IDs = subjects.reshape((n, num_folds))
    return IDs
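# e.g. with num_folds=10 and the 150 unique subjects above, IDs has shape (15, 10):
# each of the 10 columns holds the 15 subject IDs that are held out for testing in that fold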
# normalize_tensor standardizes an n-dimensional np.array to have zero mean and a maximal absolute value of 1
def normalize_tensor(data_tensor):
    data_tensor -= np.mean(data_tensor)
    data_tensor /= np.max(np.abs(data_tensor))
    return data_tensor
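# note (added): the -= and /= operators modify data_tensor in place, so the array passed in by the
# caller is normalized as well; the function also returns that same array for convenience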
#randomize_tensor generates a random permutation of instances and the corresponding labels before training
#INPUT: dataset: 4D tensor (np.array), instances are concatenated along the first (0.) dimension
# labels: 2D tensor (np.array), storing labels of instances in dataset,instances are concatenated along the first (0.)
# dimension, number of columns corresponds to the number of classes, i.e. labels are stored in one-hot encoding
#OUTPUT: shuffled_dataset: 4D tensor (np.array), instances are permuted along the first (0.) dimension
# shuffled_labels: 2D tensor (np.array), storing labels of instances in shuffled_dataset
def randomize_tensor(dataset, labels):
    permutation = np.random.permutation(labels.shape[0])
    shuffled_dataset = dataset[permutation, :, :, :]
    shuffled_labels = labels[permutation, :]
    return shuffled_dataset, shuffled_labels
#create_train_and_test_data creates and prepares training and test datasets and labels for a given fold of cross-validation
#INPUT: fold: number of the given fold (starting from 0)
# IDs: array storing unique subject IDs with num_folds columns: each column contains IDs of test subjects of the given fold
#        (output of create_train_and_test_folds)
# subjectIDs: list of subject IDs corresponding to the order of instances stored in the dataset (ID of the same subject might appear more than once)
# labels: 1D vector (np.array) storing instance labels as integers (label encoding)
# data_tensor: 4D tensor (np.array), instances are concatenated along the first (0.) dimension
#OUTPUT: train_data: 4D tensor (np.array) of normalized and randomized train instances of the given fold
# train_labels: 2D tensor (np.array), storing labels of instances in train_data in one-hot encoding
# test_data: 4D tensor (np.array) of normalized (but not randomized) test instances of the given fold
# test_labels: 2D tensor (np.array), storing labels of instances in test_data in one-hot encoding
def create_train_and_test_data(fold, IDs, subjectIDs, labels, data_tensor):
    # create one-hot encoding of labels
    num_labels = len(np.unique(labels))
    labels = (np.arange(num_labels) == labels[:, None]).astype(np.float32)
    # identify the IDs of test subjects
    testIDs = np.in1d(subjectIDs, IDs[:, fold])
    test_data = normalize_tensor(data_tensor[testIDs, :, :, :]).astype(np.float32)
    test_labels = labels[testIDs]
    train_data = normalize_tensor(data_tensor[~testIDs, :, :, :]).astype(np.float32)
    train_labels = labels[~testIDs]
    train_data, train_labels = randomize_tensor(train_data, train_labels)
    return train_data, train_labels, test_data, test_labels
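# (added note: with the 150-subject sample data and 10 folds, this yields 135 training and
#  15 test instances per fold)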
#accuracy calculates classification accuracy from one-hot encoded labels and predictions
#INPUT: predictions: 2D tensor (np.array), storing predicted labels (calculated with soft-max in our case) of instances with one-hot encoding
# labels: 2D tensor (np.array), storing actual labels with one-hot encoding
#OUTPUT: accuracy in %
def accuracy(predictions, labels):
    return (100.0 * np.sum(np.argmax(predictions, 1) == np.argmax(labels, 1))
            / predictions.shape[0])
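# e.g. accuracy(np.array([[0.9, 0.1], [0.2, 0.8]]), np.array([[1, 0], [0, 1]])) returns 100.0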
#%%
#initialize network parameters
numROI=499
num_channels=2
num_labels=2
image_size=numROI
batch_size = 4
patch_size = image_size
depth = 64
num_hidden = 96
keep_pr=0.6
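# because patch_size equals the full matrix size, the first convolution in model() below covers an
# entire row of the 499x499 connectivity matrix at a time and the second covers an entire column of
# the resulting feature map (see the 'line-by-line' and 'by column' comments there)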
combined_tensor=np.zeros((data_tensor1.shape[0],data_tensor1.shape[1],data_tensor1.shape[2],data_tensor1.shape[3]*num_channels))
combined_tensor[:,:,:,0]=normalize_tensor(data_tensor1[:,:,:,0])
combined_tensor[:,:,:,1]=normalize_tensor(data_tensor2[:,:,:,0])
subjects=np.unique(subjectIDs)
num_folds=10
IDs=create_train_and_test_folds(num_folds,subjects)
#IDs=np.load("F:/DTW/conv_net/IDs7_old.npy") #IDs can be loaded for repeated tests
test_labs=[]
test_preds=[]
#%%
#launch TensorFlow in each fold of cross-validation
for i in range(num_folds):
    train_data, train_labels, test_data, test_labels = create_train_and_test_data(i, IDs, subjectIDs, labels, combined_tensor)
    train_data = train_data[:, :image_size, :image_size, :]
    test_data = test_data[:, :image_size, :image_size, :]
    graph = tf.Graph()
    with graph.as_default():
        # input data placeholders
        tf_train_dataset = tf.placeholder(tf.float32, shape=(batch_size, image_size, image_size, num_channels))
        tf_train_labels = tf.placeholder(tf.float32, shape=(batch_size, num_labels))
        # test data is a constant
        tf_test_dataset = tf.constant(test_data)
        # network weight variables: Xavier initialization for better convergence in deep layers
        layer1_weights = tf.get_variable("layer1_weights", shape=[1, patch_size, num_channels, depth],
                                         initializer=tf.contrib.layers.xavier_initializer())
        layer1_biases = tf.Variable(tf.constant(0.001, shape=[depth]))
        layer2_weights = tf.get_variable("layer2_weights", shape=[patch_size, 1, depth, 2 * depth],
                                         initializer=tf.contrib.layers.xavier_initializer())
        layer2_biases = tf.Variable(tf.constant(0.001, shape=[2 * depth]))
        layer3_weights = tf.get_variable("layer3_weights", shape=[2 * depth, num_hidden],
                                         initializer=tf.contrib.layers.xavier_initializer())
        layer3_biases = tf.Variable(tf.constant(0.01, shape=[num_hidden]))
        layer4_weights = tf.get_variable("layer4_weights", shape=[num_hidden, num_labels],
                                         initializer=tf.contrib.layers.xavier_initializer())
        layer4_biases = tf.Variable(tf.constant(0.01, shape=[num_labels]))

        # convolutional network architecture
        def model(data, keep_pr):
            # first layer: line-by-line convolution with ReLU and dropout
            conv = tf.nn.conv2d(data, layer1_weights, [1, 1, 1, 1], padding='VALID')
            hidden = tf.nn.dropout(tf.nn.relu(conv + layer1_biases), keep_pr)
            # second layer: convolution by column with ReLU and dropout
            conv = tf.nn.conv2d(hidden, layer2_weights, [1, 1, 1, 1], padding='VALID')
            hidden = tf.nn.dropout(tf.nn.relu(conv + layer2_biases), keep_pr)
            # third layer: fully connected hidden layer with dropout and ReLU
            shape = hidden.get_shape().as_list()
            reshape = tf.reshape(hidden, [shape[0], shape[1] * shape[2] * shape[3]])
            hidden = tf.nn.dropout(tf.nn.relu(tf.matmul(reshape, layer3_weights) + layer3_biases), keep_pr)
            # fourth (output) layer: fully connected layer with logits as output
            return tf.matmul(hidden, layer4_weights) + layer4_biases
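        # (added note: for a 499x499x2 input, layer 1 produces 499x1 feature maps with 64 channels and
        #  layer 2 produces 1x1 maps with 128 channels, so the flattened vector fed to the fully
        #  connected layer has 2*depth = 128 elements)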
        # calculate the loss function (cross-entropy) for training
        logits = model(tf_train_dataset, keep_pr)
        loss = tf.reduce_mean(
            tf.nn.softmax_cross_entropy_with_logits(labels=tf_train_labels, logits=logits))
        # optimizer definition
        learning_rate = 0.001
        optimizer = tf.train.AdamOptimizer(learning_rate).minimize(loss)
        # calculate predictions from training data
        train_prediction = tf.nn.softmax(logits)
        # calculate predictions from test data (keep_pr of dropout is 1!)
        test_prediction = tf.nn.softmax(model(tf_test_dataset, 1))
    # number of iterations
    num_steps = 20001
    # start TensorFlow session
    with tf.Session(graph=graph) as session:
        tf.global_variables_initializer().run()
        print('Initialized')
        for step in range(num_steps):
            offset = (step * batch_size) % (train_labels.shape[0] - batch_size)
            if offset == 0:  # once we have seen all training data at least once, re-randomize the order of instances
                train_data, train_labels = randomize_tensor(train_data, train_labels)
            # create batch
            batch_data = train_data[offset:(offset + batch_size), :, :, :]
            batch_labels = train_labels[offset:(offset + batch_size), :]
            # feed batch data to the placeholders
            feed_dict = {tf_train_dataset: batch_data, tf_train_labels: batch_labels}
            _, l, predictions = session.run(
                [optimizer, loss, train_prediction], feed_dict=feed_dict)
            # every 2000 steps, print feedback on the progress
            if step % 2000 == 0:
                print('Minibatch loss at step %d: %f' % (step, l))
                print('Minibatch accuracy: %.1f%%' % accuracy(predictions, batch_labels))
        # evaluate the trained model on the test data of the given fold
        test_pred = test_prediction.eval()
        print('Test accuracy: %.1f%%' % accuracy(test_pred, test_labels))
        # save the test predictions and labels of this fold to a list
        test_labs.append(test_labels)
        test_preds.append(test_pred)
#create np.array to store all predictions and labels
l=test_labs[0]
p=test_preds[0]
#iterate through the cross-validation folds
for i in range(1,num_folds):
    l = np.vstack((l, test_labs[i]))
    p = np.vstack((p, test_preds[i]))
#calculate final accuracy
print('Test accuracy: %.1f%%' % accuracy(p, l))
#save data
np.savez("predictions.npz",labels=l,predictions=p,splits=IDs)