kube.go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/kontena/pharos-host-upgrades/hosts"
	"github.com/kontena/pharos-host-upgrades/kube"
	"github.com/kontena/pharos-host-upgrades/kubectl"
)
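// Annotation keys used to persist the lock, drain and reboot state between runs.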
const KubeLockAnnotation = "pharos-host-upgrades.kontena.io/lock"
const KubeDrainAnnotation = "pharos-host-upgrades.kontena.io/drain"
const KubeRebootAnnotation = "pharos-host-upgrades.kontena.io/reboot"
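// KubeOptions wraps the kube client options given on the command line
// (--kube-namespace, --kube-daemonset, --kube-node).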
type KubeOptions struct {
	kube.Options
}
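// IsSet reports whether any of the --kube options were given.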
func (options KubeOptions) IsSet() bool {
	return !(options.Namespace == "" && options.DaemonSet == "" && options.Node == "")
}
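// Kube holds the kube client together with the lock and node used to coordinate host upgrades.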
type Kube struct {
	options  kube.Options
	hostInfo hosts.Info
	kube     *kube.Kube
	lock     *kube.Lock
	node     *kube.Node
}
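// makeKube initializes the optional kube integration: it connects using the --kube options,
// looks up the node and lock, and clears any reboot/drain/lock state left over from a previous run.
// Returns (nil, nil) when no --kube options are configured.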
func makeKube(options Options, hostInfo hosts.Info) (*Kube, error) {
	var k = Kube{
		options:  options.Kube.Options,
		hostInfo: hostInfo,
	}

	if !options.Kube.IsSet() {
		log.Printf("No --kube configuration")
		return nil, nil
	}

	log.Printf("Using --kube-namespace=%v --kube-daemonset=%v --kube-node=%v",
		options.Kube.Namespace,
		options.Kube.DaemonSet,
		options.Kube.Node,
	)

	if kube, err := kube.New(options.Kube.Options); err != nil {
		return nil, err
	} else {
		k.kube = kube
	}

	if err := k.initNode(); err != nil {
		return nil, err
	}

	if err := k.initLock(); err != nil {
		return nil, err
	}

	// verifies host <=> node state, fails if not rebooted
	if err := k.clearNodeReboot(); err != nil {
		return nil, err
	}

	// this happens even without the reboot annotation set; we do not want to leave the node drained in case of errors
	if err := k.clearNodeDrain(); err != nil {
		return nil, err
	}

	// clear lock if acquired, assuming that the host is now in a good state (rebooted, undrained)
	if err := k.clearLock(); err != nil {
		return nil, err
	}

	return &k, nil
}
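// initNode looks up the configured kube node.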
func (k *Kube) initNode() error {
	if kubeNode, err := k.kube.Node(); err != nil {
		return err
	} else {
		k.node = kubeNode
	}

	return nil
}
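// initLock prepares the kube lock using the lock annotation.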
func (k *Kube) initLock() error {
	if kubeLock, err := k.kube.Lock(KubeLockAnnotation); err != nil {
		return err
	} else {
		k.lock = kubeLock
	}

	return nil
}
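// checkReboot returns the reboot time stored in the node reboot annotation,
// and whether that annotation is set.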
func (k *Kube) checkReboot() (time.Time, bool, error) {
	var t time.Time

	if value, exists, err := k.node.GetAnnotation(KubeRebootAnnotation); err != nil {
		return t, false, fmt.Errorf("Failed to get node reboot annotation: %v", err)
	} else if !exists {
		return t, false, nil
	} else if err := json.Unmarshal([]byte(value), &t); err != nil {
		return t, true, fmt.Errorf("Failed to unmarshal reboot annotation: %v", err)
	} else {
		return t, true, nil
	}
}
// check and clear the node reboot state/status;
// fails if the node was expected to reboot, but the host did not reboot
func (k *Kube) clearNodeReboot() error {
	if rebootTime, rebooting, err := k.checkReboot(); err != nil {
		return err
	} else if !rebooting {
		log.Printf("Initialized kube node %v (not rebooting)", k.node)
		return nil
	} else if !k.hostInfo.BootTime.After(rebootTime) {
		return fmt.Errorf("Kube node %v is still rebooting (reboot=%v >= boot=%v)", k.node, rebootTime, k.hostInfo.BootTime)
	} else if err := k.node.SetCondition(MakeRebootConditionRebooted(k.hostInfo.BootTime)); err != nil {
		log.Printf("Failed to update node %v condition: %v", k.node, err)
		return nil
	} else if err := k.node.ClearAnnotation(KubeRebootAnnotation); err != nil {
		return fmt.Errorf("Failed to clear reboot annotation: %v", err)
	} else {
		log.Printf("Kube node %v was rebooted (reboot=%v < boot=%v)...", k.node, rebootTime, k.hostInfo.BootTime)
		return nil
	}
}
// uncordon if drained before reboot
func (k *Kube) clearNodeDrain() error {
	if changed, err := k.node.SetSchedulableIfAnnotated(KubeDrainAnnotation); err != nil {
		return fmt.Errorf("Failed to clear node drain state: %v", err)
	} else if changed {
		log.Printf("Uncordoned drained kube node %v (with annotation %v)", k.node, KubeDrainAnnotation)
		return nil
	} else {
		log.Printf("Kube node %v is not marked as drained (with annotation %v)", k.node, KubeDrainAnnotation)
		return nil
	}
}
// release lock if still acquired
func (k *Kube) clearLock() error {
	if value, acquired, err := k.lock.Test(); err != nil {
		return fmt.Errorf("Failed to test lock %v: %v", k.lock, err)
	} else if !acquired {
		log.Printf("Using kube lock %v (not acquired, value=%v)", k.lock, value)
	} else if err := k.lock.Release(); err != nil {
		return fmt.Errorf("Failed to release lock %v: %v", k.lock, err)
	} else {
		log.Printf("Released kube lock %v (value=%v)", k.lock, value)
	}

	return nil
}
// attempts to acquire the kube lock until the context expires
func (k *Kube) AcquireLock(ctx context.Context) error {
	if k == nil || k.lock == nil {
		log.Printf("Skip kube locking")
		return nil
	}

	log.Printf("Acquiring kube lock...")

	var wait = 1 * time.Second
	var waitFactor = 2
	var maxWait = 1 * time.Minute

	for {
		if err := ctx.Err(); err != nil {
			return err
		} else if err := k.lock.Acquire(ctx); err != nil {
			log.Printf("Acquiring kube lock failed, retrying: %v", err)
		} else {
			return nil
		}

		// don't hammer the API server too hard...
		time.Sleep(wait)

		wait *= time.Duration(waitFactor)
		if wait > maxWait {
			wait = maxWait
		}
	}
}
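// ReleaseLock releases the kube lock, if one is configured.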
func (k *Kube) ReleaseLock() error {
	if k == nil || k.lock == nil {
		log.Printf("Skip kube unlocking")
		return nil
	}

	log.Printf("Releasing kube lock...")

	return k.lock.Release()
}
// Update the kube node status conditions based on the host upgrade status and error
func (k *Kube) UpdateHostStatus(status hosts.Status, upgradeErr error) error {
	if k == nil || k.node == nil {
		log.Printf("Skip updating kube node condition")
		return nil
	}

	log.Printf("Update kube node %v condition for status=%v with error: %v", k.node, status, upgradeErr)

	if err := k.node.SetCondition(
		MakeUpgradeCondition(status, upgradeErr),
		MakeRebootCondition(k.hostInfo, status, upgradeErr),
	); err != nil {
		log.Printf("Failed to update node %v condition: %v", k.node, err)
	}

	return nil
}
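// DrainNode marks the node as drained via the drain annotation and then drains it using kubectl.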
func (k *Kube) DrainNode() error {
	if k == nil || k.node == nil {
		return fmt.Errorf("No --kube-node configured")
	}

	log.Printf("Draining kube node %v (with annotation %v)...", k.node, KubeDrainAnnotation)

	if err := k.node.SetAnnotation(KubeDrainAnnotation, "true"); err != nil {
		return fmt.Errorf("Failed to set node annotation for drain: %v", err)
	} else if err := kubectl.Drain(k.options.Node); err != nil {
		return fmt.Errorf("Failed to drain node %v: %v", k.options.Node, err)
	} else {
		return nil
	}
}
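// MarkReboot records the pending reboot time in the node reboot annotation and sets the reboot condition.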
func (k *Kube) MarkReboot(rebootTime time.Time) error {
	if k == nil || k.node == nil {
		log.Printf("Skip kube node reboot marking")
		return nil
	}

	log.Printf("Marking kube node %v for reboot (with annotation %v=%v)...", k.node, KubeRebootAnnotation, rebootTime)

	if value, err := json.Marshal(rebootTime); err != nil {
		return fmt.Errorf("Failed to marshal reboot annotation: %v", err)
	} else if err := k.node.SetAnnotation(KubeRebootAnnotation, string(value)); err != nil {
		return fmt.Errorf("Failed to set node annotation for reboot: %v", err)
	} else if err := k.node.SetCondition(MakeRebootConditionRebooting(rebootTime)); err != nil {
		return fmt.Errorf("Failed to set node condition for reboot: %v", err)
	} else {
		return nil
	}
}