from typing import Optional

import torch
import torch.nn as nn
from sklearn.cluster import KMeans


class ClusterAssignment(nn.Module):
    def __init__(
        self,
        num_cluster: int,
        hidden_dim: int,
        alpha: float = 1.0,
        centroids: Optional[torch.Tensor] = None,
    ):
        """
        Module that handles the soft assignment described in section 3.1.1 of Xie/Girshick/Farhadi,
        where the Student's t-distribution is used to measure similarity between a feature vector
        and each cluster centroid.

        Arguments:
            num_cluster (int): number of clusters
            hidden_dim (int): dimension of the bottleneck layer
            alpha (float): degrees of freedom of the Student's t-distribution, default 1.0
            centroids: cluster centres to initialise with; if None, use Xavier uniform initialisation

        Return:
            q: Student's t-distribution, i.e. soft labels for each sample, shape=(n_samples, n_clusters)
        """
        super().__init__()
        self.num_cluster = num_cluster
        self.hidden_dim = hidden_dim
        self.alpha = alpha
        if centroids is None:
            centroids = torch.zeros(
                self.num_cluster, self.hidden_dim, dtype=torch.float
            )
            nn.init.xavier_uniform_(centroids)
        # Centroids are learnable parameters, whether provided or freshly initialised.
        self.centroids = nn.Parameter(centroids)

    def forward(self, z):
        # Squared Euclidean distance between each embedding and each centroid.
        diff = torch.sum((z.unsqueeze(1) - self.centroids) ** 2, 2)
        # Student's t-distribution kernel: q_ij ∝ (1 + ||z_i - mu_j||^2 / alpha)^(-(alpha + 1) / 2).
        numerator = 1.0 / (1.0 + (diff / self.alpha))
        power = (self.alpha + 1.0) / 2
        numerator = numerator ** power
        # Normalise over clusters so each row of q sums to 1.
        q = numerator / torch.sum(numerator, dim=1, keepdim=True)
        return q
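
# Usage sketch (illustrative only, not part of the original module) — shapes with the
# defaults above, i.e. alpha=1.0 and Xavier-initialised centroids:
#
#     assigner = ClusterAssignment(num_cluster=10, hidden_dim=32)
#     q = assigner(torch.randn(4, 32))   # q.shape == (4, 10); each row sums to 1
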
class DEC(nn.Module):
    '''
    DEC algorithm implementation.
    See "Unsupervised Deep Embedding for Clustering Analysis" (2016) by Junyuan Xie et al. for details.

    Arguments:
        autoencoder (nn.Module): autoencoder to use
        n_clusters (int): number of clusters
        alpha (float): degrees of freedom of the Student's t-distribution, default 1.0
        centroids: cluster centres to initialise with

    Main attributes:
        encoder (nn.Module): pretrained encoder used for cluster assignment
        assignment (nn.Module): soft cluster assignment module, output shape == (batch_size, n_clusters)
        kmeans (KMeans): used for the initial centroid initialisation

    Methods:
        get_target_distribution: compute the target distribution P as described in Xie et al. (2016)
        forward: compute the soft cluster assignment

    Return:
        Soft cluster assignment of shape (batch_size, n_clusters)
    '''
    def __init__(
        self,
        autoencoder: nn.Module,
        n_clusters: int = 10,
        alpha: float = 1.0,
        centroids=None,
    ):
        super().__init__()
        self.encoder = autoencoder.encoder
        self.n_clusters = n_clusters
        self.hidden_dim = autoencoder.hid_dim
        self.alpha = alpha
        self.assignment = ClusterAssignment(
            n_clusters, autoencoder.hid_dim, alpha, centroids
        )
        self.kmeans = KMeans(n_clusters, n_init=20)
        self.initialized = False

    def forward(self, x):
        '''
        Compute the cluster assignment with the ClusterAssignment module after running the batch
        through the encoder part of the associated autoencoder.
        :x: [batch size, embedding dimension] FloatTensor
        :return: [batch size, number of clusters] FloatTensor
        '''
        return self.assignment(self.encoder(x))

    def get_target_distribution(self, q):
        # Target distribution P (Eq. 3 in Xie et al., 2016):
        # p_ij = (q_ij^2 / f_j) / sum_j'(q_ij'^2 / f_j'), with soft cluster frequencies f_j = sum_i q_ij.
        numerator = (q ** 2) / torch.sum(q, 0)
        p = (numerator.t() / torch.sum(numerator, 1)).t()
        return p
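

# Sketch of the initial centroid estimation (an assumption on how the `kmeans` and
# `initialized` attributes above are meant to be used; the training script that calls
# this is not part of this file). Following Xie et al. (2016), the data are embedded
# with the pretrained encoder, k-means is run in the latent space, and the resulting
# centres overwrite the learnable centroids of the assignment module.
@torch.no_grad()
def init_centroids_with_kmeans(dec: DEC, x: torch.Tensor) -> None:
    z = dec.encoder(x)                                  # embed the data with the pretrained encoder
    dec.kmeans.fit(z.cpu().numpy())                     # run k-means in the latent space
    centers = torch.tensor(
        dec.kmeans.cluster_centers_, dtype=torch.float, device=z.device
    )
    dec.assignment.centroids.data.copy_(centers)        # initialise the learnable centroids
    dec.initialized = True

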
class IDEC(DEC):
    '''
    IDEC algorithm implementation.
    See "Improved Deep Embedded Clustering with Local Structure Preservation" (2017) by Xifeng Guo et al. for details.

    Arguments:
        autoencoder (nn.Module): autoencoder to use
        n_clusters (int): number of clusters
        alpha (float): degrees of freedom of the Student's t-distribution, default 1.0

    Main attributes:
        encoder (nn.Module): pretrained encoder used for minimisation of the clustering loss
        decoder (nn.Module): pretrained decoder used for minimisation of the reconstruction loss
        assignment (nn.Module): soft cluster assignment module, output shape == (batch_size, n_clusters)
        kmeans (KMeans): used for the initial centroid initialisation

    Methods:
        get_target_distribution: compute the target distribution P as described in Xie et al. (2016)
        forward: compute the cluster assignment and the reconstruction

    Return:
        forward: (soft cluster assignment, reconstructed embeddings)
    '''
    def __init__(
        self,
        autoencoder: nn.Module,
        n_clusters: int = 10,
        alpha: float = 1.0,
    ):
        super().__init__(
            autoencoder,
            n_clusters,
            alpha,
        )
        self.decoder = autoencoder.decoder

    def forward(self, x):
        # Encode once and reuse the embedding for both the clustering and reconstruction heads.
        z = self.encoder(x)
        return self.assignment(z), self.decoder(z)


class IDEC_loss(nn.Module):
    '''
    IDEC loss used for optimisation of the IDEC algorithm: reconstruction loss plus a
    gamma-weighted clustering (KL-divergence) loss.
    See "Improved Deep Embedded Clustering with Local Structure Preservation" (2017) by Xifeng Guo et al. for details.
    '''
    def __init__(self, gamma=.1):
        super().__init__()
        self.gamma = gamma

    def forward(self, pred_cl, targ_cl, pred_rec, targ_rec):
        """
        pred_cl: soft cluster assignment q, shape (batch_size, n_clusters)
        targ_cl: target distribution p, shape (batch_size, n_clusters)
        pred_rec / targ_rec: reconstructed and original inputs
        See the original paper for the derivation.
        """
        loss_cl = nn.KLDivLoss(reduction='batchmean')
        loss_rec = nn.MSELoss()
        # KLDivLoss expects log-probabilities as its input, hence the log on the predicted assignment.
        self.cl_loss = self.gamma * loss_cl(pred_cl.log(), targ_cl)
        self.rec_loss = loss_rec(pred_rec, targ_rec)
        return self.rec_loss + self.cl_loss
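

if __name__ == "__main__":
    # Minimal smoke-test sketch, not part of the original training pipeline: a toy
    # fully connected autoencoder exposing the .encoder, .decoder and .hid_dim
    # attributes that DEC/IDEC rely on above, followed by a single IDEC step on
    # random data. The layer sizes, batch size and gamma are illustrative assumptions.
    class ToyAutoEncoder(nn.Module):
        def __init__(self, in_dim: int = 64, hid_dim: int = 10):
            super().__init__()
            self.hid_dim = hid_dim
            self.encoder = nn.Sequential(
                nn.Linear(in_dim, 32), nn.ReLU(), nn.Linear(32, hid_dim)
            )
            self.decoder = nn.Sequential(
                nn.Linear(hid_dim, 32), nn.ReLU(), nn.Linear(32, in_dim)
            )

        def forward(self, x):
            return self.decoder(self.encoder(x))

    x = torch.randn(16, 64)
    model = IDEC(ToyAutoEncoder(), n_clusters=3)
    criterion = IDEC_loss(gamma=0.1)

    init_centroids_with_kmeans(model, x)           # centroid initialisation sketch from above
    q, x_rec = model(x)                            # soft assignments and reconstruction
    p = model.get_target_distribution(q).detach()  # target distribution P is treated as constant
    loss = criterion(q, p, x_rec, x)
    loss.backward()
    print(q.shape, x_rec.shape, loss.item())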