From 475a2035911189d464b416a455fd02c336daf735 Mon Sep 17 00:00:00 2001 From: Rudraksh Pareek Date: Wed, 22 May 2024 18:55:22 +0530 Subject: [PATCH 1/8] Update stable release to v1.2.3 Signed-off-by: Rudraksh Pareek --- STABLE-RELEASE | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/STABLE-RELEASE b/STABLE-RELEASE index cc90463..4367f90 100644 --- a/STABLE-RELEASE +++ b/STABLE-RELEASE @@ -1 +1 @@ -v1.2.2 +v1.2.3 From 6305374c2e1982c2f5d06d827d1d36b028f0bf5d Mon Sep 17 00:00:00 2001 From: Harisudarsan Date: Mon, 10 Jun 2024 11:52:20 +0530 Subject: [PATCH 2/8] feat: Add informers and IP map cache feat: Add informers and IP map cache Signed-off-by: Harisudarsan change docker build image Signed-off-by: Harisudarsan --- relay-server/Dockerfile | 10 +- relay-server/common/utils.go | 25 +++ relay-server/elasticsearch/adapter.go | 84 ++++++++- relay-server/go.mod | 34 ++-- relay-server/go.sum | 93 ++++------ relay-server/informers/informercache.go | 227 ++++++++++++++++++++++++ relay-server/server/relayServer.go | 42 +++++ 7 files changed, 435 insertions(+), 80 deletions(-) create mode 100644 relay-server/common/utils.go create mode 100644 relay-server/informers/informercache.go diff --git a/relay-server/Dockerfile b/relay-server/Dockerfile index 1037955..2a0a0eb 100644 --- a/relay-server/Dockerfile +++ b/relay-server/Dockerfile @@ -9,7 +9,7 @@ WORKDIR /usr/src/kubearmor-relay-server COPY . . -RUN cd relay-server && make +RUN make ### Copy executable image @@ -26,14 +26,14 @@ LABEL name="kubearmor-relay-server" \ alerts, and system logs generated by KubeArmor in each node, streamlining log integration with other systems." RUN microdnf -y update && \ - microdnf -y install --nodocs --setopt=install_weak_deps=0 --setopt=keepcache=0 shadow-utils && \ - microdnf clean all + microdnf -y install --nodocs --setopt=install_weak_deps=0 --setopt=keepcache=0 shadow-utils && \ + microdnf clean all RUN groupadd --gid 1000 default \ - && useradd --uid 1000 --gid default --shell /bin/bash --create-home default + && useradd --uid 1000 --gid default --shell /bin/bash --create-home default COPY LICENSE /licenses/license.txt -COPY --from=builder /usr/src/kubearmor-relay-server/relay-server/kubearmor-relay-server /KubeArmor/kubearmor-relay-server +COPY --from=builder /usr/src/kubearmor-relay-server/kubearmor-relay-server /KubeArmor/kubearmor-relay-server USER default diff --git a/relay-server/common/utils.go b/relay-server/common/utils.go new file mode 100644 index 0000000..2baacd5 --- /dev/null +++ b/relay-server/common/utils.go @@ -0,0 +1,25 @@ +package common + +import ( + "strings" +) + +func Extractdata(body string) map[string]string { + + pairs := strings.Split(body, " ") + + // Initialize a map to store extracted values + dataMap := make(map[string]string) + + // Loop through each key-value pair + for _, pair := range pairs { + // Split each pair by '=' to separate key and value + parts := strings.Split(pair, "=") + if len(parts) == 2 { + key := parts[0] + value := parts[1] + dataMap[key] = value + } + } + return dataMap +} diff --git a/relay-server/elasticsearch/adapter.go b/relay-server/elasticsearch/adapter.go index 4fa0bdd..2be13a8 100644 --- a/relay-server/elasticsearch/adapter.go +++ b/relay-server/elasticsearch/adapter.go @@ -16,6 +16,10 @@ import ( "github.com/elastic/go-elasticsearch/v7/esutil" "github.com/google/uuid" kg "github.com/kubearmor/kubearmor-relay-server/relay-server/log" + + kl "github.com/kubearmor/kubearmor-relay-server/relay-server/common" + + // kif "github.com/kubearmor/kubearmor-relay-server/relay-server/informers" "github.com/kubearmor/kubearmor-relay-server/relay-server/server" ) @@ -33,6 +37,8 @@ type ElasticsearchClient struct { bulkIndexer esutil.BulkIndexer ctx context.Context alertCh chan interface{} + logCh chan interface{} + // client *kif.Client } // NewElasticsearchClient creates a new Elasticsearch client with the given Elasticsearch URL @@ -69,8 +75,26 @@ func NewElasticsearchClient(esURL, Endpoint string) (*ElasticsearchClient, error log.Fatalf("Error creating the indexer: %s", err) } alertCh := make(chan interface{}, 10000) + logCh := make(chan interface{}, 10000) kaClient := server.NewClient(Endpoint) - return &ElasticsearchClient{kaClient: kaClient, bulkIndexer: bi, esClient: esClient, alertCh: alertCh}, nil + + // k8sClient := kif.GetK8sClient() + // cc := &kif.ClusterCache{ + // + // mu: &sync.RWMutex{}, + // + // ipPodCache: make(map[string]PodServiceInfo), + // } + // client := &kif.Client{ + // k8sClient: k8sClient, + // ClusterIPCache: cc, + // } + + // client := kif.InitializeClient() + // go kif.StartInformers(client) + + // TODO: remove this informers + return &ElasticsearchClient{kaClient: kaClient, bulkIndexer: bi, esClient: esClient, alertCh: alertCh, logCh: logCh}, nil } // bulkIndex takes an interface and index name and adds the data to the Elasticsearch bulk indexer. @@ -157,6 +181,64 @@ func (ecl *ElasticsearchClient) Start() error { } }() } + + client.WgServer.Go(func() error { + for client.Running { + res, err := client.LogStream.Recv() + if err != nil { + kg.Warnf("Failed to receive an log (%s)", client.Server) + break + } + + if containsKprobe := strings.Contains(res.Data, "kprobe"); containsKprobe { + + resourceMap := kl.Extractdata(res.GetResource()) + remoteIP := resourceMap["remoteip"] + podserviceInfo, found := ecl.kaClient.Ifclient.ClusterIPCache.Get(remoteIP) + + if found { + switch podserviceInfo.Type { + case "POD": + resource := res.GetResource() + fmt.Sprintf(" hostname=%s podname=%s namespace=%s", podserviceInfo.DeploymentName, podserviceInfo.PodName, podserviceInfo.NamespaceName) + data := res.GetData() + fmt.Sprintf(" ownertype=pod") + res.Data = data + + res.Resource = resource + // kg.Printf("logData:%s", res.Data) + break + case "SERVICE": + resource := res.GetResource() + fmt.Sprintf(" hostname=%s servicename=%s namespace=%s", podserviceInfo.DeploymentName, podserviceInfo.ServiceName, podserviceInfo.NamespaceName) + + data := res.GetData() + fmt.Sprintf(" ownertype=service") + res.Data = data + res.Resource = resource + // kg.Printf("logData:%s", res.Data) + + break + } + } + + } + tel, _ := json.Marshal(res) + fmt.Printf("%s\n", string(tel)) + ecl.logCh <- res + } + return nil + }) + + for i := 0; i < 5; i++ { + go func() { + for { + select { + case log := <-ecl.logCh: + ecl.bulkIndex(log, "log") + case <-ecl.ctx.Done(): + close(ecl.logCh) + return + } + } + }() + } return nil } diff --git a/relay-server/go.mod b/relay-server/go.mod index bbcb47e..521484f 100644 --- a/relay-server/go.mod +++ b/relay-server/go.mod @@ -1,6 +1,8 @@ module github.com/kubearmor/kubearmor-relay-server/relay-server -go 1.22 +go 1.22.0 + +toolchain go1.22.4 replace ( github.com/kubearmor/kubearmor-relay-server/relay-server => ./ @@ -20,23 +22,24 @@ require ( go.uber.org/zap v1.27.0 golang.org/x/sync v0.7.0 google.golang.org/grpc v1.63.2 - k8s.io/apimachinery v0.29.2 - k8s.io/client-go v0.29.2 + k8s.io/api v0.30.1 + k8s.io/apimachinery v0.30.1 + k8s.io/client-go v0.30.1 ) require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect - github.com/emicklei/go-restful/v3 v3.12.0 // indirect + github.com/emicklei/go-restful/v3 v3.12.1 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect - github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/logr v1.4.2 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect github.com/go-openapi/jsonreference v0.21.0 // indirect github.com/go-openapi/swag v0.23.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 // indirect + github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect - github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/imdario/mergo v0.3.16 // indirect github.com/josharian/intern v1.0.0 // indirect @@ -47,7 +50,6 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/onsi/gomega v1.30.0 // indirect github.com/pelletier/go-toml/v2 v2.1.1 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect @@ -58,23 +60,21 @@ require ( github.com/subosito/gotenv v1.6.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/exp v0.0.0-20240314144324-c7f7c6466f7f // indirect - golang.org/x/net v0.24.0 // indirect - golang.org/x/oauth2 v0.18.0 // indirect - golang.org/x/sys v0.19.0 // indirect - golang.org/x/term v0.19.0 // indirect - golang.org/x/text v0.14.0 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/oauth2 v0.21.0 // indirect + golang.org/x/sys v0.21.0 // indirect + golang.org/x/term v0.21.0 // indirect + golang.org/x/text v0.16.0 // indirect golang.org/x/time v0.5.0 // indirect - google.golang.org/appengine v1.6.8 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect - google.golang.org/protobuf v1.33.0 // indirect + google.golang.org/protobuf v1.34.1 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/api v0.29.2 // indirect k8s.io/klog/v2 v2.120.1 // indirect - k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect - k8s.io/utils v0.0.0-20240310230437-4693a0247e57 // indirect + k8s.io/kube-openapi v0.0.0-20240521193020-835d969ad83a // indirect + k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0 // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect sigs.k8s.io/yaml v1.4.0 // indirect diff --git a/relay-server/go.sum b/relay-server/go.sum index df5d956..8dfea1e 100644 --- a/relay-server/go.sum +++ b/relay-server/go.sum @@ -8,14 +8,14 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo= github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= -github.com/emicklei/go-restful/v3 v3.12.0 h1:y2DdzBAURM29NFF94q6RaY4vjIH1rtwDapwQtU84iWk= -github.com/emicklei/go-restful/v3 v3.12.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= +github.com/emicklei/go-restful/v3 v3.12.1 h1:PJMDIM/ak7btuL8Ex0iYET9hxM3CI2sjZtzpL63nKAU= +github.com/emicklei/go-restful/v3 v3.12.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= -github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= -github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ= github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY= github.com/go-openapi/jsonreference v0.21.0 h1:Rs+Y7hSXT83Jacb7kFyjn4ijOuVGSvOdF2+tg1TRrwQ= @@ -23,24 +23,22 @@ github.com/go-openapi/jsonreference v0.21.0/go.mod h1:LmZmgsrTkVg9LG4EaHeY8cBDsl github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+GrE= github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= -github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 h1:0VpGH+cDhbDtdcweoyCVsF3fhN8kejK6rFe/2FFX2nU= github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49/go.mod h1:BkkQ4L1KS1xMt2aWSPStnn55ChGC0DPOn2FQYj+f25M= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ= -github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo= +github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQNvHSdIE7iqsQxK1P41mySCvssg= +github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= @@ -74,10 +72,10 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= -github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4= -github.com/onsi/ginkgo/v2 v2.13.0/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o= -github.com/onsi/gomega v1.30.0 h1:hvMK7xYz4D3HapigLTeGdId/NcfQx1VHMJc60ew99+8= -github.com/onsi/gomega v1.30.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ= +github.com/onsi/ginkgo/v2 v2.17.2 h1:7eMhcy3GimbsA3hEnVKdw/PQM9XN9krpKVXsZdph0/g= +github.com/onsi/ginkgo/v2 v2.17.2/go.mod h1:nP2DPOQoNsQmsVyv5rDA8JkXQoCs6goXIvr/PRJ1eCc= +github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk= +github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0= github.com/pelletier/go-toml/v2 v2.1.1 h1:LWAJwfNvjQZCFIDKWYQaM62NcYeYViCmWIwmOStowAI= github.com/pelletier/go-toml/v2 v2.1.1/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -112,7 +110,6 @@ github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8 github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= @@ -122,70 +119,52 @@ go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/exp v0.0.0-20240314144324-c7f7c6466f7f h1:3CW0unweImhOzd5FmYuRsD4Y4oQFKZIjAnKbjV4WIrw= golang.org/x/exp v0.0.0-20240314144324-c7f7c6466f7f/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= -golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= -golang.org/x/oauth2 v0.18.0 h1:09qnuIAgzdx1XplqJvW6CQqMCtGZykZWcXzPMPUusvI= -golang.org/x/oauth2 v0.18.0/go.mod h1:Wf7knwG0MPoWIMMBgFlEaSUDaKskp0dCfrlJRJXbBi8= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs= +golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= -golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.19.0 h1:+ThwsDv+tYfnJFhF4L8jITxu1tdTWRTZpdsWgEgjL6Q= -golang.org/x/term v0.19.0/go.mod h1:2CuTdWZ7KHSQwUzKva0cbMg6q2DMI3Mmxp+gKJbskEk= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA= +golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.20.0 h1:hz/CVckiOxybQvFw6h7b/q80NTr9IUQb4s1IIzW7KNY= -golang.org/x/tools v0.20.0/go.mod h1:WvitBU7JJf6A4jOdg4S1tviW9bhUxkgeCui/0JHctQg= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= -google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda h1:LI5DOvAxUPMv/50agcLLoo+AdWc1irS9Rzz4vPuD1V4= google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM= google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= @@ -199,18 +178,18 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/api v0.29.2 h1:hBC7B9+MU+ptchxEqTNW2DkUosJpp1P+Wn6YncZ474A= -k8s.io/api v0.29.2/go.mod h1:sdIaaKuU7P44aoyyLlikSLayT6Vb7bvJNCX105xZXY0= -k8s.io/apimachinery v0.29.2 h1:EWGpfJ856oj11C52NRCHuU7rFDwxev48z+6DSlGNsV8= -k8s.io/apimachinery v0.29.2/go.mod h1:6HVkd1FwxIagpYrHSwJlQqZI3G9LfYWRPAkUvLnXTKU= -k8s.io/client-go v0.29.2 h1:FEg85el1TeZp+/vYJM7hkDlSTFZ+c5nnK44DJ4FyoRg= -k8s.io/client-go v0.29.2/go.mod h1:knlvFZE58VpqbQpJNbCbctTVXcd35mMyAAwBdpt4jrA= +k8s.io/api v0.30.1 h1:kCm/6mADMdbAxmIh0LBjS54nQBE+U4KmbCfIkF5CpJY= +k8s.io/api v0.30.1/go.mod h1:ddbN2C0+0DIiPntan/bye3SW3PdwLa11/0yqwvuRrJM= +k8s.io/apimachinery v0.30.1 h1:ZQStsEfo4n65yAdlGTfP/uSHMQSoYzU/oeEbkmF7P2U= +k8s.io/apimachinery v0.30.1/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc= +k8s.io/client-go v0.30.1 h1:uC/Ir6A3R46wdkgCV3vbLyNOYyCJ8oZnjtJGKfytl/Q= +k8s.io/client-go v0.30.1/go.mod h1:wrAqLNs2trwiCH/wxxmT/x3hKVH9PuV0GGW0oDoHVqc= k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw= k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= -k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7FjZpUb45WallggurYhKGag= -k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340/go.mod h1:yD4MZYeKMBwQKVht279WycxKyM84kkAx2DPrTXaeb98= -k8s.io/utils v0.0.0-20240310230437-4693a0247e57 h1:gbqbevonBh57eILzModw6mrkbwM0gQBEuevE/AaBsHY= -k8s.io/utils v0.0.0-20240310230437-4693a0247e57/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +k8s.io/kube-openapi v0.0.0-20240521193020-835d969ad83a h1:zD1uj3Jf+mD4zmA7W+goE5TxDkI7OGJjBNBzq5fJtLA= +k8s.io/kube-openapi v0.0.0-20240521193020-835d969ad83a/go.mod h1:UxDHUPsUwTOOxSU+oXURfFBcAS6JwiRXTYqYwfuGowc= +k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0 h1:jgGTlFYnhF1PM1Ax/lAlxUPE+KfCIXHaathvJg1C3ak= +k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4= diff --git a/relay-server/informers/informercache.go b/relay-server/informers/informercache.go new file mode 100644 index 0000000..0181d7f --- /dev/null +++ b/relay-server/informers/informercache.go @@ -0,0 +1,227 @@ +package informers + +import ( + "os" + "os/signal" + "strings" + "sync" + "syscall" + "time" + + kg "github.com/kubearmor/kubearmor-relay-server/relay-server/log" + "k8s.io/client-go/rest" + + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" +) + +type PodServiceInfo struct { + Type string + PodName string + DeploymentName string + ServiceName string + NamespaceName string +} + +type ClusterCache struct { + mu *sync.RWMutex + ipPodCache map[string]PodServiceInfo +} + +func (cc *ClusterCache) Get(IP string) (PodServiceInfo, bool) { + cc.mu.RLock() + defer cc.mu.RUnlock() + value, ok := cc.ipPodCache[IP] + return value, ok + +} +func (cc *ClusterCache) Set(IP string, pi PodServiceInfo) { + cc.mu.Lock() + defer cc.mu.Unlock() + + switch pi.Type { + case "POD": + + kg.Printf("Received IP %s and cached the host %s", IP, pi.PodName) + break + case "SERVICE": + + kg.Printf("Received IP %s and cached the host %s", IP, pi.ServiceName) + break + + } + cc.ipPodCache[IP] = pi + +} +func (cc *ClusterCache) Delete(IP string) { + cc.mu.Lock() + defer cc.mu.Unlock() + delete(cc.ipPodCache, IP) + +} + +type Client struct { + k8sClient *kubernetes.Clientset + ClusterIPCache *ClusterCache +} + +func InitializeClient() *Client { + clusterCache := &ClusterCache{ + mu: &sync.RWMutex{}, + + ipPodCache: make(map[string]PodServiceInfo), + } + return &Client{ + k8sClient: getK8sClient(), + ClusterIPCache: clusterCache, + } +} + +func StartInformers(client *Client) { + + informerFactory := informers.NewSharedInformerFactory(client.k8sClient, time.Minute*10) + kg.Printf("informerFactory created") + + podInformer := informerFactory.Core().V1().Pods().Informer() + + kg.Printf("pod informers created") + + // Set up event handlers for Pods + podInformer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + + pod := obj.(*v1.Pod) + deploymentName := getDeploymentNamefromPod(pod) + podInfo := PodServiceInfo{ + Type: "POD", + PodName: pod.Name, + DeploymentName: deploymentName, + NamespaceName: pod.Namespace, + } + + client.ClusterIPCache.Set(pod.Status.PodIP, podInfo) + + // kg.Printf("POD Added: %s/%s, remoteIP %s\n", pod.Name, deploymentName, pod.Status.PodIP) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + + pod := newObj.(*v1.Pod) + deploymentName := getDeploymentNamefromPod(pod) + podInfo := PodServiceInfo{ + + Type: "POD", + PodName: pod.Name, + DeploymentName: deploymentName, + + NamespaceName: pod.Namespace, + } + + client.ClusterIPCache.Set(pod.Status.PodIP, podInfo) + // kg.Printf("POD Updated: %s/%s, remoteIP %s\n", pod.Name, deploymentName, pod.Status.PodIP) + + }, + DeleteFunc: func(obj interface{}) { + + pod := obj.(*v1.Pod) + + client.ClusterIPCache.Delete(pod.Status.PodIP) + }, + }, + ) + + // Get the Service informer + serviceInformer := informerFactory.Core().V1().Services().Informer() + + // Set up event handlers + serviceInformer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + service := obj.(*v1.Service) + + svcInfo := PodServiceInfo{ + + Type: "SERVICE", + ServiceName: service.Name, + DeploymentName: service.Name, + + NamespaceName: service.Namespace, + } + client.ClusterIPCache.Set(service.Spec.ClusterIP, svcInfo) + + // kg.Printf("Service Added: %s/%s, remoteIP %s\n", service.Namespace, service.Name, service.Spec.ClusterIP) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + service := newObj.(*v1.Service) + + svcInfo := PodServiceInfo{ + + Type: "SERVICE", + ServiceName: service.Name, + DeploymentName: service.Name, + NamespaceName: service.Namespace, + } + client.ClusterIPCache.Set(service.Spec.ClusterIP, svcInfo) + kg.Printf("Service Updated: %s/%s\n", service.Namespace, service.Name) + }, + DeleteFunc: func(obj interface{}) { + service := obj.(*v1.Service) + + client.ClusterIPCache.Delete(service.Spec.ClusterIP) + // kg.Printf("Service Deleted: %s/%s\n", service.Namespace, service.Name) + }, + }, + ) + + // Start the informer + stopCh := make(chan struct{}) + defer close(stopCh) + + informerFactory.Start(stopCh) + informerFactory.WaitForCacheSync(stopCh) + + // Wait for signals to exit + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + <-sigs +} + +func getK8sClient() *kubernetes.Clientset { + config, err := rest.InClusterConfig() + if err != nil { + kg.Errf("Error creating Kubernetes config: %v\n", err) + return nil + } + + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + kg.Errf("Error creating Kubernetes clientset: %v\n", err) + return nil + } + kg.Printf("Successfully created k8sClientSet") + return clientset +} + +func getDeploymentNamefromPod(pod *v1.Pod) string { + for _, ownerReference := range pod.OwnerReferences { + switch ownerReference.Kind { + case "ReplicaSet": + return getDeploymentNameFromReplicaSetName(ownerReference.Name) + case "Deployment", "DaemonSet", "StatefulSet", "Job", "CronJob", "ReplicationController": + return ownerReference.Name + } + } + return "" +} + +func getDeploymentNameFromReplicaSetName(replicaSetName string) string { + // Assuming the ReplicaSet name is in the format of `deploymentname-randomsuffix` + // Split the name by the last dash + parts := strings.Split(replicaSetName, "-") + if len(parts) < 2 { + return replicaSetName // If not in the expected format, return the original name + } + return strings.Join(parts[:len(parts)-1], "-") +} diff --git a/relay-server/server/relayServer.go b/relay-server/server/relayServer.go index 6c46030..65d7df6 100644 --- a/relay-server/server/relayServer.go +++ b/relay-server/server/relayServer.go @@ -10,6 +10,7 @@ import ( "fmt" "math/rand" "net" + "strings" "sync" "time" @@ -25,6 +26,7 @@ import ( kl "github.com/kubearmor/kubearmor-relay-server/relay-server/common" cfg "github.com/kubearmor/kubearmor-relay-server/relay-server/config" + kif "github.com/kubearmor/kubearmor-relay-server/relay-server/informers" kg "github.com/kubearmor/kubearmor-relay-server/relay-server/log" ) @@ -296,6 +298,9 @@ type LogClient struct { // logs LogStream pb.LogService_WatchLogsClient + //Informerclient + Ifclient *kif.Client + // wait group WgServer *errgroup.Group @@ -373,6 +378,12 @@ func NewClient(server string) *LogClient { return nil } + // start informers + Informerclient := kif.InitializeClient() + go kif.StartInformers(Informerclient) + + lc.Ifclient = Informerclient + return lc } @@ -540,6 +551,36 @@ func (lc *LogClient) WatchLogs(wg *sync.WaitGroup, stop chan struct{}, errCh cha default: if res, err = lc.LogStream.Recv(); err != nil { errCh <- fmt.Errorf("failed to receive a log (%s) %s", lc.Server, err.Error()) + + if containsKprobe := strings.Contains(res.Data, "kprobe"); containsKprobe && res.GetOperation() == "Network" { + + resourceMap := kl.Extractdata(res.GetResource()) + remoteIP := resourceMap["remoteip"] + podserviceInfo, found := lc.Ifclient.ClusterIPCache.Get(remoteIP) + + if found { + switch podserviceInfo.Type { + case "POD": + resource := res.GetResource() + fmt.Sprintf(" hostname=%s podname=%s namespace=%s", podserviceInfo.DeploymentName, podserviceInfo.PodName, podserviceInfo.NamespaceName) + data := res.GetData() + fmt.Sprintf(" ownertype=pod") + res.Data = data + + res.Resource = resource + // kg.Printf("logData:%s", res.Data) + break + case "SERVICE": + resource := res.GetResource() + fmt.Sprintf(" hostname=%s servicename=%s namespace=%s", podserviceInfo.DeploymentName, podserviceInfo.ServiceName, podserviceInfo.NamespaceName) + + data := res.GetData() + fmt.Sprintf(" ownertype=service") + res.Data = data + res.Resource = resource + // kg.Printf("logData:%s", res.Data) + + break + } + } + + } return } @@ -814,6 +855,7 @@ func (rs *RelayServer) GetFeedsFromNodes() { go rs.AddLogFromBuffChan() if K8s.InitK8sClient() { + kg.Print("Initialized the Kubernetes client") for Running { From 47e722bbc574c0419fabbb1dd59d1960e05ec527 Mon Sep 17 00:00:00 2001 From: Harisudarsan Date: Thu, 4 Jul 2024 09:35:52 +0530 Subject: [PATCH 3/8] reset adapter.go Signed-off-by: Harisudarsan --- relay-server/Dockerfile | 10 ++++----- relay-server/elasticsearch/adapter.go | 30 ++++++++++++++++++++------- relay-server/server/relayServer.go | 7 ++++--- 3 files changed, 31 insertions(+), 16 deletions(-) diff --git a/relay-server/Dockerfile b/relay-server/Dockerfile index 2a0a0eb..1037955 100644 --- a/relay-server/Dockerfile +++ b/relay-server/Dockerfile @@ -9,7 +9,7 @@ WORKDIR /usr/src/kubearmor-relay-server COPY . . -RUN make +RUN cd relay-server && make ### Copy executable image @@ -26,14 +26,14 @@ LABEL name="kubearmor-relay-server" \ alerts, and system logs generated by KubeArmor in each node, streamlining log integration with other systems." RUN microdnf -y update && \ - microdnf -y install --nodocs --setopt=install_weak_deps=0 --setopt=keepcache=0 shadow-utils && \ - microdnf clean all + microdnf -y install --nodocs --setopt=install_weak_deps=0 --setopt=keepcache=0 shadow-utils && \ + microdnf clean all RUN groupadd --gid 1000 default \ - && useradd --uid 1000 --gid default --shell /bin/bash --create-home default + && useradd --uid 1000 --gid default --shell /bin/bash --create-home default COPY LICENSE /licenses/license.txt -COPY --from=builder /usr/src/kubearmor-relay-server/kubearmor-relay-server /KubeArmor/kubearmor-relay-server +COPY --from=builder /usr/src/kubearmor-relay-server/relay-server/kubearmor-relay-server /KubeArmor/kubearmor-relay-server USER default diff --git a/relay-server/elasticsearch/adapter.go b/relay-server/elasticsearch/adapter.go index 2be13a8..28db4d6 100644 --- a/relay-server/elasticsearch/adapter.go +++ b/relay-server/elasticsearch/adapter.go @@ -17,6 +17,7 @@ import ( "github.com/google/uuid" kg "github.com/kubearmor/kubearmor-relay-server/relay-server/log" + pb "github.com/kubearmor/KubeArmor/protobuf" kl "github.com/kubearmor/kubearmor-relay-server/relay-server/common" // kif "github.com/kubearmor/kubearmor-relay-server/relay-server/informers" @@ -36,8 +37,8 @@ type ElasticsearchClient struct { cancel context.CancelFunc bulkIndexer esutil.BulkIndexer ctx context.Context - alertCh chan interface{} - logCh chan interface{} + alertCh chan *pb.Alert + logCh chan *pb.Log // client *kif.Client } @@ -74,8 +75,8 @@ func NewElasticsearchClient(esURL, Endpoint string) (*ElasticsearchClient, error if err != nil { log.Fatalf("Error creating the indexer: %s", err) } - alertCh := make(chan interface{}, 10000) - logCh := make(chan interface{}, 10000) + alertCh := make(chan *pb.Alert, 10000) + logCh := make(chan *pb.Log, 10000) kaClient := server.NewClient(Endpoint) // k8sClient := kif.GetK8sClient() @@ -147,7 +148,14 @@ func (ecl *ElasticsearchClient) Start() error { } kg.Printf("Checked the liveness of the gRPC server") - client.WgServer.Go(func() error { + // var wg sync.WaitGroup + + // stop := make(chan struct{}) + // errCh := make(chan error, 1) + + client.WgServer.Add(1) + go func() error { + defer client.WgServer.Done() for client.Running { res, err := client.AlertStream.Recv() if err != nil { @@ -165,8 +173,9 @@ func (ecl *ElasticsearchClient) Start() error { //not able to add it to Log buffer } } + return nil - }) + }() for i := 0; i < 5; i++ { go func() { @@ -181,8 +190,12 @@ func (ecl *ElasticsearchClient) Start() error { } }() } + client.WgServer.Add(1) + go func() error { + + defer client.WgServer.Done() + // client.WatchLogs(&wg, stop, errCh, ecl.logCh) - client.WgServer.Go(func() error { for client.Running { res, err := client.LogStream.Recv() if err != nil { @@ -223,8 +236,9 @@ func (ecl *ElasticsearchClient) Start() error { fmt.Printf("%s\n", string(tel)) ecl.logCh <- res } + return nil - }) + }() for i := 0; i < 5; i++ { go func() { diff --git a/relay-server/server/relayServer.go b/relay-server/server/relayServer.go index 65d7df6..a782a60 100644 --- a/relay-server/server/relayServer.go +++ b/relay-server/server/relayServer.go @@ -16,7 +16,6 @@ import ( "github.com/google/uuid" pb "github.com/kubearmor/KubeArmor/protobuf" - "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" @@ -302,7 +301,7 @@ type LogClient struct { Ifclient *kif.Client // wait group - WgServer *errgroup.Group + WgServer sync.WaitGroup Context context.Context } @@ -384,6 +383,9 @@ func NewClient(server string) *LogClient { lc.Ifclient = Informerclient + // var g errgroup.Group + lc.WgServer = sync.WaitGroup{} + return lc } @@ -581,7 +583,6 @@ func (lc *LogClient) WatchLogs(wg *sync.WaitGroup, stop chan struct{}, errCh cha } } - return } select { From 4679b3a3acc3ec6a274d42ddeef54a16a046365e Mon Sep 17 00:00:00 2001 From: Harisudarsan Date: Fri, 5 Jul 2024 14:01:35 +0530 Subject: [PATCH 4/8] Use Channels to receive logs and alerts for adapter Signed-off-by: Harisudarsan --- relay-server/elasticsearch/adapter.go | 143 +++----------------------- relay-server/server/k8sHandler.go | 2 + relay-server/server/relayServer.go | 106 +++++++++++-------- 3 files changed, 76 insertions(+), 175 deletions(-) diff --git a/relay-server/elasticsearch/adapter.go b/relay-server/elasticsearch/adapter.go index 28db4d6..459154f 100644 --- a/relay-server/elasticsearch/adapter.go +++ b/relay-server/elasticsearch/adapter.go @@ -5,22 +5,17 @@ import ( "context" "encoding/json" "fmt" - "log" - "strings" - "sync/atomic" - "time" - "github.com/cenkalti/backoff/v4" "github.com/dustin/go-humanize" "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esutil" "github.com/google/uuid" kg "github.com/kubearmor/kubearmor-relay-server/relay-server/log" + "log" + "strings" + "sync/atomic" + "time" - pb "github.com/kubearmor/KubeArmor/protobuf" - kl "github.com/kubearmor/kubearmor-relay-server/relay-server/common" - - // kif "github.com/kubearmor/kubearmor-relay-server/relay-server/informers" "github.com/kubearmor/kubearmor-relay-server/relay-server/server" ) @@ -32,14 +27,10 @@ var ( // ElasticsearchClient Structure type ElasticsearchClient struct { - kaClient *server.LogClient esClient *elasticsearch.Client cancel context.CancelFunc bulkIndexer esutil.BulkIndexer ctx context.Context - alertCh chan *pb.Alert - logCh chan *pb.Log - // client *kif.Client } // NewElasticsearchClient creates a new Elasticsearch client with the given Elasticsearch URL @@ -75,27 +66,7 @@ func NewElasticsearchClient(esURL, Endpoint string) (*ElasticsearchClient, error if err != nil { log.Fatalf("Error creating the indexer: %s", err) } - alertCh := make(chan *pb.Alert, 10000) - logCh := make(chan *pb.Log, 10000) - kaClient := server.NewClient(Endpoint) - - // k8sClient := kif.GetK8sClient() - // cc := &kif.ClusterCache{ - // - // mu: &sync.RWMutex{}, - // - // ipPodCache: make(map[string]PodServiceInfo), - // } - // client := &kif.Client{ - // k8sClient: k8sClient, - // ClusterIPCache: cc, - // } - - // client := kif.InitializeClient() - // go kif.StartInformers(client) - - // TODO: remove this informers - return &ElasticsearchClient{kaClient: kaClient, bulkIndexer: bi, esClient: esClient, alertCh: alertCh, logCh: logCh}, nil + return &ElasticsearchClient{bulkIndexer: bi, esClient: esClient}, nil } // bulkIndex takes an interface and index name and adds the data to the Elasticsearch bulk indexer. @@ -139,115 +110,29 @@ func (ecl *ElasticsearchClient) bulkIndex(a interface{}, index string) { // Additional goroutines consume alert from the alert channel and bulk index them. func (ecl *ElasticsearchClient) Start() error { start = time.Now() - client := ecl.kaClient + // client := ecl.kaClient ecl.ctx, ecl.cancel = context.WithCancel(context.Background()) - // do healthcheck - if ok := client.DoHealthCheck(); !ok { - return fmt.Errorf("failed to check the liveness of the gRPC server") - } - kg.Printf("Checked the liveness of the gRPC server") - - // var wg sync.WaitGroup - - // stop := make(chan struct{}) - // errCh := make(chan error, 1) - - client.WgServer.Add(1) - go func() error { - defer client.WgServer.Done() - for client.Running { - res, err := client.AlertStream.Recv() - if err != nil { - return fmt.Errorf("failed to receive an alert (%s) %s", client.Server, err) - } - tel, _ := json.Marshal(res) - fmt.Printf("%s\n", string(tel)) - - select { - case ecl.alertCh <- res: - case <-client.Context.Done(): - // The context is over, stop processing results - return nil - default: - //not able to add it to Log buffer - } - } - - return nil - }() - for i := 0; i < 5; i++ { go func() { for { select { - case alert := <-ecl.alertCh: + case alert := <-server.ESAlertChannel: ecl.bulkIndex(alert, "alert") case <-ecl.ctx.Done(): - close(ecl.alertCh) + close(server.ESAlertChannel) return } } }() - } - client.WgServer.Add(1) - go func() error { - - defer client.WgServer.Done() - // client.WatchLogs(&wg, stop, errCh, ecl.logCh) - - for client.Running { - res, err := client.LogStream.Recv() - if err != nil { - kg.Warnf("Failed to receive an log (%s)", client.Server) - break - } - - if containsKprobe := strings.Contains(res.Data, "kprobe"); containsKprobe { - - resourceMap := kl.Extractdata(res.GetResource()) - remoteIP := resourceMap["remoteip"] - podserviceInfo, found := ecl.kaClient.Ifclient.ClusterIPCache.Get(remoteIP) - - if found { - switch podserviceInfo.Type { - case "POD": - resource := res.GetResource() + fmt.Sprintf(" hostname=%s podname=%s namespace=%s", podserviceInfo.DeploymentName, podserviceInfo.PodName, podserviceInfo.NamespaceName) - data := res.GetData() + fmt.Sprintf(" ownertype=pod") - res.Data = data - res.Resource = resource - // kg.Printf("logData:%s", res.Data) - break - case "SERVICE": - resource := res.GetResource() + fmt.Sprintf(" hostname=%s servicename=%s namespace=%s", podserviceInfo.DeploymentName, podserviceInfo.ServiceName, podserviceInfo.NamespaceName) - - data := res.GetData() + fmt.Sprintf(" ownertype=service") - res.Data = data - res.Resource = resource - // kg.Printf("logData:%s", res.Data) - - break - } - } - - } - tel, _ := json.Marshal(res) - fmt.Printf("%s\n", string(tel)) - ecl.logCh <- res - } - - return nil - }() - - for i := 0; i < 5; i++ { go func() { for { select { - case log := <-ecl.logCh: + case log := <-server.ESLogChannel: ecl.bulkIndex(log, "log") case <-ecl.ctx.Done(): - close(ecl.logCh) + close(server.ESLogChannel) return } } @@ -259,14 +144,10 @@ func (ecl *ElasticsearchClient) Start() error { // Stop stops the Elasticsearch client and performs necessary cleanup operations. // It stops the Kubearmor Relay client, closes the BulkIndexer and cancels the context. func (ecl *ElasticsearchClient) Stop() error { - logClient := ecl.kaClient - logClient.Running = false + // logClient := ecl.kaClient + server.Running = false time.Sleep(2 * time.Second) - //Destoy KubeArmor Relay Client - if err := logClient.DestroyClient(); err != nil { - return fmt.Errorf("failed to destroy the kubearmor relay gRPC client (%s)", err.Error()) - } kg.Printf("Destroyed kubearmor relay gRPC client") //Close BulkIndexer diff --git a/relay-server/server/k8sHandler.go b/relay-server/server/k8sHandler.go index da73177..a056ded 100644 --- a/relay-server/server/k8sHandler.go +++ b/relay-server/server/k8sHandler.go @@ -54,6 +54,8 @@ var stdoutlogs = false var stdoutalerts = false var stdoutmsg = false +var enableEsDashboards = os.Getenv("ENABLE_DASHBOARDS") == "true" + // NewK8sHandler Function func NewK8sHandler() *K8sHandler { kh := &K8sHandler{} diff --git a/relay-server/server/relayServer.go b/relay-server/server/relayServer.go index a782a60..cc8227a 100644 --- a/relay-server/server/relayServer.go +++ b/relay-server/server/relayServer.go @@ -297,11 +297,8 @@ type LogClient struct { // logs LogStream pb.LogService_WatchLogsClient - //Informerclient - Ifclient *kif.Client - - // wait group - WgServer sync.WaitGroup + // // wait group + // WgServer sync.WaitGroup Context context.Context } @@ -377,15 +374,6 @@ func NewClient(server string) *LogClient { return nil } - // start informers - Informerclient := kif.InitializeClient() - go kif.StartInformers(Informerclient) - - lc.Ifclient = Informerclient - - // var g errgroup.Group - lc.WgServer = sync.WaitGroup{} - return lc } @@ -523,6 +511,11 @@ func (rs *RelayServer) AddAlertFromBuffChan() { fmt.Printf("%s\n", string(tel)) } AlertLock.RLock() + + if enableEsDashboards { + ESAlertChannel <- (&alert) + } + for uid := range AlertStructs { select { case AlertStructs[uid].Broadcast <- (&alert): @@ -553,36 +546,6 @@ func (lc *LogClient) WatchLogs(wg *sync.WaitGroup, stop chan struct{}, errCh cha default: if res, err = lc.LogStream.Recv(); err != nil { errCh <- fmt.Errorf("failed to receive a log (%s) %s", lc.Server, err.Error()) - - if containsKprobe := strings.Contains(res.Data, "kprobe"); containsKprobe && res.GetOperation() == "Network" { - - resourceMap := kl.Extractdata(res.GetResource()) - remoteIP := resourceMap["remoteip"] - podserviceInfo, found := lc.Ifclient.ClusterIPCache.Get(remoteIP) - - if found { - switch podserviceInfo.Type { - case "POD": - resource := res.GetResource() + fmt.Sprintf(" hostname=%s podname=%s namespace=%s", podserviceInfo.DeploymentName, podserviceInfo.PodName, podserviceInfo.NamespaceName) - data := res.GetData() + fmt.Sprintf(" ownertype=pod") - res.Data = data - - res.Resource = resource - // kg.Printf("logData:%s", res.Data) - break - case "SERVICE": - resource := res.GetResource() + fmt.Sprintf(" hostname=%s servicename=%s namespace=%s", podserviceInfo.DeploymentName, podserviceInfo.ServiceName, podserviceInfo.NamespaceName) - - data := res.GetData() + fmt.Sprintf(" ownertype=service") - res.Data = data - res.Resource = resource - // kg.Printf("logData:%s", res.Data) - - break - } - } - - } } select { @@ -608,10 +571,45 @@ func (rs *RelayServer) AddLogFromBuffChan() { if err := kl.Clone(*res, &log); err != nil { kg.Warnf("Failed to clone a log (%v)", *res) } + + if containsKprobe := strings.Contains(log.Data, "kprobe"); containsKprobe && log.GetOperation() == "Network" { + + resourceMap := kl.Extractdata(log.GetResource()) + remoteIP := resourceMap["remoteip"] + podserviceInfo, found := rs.Ifclient.ClusterIPCache.Get(remoteIP) + + if found { + switch podserviceInfo.Type { + case "POD": + resource := log.GetResource() + fmt.Sprintf(" hostname=%s podname=%s namespace=%s", podserviceInfo.DeploymentName, podserviceInfo.PodName, podserviceInfo.NamespaceName) + data := log.GetData() + fmt.Sprintf(" ownertype=pod") + log.Data = data + + log.Resource = resource + kg.Printf("logData:%s", log.Data) + break + case "SERVICE": + resource := log.GetResource() + fmt.Sprintf(" hostname=%s servicename=%s namespace=%s", podserviceInfo.DeploymentName, podserviceInfo.ServiceName, podserviceInfo.NamespaceName) + + data := log.GetData() + fmt.Sprintf(" ownertype=service") + log.Data = data + log.Resource = resource + kg.Printf("logData:%s", log.Data) + + break + } + } + + } + if stdoutlogs { tel, _ := json.Marshal(log) fmt.Printf("%s\n", string(tel)) } + if enableEsDashboards { + ESLogChannel <- (&log) + } + for uid := range LogStructs { select { case LogStructs[uid].Broadcast <- (&log): @@ -651,17 +649,26 @@ type RelayServer struct { // wait group WgServer sync.WaitGroup + + //Informerclient + Ifclient *kif.Client } // LogBufferChannel store incoming data from log stream in buffer var LogBufferChannel chan *pb.Log +// ESLogChannel send logs to ES adapter +var ESLogChannel chan *pb.Log + // MsgBufferChannel store incoming data from Alert stream in buffer var MsgBufferChannel chan *pb.Message // AlertBufferChannel store incoming data from msg stream in buffer var AlertBufferChannel chan *pb.Alert +// ESAlertChannel send alerts to ES adapter +var ESAlertChannel chan *pb.Alert + // NewRelayServer Function func NewRelayServer(port string) *RelayServer { rs := &RelayServer{} @@ -672,6 +679,11 @@ func NewRelayServer(port string) *RelayServer { AlertBufferChannel = make(chan *pb.Alert, 1000) MsgBufferChannel = make(chan *pb.Message, 100) + if enableEsDashboards { + ESLogChannel = make(chan *pb.Log, 10000) + AlertBufferChannel = make(chan *pb.Alert, 1000) + } + // listen to gRPC port listener, err := net.Listen("tcp", ":"+rs.Port) if err != nil { @@ -717,6 +729,12 @@ func NewRelayServer(port string) *RelayServer { LogStructs = make(map[string]LogStruct) LogLock = &sync.RWMutex{} + // start informers + Informerclient := kif.InitializeClient() + go kif.StartInformers(Informerclient) + + rs.Ifclient = Informerclient + // set wait group rs.WgServer = sync.WaitGroup{} From e99d4e8f4ba62afe4fc2eb6c23c8d4d220d3a72e Mon Sep 17 00:00:00 2001 From: Harisudarsan Date: Sun, 28 Jul 2024 09:34:14 +0530 Subject: [PATCH 5/8] add error handling Signed-off-by: Harisudarsan --- relay-server/go.mod | 1 - relay-server/go.sum | 2 -- relay-server/informers/informercache.go | 48 ++++++++++++++++--------- relay-server/server/relayServer.go | 2 +- 4 files changed, 33 insertions(+), 20 deletions(-) diff --git a/relay-server/go.mod b/relay-server/go.mod index 521484f..4a3e55e 100644 --- a/relay-server/go.mod +++ b/relay-server/go.mod @@ -20,7 +20,6 @@ require ( github.com/kubearmor/KubeArmor/protobuf v0.0.0-20240315075053-fee50c9428b9 github.com/spf13/viper v1.18.2 go.uber.org/zap v1.27.0 - golang.org/x/sync v0.7.0 google.golang.org/grpc v1.63.2 k8s.io/api v0.30.1 k8s.io/apimachinery v0.30.1 diff --git a/relay-server/go.sum b/relay-server/go.sum index 8dfea1e..9d08f06 100644 --- a/relay-server/go.sum +++ b/relay-server/go.sum @@ -134,8 +134,6 @@ golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbht golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/relay-server/informers/informercache.go b/relay-server/informers/informercache.go index 0181d7f..60825dc 100644 --- a/relay-server/informers/informercache.go +++ b/relay-server/informers/informercache.go @@ -92,8 +92,12 @@ func StartInformers(client *Client) { podInformer.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { + pod, ok := obj.(*v1.Pod) + if !ok { + kg.Errf("Failed to cast object to *v1.Pod in AddFunc: received type %T", obj) + return + } - pod := obj.(*v1.Pod) deploymentName := getDeploymentNamefromPod(pod) podInfo := PodServiceInfo{ Type: "POD", @@ -103,29 +107,30 @@ func StartInformers(client *Client) { } client.ClusterIPCache.Set(pod.Status.PodIP, podInfo) - - // kg.Printf("POD Added: %s/%s, remoteIP %s\n", pod.Name, deploymentName, pod.Status.PodIP) }, UpdateFunc: func(oldObj, newObj interface{}) { + pod, ok := newObj.(*v1.Pod) + if !ok { + kg.Errf("Failed to cast object to *v1.Pod in UpdateFunc: received type %T", newObj) + return + } - pod := newObj.(*v1.Pod) deploymentName := getDeploymentNamefromPod(pod) podInfo := PodServiceInfo{ - Type: "POD", PodName: pod.Name, DeploymentName: deploymentName, - - NamespaceName: pod.Namespace, + NamespaceName: pod.Namespace, } client.ClusterIPCache.Set(pod.Status.PodIP, podInfo) - // kg.Printf("POD Updated: %s/%s, remoteIP %s\n", pod.Name, deploymentName, pod.Status.PodIP) - }, DeleteFunc: func(obj interface{}) { - - pod := obj.(*v1.Pod) + pod, ok := obj.(*v1.Pod) + if !ok { + kg.Errf("Failed to cast object to *v1.Pod in DeleteFunc: received type %T", obj) + return + } client.ClusterIPCache.Delete(pod.Status.PodIP) }, @@ -139,7 +144,11 @@ func StartInformers(client *Client) { serviceInformer.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - service := obj.(*v1.Service) + service, ok := obj.(*v1.Service) + if !ok { + kg.Errf("Failed to cast object to *v1.Service in AddFunc: received type %T", obj) + return + } svcInfo := PodServiceInfo{ @@ -154,8 +163,11 @@ func StartInformers(client *Client) { // kg.Printf("Service Added: %s/%s, remoteIP %s\n", service.Namespace, service.Name, service.Spec.ClusterIP) }, UpdateFunc: func(oldObj, newObj interface{}) { - service := newObj.(*v1.Service) - + service, ok := newObj.(*v1.Service) + if !ok { + kg.Errf("Failed to cast object to *v1.Service in UpdateFunc: received type %T", newObj) + return + } svcInfo := PodServiceInfo{ Type: "SERVICE", @@ -164,10 +176,14 @@ func StartInformers(client *Client) { NamespaceName: service.Namespace, } client.ClusterIPCache.Set(service.Spec.ClusterIP, svcInfo) - kg.Printf("Service Updated: %s/%s\n", service.Namespace, service.Name) + // kg.Printf("Service Updated: %s/%s\n", service.Namespace, service.Name) }, DeleteFunc: func(obj interface{}) { - service := obj.(*v1.Service) + service, ok := obj.(*v1.Service) + if !ok { + kg.Errf("Failed to cast object to *v1.Service in DeleteFunc: received type %T", obj) + return + } client.ClusterIPCache.Delete(service.Spec.ClusterIP) // kg.Printf("Service Deleted: %s/%s\n", service.Namespace, service.Name) diff --git a/relay-server/server/relayServer.go b/relay-server/server/relayServer.go index cc8227a..fecd140 100644 --- a/relay-server/server/relayServer.go +++ b/relay-server/server/relayServer.go @@ -681,7 +681,7 @@ func NewRelayServer(port string) *RelayServer { if enableEsDashboards { ESLogChannel = make(chan *pb.Log, 10000) - AlertBufferChannel = make(chan *pb.Alert, 1000) + ESAlertChannel = make(chan *pb.Alert, 1000) } // listen to gRPC port From aec023a84fa4751db40b36a741dfeb82b2952b30 Mon Sep 17 00:00:00 2001 From: Harisudarsan Date: Sun, 28 Jul 2024 12:54:21 +0530 Subject: [PATCH 6/8] go.mod Signed-off-by: Harisudarsan --- relay-server/go.mod | 6 +++--- relay-server/go.sum | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/relay-server/go.mod b/relay-server/go.mod index 4a3e55e..f99dd6b 100644 --- a/relay-server/go.mod +++ b/relay-server/go.mod @@ -21,9 +21,9 @@ require ( github.com/spf13/viper v1.18.2 go.uber.org/zap v1.27.0 google.golang.org/grpc v1.63.2 - k8s.io/api v0.30.1 - k8s.io/apimachinery v0.30.1 - k8s.io/client-go v0.30.1 + k8s.io/api v0.30.3 + k8s.io/apimachinery v0.30.3 + k8s.io/client-go v0.30.3 ) require ( diff --git a/relay-server/go.sum b/relay-server/go.sum index 9d08f06..222ff45 100644 --- a/relay-server/go.sum +++ b/relay-server/go.sum @@ -176,12 +176,12 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/api v0.30.1 h1:kCm/6mADMdbAxmIh0LBjS54nQBE+U4KmbCfIkF5CpJY= -k8s.io/api v0.30.1/go.mod h1:ddbN2C0+0DIiPntan/bye3SW3PdwLa11/0yqwvuRrJM= -k8s.io/apimachinery v0.30.1 h1:ZQStsEfo4n65yAdlGTfP/uSHMQSoYzU/oeEbkmF7P2U= -k8s.io/apimachinery v0.30.1/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc= -k8s.io/client-go v0.30.1 h1:uC/Ir6A3R46wdkgCV3vbLyNOYyCJ8oZnjtJGKfytl/Q= -k8s.io/client-go v0.30.1/go.mod h1:wrAqLNs2trwiCH/wxxmT/x3hKVH9PuV0GGW0oDoHVqc= +k8s.io/api v0.30.3 h1:ImHwK9DCsPA9uoU3rVh4QHAHHK5dTSv1nxJUapx8hoQ= +k8s.io/api v0.30.3/go.mod h1:GPc8jlzoe5JG3pb0KJCSLX5oAFIW3/qNJITlDj8BH04= +k8s.io/apimachinery v0.30.3 h1:q1laaWCmrszyQuSQCfNB8cFgCuDAoPszKY4ucAjDwHc= +k8s.io/apimachinery v0.30.3/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc= +k8s.io/client-go v0.30.3 h1:bHrJu3xQZNXIi8/MoxYtZBBWQQXwy16zqJwloXXfD3k= +k8s.io/client-go v0.30.3/go.mod h1:8d4pf8vYu665/kUbsxWAQ/JDBNWqfFeZnvFiVdmx89U= k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw= k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20240521193020-835d969ad83a h1:zD1uj3Jf+mD4zmA7W+goE5TxDkI7OGJjBNBzq5fJtLA= From a037494ae32951ed0fdad1d4b14c2eb3ad873b61 Mon Sep 17 00:00:00 2001 From: Harisudarsan Date: Tue, 30 Jul 2024 14:09:47 +0530 Subject: [PATCH 7/8] degrade go 1.22 to 1.21 and remove toolchain Signed-off-by: Harisudarsan --- relay-server/go.mod | 12 ++++++------ relay-server/go.sum | 16 ++++++++-------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/relay-server/go.mod b/relay-server/go.mod index f99dd6b..1d64fac 100644 --- a/relay-server/go.mod +++ b/relay-server/go.mod @@ -1,8 +1,8 @@ module github.com/kubearmor/kubearmor-relay-server/relay-server -go 1.22.0 +go 1.21.0 -toolchain go1.22.4 +// toolchain go1.21.4 replace ( github.com/kubearmor/kubearmor-relay-server/relay-server => ./ @@ -16,14 +16,14 @@ require ( github.com/dustin/go-humanize v1.0.1 github.com/elastic/go-elasticsearch/v7 v7.17.10 github.com/google/uuid v1.6.0 - github.com/kubearmor/KubeArmor/KubeArmor v0.0.0-20240412061210-e4422dd02342 + github.com/kubearmor/KubeArmor/KubeArmor v0.0.0-20240726081102-8129fe08d271 github.com/kubearmor/KubeArmor/protobuf v0.0.0-20240315075053-fee50c9428b9 github.com/spf13/viper v1.18.2 go.uber.org/zap v1.27.0 google.golang.org/grpc v1.63.2 - k8s.io/api v0.30.3 - k8s.io/apimachinery v0.30.3 - k8s.io/client-go v0.30.3 + k8s.io/api v0.29.0 + k8s.io/apimachinery v0.29.0 + k8s.io/client-go v0.29.0 ) require ( diff --git a/relay-server/go.sum b/relay-server/go.sum index 222ff45..53991c2 100644 --- a/relay-server/go.sum +++ b/relay-server/go.sum @@ -55,8 +55,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/kubearmor/KubeArmor/KubeArmor v0.0.0-20240412061210-e4422dd02342 h1:KG/lzLoMvigBelyUdpA/e9Z40jOknPoVzgVqMEYF5i4= -github.com/kubearmor/KubeArmor/KubeArmor v0.0.0-20240412061210-e4422dd02342/go.mod h1:48TgzRBV/NrXfD0ASWec9oeC8WY4ikMldPNTDPiE3F4= +github.com/kubearmor/KubeArmor/KubeArmor v0.0.0-20240726081102-8129fe08d271 h1:dtYyD1AQ2DPLeslN2JGWjpr6766wm/E1kggPbyLEJek= +github.com/kubearmor/KubeArmor/KubeArmor v0.0.0-20240726081102-8129fe08d271/go.mod h1:2gnApJAzS7qaVD/mE88+MoyvNQTv/PwjyJ+QkT6UThE= github.com/kubearmor/KubeArmor/protobuf v0.0.0-20240315075053-fee50c9428b9 h1:94/yG9an8JXOz+zNnJwf764WV6YM2jAOjfrIoZrXahk= github.com/kubearmor/KubeArmor/protobuf v0.0.0-20240315075053-fee50c9428b9/go.mod h1:w4r1uqpCe02IbEjm0X43Dv6v4gfyOGgspFrMzapqQ+Q= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= @@ -176,12 +176,12 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/api v0.30.3 h1:ImHwK9DCsPA9uoU3rVh4QHAHHK5dTSv1nxJUapx8hoQ= -k8s.io/api v0.30.3/go.mod h1:GPc8jlzoe5JG3pb0KJCSLX5oAFIW3/qNJITlDj8BH04= -k8s.io/apimachinery v0.30.3 h1:q1laaWCmrszyQuSQCfNB8cFgCuDAoPszKY4ucAjDwHc= -k8s.io/apimachinery v0.30.3/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc= -k8s.io/client-go v0.30.3 h1:bHrJu3xQZNXIi8/MoxYtZBBWQQXwy16zqJwloXXfD3k= -k8s.io/client-go v0.30.3/go.mod h1:8d4pf8vYu665/kUbsxWAQ/JDBNWqfFeZnvFiVdmx89U= +k8s.io/api v0.29.0 h1:NiCdQMY1QOp1H8lfRyeEf8eOwV6+0xA6XEE44ohDX2A= +k8s.io/api v0.29.0/go.mod h1:sdVmXoz2Bo/cb77Pxi71IPTSErEW32xa4aXwKH7gfBA= +k8s.io/apimachinery v0.29.0 h1:+ACVktwyicPz0oc6MTMLwa2Pw3ouLAfAon1wPLtG48o= +k8s.io/apimachinery v0.29.0/go.mod h1:eVBxQ/cwiJxH58eK/jd/vAk4mrxmVlnpBH5J2GbMeis= +k8s.io/client-go v0.29.0 h1:KmlDtFcrdUzOYrBhXHgKw5ycWzc3ryPX5mQe0SkG3y8= +k8s.io/client-go v0.29.0/go.mod h1:yLkXH4HKMAywcrD82KMSmfYg2DlE8mepPR4JGSo5n38= k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw= k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20240521193020-835d969ad83a h1:zD1uj3Jf+mD4zmA7W+goE5TxDkI7OGJjBNBzq5fJtLA= From 1cf3167cb3b3ed53dfa50d119f01797ea484dd47 Mon Sep 17 00:00:00 2001 From: Harisudarsan Date: Tue, 6 Aug 2024 20:05:57 +0530 Subject: [PATCH 8/8] Add error handling for event handlers Signed-off-by: Harisudarsan --- relay-server/informers/informercache.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/relay-server/informers/informercache.go b/relay-server/informers/informercache.go index 60825dc..34e3f71 100644 --- a/relay-server/informers/informercache.go +++ b/relay-server/informers/informercache.go @@ -89,7 +89,7 @@ func StartInformers(client *Client) { kg.Printf("pod informers created") // Set up event handlers for Pods - podInformer.AddEventHandler( + _, err := podInformer.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { pod, ok := obj.(*v1.Pod) @@ -137,11 +137,16 @@ func StartInformers(client *Client) { }, ) + if err != nil { + kg.Errf("Failed to add Event handlers for pods: %v", err) + return + } + // Get the Service informer serviceInformer := informerFactory.Core().V1().Services().Informer() // Set up event handlers - serviceInformer.AddEventHandler( + _, err = serviceInformer.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { service, ok := obj.(*v1.Service) @@ -191,6 +196,11 @@ func StartInformers(client *Client) { }, ) + if err != nil { + kg.Errf("Failed to add Event handlers for services: %v", err) + return + } + // Start the informer stopCh := make(chan struct{}) defer close(stopCh)