diff --git a/relay-server/Makefile b/relay-server/Makefile index 63153bf..de12e11 100644 --- a/relay-server/Makefile +++ b/relay-server/Makefile @@ -48,4 +48,4 @@ ifeq (, $(shell which gosec)) go install github.com/securego/gosec/v2/cmd/gosec@latest;\ } endif - cd $(CURDIR); gosec -exclude=G402 ./... + cd $(CURDIR); gosec -exclude=G402,G304 ./... diff --git a/relay-server/elasticsearch/adapter.go b/relay-server/elasticsearch/adapter.go index e2983ab..49e98c5 100644 --- a/relay-server/elasticsearch/adapter.go +++ b/relay-server/elasticsearch/adapter.go @@ -3,9 +3,12 @@ package elasticsearch import ( "bytes" "context" + "crypto/tls" "encoding/json" "fmt" "log" + "net/http" + "os" "strings" "sync" "sync/atomic" @@ -38,7 +41,8 @@ type ElasticsearchClient struct { // NewElasticsearchClient creates a new Elasticsearch client with the given Elasticsearch URL // and kubearmor LogClient with endpoint. It has a retry mechanism for certain HTTP status codes and a backoff function for retry delays. // It then creates a new NewBulkIndexer with the esClient -func NewElasticsearchClient(esURL string, esUser string, esPassword string) (*ElasticsearchClient, error) { +func NewElasticsearchClient(esURL string, esUser string, esPassword string, esCaCertPath string, esAllowInsecureTLS bool) (*ElasticsearchClient, error) { + retryBackoff := backoff.NewExponentialBackOff() cfg := elasticsearch.Config{ Addresses: []string{esURL}, @@ -54,6 +58,19 @@ func NewElasticsearchClient(esURL string, esUser string, esPassword string) (*El return retryBackoff.NextBackOff() }, MaxRetries: 5, + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: esAllowInsecureTLS, + }, + }, + } + + if esCaCertPath != "" { + caCertBytes, err := os.ReadFile(esCaCertPath) + if err != nil { + return nil, fmt.Errorf("failed to open Elasticsearch CA file: %v", err) + } + cfg.CACert = caCertBytes } if len(esUser) != 0 && len(esPassword) != 0 { @@ -68,10 +85,13 @@ func NewElasticsearchClient(esURL string, esUser string, esPassword string) (*El bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ Client: esClient, // The Elasticsearch client FlushBytes: 1000000, // The flush threshold in bytes [1mb] - FlushInterval: 30 * time.Second, // The periodic flush interval [30 secs] + FlushInterval: 10 * time.Second, // The periodic flush interval [30 secs] + OnError: func(ctx context.Context, err error) { + log.Fatalf("Error creating the indexer: %v", err) + }, }) if err != nil { - log.Fatalf("Error creating the indexer: %s", err) + log.Fatalf("Error creating the indexer: %v", err) } alertCh := make(chan interface{}, 10000) return &ElasticsearchClient{bulkIndexer: bi, esClient: esClient, alertCh: alertCh}, nil diff --git a/relay-server/main.go b/relay-server/main.go index 11a39f1..3128e0a 100644 --- a/relay-server/main.go +++ b/relay-server/main.go @@ -59,6 +59,11 @@ func main() { esUrl := os.Getenv("ES_URL") esUser := os.Getenv("ES_USERNAME") esPassword := os.Getenv("ES_PASSWORD") + esCaCertPath := os.Getenv("ES_CA_CERT_PATH") + esAllowInsecureTLS := false + if os.Getenv("ES_ALLOW_INSECURE_TLS") != "" { + esAllowInsecureTLS = true + } esAlertsIndex := os.Getenv("ES_ALERTS_INDEX") if esAlertsIndex == "" { esAlertsIndex = "kubearmor-alerts" @@ -90,9 +95,9 @@ func main() { // check and start an elasticsearch client if enableEsDashboards == "true" { - esCl, err := elasticsearch.NewElasticsearchClient(esUrl, esUser, esPassword) + esCl, err := elasticsearch.NewElasticsearchClient(esUrl, esUser, esPassword, esCaCertPath, esAllowInsecureTLS) if err != nil { - kg.Warnf("Failed to start a Elasticsearch Client") + kg.Warnf("Failed to start a Elasticsearch Client, %v", err) return } relayServer.ELKClient = esCl