Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add network metadata to telemetry events using K8s informers #57

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
25 changes: 25 additions & 0 deletions relay-server/common/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package common

import (
"strings"
)

func Extractdata(body string) map[string]string {

pairs := strings.Split(body, " ")

// Initialize a map to store extracted values
dataMap := make(map[string]string)

// Loop through each key-value pair
for _, pair := range pairs {
// Split each pair by '=' to separate key and value
parts := strings.Split(pair, "=")
if len(parts) == 2 {
key := parts[0]
value := parts[1]
dataMap[key] = value
}
}
return dataMap
}
69 changes: 23 additions & 46 deletions relay-server/elasticsearch/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"strings"
"sync/atomic"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/dustin/go-humanize"
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esutil"
"github.com/google/uuid"
kg "github.com/kubearmor/kubearmor-relay-server/relay-server/log"
"log"
"strings"
"sync/atomic"
"time"

"github.com/kubearmor/kubearmor-relay-server/relay-server/server"
)

Expand All @@ -27,12 +27,10 @@ var (

// ElasticsearchClient Structure
type ElasticsearchClient struct {
kaClient *server.LogClient
esClient *elasticsearch.Client
cancel context.CancelFunc
bulkIndexer esutil.BulkIndexer
ctx context.Context
alertCh chan interface{}
}

// NewElasticsearchClient creates a new Elasticsearch client with the given Elasticsearch URL
Expand Down Expand Up @@ -68,9 +66,7 @@ func NewElasticsearchClient(esURL, Endpoint string) (*ElasticsearchClient, error
if err != nil {
log.Fatalf("Error creating the indexer: %s", err)
}
alertCh := make(chan interface{}, 10000)
kaClient := server.NewClient(Endpoint)
return &ElasticsearchClient{kaClient: kaClient, bulkIndexer: bi, esClient: esClient, alertCh: alertCh}, nil
return &ElasticsearchClient{bulkIndexer: bi, esClient: esClient}, nil
}

// bulkIndex takes an interface and index name and adds the data to the Elasticsearch bulk indexer.
Expand Down Expand Up @@ -114,44 +110,29 @@ func (ecl *ElasticsearchClient) bulkIndex(a interface{}, index string) {
// Additional goroutines consume alert from the alert channel and bulk index them.
func (ecl *ElasticsearchClient) Start() error {
start = time.Now()
client := ecl.kaClient
// client := ecl.kaClient
ecl.ctx, ecl.cancel = context.WithCancel(context.Background())

// do healthcheck
if ok := client.DoHealthCheck(); !ok {
return fmt.Errorf("failed to check the liveness of the gRPC server")
}
kg.Printf("Checked the liveness of the gRPC server")

client.WgServer.Go(func() error {
for client.Running {
res, err := client.AlertStream.Recv()
if err != nil {
return fmt.Errorf("failed to receive an alert (%s) %s", client.Server, err)
}
tel, _ := json.Marshal(res)
fmt.Printf("%s\n", string(tel))

select {
case ecl.alertCh <- res:
case <-client.Context.Done():
// The context is over, stop processing results
return nil
default:
//not able to add it to Log buffer
}
}
return nil
})

for i := 0; i < 5; i++ {
go func() {
for {
select {
case alert := <-ecl.alertCh:
case alert := <-server.ESAlertChannel:
ecl.bulkIndex(alert, "alert")
case <-ecl.ctx.Done():
close(ecl.alertCh)
close(server.ESAlertChannel)
return
}
}
}()

go func() {
for {
select {
case log := <-server.ESLogChannel:
ecl.bulkIndex(log, "log")
case <-ecl.ctx.Done():
close(server.ESLogChannel)
return
}
}
Expand All @@ -163,14 +144,10 @@ func (ecl *ElasticsearchClient) Start() error {
// Stop stops the Elasticsearch client and performs necessary cleanup operations.
// It stops the Kubearmor Relay client, closes the BulkIndexer and cancels the context.
func (ecl *ElasticsearchClient) Stop() error {
logClient := ecl.kaClient
logClient.Running = false
// logClient := ecl.kaClient
server.Running = false
time.Sleep(2 * time.Second)

//Destoy KubeArmor Relay Client
if err := logClient.DestroyClient(); err != nil {
return fmt.Errorf("failed to destroy the kubearmor relay gRPC client (%s)", err.Error())
}
kg.Printf("Destroyed kubearmor relay gRPC client")

//Close BulkIndexer
Expand Down
37 changes: 18 additions & 19 deletions relay-server/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module github.com/kubearmor/kubearmor-relay-server/relay-server

go 1.22
go 1.21.0

// toolchain go1.21.4

replace (
github.com/kubearmor/kubearmor-relay-server/relay-server => ./
Expand All @@ -14,29 +16,29 @@ require (
github.com/dustin/go-humanize v1.0.1
github.com/elastic/go-elasticsearch/v7 v7.17.10
github.com/google/uuid v1.6.0
github.com/kubearmor/KubeArmor/KubeArmor v0.0.0-20240412061210-e4422dd02342
github.com/kubearmor/KubeArmor/KubeArmor v0.0.0-20240726081102-8129fe08d271
github.com/kubearmor/KubeArmor/protobuf v0.0.0-20240315075053-fee50c9428b9
github.com/spf13/viper v1.18.2
go.uber.org/zap v1.27.0
golang.org/x/sync v0.7.0
google.golang.org/grpc v1.63.2
k8s.io/apimachinery v0.29.2
k8s.io/client-go v0.29.2
k8s.io/api v0.29.0
k8s.io/apimachinery v0.29.0
k8s.io/client-go v0.29.0
)

require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/emicklei/go-restful/v3 v3.12.0 // indirect
github.com/emicklei/go-restful/v3 v3.12.1 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.21.0 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/imdario/mergo v0.3.16 // indirect
github.com/josharian/intern v1.0.0 // indirect
Expand All @@ -47,7 +49,6 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/onsi/gomega v1.30.0 // indirect
github.com/pelletier/go-toml/v2 v2.1.1 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
Expand All @@ -58,23 +59,21 @@ require (
github.com/subosito/gotenv v1.6.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/exp v0.0.0-20240314144324-c7f7c6466f7f // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/oauth2 v0.18.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/term v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/term v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect
google.golang.org/protobuf v1.33.0 // indirect
google.golang.org/protobuf v1.34.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.29.2 // indirect
k8s.io/klog/v2 v2.120.1 // indirect
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
k8s.io/utils v0.0.0-20240310230437-4693a0247e57 // indirect
k8s.io/kube-openapi v0.0.0-20240521193020-835d969ad83a // indirect
k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
Expand Down
Loading
Loading