"""Trimmed Grassmann Average (TGA) as robust PCA (tga.py)."""
import numpy as np
from numpy.linalg import norm
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.utils import as_float_array, check_array, check_random_state
from sklearn.utils.validation import check_is_fitted
def _trimmed_mean_1d(arr, k):
    """Calculate trimmed mean on a 1d array.

    Trim values larger than the k'th largest value or smaller than the k'th
    smallest value.

    Parameters
    ----------
    arr: ndarray, shape (n,)
        The one-dimensional input array to perform trimmed mean on

    k: int
        The thresholding order for trimmed mean

    Returns
    -------
    trimmed_mean: float
        The trimmed mean calculated
    """
    if k <= 0:
        # Nothing to trim; fall back to the ordinary mean.
        return arr.mean()
    # k'th smallest / largest values (1-indexed), found in O(n) via partition.
    kth_smallest = np.partition(arr, k - 1)[k - 1]
    kth_largest = -np.partition(-arr, k - 1)[k - 1]
    # Average only the values lying within the two thresholds (inclusive).
    mask = (arr >= kth_smallest) & (arr <= kth_largest)
    return arr[mask].mean()
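
# A worked example of the trimming above (hypothetical values): with
# arr = [0., 1., 2., 3., 100.] and k = 2, the 2nd smallest value is 1.0 and
# the 2nd largest is 3.0, so the extremes 0 and 100 are dropped:
#
#     _trimmed_mean_1d(np.array([0., 1., 2., 3., 100.]), 2)  # -> 2.0
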
def _trimmed_mean(X, trim_proportion):
    """Calculate trimmed mean on each column of input matrix.

    Parameters
    ----------
    X: ndarray, shape (n_samples, n_features)
        The input matrix to perform trimmed mean on

    trim_proportion: float
        The proportion of values to trim. The largest and smallest
        'trim_proportion' of values are trimmed when calculating the mean.

    Returns
    -------
    trimmed_mean: ndarray, shape (n_features,)
        The trimmed mean calculated on each column
    """
    n_samples, n_features = X.shape
    n_trim = int(n_samples * trim_proportion)
    # Apply the 1d trimmed mean independently to every column of X.
    return np.apply_along_axis(_trimmed_mean_1d, 0, X, k=n_trim)
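
# For instance (hypothetical values), a 40% trim suppresses a gross outlier
# in the second column that would otherwise dominate the plain mean:
#
#     X = np.array([[1., 1.], [2., 2.], [3., 3.], [4., 4.], [5., 500.]])
#     _trimmed_mean(X, 0.4)   # -> [3., 3.], while np.mean(X, 0) = [3., 102.]
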
def _reorth(basis, target, rows=None, alpha=0.5):
    """Reorthogonalize a vector using iterated Gram-Schmidt.

    Parameters
    ----------
    basis: ndarray, shape (n_basis, n_features)
        The matrix whose rows are a set of basis vectors to reorthogonalize
        against

    target: ndarray, shape (n_features,)
        The target vector to be reorthogonalized

    rows: {array-like, None}, default None
        Indices of rows from basis to use. Use all if None

    alpha: float, default 0.5
        Parameter for determining whether to do a second reorthogonalization.

    Returns
    -------
    reorthed_target: ndarray, shape (n_features,)
        The reorthogonalized vector
    """
    if rows is not None:
        basis = basis[rows]
    norm_target = norm(target)
    norm_target_old = 0
    n_reorth = 0
    # Repeat Gram-Schmidt passes until the norm of the remainder stops
    # shrinking significantly.
    while norm_target < alpha * norm_target_old or n_reorth == 0:
        for row in basis:
            t = np.dot(row, target)
            target = target - t * row
        norm_target_old = norm_target
        norm_target = norm(target)
        n_reorth += 1
        if n_reorth > 4:
            # target in span(basis) => accept target = 0
            target = np.zeros(basis.shape[1])
            break
    return target
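
# A quick sanity check (hypothetical values): projecting out an orthonormal
# basis that spans the first two coordinate axes leaves only the third
# component of the target vector:
#
#     Q = np.eye(3)[:2]
#     _reorth(Q, np.array([1., 1., 1.]))   # -> [0., 0., 1.]
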
class TGA(BaseEstimator, TransformerMixin):
    """Trimmed Grassmann Average as robust PCA.

    Implementation of the Trimmed Grassmann Average of Hauberg et al.

    Parameters
    ----------
    n_components: int, optional, default None
        Maximum number of components to keep. When not given or None, this
        is set to n_features (the second dimension of the training data).

    trim_proportion: float, default 0.5
        The proportion with respect to n_samples to trim when calculating
        the trimmed mean.

    copy: bool, default True
        If False, data passed to fit are overwritten and running
        fit(X).transform(X) will not yield the expected results,
        use fit_transform(X) instead.

    tol: float, default 1e-5
        Tolerance for the stopping criterion of the Grassmann average.

    centering: {'mean', 'median'}, default 'mean'
        Whether to center the data with the empirical mean or median.

    random_state: int or RandomState instance or None (default)
        Pseudo Random Number generator seed control. If None, use the
        numpy.random singleton.

    Attributes
    ----------
    components_: array, [n_components, n_features]
        Top `n_components` principal components.

    center_: array, [n_features]
        Per-feature empirical mean or median according to the value of
        'centering', estimated from the training set.

    References
    ----------
    Hauberg, Soren, Aasa Feragen, and Michael J. Black. "Grassmann averages
    for scalable robust PCA." Computer Vision and Pattern Recognition
    (CVPR), 2014 IEEE Conference on. IEEE, 2014.
    """
    def __init__(self, n_components=None, trim_proportion=0.5, copy=True,
                 tol=1e-5, centering='mean', random_state=None):
        self.n_components = n_components
        self.trim_proportion = trim_proportion
        self.copy = copy
        self.tol = tol
        self.centering = centering
        self.random_state = random_state
    def fit(self, X, y=None):
        """Fit the model with X.

        Parameters
        ----------
        X: array-like, shape (n_samples, n_features)
            Training data, where n_samples is the number of samples
            and n_features is the number of features.

        Returns
        -------
        self : object
            Returns the instance itself.
        """
        self._fit(X)
        return self

    def fit_transform(self, X, y=None):
        """Fit the model with X and apply the dimensionality reduction on X.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Training data, where n_samples is the number of samples
            and n_features is the number of features.

        Returns
        -------
        X_new : array-like, shape (n_samples, n_components)
        """
        self._fit(X)
        if self.copy and self.center_ is not None:
            # When copy=True, _fit worked on a copy, so X still needs
            # centering here; when copy=False, X was centered in place.
            X = X - self.center_
        return np.dot(X, self.components_.T)
    def _fit(self, X):
        """Fit the model on X.

        Parameters
        ----------
        X: array-like, shape (n_samples, n_features)
            Training vector, where n_samples is the number of samples and
            n_features is the number of features.
        """
        self.nnzs = []
        X_orig = X.copy()
        if self.trim_proportion < 0 or self.trim_proportion > 0.5:
            raise ValueError('`trim_proportion` must be between 0 and 0.5,'
                             ' got %s.' % self.trim_proportion)

        # Standard robust-PCA regularization weight for the objective
        # tracked in self.obj below.
        lam = 1.0 / np.sqrt(np.max(X.shape))
        rng = check_random_state(self.random_state)
        X = check_array(X)
        self.obj = []
        n_samples, n_features = X.shape
        X = as_float_array(X, copy=self.copy)

        # Center data
        if self.centering == 'mean':
            self.center_ = np.mean(X, axis=0)
        elif self.centering == 'median':
            self.center_ = np.median(X, axis=0)
        else:
            raise ValueError("`centering` must be 'mean' or 'median', "
                             "got %s" % self.centering)
        X -= self.center_

        if self.n_components is None:
            n_components = X.shape[1]
        elif not 0 <= self.n_components <= n_features:
            raise ValueError("n_components=%r invalid for n_features=%d"
                             % (self.n_components, n_features))
        else:
            n_components = self.n_components

        self.components_ = np.empty((n_components, n_features))
        for k in range(n_components):
            # compute the k'th principal component
            mu = rng.rand(n_features) - 0.5
            mu = mu / norm(mu)

            # initialize using a few EM iterations
            for i in range(3):
                dots = np.dot(X, mu)
                mu = np.dot(dots, X)
                mu = mu / norm(mu)

            # Grassmann average: flip each sample towards mu, then update mu
            # to the per-feature trimmed mean of the flipped samples.
            for i in range(n_samples):
                prev_mu = mu
                dot_signs = np.sign(np.dot(X, mu))
                mu = _trimmed_mean(X * dot_signs[:, np.newaxis],
                                   self.trim_proportion)
                mu = mu / norm(mu)
                if np.max(np.abs(mu - prev_mu)) < self.tol:
                    break

            # store the estimated vector and re-orthonormalize against the
            # components found so far
            if k > 0:
                mu = _reorth(self.components_[:k], mu)
                mu = mu / norm(mu)
            self.components_[k] = mu

            if k < n_components - 1:
                # Deflate: remove the found component from the data.
                X = X - np.dot(np.dot(X, mu)[:, np.newaxis],
                               mu[np.newaxis, :])
            # Track the robust-PCA objective ||L||_* + lam * ||S||_1 and the
            # number of nonzero entries of the sparse residual S.
            L = X + self.center_
            S = X_orig - L
            o = norm(L, 'nuc') + lam * np.sum(np.abs(S))
            self.obj.append(o)
            self.nnzs.append(np.sum(S != 0))
    def transform(self, X, y=None):
        """Apply dimensionality reduction on X.

        X is projected onto the principal components previously extracted
        from a training set.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            New data, where n_samples is the number of samples
            and n_features is the number of features.

        Returns
        -------
        X_transformed : array-like, shape (n_samples, n_components)
        """
        check_is_fitted(self, 'center_')
        X = check_array(X)
        if self.center_ is not None:
            X = X - self.center_
        X_transformed = np.dot(X, self.components_.T)
        return X_transformed
    def inverse_transform(self, X):
        """Transform data back to its original space, i.e.,
        return an input X_original whose transform would be X.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_components)
            New data, where n_samples is the number of samples
            and n_components is the number of components.

        Returns
        -------
        X_original: array-like, shape (n_samples, n_features)
        """
        check_is_fitted(self, 'center_')
        X_original = np.dot(X, self.components_)
        if self.center_ is not None:
            X_original = X_original + self.center_
        return X_original
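
# A minimal usage sketch (all names and values hypothetical): recover a
# low-rank structure from data contaminated with sparse outliers, then
# round-trip it through fit_transform / inverse_transform.
if __name__ == '__main__':
    rng = np.random.RandomState(0)
    # 200 samples living near a 2-d subspace of R^10.
    X = np.dot(rng.randn(200, 2), rng.randn(2, 10))
    # Corrupt about 5% of the entries with large sparse noise.
    outliers = rng.rand(*X.shape) < 0.05
    X[outliers] += 50 * rng.randn(np.sum(outliers))

    tga = TGA(n_components=2, trim_proportion=0.4, random_state=0)
    X_reduced = tga.fit_transform(X)               # (200, 2) scores
    X_restored = tga.inverse_transform(X_reduced)  # (200, 10) reconstruction
    print('components shape:', tga.components_.shape)
    print('relative reconstruction error:', norm(X - X_restored) / norm(X))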