Skip to content

Commit

Permalink
Update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
zmoog committed Jun 4, 2024
1 parent 3ecc81e commit 9ca5a04
Show file tree
Hide file tree
Showing 20 changed files with 490 additions and 607 deletions.
80 changes: 44 additions & 36 deletions x-pack/filebeat/input/azureeventhub/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,66 @@ import (
"github.com/elastic/elastic-agent-libs/logp"
)

const ephContainerName = "filebeat"

type azureInputConfig struct {
// EventHubName is the name of the event hub to connect to.
EventHubName string `config:"eventhub" validate:"required"`
// ConnectionString is the connection string to connect to the event hub.
ConnectionString string `config:"connection_string" validate:"required"`
EventHubName string `config:"eventhub" validate:"required"`
ConsumerGroup string `config:"consumer_group"`
// ConsumerGroup is the name of the consumer group to use.
ConsumerGroup string `config:"consumer_group"`
// Azure Storage container to store leases and checkpoints
SAName string `config:"storage_account" validate:"required"`
SAKey string `config:"storage_account_key"` // (processor v1 only)
SAConnectionString string `config:"storage_account_connection_string"` // (processor v2 only)
SAContainer string `config:"storage_account_container"`
SAName string `config:"storage_account" validate:"required"`
// SAKey is used to connect to the storage account (processor v1 only)
SAKey string `config:"storage_account_key"`
// SAConnectionString is used to connect to the storage account (processor v2 only)
SAConnectionString string `config:"storage_account_connection_string"`
// SAContainer is the name of the storage account container to store
// partition ownership and checkpoint information.
SAContainer string `config:"storage_account_container"`
// by default the azure public environment is used, to override, users can provide a specific resource manager endpoint
OverrideEnvironment string `config:"resource_manager_endpoint"`
// cleanup the log JSON input for known issues, options: SINGLE_QUOTES, NEW_LINES
SanitizeOptions []string `config:"sanitize_options"`
// Processor version to use (v1 or v2). Default is v1 (processor v2 only).
ProcessorVersion string `config:"processor_version"`
// Controls if the input should perform the checkpoint information
// migration from v1 to v2 (processor v2 only).
MigrateCheckpoint bool `config:"migrate_checkpoint"`
// Processor version to use (v1 or v2). Default is v1.
ProcessorVersion string `config:"processor_version"`
//
ProcessorUpdateInterval time.Duration `config:"processor_update_interval"`
// Controls the start position for all partitions (processor v2 only).
StartPosition string `config:"start_position"`
ProcessorStartPosition string `config:"processor_start_position"`
// Processor receive timeout (processor v2 only).
// Wait up to `ReceiveTimeout` for `ReceiveCount` events,
// Wait up to `PartitionReceiveTimeout` for `PartitionReceiveCount` events,
// otherwise returns whatever we collected during that time.
ReceiveTimeout time.Duration `config:"receive_timeout"`
PartitionReceiveTimeout time.Duration `config:"partition_receive_timeout"`
// Processor receive count (processor v2 only).
// Wait up to `ReceiveTimeout` for `ReceiveCount` events,
// Wait up to `PartitionReceiveTimeout` for `PartitionReceiveCount` events,
// otherwise returns whatever we collected during that time.
ReceiveCount int `config:"receive_count"`
PartitionReceiveCount int `config:"partition_receive_count"`
}

const ephContainerName = "filebeat"
func defaultConfig() azureInputConfig {
return azureInputConfig{
// For this release, we continue to use
// the processor v1 as the default.
ProcessorVersion: processorV1,
//
ProcessorUpdateInterval: 10 * time.Second,
// For backward compatibility with v1,
// the default start position is "earliest".
ProcessorStartPosition: startPositionEarliest,
// Receive timeout and count control how
// many events we want to receive from
// the processor before returning.
PartitionReceiveTimeout: 5 * time.Second,
PartitionReceiveCount: 100,
// Default
SanitizeOptions: []string{},
}
}

// Validate validates the config.
func (conf *azureInputConfig) Validate() error {
Expand Down Expand Up @@ -90,19 +119,14 @@ func (conf *azureInputConfig) Validate() error {
}
}

if conf.ProcessorVersion == "" {
// The default processor version is "v1".
conf.ProcessorVersion = processorV1
}

