-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
150 lines (120 loc) · 3.83 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package main
import (
"bytes"
"context"
"errors"
"fmt"
"os/exec"
"path/filepath"
"regexp"
"sync"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
func main() {
// access kubernetes config
var home = homedir.HomeDir()
var kubeconfig = filepath.Join(home, ".kube", "config")
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
// ensure kubernetes is configured
if err != nil {
panic(err.Error())
}
// Configure dynamic kubeclient
dyclient, err := dynamic.NewForConfig(config)
// ensure no error in obtaining the kubernetes client
if err != nil {
panic(err.Error())
}
// attempts to begin watching the namespaces
// returns a `watch.Interface`, or an error
// create the group version response
gvr, _ := schema.ParseResourceArg("pods.v1.")
// configure options
b := true
opts := metav1.ListOptions{
Watch: true,
SendInitialEvents: &b,
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
}
// check if WatchList feature is enable
watchList := checkWatchListFeatureBruteForce(dyclient)
if !watchList {
if err != nil {
panic(errors.New("WatchList feature not enabled"))
}
}
// create the watch command
watcher, err := dyclient.Resource(*gvr).Watch(context.TODO(), opts)
// Create a stop channel to signal the goroutine to stop
stopChannel := make(chan struct{})
// create watch group for synchronizing go routines
var wg sync.WaitGroup
// Launch the goroutine and pass the channel as an argument
wg.Add(1)
go backgroundProcessor(watcher.ResultChan(), stopChannel, &wg)
wg.Wait()
}
func backgroundProcessor(result <-chan watch.Event, stopCh chan struct{}, wg *sync.WaitGroup) {
for {
for event := range result {
// retrieve the Namespace
// item := event.Object.(*unstructured.Unstructured)
obj := event.Object.(*unstructured.Unstructured)
switch event.Type {
// when an event is deleted...
case watch.Deleted:
fmt.Println("Received DELETE event for: ", obj.GetName(), "/", obj.GetNamespace(), " of kind: ", obj.GroupVersionKind().Kind)
// when an event is added...
case watch.Added:
fmt.Println("Received ADD event for: ", obj.GetName(), "/", obj.GetNamespace(), " of kind: ", obj.GroupVersionKind().Kind)
// when an event is added...
case watch.Modified:
fmt.Println("Received UPDATE event for: ", obj.GetName(), "/", obj.GetNamespace(), " of kind: ", obj.GroupVersionKind().Kind)
}
// check if channel is closed
if len(stopCh) > 0 {
println("Goroutines killed!")
wg.Done()
return
}
}
}
}
// checkWatchListFeatureOs checks whether the WatchList feature gate is enabled
// by doing a ps aux command and matching the output with 'WatchList=true' string that would signify
// the feature being set
func checkWatchListFeatureOs() bool {
cmd := exec.Command("ps", "aux")
var out bytes.Buffer
cmd.Stdout = &out
err := cmd.Run()
if err != nil {
print(err.Error())
}
r, _ := regexp.Compile("WatchList=true")
if r.MatchString(out.String()) {
return true
}
return false
}
// checkWatchListFeatureBruteForce checks if the WatchList feature is present by doing a test
// streaming list watch command on a simple pod and watching the result, a positive result
// means the feature is enabled
func checkWatchListFeatureBruteForce(client dynamic.Interface) bool {
b := true
opts := metav1.ListOptions{
Watch: true,
SendInitialEvents: &b,
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
}
gvr, _ := schema.ParseResourceArg("pods.v1.")
_, err := client.Resource(*gvr).Watch(context.TODO(), opts)
return err == nil
}