From a93cd44d3d73baa3c252df9c82a7771b546df5d1 Mon Sep 17 00:00:00 2001 From: rksharma95 Date: Mon, 16 Dec 2024 23:31:47 +0530 Subject: [PATCH 1/2] fix elk client panic issue due to uninitialized waitgroup, ctx Signed-off-by: rksharma95 --- relay-server/elasticsearch/adapter.go | 4 +++- relay-server/main.go | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/relay-server/elasticsearch/adapter.go b/relay-server/elasticsearch/adapter.go index 108cac4..598bda7 100644 --- a/relay-server/elasticsearch/adapter.go +++ b/relay-server/elasticsearch/adapter.go @@ -17,6 +17,7 @@ import ( "github.com/google/uuid" kg "github.com/kubearmor/kubearmor-relay-server/relay-server/log" "github.com/kubearmor/kubearmor-relay-server/relay-server/server" + "golang.org/x/sync/errgroup" ) var ( @@ -116,7 +117,8 @@ func (ecl *ElasticsearchClient) Start() error { start = time.Now() client := ecl.kaClient ecl.ctx, ecl.cancel = context.WithCancel(context.Background()) - + client.WgServer = &errgroup.Group{} + client.Context = ecl.ctx // do healthcheck if ok := client.DoHealthCheck(); !ok { return fmt.Errorf("failed to check the liveness of the gRPC server") diff --git a/relay-server/main.go b/relay-server/main.go index 251f604..2f6cef8 100644 --- a/relay-server/main.go +++ b/relay-server/main.go @@ -87,6 +87,7 @@ func main() { esCl, err := elasticsearch.NewElasticsearchClient(esUrl, endPoint) if err != nil { kg.Warnf("Failed to start a Elasticsearch Client") + return } go esCl.Start() defer esCl.Stop() From 7ed8440e28afe9c1876c73ea2720257dfb15650f Mon Sep 17 00:00:00 2001 From: rksharma95 Date: Tue, 17 Dec 2024 16:04:31 +0530 Subject: [PATCH 2/2] send alert to elk adapter directly from relay buffer,and handle interrupts gracefully Signed-off-by: rksharma95 --- relay-server/elasticsearch/adapter.go | 56 ++++++--------------------- relay-server/main.go | 7 ++-- relay-server/server/relayServer.go | 26 ++++++++----- 3 files changed, 32 insertions(+), 57 deletions(-) diff --git a/relay-server/elasticsearch/adapter.go b/relay-server/elasticsearch/adapter.go index 598bda7..8fc3058 100644 --- a/relay-server/elasticsearch/adapter.go +++ b/relay-server/elasticsearch/adapter.go @@ -7,6 +7,7 @@ import ( "fmt" "log" "strings" + "sync" "sync/atomic" "time" @@ -15,9 +16,8 @@ import ( "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esutil" "github.com/google/uuid" + pb "github.com/kubearmor/KubeArmor/protobuf" kg "github.com/kubearmor/kubearmor-relay-server/relay-server/log" - "github.com/kubearmor/kubearmor-relay-server/relay-server/server" - "golang.org/x/sync/errgroup" ) var ( @@ -28,7 +28,6 @@ var ( // ElasticsearchClient Structure type ElasticsearchClient struct { - kaClient *server.LogClient esClient *elasticsearch.Client cancel context.CancelFunc bulkIndexer esutil.BulkIndexer @@ -39,7 +38,7 @@ type ElasticsearchClient struct { // NewElasticsearchClient creates a new Elasticsearch client with the given Elasticsearch URL // and kubearmor LogClient with endpoint. It has a retry mechanism for certain HTTP status codes and a backoff function for retry delays. // It then creates a new NewBulkIndexer with the esClient -func NewElasticsearchClient(esURL, Endpoint string) (*ElasticsearchClient, error) { +func NewElasticsearchClient(esURL string) (*ElasticsearchClient, error) { retryBackoff := backoff.NewExponentialBackOff() cfg := elasticsearch.Config{ Addresses: []string{esURL}, @@ -70,8 +69,7 @@ func NewElasticsearchClient(esURL, Endpoint string) (*ElasticsearchClient, error log.Fatalf("Error creating the indexer: %s", err) } alertCh := make(chan interface{}, 10000) - kaClient := server.NewClient(Endpoint) - return &ElasticsearchClient{kaClient: kaClient, bulkIndexer: bi, esClient: esClient, alertCh: alertCh}, nil + return &ElasticsearchClient{bulkIndexer: bi, esClient: esClient, alertCh: alertCh}, nil } // bulkIndex takes an interface and index name and adds the data to the Elasticsearch bulk indexer. @@ -109,71 +107,39 @@ func (ecl *ElasticsearchClient) bulkIndex(a interface{}, index string) { } } +func (ecl *ElasticsearchClient) SendAlertToBuffer(alert *pb.Alert) { + ecl.alertCh <- alert +} + // Start starts the Elasticsearch client by performing a health check on the gRPC server // and starting goroutines to consume messages from the alert channel and bulk index them. // The method starts a goroutine for each stream and waits for messages to be received. // Additional goroutines consume alert from the alert channel and bulk index them. func (ecl *ElasticsearchClient) Start() error { start = time.Now() - client := ecl.kaClient ecl.ctx, ecl.cancel = context.WithCancel(context.Background()) - client.WgServer = &errgroup.Group{} - client.Context = ecl.ctx - // do healthcheck - if ok := client.DoHealthCheck(); !ok { - return fmt.Errorf("failed to check the liveness of the gRPC server") - } - kg.Printf("Checked the liveness of the gRPC server") - - client.WgServer.Go(func() error { - for client.Running { - res, err := client.AlertStream.Recv() - if err != nil { - return fmt.Errorf("failed to receive an alert (%s) %s", client.Server, err) - } - tel, _ := json.Marshal(res) - fmt.Printf("%s\n", string(tel)) - - select { - case ecl.alertCh <- res: - case <-client.Context.Done(): - // The context is over, stop processing results - return nil - default: - //not able to add it to Log buffer - } - } - return nil - }) + var wg sync.WaitGroup for i := 0; i < 5; i++ { + wg.Add(1) go func() { for { select { case alert := <-ecl.alertCh: ecl.bulkIndex(alert, "alert") case <-ecl.ctx.Done(): - close(ecl.alertCh) return } } }() } + wg.Wait() return nil } // Stop stops the Elasticsearch client and performs necessary cleanup operations. // It stops the Kubearmor Relay client, closes the BulkIndexer and cancels the context. func (ecl *ElasticsearchClient) Stop() error { - logClient := ecl.kaClient - logClient.Running = false - time.Sleep(2 * time.Second) - - //Destoy KubeArmor Relay Client - if err := logClient.DestroyClient(); err != nil { - return fmt.Errorf("failed to destroy the kubearmor relay gRPC client (%s)", err.Error()) - } - kg.Printf("Destroyed kubearmor relay gRPC client") //Close BulkIndexer if err := ecl.bulkIndexer.Close(ecl.ctx); err != nil { diff --git a/relay-server/main.go b/relay-server/main.go index 2f6cef8..bafad21 100644 --- a/relay-server/main.go +++ b/relay-server/main.go @@ -84,13 +84,14 @@ func main() { // check and start an elasticsearch client if enableEsDashboards == "true" { - esCl, err := elasticsearch.NewElasticsearchClient(esUrl, endPoint) + esCl, err := elasticsearch.NewElasticsearchClient(esUrl) if err != nil { kg.Warnf("Failed to start a Elasticsearch Client") return } - go esCl.Start() - defer esCl.Stop() + relayServer.ELKClient = esCl + go relayServer.ELKClient.Start() + defer relayServer.ELKClient.Stop() } // == // diff --git a/relay-server/server/relayServer.go b/relay-server/server/relayServer.go index 1675dcb..95c2265 100644 --- a/relay-server/server/relayServer.go +++ b/relay-server/server/relayServer.go @@ -24,6 +24,7 @@ import ( "google.golang.org/grpc/status" cfg "github.com/kubearmor/kubearmor-relay-server/relay-server/config" + "github.com/kubearmor/kubearmor-relay-server/relay-server/elasticsearch" kg "github.com/kubearmor/kubearmor-relay-server/relay-server/log" ) @@ -495,6 +496,9 @@ func (rs *RelayServer) AddAlertFromBuffChan() { tel, _ := json.Marshal(alert) fmt.Printf("%s\n", string(tel)) } + if rs.ELKClient != nil { + rs.ELKClient.SendAlertToBuffer(alert) + } AlertLock.RLock() for uid := range AlertStructs { select { @@ -591,6 +595,9 @@ type RelayServer struct { // wait group WgServer sync.WaitGroup + + // ELK adapter + ELKClient *elasticsearch.ElasticsearchClient } // LogBufferChannel store incoming data from log stream in buffer @@ -681,7 +688,6 @@ func (rs *RelayServer) DestroyRelayServer() error { // wait for other routines rs.WgServer.Wait() - return nil } @@ -713,7 +719,6 @@ func DeleteClientEntry(nodeIP string) { // =============== // func connectToKubeArmor(nodeID, port string) error { - nodeIP, err := extractIP(nodeID) if err != nil { return err @@ -787,7 +792,6 @@ func connectToKubeArmor(nodeID, port string) error { kg.Printf("Destroyed the client (%s)", server) } - return nil } @@ -815,13 +819,17 @@ func (rs *RelayServer) GetFeedsFromNodes() { } for Running { - ip := <-ipsChan - ClientListLock.Lock() - if _, ok := ClientList[ip]; !ok { - ClientList[ip] = 1 - go connectToKubeArmor(ip, rs.Port) + select { + case ip := <-ipsChan: + ClientListLock.Lock() + if _, ok := ClientList[ip]; !ok { + ClientList[ip] = 1 + go connectToKubeArmor(ip, rs.Port) + } + ClientListLock.Unlock() + case <-time.After(time.Second): + // no op } - ClientListLock.Unlock() } } }