switch conf.ProcessorVersion {
case processorV1:
if conf.SAKey == "" {
return errors.New("no storage account key configured (config: storage_account_key)")
}
case processorV2:
if conf.SAKey != "" {
logger.Warnf("storage_account_key is not used in processor v2")
logger.Warnf("storage_account_key is not used in processor v2, please remove it from the configuration (config: storage_account_key)")
}
if conf.SAConnectionString == "" {
return errors.New("no storage account connection string configured (config: storage_account_connection_string)")
Expand All @@ -111,22 +135,6 @@ func (conf *azureInputConfig) Validate() error {
return fmt.Errorf("invalid azure-eventhub processor version: %s (available versions: v1, v2)", conf.ProcessorVersion)
}

if conf.StartPosition == "" {
// For backward compatibility with v1,
// the default start position is "earliest".
conf.StartPosition = startPositionEarliest
}

// Default receive timeout and count.
if conf.ReceiveTimeout == 0 {
// The default receive timeout is 5 second.
conf.ReceiveTimeout = 5 * time.Second
}
if conf.ReceiveCount == 0 {
// The default receive count is 100.
conf.ReceiveCount = 100
}

return nil
}

Expand Down
13 changes: 6 additions & 7 deletions x-pack/filebeat/input/azureeventhub/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,12 @@ func TestStorageContainerValidate(t *testing.T) {

func TestValidate(t *testing.T) {
t.Run("Sanitize storage account containers with underscores", func(t *testing.T) {
config := azureInputConfig{
ConnectionString: "sb://test-ns.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SECRET",
EventHubName: "event_hub_00",
SAName: "teststorageaccount",
SAKey: "secret",
SAContainer: "filebeat-activitylogs-event_hub_00",
}
config := defaultConfig()
config.ConnectionString = "sb://test-ns.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SECRET"
config.EventHubName = "event_hub_00"
config.SAName = "teststorageaccount"
config.SAKey = "secret"
config.SAContainer = "filebeat-activitylogs-event_hub_00"

if err := config.Validate(); err != nil {
t.Fatalf("unexpected validation error: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/azureeventhub/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

type messageDecoder struct {
config *azureInputConfig
config azureInputConfig
log *logp.Logger
metrics *inputMetrics
}
Expand Down
106 changes: 106 additions & 0 deletions x-pack/filebeat/input/azureeventhub/decoder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build !aix

package azureeventhub

import (
"fmt"
"testing"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/stretchr/testify/assert"
)

func TestDecodeRecords(t *testing.T) {
config := defaultConfig()
log := logp.NewLogger(fmt.Sprintf("%s test for input", inputName))
reg := monitoring.NewRegistry()

decoder := messageDecoder{
config: config,
log: log,
metrics: newInputMetrics("test", reg),
}

msgs := []string{
"{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
"{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
"{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
}

t.Run("Decode multiple records", func(t *testing.T) {
msg := "{\"records\":[{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
"{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
"{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}]}"

messages := decoder.Decode([]byte(msg))

assert.NotNil(t, messages)
assert.Equal(t, len(messages), 3)
for _, ms := range messages {
assert.Contains(t, msgs, ms)
}
})

t.Run("Decode array of events", func(t *testing.T) {
msg1 := "[{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
"{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
"{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}]"

messages := decoder.Decode([]byte(msg1))

assert.NotNil(t, messages)
assert.Equal(t, len(messages), 3)
for _, ms := range messages {
assert.Contains(t, msgs, ms)
}
})

t.Run("Decode one event only", func(t *testing.T) {
msg2 := "{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"

messages := decoder.Decode([]byte(msg2))

assert.NotNil(t, messages)
assert.Equal(t, len(messages), 1)
for _, ms := range messages {
assert.Contains(t, msgs, ms)
}
})
}

func TestDecodeRecordsWithSanitization(t *testing.T) {
config := defaultConfig()
config.SanitizeOptions = []string{"SINGLE_QUOTES", "NEW_LINES"}
log := logp.NewLogger(fmt.Sprintf("%s test for input", inputName))
reg := monitoring.NewRegistry()
metrics := newInputMetrics("test", reg)
defer metrics.Close()

decoder := messageDecoder{
config: config,
log: log,
metrics: metrics,
}

msg := "{\"records\":[{'test':\"this is some message\",\n\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
"{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
"{\"time\": \"2023-04-11T13:35:20Z\", \"resourceId\": \"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\", \"category\": \"FunctionAppLogs\", \"operationName\": \"Microsoft.Web/sites/functions/log\", \"level\": \"Informational\", \"location\": \"West Europe\", \"properties\": {'appName':'REDACTED','roleInstance':'REDACTED','message':'Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe ','category':'Function.HttpTriggerJava.User','hostVersion':'4.16.5.5','functionInvocationId':'REDACTED','functionName':'HttpTriggerJava','hostInstanceId':'REDACTED','level':'Information','levelId':2,'processId':62}}]}"
msgs := []string{
"{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
"{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
"{\"category\":\"FunctionAppLogs\",\"level\":\"Informational\",\"location\":\"West Europe\",\"operationName\":\"Microsoft.Web/sites/functions/log\",\"properties\":{\"appName\":\"REDACTED\",\"category\":\"Function.HttpTriggerJava.User\",\"functionInvocationId\":\"REDACTED\",\"functionName\":\"HttpTriggerJava\",\"hostInstanceId\":\"REDACTED\",\"hostVersion\":\"4.16.5.5\",\"level\":\"Information\",\"levelId\":2,\"message\":\"Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe \",\"processId\":62,\"roleInstance\":\"REDACTED\"},\"resourceId\":\"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\",\"time\":\"2023-04-11T13:35:20Z\"}",
}

messages := decoder.Decode([]byte(msg))

assert.NotNil(t, messages)
assert.Equal(t, len(messages), 3)
for _, ms := range messages {
assert.Contains(t, msgs, ms)
}
}
Loading

0 comments on commit 9ca5a04

Please sign in to comment.