Skip to content

Commit

Permalink
Use Channels to receive logs and alerts for adapter
Browse files Browse the repository at this point in the history
Signed-off-by: Harisudarsan <[email protected]>
  • Loading branch information
harisudarsan1 committed Jul 5, 2024
1 parent 47e722b commit 4679b3a
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 175 deletions.
143 changes: 12 additions & 131 deletions relay-server/elasticsearch/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,17 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"strings"
"sync/atomic"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/dustin/go-humanize"
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esutil"
"github.com/google/uuid"
kg "github.com/kubearmor/kubearmor-relay-server/relay-server/log"
"log"
"strings"
"sync/atomic"
"time"

pb "github.com/kubearmor/KubeArmor/protobuf"
kl "github.com/kubearmor/kubearmor-relay-server/relay-server/common"

// kif "github.com/kubearmor/kubearmor-relay-server/relay-server/informers"
"github.com/kubearmor/kubearmor-relay-server/relay-server/server"
)

Expand All @@ -32,14 +27,10 @@ var (

// ElasticsearchClient Structure
type ElasticsearchClient struct {
kaClient *server.LogClient
esClient *elasticsearch.Client
cancel context.CancelFunc
bulkIndexer esutil.BulkIndexer
ctx context.Context
alertCh chan *pb.Alert
logCh chan *pb.Log
// client *kif.Client
}

// NewElasticsearchClient creates a new Elasticsearch client with the given Elasticsearch URL
Expand Down Expand Up @@ -75,27 +66,7 @@ func NewElasticsearchClient(esURL, Endpoint string) (*ElasticsearchClient, error
if err != nil {
log.Fatalf("Error creating the indexer: %s", err)
}
alertCh := make(chan *pb.Alert, 10000)
logCh := make(chan *pb.Log, 10000)
kaClient := server.NewClient(Endpoint)

// k8sClient := kif.GetK8sClient()
// cc := &kif.ClusterCache{
//
// mu: &sync.RWMutex{},
//
// ipPodCache: make(map[string]PodServiceInfo),
// }
// client := &kif.Client{
// k8sClient: k8sClient,
// ClusterIPCache: cc,
// }

// client := kif.InitializeClient()
// go kif.StartInformers(client)

// TODO: remove this informers
return &ElasticsearchClient{kaClient: kaClient, bulkIndexer: bi, esClient: esClient, alertCh: alertCh, logCh: logCh}, nil
return &ElasticsearchClient{bulkIndexer: bi, esClient: esClient}, nil
}

// bulkIndex takes an interface and index name and adds the data to the Elasticsearch bulk indexer.
Expand Down Expand Up @@ -139,115 +110,29 @@ func (ecl *ElasticsearchClient) bulkIndex(a interface{}, index string) {
// Additional goroutines consume alert from the alert channel and bulk index them.
func (ecl *ElasticsearchClient) Start() error {
start = time.Now()
client := ecl.kaClient
// client := ecl.kaClient
ecl.ctx, ecl.cancel = context.WithCancel(context.Background())

// do healthcheck
if ok := client.DoHealthCheck(); !ok {
return fmt.Errorf("failed to check the liveness of the gRPC server")
}
kg.Printf("Checked the liveness of the gRPC server")

// var wg sync.WaitGroup

// stop := make(chan struct{})
// errCh := make(chan error, 1)

client.WgServer.Add(1)
go func() error {
defer client.WgServer.Done()
for client.Running {
res, err := client.AlertStream.Recv()
if err != nil {
return fmt.Errorf("failed to receive an alert (%s) %s", client.Server, err)
}
tel, _ := json.Marshal(res)
fmt.Printf("%s\n", string(tel))

select {
case ecl.alertCh <- res:
case <-client.Context.Done():
// The context is over, stop processing results
return nil
default:
//not able to add it to Log buffer
}
}

return nil
}()

for i := 0; i < 5; i++ {
go func() {
for {
select {
case alert := <-ecl.alertCh:
case alert := <-server.ESAlertChannel:
ecl.bulkIndex(alert, "alert")
case <-ecl.ctx.Done():
close(ecl.alertCh)
close(server.ESAlertChannel)
return
}
}
}()
}
client.WgServer.Add(1)
go func() error {

defer client.WgServer.Done()
// client.WatchLogs(&wg, stop, errCh, ecl.logCh)

for client.Running {
res, err := client.LogStream.Recv()
if err != nil {
kg.Warnf("Failed to receive an log (%s)", client.Server)
break
}

if containsKprobe := strings.Contains(res.Data, "kprobe"); containsKprobe {

resourceMap := kl.Extractdata(res.GetResource())
remoteIP := resourceMap["remoteip"]
podserviceInfo, found := ecl.kaClient.Ifclient.ClusterIPCache.Get(remoteIP)

if found {
switch podserviceInfo.Type {
case "POD":
resource := res.GetResource() + fmt.Sprintf(" hostname=%s podname=%s namespace=%s", podserviceInfo.DeploymentName, podserviceInfo.PodName, podserviceInfo.NamespaceName)
data := res.GetData() + fmt.Sprintf(" ownertype=pod")
res.Data = data

res.Resource = resource
// kg.Printf("logData:%s", res.Data)
break
case "SERVICE":
resource := res.GetResource() + fmt.Sprintf(" hostname=%s servicename=%s namespace=%s", podserviceInfo.DeploymentName, podserviceInfo.ServiceName, podserviceInfo.NamespaceName)

data := res.GetData() + fmt.Sprintf(" ownertype=service")
res.Data = data
res.Resource = resource
// kg.Printf("logData:%s", res.Data)

break
}
}

}
tel, _ := json.Marshal(res)
fmt.Printf("%s\n", string(tel))
ecl.logCh <- res
}

return nil
}()

for i := 0; i < 5; i++ {
go func() {
for {
select {
case log := <-ecl.logCh:
case log := <-server.ESLogChannel:
ecl.bulkIndex(log, "log")
case <-ecl.ctx.Done():
close(ecl.logCh)
close(server.ESLogChannel)
return
}
}
Expand All @@ -259,14 +144,10 @@ func (ecl *ElasticsearchClient) Start() error {
// Stop stops the Elasticsearch client and performs necessary cleanup operations.
// It stops the Kubearmor Relay client, closes the BulkIndexer and cancels the context.
func (ecl *ElasticsearchClient) Stop() error {
logClient := ecl.kaClient
logClient.Running = false
// logClient := ecl.kaClient
server.Running = false
time.Sleep(2 * time.Second)

//Destoy KubeArmor Relay Client
if err := logClient.DestroyClient(); err != nil {
return fmt.Errorf("failed to destroy the kubearmor relay gRPC client (%s)", err.Error())
}
kg.Printf("Destroyed kubearmor relay gRPC client")

//Close BulkIndexer
Expand Down
2 changes: 2 additions & 0 deletions relay-server/server/k8sHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ var stdoutlogs = false
var stdoutalerts = false
var stdoutmsg = false

var enableEsDashboards = os.Getenv("ENABLE_DASHBOARDS") == "true"

// NewK8sHandler Function
func NewK8sHandler() *K8sHandler {
kh := &K8sHandler{}
Expand Down
106 changes: 62 additions & 44 deletions relay-server/server/relayServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,11 +297,8 @@ type LogClient struct {
// logs
LogStream pb.LogService_WatchLogsClient

//Informerclient
Ifclient *kif.Client

// wait group
WgServer sync.WaitGroup
// // wait group
// WgServer sync.WaitGroup

Context context.Context
}
Expand Down Expand Up @@ -377,15 +374,6 @@ func NewClient(server string) *LogClient {
return nil
}

// start informers
Informerclient := kif.InitializeClient()
go kif.StartInformers(Informerclient)

lc.Ifclient = Informerclient

// var g errgroup.Group
lc.WgServer = sync.WaitGroup{}

return lc
}

Expand Down Expand Up @@ -523,6 +511,11 @@ func (rs *RelayServer) AddAlertFromBuffChan() {
fmt.Printf("%s\n", string(tel))
}
AlertLock.RLock()

if enableEsDashboards {
ESAlertChannel <- (&alert)
}

for uid := range AlertStructs {
select {
case AlertStructs[uid].Broadcast <- (&alert):
Expand Down Expand Up @@ -553,36 +546,6 @@ func (lc *LogClient) WatchLogs(wg *sync.WaitGroup, stop chan struct{}, errCh cha
default:
if res, err = lc.LogStream.Recv(); err != nil {
errCh <- fmt.Errorf("failed to receive a log (%s) %s", lc.Server, err.Error())

if containsKprobe := strings.Contains(res.Data, "kprobe"); containsKprobe && res.GetOperation() == "Network" {

resourceMap := kl.Extractdata(res.GetResource())
remoteIP := resourceMap["remoteip"]
podserviceInfo, found := lc.Ifclient.ClusterIPCache.Get(remoteIP)

if found {
switch podserviceInfo.Type {
case "POD":
resource := res.GetResource() + fmt.Sprintf(" hostname=%s podname=%s namespace=%s", podserviceInfo.DeploymentName, podserviceInfo.PodName, podserviceInfo.NamespaceName)
data := res.GetData() + fmt.Sprintf(" ownertype=pod")
res.Data = data

res.Resource = resource
// kg.Printf("logData:%s", res.Data)
break
case "SERVICE":
resource := res.GetResource() + fmt.Sprintf(" hostname=%s servicename=%s namespace=%s", podserviceInfo.DeploymentName, podserviceInfo.ServiceName, podserviceInfo.NamespaceName)

data := res.GetData() + fmt.Sprintf(" ownertype=service")
res.Data = data
res.Resource = resource
// kg.Printf("logData:%s", res.Data)

break
}
}

}
}

select {
Expand All @@ -608,10 +571,45 @@ func (rs *RelayServer) AddLogFromBuffChan() {
if err := kl.Clone(*res, &log); err != nil {
kg.Warnf("Failed to clone a log (%v)", *res)
}

if containsKprobe := strings.Contains(log.Data, "kprobe"); containsKprobe && log.GetOperation() == "Network" {

resourceMap := kl.Extractdata(log.GetResource())
remoteIP := resourceMap["remoteip"]
podserviceInfo, found := rs.Ifclient.ClusterIPCache.Get(remoteIP)

if found {
switch podserviceInfo.Type {
case "POD":
resource := log.GetResource() + fmt.Sprintf(" hostname=%s podname=%s namespace=%s", podserviceInfo.DeploymentName, podserviceInfo.PodName, podserviceInfo.NamespaceName)
data := log.GetData() + fmt.Sprintf(" ownertype=pod")
log.Data = data

log.Resource = resource
kg.Printf("logData:%s", log.Data)
break
case "SERVICE":
resource := log.GetResource() + fmt.Sprintf(" hostname=%s servicename=%s namespace=%s", podserviceInfo.DeploymentName, podserviceInfo.ServiceName, podserviceInfo.NamespaceName)

data := log.GetData() + fmt.Sprintf(" ownertype=service")
log.Data = data
log.Resource = resource
kg.Printf("logData:%s", log.Data)

break
}
}

}

if stdoutlogs {
tel, _ := json.Marshal(log)
fmt.Printf("%s\n", string(tel))
}
if enableEsDashboards {
ESLogChannel <- (&log)
}

for uid := range LogStructs {
select {
case LogStructs[uid].Broadcast <- (&log):
Expand Down Expand Up @@ -651,17 +649,26 @@ type RelayServer struct {

// wait group
WgServer sync.WaitGroup

//Informerclient
Ifclient *kif.Client
}

// LogBufferChannel store incoming data from log stream in buffer
var LogBufferChannel chan *pb.Log

// ESLogChannel send logs to ES adapter
var ESLogChannel chan *pb.Log

// MsgBufferChannel store incoming data from Alert stream in buffer
var MsgBufferChannel chan *pb.Message

// AlertBufferChannel store incoming data from msg stream in buffer
var AlertBufferChannel chan *pb.Alert

// ESAlertChannel send alerts to ES adapter
var ESAlertChannel chan *pb.Alert

// NewRelayServer Function
func NewRelayServer(port string) *RelayServer {
rs := &RelayServer{}
Expand All @@ -672,6 +679,11 @@ func NewRelayServer(port string) *RelayServer {
AlertBufferChannel = make(chan *pb.Alert, 1000)
MsgBufferChannel = make(chan *pb.Message, 100)

if enableEsDashboards {
ESLogChannel = make(chan *pb.Log, 10000)
AlertBufferChannel = make(chan *pb.Alert, 1000)
}

// listen to gRPC port
listener, err := net.Listen("tcp", ":"+rs.Port)
if err != nil {
Expand Down Expand Up @@ -717,6 +729,12 @@ func NewRelayServer(port string) *RelayServer {
LogStructs = make(map[string]LogStruct)
LogLock = &sync.RWMutex{}

// start informers
Informerclient := kif.InitializeClient()
go kif.StartInformers(Informerclient)

rs.Ifclient = Informerclient

// set wait group
rs.WgServer = sync.WaitGroup{}

Expand Down

0 comments on commit 4679b3a

Please sign in to comment.