Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(adapter): fix elk client panic issue due to uninitialized waitgroup, ctx #64

Merged
merged 2 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 11 additions & 43 deletions relay-server/elasticsearch/adapter.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package elasticsearch

Check warning on line 1 in relay-server/elasticsearch/adapter.go

View workflow job for this annotation

GitHub Actions / go-lint

should have a package comment

import (
"bytes"
Expand All @@ -7,6 +7,7 @@
"fmt"
"log"
"strings"
"sync"
"sync/atomic"
"time"

Expand All @@ -15,8 +16,8 @@
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esutil"
"github.com/google/uuid"
pb "github.com/kubearmor/KubeArmor/protobuf"
kg "github.com/kubearmor/kubearmor-relay-server/relay-server/log"
"github.com/kubearmor/kubearmor-relay-server/relay-server/server"
)

var (
Expand All @@ -27,7 +28,6 @@

// ElasticsearchClient Structure
type ElasticsearchClient struct {
kaClient *server.LogClient
esClient *elasticsearch.Client
cancel context.CancelFunc
bulkIndexer esutil.BulkIndexer
Expand All @@ -38,7 +38,7 @@
// NewElasticsearchClient creates a new Elasticsearch client with the given Elasticsearch URL
// and kubearmor LogClient with endpoint. It has a retry mechanism for certain HTTP status codes and a backoff function for retry delays.
// It then creates a new NewBulkIndexer with the esClient
func NewElasticsearchClient(esURL, Endpoint string) (*ElasticsearchClient, error) {
func NewElasticsearchClient(esURL string) (*ElasticsearchClient, error) {
retryBackoff := backoff.NewExponentialBackOff()
cfg := elasticsearch.Config{
Addresses: []string{esURL},
Expand Down Expand Up @@ -69,8 +69,7 @@
log.Fatalf("Error creating the indexer: %s", err)
}
alertCh := make(chan interface{}, 10000)
kaClient := server.NewClient(Endpoint)
return &ElasticsearchClient{kaClient: kaClient, bulkIndexer: bi, esClient: esClient, alertCh: alertCh}, nil
return &ElasticsearchClient{bulkIndexer: bi, esClient: esClient, alertCh: alertCh}, nil
}

// bulkIndex takes an interface and index name and adds the data to the Elasticsearch bulk indexer.
Expand Down Expand Up @@ -108,70 +107,39 @@
}
}

func (ecl *ElasticsearchClient) SendAlertToBuffer(alert *pb.Alert) {
ecl.alertCh <- alert
}

// Start starts the Elasticsearch client by performing a health check on the gRPC server
// and starting goroutines to consume messages from the alert channel and bulk index them.
// The method starts a goroutine for each stream and waits for messages to be received.
// Additional goroutines consume alert from the alert channel and bulk index them.
func (ecl *ElasticsearchClient) Start() error {
start = time.Now()
client := ecl.kaClient
ecl.ctx, ecl.cancel = context.WithCancel(context.Background())

// do healthcheck
if ok := client.DoHealthCheck(); !ok {
return fmt.Errorf("failed to check the liveness of the gRPC server")
}
kg.Printf("Checked the liveness of the gRPC server")

client.WgServer.Go(func() error {
for client.Running {
res, err := client.AlertStream.Recv()
if err != nil {
return fmt.Errorf("failed to receive an alert (%s) %s", client.Server, err)
}
tel, _ := json.Marshal(res)
fmt.Printf("%s\n", string(tel))

select {
case ecl.alertCh <- res:
case <-client.Context.Done():
// The context is over, stop processing results
return nil
default:
//not able to add it to Log buffer
}
}
return nil
})
var wg sync.WaitGroup

for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
for {
select {
case alert := <-ecl.alertCh:
ecl.bulkIndex(alert, "alert")
case <-ecl.ctx.Done():
close(ecl.alertCh)
return
}
}
}()
}
wg.Wait()
return nil
}

