diff --git a/relay-server/elasticsearch/adapter.go b/relay-server/elasticsearch/adapter.go index 4fa0bdd..108cac4 100644 --- a/relay-server/elasticsearch/adapter.go +++ b/relay-server/elasticsearch/adapter.go @@ -197,17 +197,17 @@ func (ecl *ElasticsearchClient) PrintBulkStats() { if biStats.NumFailed > 0 { fmt.Printf( "Indexed [%s] documents with [%s] errors in %s (%s docs/sec)", - humanize.Comma(int64(biStats.NumFlushed)), - humanize.Comma(int64(biStats.NumFailed)), + humanize.Commaf(float64(biStats.NumFlushed)), + humanize.Commaf(float64(biStats.NumFailed)), dur.Truncate(time.Millisecond), - humanize.Comma(int64(1000.0/float64(dur/time.Millisecond)*float64(biStats.NumFlushed))), + humanize.Commaf(float64(1000.0/float64(dur/time.Millisecond)*float64(biStats.NumFlushed))), ) } else { log.Printf( "Sucessfuly indexed [%s] documents in %s (%s docs/sec)", - humanize.Comma(int64(biStats.NumFlushed)), + humanize.Commaf(float64(biStats.NumFlushed)), dur.Truncate(time.Millisecond), - humanize.Comma(int64(1000.0/float64(dur/time.Millisecond)*float64(biStats.NumFlushed))), + humanize.Commaf(float64(1000.0/float64(dur/time.Millisecond)*float64(biStats.NumFlushed))), ) } println(strings.Repeat("▔", 80)) diff --git a/relay-server/server/k8sHandler.go b/relay-server/server/k8sHandler.go index f19d436..d877a07 100644 --- a/relay-server/server/k8sHandler.go +++ b/relay-server/server/k8sHandler.go @@ -259,18 +259,38 @@ func (kh *K8sHandler) getKaPodInformer(ipsChan chan string) cache.SharedIndexInf _, _ = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { pod, ok := obj.(*corev1.Pod) - if ok { - if pod.Status.PodIP != "" { - ipsChan <- pod.Status.PodIP - } + if !ok { + return + } + + if pod.Status.PodIP != "" { + ipsChan <- pod.Status.PodIP } }, UpdateFunc: func(old, new interface{}) { + oldPod, ok := old.(*corev1.Pod) + if !ok { + return + } + newPod, ok := new.(*corev1.Pod) - if ok { - if newPod.Status.PodIP != "" { - ipsChan <- newPod.Status.PodIP - } + if !ok { + return + } + + if newPod.Status.PodIP != "" && newPod.Status.PodIP != oldPod.Status.PodIP { + ipsChan <- newPod.Status.PodIP + DeleteClientEntry(oldPod.Status.PodIP) + } + }, + DeleteFunc: func(obj interface{}) { + pod, ok := obj.(*corev1.Pod) + if !ok { + return + } + + if pod.Status.PodIP != "" { + DeleteClientEntry(pod.Status.PodIP) } }, }) diff --git a/relay-server/server/relayServer.go b/relay-server/server/relayServer.go index 5770faf..159aa8a 100644 --- a/relay-server/server/relayServer.go +++ b/relay-server/server/relayServer.go @@ -38,13 +38,12 @@ var Running bool var ClientList map[string]int // ClientListLock Lock -var ClientListLock *sync.Mutex +var ClientListLock *sync.RWMutex func init() { Running = true ClientList = map[string]int{} - ClientListLock = &sync.Mutex{} - + ClientListLock = &sync.RWMutex{} } // ========== // @@ -706,11 +705,7 @@ func DeleteClientEntry(nodeIP string) { ClientListLock.Lock() defer ClientListLock.Unlock() - _, exists := ClientList[nodeIP] - - if exists { - delete(ClientList, nodeIP) - } + delete(ClientList, nodeIP) } // =============== // @@ -722,66 +717,72 @@ func connectToKubeArmor(nodeIP, port string) error { // create connection info server := nodeIP + ":" + port - defer DeleteClientEntry(nodeIP) + for Running { + ClientListLock.RLock() + _, found := ClientList[nodeIP] + ClientListLock.RUnlock() + if !found { + // KubeArmor with this IP is deleted or the IP has changed + // parent function will spawn a new goroutine accordingly + break + } - // create a client - client := NewClient(server) - if client == nil { - return nil - } + // create a client + client := NewClient(server) + if client == nil { + time.Sleep(5 * time.Second) // wait for 5 second before retrying + continue + } - // do healthcheck - if ok := client.DoHealthCheck(); !ok { - kg.Warnf("Failed to check the liveness of KubeArmor's gRPC service (%s)", server) - return nil - } - kg.Printf("Checked the liveness of KubeArmor's gRPC service (%s)", server) + // do healthcheck + if ok := client.DoHealthCheck(); !ok { + kg.Warnf("Failed to check the liveness of KubeArmor's gRPC service (%s)", server) + time.Sleep(5 * time.Second) // wait for 5 second before retrying + continue + } + kg.Printf("Checked the liveness of KubeArmor's gRPC service (%s)", server) - var wg sync.WaitGroup - stop := make(chan struct{}) - errCh := make(chan error, 1) + var wg sync.WaitGroup + stop := make(chan struct{}) + errCh := make(chan error, 1) - // Start watching messages - wg.Add(1) - go func() { - client.WatchMessages(&wg, stop, errCh) - }() - kg.Print("Started to watch messages from " + server) + // Start watching messages + wg.Add(1) + go client.WatchMessages(&wg, stop, errCh) + kg.Print("Started to watch messages from " + server) - // Start watching alerts - wg.Add(1) - go func() { - client.WatchAlerts(&wg, stop, errCh) - }() - kg.Print("Started to watch alerts from " + server) + // Start watching alerts + wg.Add(1) + go client.WatchAlerts(&wg, stop, errCh) + kg.Print("Started to watch alerts from " + server) - // Start watching logs - wg.Add(1) - go func() { - client.WatchLogs(&wg, stop, errCh) - }() - kg.Print("Started to watch logs from " + server) - - // Wait for an error or all goroutines to finish - select { - case err := <-errCh: - close(stop) // Stop other goroutines - kg.Warn(err.Error()) - case <-func() chan struct{} { - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() - return done - }(): - // All goroutines finished without error - } + // Start watching logs + wg.Add(1) + go client.WatchLogs(&wg, stop, errCh) + kg.Print("Started to watch logs from " + server) - if err := client.DestroyClient(); err != nil { - kg.Warnf("Failed to destroy the client (%s) %s", server, err.Error()) + // Wait for an error or all goroutines to finish + select { + case err := <-errCh: + close(stop) // Stop other goroutines + kg.Warn(err.Error()) + case <-func() chan struct{} { + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + return done + }(): + // All goroutines finished without error + } + + if err := client.DestroyClient(); err != nil { + kg.Warnf("Failed to destroy the client (%s) %s", server, err.Error()) + } + + kg.Printf("Destroyed the client (%s)", server) } - kg.Printf("Destroyed the client (%s)", server) return nil } @@ -810,16 +811,13 @@ func (rs *RelayServer) GetFeedsFromNodes() { } for Running { - select { - case ip := <-ipsChan: - ClientListLock.Lock() - if _, ok := ClientList[ip]; !ok { - ClientList[ip] = 1 - go connectToKubeArmor(ip, rs.Port) - } - ClientListLock.Unlock() + ip := <-ipsChan + ClientListLock.Lock() + if _, ok := ClientList[ip]; !ok { + ClientList[ip] = 1 + go connectToKubeArmor(ip, rs.Port) } - time.Sleep(10 * time.Second) + ClientListLock.Unlock() } } }