#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Robust Classifier
NOTE:
in my conda environment, cvxpy can only be installed via pip.
if you need install cvxpy in conda, please ref to the reference below:
https://stackoverflow.com/questions/41060382/using-pip-to-install-packages-to-anaconda-environment
Dependencies:
- Python 3.7.6
- NumPy 1.18.1
- cvxpy 1.1.0a3
- PyTorch 1.4.0
- cvxpylayers 0.1.2
- arrow 0.13.1
"""
import nn     # local module: image-to-vec feature extractors (e.g., SimpleImage2Vec)
import arrow
import utils  # local module: pairwise_dist, sortedY2Q, tvloss, etc.
import torch
import plots  # local module: visualization helpers (e.g., visualize_2Dspace_2class)
import cvxpy as cp
import numpy as np
import torch.optim as optim
from cvxpylayers.torch import CvxpyLayer
# TEST METHODS
def knn_regressor(H_test, H_train, p_hat_train, K=5):
"""
k-Nearest Neighbor Regressor
Given the train embedding and its corresponding optimal marginal distribution for each class,
the function produce the prediction of p_hat for testing dataset given the test embedding based
on the k-Nearest Neighbor rule.
input
- H_test: [n_test_sample, n_feature]
- H_train: [n_train_sample, n_feature]
- p_hat_train: [n_class, n_train_sample]
output
- p_hat_test: [n_class, n_test_sample]
"""
    # find the indices of the k nearest neighbors in the training set
    dist = utils.pairwise_dist(H_test, H_train)
    dist *= -1                          # negate distances so that topk returns the k smallest
    _, knb = torch.topk(dist, K, dim=1) # [n_test_sample, K]
# calculate the class marginal probability (p_hat) for each test sample
p_hat_test = torch.stack(
[ p_hat_train[:, neighbors].mean(dim=1)
for neighbors in knb ], dim=0).t() # [n_class, n_test_sample]
return p_hat_test
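# A small usage sketch (hypothetical tensors; relies on the local utils.pairwise_dist
# helper, so it is illustrative rather than standalone):
#   H_train     = torch.randn(100, 2)                    # [n_train_sample, n_feature]
#   H_test      = torch.randn(20, 2)                     # [n_test_sample, n_feature]
#   p_hat_train = torch.softmax(torch.randn(3, 100), 0)  # [n_class, n_train_sample]
#   p_hat_test  = knn_regressor(H_test, H_train, p_hat_train, K=5)  # -> [3, 20]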
def kernel_smoother(H_test, H_train, p_hat_train, h=1e-1):
"""
kernel smoothing test
Given the train embedding and its corresponding optimal marginal distribution for each class,
the function produce the prediction of p_hat for testing dataset given the test embedding based
on the kernel smoothing rule with the bandwidth h.
input
- H_test: [n_test_sample, n_feature]
- H_train: [n_train_sample, n_feature]
- p_hat_train: [n_class, n_train_sample]
output
- p_hat_test: [n_class, n_test_sample]
"""
n_test_sample, n_feature = H_test.shape[0], H_test.shape[1]
n_class, n_train_sample = p_hat_train.shape[0], p_hat_train.shape[1]
# calculate the pairwise distance between training sample and testing sample
dist = utils.pairwise_dist(H_train, H_test) # [n_train_sample, n_test_sample]
# apply gaussian kernel
G = 1 / ((np.sqrt(2*np.pi) * h) ** n_feature) * \
torch.exp(- dist ** 2 / (2 * h ** 2)) # [n_train_sample, n_test_sample]
G = G.unsqueeze(0).repeat([n_class, 1, 1]) # [n_class, n_train_sample, n_test_sample]
p_hat_ext = p_hat_train.unsqueeze(2).\
repeat([1, 1, n_test_sample]) # [n_class, n_train_sample, n_test_sample]
p_hat_test = (G * p_hat_ext).mean(dim=1) # [n_class, n_test_sample]
return p_hat_test
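# NOTE: the estimate above is an unnormalized Nadaraya-Watson average; the usual
# normalization factor sum_i G[i, j] is identical across classes, so it cancels in the
# argmax-based prediction used by test(). A hypothetical call mirrors knn_regressor:
#   p_hat_test = kernel_smoother(H_test, H_train, p_hat_train, h=1e-1)  # -> [n_class, n_test_sample]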
# GENERAL TRAIN PROCEDURE
def train(model, trainloader, testloader=None, n_iter=100, log_interval=10, lr=1e-2):
"""training procedure for one epoch"""
# NOTE: gradient for loss is expected to be None,
# since it is not leaf node. (it's root node)
optimizer = optim.Adadelta(model.parameters(), lr=lr)
for batch_idx, (X, Y) in enumerate(trainloader):
X, Y = X.float(), Y.float()
model.train()
        Q = utils.sortedY2Q(Y)     # calculate the empirical distribution based on labels
        optimizer.zero_grad()      # reset gradients to zero
        p_hat = model(X, Q)        # inference
        loss = utils.tvloss(p_hat) # calculate the total variation loss
        # loss = utils.celoss(p_hat) # calculate the cross-entropy loss
        loss.backward()            # back-propagate gradients
        optimizer.step()           # update model parameters
if batch_idx % log_interval == 0:
print("[%s] Train batch: %d\tLoss: %.3f" % (arrow.now(), batch_idx, loss.item()))
            # TODO: the test is temporarily placed here; it will be removed eventually.
            if testloader is not None:
                test(model, trainloader, testloader, K=5, h=1e-1)
if batch_idx > n_iter:
break
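# A minimal sketch of how train() might be driven (assumptions: `trainloader` yields
# (X, Y) batches shaped as in RobustImageClassifier.forward, with labels Y sorted within
# each batch so that utils.sortedY2Q can construct the empirical distribution Q):
#   model = RobustImageClassifier(n_class=2, n_sample=10, n_feature=2)
#   for epoch in range(n_epoch):
#       train(model, trainloader, testloader, n_iter=100, log_interval=10, lr=1e-2)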
# GENERAL TEST PROCEDURE
def test(model, trainloader, testloader, K=5, h=1e-1):
"""testing procedure"""
# given hidden embedding, evaluate corresponding p_hat
# using the output of the robust classifier layer
def evaluate_p_hat(H, Q, theta):
n_class, n_sample, n_feature = theta.shape[1], H.shape[1], H.shape[2]
rbstclf = RobustClassifierLayer(n_class, n_sample, n_feature)
return rbstclf(H, Q, theta).data
# fetch data from trainset and testset
X_train = trainloader.X.float().unsqueeze(1) # [n_train_sample, 1, n_pixel, n_pixel]
Y_train = trainloader.Y.float().unsqueeze(0) # [1, n_train_sample]
X_test = testloader.X.float().unsqueeze(1) # [n_test_sample, 1, n_pixel, n_pixel]
Y_test = testloader.Y.float() # [n_test_sample]
# get H (embeddings) and p_hat for trainset and testset
model.eval()
with torch.no_grad():
Q = utils.sortedY2Q(Y_train) # [1, n_class, n_sample]
H_train = model.data2vec(X_train) # [n_train_sample, n_feature]
H_test = model.data2vec(X_test) # [n_test_sample, n_feature]
theta = model.theta.data.unsqueeze(0) # [1, n_class]
p_hat = evaluate_p_hat(
H_train.unsqueeze(0), Q, theta).squeeze(0) # [n_class, n_train_sample]
# perform test
p_hat_knn = knn_regressor(H_test, H_train, p_hat, K)
p_hat_kernel = kernel_smoother(H_test, H_train, p_hat, h)
# calculate accuracy
knn_pred = p_hat_knn.argmax(dim=0)
knn_n_correct = knn_pred.eq(Y_test).sum().item()
knn_accuracy = knn_n_correct / len(testloader)
kernel_pred = p_hat_kernel.argmax(dim=0)
kernel_n_correct = kernel_pred.eq(Y_test).sum().item()
kernel_accuracy = kernel_n_correct / len(testloader)
print("[%s] Test set: kNN accuracy: %.3f, kernel smoothing accuracy: %.3f (%d samples)" % (arrow.now(), knn_accuracy, kernel_accuracy, len(testloader)))
return knn_accuracy, kernel_accuracy
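# NOTE (interface assumption): unlike a standard torch DataLoader, the loaders used by
# test() are assumed to expose the whole split as tensors via their `.X` / `.Y`
# attributes, to be iterable over (X, Y) batches in train(), and to report the number
# of test samples through len(testloader); this describes the local data utilities,
# not a torch API.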
# IMAGE CLASSIFIER
class RobustImageClassifier(torch.nn.Module):
"""
A Robust Image Classifier based on multiple CNNs and a Robust Classifier Layer defined below
"""
def __init__(self, n_class, n_sample, n_feature, max_theta=0.1,
n_pixel=28, in_channel=1, out_channel=7,
kernel_size=3, stride=1, keepprob=0.2):
"""
Args:
- n_class: number of classes
- n_sample: number of sets of samples
- n_feature: size of the output feature (output of CNN)
- max_theta: threshold for theta_k (empirical distribution)
- n_pixel: number of pixels along one axis for an image (n_pixel * n_pixel)
28 in default (mnist)
- in_channel: number of in channels for an image
1 in default (mnist)
- out_channel: number of out channels from CNN
5 in default
- kernel_size: size of the kernel in CNN
3 in default
- stride: the stride for the cross-correlation, a single number or a tuple.
1 in default
- keepprob: keep probability for dropout layer
0.2 in default
"""
super(RobustImageClassifier, self).__init__()
# configurations
self.n_class = n_class
self.max_theta = max_theta
self.n_feature = n_feature
# Image to Vec layer
self.data2vec = nn.SimpleImage2Vec(n_feature,
in_channel, out_channel, n_pixel, kernel_size, stride, keepprob)
        # robust classifier layer
        # NOTE: if self.theta is an nn.Parameter, it cannot be reassigned to other
        # values later, since it is registered as an attribute of the model.
        self.theta = torch.nn.Parameter(torch.ones(self.n_class).float() * self.max_theta)
        self.theta.requires_grad = True # redundant: Parameters require grad by default
        # self.theta = torch.ones(self.n_class) * self.max_theta
        self.rbstclf = RobustClassifierLayer(n_class, n_sample, n_feature)
def forward(self, X, Q):
"""
customized forward function.
input
- X: [batch_size, n_sample, in_channel, n_pixel, n_pixel]
- Q: [batch_size, n_class, n_sample]
output
- p_hat: [batch_size, n_class, n_sample]
- X: [batch_size, n_sample, n_feature]
"""
batch_size = X.shape[0]
n_sample = X.shape[1]
# CNN layer
# NOTE: merge the batch_size dimension and n_sample dimension
X = X.view(batch_size*n_sample,
X.shape[2], X.shape[3], X.shape[4]) # [batch_size*n_sample, in_channel, n_pixel, n_pixel]
Z = self.data2vec(X) # [batch_size*n_sample, n_feature]
# NOTE: reshape back to batch_size and n_sample
Z = Z.view(batch_size, n_sample, Z.shape[-1]) # [batch_size, n_sample, n_feature]
# robust classifier layer
# theta = torch.ones(batch_size, self.n_class, requires_grad=True) * self.max_theta
theta = self.theta.unsqueeze(0).repeat([batch_size, 1]) # [batch_size, n_class]
p_hat = self.rbstclf(Z, Q, theta) # [batch_size, n_class, n_sample]
return p_hat
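# A shape-level usage sketch for the classifier above (hypothetical values; assumes the
# local nn.SimpleImage2Vec module and MNIST-like inputs):
#   model = RobustImageClassifier(n_class=2, n_sample=10, n_feature=2)
#   X     = torch.randn(8, 10, 1, 28, 28)  # [batch_size, n_sample, in_channel, n_pixel, n_pixel]
#   Q     = utils.sortedY2Q(Y)             # [8, 2, 10], from per-batch sorted labels Y
#   p_hat = model(X, Q)                    # [8, 2, 10]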
# GENERAL CLASSIFIER
class RobustClassifierLayer(torch.nn.Module):
"""
A Robust Classifier Layer via CvxpyLayer
"""
def __init__(self, n_class, n_sample, n_feature):
"""
Args:
- n_class: number of classes
- n_sample: total number of samples in a single batch (including all classes)
"""
super(RobustClassifierLayer, self).__init__()
self.n_class, self.n_sample, self.n_feature = n_class, n_sample, n_feature
self.cvxpylayer = self._cvxpylayer(n_class, n_sample)
def forward(self, X_tch, Q_tch, theta_tch):
"""
customized forward function.
X_tch is a single batch of the input data and Q_tch is the empirical distribution obtained from
the labels of this batch.
input:
- X_tch: [batch_size, n_sample, n_feature]
- Q_tch: [batch_size, n_class, n_sample]
- theta_tch: [batch_size, n_class]
output:
- p_hat: [batch_size, n_class, n_sample]
"""
C_tch = self._wasserstein_distance(X_tch) # [batch_size, n_sample, n_sample]
gamma_hat = self.cvxpylayer(theta_tch, Q_tch, C_tch) # (n_class [batch_size, n_sample, n_sample])
gamma_hat = torch.stack(gamma_hat, dim=1) # [batch_size, n_class, n_sample, n_sample]
p_hat = gamma_hat.sum(dim=2) # [batch_size, n_class, n_sample]
return p_hat
@staticmethod
def _wasserstein_distance(X):
"""
the wasserstein distance for the input data via calculating the pairwise norm of two aribtrary
data points in the single batch of the input data, denoted as C here.
see reference below for pairwise distance calculation in torch:
https://discuss.pytorch.org/t/efficient-distance-matrix-computation/9065/2
input
- X: [batch_size, n_sample, n_feature]
output
- C_tch: [batch_size, n_sample, n_sample]
"""
C_tch = []
for x in X.split(split_size=1):
x = torch.squeeze(x, dim=0) # [n_sample, n_feature]
x_norm = (x**2).sum(dim=1).view(-1, 1) # [n_sample, 1]
y_t = torch.transpose(x, 0, 1) # [n_feature, n_sample]
y_norm = x_norm.view(1, -1) # [1, n_sample]
dist = x_norm + y_norm - 2.0 * torch.mm(x, y_t) # [n_sample, n_sample]
            # zero out the diagonal exactly, since x = y here and diagonal entries are
            # pure numerical noise; note torch.diag(dist) is the 1-D diagonal vector,
            # which must be re-expanded into a diagonal matrix before subtracting
            dist = dist - torch.diag(torch.diag(dist))    # [n_sample, n_sample]
            dist = torch.clamp(dist, min=0.0, max=np.inf) # [n_sample, n_sample]
C_tch.append(dist)
C_tch = torch.stack(C_tch, dim=0) # [batch_size, n_sample, n_sample]
return C_tch
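    # Sanity-check sketch (hypothetical): since C holds *squared* Euclidean distances,
    # a single batch element should agree with torch.cdist up to numerical error:
    #   x = torch.randn(5, 3)
    #   C = RobustClassifierLayer._wasserstein_distance(x.unsqueeze(0))[0]
    #   assert torch.allclose(C, torch.cdist(x, x) ** 2, atol=1e-4)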
@staticmethod
def _cvxpylayer(n_class, n_sample):
"""
construct a cvxpylayer that solves a robust classification problem
see reference below for the binary case:
http://papers.nips.cc/paper/8015-robust-hypothesis-testing-using-wasserstein-uncertainty-sets
"""
# NOTE:
# cvxpy currently doesn't support N-dim variables, see discussion and solution below:
# * how to build N-dim variables?
# https://github.com/cvxgrp/cvxpy/issues/198
# * how to stack variables?
# https://stackoverflow.com/questions/45212926/how-to-stack-variables-together-in-cvxpy
# Variables
        # - gamma_k: the joint probability distribution on Omega^2 with marginal distributions Q_k and p_k
gamma = [ cp.Variable((n_sample, n_sample)) for k in range(n_class) ]
# - p_k: the marginal distribution of class k [n_class, n_sample]
p = [ cp.sum(gamma[k], axis=0) for k in range(n_class) ]
p = cp.vstack(p)
# Parameters (indirectly from input data)
# - theta: the threshold of wasserstein distance for each class
theta = cp.Parameter(n_class)
# - Q: the empirical distribution of class k obtained from the input label
Q = cp.Parameter((n_class, n_sample))
# - C: the pairwise distance between data points
C = cp.Parameter((n_sample, n_sample))
# Constraints
cons = [ g >= 0. for g in gamma ]
        for k in range(n_class):
            cons += [cp.sum(cp.multiply(gamma[k], C)) <= theta[k]]
            cons += [cp.sum(gamma[k], axis=1) == Q[k, :]]
# Problem setup
# tv loss
obj = cp.Maximize(cp.sum(cp.min(p, axis=0)))
# cross entropy loss
# obj = cp.Minimize(cp.sum(- cp.sum(p * cp.log(p), axis=0)))
prob = cp.Problem(obj, cons)
assert prob.is_dpp()
        # return a cvxpylayer whose output has shape (n_class [batch_size, n_sample, n_sample]);
        # a stack operation ('torch.stack(gamma_hat, dim=1)') is needed on the output of
        # this layer to convert it into a tensor of the usual shape, i.e.,
        # [batch_size, n_class, n_sample, n_sample]
return CvxpyLayer(prob, parameters=[theta, Q, C], variables=gamma)
# OTHERS
def search_through(model, trainloader, testloader, n_grid=50, K=8, h=1e-1):
"""
search through the embedding space, return the corresponding p_hat of a set of uniformly
sampled points in the space.
NOTE: now it only supports 2D embedding space
"""
# given hidden embedding, evaluate corresponding p_hat
# using the output of the robust classifier layer
def evaluate_p_hat(H, Q, theta):
n_class, n_sample, n_feature = theta.shape[1], H.shape[1], H.shape[2]
rbstclf = RobustClassifierLayer(n_class, n_sample, n_feature)
return rbstclf(H, Q, theta).data
assert model.n_feature == 2
# fetch data from trainset and testset
X_train = trainloader.X.float().unsqueeze(1) # [n_train_sample, 1, n_pixel, n_pixel]
Y_train = trainloader.Y.float().unsqueeze(0) # [1, n_train_sample]
X_test = testloader.X.float().unsqueeze(1) # [n_test_sample, 1, n_pixel, n_pixel]
Y_test = testloader.Y.float() # [n_test_sample]
# get H (embeddings) and p_hat for trainset and testset
model.eval()
with torch.no_grad():
Q = utils.sortedY2Q(Y_train) # [1, n_class, n_sample]
H_train = model.data2vec(X_train) # [n_train_sample, n_feature]
H_test = model.data2vec(X_test) # [n_test_sample, n_feature]
theta = model.theta.data.unsqueeze(0) # [1, n_class]
p_hat = evaluate_p_hat(
H_train.unsqueeze(0), Q, theta).squeeze(0) # [n_class, n_train_sample]
# uniformly sample points in the embedding space
# - the limits of the embedding space
min_H = torch.cat((H_train, H_test), 0).min(dim=0)[0].numpy()
max_H = torch.cat((H_train, H_test), 0).max(dim=0)[0].numpy()
min_H, max_H = min_H - (max_H - min_H) * .1, max_H + (max_H - min_H) * .1
H_space = [ np.linspace(min_h, max_h, n_grid + 1)[:-1]
for min_h, max_h in zip(min_H, max_H) ] # (n_feature [n_grid])
H = [ [x, y] for x in H_space[0] for y in H_space[1] ]
H = torch.Tensor(H) # [n_grid * n_grid, n_feature]
    # perform the test over the sampled grid
    p_hat_knn = knn_regressor(H, H_train, p_hat, K)      # [n_class, n_grid * n_grid]
    p_hat_kernel = kernel_smoother(H, H_train, p_hat, h) # [n_class, n_grid * n_grid]
    # NOTE: only the kNN estimate is visualized below; p_hat_kernel is currently unused
    plots.visualize_2Dspace_2class(
        n_grid, max_H, min_H, p_hat_knn,
        H_train, Y_train, H_test, Y_test, prefix="test")
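# Minimal smoke test (a sketch, not part of the original training pipeline): it
# exercises RobustClassifierLayer alone on a tiny 2-class / 4-sample problem with random
# embeddings and a hand-written empirical distribution Q, then checks the output shape.
# All values below are illustrative assumptions.
if __name__ == "__main__":
    torch.manual_seed(0)
    layer = RobustClassifierLayer(n_class=2, n_sample=4, n_feature=3)
    X     = torch.randn(1, 4, 3)               # [batch_size, n_sample, n_feature]
    Q     = torch.tensor([[[.5, .5, 0., 0.],
                           [0., 0., .5, .5]]]) # [batch_size, n_class, n_sample]
    theta = torch.ones(1, 2) * 1e-1            # [batch_size, n_class]
    p_hat = layer(X, Q, theta)                 # [batch_size, n_class, n_sample]
    print("p_hat shape:", tuple(p_hat.shape))  # expected: (1, 2, 4)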