// Stop stops the Elasticsearch client and performs necessary cleanup operations.
// It stops the Kubearmor Relay client, closes the BulkIndexer and cancels the context.
func (ecl *ElasticsearchClient) Stop() error {
logClient := ecl.kaClient
logClient.Running = false
time.Sleep(2 * time.Second)

//Destoy KubeArmor Relay Client
if err := logClient.DestroyClient(); err != nil {
return fmt.Errorf("failed to destroy the kubearmor relay gRPC client (%s)", err.Error())
}
kg.Printf("Destroyed kubearmor relay gRPC client")

//Close BulkIndexer
if err := ecl.bulkIndexer.Close(ecl.ctx); err != nil {
Expand Down
8 changes: 5 additions & 3 deletions relay-server/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright 2021 Authors of KubeArmor

package main

Check warning on line 4 in relay-server/main.go

View workflow job for this annotation

GitHub Actions / go-lint

should have a package comment

import (
"os"
Expand Down Expand Up @@ -56,7 +56,7 @@

//get env
enableEsDashboards := os.Getenv("ENABLE_DASHBOARDS")
esUrl := os.Getenv("ES_URL")

Check warning on line 59 in relay-server/main.go

View workflow job for this annotation

GitHub Actions / go-lint

var esUrl should be esURL
endPoint := os.Getenv("KUBEARMOR_SERVICE")
if endPoint == "" {
endPoint = "localhost:32767"
Expand Down Expand Up @@ -84,12 +84,14 @@

// check and start an elasticsearch client
if enableEsDashboards == "true" {
esCl, err := elasticsearch.NewElasticsearchClient(esUrl, endPoint)
esCl, err := elasticsearch.NewElasticsearchClient(esUrl)
if err != nil {
kg.Warnf("Failed to start a Elasticsearch Client")
return
}
go esCl.Start()
defer esCl.Stop()
relayServer.ELKClient = esCl
go relayServer.ELKClient.Start()
defer relayServer.ELKClient.Stop()
}

// == //
Expand Down
26 changes: 17 additions & 9 deletions relay-server/server/relayServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"google.golang.org/grpc/status"

cfg "github.com/kubearmor/kubearmor-relay-server/relay-server/config"
"github.com/kubearmor/kubearmor-relay-server/relay-server/elasticsearch"
kg "github.com/kubearmor/kubearmor-relay-server/relay-server/log"
)

Expand Down Expand Up @@ -495,6 +496,9 @@ func (rs *RelayServer) AddAlertFromBuffChan() {
tel, _ := json.Marshal(alert)
fmt.Printf("%s\n", string(tel))
}
if rs.ELKClient != nil {
rs.ELKClient.SendAlertToBuffer(alert)
}
AlertLock.RLock()
for uid := range AlertStructs {
select {
Expand Down Expand Up @@ -591,6 +595,9 @@ type RelayServer struct {

// wait group
WgServer sync.WaitGroup

// ELK adapter
ELKClient *elasticsearch.ElasticsearchClient
}

// LogBufferChannel store incoming data from log stream in buffer
Expand Down Expand Up @@ -681,7 +688,6 @@ func (rs *RelayServer) DestroyRelayServer() error {

// wait for other routines
rs.WgServer.Wait()

return nil
}

Expand Down Expand Up @@ -713,7 +719,6 @@ func DeleteClientEntry(nodeIP string) {
// =============== //

func connectToKubeArmor(nodeID, port string) error {

nodeIP, err := extractIP(nodeID)
if err != nil {
return err
Expand Down Expand Up @@ -787,7 +792,6 @@ func connectToKubeArmor(nodeID, port string) error {

kg.Printf("Destroyed the client (%s)", server)
}

return nil
}

Expand Down Expand Up @@ -815,13 +819,17 @@ func (rs *RelayServer) GetFeedsFromNodes() {
}

for Running {
ip := <-ipsChan
ClientListLock.Lock()
if _, ok := ClientList[ip]; !ok {
ClientList[ip] = 1
go connectToKubeArmor(ip, rs.Port)
select {
case ip := <-ipsChan:
ClientListLock.Lock()
if _, ok := ClientList[ip]; !ok {
ClientList[ip] = 1
go connectToKubeArmor(ip, rs.Port)
}
ClientListLock.Unlock()
case <-time.After(time.Second):
// no op
}
ClientListLock.Unlock()
}
}
}
Loading