From 9ca5a0431b2681218713f07802dd3f859a78fd83 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Tue, 4 Jun 2024 17:38:00 +0200 Subject: [PATCH] Update tests --- x-pack/filebeat/input/azureeventhub/config.go | 80 +++--- .../input/azureeventhub/config_test.go | 13 +- .../filebeat/input/azureeventhub/decoder.go | 2 +- .../input/azureeventhub/decoder_test.go | 106 ++++++++ x-pack/filebeat/input/azureeventhub/input.go | 233 +----------------- .../input/azureeventhub/input_test.go | 135 +--------- .../input/azureeventhub/metrics_test.go | 7 +- .../input/azureeventhub/sanitization_test.go | 35 --- .../filebeat/input/azureeventhub/v1_input.go | 38 ++- .../input/azureeventhub/v1_input_test.go | 30 +++ .../filebeat/input/azureeventhub/v2_input.go | 67 +++-- .../input/azureeventhub/v2_migration.go | 221 +++++++++-------- .../activitylogs/config/azure-eventhub.yml | 20 +- .../module/azure/activitylogs/manifest.yml | 12 +- .../azure/auditlogs/config/azure-eventhub.yml | 19 +- .../module/azure/auditlogs/manifest.yml | 13 +- .../platformlogs/config/azure-eventhub.yml | 20 +- .../module/azure/platformlogs/manifest.yml | 13 +- .../signinlogs/config/azure-eventhub.yml | 20 +- .../module/azure/signinlogs/manifest.yml | 13 +- 20 files changed, 490 insertions(+), 607 deletions(-) create mode 100644 x-pack/filebeat/input/azureeventhub/decoder_test.go diff --git a/x-pack/filebeat/input/azureeventhub/config.go b/x-pack/filebeat/input/azureeventhub/config.go index 6d1a03bff448..76c3541ee492 100644 --- a/x-pack/filebeat/input/azureeventhub/config.go +++ b/x-pack/filebeat/input/azureeventhub/config.go @@ -16,37 +16,66 @@ import ( "github.com/elastic/elastic-agent-libs/logp" ) +const ephContainerName = "filebeat" + type azureInputConfig struct { + // EventHubName is the name of the event hub to connect to. + EventHubName string `config:"eventhub" validate:"required"` + // ConnectionString is the connection string to connect to the event hub. ConnectionString string `config:"connection_string" validate:"required"` - EventHubName string `config:"eventhub" validate:"required"` - ConsumerGroup string `config:"consumer_group"` + // ConsumerGroup is the name of the consumer group to use. + ConsumerGroup string `config:"consumer_group"` // Azure Storage container to store leases and checkpoints - SAName string `config:"storage_account" validate:"required"` - SAKey string `config:"storage_account_key"` // (processor v1 only) - SAConnectionString string `config:"storage_account_connection_string"` // (processor v2 only) - SAContainer string `config:"storage_account_container"` + SAName string `config:"storage_account" validate:"required"` + // SAKey is used to connect to the storage account (processor v1 only) + SAKey string `config:"storage_account_key"` + // SAConnectionString is used to connect to the storage account (processor v2 only) + SAConnectionString string `config:"storage_account_connection_string"` + // SAContainer is the name of the storage account container to store + // partition ownership and checkpoint information. + SAContainer string `config:"storage_account_container"` // by default the azure public environment is used, to override, users can provide a specific resource manager endpoint OverrideEnvironment string `config:"resource_manager_endpoint"` // cleanup the log JSON input for known issues, options: SINGLE_QUOTES, NEW_LINES SanitizeOptions []string `config:"sanitize_options"` - // Processor version to use (v1 or v2). Default is v1 (processor v2 only). 
-	ProcessorVersion string `config:"processor_version"`
 	// Controls if the input should perform the checkpoint information
 	// migration from v1 to v2 (processor v2 only).
 	MigrateCheckpoint bool `config:"migrate_checkpoint"`
+	// Processor version to use (v1 or v2). Default is v1.
+	ProcessorVersion string `config:"processor_version"`
+	// ProcessorUpdateInterval controls how often the processor attempts to claim partition ownership (processor v2 only).
+	ProcessorUpdateInterval time.Duration `config:"processor_update_interval"`
 	// Controls the start position for all partitions (processor v2 only).
-	StartPosition string `config:"start_position"`
+	ProcessorStartPosition string `config:"processor_start_position"`
 	// Processor receive timeout (processor v2 only).
-	// Wait up to `ReceiveTimeout` for `ReceiveCount` events,
+	// Wait up to `PartitionReceiveTimeout` for `PartitionReceiveCount` events,
 	// otherwise returns whatever we collected during that time.
-	ReceiveTimeout time.Duration `config:"receive_timeout"`
+	PartitionReceiveTimeout time.Duration `config:"partition_receive_timeout"`
 	// Processor receive count (processor v2 only).
-	// Wait up to `ReceiveTimeout` for `ReceiveCount` events,
+	// Wait up to `PartitionReceiveTimeout` for `PartitionReceiveCount` events,
 	// otherwise returns whatever we collected during that time.
-	ReceiveCount int `config:"receive_count"`
+	PartitionReceiveCount int `config:"partition_receive_count"`
 }
 
-const ephContainerName = "filebeat"
+func defaultConfig() azureInputConfig {
+	return azureInputConfig{
+		// For this release, we continue to use
+		// the processor v1 as the default.
+		ProcessorVersion: processorV1,
+		// The default processor update interval is 10 seconds.
+		ProcessorUpdateInterval: 10 * time.Second,
+		// For backward compatibility with v1,
+		// the default start position is "earliest".
+		ProcessorStartPosition: startPositionEarliest,
+		// Receive timeout and count control how
+		// many events we want to receive from
+		// the processor before returning.
+		PartitionReceiveTimeout: 5 * time.Second,
+		PartitionReceiveCount:   100,
+		// Default to no sanitization.
+		SanitizeOptions: []string{},
+	}
+}
 
 // Validate validates the config.
 func (conf *azureInputConfig) Validate() error {
@@ -90,11 +119,6 @@ func (conf *azureInputConfig) Validate() error {
 		}
 	}
 
-	if conf.ProcessorVersion == "" {
-		// The default processor version is "v1".
-		conf.ProcessorVersion = processorV1
-	}
-
 	switch conf.ProcessorVersion {
 	case processorV1:
 		if conf.SAKey == "" {
@@ -102,7 +126,7 @@ func (conf *azureInputConfig) Validate() error {
 		}
 	case processorV2:
 		if conf.SAKey != "" {
-			logger.Warnf("storage_account_key is not used in processor v2")
+			logger.Warnf("storage_account_key is not used in processor v2, please remove it from the configuration (config: storage_account_key)")
 		}
 		if conf.SAConnectionString == "" {
 			return errors.New("no storage account connection string configured (config: storage_account_connection_string)")
@@ -111,22 +135,6 @@ func (conf *azureInputConfig) Validate() error {
 		return fmt.Errorf("invalid azure-eventhub processor version: %s (available versions: v1, v2)", conf.ProcessorVersion)
 	}
 
-	if conf.StartPosition == "" {
-		// For backward compatibility with v1,
-		// the default start position is "earliest".
-		conf.StartPosition = startPositionEarliest
-	}
-
-	// Default receive timeout and count.
-	if conf.ReceiveTimeout == 0 {
-		// The default receive timeout is 5 second.
-		conf.ReceiveTimeout = 5 * time.Second
-	}
-	if conf.ReceiveCount == 0 {
-		// The default receive count is 100.
- conf.ReceiveCount = 100 - } - return nil } diff --git a/x-pack/filebeat/input/azureeventhub/config_test.go b/x-pack/filebeat/input/azureeventhub/config_test.go index 047a4ee56265..321cf9d1ebcc 100644 --- a/x-pack/filebeat/input/azureeventhub/config_test.go +++ b/x-pack/filebeat/input/azureeventhub/config_test.go @@ -36,13 +36,12 @@ func TestStorageContainerValidate(t *testing.T) { func TestValidate(t *testing.T) { t.Run("Sanitize storage account containers with underscores", func(t *testing.T) { - config := azureInputConfig{ - ConnectionString: "sb://test-ns.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SECRET", - EventHubName: "event_hub_00", - SAName: "teststorageaccount", - SAKey: "secret", - SAContainer: "filebeat-activitylogs-event_hub_00", - } + config := defaultConfig() + config.ConnectionString = "sb://test-ns.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SECRET" + config.EventHubName = "event_hub_00" + config.SAName = "teststorageaccount" + config.SAKey = "secret" + config.SAContainer = "filebeat-activitylogs-event_hub_00" if err := config.Validate(); err != nil { t.Fatalf("unexpected validation error: %v", err) diff --git a/x-pack/filebeat/input/azureeventhub/decoder.go b/x-pack/filebeat/input/azureeventhub/decoder.go index 6b9c40f5c8ac..49f6ca3f6480 100644 --- a/x-pack/filebeat/input/azureeventhub/decoder.go +++ b/x-pack/filebeat/input/azureeventhub/decoder.go @@ -14,7 +14,7 @@ import ( ) type messageDecoder struct { - config *azureInputConfig + config azureInputConfig log *logp.Logger metrics *inputMetrics } diff --git a/x-pack/filebeat/input/azureeventhub/decoder_test.go b/x-pack/filebeat/input/azureeventhub/decoder_test.go new file mode 100644 index 000000000000..7f2613493a82 --- /dev/null +++ b/x-pack/filebeat/input/azureeventhub/decoder_test.go @@ -0,0 +1,106 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +//go:build !aix + +package azureeventhub + +import ( + "fmt" + "testing" + + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/monitoring" + "github.com/stretchr/testify/assert" +) + +func TestDecodeRecords(t *testing.T) { + config := defaultConfig() + log := logp.NewLogger(fmt.Sprintf("%s test for input", inputName)) + reg := monitoring.NewRegistry() + + decoder := messageDecoder{ + config: config, + log: log, + metrics: newInputMetrics("test", reg), + } + + msgs := []string{ + "{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", + "{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", + "{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", + } + + t.Run("Decode multiple records", func(t *testing.T) { + msg := "{\"records\":[{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + + "{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + + "{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}]}" + + messages := decoder.Decode([]byte(msg)) + + assert.NotNil(t, messages) + assert.Equal(t, len(messages), 3) + for _, ms := range messages { + assert.Contains(t, msgs, ms) + } + }) + + t.Run("Decode array of events", func(t *testing.T) { + msg1 := "[{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + + "{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + + "{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}]" + + messages := decoder.Decode([]byte(msg1)) + + assert.NotNil(t, messages) + assert.Equal(t, len(messages), 3) + for _, ms := range messages { + assert.Contains(t, msgs, ms) + } + }) + + t.Run("Decode one event only", func(t *testing.T) { + msg2 := "{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}" + + messages := decoder.Decode([]byte(msg2)) + + assert.NotNil(t, messages) + assert.Equal(t, len(messages), 1) + for _, ms := range messages { + assert.Contains(t, msgs, ms) + } + }) +} + +func TestDecodeRecordsWithSanitization(t *testing.T) { + config := defaultConfig() + config.SanitizeOptions = []string{"SINGLE_QUOTES", "NEW_LINES"} + log := logp.NewLogger(fmt.Sprintf("%s test for input", inputName)) + reg := monitoring.NewRegistry() + metrics := newInputMetrics("test", reg) + defer metrics.Close() + + decoder := messageDecoder{ + config: config, + log: log, + metrics: metrics, + } + + msg := "{\"records\":[{'test':\"this is some message\",\n\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + + "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + + "{\"time\": \"2023-04-11T13:35:20Z\", \"resourceId\": \"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\", \"category\": \"FunctionAppLogs\", \"operationName\": \"Microsoft.Web/sites/functions/log\", \"level\": \"Informational\", \"location\": \"West Europe\", \"properties\": {'appName':'REDACTED','roleInstance':'REDACTED','message':'Elastic Test Function Trigger. 
---- West Europe West Europe West Europe West Europe West Europe ','category':'Function.HttpTriggerJava.User','hostVersion':'4.16.5.5','functionInvocationId':'REDACTED','functionName':'HttpTriggerJava','hostInstanceId':'REDACTED','level':'Information','levelId':2,'processId':62}}]}" + msgs := []string{ + "{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", + "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", + "{\"category\":\"FunctionAppLogs\",\"level\":\"Informational\",\"location\":\"West Europe\",\"operationName\":\"Microsoft.Web/sites/functions/log\",\"properties\":{\"appName\":\"REDACTED\",\"category\":\"Function.HttpTriggerJava.User\",\"functionInvocationId\":\"REDACTED\",\"functionName\":\"HttpTriggerJava\",\"hostInstanceId\":\"REDACTED\",\"hostVersion\":\"4.16.5.5\",\"level\":\"Information\",\"levelId\":2,\"message\":\"Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe \",\"processId\":62,\"roleInstance\":\"REDACTED\"},\"resourceId\":\"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\",\"time\":\"2023-04-11T13:35:20Z\"}", + } + + messages := decoder.Decode([]byte(msg)) + + assert.NotNil(t, messages) + assert.Equal(t, len(messages), 3) + for _, ms := range messages { + assert.Contains(t, msgs, ms) + } +} diff --git a/x-pack/filebeat/input/azureeventhub/input.go b/x-pack/filebeat/input/azureeventhub/input.go index 0d5e7749b6e1..b7b624104155 100644 --- a/x-pack/filebeat/input/azureeventhub/input.go +++ b/x-pack/filebeat/input/azureeventhub/input.go @@ -8,14 +8,10 @@ package azureeventhub import ( "fmt" - "strings" - "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/go-autorest/autorest/azure" v2 "github.com/elastic/beats/v7/filebeat/input/v2" - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/acker" "github.com/elastic/beats/v7/libbeat/feature" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -62,7 +58,7 @@ func (m *eventHubInputManager) Init(unison.Group) error { // Create creates a new azure-eventhub input based on the configuration. func (m *eventHubInputManager) Create(cfg *conf.C) (v2.Input, error) { - var config azureInputConfig + config := defaultConfig() if err := cfg.Unpack(&config); err != nil { return nil, fmt.Errorf("reading %s input config: %w", inputName, err) } @@ -75,231 +71,4 @@ func (m *eventHubInputManager) Create(cfg *conf.C) (v2.Input, error) { default: return nil, fmt.Errorf("invalid azure-eventhub processor version: %s (available versions: v1, v2)", config.ProcessorVersion) } - - //return &azureInput{ - // config: config, - // log: logp.NewLogger(fmt.Sprintf("%s input", inputName)).With("connection string", stripConnectionString(config.ConnectionString)), - //}, nil -} - -// func init() { -// err := input.Register(inputName, NewInput) -// if err != nil { -// panic(fmt.Errorf("failed to register %v input: %w", inputName, err)) -// } -// } - -// // configID computes a unique ID for the input configuration. -// // -// // It is used to identify the input in the registry and to detect -// // changes in the configuration. -// // -// // We will remove this function as we upgrade the input to the -// // v2 API (there is an ID in the v2 context). 
-// func configID(config *conf.C) (string, error) { -// var tmp struct { -// ID string `config:"id"` -// } -// if err := config.Unpack(&tmp); err != nil { -// return "", fmt.Errorf("error extracting ID: %w", err) -// } -// if tmp.ID != "" { -// return tmp.ID, nil -// } - -// var h map[string]interface{} -// _ = config.Unpack(&h) -// id, err := hashstructure.Hash(h, nil) -// if err != nil { -// return "", fmt.Errorf("can not compute ID from configuration: %w", err) -// } - -// return fmt.Sprintf("%16X", id), nil -// } - -//// azureInput struct for the azure-eventhub input -//type azureInput struct { -// config azureInputConfig // azure-eventhub configuration -// context input.Context -// outlet channel.Outleter -// log *logp.Logger // logging info and error messages -// workerCtx context.Context // worker goroutine context. It's cancelled when the input stops or the worker exits. -// workerCancel context.CancelFunc // used to signal that the worker should stop. -// workerOnce sync.Once // guarantees that the worker goroutine is only started once. -// processor *eph.EventProcessorHost // eph will be assigned if users have enabled the option -// id string // ID of the input; used to identify the input in the input metrics registry only, and will be removed once the input is migrated to v2. -// metrics *inputMetrics // Metrics for the input. -//} - -// // NewInput creates a new azure-eventhub input -// func NewInput( -// cfg *conf.C, -// connector channel.Connector, -// inputContext input.Context, -// ) (input.Input, error) { -// var config azureInputConfig -// if err := cfg.Unpack(&config); err != nil { -// return nil, fmt.Errorf("reading %s input config: %w", inputName, err) -// } - -// // Since this is a v1 input, we need to set the ID manually. -// // -// // We need an ID to identify the input in the input metrics -// // registry. -// // -// // This is a temporary workaround until we migrate the input to v2. -// inputId, err := configID(cfg) -// if err != nil { -// return nil, err -// } - -// inputCtx, cancelInputCtx := context.WithCancel(context.Background()) -// go func() { -// defer cancelInputCtx() -// select { -// case <-inputContext.Done: -// case <-inputCtx.Done(): -// } -// }() - -// // If the input ever needs to be made restartable, then context would need -// // to be recreated with each restart. -// workerCtx, workerCancel := context.WithCancel(inputCtx) - -// in := azureInput{ -// id: inputId, -// config: config, -// log: logp.NewLogger(fmt.Sprintf("%s input", inputName)).With("connection string", stripConnectionString(config.ConnectionString)), -// context: inputContext, -// workerCtx: workerCtx, -// workerCancel: workerCancel, -// } -// out, err := connector.Connect(cfg) -// if err != nil { -// return nil, err -// } -// in.outlet = out -// in.log.Infof("Initialized %s input.", inputName) - -// return &in, nil -// } -// -//func (a *azureInput) Name() string { -// return inputName -//} -// -//func (a *azureInput) Test(v2.TestContext) error { -// return nil -//} -// -//// Run starts the `azure-eventhub` input and then returns. -//// -//// The first invocation will start an input worker. All subsequent -//// invocations will be no-ops. -//// -//// The input worker will continue fetching data from the event hub until -//// the input Runner calls the `Stop()` method. -//func (a *azureInput) Run(inputContext v2.Context, pipeline beat.Pipeline) error { -// ctx := v2.GoContextFromCanceler(inputContext.Cancelation) -// -// // `Run` is invoked periodically by the input Runner. 
The `sync.Once` -// // guarantees that we only start the worker once during the first -// // invocation. -// // a.workerOnce.Do(func() { -// a.log.Infof("%s input worker is starting.", inputName) -// -// // We set up the metrics in the `Run()` method and tear them down -// // in the `Stop()` method. -// // -// // The factory method `NewInput` is not a viable solution because -// // the Runner invokes it during the configuration check without -// // calling the `Stop()` function; this causes panics -// // due to multiple metrics registrations. -// a.metrics = newInputMetrics(inputContext.ID, nil) -// -// err := a.runWithEPH() -// if err != nil { -// a.log.Errorw("error starting the input worker", "error", err) -// return err -// } -// a.log.Infof("%s input worker has started.", inputName) -// // }) -// -// for { -// select { -// case <-ctx.Done(): -// a.log.Infof("%s input worker is stopping.", inputName) -// if a.processor != nil { -// // Tells the processor to stop processing events and release all -// // resources (like scheduler, leaser, checkpointer, and pipelineClient). -// err := a.processor.Close(context.Background()) -// if err != nil { -// a.log.Errorw("error while closing eventhostprocessor", "error", err) -// } -// } -// -// if a.metrics != nil { -// a.metrics.Close() -// } -// -// // a.workerCancel() // FIXME: is this needed? -// a.log.Infof("%s input worker has stopped.", inputName) -// } -// -// break -// } -// -// return nil -//} -// -//// // Stop stops `azure-eventhub` input. -//// func (a *azureInput) Stop() { -//// a.log.Infof("%s input worker is stopping.", inputName) -//// if a.processor != nil { -//// // Tells the processor to stop processing events and release all -//// // resources (like scheduler, leaser, checkpointer, and pipelineClient). -//// err := a.processor.Close(context.Background()) -//// if err != nil { -//// a.log.Errorw("error while closing eventhostprocessor", "error", err) -//// } -//// } -// -//// if a.metrics != nil { -//// a.metrics.Close() -//// } -// -//// a.workerCancel() -//// a.log.Infof("%s input worker has stopped.", inputName) -//// } -// -//// // Wait stop the current server -//// func (a *azureInput) Wait() { -//// a.Stop() -//// } - -func createPipelineClient(pipeline beat.Pipeline) (beat.Client, error) { - return pipeline.ConnectWith(beat.ClientConfig{ - EventListener: acker.LastEventPrivateReporter(func(acked int, data interface{}) { - // fmt.Println(acked, data) - }), - Processing: beat.ProcessingConfig{ - // This input only produces events with basic types so normalization - // is not required. 
- EventNormalization: to.Ptr(false), - }, - }) -} - -// Strip connection string to remove sensitive information -// A connection string should look like this: -// Endpoint=sb://dummynamespace.servicebus.windows.net/;SharedAccessKeyName=DummyAccessKeyName;SharedAccessKey=5dOntTRytoC24opYThisAsit3is2B+OGY1US/fuL3ly= -// This code will remove everything after ';' so key information is stripped -func stripConnectionString(c string) string { - if parts := strings.SplitN(c, ";", 2); len(parts) == 2 { - return parts[0] - } - - // We actually expect the string to have the documented format - // if we reach here something is wrong, so let's stay on the safe side - return "(redacted)" } diff --git a/x-pack/filebeat/input/azureeventhub/input_test.go b/x-pack/filebeat/input/azureeventhub/input_test.go index 270987633580..839e7bb1f110 100644 --- a/x-pack/filebeat/input/azureeventhub/input_test.go +++ b/x-pack/filebeat/input/azureeventhub/input_test.go @@ -64,7 +64,13 @@ func TestProcessEvents(t *testing.T) { log: log, metrics: metrics, pipelineClient: &fakePipelineClient, + messageDecoder: messageDecoder{ + config: defaultTestConfig, + log: log, + metrics: metrics, + }, } + var sn int64 = 12 now := time.Now() var off int64 = 1234 @@ -83,10 +89,7 @@ func TestProcessEvents(t *testing.T) { Data: []byte(msg), SystemProperties: &properties, } - ok := input.processEvents(&ev, "0") - if !ok { - t.Fatal("OnEvent function returned false") - } + input.processEvents(&ev) assert.Equal(t, len(fakePipelineClient.publishedEvents), 1) message, err := fakePipelineClient.publishedEvents[0].Fields.GetValue("message") @@ -96,58 +99,6 @@ func TestProcessEvents(t *testing.T) { assert.Equal(t, message, single) } -func TestParseMultipleRecords(t *testing.T) { - // records object - msg := "{\"records\":[{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + - "{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + - "{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}]}" - msgs := []string{ - "{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", - "{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", - "{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", - } - - reg := monitoring.NewRegistry() - metrics := newInputMetrics("test", reg) - defer metrics.Close() - - fakePipelineClient := fakeClient{} - - input := eventHubInputV1{ - config: azureInputConfig{}, - log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName)), - metrics: metrics, - pipelineClient: &fakePipelineClient, - } - - messages := input.unpackRecords([]byte(msg)) - assert.NotNil(t, messages) - assert.Equal(t, len(messages), 3) - for _, ms := range messages { - assert.Contains(t, msgs, ms) - } - - // array of events - msg1 := "[{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + - "{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + - "{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}]" - messages = input.unpackRecords([]byte(msg1)) - assert.NotNil(t, messages) - assert.Equal(t, len(messages), 3) - for _, ms := range messages { - assert.Contains(t, msgs, ms) - } - - // one event only - msg2 := "{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}" - messages = input.unpackRecords([]byte(msg2)) - assert.NotNil(t, messages) - assert.Equal(t, len(messages), 1) - for _, ms := range messages { - 
assert.Contains(t, msgs, ms) - } -} - //func TestNewInputDone(t *testing.T) { // log := logp.NewLogger(fmt.Sprintf("%s test for input", inputName)) // config := mapstr.M{ @@ -159,43 +110,6 @@ func TestParseMultipleRecords(t *testing.T) { // inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) //} -func TestStripConnectionString(t *testing.T) { - tests := []struct { - connectionString, expected string - }{ - { - "Endpoint=sb://something", - "(redacted)", - }, - { - "Endpoint=sb://dummynamespace.servicebus.windows.net/;SharedAccessKeyName=DummyAccessKeyName;SharedAccessKey=5dOntTRytoC24opYThisAsit3is2B+OGY1US/fuL3ly=", - "Endpoint=sb://dummynamespace.servicebus.windows.net/", - }, - { - "Endpoint=sb://dummynamespace.servicebus.windows.net/;SharedAccessKey=5dOntTRytoC24opYThisAsit3is2B+OGY1US/fuL3ly=", - "Endpoint=sb://dummynamespace.servicebus.windows.net/", - }, - } - - for _, tt := range tests { - res := stripConnectionString(tt.connectionString) - assert.Equal(t, res, tt.expected) - } -} - -//// fakePipeline returns new fakeClients for simple tests. -//type fakePipeline struct{} -// -//func (c *fakePipeline) ConnectWith(clientConfig beat.ClientConfig) (beat.Client, error) { -// return &fakeClient{}, nil -//} -// -//func (c *fakePipeline) Connect() (beat.Client, error) { -// return &fakeClient{}, nil -//} - -//var _ beat.Client = (*fakeClient)(nil) - // ackClient is a fake beat.Client that ACKs the published messages. type fakeClient struct { sync.Mutex @@ -215,38 +129,3 @@ func (c *fakeClient) PublishAll(event []beat.Event) { c.Publish(e) } } - -// -//type stubOutleter struct { -// sync.Mutex -// cond *sync.Cond -// done bool -// Events []beat.Event -//} -// -//func newStubOutlet(stub *stubOutleter) (channel.Outleter, error) { -// stub.cond = sync.NewCond(stub) -// defer stub.Close() -// -// connector := channel.ConnectorFunc(func(_ *conf.C, _ beat.ClientConfig) (channel.Outleter, error) { -// return stub, nil -// }) -// return connector.ConnectWith(nil, beat.ClientConfig{ -// Processing: beat.ProcessingConfig{}, -// }) -//} -// -//func (o *stubOutleter) Close() error { -// o.Lock() -// defer o.Unlock() -// o.done = true -// return nil -//} -//func (o *stubOutleter) Done() <-chan struct{} { return nil } -//func (o *stubOutleter) OnEvent(event beat.Event) bool { -// o.Lock() -// defer o.Unlock() -// o.Events = append(o.Events, event) -// o.cond.Broadcast() -// return o.done -//} diff --git a/x-pack/filebeat/input/azureeventhub/metrics_test.go b/x-pack/filebeat/input/azureeventhub/metrics_test.go index 1bb3bf9d7e0c..1c4ce0ea987d 100644 --- a/x-pack/filebeat/input/azureeventhub/metrics_test.go +++ b/x-pack/filebeat/input/azureeventhub/metrics_test.go @@ -135,6 +135,11 @@ func TestInputMetricsEventsReceived(t *testing.T) { metrics: metrics, pipelineClient: &fakeClient, log: log, + messageDecoder: messageDecoder{ + config: inputConfig, + metrics: metrics, + log: log, + }, } ev := eventhub.Event{ @@ -142,7 +147,7 @@ func TestInputMetricsEventsReceived(t *testing.T) { SystemProperties: &properties, } - ok := input.processEvents(&ev, "0") + ok := input.processEvents(&ev) if !ok { t.Fatal("OnEvent function returned false") } diff --git a/x-pack/filebeat/input/azureeventhub/sanitization_test.go b/x-pack/filebeat/input/azureeventhub/sanitization_test.go index 6e2645c40d74..3ad8928cdc3c 100644 --- a/x-pack/filebeat/input/azureeventhub/sanitization_test.go +++ b/x-pack/filebeat/input/azureeventhub/sanitization_test.go @@ -7,46 +7,11 @@ package azureeventhub import ( - "fmt" "testing" 
"github.com/stretchr/testify/assert" - - "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/elastic-agent-libs/monitoring" ) -func TestParseMultipleRecordsSanitization(t *testing.T) { - msg := "{\"records\":[{'test':\"this is some message\",\n\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + - "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + - "{\"time\": \"2023-04-11T13:35:20Z\", \"resourceId\": \"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\", \"category\": \"FunctionAppLogs\", \"operationName\": \"Microsoft.Web/sites/functions/log\", \"level\": \"Informational\", \"location\": \"West Europe\", \"properties\": {'appName':'REDACTED','roleInstance':'REDACTED','message':'Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe ','category':'Function.HttpTriggerJava.User','hostVersion':'4.16.5.5','functionInvocationId':'REDACTED','functionName':'HttpTriggerJava','hostInstanceId':'REDACTED','level':'Information','levelId':2,'processId':62}}]}" - msgs := []string{ - "{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", - "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", - "{\"category\":\"FunctionAppLogs\",\"level\":\"Informational\",\"location\":\"West Europe\",\"operationName\":\"Microsoft.Web/sites/functions/log\",\"properties\":{\"appName\":\"REDACTED\",\"category\":\"Function.HttpTriggerJava.User\",\"functionInvocationId\":\"REDACTED\",\"functionName\":\"HttpTriggerJava\",\"hostInstanceId\":\"REDACTED\",\"hostVersion\":\"4.16.5.5\",\"level\":\"Information\",\"levelId\":2,\"message\":\"Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe \",\"processId\":62,\"roleInstance\":\"REDACTED\"},\"resourceId\":\"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\",\"time\":\"2023-04-11T13:35:20Z\"}", - } - - reg := monitoring.NewRegistry() - metrics := newInputMetrics("test", reg) - defer metrics.Close() - - input := eventHubInputV1{ - config: azureInputConfig{ - SanitizeOptions: []string{"SINGLE_QUOTES", "NEW_LINES"}, - }, - log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName)), - metrics: metrics, - pipelineClient: &fakeClient{}, - } - - messages := input.unpackRecords([]byte(msg)) - assert.NotNil(t, messages) - assert.Equal(t, len(messages), 3) - for _, ms := range messages { - assert.Contains(t, msgs, ms) - } -} - func TestSanitize(t *testing.T) { jsonByte := []byte("{'test':\"this is 'some' message\n\",\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}") diff --git a/x-pack/filebeat/input/azureeventhub/v1_input.go b/x-pack/filebeat/input/azureeventhub/v1_input.go index badf997e073e..a3c47f3ccba9 100644 --- a/x-pack/filebeat/input/azureeventhub/v1_input.go +++ b/x-pack/filebeat/input/azureeventhub/v1_input.go @@ -10,8 +10,10 @@ import ( "context" "errors" "fmt" + "strings" "time" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/go-autorest/autorest/azure" eventhub "github.com/Azure/azure-event-hubs-go/v3" @@ -21,6 +23,7 @@ import ( v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common/acker" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -75,11 +78,11 @@ func (in *eventHubInputV1) Run( defer in.metrics.Close() in.messageDecoder = messageDecoder{ 
- config: &in.config, + config: in.config, log: in.log, metrics: in.metrics, } - + ctx := v2.GoContextFromCanceler(inputContext.Cancelation) // Initialize everything for this run @@ -157,7 +160,7 @@ func (in *eventHubInputV1) setup(ctx context.Context) error { in.log.Debugw("received event", "ts", time.Now().String()) var onEventErr error // partitionID is not yet mapped in the azure-eventhub sdk - ok := in.processEvents(e, "") + ok := in.processEvents(e) if !ok { onEventErr = errors.New("OnEvent function returned false. Stopping input worker") in.log.Error(onEventErr.Error()) @@ -210,7 +213,7 @@ func (in *eventHubInputV1) run(ctx context.Context) error { return ctx.Err() } -func (in *eventHubInputV1) processEvents(event *eventhub.Event, partitionID string) bool { +func (in *eventHubInputV1) processEvents(event *eventhub.Event) bool { processingStartTime := time.Now() eventHubMetadata := mapstr.M{ // The `partition_id` is not available in the @@ -275,6 +278,33 @@ func (in *eventHubInputV1) processEvents(event *eventhub.Event, partitionID stri return true } +func createPipelineClient(pipeline beat.Pipeline) (beat.Client, error) { + return pipeline.ConnectWith(beat.ClientConfig{ + EventListener: acker.LastEventPrivateReporter(func(acked int, data interface{}) { + // fmt.Println(acked, data) + }), + Processing: beat.ProcessingConfig{ + // This input only produces events with basic types so normalization + // is not required. + EventNormalization: to.Ptr(false), + }, + }) +} + +// Strip connection string to remove sensitive information +// A connection string should look like this: +// Endpoint=sb://dummynamespace.servicebus.windows.net/;SharedAccessKeyName=DummyAccessKeyName;SharedAccessKey=5dOntTRytoC24opYThisAsit3is2B+OGY1US/fuL3ly= +// This code will remove everything after ';' so key information is stripped +func stripConnectionString(c string) string { + if parts := strings.SplitN(c, ";", 2); len(parts) == 2 { + return parts[0] + } + + // We actually expect the string to have the documented format + // if we reach here something is wrong, so let's stay on the safe side + return "(redacted)" +} + //// unpackRecords will try to split the message into multiple ones based on the group field provided by the configuration //func (in *eventHubInputV1) unpackRecords(bMessage []byte) []string { // var mapObject map[string][]interface{} diff --git a/x-pack/filebeat/input/azureeventhub/v1_input_test.go b/x-pack/filebeat/input/azureeventhub/v1_input_test.go index f288a5f852fd..cd20ecbfdcc8 100644 --- a/x-pack/filebeat/input/azureeventhub/v1_input_test.go +++ b/x-pack/filebeat/input/azureeventhub/v1_input_test.go @@ -5,3 +5,33 @@ //go:build !aix package azureeventhub + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestStripConnectionString(t *testing.T) { + tests := []struct { + connectionString, expected string + }{ + { + "Endpoint=sb://something", + "(redacted)", + }, + { + "Endpoint=sb://dummynamespace.servicebus.windows.net/;SharedAccessKeyName=DummyAccessKeyName;SharedAccessKey=5dOntTRytoC24opYThisAsit3is2B+OGY1US/fuL3ly=", + "Endpoint=sb://dummynamespace.servicebus.windows.net/", + }, + { + "Endpoint=sb://dummynamespace.servicebus.windows.net/;SharedAccessKey=5dOntTRytoC24opYThisAsit3is2B+OGY1US/fuL3ly=", + "Endpoint=sb://dummynamespace.servicebus.windows.net/", + }, + } + + for _, tt := range tests { + res := stripConnectionString(tt.connectionString) + assert.Equal(t, res, tt.expected) + } +} diff --git a/x-pack/filebeat/input/azureeventhub/v2_input.go 
b/x-pack/filebeat/input/azureeventhub/v2_input.go
index bd973c58c136..2bbbba53abf1 100644
--- a/x-pack/filebeat/input/azureeventhub/v2_input.go
+++ b/x-pack/filebeat/input/azureeventhub/v2_input.go
@@ -12,12 +12,13 @@ import (
 	"fmt"
 	"time"
 
+	"github.com/Azure/azure-sdk-for-go/sdk/azcore/cloud"
+	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
-	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
-
 	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
 	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
+	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
 
 	v2 "github.com/elastic/beats/v7/filebeat/input/v2"
@@ -29,9 +30,17 @@ import (
 )
 
 const (
-	startPositionEarliest = "earliest"
-	startPositionLatest   = "latest"
-	processorRestartBackoff = 10 * time.Second
+	// startPositionEarliest lets the processor start from the earliest
+	// available event within the event hub retention period.
+	startPositionEarliest = "earliest"
+	// startPositionLatest lets the processor start from the latest
+	// available event within the event hub retention period.
+	startPositionLatest = "latest"
+	// processorRestartBackoff is the initial backoff time before
+	// restarting the processor.
+	processorRestartBackoff = 10 * time.Second
+	// processorRestartMaxBackoff is the maximum backoff time before
+	// restarting the processor.
 	processorRestartMaxBackoff = 120 * time.Second
 )
 
@@ -104,16 +113,20 @@ func (in *eventHubInputV2) setup(ctx context.Context) error {
 	// Decode the messages from event hub into
 	// a `[]string`.
 	in.messageDecoder = messageDecoder{
-		config:  &in.config,
+		config:  in.config,
 		log:     in.log,
 		metrics: in.metrics,
 	}
 
 	// FIXME: check more pipelineClient creation options.
-	blobContainerClient, err := container.NewClientFromConnectionString(
+	containerClient, err := container.NewClientFromConnectionString(
 		in.config.SAConnectionString,
 		in.config.SAContainer,
-		nil,
+		&container.ClientOptions{
+			ClientOptions: azcore.ClientOptions{
+				Cloud: cloud.AzurePublic,
+			},
+		},
 	)
 	if err != nil {
 		return fmt.Errorf("failed to create blob container pipelineClient: %w", err)
@@ -126,14 +139,14 @@ func (in *eventHubInputV2) setup(ctx context.Context) error {
 	// "the container must exist before the checkpoint store can be used."
 	//
 	// We need to ensure it exists before we can use it.
-	err = in.ensureContainerExists(ctx, blobContainerClient)
+	err = in.ensureContainerExists(ctx, containerClient)
 	if err != nil {
 		return fmt.Errorf("failed to ensure blob container exists: %w", err)
 	}
 
 	// The checkpoint store is used to store the checkpoint information
 	// in the blob container.
-	checkpointStore, err := checkpoints.NewBlobStore(blobContainerClient, nil)
+	checkpointStore, err := checkpoints.NewBlobStore(containerClient, nil)
 	if err != nil {
 		return fmt.Errorf("failed to create checkpoint store: %w", err)
 	}
@@ -156,7 +169,7 @@ func (in *eventHubInputV2) setup(ctx context.Context) error {
 	in.migrationAssistant = newMigrationAssistant(
 		in.log,
 		consumerClient,
-		blobContainerClient,
+		containerClient,
 		checkpointStore,
 	)
 
@@ -245,13 +258,6 @@ func (in *eventHubInputV2) run(ctx context.Context) {
 
 // createProcessorOptions creates the processor options using the input configuration.
 func createProcessorOptions(config azureInputConfig) *azeventhubs.ProcessorOptions {
-	// LoadBalancingStrategy offers multiple options:
-	//
-	// - Balanced
-	// - Greedy
-	//
-	// As of now, we only support Balanced.
-	loadBalancingStrategy := azeventhubs.ProcessorStrategyBalanced
 
 	// Start position offers multiple options:
 	//
@@ -266,7 +272,7 @@ func createProcessorOptions(config azureInputConfig) *azeventhubs.ProcessorOptio
 	// available from the storage account container.
 	defaultStartPosition := azeventhubs.StartPosition{}
 
-	switch config.StartPosition {
+	switch config.ProcessorStartPosition {
 	case startPositionEarliest:
 		defaultStartPosition.Earliest = to.Ptr(true)
 	case startPositionLatest:
@@ -274,7 +280,22 @@ func createProcessorOptions(config azureInputConfig) *azeventhubs.ProcessorOptio
 		defaultStartPosition.Latest = to.Ptr(true)
 	}
 
 	return &azeventhubs.ProcessorOptions{
-		LoadBalancingStrategy: loadBalancingStrategy,
+		//
+		// The `LoadBalancingStrategy` controls how the
+		// processor distributes the partitions across the
+		// consumers.
+		//
+		// LoadBalancingStrategy offers multiple options:
+		//
+		// - Balanced
+		// - Greedy
+		//
+		// As of now, we only support the "balanced" load
+		// balancing strategy for backward compatibility with
+		// the old SDK.
+		//
+		LoadBalancingStrategy: azeventhubs.ProcessorStrategyBalanced,
+		UpdateInterval:        config.ProcessorUpdateInterval,
 		StartPositions: azeventhubs.StartPositions{
 			Default: defaultStartPosition,
 		},
@@ -388,10 +409,10 @@ func (in *eventHubInputV2) processEventsForPartition(ctx context.Context, partit
 	// 2/3 [CONTINUOUS] Receive events, checkpointing as needed using UpdateCheckpoint.
 	for {
-		// Wait up to `in.config.ReceiveTimeout` for `in.config.ReceiveCount` events,
+		// Wait up to `in.config.PartitionReceiveTimeout` for `in.config.PartitionReceiveCount` events,
 		// otherwise returns whatever we collected during that time.
-		receiveCtx, cancelReceive := context.WithTimeout(ctx, in.config.ReceiveTimeout)
-		events, err := partitionClient.ReceiveEvents(receiveCtx, in.config.ReceiveCount, nil)
+		receiveCtx, cancelReceive := context.WithTimeout(ctx, in.config.PartitionReceiveTimeout)
+		events, err := partitionClient.ReceiveEvents(receiveCtx, in.config.PartitionReceiveCount, nil)
 		cancelReceive()
 
 		if err != nil && !errors.Is(err, context.DeadlineExceeded) {
diff --git a/x-pack/filebeat/input/azureeventhub/v2_migration.go b/x-pack/filebeat/input/azureeventhub/v2_migration.go
index e59603be015c..755cabc7b83d 100644
--- a/x-pack/filebeat/input/azureeventhub/v2_migration.go
+++ b/x-pack/filebeat/input/azureeventhub/v2_migration.go
@@ -11,26 +11,41 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
 	"net/url"
 	"strconv"
 	"strings"
 
+	"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
 	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
-	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
+	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
 
 	"github.com/elastic/elastic-agent-libs/logp"
 )
 
+type consumerClient interface {
+	GetEventHubProperties(ctx context.Context, options *azeventhubs.GetEventHubPropertiesOptions) (azeventhubs.EventHubProperties, error)
+}
+
+type containerClient interface {
+	NewBlobClient(blobName string) *blob.Client
+	NewListBlobsFlatPager(o *container.ListBlobsFlatOptions) *runtime.Pager[container.ListBlobsFlatResponse]
+}
+
+type checkpointer interface {
+	SetCheckpoint(ctx context.Context, checkpoint azeventhubs.Checkpoint, options *azeventhubs.SetCheckpointOptions) error
+}
+
 // migrationAssistant assists the input in migrating
-// checkpoint data from v1 to v2.
+// v1 checkpoint information to v2.
 type migrationAssistant struct {
 	log                 *logp.Logger
-	consumerClient      *azeventhubs.ConsumerClient
-	blobContainerClient *container.Client
-	checkpointStore     *checkpoints.BlobStore
+	consumerClient      consumerClient
+	blobContainerClient containerClient
+	checkpointStore     checkpointer
 }
 
-func newMigrationAssistant(log *logp.Logger, consumerClient *azeventhubs.ConsumerClient, blobContainerClient *container.Client, checkpointStore *checkpoints.BlobStore) *migrationAssistant {
+// newMigrationAssistant creates a new migration assistant.
+func newMigrationAssistant(log *logp.Logger, consumerClient consumerClient, blobContainerClient containerClient, checkpointStore checkpointer) *migrationAssistant {
 	return &migrationAssistant{
 		log:                 log,
 		consumerClient:      consumerClient,
@@ -39,6 +54,8 @@ func newMigrationAssistant(log *logp.Logger, consumerClient *azeventhubs.Consume
 	}
 }
 
+// checkAndMigrate checks if the v1 checkpoint information for the partitions
+// exists and migrates it to v2 if it does.
 func (m *migrationAssistant) checkAndMigrate(ctx context.Context, eventHubConnectionString, eventHubName, consumerGroup string) error {
 	// Fetching event hub information
 	eventHubProperties, err := m.consumerClient.GetEventHubProperties(ctx, nil)
 	if err != nil {
 		return fmt.Errorf("failed to get event hub properties: %w", err)
 	}
@@ -54,146 +71,132 @@ func (m *migrationAssistant) checkAndMigrate(ctx context.Context, eventHubConnec
 	)
 
 	// Parse the connection string to get FQDN.
-	props, err := parseConnectionString(eventHubConnectionString)
+	connectionStringInfo, err := parseConnectionString(eventHubConnectionString)
 	if err != nil {
 		return fmt.Errorf("failed to parse connection string: %w", err)
 	}
 
-	err = m.checkAndMigratePartition(ctx, eventHubProperties, props, eventHubName, consumerGroup)
+	blobs, err := m.listBlobs(ctx)
 	if err != nil {
-		return fmt.Errorf("failed to check and migrate partition: %w", err)
+		return err
 	}
 
-	// blobClient := m.blobContainerClient.NewBlobClient("")
-	// blobClient.BlobExists(ctx)
-
-	// blobPager := m.blobContainerClient.NewListBlobsFlatPager(nil)
-
-	// for blobPager.More() {
-	// 	page, err := blobPager.NextPage(ctx)
-	// 	if err != nil {
-	// 		return fmt.Errorf("failed to list blobs: %w", err)
-	// 	}
-
-	// }
-
-	// Fetching the list of blobs in the container.
-
-	// Search for the checkpoint blobs in the container.
-	// The blobs are named as ///checkpoint/
-
-	// blobPager := m.blobContainerClient.NewListBlobsFlatPager(nil)
-
-	// r, err := blobPager.NextPage(ctx)
-	// if err != nil {
-	// 	return fmt.Errorf("failed to list blobs: %w", err)
-	// }
-
-	// props.FullyQualifiedNamespace
-
-	// // Fetching event hub information
-	// eventHubProperties, err := m.consumerClient.GetEventHubProperties(ctx, nil)
-	// if err != nil {
-	// 	return fmt.Errorf("failed to get event hub properties: %w", err)
-	// }
-
-	// // v2 checkpoint information path
-	// // mbranca-general.servicebus.windows.net/sdh4552/$Default/checkpoint/0
-
-	// eventHubProperties.PartitionIDs
+	for _, partitionID := range eventHubProperties.PartitionIDs {
+		err = m.checkAndMigratePartition(ctx, blobs, partitionID, connectionStringInfo.FullyQualifiedNamespace, eventHubName, consumerGroup)
+		if err != nil {
+			return fmt.Errorf("failed to check and migrate partition: %w", err)
+		}
+	}
 
 	return nil
 }
 
+// checkAndMigratePartition checks whether v1 checkpoint information exists
+// for the `partitionID` partition and, if it does, migrates it to v2.
func (m *migrationAssistant) checkAndMigratePartition( ctx context.Context, - eventHubProperties azeventhubs.EventHubProperties, - props ConnectionStringProperties, + blobs map[string]bool, + partitionID, + fullyQualifiedNamespace, eventHubName, consumerGroup string) error { - blobs := map[string]bool{} + // v2 checkpoint information path + // mbranca-general.servicebus.windows.net/sdh4552/$Default/checkpoint/0 + blob := fmt.Sprintf("%s/%s/%s/checkpoint/%s", fullyQualifiedNamespace, eventHubName, consumerGroup, partitionID) - c := m.blobContainerClient.NewListBlobsFlatPager(nil) + // Check if v2 checkpoint information exists + if _, ok := blobs[blob]; ok { + m.log.Infow( + "checkpoint v2 information for partition already exists, no migration needed", + "partitionID", partitionID, + ) - for c.More() { - page, err := c.NextPage(ctx) - if err != nil { - return fmt.Errorf("failed to list blobs: %w", err) - } - - for _, blob := range page.Segment.BlobItems { - blobs[*blob.Name] = true - } + return nil } - for _, partitionID := range eventHubProperties.PartitionIDs { - // v2 checkpoint information path - // mbranca-general.servicebus.windows.net/sdh4552/$Default/checkpoint/0 - blob := fmt.Sprintf("%s/%s/%s/checkpoint/%s", props.FullyQualifiedNamespace, eventHubName, consumerGroup, partitionID) - - if _, ok := blobs[blob]; ok { - m.log.Infow( - "checkpoint v2 information for partition already exists, no migration needed", - "partitionID", partitionID, - ) - continue - } + // Check if v1 checkpoint information exists + if _, ok := blobs[partitionID]; !ok { + m.log.Infow( + "checkpoint v1 information for partition doesn't exist, no migration needed", + "partitionID", partitionID, + ) - // try downloading the checkpoint v1 information for the partition - if _, ok := blobs[partitionID]; !ok { - m.log.Infow( - "checkpoint v1 information for partition doesn't exist, no migration needed", - "partitionID", partitionID, - ) - continue - } + return nil + } - // v1 checkpoint information path is the partition ID itself - cln := m.blobContainerClient.NewBlobClient(partitionID) + // Try downloading the checkpoint v1 information for the partition + cln := m.blobContainerClient.NewBlobClient(partitionID) - buff := [4000]byte{} - size, err := cln.DownloadBuffer(ctx, buff[:], nil) - if err != nil { - return fmt.Errorf("failed to download checkpoint v1 information for partition %s: %w", partitionID, err) - } + // 4KB buffer should be enough to read + // the checkpoint v1 information. 
+ buff := [4000]byte{} - m.log.Infow("downloaded checkpoint v1 information for partition", "partitionID", partitionID, "size", size) + size, err := cln.DownloadBuffer(ctx, buff[:], nil) + if err != nil { + return fmt.Errorf("failed to download checkpoint v1 information for partition %s: %w", partitionID, err) + } - var checkpointV1 *LegacyCheckpoint + m.log.Infow( + "downloaded checkpoint v1 information for partition", + "partitionID", partitionID, + "size", size, + ) - if err := json.Unmarshal(buff[0:size], &checkpointV1); err != nil { - return fmt.Errorf("failed to unmarshal checkpoint v1 information for partition %s: %w", partitionID, err) - } + // Unmarshal the checkpoint v1 information + var checkpointV1 *LegacyCheckpoint - // migrate the checkpoint v1 information to v2 - m.log.Infow("migrating checkpoint v1 information to v2", "partitionID", partitionID) + if err := json.Unmarshal(buff[0:size], &checkpointV1); err != nil { + return fmt.Errorf("failed to unmarshal checkpoint v1 information for partition %s: %w", partitionID, err) + } - checkpointV2 := azeventhubs.Checkpoint{ - ConsumerGroup: consumerGroup, - EventHubName: eventHubName, - FullyQualifiedNamespace: props.FullyQualifiedNamespace, - PartitionID: partitionID, - } + // migrate the checkpoint v1 information to v2 + m.log.Infow("migrating checkpoint v1 information to v2", "partitionID", partitionID) - offset, err := strconv.ParseInt(checkpointV1.Checkpoint.Offset, 10, 64) - if err != nil { - return fmt.Errorf("failed to parse offset: %w", err) - } + // Common checkpoint information + checkpointV2 := azeventhubs.Checkpoint{ + ConsumerGroup: consumerGroup, + EventHubName: eventHubName, + FullyQualifiedNamespace: fullyQualifiedNamespace, + PartitionID: partitionID, + } - checkpointV2.Offset = &offset - checkpointV2.SequenceNumber = &checkpointV1.Checkpoint.SequenceNumber + offset, err := strconv.ParseInt(checkpointV1.Checkpoint.Offset, 10, 64) + if err != nil { + return fmt.Errorf("failed to parse offset: %w", err) + } - if err := m.checkpointStore.SetCheckpoint(ctx, checkpointV2, nil); err != nil { - return fmt.Errorf("failed to update checkpoint v2 information for partition %s: %w", partitionID, err) - } + checkpointV2.Offset = &offset + checkpointV2.SequenceNumber = &checkpointV1.Checkpoint.SequenceNumber - m.log.Infow("migrated checkpoint v1 information to v2", "partitionID", partitionID) + // Stores the checkpoint v2 information for the partition + if err := m.checkpointStore.SetCheckpoint(ctx, checkpointV2, nil); err != nil { + return fmt.Errorf("failed to update checkpoint v2 information for partition %s: %w", partitionID, err) } + m.log.Infow("migrated checkpoint v1 information to v2", "partitionID", partitionID) + return nil } +// listBlobs lists all the blobs in the container. 
+func (m *migrationAssistant) listBlobs(ctx context.Context) (map[string]bool, error) { + blobs := map[string]bool{} + + c := m.blobContainerClient.NewListBlobsFlatPager(nil) + for c.More() { + page, err := c.NextPage(ctx) + if err != nil { + return map[string]bool{}, fmt.Errorf("failed to list blobs: %w", err) + } + + for _, blob := range page.Segment.BlobItems { + blobs[*blob.Name] = true + } + } + return blobs, nil +} + type LegacyCheckpoint struct { PartitionID string `json:"partitionID"` Epoch int `json:"epoch"` diff --git a/x-pack/filebeat/module/azure/activitylogs/config/azure-eventhub.yml b/x-pack/filebeat/module/azure/activitylogs/config/azure-eventhub.yml index b69d473dd9b9..ac3d2d352bb9 100644 --- a/x-pack/filebeat/module/azure/activitylogs/config/azure-eventhub.yml +++ b/x-pack/filebeat/module/azure/activitylogs/config/azure-eventhub.yml @@ -37,16 +37,28 @@ resource_manager_endpoint: {{ .resource_manager_endpoint }} tags: {{.tags | tojson}} publisher_pipeline.disable_host: {{ inList .tags "forwarded" }} +{{ if .migrate_checkpoint }} +migrate_checkpoint: {{ .migrate_checkpoint }} +{{ end }} + {{ if .processor_version }} processor_version: {{ .processor_version }} {{ end }} -{{ if .migrate_checkpoint }} -migrate_checkpoint: {{ .migrate_checkpoint }} +{{ if .processor_update_interval }} +processor_update_interval: {{ .processor_update_interval }} +{{ end }} + +{{ if .processor_start_position }} +processor_start_position: {{ .processor_start_position }} +{{ end }} + +{{ if .partition_receive_timeout }} +partition_receive_timeout: {{ .partition_receive_timeout }} {{ end }} -{{ if .start_position }} -start_position: {{ .start_position }} +{{ if .partition_receive_count }} +partition_receive_count: {{ .partition_receive_count }} {{ end }} processors: diff --git a/x-pack/filebeat/module/azure/activitylogs/manifest.yml b/x-pack/filebeat/module/azure/activitylogs/manifest.yml index 59c1ef9b7291..140b34a42d75 100644 --- a/x-pack/filebeat/module/azure/activitylogs/manifest.yml +++ b/x-pack/filebeat/module/azure/activitylogs/manifest.yml @@ -15,12 +15,18 @@ var: - name: resource_manager_endpoint - name: tags default: [forwarded] - - name: processor_version - default: "v1" - name: migrate_checkpoint default: yes - - name: start_position + - name: processor_version + default: "v1" + - name: processor_update_interval + default: "10s" + - name: processor_start_position default: "earliest" + - name: partition_receive_timeout + default: "5s" + - name: partition_receive_count + default: 100 ingest_pipeline: - ingest/pipeline.yml - ../azure-shared-pipeline.yml diff --git a/x-pack/filebeat/module/azure/auditlogs/config/azure-eventhub.yml b/x-pack/filebeat/module/azure/auditlogs/config/azure-eventhub.yml index 8b6dd0d383f4..9e84a9e6951d 100644 --- a/x-pack/filebeat/module/azure/auditlogs/config/azure-eventhub.yml +++ b/x-pack/filebeat/module/azure/auditlogs/config/azure-eventhub.yml @@ -31,18 +31,29 @@ storage_account_container: filebeat-auditlogs-{{ .eventhub }} resource_manager_endpoint: {{ .resource_manager_endpoint }} {{ end }} +{{ if .migrate_checkpoint }} +migrate_checkpoint: {{ .migrate_checkpoint }} +{{ end }} + {{ if .processor_version }} processor_version: {{ .processor_version }} {{ end }} -{{ if .migrate_checkpoint }} -migrate_checkpoint: {{ .migrate_checkpoint }} +{{ if .processor_update_interval }} +processor_update_interval: {{ .processor_update_interval }} +{{ end }} + +{{ if .processor_start_position }} +processor_start_position: {{ .processor_start_position }} {{ end }} -{{ if 
.start_position }} -start_position: {{ .start_position }} +{{ if .partition_receive_timeout }} +partition_receive_timeout: {{ .partition_receive_timeout }} {{ end }} +{{ if .partition_receive_count }} +partition_receive_count: {{ .partition_receive_count }} +{{ end }} tags: {{.tags | tojson}} publisher_pipeline.disable_host: {{ inList .tags "forwarded" }} processors: diff --git a/x-pack/filebeat/module/azure/auditlogs/manifest.yml b/x-pack/filebeat/module/azure/auditlogs/manifest.yml index 8da58bfc2529..32cb1719fb55 100644 --- a/x-pack/filebeat/module/azure/auditlogs/manifest.yml +++ b/x-pack/filebeat/module/azure/auditlogs/manifest.yml @@ -15,13 +15,18 @@ var: - name: resource_manager_endpoint - name: tags default: [forwarded] - - name: processor_version - default: "v1" - name: migrate_checkpoint default: yes - - name: start_position + - name: processor_version + default: "v1" + - name: processor_update_interval + default: "10s" + - name: processor_start_position default: "earliest" - + - name: partition_receive_timeout + default: "5s" + - name: partition_receive_count + default: 100 ingest_pipeline: - ingest/pipeline.yml - ../azure-shared-pipeline.yml diff --git a/x-pack/filebeat/module/azure/platformlogs/config/azure-eventhub.yml b/x-pack/filebeat/module/azure/platformlogs/config/azure-eventhub.yml index 6648e40dceff..ee7c2727ffbf 100644 --- a/x-pack/filebeat/module/azure/platformlogs/config/azure-eventhub.yml +++ b/x-pack/filebeat/module/azure/platformlogs/config/azure-eventhub.yml @@ -31,16 +31,28 @@ storage_account_container: filebeat-platformlogs-{{ .eventhub }} resource_manager_endpoint: {{ .resource_manager_endpoint }} {{ end }} +{{ if .migrate_checkpoint }} +migrate_checkpoint: {{ .migrate_checkpoint }} +{{ end }} + {{ if .processor_version }} processor_version: {{ .processor_version }} {{ end }} -{{ if .migrate_checkpoint }} -migrate_checkpoint: {{ .migrate_checkpoint }} +{{ if .processor_update_interval }} +processor_update_interval: {{ .processor_update_interval }} +{{ end }} + +{{ if .processor_start_position }} +processor_start_position: {{ .processor_start_position }} +{{ end }} + +{{ if .partition_receive_timeout }} +partition_receive_timeout: {{ .partition_receive_timeout }} {{ end }} -{{ if .start_position }} -start_position: {{ .start_position }} +{{ if .partition_receive_count }} +partition_receive_count: {{ .partition_receive_count }} {{ end }} tags: {{.tags | tojson}} diff --git a/x-pack/filebeat/module/azure/platformlogs/manifest.yml b/x-pack/filebeat/module/azure/platformlogs/manifest.yml index 36e1f438f015..345fe4cd555c 100644 --- a/x-pack/filebeat/module/azure/platformlogs/manifest.yml +++ b/x-pack/filebeat/module/azure/platformlogs/manifest.yml @@ -14,13 +14,18 @@ var: - name: resource_manager_endpoint - name: tags default: [forwarded] - - name: processor_version - default: "v1" - name: migrate_checkpoint default: yes - - name: start_position + - name: processor_version + default: "v1" + - name: processor_update_interval + default: "10s" + - name: processor_start_position default: "earliest" - + - name: partition_receive_timeout + default: "5s" + - name: partition_receive_count + default: 100 ingest_pipeline: - ingest/pipeline.yml - ../azure-shared-pipeline.yml diff --git a/x-pack/filebeat/module/azure/signinlogs/config/azure-eventhub.yml b/x-pack/filebeat/module/azure/signinlogs/config/azure-eventhub.yml index 6e11a945acc0..02f4bf7421dc 100644 --- a/x-pack/filebeat/module/azure/signinlogs/config/azure-eventhub.yml +++ 
b/x-pack/filebeat/module/azure/signinlogs/config/azure-eventhub.yml @@ -31,16 +31,28 @@ storage_account_container: filebeat-signinlogs-{{ .eventhub }} resource_manager_endpoint: {{ .resource_manager_endpoint }} {{ end }} +{{ if .migrate_checkpoint }} +migrate_checkpoint: {{ .migrate_checkpoint }} +{{ end }} + {{ if .processor_version }} processor_version: {{ .processor_version }} {{ end }} -{{ if .migrate_checkpoint }} -migrate_checkpoint: {{ .migrate_checkpoint }} +{{ if .processor_update_interval }} +processor_update_interval: {{ .processor_update_interval }} +{{ end }} + +{{ if .processor_start_position }} +processor_start_position: {{ .processor_start_position }} +{{ end }} + +{{ if .partition_receive_timeout }} +partition_receive_timeout: {{ .partition_receive_timeout }} {{ end }} -{{ if .start_position }} -start_position: {{ .start_position }} +{{ if .partition_receive_count }} +partition_receive_count: {{ .partition_receive_count }} {{ end }} tags: {{.tags | tojson}} diff --git a/x-pack/filebeat/module/azure/signinlogs/manifest.yml b/x-pack/filebeat/module/azure/signinlogs/manifest.yml index c64dff0b2079..ec9bd467d975 100644 --- a/x-pack/filebeat/module/azure/signinlogs/manifest.yml +++ b/x-pack/filebeat/module/azure/signinlogs/manifest.yml @@ -15,13 +15,18 @@ var: - name: resource_manager_endpoint - name: tags default: [forwarded] - - name: processor_version - default: "v1" - name: migrate_checkpoint default: yes - - name: start_position + - name: processor_version + default: "v1" + - name: processor_update_interval + default: "10s" + - name: processor_start_position default: "earliest" - + - name: partition_receive_timeout + default: "5s" + - name: partition_receive_count + default: 100 ingest_pipeline: - ingest/pipeline.yml - ../azure-shared-pipeline.yml
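
For reference, a minimal sketch of a standalone azure-eventhub input configuration that exercises the options introduced in this patch. The option names and defaults come from the config struct and defaultConfig() above; the event hub, namespace, storage account, and connection-string values are illustrative placeholders, not real resources or credentials.

filebeat.inputs:
  - type: azure-eventhub
    eventhub: "example-event-hub"
    consumer_group: "$Default"
    connection_string: "Endpoint=sb://example-ns.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SECRET"
    storage_account: "examplestorageaccount"
    # Processor v2 uses a storage account connection string instead of storage_account_key.
    storage_account_connection_string: "DefaultEndpointsProtocol=https;AccountName=examplestorageaccount;AccountKey=SECRET;EndpointSuffix=core.windows.net"
    storage_account_container: "filebeat-activitylogs-example-event-hub"
    # Opt in to the v2 processor and migrate existing v1 checkpoints on first run.
    processor_version: "v2"
    migrate_checkpoint: true
    # The remaining values match the defaults in defaultConfig(), shown here for clarity.
    processor_update_interval: "10s"
    processor_start_position: "earliest"
    partition_receive_timeout: "5s"
    partition_receive_count: 100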