diff --git a/relay-server/common/utils.go b/relay-server/common/utils.go new file mode 100644 index 0000000..2baacd5 --- /dev/null +++ b/relay-server/common/utils.go @@ -0,0 +1,25 @@ +package common + +import ( + "strings" +) + +func Extractdata(body string) map[string]string { + + pairs := strings.Split(body, " ") + + // Initialize a map to store extracted values + dataMap := make(map[string]string) + + // Loop through each key-value pair + for _, pair := range pairs { + // Split each pair by '=' to separate key and value + parts := strings.Split(pair, "=") + if len(parts) == 2 { + key := parts[0] + value := parts[1] + dataMap[key] = value + } + } + return dataMap +} diff --git a/relay-server/elasticsearch/adapter.go b/relay-server/elasticsearch/adapter.go index 4fa0bdd..459154f 100644 --- a/relay-server/elasticsearch/adapter.go +++ b/relay-server/elasticsearch/adapter.go @@ -5,17 +5,17 @@ import ( "context" "encoding/json" "fmt" - "log" - "strings" - "sync/atomic" - "time" - "github.com/cenkalti/backoff/v4" "github.com/dustin/go-humanize" "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esutil" "github.com/google/uuid" kg "github.com/kubearmor/kubearmor-relay-server/relay-server/log" + "log" + "strings" + "sync/atomic" + "time" + "github.com/kubearmor/kubearmor-relay-server/relay-server/server" ) @@ -27,12 +27,10 @@ var ( // ElasticsearchClient Structure type ElasticsearchClient struct { - kaClient *server.LogClient esClient *elasticsearch.Client cancel context.CancelFunc bulkIndexer esutil.BulkIndexer ctx context.Context - alertCh chan interface{} } // NewElasticsearchClient creates a new Elasticsearch client with the given Elasticsearch URL @@ -68,9 +66,7 @@ func NewElasticsearchClient(esURL, Endpoint string) (*ElasticsearchClient, error if err != nil { log.Fatalf("Error creating the indexer: %s", err) } - alertCh := make(chan interface{}, 10000) - kaClient := server.NewClient(Endpoint) - return &ElasticsearchClient{kaClient: kaClient, bulkIndexer: bi, esClient: esClient, alertCh: alertCh}, nil + return &ElasticsearchClient{bulkIndexer: bi, esClient: esClient}, nil } // bulkIndex takes an interface and index name and adds the data to the Elasticsearch bulk indexer. @@ -114,44 +110,29 @@ func (ecl *ElasticsearchClient) bulkIndex(a interface{}, index string) { // Additional goroutines consume alert from the alert channel and bulk index them. func (ecl *ElasticsearchClient) Start() error { start = time.Now() - client := ecl.kaClient + // client := ecl.kaClient ecl.ctx, ecl.cancel = context.WithCancel(context.Background()) - // do healthcheck - if ok := client.DoHealthCheck(); !ok { - return fmt.Errorf("failed to check the liveness of the gRPC server") - } - kg.Printf("Checked the liveness of the gRPC server") - - client.WgServer.Go(func() error { - for client.Running { - res, err := client.AlertStream.Recv() - if err != nil { - return fmt.Errorf("failed to receive an alert (%s) %s", client.Server, err) - } - tel, _ := json.Marshal(res) - fmt.Printf("%s\n", string(tel)) - - select { - case ecl.alertCh <- res: - case <-client.Context.Done(): - // The context is over, stop processing results - return nil - default: - //not able to add it to Log buffer - } - } - return nil - }) - for i := 0; i < 5; i++ { go func() { for { select { - case alert := <-ecl.alertCh: + case alert := <-server.ESAlertChannel: ecl.bulkIndex(alert, "alert") case <-ecl.ctx.Done(): - close(ecl.alertCh) + close(server.ESAlertChannel) + return + } + } + }() + + go func() { + for { + select { + case log := <-server.ESLogChannel: + ecl.bulkIndex(log, "log") + case <-ecl.ctx.Done(): + close(server.ESLogChannel) return } } @@ -163,14 +144,10 @@ func (ecl *ElasticsearchClient) Start() error { // Stop stops the Elasticsearch client and performs necessary cleanup operations. // It stops the Kubearmor Relay client, closes the BulkIndexer and cancels the context. func (ecl *ElasticsearchClient) Stop() error { - logClient := ecl.kaClient - logClient.Running = false + // logClient := ecl.kaClient + server.Running = false time.Sleep(2 * time.Second) - //Destoy KubeArmor Relay Client - if err := logClient.DestroyClient(); err != nil { - return fmt.Errorf("failed to destroy the kubearmor relay gRPC client (%s)", err.Error()) - } kg.Printf("Destroyed kubearmor relay gRPC client") //Close BulkIndexer diff --git a/relay-server/go.mod b/relay-server/go.mod index bbcb47e..1d64fac 100644 --- a/relay-server/go.mod +++ b/relay-server/go.mod @@ -1,6 +1,8 @@ module github.com/kubearmor/kubearmor-relay-server/relay-server -go 1.22 +go 1.21.0 + +// toolchain go1.21.4 replace ( github.com/kubearmor/kubearmor-relay-server/relay-server => ./ @@ -14,29 +16,29 @@ require ( github.com/dustin/go-humanize v1.0.1 github.com/elastic/go-elasticsearch/v7 v7.17.10 github.com/google/uuid v1.6.0 - github.com/kubearmor/KubeArmor/KubeArmor v0.0.0-20240412061210-e4422dd02342 + github.com/kubearmor/KubeArmor/KubeArmor v0.0.0-20240726081102-8129fe08d271 github.com/kubearmor/KubeArmor/protobuf v0.0.0-20240315075053-fee50c9428b9 github.com/spf13/viper v1.18.2 go.uber.org/zap v1.27.0 - golang.org/x/sync v0.7.0 google.golang.org/grpc v1.63.2 - k8s.io/apimachinery v0.29.2 - k8s.io/client-go v0.29.2 + k8s.io/api v0.29.0 + k8s.io/apimachinery v0.29.0 + k8s.io/client-go v0.29.0 ) require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect - github.com/emicklei/go-restful/v3 v3.12.0 // indirect + github.com/emicklei/go-restful/v3 v3.12.1 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect - github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/logr v1.4.2 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect github.com/go-openapi/jsonreference v0.21.0 // indirect github.com/go-openapi/swag v0.23.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 // indirect + github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect - github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/imdario/mergo v0.3.16 // indirect github.com/josharian/intern v1.0.0 // indirect @@ -47,7 +49,6 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/onsi/gomega v1.30.0 // indirect github.com/pelletier/go-toml/v2 v2.1.1 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect @@ -58,23 +59,21 @@ require ( github.com/subosito/gotenv v1.6.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/exp v0.0.0-20240314144324-c7f7c6466f7f // indirect - golang.org/x/net v0.24.0 // indirect - golang.org/x/oauth2 v0.18.0 // indirect - golang.org/x/sys v0.19.0 // indirect - golang.org/x/term v0.19.0 // indirect - golang.org/x/text v0.14.0 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/oauth2 v0.21.0 // indirect + golang.org/x/sys v0.21.0 // indirect + golang.org/x/term v0.21.0 // indirect + golang.org/x/text v0.16.0 // indirect golang.org/x/time v0.5.0 // indirect - google.golang.org/appengine v1.6.8 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect - google.golang.org/protobuf v1.33.0 // indirect + google.golang.org/protobuf v1.34.1 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/api v0.29.2 // indirect k8s.io/klog/v2 v2.120.1 // indirect - k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect - k8s.io/utils v0.0.0-20240310230437-4693a0247e57 // indirect + k8s.io/kube-openapi v0.0.0-20240521193020-835d969ad83a // indirect + k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0 // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect sigs.k8s.io/yaml v1.4.0 // indirect diff --git a/relay-server/go.sum b/relay-server/go.sum index df5d956..53991c2 100644 --- a/relay-server/go.sum +++ b/relay-server/go.sum @@ -8,14 +8,14 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo= github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= -github.com/emicklei/go-restful/v3 v3.12.0 h1:y2DdzBAURM29NFF94q6RaY4vjIH1rtwDapwQtU84iWk= -github.com/emicklei/go-restful/v3 v3.12.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= +github.com/emicklei/go-restful/v3 v3.12.1 h1:PJMDIM/ak7btuL8Ex0iYET9hxM3CI2sjZtzpL63nKAU= +github.com/emicklei/go-restful/v3 v3.12.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= -github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= -github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ= github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY= github.com/go-openapi/jsonreference v0.21.0 h1:Rs+Y7hSXT83Jacb7kFyjn4ijOuVGSvOdF2+tg1TRrwQ= @@ -23,24 +23,22 @@ github.com/go-openapi/jsonreference v0.21.0/go.mod h1:LmZmgsrTkVg9LG4EaHeY8cBDsl github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+GrE= github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= -github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 h1:0VpGH+cDhbDtdcweoyCVsF3fhN8kejK6rFe/2FFX2nU= github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49/go.mod h1:BkkQ4L1KS1xMt2aWSPStnn55ChGC0DPOn2FQYj+f25M= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ= -github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo= +github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQNvHSdIE7iqsQxK1P41mySCvssg= +github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= @@ -57,8 +55,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/kubearmor/KubeArmor/KubeArmor v0.0.0-20240412061210-e4422dd02342 h1:KG/lzLoMvigBelyUdpA/e9Z40jOknPoVzgVqMEYF5i4= -github.com/kubearmor/KubeArmor/KubeArmor v0.0.0-20240412061210-e4422dd02342/go.mod h1:48TgzRBV/NrXfD0ASWec9oeC8WY4ikMldPNTDPiE3F4= +github.com/kubearmor/KubeArmor/KubeArmor v0.0.0-20240726081102-8129fe08d271 h1:dtYyD1AQ2DPLeslN2JGWjpr6766wm/E1kggPbyLEJek= +github.com/kubearmor/KubeArmor/KubeArmor v0.0.0-20240726081102-8129fe08d271/go.mod h1:2gnApJAzS7qaVD/mE88+MoyvNQTv/PwjyJ+QkT6UThE= github.com/kubearmor/KubeArmor/protobuf v0.0.0-20240315075053-fee50c9428b9 h1:94/yG9an8JXOz+zNnJwf764WV6YM2jAOjfrIoZrXahk= github.com/kubearmor/KubeArmor/protobuf v0.0.0-20240315075053-fee50c9428b9/go.mod h1:w4r1uqpCe02IbEjm0X43Dv6v4gfyOGgspFrMzapqQ+Q= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= @@ -74,10 +72,10 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= -github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4= -github.com/onsi/ginkgo/v2 v2.13.0/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o= -github.com/onsi/gomega v1.30.0 h1:hvMK7xYz4D3HapigLTeGdId/NcfQx1VHMJc60ew99+8= -github.com/onsi/gomega v1.30.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ= +github.com/onsi/ginkgo/v2 v2.17.2 h1:7eMhcy3GimbsA3hEnVKdw/PQM9XN9krpKVXsZdph0/g= +github.com/onsi/ginkgo/v2 v2.17.2/go.mod h1:nP2DPOQoNsQmsVyv5rDA8JkXQoCs6goXIvr/PRJ1eCc= +github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk= +github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0= github.com/pelletier/go-toml/v2 v2.1.1 h1:LWAJwfNvjQZCFIDKWYQaM62NcYeYViCmWIwmOStowAI= github.com/pelletier/go-toml/v2 v2.1.1/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -112,7 +110,6 @@ github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8 github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= @@ -122,70 +119,50 @@ go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/exp v0.0.0-20240314144324-c7f7c6466f7f h1:3CW0unweImhOzd5FmYuRsD4Y4oQFKZIjAnKbjV4WIrw= golang.org/x/exp v0.0.0-20240314144324-c7f7c6466f7f/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= -golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= -golang.org/x/oauth2 v0.18.0 h1:09qnuIAgzdx1XplqJvW6CQqMCtGZykZWcXzPMPUusvI= -golang.org/x/oauth2 v0.18.0/go.mod h1:Wf7knwG0MPoWIMMBgFlEaSUDaKskp0dCfrlJRJXbBi8= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs= +golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= -golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.19.0 h1:+ThwsDv+tYfnJFhF4L8jITxu1tdTWRTZpdsWgEgjL6Q= -golang.org/x/term v0.19.0/go.mod h1:2CuTdWZ7KHSQwUzKva0cbMg6q2DMI3Mmxp+gKJbskEk= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA= +golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.20.0 h1:hz/CVckiOxybQvFw6h7b/q80NTr9IUQb4s1IIzW7KNY= -golang.org/x/tools v0.20.0/go.mod h1:WvitBU7JJf6A4jOdg4S1tviW9bhUxkgeCui/0JHctQg= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= -google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda h1:LI5DOvAxUPMv/50agcLLoo+AdWc1irS9Rzz4vPuD1V4= google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM= google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= @@ -199,18 +176,18 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/api v0.29.2 h1:hBC7B9+MU+ptchxEqTNW2DkUosJpp1P+Wn6YncZ474A= -k8s.io/api v0.29.2/go.mod h1:sdIaaKuU7P44aoyyLlikSLayT6Vb7bvJNCX105xZXY0= -k8s.io/apimachinery v0.29.2 h1:EWGpfJ856oj11C52NRCHuU7rFDwxev48z+6DSlGNsV8= -k8s.io/apimachinery v0.29.2/go.mod h1:6HVkd1FwxIagpYrHSwJlQqZI3G9LfYWRPAkUvLnXTKU= -k8s.io/client-go v0.29.2 h1:FEg85el1TeZp+/vYJM7hkDlSTFZ+c5nnK44DJ4FyoRg= -k8s.io/client-go v0.29.2/go.mod h1:knlvFZE58VpqbQpJNbCbctTVXcd35mMyAAwBdpt4jrA= +k8s.io/api v0.29.0 h1:NiCdQMY1QOp1H8lfRyeEf8eOwV6+0xA6XEE44ohDX2A= +k8s.io/api v0.29.0/go.mod h1:sdVmXoz2Bo/cb77Pxi71IPTSErEW32xa4aXwKH7gfBA= +k8s.io/apimachinery v0.29.0 h1:+ACVktwyicPz0oc6MTMLwa2Pw3ouLAfAon1wPLtG48o= +k8s.io/apimachinery v0.29.0/go.mod h1:eVBxQ/cwiJxH58eK/jd/vAk4mrxmVlnpBH5J2GbMeis= +k8s.io/client-go v0.29.0 h1:KmlDtFcrdUzOYrBhXHgKw5ycWzc3ryPX5mQe0SkG3y8= +k8s.io/client-go v0.29.0/go.mod h1:yLkXH4HKMAywcrD82KMSmfYg2DlE8mepPR4JGSo5n38= k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw= k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= -k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7FjZpUb45WallggurYhKGag= -k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340/go.mod h1:yD4MZYeKMBwQKVht279WycxKyM84kkAx2DPrTXaeb98= -k8s.io/utils v0.0.0-20240310230437-4693a0247e57 h1:gbqbevonBh57eILzModw6mrkbwM0gQBEuevE/AaBsHY= -k8s.io/utils v0.0.0-20240310230437-4693a0247e57/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +k8s.io/kube-openapi v0.0.0-20240521193020-835d969ad83a h1:zD1uj3Jf+mD4zmA7W+goE5TxDkI7OGJjBNBzq5fJtLA= +k8s.io/kube-openapi v0.0.0-20240521193020-835d969ad83a/go.mod h1:UxDHUPsUwTOOxSU+oXURfFBcAS6JwiRXTYqYwfuGowc= +k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0 h1:jgGTlFYnhF1PM1Ax/lAlxUPE+KfCIXHaathvJg1C3ak= +k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4= diff --git a/relay-server/informers/informercache.go b/relay-server/informers/informercache.go new file mode 100644 index 0000000..34e3f71 --- /dev/null +++ b/relay-server/informers/informercache.go @@ -0,0 +1,253 @@ +package informers + +import ( + "os" + "os/signal" + "strings" + "sync" + "syscall" + "time" + + kg "github.com/kubearmor/kubearmor-relay-server/relay-server/log" + "k8s.io/client-go/rest" + + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" +) + +type PodServiceInfo struct { + Type string + PodName string + DeploymentName string + ServiceName string + NamespaceName string +} + +type ClusterCache struct { + mu *sync.RWMutex + ipPodCache map[string]PodServiceInfo +} + +func (cc *ClusterCache) Get(IP string) (PodServiceInfo, bool) { + cc.mu.RLock() + defer cc.mu.RUnlock() + value, ok := cc.ipPodCache[IP] + return value, ok + +} +func (cc *ClusterCache) Set(IP string, pi PodServiceInfo) { + cc.mu.Lock() + defer cc.mu.Unlock() + + switch pi.Type { + case "POD": + + kg.Printf("Received IP %s and cached the host %s", IP, pi.PodName) + break + case "SERVICE": + + kg.Printf("Received IP %s and cached the host %s", IP, pi.ServiceName) + break + + } + cc.ipPodCache[IP] = pi + +} +func (cc *ClusterCache) Delete(IP string) { + cc.mu.Lock() + defer cc.mu.Unlock() + delete(cc.ipPodCache, IP) + +} + +type Client struct { + k8sClient *kubernetes.Clientset + ClusterIPCache *ClusterCache +} + +func InitializeClient() *Client { + clusterCache := &ClusterCache{ + mu: &sync.RWMutex{}, + + ipPodCache: make(map[string]PodServiceInfo), + } + return &Client{ + k8sClient: getK8sClient(), + ClusterIPCache: clusterCache, + } +} + +func StartInformers(client *Client) { + + informerFactory := informers.NewSharedInformerFactory(client.k8sClient, time.Minute*10) + kg.Printf("informerFactory created") + + podInformer := informerFactory.Core().V1().Pods().Informer() + + kg.Printf("pod informers created") + + // Set up event handlers for Pods + _, err := podInformer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + pod, ok := obj.(*v1.Pod) + if !ok { + kg.Errf("Failed to cast object to *v1.Pod in AddFunc: received type %T", obj) + return + } + + deploymentName := getDeploymentNamefromPod(pod) + podInfo := PodServiceInfo{ + Type: "POD", + PodName: pod.Name, + DeploymentName: deploymentName, + NamespaceName: pod.Namespace, + } + + client.ClusterIPCache.Set(pod.Status.PodIP, podInfo) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + pod, ok := newObj.(*v1.Pod) + if !ok { + kg.Errf("Failed to cast object to *v1.Pod in UpdateFunc: received type %T", newObj) + return + } + + deploymentName := getDeploymentNamefromPod(pod) + podInfo := PodServiceInfo{ + Type: "POD", + PodName: pod.Name, + DeploymentName: deploymentName, + NamespaceName: pod.Namespace, + } + + client.ClusterIPCache.Set(pod.Status.PodIP, podInfo) + }, + DeleteFunc: func(obj interface{}) { + pod, ok := obj.(*v1.Pod) + if !ok { + kg.Errf("Failed to cast object to *v1.Pod in DeleteFunc: received type %T", obj) + return + } + + client.ClusterIPCache.Delete(pod.Status.PodIP) + }, + }, + ) + + if err != nil { + kg.Errf("Failed to add Event handlers for pods: %v", err) + return + } + + // Get the Service informer + serviceInformer := informerFactory.Core().V1().Services().Informer() + + // Set up event handlers + _, err = serviceInformer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + service, ok := obj.(*v1.Service) + if !ok { + kg.Errf("Failed to cast object to *v1.Service in AddFunc: received type %T", obj) + return + } + + svcInfo := PodServiceInfo{ + + Type: "SERVICE", + ServiceName: service.Name, + DeploymentName: service.Name, + + NamespaceName: service.Namespace, + } + client.ClusterIPCache.Set(service.Spec.ClusterIP, svcInfo) + + // kg.Printf("Service Added: %s/%s, remoteIP %s\n", service.Namespace, service.Name, service.Spec.ClusterIP) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + service, ok := newObj.(*v1.Service) + if !ok { + kg.Errf("Failed to cast object to *v1.Service in UpdateFunc: received type %T", newObj) + return + } + svcInfo := PodServiceInfo{ + + Type: "SERVICE", + ServiceName: service.Name, + DeploymentName: service.Name, + NamespaceName: service.Namespace, + } + client.ClusterIPCache.Set(service.Spec.ClusterIP, svcInfo) + // kg.Printf("Service Updated: %s/%s\n", service.Namespace, service.Name) + }, + DeleteFunc: func(obj interface{}) { + service, ok := obj.(*v1.Service) + if !ok { + kg.Errf("Failed to cast object to *v1.Service in DeleteFunc: received type %T", obj) + return + } + + client.ClusterIPCache.Delete(service.Spec.ClusterIP) + // kg.Printf("Service Deleted: %s/%s\n", service.Namespace, service.Name) + }, + }, + ) + + if err != nil { + kg.Errf("Failed to add Event handlers for services: %v", err) + return + } + + // Start the informer + stopCh := make(chan struct{}) + defer close(stopCh) + + informerFactory.Start(stopCh) + informerFactory.WaitForCacheSync(stopCh) + + // Wait for signals to exit + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + <-sigs +} + +func getK8sClient() *kubernetes.Clientset { + config, err := rest.InClusterConfig() + if err != nil { + kg.Errf("Error creating Kubernetes config: %v\n", err) + return nil + } + + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + kg.Errf("Error creating Kubernetes clientset: %v\n", err) + return nil + } + kg.Printf("Successfully created k8sClientSet") + return clientset +} + +func getDeploymentNamefromPod(pod *v1.Pod) string { + for _, ownerReference := range pod.OwnerReferences { + switch ownerReference.Kind { + case "ReplicaSet": + return getDeploymentNameFromReplicaSetName(ownerReference.Name) + case "Deployment", "DaemonSet", "StatefulSet", "Job", "CronJob", "ReplicationController": + return ownerReference.Name + } + } + return "" +} + +func getDeploymentNameFromReplicaSetName(replicaSetName string) string { + // Assuming the ReplicaSet name is in the format of `deploymentname-randomsuffix` + // Split the name by the last dash + parts := strings.Split(replicaSetName, "-") + if len(parts) < 2 { + return replicaSetName // If not in the expected format, return the original name + } + return strings.Join(parts[:len(parts)-1], "-") +} diff --git a/relay-server/server/k8sHandler.go b/relay-server/server/k8sHandler.go index da73177..a056ded 100644 --- a/relay-server/server/k8sHandler.go +++ b/relay-server/server/k8sHandler.go @@ -54,6 +54,8 @@ var stdoutlogs = false var stdoutalerts = false var stdoutmsg = false +var enableEsDashboards = os.Getenv("ENABLE_DASHBOARDS") == "true" + // NewK8sHandler Function func NewK8sHandler() *K8sHandler { kh := &K8sHandler{} diff --git a/relay-server/server/relayServer.go b/relay-server/server/relayServer.go index 6c46030..fecd140 100644 --- a/relay-server/server/relayServer.go +++ b/relay-server/server/relayServer.go @@ -10,12 +10,12 @@ import ( "fmt" "math/rand" "net" + "strings" "sync" "time" "github.com/google/uuid" pb "github.com/kubearmor/KubeArmor/protobuf" - "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" @@ -25,6 +25,7 @@ import ( kl "github.com/kubearmor/kubearmor-relay-server/relay-server/common" cfg "github.com/kubearmor/kubearmor-relay-server/relay-server/config" + kif "github.com/kubearmor/kubearmor-relay-server/relay-server/informers" kg "github.com/kubearmor/kubearmor-relay-server/relay-server/log" ) @@ -296,8 +297,8 @@ type LogClient struct { // logs LogStream pb.LogService_WatchLogsClient - // wait group - WgServer *errgroup.Group + // // wait group + // WgServer sync.WaitGroup Context context.Context } @@ -510,6 +511,11 @@ func (rs *RelayServer) AddAlertFromBuffChan() { fmt.Printf("%s\n", string(tel)) } AlertLock.RLock() + + if enableEsDashboards { + ESAlertChannel <- (&alert) + } + for uid := range AlertStructs { select { case AlertStructs[uid].Broadcast <- (&alert): @@ -540,7 +546,6 @@ func (lc *LogClient) WatchLogs(wg *sync.WaitGroup, stop chan struct{}, errCh cha default: if res, err = lc.LogStream.Recv(); err != nil { errCh <- fmt.Errorf("failed to receive a log (%s) %s", lc.Server, err.Error()) - return } select { @@ -566,10 +571,45 @@ func (rs *RelayServer) AddLogFromBuffChan() { if err := kl.Clone(*res, &log); err != nil { kg.Warnf("Failed to clone a log (%v)", *res) } + + if containsKprobe := strings.Contains(log.Data, "kprobe"); containsKprobe && log.GetOperation() == "Network" { + + resourceMap := kl.Extractdata(log.GetResource()) + remoteIP := resourceMap["remoteip"] + podserviceInfo, found := rs.Ifclient.ClusterIPCache.Get(remoteIP) + + if found { + switch podserviceInfo.Type { + case "POD": + resource := log.GetResource() + fmt.Sprintf(" hostname=%s podname=%s namespace=%s", podserviceInfo.DeploymentName, podserviceInfo.PodName, podserviceInfo.NamespaceName) + data := log.GetData() + fmt.Sprintf(" ownertype=pod") + log.Data = data + + log.Resource = resource + kg.Printf("logData:%s", log.Data) + break + case "SERVICE": + resource := log.GetResource() + fmt.Sprintf(" hostname=%s servicename=%s namespace=%s", podserviceInfo.DeploymentName, podserviceInfo.ServiceName, podserviceInfo.NamespaceName) + + data := log.GetData() + fmt.Sprintf(" ownertype=service") + log.Data = data + log.Resource = resource + kg.Printf("logData:%s", log.Data) + + break + } + } + + } + if stdoutlogs { tel, _ := json.Marshal(log) fmt.Printf("%s\n", string(tel)) } + if enableEsDashboards { + ESLogChannel <- (&log) + } + for uid := range LogStructs { select { case LogStructs[uid].Broadcast <- (&log): @@ -609,17 +649,26 @@ type RelayServer struct { // wait group WgServer sync.WaitGroup + + //Informerclient + Ifclient *kif.Client } // LogBufferChannel store incoming data from log stream in buffer var LogBufferChannel chan *pb.Log +// ESLogChannel send logs to ES adapter +var ESLogChannel chan *pb.Log + // MsgBufferChannel store incoming data from Alert stream in buffer var MsgBufferChannel chan *pb.Message // AlertBufferChannel store incoming data from msg stream in buffer var AlertBufferChannel chan *pb.Alert +// ESAlertChannel send alerts to ES adapter +var ESAlertChannel chan *pb.Alert + // NewRelayServer Function func NewRelayServer(port string) *RelayServer { rs := &RelayServer{} @@ -630,6 +679,11 @@ func NewRelayServer(port string) *RelayServer { AlertBufferChannel = make(chan *pb.Alert, 1000) MsgBufferChannel = make(chan *pb.Message, 100) + if enableEsDashboards { + ESLogChannel = make(chan *pb.Log, 10000) + ESAlertChannel = make(chan *pb.Alert, 1000) + } + // listen to gRPC port listener, err := net.Listen("tcp", ":"+rs.Port) if err != nil { @@ -675,6 +729,12 @@ func NewRelayServer(port string) *RelayServer { LogStructs = make(map[string]LogStruct) LogLock = &sync.RWMutex{} + // start informers + Informerclient := kif.InitializeClient() + go kif.StartInformers(Informerclient) + + rs.Ifclient = Informerclient + // set wait group rs.WgServer = sync.WaitGroup{} @@ -814,6 +874,7 @@ func (rs *RelayServer) GetFeedsFromNodes() { go rs.AddLogFromBuffChan() if K8s.InitK8sClient() { + kg.Print("Initialized the Kubernetes client") for Running {