From 42f2d41694106acbdc3d85e4a0f612a3a5a36795 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Tue, 15 Oct 2024 07:22:20 -0700 Subject: [PATCH] [Filebeat] [AWS] Add support to source AWS cloudwatch logs from linked accounts (#41188) * use LogGroupIdentifier fiter instead of LogGroupName and related parameter, field renaming Signed-off-by: Kavindu Dodanduwa * configuration parsing to support arn & linked accounts Signed-off-by: Kavindu Dodanduwa * document the ARN usage Signed-off-by: Kavindu Dodanduwa * add changelog entry Signed-off-by: Kavindu Dodanduwa * code review changes Signed-off-by: Kavindu Dodanduwa * code review change - fix typo Signed-off-by: Kavindu Dodanduwa --------- Signed-off-by: Kavindu Dodanduwa Co-authored-by: kaiyan-sheng --- CHANGELOG.next.asciidoc | 1 + .../filebeat.inputs.reference.xpack.yml.tmpl | 2 + .../docs/inputs/input-aws-cloudwatch.asciidoc | 18 +++- x-pack/filebeat/filebeat.reference.yml | 2 + .../input/awscloudwatch/cloudwatch.go | 36 +++---- .../input/awscloudwatch/cloudwatch_test.go | 100 +++++++++--------- x-pack/filebeat/input/awscloudwatch/input.go | 86 +++++++++------ .../input/awscloudwatch/input_test.go | 98 ++++++++++++++++- .../filebeat/input/awscloudwatch/processor.go | 10 +- 9 files changed, 237 insertions(+), 116 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index ebd20cb190c..2ba3d43c93f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -325,6 +325,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Improved GCS input documentation. {pull}41143[41143] - Add CSV decoding capacity to azureblobstorage input {pull}40978[40978] - Add CSV decoding capacity to gcs input {pull}40979[40979] +- Add support to source AWS cloudwatch logs from linked accounts. {pull}41188[41188] - Jounrald input now supports filtering by facilities {pull}41061[41061] - System module now supports reading from jounrald. 
{pull}41061[41061] diff --git a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl index 8215bc3c389..3f131b6dc49 100644 --- a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl +++ b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl @@ -135,6 +135,8 @@ #credential_profile_name: test-aws-s3-input # ARN of the log group to collect logs from + # This ARN could refer to a log group from a linked source account + # Note: This property takes precedence over `log_group_name` & `log_group_name_prefix` #log_group_arn: "arn:aws:logs:us-east-1:428152502467:log-group:test:*" # Name of the log group to collect logs from. diff --git a/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc index c2b898da358..733f0bac41f 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc @@ -40,16 +40,26 @@ The `aws-cloudwatch` input supports the following configuration options plus the [float] ==== `log_group_arn` ARN of the log group to collect logs from. +The ARN may refer to a log group in a linked source account. + +Note: `log_group_arn` cannot be combined with `log_group_name`, `log_group_name_prefix` and `region_name` properties. +If set, values extracted from `log_group_arn` take precedence over them. + +Note: If the log group is in a linked source account and filebeat is configured to use a monitoring account, you must use the `log_group_arn`. +You can read more about AWS account linking and cross-account observability from the https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Unified-Cross-Account.html[official documentation]. [float] ==== `log_group_name` -Name of the log group to collect logs from. Note: `region_name` is required when -log_group_name is given. 
+Name of the log group to collect logs from. + +Note: `region_name` is required when `log_group_name` is given. [float] ==== `log_group_name_prefix` -The prefix for a group of log group names. Note: `region_name` is required when -log_group_name_prefix is given. `log_group_name` and `log_group_name_prefix` +The prefix for a group of log group names. + +Note: `region_name` is required when +`log_group_name_prefix` is given. `log_group_name` and `log_group_name_prefix` cannot be given at the same time. The number of workers that will process the log groups under this prefix is set through the `number_of_workers` config. diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 749f0e0c291..a2f1daeebb4 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -3069,6 +3069,8 @@ filebeat.inputs: #credential_profile_name: test-aws-s3-input # ARN of the log group to collect logs from + # This ARN could refer to a log group from a linked source account + # Note: This property takes precedence over `log_group_name` & `log_group_name_prefix` #log_group_arn: "arn:aws:logs:us-east-1:428152502467:log-group:test:*" # Name of the log group to collect logs from. 
diff --git a/x-pack/filebeat/input/awscloudwatch/cloudwatch.go b/x-pack/filebeat/input/awscloudwatch/cloudwatch.go index ffc5b2e3cd8..4d089268e35 100644 --- a/x-pack/filebeat/input/awscloudwatch/cloudwatch.go +++ b/x-pack/filebeat/input/awscloudwatch/cloudwatch.go @@ -37,7 +37,7 @@ type cloudwatchPoller struct { } type workResponse struct { - logGroup string + logGroupId string startTime, endTime time.Time } @@ -64,8 +64,8 @@ func newCloudwatchPoller(log *logp.Logger, metrics *inputMetrics, } } -func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroup string, startTime, endTime time.Time, logProcessor *logProcessor) { - err := p.getLogEventsFromCloudWatch(svc, logGroup, startTime, endTime, logProcessor) +func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroupId string, startTime, endTime time.Time, logProcessor *logProcessor) { + err := p.getLogEventsFromCloudWatch(svc, logGroupId, startTime, endTime, logProcessor) if err != nil { var errRequestCanceled *awssdk.RequestCanceledError if errors.As(err, &errRequestCanceled) { @@ -76,9 +76,9 @@ func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroup string, star } // getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch -func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client, logGroup string, startTime, endTime time.Time, logProcessor *logProcessor) error { +func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client, logGroupId string, startTime, endTime time.Time, logProcessor *logProcessor) error { // construct FilterLogEventsInput - filterLogEventsInput := p.constructFilterLogEventsInput(startTime, endTime, logGroup) + filterLogEventsInput := p.constructFilterLogEventsInput(startTime, endTime, logGroupId) paginator := cloudwatchlogs.NewFilterLogEventsPaginator(svc, filterLogEventsInput) for paginator.HasMorePages() { filterLogEventsOutput, err := paginator.NextPage(context.TODO()) @@ -96,16 +96,16 @@ func 
(p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client p.log.Debug("done sleeping") p.log.Debugf("Processing #%v events", len(logEvents)) - logProcessor.processLogEvents(logEvents, logGroup, p.region) + logProcessor.processLogEvents(logEvents, logGroupId, p.region) } return nil } -func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime, endTime time.Time, logGroup string) *cloudwatchlogs.FilterLogEventsInput { +func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime, endTime time.Time, logGroupId string) *cloudwatchlogs.FilterLogEventsInput { filterLogEventsInput := &cloudwatchlogs.FilterLogEventsInput{ - LogGroupName: awssdk.String(logGroup), - StartTime: awssdk.Int64(unixMsFromTime(startTime)), - EndTime: awssdk.Int64(unixMsFromTime(endTime)), + LogGroupIdentifier: awssdk.String(logGroupId), + StartTime: awssdk.Int64(unixMsFromTime(startTime)), + EndTime: awssdk.Int64(unixMsFromTime(endTime)), } if len(p.config.LogStreams) > 0 { @@ -138,9 +138,9 @@ func (p *cloudwatchPoller) startWorkers( work = <-p.workResponseChan } - p.log.Infof("aws-cloudwatch input worker for log group: '%v' has started", work.logGroup) - p.run(svc, work.logGroup, work.startTime, work.endTime, logProcessor) - p.log.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", work.logGroup) + p.log.Infof("aws-cloudwatch input worker for log group: '%v' has started", work.logGroupId) + p.run(svc, work.logGroupId, work.startTime, work.endTime, logProcessor) + p.log.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", work.logGroupId) } }() } @@ -149,7 +149,7 @@ func (p *cloudwatchPoller) startWorkers( // receive implements the main run loop that distributes tasks to the worker // goroutines. It accepts a "clock" callback (which on a live input should // equal time.Now) to allow deterministic unit tests. 
-func (p *cloudwatchPoller) receive(ctx context.Context, logGroupNames []string, clock func() time.Time) { +func (p *cloudwatchPoller) receive(ctx context.Context, logGroupIDs []string, clock func() time.Time) { defer p.workerWg.Wait() // startTime and endTime are the bounds of the current scanning interval. // If we're starting at the end of the logs, advance the start time to the @@ -160,15 +160,15 @@ func (p *cloudwatchPoller) receive(ctx context.Context, logGroupNames []string, startTime = endTime.Add(-p.config.ScanFrequency) } for ctx.Err() == nil { - for _, lg := range logGroupNames { + for _, lg := range logGroupIDs { select { case <-ctx.Done(): return case <-p.workRequestChan: p.workResponseChan <- workResponse{ - logGroup: lg, - startTime: startTime, - endTime: endTime, + logGroupId: lg, + startTime: startTime, + endTime: endTime, } } } diff --git a/x-pack/filebeat/input/awscloudwatch/cloudwatch_test.go b/x-pack/filebeat/input/awscloudwatch/cloudwatch_test.go index f666db85982..0c266c8291f 100644 --- a/x-pack/filebeat/input/awscloudwatch/cloudwatch_test.go +++ b/x-pack/filebeat/input/awscloudwatch/cloudwatch_test.go @@ -31,7 +31,7 @@ type receiveTestStep struct { type receiveTestCase struct { name string - logGroups []string + logGroupIDs []string configOverrides func(*config) startTime time.Time steps []receiveTestStep @@ -46,37 +46,37 @@ func TestReceive(t *testing.T) { t3 := t2.Add(time.Hour) testCases := []receiveTestCase{ { - name: "Default config with one log group", - logGroups: []string{"a"}, - startTime: t1, + name: "Default config with one log group", + logGroupIDs: []string{"a"}, + startTime: t1, steps: []receiveTestStep{ { expected: []workResponse{ - {logGroup: "a", startTime: t0, endTime: t1}, + {logGroupId: "a", startTime: t0, endTime: t1}, }, nextTime: t2, }, { expected: []workResponse{ - {logGroup: "a", startTime: t1, endTime: t2}, + {logGroupId: "a", startTime: t1, endTime: t2}, }, nextTime: t3, }, { expected: []workResponse{ - {logGroup: 
"a", startTime: t2, endTime: t3}, + {logGroupId: "a", startTime: t2, endTime: t3}, }, }, }, }, { - name: "Default config with two log groups", - logGroups: []string{"a", "b"}, - startTime: t1, + name: "Default config with two log groups", + logGroupIDs: []string{"a", "b"}, + startTime: t1, steps: []receiveTestStep{ { expected: []workResponse{ - {logGroup: "a", startTime: t0, endTime: t1}, + {logGroupId: "a", startTime: t0, endTime: t1}, }, nextTime: t2, }, @@ -84,49 +84,49 @@ func TestReceive(t *testing.T) { expected: []workResponse{ // start/end times for the second log group should be the same // even though the clock has changed. - {logGroup: "b", startTime: t0, endTime: t1}, + {logGroupId: "b", startTime: t0, endTime: t1}, }, }, { expected: []workResponse{ - {logGroup: "a", startTime: t1, endTime: t2}, - {logGroup: "b", startTime: t1, endTime: t2}, + {logGroupId: "a", startTime: t1, endTime: t2}, + {logGroupId: "b", startTime: t1, endTime: t2}, }, nextTime: t3, }, { expected: []workResponse{ - {logGroup: "a", startTime: t2, endTime: t3}, - {logGroup: "b", startTime: t2, endTime: t3}, + {logGroupId: "a", startTime: t2, endTime: t3}, + {logGroupId: "b", startTime: t2, endTime: t3}, }, }, }, }, { - name: "One log group with start_position: end", - logGroups: []string{"a"}, - startTime: t1, + name: "One log group with start_position: end", + logGroupIDs: []string{"a"}, + startTime: t1, configOverrides: func(c *config) { c.StartPosition = "end" }, steps: []receiveTestStep{ { expected: []workResponse{ - {logGroup: "a", startTime: t1.Add(-defaultScanFrequency), endTime: t1}, + {logGroupId: "a", startTime: t1.Add(-defaultScanFrequency), endTime: t1}, }, nextTime: t2, }, { expected: []workResponse{ - {logGroup: "a", startTime: t1, endTime: t2}, + {logGroupId: "a", startTime: t1, endTime: t2}, }, }, }, }, { - name: "Two log group with start_position: end and latency", - logGroups: []string{"a", "b"}, - startTime: t1, + name: "Two log group with start_position: end and 
latency", + logGroupIDs: []string{"a", "b"}, + startTime: t1, configOverrides: func(c *config) { c.StartPosition = "end" c.Latency = time.Second @@ -134,40 +134,40 @@ func TestReceive(t *testing.T) { steps: []receiveTestStep{ { expected: []workResponse{ - {logGroup: "a", startTime: t1.Add(-defaultScanFrequency - time.Second), endTime: t1.Add(-time.Second)}, - {logGroup: "b", startTime: t1.Add(-defaultScanFrequency - time.Second), endTime: t1.Add(-time.Second)}, + {logGroupId: "a", startTime: t1.Add(-defaultScanFrequency - time.Second), endTime: t1.Add(-time.Second)}, + {logGroupId: "b", startTime: t1.Add(-defaultScanFrequency - time.Second), endTime: t1.Add(-time.Second)}, }, nextTime: t2, }, { expected: []workResponse{ - {logGroup: "a", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)}, - {logGroup: "b", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)}, + {logGroupId: "a", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)}, + {logGroupId: "b", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)}, }, }, }, }, { - name: "Three log groups with latency", - logGroups: []string{"a", "b", "c"}, - startTime: t1, + name: "Three log groups with latency", + logGroupIDs: []string{"a", "b", "c"}, + startTime: t1, configOverrides: func(c *config) { c.Latency = time.Second }, steps: []receiveTestStep{ { expected: []workResponse{ - {logGroup: "a", startTime: t0, endTime: t1.Add(-time.Second)}, - {logGroup: "b", startTime: t0, endTime: t1.Add(-time.Second)}, - {logGroup: "c", startTime: t0, endTime: t1.Add(-time.Second)}, + {logGroupId: "a", startTime: t0, endTime: t1.Add(-time.Second)}, + {logGroupId: "b", startTime: t0, endTime: t1.Add(-time.Second)}, + {logGroupId: "c", startTime: t0, endTime: t1.Add(-time.Second)}, }, nextTime: t2, }, { expected: []workResponse{ - {logGroup: "a", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)}, - {logGroup: "b", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)}, 
- {logGroup: "c", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)}, + {logGroupId: "a", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)}, + {logGroupId: "b", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)}, + {logGroupId: "c", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)}, }, }, }, @@ -191,7 +191,7 @@ func TestReceive(t *testing.T) { test.configOverrides(&p.config) } clock.time = test.startTime - go p.receive(ctx, test.logGroups, clock.now) + go p.receive(ctx, test.logGroupIDs, clock.now) for _, step := range test.steps { for i, expected := range step.expected { p.workRequestChan <- struct{}{} @@ -209,34 +209,36 @@ func TestReceive(t *testing.T) { } type filterLogEventsTestCase struct { - name string - logGroup string - startTime time.Time - endTime time.Time - expected *cloudwatchlogs.FilterLogEventsInput + name string + logGroupId string + startTime time.Time + endTime time.Time + expected *cloudwatchlogs.FilterLogEventsInput } func TestFilterLogEventsInput(t *testing.T) { now, _ := time.Parse(time.RFC3339, "2024-07-12T13:00:00+00:00") + id := "myLogGroup" + testCases := []filterLogEventsTestCase{ { - name: "StartPosition: beginning, first iteration", - logGroup: "a", + name: "StartPosition: beginning, first iteration", + logGroupId: id, // The zero value of type time.Time{} is January 1, year 1, 00:00:00.000000000 UTC // Events with a timestamp before the time - January 1, 1970, 00:00:00 UTC are not returned by AWS API // make sure zero value of time.Time{} was converted startTime: time.Time{}, endTime: now, expected: &cloudwatchlogs.FilterLogEventsInput{ - LogGroupName: awssdk.String("a"), - StartTime: awssdk.Int64(0), - EndTime: awssdk.Int64(1720789200000), + LogGroupIdentifier: awssdk.String(id), + StartTime: awssdk.Int64(0), + EndTime: awssdk.Int64(1720789200000), }, }, } for _, test := range testCases { p := cloudwatchPoller{} - result := p.constructFilterLogEventsInput(test.startTime, 
test.endTime, test.logGroup) + result := p.constructFilterLogEventsInput(test.startTime, test.endTime, test.logGroupId) assert.Equal(t, test.expected, result) } diff --git a/x-pack/filebeat/input/awscloudwatch/input.go b/x-pack/filebeat/input/awscloudwatch/input.go index d10ae348d94..f66e403a1a9 100644 --- a/x-pack/filebeat/input/awscloudwatch/input.go +++ b/x-pack/filebeat/input/awscloudwatch/input.go @@ -62,25 +62,13 @@ type cloudwatchInput struct { func newInput(config config) (*cloudwatchInput, error) { cfgwarn.Beta("aws-cloudwatch input type is used") + + // perform AWS configuration validation awsConfig, err := awscommon.InitializeAWSConfig(config.AWSConfig) if err != nil { return nil, fmt.Errorf("failed to initialize AWS credentials: %w", err) } - if config.LogGroupARN != "" { - logGroupName, regionName, err := parseARN(config.LogGroupARN) - if err != nil { - return nil, fmt.Errorf("parse log group ARN failed: %w", err) - } - - config.LogGroupName = logGroupName - config.RegionName = regionName - } - - if config.RegionName != "" { - awsConfig.Region = config.RegionName - } - return &cloudwatchInput{ config: config, awsConfig: awsConfig, @@ -103,15 +91,25 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline) } defer client.Close() + var logGroupIDs []string + logGroupIDs, region, err := fromConfig(in.config, in.awsConfig) + if err != nil { + return fmt.Errorf("error processing configurations: %w", err) + } + + in.awsConfig.Region = region svc := cloudwatchlogs.NewFromConfig(in.awsConfig, func(o *cloudwatchlogs.Options) { if in.config.AWSConfig.FIPSEnabled { o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled } }) - logGroupNames, err := getLogGroupNames(svc, in.config.LogGroupNamePrefix, in.config.LogGroupName) - if err != nil { - return fmt.Errorf("failed to get log group names: %w", err) + if len(logGroupIDs) == 0 { + // fallback to LogGroupNamePrefix to derive group IDs + logGroupIDs, err = getLogGroupNames(svc, 
in.config.LogGroupNamePrefix) + if err != nil { + return fmt.Errorf("failed to get log group names from LogGroupNamePrefix: %w", err) + } } log := inputContext.Logger @@ -120,36 +118,54 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline) cwPoller := newCloudwatchPoller( log.Named("cloudwatch_poller"), in.metrics, - in.awsConfig.Region, + region, in.config) logProcessor := newLogProcessor(log.Named("log_processor"), in.metrics, client, ctx) - cwPoller.metrics.logGroupsTotal.Add(uint64(len(logGroupNames))) + cwPoller.metrics.logGroupsTotal.Add(uint64(len(logGroupIDs))) cwPoller.startWorkers(ctx, svc, logProcessor) - cwPoller.receive(ctx, logGroupNames, time.Now) + cwPoller.receive(ctx, logGroupIDs, time.Now) return nil } -func parseARN(logGroupARN string) (string, string, error) { - arnParsed, err := arn.Parse(logGroupARN) - if err != nil { - return "", "", fmt.Errorf("error Parse arn %s: %w", logGroupARN, err) - } +// fromConfig is a helper to parse input configurations and derive logGroupIDs & aws region +// Returned logGroupIDs could be empty, which require other fallback mechanisms to derive them. +// See getLogGroupNames for example. 
+func fromConfig(cfg config, awsCfg awssdk.Config) (logGroupIDs []string, region string, err error) { + // LogGroupARN has precedence over LogGroupName & RegionName + if cfg.LogGroupARN != "" { + parsedArn, err := arn.Parse(cfg.LogGroupARN) + if err != nil { + return nil, "", fmt.Errorf("failed to parse log group ARN: %w", err) + } - if strings.Contains(arnParsed.Resource, ":") { - resourceARNSplit := strings.Split(arnParsed.Resource, ":") - if len(resourceARNSplit) >= 2 && resourceARNSplit[0] == "log-group" { - return resourceARNSplit[1], arnParsed.Region, nil + if parsedArn.Region == "" { + return nil, "", fmt.Errorf("failed to parse log group ARN: missing region") } + + // refine to match AWS API parameter regex of logGroupIdentifier + groupId := strings.TrimSuffix(cfg.LogGroupARN, ":*") + logGroupIDs = append(logGroupIDs, groupId) + + return logGroupIDs, parsedArn.Region, nil } - return "", "", fmt.Errorf("cannot get log group name from log group ARN: %s", logGroupARN) -} -// getLogGroupNames uses DescribeLogGroups API to retrieve all log group names -func getLogGroupNames(svc *cloudwatchlogs.Client, logGroupNamePrefix string, logGroupName string) ([]string, error) { - if logGroupNamePrefix == "" { - return []string{logGroupName}, nil + // then fall back to LogGroupName + if cfg.LogGroupName != "" { + logGroupIDs = append(logGroupIDs, cfg.LogGroupName) + } + + // finally derive region + if cfg.RegionName != "" { + region = cfg.RegionName + } else { + region = awsCfg.Region } + return logGroupIDs, region, nil +} + +// getLogGroupNames uses DescribeLogGroups API to retrieve all log group names +func getLogGroupNames(svc *cloudwatchlogs.Client, logGroupNamePrefix string) ([]string, error) { // construct DescribeLogGroupsInput describeLogGroupsInput := &cloudwatchlogs.DescribeLogGroupsInput{ LogGroupNamePrefix: awssdk.String(logGroupNamePrefix), diff --git a/x-pack/filebeat/input/awscloudwatch/input_test.go b/x-pack/filebeat/input/awscloudwatch/input_test.go index 
25ecc18ea57..4d8c6e84e2b 100644 --- a/x-pack/filebeat/input/awscloudwatch/input_test.go +++ b/x-pack/filebeat/input/awscloudwatch/input_test.go @@ -50,9 +50,97 @@ func TestCreateEvent(t *testing.T) { assert.Equal(t, expectedEventFields, event.Fields) } -func TestParseARN(t *testing.T) { - logGroup, regionName, err := parseARN("arn:aws:logs:us-east-1:428152502467:log-group:test:*") - assert.Equal(t, "test", logGroup) - assert.Equal(t, "us-east-1", regionName) - assert.NoError(t, err) +func Test_FromConfig(t *testing.T) { + tests := []struct { + name string + cfg config + awsCfg awssdk.Config + expectGroups []string + expectRegion string + isError bool + }{ + { + name: "Valid log group ARN", + cfg: config{ + LogGroupARN: "arn:aws:logs:us-east-1:123456789012:myLogs", + }, + awsCfg: awssdk.Config{ + Region: "us-east-1", + }, + expectGroups: []string{"arn:aws:logs:us-east-1:123456789012:myLogs"}, + expectRegion: "us-east-1", + isError: false, + }, + { + name: "Invalid ARN results in an error", + cfg: config{ + LogGroupARN: "invalidARN", + }, + awsCfg: awssdk.Config{ + Region: "us-east-1", + }, + expectRegion: "", + isError: true, + }, + { + name: "Valid log group ARN but empty region cause error", + cfg: config{ + LogGroupARN: "arn:aws:logs::123456789012:otherLogs", + }, + awsCfg: awssdk.Config{ + Region: "us-east-1", + }, + expectRegion: "", + isError: true, + }, + { + name: "ARN suffix trimming to match logGroupIdentifier requirement", + cfg: config{ + LogGroupARN: "arn:aws:logs:us-east-1:123456789012:log-group:/aws/kinesisfirehose/ProjectA:*", + }, + awsCfg: awssdk.Config{ + Region: "us-east-1", + }, + expectGroups: []string{"arn:aws:logs:us-east-1:123456789012:log-group:/aws/kinesisfirehose/ProjectA"}, + expectRegion: "us-east-1", + isError: false, + }, + { + name: "LogGroupName only", + cfg: config{ + LogGroupName: "myLogGroup", + }, + awsCfg: awssdk.Config{ + Region: "us-east-1", + }, + expectGroups: []string{"myLogGroup"}, + expectRegion: "us-east-1", + isError: 
false, + }, + { + name: "LogGroupName and region override", + cfg: config{ + LogGroupName: "myLogGroup", + RegionName: "sa-east-1", + }, + awsCfg: awssdk.Config{ + Region: "us-east-1", + }, + expectGroups: []string{"myLogGroup"}, + expectRegion: "sa-east-1", + isError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + groups, region, err := fromConfig(tt.cfg, tt.awsCfg) + if tt.isError { + assert.Error(t, err) + } + + assert.Equal(t, tt.expectGroups, groups) + assert.Equal(t, tt.expectRegion, region) + }) + } } diff --git a/x-pack/filebeat/input/awscloudwatch/processor.go b/x-pack/filebeat/input/awscloudwatch/processor.go index 818ba85d57e..c0be3692163 100644 --- a/x-pack/filebeat/input/awscloudwatch/processor.go +++ b/x-pack/filebeat/input/awscloudwatch/processor.go @@ -32,22 +32,22 @@ func newLogProcessor(log *logp.Logger, metrics *inputMetrics, publisher beat.Cli } } -func (p *logProcessor) processLogEvents(logEvents []types.FilteredLogEvent, logGroup string, regionName string) { +func (p *logProcessor) processLogEvents(logEvents []types.FilteredLogEvent, logGroupId string, regionName string) { for _, logEvent := range logEvents { - event := createEvent(logEvent, logGroup, regionName) + event := createEvent(logEvent, logGroupId, regionName) p.metrics.cloudwatchEventsCreatedTotal.Inc() p.publisher.Publish(event) } } -func createEvent(logEvent types.FilteredLogEvent, logGroup string, regionName string) beat.Event { +func createEvent(logEvent types.FilteredLogEvent, logGroupId string, regionName string) beat.Event { event := beat.Event{ Timestamp: time.Unix(*logEvent.Timestamp/1000, 0).UTC(), Fields: mapstr.M{ "message": *logEvent.Message, "log": mapstr.M{ "file": mapstr.M{ - "path": logGroup + "/" + *logEvent.LogStreamName, + "path": logGroupId + "/" + *logEvent.LogStreamName, }, }, "event": mapstr.M{ @@ -55,7 +55,7 @@ func createEvent(logEvent types.FilteredLogEvent, logGroup string, regionName st "ingested": time.Now(), }, 
"aws.cloudwatch": mapstr.M{ - "log_group": logGroup, + "log_group": logGroupId, "log_stream": *logEvent.LogStreamName, "ingestion_time": time.Unix(*logEvent.IngestionTime/1000, 0), },