From 591ddde85f48eee4ab5f1c03be2938cdfe95abe4 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Tue, 4 Jun 2024 18:24:01 +0200 Subject: [PATCH] Fix linter complaints and cleanups --- .../input/azureeventhub/decoder_test.go | 3 +- .../filebeat/input/azureeventhub/v1_input.go | 56 +------------------ .../input/azureeventhub/v2_migration.go | 6 +- 3 files changed, 7 insertions(+), 58 deletions(-) diff --git a/x-pack/filebeat/input/azureeventhub/decoder_test.go b/x-pack/filebeat/input/azureeventhub/decoder_test.go index 7f2613493a82..f1c30651ae5a 100644 --- a/x-pack/filebeat/input/azureeventhub/decoder_test.go +++ b/x-pack/filebeat/input/azureeventhub/decoder_test.go @@ -10,9 +10,10 @@ import ( "fmt" "testing" + "github.com/stretchr/testify/assert" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" - "github.com/stretchr/testify/assert" ) func TestDecodeRecords(t *testing.T) { diff --git a/x-pack/filebeat/input/azureeventhub/v1_input.go b/x-pack/filebeat/input/azureeventhub/v1_input.go index a5014fac3054..3a6a21189c5e 100644 --- a/x-pack/filebeat/input/azureeventhub/v1_input.go +++ b/x-pack/filebeat/input/azureeventhub/v1_input.go @@ -232,7 +232,7 @@ func (in *eventHubInputV1) processEvents(event *eventhub.Event) { processingStartTime := time.Now() eventHubMetadata := mapstr.M{ // The `partition_id` is not available in the - // current version of the SDK. + // legacy version of the SDK. "eventhub": in.config.EventHubName, "consumer_group": in.config.ConsumerGroup, } @@ -298,60 +298,6 @@ func stripConnectionString(c string) string { return "(redacted)" } -//// unpackRecords will try to split the message into multiple ones based on the group field provided by the configuration -//func (in *eventHubInputV1) unpackRecords(bMessage []byte) []string { -// var mapObject map[string][]interface{} -// var messages []string -// -// // Clean up the message for known issues [1] where Azure services produce malformed JSON documents. -// // Sanitization occurs if options are available and the message contains an invalid JSON. -// // -// // [1]: https://learn.microsoft.com/en-us/answers/questions/1001797/invalid-json-logs-produced-for-function-apps -// if len(in.config.SanitizeOptions) != 0 && !json.Valid(bMessage) { -// bMessage = sanitize(bMessage, in.config.SanitizeOptions...) -// in.metrics.sanitizedMessages.Inc() -// } -// -// // check if the message is a "records" object containing a list of events -// err := json.Unmarshal(bMessage, &mapObject) -// if err == nil { -// if len(mapObject[expandEventListFromField]) > 0 { -// for _, ms := range mapObject[expandEventListFromField] { -// js, err := json.Marshal(ms) -// if err == nil { -// messages = append(messages, string(js)) -// in.metrics.receivedEvents.Inc() -// } else { -// in.log.Errorw(fmt.Sprintf("serializing message %s", ms), "error", err) -// } -// } -// } -// } else { -// in.log.Debugf("deserializing multiple messages to a `records` object returning error: %s", err) -// // in some cases the message is an array -// var arrayObject []interface{} -// err = json.Unmarshal(bMessage, &arrayObject) -// if err != nil { -// // return entire message -// in.log.Debugf("deserializing multiple messages to an array returning error: %s", err) -// in.metrics.decodeErrors.Inc() -// return []string{string(bMessage)} -// } -// -// for _, ms := range arrayObject { -// js, err := json.Marshal(ms) -// if err == nil { -// messages = append(messages, string(js)) -// in.metrics.receivedEvents.Inc() -// } else { -// in.log.Errorw(fmt.Sprintf("serializing message %s", ms), "error", err) -// } -// } -// } -// -// return messages -//} - func getAzureEnvironment(overrideResManager string) (azure.Environment, error) { // if no override is set then the azure public cloud is used if overrideResManager == "" || overrideResManager == "" { diff --git a/x-pack/filebeat/input/azureeventhub/v2_migration.go b/x-pack/filebeat/input/azureeventhub/v2_migration.go index 755cabc7b83d..52a91137ccc8 100644 --- a/x-pack/filebeat/input/azureeventhub/v2_migration.go +++ b/x-pack/filebeat/input/azureeventhub/v2_migration.go @@ -11,14 +11,16 @@ import ( "encoding/json" "errors" "fmt" - "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" - "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" "net/url" "strconv" "strings" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" + "github.com/elastic/elastic-agent-libs/logp" )