From d2867fdd9fb7845922cf3b50d82c594234486d06 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Tue, 15 Oct 2024 22:51:59 -0400 Subject: [PATCH] Add asynchronous ACK handling to S3 and SQS inputs (#40699) Modify SQS ingestion to listen for ACKs asynchronously so that input workers can keep reading new objects after a previous one has been published, instead of blocking on full upstream ingestion. This addresses the bottleneck where ingesting many small objects is slow as each one waits for a full ingestion round trip. With a default configuration, SQS queues with many small objects are now ingested up to 60x faster. --- CHANGELOG.next.asciidoc | 1 + NOTICE.txt | 32 +++ go.mod | 1 + go.sum | 2 + .../filebeat.inputs.reference.xpack.yml.tmpl | 4 +- .../docs/inputs/input-aws-s3.asciidoc | 14 +- x-pack/filebeat/filebeat.reference.yml | 34 +--- x-pack/filebeat/input/awss3/acks.go | 106 ++++++++++ x-pack/filebeat/input/awss3/config.go | 59 +++--- x-pack/filebeat/input/awss3/config_test.go | 34 +--- .../input/awss3/input_benchmark_test.go | 48 ++--- .../input/awss3/input_integration_test.go | 2 +- x-pack/filebeat/input/awss3/interfaces.go | 34 +--- .../input/awss3/mock_interfaces_test.go | 189 ++---------------- x-pack/filebeat/input/awss3/s3.go | 6 +- x-pack/filebeat/input/awss3/s3_input.go | 98 +++++---- x-pack/filebeat/input/awss3/s3_objects.go | 107 ++++------ .../filebeat/input/awss3/s3_objects_test.go | 59 ++---- x-pack/filebeat/input/awss3/s3_test.go | 12 +- x-pack/filebeat/input/awss3/sqs_input.go | 95 ++++++--- x-pack/filebeat/input/awss3/sqs_s3_event.go | 152 ++++++++------ .../filebeat/input/awss3/sqs_s3_event_test.go | 99 ++++----- x-pack/filebeat/input/awss3/sqs_test.go | 42 +++- x-pack/filebeat/module/aws/_meta/config.yml | 30 +-- .../module/aws/cloudtrail/config/aws-s3.yml | 4 - .../module/aws/cloudtrail/manifest.yml | 1 - .../module/aws/s3access/config/aws-s3.yml | 4 - .../filebeat/module/aws/s3access/manifest.yml | 1 - .../module/aws/vpcflow/config/input.yml | 4 - .../filebeat/module/aws/vpcflow/manifest.yml | 1 - x-pack/filebeat/modules.d/aws.yml.disabled | 30 +-- 31 files changed, 593 insertions(+), 712 deletions(-) create mode 100644 x-pack/filebeat/input/awss3/acks.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 76062b5a19d..72ff8083fea 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -46,6 +46,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Added `container.image.name` to `journald` Filebeat input's Docker-specific translated fields. {pull}40450[40450] - Change log.file.path field in awscloudwatch input to nested object. {pull}41099[41099] - Remove deprecated awscloudwatch field from Filebeat. {pull}41089[41089] +- The performance of ingesting SQS data with the S3 input has improved by up to 60x for queues with many small events. `max_number_of_messages` config for SQS mode is now ignored, as the new design no longer needs a manual cap on messages. Instead, use `number_of_workers` to scale ingestion rate in both S3 and SQS modes. The increased efficiency may increase network bandwidth consumption, which can be throttled by lowering `number_of_workers`. It may also increase number of events stored in memory, which can be throttled by lowering the configured size of the internal queue. {pull}40699[40699] - System module events now contain `input.type: systemlogs` instead of `input.type: log` when harvesting log files. {pull}41061[41061] diff --git a/NOTICE.txt b/NOTICE.txt index bb5807f9a41..4447873499f 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -23112,6 +23112,38 @@ Contents of probable licence file $GOMODCACHE/github.com/xdg-go/scram@v1.1.2/LIC of your accepting any such warranty or additional liability. +-------------------------------------------------------------------------------- +Dependency : github.com/zyedidia/generic +Version: v1.2.1 +Licence type (autodetected): MIT +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/zyedidia/generic@v1.2.1/LICENSE: + +MIT License + +Copyright (c) 2021: Zachary Yedidia. + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, +TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + + -------------------------------------------------------------------------------- Dependency : go.elastic.co/apm/module/apmelasticsearch/v2 Version: v2.6.0 diff --git a/go.mod b/go.mod index c643f16b1fa..9459bb0f13e 100644 --- a/go.mod +++ b/go.mod @@ -214,6 +214,7 @@ require ( github.com/shirou/gopsutil/v3 v3.22.10 github.com/tklauser/go-sysconf v0.3.10 github.com/xdg-go/scram v1.1.2 + github.com/zyedidia/generic v1.2.1 go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.0 go.elastic.co/apm/module/apmhttp/v2 v2.6.0 go.elastic.co/apm/v2 v2.6.0 diff --git a/go.sum b/go.sum index 4f561fa3d6e..1a57c813276 100644 --- a/go.sum +++ b/go.sum @@ -941,6 +941,8 @@ github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= +github.com/zyedidia/generic v1.2.1 h1:Zv5KS/N2m0XZZiuLS82qheRG4X1o5gsWreGb0hR7XDc= +github.com/zyedidia/generic v1.2.1/go.mod h1:ly2RBz4mnz1yeuVbQA/VFwGjK3mnHGRj1JuoG336Bis= go.einride.tech/aip v0.67.1 h1:d/4TW92OxXBngkSOwWS2CH5rez869KpKMaN44mdxkFI= go.einride.tech/aip v0.67.1/go.mod h1:ZGX4/zKw8dcgzdLsrvpOOGxfxI2QSk12SlP7d6c0/XI= go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.0 h1:ukMcwyMaDXsS1dRK2qRYXT2AsfwaUy74TOOYCqkWJow= diff --git a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl index 4a2065ddf6a..4188035f832 100644 --- a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl +++ b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl @@ -79,8 +79,8 @@ # SQS queue URL to receive messages from (required). #queue_url: "https://sqs.us-east-1.amazonaws.com/1234/test-aws-s3-logs-queue" - # Maximum number of SQS messages that can be inflight at any time. - #max_number_of_messages: 5 + # Number of workers on S3 bucket or SQS queue + #number_of_workers: 5 # Maximum duration of an AWS API call (excluding S3 GetObject calls). #api_timeout: 120s diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index 43d4b102f63..aa8ecbf7259 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -307,18 +307,6 @@ The maximum number of bytes that a single log message can have. All bytes after multiline log messages, which can get large. This only applies to non-JSON logs. The default is `10 MiB`. -[float] -==== `max_number_of_messages` - -The maximum number of SQS messages that can be inflight at any time. Defaults -to 5. Setting this parameter too high can overload Elastic Agent and cause -ingest failures in situations where the SQS messages contain many S3 objects -or the S3 objects themselves contain large numbers of messages. -We recommend to keep the default value 5 and use the `Balanced` or `Optimized for -Throughput` setting in the -{fleet-guide}/es-output-settings.html#es-output-settings-performance-tuning-settings[preset] -options to tune your Elastic Agent performance. - [id="input-{type}-parsers"] [float] ==== `parsers` @@ -504,7 +492,7 @@ Prefix to apply for the list request to the S3 bucket. Default empty. [float] ==== `number_of_workers` -Number of workers that will process the S3 objects listed. (Required when `bucket_arn` is set). +Number of workers that will process the S3 or SQS objects listed. Required when `bucket_arn` is set, otherwise (in the SQS case) defaults to 5. [float] diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 4dedabc28e3..c00099c3667 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -139,7 +139,7 @@ filebeat.modules: # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s - # Number of workers on S3 bucket + # Number of workers on S3 bucket or SQS queue #var.number_of_workers: 5 # Process CloudTrail logs @@ -188,9 +188,6 @@ filebeat.modules: # Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. #var.fips_enabled: false - # The maximum number of messages to return from SQS. Valid values: 1 to 10. - #var.max_number_of_messages: 5 - # URL to proxy AWS API calls #var.proxy_url: http://proxy:3128 @@ -212,7 +209,7 @@ filebeat.modules: # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s - # Number of workers on S3 bucket + # Number of workers on S3 bucket or SQS queue #var.number_of_workers: 5 # Filename of AWS credential file @@ -249,9 +246,6 @@ filebeat.modules: # Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. #var.fips_enabled: false - # The maximum number of messages to return from SQS. Valid values: 1 to 10. - #var.max_number_of_messages: 5 - # URL to proxy AWS API calls #var.proxy_url: http://proxy:3128 @@ -273,7 +267,7 @@ filebeat.modules: # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s - # Number of workers on S3 bucket + # Number of workers on S3 bucket or SQS queue #var.number_of_workers: 5 # Filename of AWS credential file @@ -310,9 +304,6 @@ filebeat.modules: # Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. #var.fips_enabled: false - # The maximum number of messages to return from SQS. Valid values: 1 to 10. - #var.max_number_of_messages: 5 - # URL to proxy AWS API calls #var.proxy_url: http://proxy:3128 @@ -334,7 +325,7 @@ filebeat.modules: # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s - # Number of workers on S3 bucket + # Number of workers on S3 bucket or SQS queue #var.number_of_workers: 5 # Filename of AWS credential file @@ -371,9 +362,6 @@ filebeat.modules: # Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. #var.fips_enabled: false - # The maximum number of messages to return from SQS. Valid values: 1 to 10. - #var.max_number_of_messages: 5 - # URL to proxy AWS API calls #var.proxy_url: http://proxy:3128 @@ -395,7 +383,7 @@ filebeat.modules: # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s - # Number of workers on S3 bucket + # Number of workers on S3 bucket or SQS queue #var.number_of_workers: 5 # Filename of AWS credential file @@ -432,9 +420,6 @@ filebeat.modules: # Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. #var.fips_enabled: false - # The maximum number of messages to return from SQS. Valid values: 1 to 10. - #var.max_number_of_messages: 5 - # URL to proxy AWS API calls #var.proxy_url: http://proxy:3128 @@ -456,7 +441,7 @@ filebeat.modules: # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s - # Number of workers on S3 bucket + # Number of workers on S3 bucket or SQS queue #var.number_of_workers: 5 # Filename of AWS credential file @@ -493,9 +478,6 @@ filebeat.modules: # Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. #var.fips_enabled: false - # The maximum number of messages to return from SQS. Valid values: 1 to 10. - #var.max_number_of_messages: 5 - # URL to proxy AWS API calls #var.proxy_url: http://proxy:3128 @@ -3013,8 +2995,8 @@ filebeat.inputs: # SQS queue URL to receive messages from (required). #queue_url: "https://sqs.us-east-1.amazonaws.com/1234/test-aws-s3-logs-queue" - # Maximum number of SQS messages that can be inflight at any time. - #max_number_of_messages: 5 + # Number of workers on S3 bucket or SQS queue + #number_of_workers: 5 # Maximum duration of an AWS API call (excluding S3 GetObject calls). #api_timeout: 120s diff --git a/x-pack/filebeat/input/awss3/acks.go b/x-pack/filebeat/input/awss3/acks.go new file mode 100644 index 00000000000..a3850c01e87 --- /dev/null +++ b/x-pack/filebeat/input/awss3/acks.go @@ -0,0 +1,106 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package awss3 + +import ( + "github.com/zyedidia/generic/queue" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common/acker" +) + +type awsACKHandler struct { + pending *queue.Queue[pendingACK] + ackedCount int + + pendingChan chan pendingACK + ackChan chan int +} + +type pendingACK struct { + eventCount int + ackCallback func() +} + +func newAWSACKHandler() *awsACKHandler { + handler := &awsACKHandler{ + pending: queue.New[pendingACK](), + + // Channel buffer sizes are somewhat arbitrary: synchronous channels + // would be safe, but buffers slightly reduce scheduler overhead since + // the ack loop goroutine doesn't need to wake up as often. + // + // pendingChan receives one message each time an S3/SQS worker goroutine + // finishes processing an object. If it is full, workers will not be able + // to advance to the next object until the ack loop wakes up. + // + // ackChan receives approximately one message every time an acknowledged + // batch of events contains at least one event from this input. (Sometimes + // fewer if messages can be coalesced.) If it is full, acknowledgement + // notifications for inputs/queue will stall until the ack loop wakes up. + // (This is a much worse consequence than pendingChan, but ackChan also + // receives fewer messages than pendingChan by a factor of ~thousands, + // so in practice it's still low-impact.) + pendingChan: make(chan pendingACK, 10), + ackChan: make(chan int, 10), + } + go handler.run() + return handler +} + +func (ah *awsACKHandler) Add(eventCount int, ackCallback func()) { + ah.pendingChan <- pendingACK{ + eventCount: eventCount, + ackCallback: ackCallback, + } +} + +// Called when a worker is closing, to indicate to the ack handler that it +// should shut down as soon as the current pending list is acknowledged. +func (ah *awsACKHandler) Close() { + close(ah.pendingChan) +} + +func (ah *awsACKHandler) pipelineEventListener() beat.EventListener { + return acker.TrackingCounter(func(_ int, total int) { + // Notify the ack handler goroutine + ah.ackChan <- total + }) +} + +// Listener that handles both incoming metadata and ACK +// confirmations. +func (ah *awsACKHandler) run() { + for { + select { + case result, ok := <-ah.pendingChan: + if ok { + ah.pending.Enqueue(result) + } else { + // Channel is closed, reset so we don't receive any more values + ah.pendingChan = nil + } + case count := <-ah.ackChan: + ah.ackedCount += count + } + + // Finalize any objects that are now completed + for !ah.pending.Empty() && ah.ackedCount >= ah.pending.Peek().eventCount { + result := ah.pending.Dequeue() + ah.ackedCount -= result.eventCount + // Run finalization asynchronously so we don't block the SQS worker + // or the queue by ignoring the ack handler's input channels. Ordering + // is no longer important at this point. + if result.ackCallback != nil { + go result.ackCallback() + } + } + + // If the input is closed and all acks are completed, we're done + if ah.pending.Empty() && ah.pendingChan == nil { + return + } + } +} diff --git a/x-pack/filebeat/input/awss3/config.go b/x-pack/filebeat/input/awss3/config.go index b85c3f3871c..d80108590ce 100644 --- a/x-pack/filebeat/input/awss3/config.go +++ b/x-pack/filebeat/input/awss3/config.go @@ -24,37 +24,36 @@ import ( ) type config struct { - APITimeout time.Duration `config:"api_timeout"` - VisibilityTimeout time.Duration `config:"visibility_timeout"` - SQSWaitTime time.Duration `config:"sqs.wait_time"` // The max duration for which the SQS ReceiveMessage call waits for a message to arrive in the queue before returning. - SQSMaxReceiveCount int `config:"sqs.max_receive_count"` // The max number of times a message should be received (retried) before deleting it. - SQSScript *scriptConfig `config:"sqs.notification_parsing_script"` - MaxNumberOfMessages int `config:"max_number_of_messages"` - QueueURL string `config:"queue_url"` - RegionName string `config:"region"` - BucketARN string `config:"bucket_arn"` - NonAWSBucketName string `config:"non_aws_bucket_name"` - BucketListInterval time.Duration `config:"bucket_list_interval"` - BucketListPrefix string `config:"bucket_list_prefix"` - NumberOfWorkers int `config:"number_of_workers"` - AWSConfig awscommon.ConfigAWS `config:",inline"` - FileSelectors []fileSelectorConfig `config:"file_selectors"` - ReaderConfig readerConfig `config:",inline"` // Reader options to apply when no file_selectors are used. - PathStyle bool `config:"path_style"` - ProviderOverride string `config:"provider"` - BackupConfig backupConfig `config:",inline"` + APITimeout time.Duration `config:"api_timeout"` + VisibilityTimeout time.Duration `config:"visibility_timeout"` + SQSWaitTime time.Duration `config:"sqs.wait_time"` // The max duration for which the SQS ReceiveMessage call waits for a message to arrive in the queue before returning. + SQSMaxReceiveCount int `config:"sqs.max_receive_count"` // The max number of times a message should be received (retried) before deleting it. + SQSScript *scriptConfig `config:"sqs.notification_parsing_script"` + QueueURL string `config:"queue_url"` + RegionName string `config:"region"` + BucketARN string `config:"bucket_arn"` + NonAWSBucketName string `config:"non_aws_bucket_name"` + BucketListInterval time.Duration `config:"bucket_list_interval"` + BucketListPrefix string `config:"bucket_list_prefix"` + NumberOfWorkers int `config:"number_of_workers"` + AWSConfig awscommon.ConfigAWS `config:",inline"` + FileSelectors []fileSelectorConfig `config:"file_selectors"` + ReaderConfig readerConfig `config:",inline"` // Reader options to apply when no file_selectors are used. + PathStyle bool `config:"path_style"` + ProviderOverride string `config:"provider"` + BackupConfig backupConfig `config:",inline"` } func defaultConfig() config { c := config{ - APITimeout: 120 * time.Second, - VisibilityTimeout: 300 * time.Second, - BucketListInterval: 120 * time.Second, - BucketListPrefix: "", - SQSWaitTime: 20 * time.Second, - SQSMaxReceiveCount: 5, - MaxNumberOfMessages: 5, - PathStyle: false, + APITimeout: 120 * time.Second, + VisibilityTimeout: 300 * time.Second, + BucketListInterval: 120 * time.Second, + BucketListPrefix: "", + SQSWaitTime: 20 * time.Second, + SQSMaxReceiveCount: 5, + NumberOfWorkers: 5, + PathStyle: false, } c.ReaderConfig.InitDefaults() return c @@ -93,11 +92,6 @@ func (c *config) Validate() error { "less than or equal to 20s", c.SQSWaitTime) } - if c.QueueURL != "" && c.MaxNumberOfMessages <= 0 { - return fmt.Errorf("max_number_of_messages <%v> must be greater than 0", - c.MaxNumberOfMessages) - } - if c.QueueURL != "" && c.APITimeout < c.SQSWaitTime { return fmt.Errorf("api_timeout <%v> must be greater than the sqs.wait_time <%v", c.APITimeout, c.SQSWaitTime) @@ -252,6 +246,7 @@ func (c config) getBucketARN() string { // Should be provided as a parameter to s3.NewFromConfig. func (c config) s3ConfigModifier(o *s3.Options) { if c.NonAWSBucketName != "" { + //nolint:staticcheck // haven't migrated to the new interface yet o.EndpointResolver = nonAWSBucketResolver{endpoint: c.AWSConfig.Endpoint} } diff --git a/x-pack/filebeat/input/awss3/config_test.go b/x-pack/filebeat/input/awss3/config_test.go index 651f8099d91..907a5854b28 100644 --- a/x-pack/filebeat/input/awss3/config_test.go +++ b/x-pack/filebeat/input/awss3/config_test.go @@ -30,17 +30,17 @@ func TestConfig(t *testing.T) { parserConf := parser.Config{} require.NoError(t, parserConf.Unpack(conf.MustNewConfigFrom(""))) return config{ - QueueURL: quequeURL, - BucketARN: s3Bucket, - NonAWSBucketName: nonAWSS3Bucket, - APITimeout: 120 * time.Second, - VisibilityTimeout: 300 * time.Second, - SQSMaxReceiveCount: 5, - SQSWaitTime: 20 * time.Second, - BucketListInterval: 120 * time.Second, - BucketListPrefix: "", - PathStyle: false, - MaxNumberOfMessages: 5, + QueueURL: quequeURL, + BucketARN: s3Bucket, + NonAWSBucketName: nonAWSS3Bucket, + APITimeout: 120 * time.Second, + VisibilityTimeout: 300 * time.Second, + SQSMaxReceiveCount: 5, + SQSWaitTime: 20 * time.Second, + BucketListInterval: 120 * time.Second, + BucketListPrefix: "", + PathStyle: false, + NumberOfWorkers: 5, ReaderConfig: readerConfig{ BufferSize: 16 * humanize.KiByte, MaxBytes: 10 * humanize.MiByte, @@ -304,18 +304,6 @@ func TestConfig(t *testing.T) { expectedErr: "number_of_workers <0> must be greater than 0", expectedCfg: nil, }, - { - name: "error on max_number_of_messages == 0", - queueURL: queueURL, - s3Bucket: "", - nonAWSS3Bucket: "", - config: mapstr.M{ - "queue_url": queueURL, - "max_number_of_messages": "0", - }, - expectedErr: "max_number_of_messages <0> must be greater than 0", - expectedCfg: nil, - }, { name: "error on buffer_size == 0 ", queueURL: queueURL, diff --git a/x-pack/filebeat/input/awss3/input_benchmark_test.go b/x-pack/filebeat/input/awss3/input_benchmark_test.go index 0d7d79b615b..54e22773602 100644 --- a/x-pack/filebeat/input/awss3/input_benchmark_test.go +++ b/x-pack/filebeat/input/awss3/input_benchmark_test.go @@ -27,8 +27,6 @@ import ( "github.com/dustin/go-humanize" "github.com/olekukonko/tablewriter" - pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" - awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" @@ -164,10 +162,17 @@ func (c constantS3) ListObjectsPaginator(string, string) s3Pager { var _ beat.Pipeline = (*fakePipeline)(nil) // fakePipeline returns new ackClients. -type fakePipeline struct{} +type fakePipeline struct { +} -func (c *fakePipeline) ConnectWith(beat.ClientConfig) (beat.Client, error) { - return &ackClient{}, nil +func newFakePipeline() *fakePipeline { + return &fakePipeline{} +} + +func (c *fakePipeline) ConnectWith(config beat.ClientConfig) (beat.Client, error) { + return &ackClient{ + eventListener: config.EventListener, + }, nil } func (c *fakePipeline) Connect() (beat.Client, error) { @@ -177,13 +182,15 @@ func (c *fakePipeline) Connect() (beat.Client, error) { var _ beat.Client = (*ackClient)(nil) // ackClient is a fake beat.Client that ACKs the published messages. -type ackClient struct{} +type ackClient struct { + eventListener beat.EventListener +} func (c *ackClient) Close() error { return nil } func (c *ackClient) Publish(event beat.Event) { - // Fake the ACK handling. - event.Private.(*awscommon.EventACKTracker).ACK() + c.eventListener.AddEvent(event, true) + go c.eventListener.ACKEvents(1) } func (c *ackClient) PublishAll(event []beat.Event) { @@ -208,20 +215,20 @@ file_selectors: return inputConfig } -func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkResult { +func benchmarkInputSQS(t *testing.T, workerCount int) testing.BenchmarkResult { return testing.Benchmark(func(b *testing.B) { var err error - pipeline := &fakePipeline{} config := makeBenchmarkConfig(t) - config.MaxNumberOfMessages = maxMessagesInflight + config.NumberOfWorkers = workerCount sqsReader := newSQSReaderInput(config, aws.Config{}) sqsReader.log = log.Named("sqs") - sqsReader.metrics = newInputMetrics("test_id", monitoring.NewRegistry(), maxMessagesInflight) + sqsReader.pipeline = newFakePipeline() + sqsReader.metrics = newInputMetrics("test_id", monitoring.NewRegistry(), workerCount) sqsReader.sqs, err = newConstantSQS() require.NoError(t, err) sqsReader.s3 = newConstantS3(t) - sqsReader.msgHandler, err = sqsReader.createEventProcessor(pipeline) + sqsReader.msgHandler, err = sqsReader.createEventProcessor() require.NoError(t, err, "createEventProcessor must succeed") ctx, cancel := context.WithCancel(context.Background()) @@ -240,7 +247,7 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR b.StopTimer() elapsed := time.Since(start) - b.ReportMetric(float64(maxMessagesInflight), "max_messages_inflight") + b.ReportMetric(float64(workerCount), "number_of_workers") b.ReportMetric(elapsed.Seconds(), "sec") b.ReportMetric(float64(sqsReader.metrics.s3EventsCreatedTotal.Get()), "events") @@ -303,14 +310,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult metricRegistry := monitoring.NewRegistry() metrics := newInputMetrics("test_id", metricRegistry, numberOfWorkers) - - client := pubtest.NewChanClientWithCallback(100, func(event beat.Event) { - event.Private.(*awscommon.EventACKTracker).ACK() - }) - - defer func() { - _ = client.Close() - }() + pipeline := newFakePipeline() config := makeBenchmarkConfig(t) config.NumberOfWorkers = numberOfWorkers @@ -342,13 +342,13 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult states, err := newStates(nil, store) assert.NoError(t, err, "states creation should succeed") - s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, config.FileSelectors, backupConfig{}) + s3EventHandlerFactory := newS3ObjectProcessorFactory(metrics, s3API, config.FileSelectors, backupConfig{}) s3Poller := &s3PollerInput{ log: logp.NewLogger(inputName), config: config, metrics: metrics, s3: s3API, - client: client, + pipeline: pipeline, s3ObjectHandler: s3EventHandlerFactory, states: states, provider: "provider", diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 88d81a9f0c8..9303c5c7259 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -112,7 +112,7 @@ file_selectors: func makeTestConfigSQS(queueURL string) *conf.C { return conf.MustNewConfigFrom(fmt.Sprintf(`--- queue_url: %s -max_number_of_messages: 1 +number_of_workers: 1 visibility_timeout: 30s region: us-east-1 file_selectors: diff --git a/x-pack/filebeat/input/awss3/interfaces.go b/x-pack/filebeat/input/awss3/interfaces.go index 5e9eb13d243..6a3b119303b 100644 --- a/x-pack/filebeat/input/awss3/interfaces.go +++ b/x-pack/filebeat/input/awss3/interfaces.go @@ -17,7 +17,6 @@ import ( "github.com/aws/smithy-go/middleware" "github.com/elastic/beats/v7/libbeat/beat" - awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -41,25 +40,9 @@ import ( const s3RequestURLMetadataKey = `x-beat-s3-request-url` type sqsAPI interface { - sqsReceiver - sqsDeleter - sqsVisibilityChanger - sqsAttributeGetter -} - -type sqsReceiver interface { ReceiveMessage(ctx context.Context, maxMessages int) ([]types.Message, error) -} - -type sqsDeleter interface { DeleteMessage(ctx context.Context, msg *types.Message) error -} - -type sqsVisibilityChanger interface { ChangeMessageVisibility(ctx context.Context, msg *types.Message, timeout time.Duration) error -} - -type sqsAttributeGetter interface { GetQueueAttributes(ctx context.Context, attr []types.QueueAttributeName) (map[string]string, error) } @@ -68,7 +51,7 @@ type sqsProcessor interface { // given message and is responsible for updating the message's visibility // timeout while it is being processed and for deleting it when processing // completes successfully. - ProcessSQS(ctx context.Context, msg *types.Message) error + ProcessSQS(ctx context.Context, msg *types.Message, eventCallback func(e beat.Event)) sqsProcessingResult } // ------ @@ -103,25 +86,18 @@ type s3ObjectHandlerFactory interface { // Create returns a new s3ObjectHandler that can be used to process the // specified S3 object. If the handler is not configured to process the // given S3 object (based on key name) then it will return nil. - Create(ctx context.Context, log *logp.Logger, client beat.Client, acker *awscommon.EventACKTracker, obj s3EventV2) s3ObjectHandler + Create(ctx context.Context, obj s3EventV2) s3ObjectHandler } type s3ObjectHandler interface { // ProcessS3Object downloads the S3 object, parses it, creates events, and - // publishes them. It returns when processing finishes or when it encounters - // an unrecoverable error. It does not wait for the events to be ACKed by - // the publisher before returning (use eventACKTracker's Wait() method to - // determine this). - ProcessS3Object() error + // passes to the given callback. It returns when processing finishes or + // when it encounters an unrecoverable error. + ProcessS3Object(log *logp.Logger, eventCallback func(e beat.Event)) error // FinalizeS3Object finalizes processing of an S3 object after the current // batch is finished. FinalizeS3Object() error - - // Wait waits for every event published by ProcessS3Object() to be ACKed - // by the publisher before returning. Internally it uses the - // s3ObjectHandler eventACKTracker's Wait() method - Wait() } // ------ diff --git a/x-pack/filebeat/input/awss3/mock_interfaces_test.go b/x-pack/filebeat/input/awss3/mock_interfaces_test.go index ccae48a59b2..086ca34136f 100644 --- a/x-pack/filebeat/input/awss3/mock_interfaces_test.go +++ b/x-pack/filebeat/input/awss3/mock_interfaces_test.go @@ -18,7 +18,6 @@ import ( gomock "github.com/golang/mock/gomock" beat "github.com/elastic/beats/v7/libbeat/beat" - aws "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" logp "github.com/elastic/elastic-agent-libs/logp" ) @@ -103,156 +102,6 @@ func (mr *MockSQSAPIMockRecorder) ReceiveMessage(ctx, maxMessages interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReceiveMessage", reflect.TypeOf((*MockSQSAPI)(nil).ReceiveMessage), ctx, maxMessages) } -// MocksqsReceiver is a mock of sqsReceiver interface. -type MocksqsReceiver struct { - ctrl *gomock.Controller - recorder *MocksqsReceiverMockRecorder -} - -// MocksqsReceiverMockRecorder is the mock recorder for MocksqsReceiver. -type MocksqsReceiverMockRecorder struct { - mock *MocksqsReceiver -} - -// NewMocksqsReceiver creates a new mock instance. -func NewMocksqsReceiver(ctrl *gomock.Controller) *MocksqsReceiver { - mock := &MocksqsReceiver{ctrl: ctrl} - mock.recorder = &MocksqsReceiverMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MocksqsReceiver) EXPECT() *MocksqsReceiverMockRecorder { - return m.recorder -} - -// ReceiveMessage mocks base method. -func (m *MocksqsReceiver) ReceiveMessage(ctx context.Context, maxMessages int) ([]types.Message, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ReceiveMessage", ctx, maxMessages) - ret0, _ := ret[0].([]types.Message) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ReceiveMessage indicates an expected call of ReceiveMessage. -func (mr *MocksqsReceiverMockRecorder) ReceiveMessage(ctx, maxMessages interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReceiveMessage", reflect.TypeOf((*MocksqsReceiver)(nil).ReceiveMessage), ctx, maxMessages) -} - -// MocksqsDeleter is a mock of sqsDeleter interface. -type MocksqsDeleter struct { - ctrl *gomock.Controller - recorder *MocksqsDeleterMockRecorder -} - -// MocksqsDeleterMockRecorder is the mock recorder for MocksqsDeleter. -type MocksqsDeleterMockRecorder struct { - mock *MocksqsDeleter -} - -// NewMocksqsDeleter creates a new mock instance. -func NewMocksqsDeleter(ctrl *gomock.Controller) *MocksqsDeleter { - mock := &MocksqsDeleter{ctrl: ctrl} - mock.recorder = &MocksqsDeleterMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MocksqsDeleter) EXPECT() *MocksqsDeleterMockRecorder { - return m.recorder -} - -// DeleteMessage mocks base method. -func (m *MocksqsDeleter) DeleteMessage(ctx context.Context, msg *types.Message) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteMessage", ctx, msg) - ret0, _ := ret[0].(error) - return ret0 -} - -// DeleteMessage indicates an expected call of DeleteMessage. -func (mr *MocksqsDeleterMockRecorder) DeleteMessage(ctx, msg interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteMessage", reflect.TypeOf((*MocksqsDeleter)(nil).DeleteMessage), ctx, msg) -} - -// MocksqsVisibilityChanger is a mock of sqsVisibilityChanger interface. -type MocksqsVisibilityChanger struct { - ctrl *gomock.Controller - recorder *MocksqsVisibilityChangerMockRecorder -} - -// MocksqsVisibilityChangerMockRecorder is the mock recorder for MocksqsVisibilityChanger. -type MocksqsVisibilityChangerMockRecorder struct { - mock *MocksqsVisibilityChanger -} - -// NewMocksqsVisibilityChanger creates a new mock instance. -func NewMocksqsVisibilityChanger(ctrl *gomock.Controller) *MocksqsVisibilityChanger { - mock := &MocksqsVisibilityChanger{ctrl: ctrl} - mock.recorder = &MocksqsVisibilityChangerMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MocksqsVisibilityChanger) EXPECT() *MocksqsVisibilityChangerMockRecorder { - return m.recorder -} - -// ChangeMessageVisibility mocks base method. -func (m *MocksqsVisibilityChanger) ChangeMessageVisibility(ctx context.Context, msg *types.Message, timeout time.Duration) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ChangeMessageVisibility", ctx, msg, timeout) - ret0, _ := ret[0].(error) - return ret0 -} - -// ChangeMessageVisibility indicates an expected call of ChangeMessageVisibility. -func (mr *MocksqsVisibilityChangerMockRecorder) ChangeMessageVisibility(ctx, msg, timeout interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChangeMessageVisibility", reflect.TypeOf((*MocksqsVisibilityChanger)(nil).ChangeMessageVisibility), ctx, msg, timeout) -} - -// MocksqsAttributeGetter is a mock of sqsAttributeGetter interface. -type MocksqsAttributeGetter struct { - ctrl *gomock.Controller - recorder *MocksqsAttributeGetterMockRecorder -} - -// MocksqsAttributeGetterMockRecorder is the mock recorder for MocksqsAttributeGetter. -type MocksqsAttributeGetterMockRecorder struct { - mock *MocksqsAttributeGetter -} - -// NewMocksqsAttributeGetter creates a new mock instance. -func NewMocksqsAttributeGetter(ctrl *gomock.Controller) *MocksqsAttributeGetter { - mock := &MocksqsAttributeGetter{ctrl: ctrl} - mock.recorder = &MocksqsAttributeGetterMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MocksqsAttributeGetter) EXPECT() *MocksqsAttributeGetterMockRecorder { - return m.recorder -} - -// GetQueueAttributes mocks base method. -func (m *MocksqsAttributeGetter) GetQueueAttributes(ctx context.Context, attr []types.QueueAttributeName) (map[string]string, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetQueueAttributes", ctx, attr) - ret0, _ := ret[0].(map[string]string) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetQueueAttributes indicates an expected call of GetQueueAttributes. -func (mr *MocksqsAttributeGetterMockRecorder) GetQueueAttributes(ctx, attr interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetQueueAttributes", reflect.TypeOf((*MocksqsAttributeGetter)(nil).GetQueueAttributes), ctx, attr) -} - // MockSQSProcessor is a mock of sqsProcessor interface. type MockSQSProcessor struct { ctrl *gomock.Controller @@ -277,17 +126,17 @@ func (m *MockSQSProcessor) EXPECT() *MockSQSProcessorMockRecorder { } // ProcessSQS mocks base method. -func (m *MockSQSProcessor) ProcessSQS(ctx context.Context, msg *types.Message) error { +func (m *MockSQSProcessor) ProcessSQS(ctx context.Context, msg *types.Message, eventCallback func(beat.Event)) sqsProcessingResult { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ProcessSQS", ctx, msg) - ret0, _ := ret[0].(error) + ret := m.ctrl.Call(m, "ProcessSQS", ctx, msg, eventCallback) + ret0, _ := ret[0].(sqsProcessingResult) return ret0 } // ProcessSQS indicates an expected call of ProcessSQS. -func (mr *MockSQSProcessorMockRecorder) ProcessSQS(ctx, msg interface{}) *gomock.Call { +func (mr *MockSQSProcessorMockRecorder) ProcessSQS(ctx, msg, eventCallback interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessSQS", reflect.TypeOf((*MockSQSProcessor)(nil).ProcessSQS), ctx, msg) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessSQS", reflect.TypeOf((*MockSQSProcessor)(nil).ProcessSQS), ctx, msg, eventCallback) } // MockS3API is a mock of s3API interface. @@ -581,17 +430,17 @@ func (m *MockS3ObjectHandlerFactory) EXPECT() *MockS3ObjectHandlerFactoryMockRec } // Create mocks base method. -func (m *MockS3ObjectHandlerFactory) Create(ctx context.Context, log *logp.Logger, client beat.Client, acker *aws.EventACKTracker, obj s3EventV2) s3ObjectHandler { +func (m *MockS3ObjectHandlerFactory) Create(ctx context.Context, obj s3EventV2) s3ObjectHandler { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Create", ctx, log, client, acker, obj) + ret := m.ctrl.Call(m, "Create", ctx, obj) ret0, _ := ret[0].(s3ObjectHandler) return ret0 } // Create indicates an expected call of Create. -func (mr *MockS3ObjectHandlerFactoryMockRecorder) Create(ctx, log, client, acker, obj interface{}) *gomock.Call { +func (mr *MockS3ObjectHandlerFactoryMockRecorder) Create(ctx, obj interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockS3ObjectHandlerFactory)(nil).Create), ctx, log, client, acker, obj) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockS3ObjectHandlerFactory)(nil).Create), ctx, obj) } // MockS3ObjectHandler is a mock of s3ObjectHandler interface. @@ -632,27 +481,15 @@ func (mr *MockS3ObjectHandlerMockRecorder) FinalizeS3Object() *gomock.Call { } // ProcessS3Object mocks base method. -func (m *MockS3ObjectHandler) ProcessS3Object() error { +func (m *MockS3ObjectHandler) ProcessS3Object(log *logp.Logger, eventCallback func(beat.Event)) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ProcessS3Object") + ret := m.ctrl.Call(m, "ProcessS3Object", log, eventCallback) ret0, _ := ret[0].(error) return ret0 } // ProcessS3Object indicates an expected call of ProcessS3Object. -func (mr *MockS3ObjectHandlerMockRecorder) ProcessS3Object() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessS3Object", reflect.TypeOf((*MockS3ObjectHandler)(nil).ProcessS3Object)) -} - -// Wait mocks base method. -func (m *MockS3ObjectHandler) Wait() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Wait") -} - -// Wait indicates an expected call of Wait. -func (mr *MockS3ObjectHandlerMockRecorder) Wait() *gomock.Call { +func (mr *MockS3ObjectHandlerMockRecorder) ProcessS3Object(log, eventCallback interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Wait", reflect.TypeOf((*MockS3ObjectHandler)(nil).Wait)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessS3Object", reflect.TypeOf((*MockS3ObjectHandler)(nil).ProcessS3Object), log, eventCallback) } diff --git a/x-pack/filebeat/input/awss3/s3.go b/x-pack/filebeat/input/awss3/s3.go index d611470ec80..9901d5fe41d 100644 --- a/x-pack/filebeat/input/awss3/s3.go +++ b/x-pack/filebeat/input/awss3/s3.go @@ -14,7 +14,6 @@ import ( "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/elastic/beats/v7/libbeat/beat" - awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" ) func createS3API(ctx context.Context, config config, awsConfig awssdk.Config) (*awsS3API, error) { @@ -32,9 +31,9 @@ func createS3API(ctx context.Context, config config, awsConfig awssdk.Config) (* return newAWSs3API(s3Client), nil } -func createPipelineClient(pipeline beat.Pipeline) (beat.Client, error) { +func createPipelineClient(pipeline beat.Pipeline, acks *awsACKHandler) (beat.Client, error) { return pipeline.ConnectWith(beat.ClientConfig{ - EventListener: awscommon.NewEventACKHandler(), + EventListener: acks.pipelineEventListener(), Processing: beat.ProcessingConfig{ // This input only produces events with basic types so normalization // is not required. @@ -117,5 +116,6 @@ type nonAWSBucketResolver struct { } func (n nonAWSBucketResolver) ResolveEndpoint(region string, options s3.EndpointResolverOptions) (awssdk.Endpoint, error) { + //nolint:staticcheck // haven't migrated to the new interface yet return awssdk.Endpoint{URL: n.endpoint, SigningRegion: region, HostnameImmutable: true, Source: awssdk.EndpointSourceCustom}, nil } diff --git a/x-pack/filebeat/input/awss3/s3_input.go b/x-pack/filebeat/input/awss3/s3_input.go index bd1e8f7700e..c3a83c284a2 100644 --- a/x-pack/filebeat/input/awss3/s3_input.go +++ b/x-pack/filebeat/input/awss3/s3_input.go @@ -17,7 +17,6 @@ import ( v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/backoff" - awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/go-concert/timed" ) @@ -28,23 +27,17 @@ var readerLoopMaxCircuitBreaker = 10 type s3PollerInput struct { log *logp.Logger + pipeline beat.Pipeline config config awsConfig awssdk.Config store beater.StateStore provider string s3 s3API metrics *inputMetrics - client beat.Client s3ObjectHandler s3ObjectHandlerFactory states *states } -// s3FetchTask contains metadata for one S3 object that a worker should fetch. -type s3FetchTask struct { - s3ObjectHandler s3ObjectHandler - objectState state -} - func newS3PollerInput( config config, awsConfig awssdk.Config, @@ -69,6 +62,7 @@ func (in *s3PollerInput) Run( pipeline beat.Pipeline, ) error { in.log = inputContext.Logger.Named("s3") + in.pipeline = pipeline var err error // Load the persistent S3 polling state. @@ -78,24 +72,16 @@ func (in *s3PollerInput) Run( } defer in.states.Close() - // Create client for publishing events and receive notification of their ACKs. - in.client, err = createPipelineClient(pipeline) - if err != nil { - return fmt.Errorf("failed to create pipeline client: %w", err) - } - defer in.client.Close() - ctx := v2.GoContextFromCanceler(inputContext.Cancelation) in.s3, err = createS3API(ctx, in.config, in.awsConfig) if err != nil { return fmt.Errorf("failed to create S3 API: %w", err) } - in.metrics = newInputMetrics(inputContext.ID, nil, in.config.MaxNumberOfMessages) + in.metrics = newInputMetrics(inputContext.ID, nil, in.config.NumberOfWorkers) defer in.metrics.Close() in.s3ObjectHandler = newS3ObjectProcessorFactory( - in.log, in.metrics, in.s3, in.config.getFileSelectors(), @@ -117,7 +103,7 @@ func (in *s3PollerInput) run(ctx context.Context) { func (in *s3PollerInput) runPoll(ctx context.Context) { var workerWg sync.WaitGroup - workChan := make(chan *s3FetchTask) + workChan := make(chan state) // Start the worker goroutines to listen on the work channel for i := 0; i < in.config.NumberOfWorkers; i++ { @@ -133,15 +119,37 @@ func (in *s3PollerInput) runPoll(ctx context.Context) { workerWg.Wait() } -func (in *s3PollerInput) workerLoop(ctx context.Context, workChan <-chan *s3FetchTask) { +func (in *s3PollerInput) workerLoop(ctx context.Context, workChan <-chan state) { + acks := newAWSACKHandler() + // Create client for publishing events and receive notification of their ACKs. + client, err := createPipelineClient(in.pipeline, acks) + if err != nil { + in.log.Errorf("failed to create pipeline client: %v", err.Error()) + return + } + defer client.Close() + defer acks.Close() + rateLimitWaiter := backoff.NewEqualJitterBackoff(ctx.Done(), 1, 120) - for s3ObjectPayload := range workChan { - objHandler := s3ObjectPayload.s3ObjectHandler - state := s3ObjectPayload.objectState + for _state := range workChan { + state := _state + event := in.s3EventForState(state) + + objHandler := in.s3ObjectHandler.Create(ctx, event) + if objHandler == nil { + in.log.Debugw("empty s3 processor (no matching reader configs).", "state", state) + continue + } // Process S3 object (download, parse, create events). - err := objHandler.ProcessS3Object() + publishCount := 0 + err := objHandler.ProcessS3Object(in.log, func(e beat.Event) { + in.metrics.s3EventsCreatedTotal.Inc() + client.Publish(e) + publishCount++ + }) + in.metrics.s3EventsPerObject.Update(int64(publishCount)) if errors.Is(err, errS3DownloadFailed) { // Download errors are ephemeral. Add a backoff delay, then skip to the // next iteration so we don't mark the object as permanently failed. @@ -151,9 +159,7 @@ func (in *s3PollerInput) workerLoop(ctx context.Context, workChan <-chan *s3Fetc // Reset the rate limit delay on results that aren't download errors. rateLimitWaiter.Reset() - // Wait for downloaded objects to be ACKed. - objHandler.Wait() - + // Update state, but don't persist it until this object is acknowledged. if err != nil { in.log.Errorf("failed processing S3 event for object key %q in bucket %q: %v", state.Key, state.Bucket, err.Error()) @@ -164,22 +170,20 @@ func (in *s3PollerInput) workerLoop(ctx context.Context, workChan <-chan *s3Fetc state.Stored = true } - // Persist the result, report any errors - err = in.states.AddState(state) - if err != nil { - in.log.Errorf("saving completed object state: %v", err.Error()) - } - - // Metrics - in.metrics.s3ObjectsAckedTotal.Inc() + // Add the cleanup handling to the acks helper + acks.Add(publishCount, func() { + err := in.states.AddState(state) + if err != nil { + in.log.Errorf("saving completed object state: %v", err.Error()) + } - if finalizeErr := objHandler.FinalizeS3Object(); finalizeErr != nil { - in.log.Errorf("failed finalizing objects from S3 bucket (manual cleanup is required): %w", finalizeErr) - } + // Metrics + in.metrics.s3ObjectsAckedTotal.Inc() + }) } } -func (in *s3PollerInput) readerLoop(ctx context.Context, workChan chan<- *s3FetchTask) { +func (in *s3PollerInput) readerLoop(ctx context.Context, workChan chan<- state) { defer close(workChan) bucketName := getBucketNameFromARN(in.config.getBucketARN()) @@ -220,31 +224,19 @@ func (in *s3PollerInput) readerLoop(ctx context.Context, workChan chan<- *s3Fetc continue } - s3Processor := in.createS3ObjectProcessor(ctx, state) - if s3Processor == nil { - in.log.Debugw("empty s3 processor.", "state", state) - continue - } - - workChan <- &s3FetchTask{ - s3ObjectHandler: s3Processor, - objectState: state, - } + workChan <- state in.metrics.s3ObjectsProcessedTotal.Inc() } } } -func (in *s3PollerInput) createS3ObjectProcessor(ctx context.Context, state state) s3ObjectHandler { +func (in *s3PollerInput) s3EventForState(state state) s3EventV2 { event := s3EventV2{} event.AWSRegion = in.awsConfig.Region event.Provider = in.provider event.S3.Bucket.Name = state.Bucket event.S3.Bucket.ARN = in.config.getBucketARN() event.S3.Object.Key = state.Key - - acker := awscommon.NewEventACKTracker(ctx) - - return in.s3ObjectHandler.Create(ctx, in.log, in.client, acker, event) + return event } diff --git a/x-pack/filebeat/input/awss3/s3_objects.go b/x-pack/filebeat/input/awss3/s3_objects.go index 82a9e817bc6..93219d9a640 100644 --- a/x-pack/filebeat/input/awss3/s3_objects.go +++ b/x-pack/filebeat/input/awss3/s3_objects.go @@ -25,30 +25,48 @@ import ( "github.com/elastic/beats/v7/libbeat/reader" "github.com/elastic/beats/v7/libbeat/reader/readfile" "github.com/elastic/beats/v7/libbeat/reader/readfile/encoding" - awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" ) -const ( - contentTypeJSON = "application/json" - contentTypeNDJSON = "application/x-ndjson" -) - type s3ObjectProcessorFactory struct { - log *logp.Logger metrics *inputMetrics s3 s3API fileSelectors []fileSelectorConfig backupConfig backupConfig } +type s3ObjectProcessor struct { + *s3ObjectProcessorFactory + + ctx context.Context + eventCallback func(beat.Event) + readerConfig *readerConfig // Config about how to process the object. + s3Obj s3EventV2 // S3 object information. + s3ObjHash string + s3RequestURL string + + s3Metadata map[string]interface{} // S3 object metadata. +} + +type s3DownloadedObject struct { + body io.ReadCloser + length int64 + contentType string + metadata map[string]interface{} +} + +const ( + contentTypeJSON = "application/json" + contentTypeNDJSON = "application/x-ndjson" +) + // errS3DownloadFailed reports problems downloading an S3 object. Download errors // should never treated as permanent, they are just an indication to apply a // retry backoff until the connection is healthy again. var errS3DownloadFailed = errors.New("S3 download failure") -func newS3ObjectProcessorFactory(log *logp.Logger, metrics *inputMetrics, s3 s3API, sel []fileSelectorConfig, backupConfig backupConfig) *s3ObjectProcessorFactory { +func newS3ObjectProcessorFactory(metrics *inputMetrics, s3 s3API, sel []fileSelectorConfig, backupConfig backupConfig) *s3ObjectProcessorFactory { if metrics == nil { // Metrics are optional. Initialize a stub. metrics = newInputMetrics("", nil, 0) @@ -59,7 +77,6 @@ func newS3ObjectProcessorFactory(log *logp.Logger, metrics *inputMetrics, s3 s3A } } return &s3ObjectProcessorFactory{ - log: log, metrics: metrics, s3: s3, fileSelectors: sel, @@ -78,64 +95,33 @@ func (f *s3ObjectProcessorFactory) findReaderConfig(key string) *readerConfig { // Create returns a new s3ObjectProcessor. It returns nil when no file selectors // match the S3 object key. -func (f *s3ObjectProcessorFactory) Create(ctx context.Context, log *logp.Logger, client beat.Client, ack *awscommon.EventACKTracker, obj s3EventV2) s3ObjectHandler { - log = log.With( - "bucket_arn", obj.S3.Bucket.Name, - "object_key", obj.S3.Object.Key) - +func (f *s3ObjectProcessorFactory) Create(ctx context.Context, obj s3EventV2) s3ObjectHandler { readerConfig := f.findReaderConfig(obj.S3.Object.Key) if readerConfig == nil { - log.Debug("Skipping S3 object processing. No file_selectors are a match.") + // No file_selectors are a match, skip. return nil } return &s3ObjectProcessor{ s3ObjectProcessorFactory: f, - log: log, ctx: ctx, - publisher: client, - acker: ack, readerConfig: readerConfig, s3Obj: obj, s3ObjHash: s3ObjectHash(obj), } } -// s3DownloadedObject encapsulate downloaded s3 object for internal processing -type s3DownloadedObject struct { - body io.ReadCloser - length int64 - contentType string - metadata map[string]interface{} -} - -type s3ObjectProcessor struct { - *s3ObjectProcessorFactory - - log *logp.Logger - ctx context.Context - publisher beat.Client - acker *awscommon.EventACKTracker // ACKer tied to the SQS message (multiple S3 readers share an ACKer when the S3 notification event contains more than one S3 object). - readerConfig *readerConfig // Config about how to process the object. - s3Obj s3EventV2 // S3 object information. - s3ObjHash string - s3RequestURL string - eventCount int64 - - s3Metadata map[string]interface{} // S3 object metadata. -} - -func (p *s3ObjectProcessor) Wait() { - p.acker.Wait() -} - -func (p *s3ObjectProcessor) ProcessS3Object() error { +func (p *s3ObjectProcessor) ProcessS3Object(log *logp.Logger, eventCallback func(e beat.Event)) error { if p == nil { return nil } + p.eventCallback = eventCallback + log = log.With( + "bucket_arn", p.s3Obj.S3.Bucket.Name, + "object_key", p.s3Obj.S3.Object.Key) // Metrics and Logging - p.log.Debug("Begin S3 object processing.") + log.Debug("Begin S3 object processing.") p.metrics.s3ObjectsRequestedTotal.Inc() p.metrics.s3ObjectsInflight.Inc() start := time.Now() @@ -143,7 +129,7 @@ func (p *s3ObjectProcessor) ProcessS3Object() error { elapsed := time.Since(start) p.metrics.s3ObjectsInflight.Dec() p.metrics.s3ObjectProcessingTime.Update(elapsed.Nanoseconds()) - p.log.Debugw("End S3 object processing.", "elapsed_time_ns", elapsed) + log.Debugw("End S3 object processing.", "elapsed_time_ns", elapsed) }() // Request object (download). @@ -181,7 +167,7 @@ func (p *s3ObjectProcessor) ProcessS3Object() error { for dec.next() { val, err := dec.decodeValue() if err != nil { - if err == io.EOF { + if errors.Is(err, io.EOF) { return nil } break @@ -191,7 +177,8 @@ func (p *s3ObjectProcessor) ProcessS3Object() error { return err } evt := p.createEvent(string(data), evtOffset) - p.publish(p.acker, &evt) + + p.eventCallback(evt) } case decoder: @@ -226,7 +213,6 @@ func (p *s3ObjectProcessor) ProcessS3Object() error { time.Since(start).Nanoseconds(), err) } - p.metrics.s3EventsPerObject.Update(p.eventCount) return nil } @@ -298,7 +284,7 @@ func (p *s3ObjectProcessor) readJSON(r io.Reader) error { data, _ := item.MarshalJSON() evt := p.createEvent(string(data), offset) - p.publish(p.acker, &evt) + p.eventCallback(evt) } return nil @@ -333,7 +319,7 @@ func (p *s3ObjectProcessor) readJSONSlice(r io.Reader, evtOffset int64) (int64, data, _ := item.MarshalJSON() evt := p.createEvent(string(data), evtOffset) - p.publish(p.acker, &evt) + p.eventCallback(evt) evtOffset++ } @@ -378,7 +364,7 @@ func (p *s3ObjectProcessor) splitEventList(key string, raw json.RawMessage, offs data, _ := item.MarshalJSON() p.s3ObjHash = objHash evt := p.createEvent(string(data), offset+arrayOffset) - p.publish(p.acker, &evt) + p.eventCallback(evt) } return nil @@ -418,7 +404,7 @@ func (p *s3ObjectProcessor) readFile(r io.Reader) error { event := p.createEvent(string(message.Content), offset) event.Fields.DeepUpdate(message.Fields) offset += int64(message.Bytes) - p.publish(p.acker, &event) + p.eventCallback(event) } if errors.Is(err, io.EOF) { @@ -433,15 +419,6 @@ func (p *s3ObjectProcessor) readFile(r io.Reader) error { return nil } -// publish the generated event and perform necessary tracking -func (p *s3ObjectProcessor) publish(ack *awscommon.EventACKTracker, event *beat.Event) { - ack.Add() - event.Private = ack - p.eventCount += 1 - p.metrics.s3EventsCreatedTotal.Inc() - p.publisher.Publish(*event) -} - func (p *s3ObjectProcessor) createEvent(message string, offset int64) beat.Event { event := beat.Event{ Timestamp: time.Now().UTC(), diff --git a/x-pack/filebeat/input/awss3/s3_objects_test.go b/x-pack/filebeat/input/awss3/s3_objects_test.go index 635955ed8c4..d20d81ced6c 100644 --- a/x-pack/filebeat/input/awss3/s3_objects_test.go +++ b/x-pack/filebeat/input/awss3/s3_objects_test.go @@ -22,7 +22,6 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/beat" - awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" ) @@ -148,7 +147,6 @@ func TestS3ObjectProcessor(t *testing.T) { ctrl, ctx := gomock.WithContext(ctx, t) defer ctrl.Finish() mockS3API := NewMockS3API(ctrl) - mockPublisher := NewMockBeatClient(ctrl) s3Event := newS3Event("log.txt") @@ -156,9 +154,8 @@ func TestS3ObjectProcessor(t *testing.T) { GetObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)). Return(nil, errFakeConnectivityFailure) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{}) - ack := awscommon.NewEventACKTracker(ctx) - err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).ProcessS3Object() + s3ObjProc := newS3ObjectProcessorFactory(nil, mockS3API, nil, backupConfig{}) + err := s3ObjProc.Create(ctx, s3Event).ProcessS3Object(logp.NewLogger(inputName), func(_ beat.Event) {}) require.Error(t, err) assert.True(t, errors.Is(err, errS3DownloadFailed), "expected errS3DownloadFailed") }) @@ -170,7 +167,6 @@ func TestS3ObjectProcessor(t *testing.T) { ctrl, ctx := gomock.WithContext(ctx, t) defer ctrl.Finish() mockS3API := NewMockS3API(ctrl) - mockPublisher := NewMockBeatClient(ctrl) s3Event := newS3Event("log.txt") @@ -178,9 +174,8 @@ func TestS3ObjectProcessor(t *testing.T) { GetObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)). Return(nil, nil) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{}) - ack := awscommon.NewEventACKTracker(ctx) - err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).ProcessS3Object() + s3ObjProc := newS3ObjectProcessorFactory(nil, mockS3API, nil, backupConfig{}) + err := s3ObjProc.Create(ctx, s3Event).ProcessS3Object(logp.NewLogger(inputName), func(_ beat.Event) {}) require.Error(t, err) }) @@ -191,23 +186,20 @@ func TestS3ObjectProcessor(t *testing.T) { ctrl, ctx := gomock.WithContext(ctx, t) defer ctrl.Finish() mockS3API := NewMockS3API(ctrl) - mockPublisher := NewMockBeatClient(ctrl) s3Event, s3Resp := newS3Object(t, "testdata/log.txt", "") - var events []beat.Event gomock.InOrder( mockS3API.EXPECT(). GetObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)). Return(s3Resp, nil), - mockPublisher.EXPECT(). - Publish(gomock.Any()). - Do(func(event beat.Event) { events = append(events, event) }). - Times(2), ) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{}) - ack := awscommon.NewEventACKTracker(ctx) - err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).ProcessS3Object() + var events []beat.Event + s3ObjProc := newS3ObjectProcessorFactory(nil, mockS3API, nil, backupConfig{}) + err := s3ObjProc.Create(ctx, s3Event).ProcessS3Object(logp.NewLogger(inputName), func(event beat.Event) { + events = append(events, event) + }) + assert.Equal(t, 2, len(events)) require.NoError(t, err) }) @@ -218,7 +210,6 @@ func TestS3ObjectProcessor(t *testing.T) { ctrl, ctx := gomock.WithContext(ctx, t) defer ctrl.Finish() mockS3API := NewMockS3API(ctrl) - mockPublisher := NewMockBeatClient(ctrl) s3Event, _ := newS3Object(t, "testdata/log.txt", "") backupCfg := backupConfig{ @@ -231,9 +222,8 @@ func TestS3ObjectProcessor(t *testing.T) { Return(nil, nil), ) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupCfg) - ack := awscommon.NewEventACKTracker(ctx) - err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).FinalizeS3Object() + s3ObjProc := newS3ObjectProcessorFactory(nil, mockS3API, nil, backupCfg) + err := s3ObjProc.Create(ctx, s3Event).FinalizeS3Object() require.NoError(t, err) }) @@ -244,7 +234,6 @@ func TestS3ObjectProcessor(t *testing.T) { ctrl, ctx := gomock.WithContext(ctx, t) defer ctrl.Finish() mockS3API := NewMockS3API(ctrl) - mockPublisher := NewMockBeatClient(ctrl) s3Event, _ := newS3Object(t, "testdata/log.txt", "") backupCfg := backupConfig{ @@ -261,9 +250,8 @@ func TestS3ObjectProcessor(t *testing.T) { Return(nil, nil), ) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupCfg) - ack := awscommon.NewEventACKTracker(ctx) - err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).FinalizeS3Object() + s3ObjProc := newS3ObjectProcessorFactory(nil, mockS3API, nil, backupCfg) + err := s3ObjProc.Create(ctx, s3Event).FinalizeS3Object() require.NoError(t, err) }) @@ -274,7 +262,6 @@ func TestS3ObjectProcessor(t *testing.T) { ctrl, ctx := gomock.WithContext(ctx, t) defer ctrl.Finish() mockS3API := NewMockS3API(ctrl) - mockPublisher := NewMockBeatClient(ctrl) s3Event, _ := newS3Object(t, "testdata/log.txt", "") backupCfg := backupConfig{ @@ -288,9 +275,8 @@ func TestS3ObjectProcessor(t *testing.T) { Return(nil, nil), ) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupCfg) - ack := awscommon.NewEventACKTracker(ctx) - err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).FinalizeS3Object() + s3ObjProc := newS3ObjectProcessorFactory(nil, mockS3API, nil, backupCfg) + err := s3ObjProc.Create(ctx, s3Event).FinalizeS3Object() require.NoError(t, err) }) @@ -320,7 +306,6 @@ func _testProcessS3Object(t testing.TB, file, contentType string, numEvents int, ctrl, ctx := gomock.WithContext(ctx, t) defer ctrl.Finish() mockS3API := NewMockS3API(ctrl) - mockPublisher := NewMockBeatClient(ctrl) s3Event, s3Resp := newS3Object(t, file, contentType) var events []beat.Event @@ -328,20 +313,16 @@ func _testProcessS3Object(t testing.TB, file, contentType string, numEvents int, mockS3API.EXPECT(). GetObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)). Return(s3Resp, nil), - mockPublisher.EXPECT(). - Publish(gomock.Any()). - Do(func(event beat.Event) { events = append(events, event) }). - Times(numEvents), ) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, selectors, backupConfig{}) - ack := awscommon.NewEventACKTracker(ctx) - err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).ProcessS3Object() + s3ObjProc := newS3ObjectProcessorFactory(nil, mockS3API, selectors, backupConfig{}) + err := s3ObjProc.Create(ctx, s3Event).ProcessS3Object( + logp.NewLogger(inputName), + func(event beat.Event) { events = append(events, event) }) if !expectErr { require.NoError(t, err) assert.Equal(t, numEvents, len(events)) - assert.EqualValues(t, numEvents, ack.PendingACKs) } else { require.Error(t, err) } diff --git a/x-pack/filebeat/input/awss3/s3_test.go b/x-pack/filebeat/input/awss3/s3_test.go index 9c6099e775a..b0b19d82831 100644 --- a/x-pack/filebeat/input/awss3/s3_test.go +++ b/x-pack/filebeat/input/awss3/s3_test.go @@ -36,7 +36,7 @@ func TestS3Poller(t *testing.T) { defer ctrl.Finish() mockAPI := NewMockS3API(ctrl) mockPager := NewMockS3Pager(ctrl) - mockPublisher := NewMockBeatClient(ctrl) + pipeline := newFakePipeline() gomock.InOrder( mockAPI.EXPECT(). @@ -126,7 +126,7 @@ func TestS3Poller(t *testing.T) { GetObject(gomock.Any(), gomock.Eq(""), gomock.Eq(bucket), gomock.Eq("2024-02-08T08:35:00+00:02.json.gz")). Return(nil, errFakeConnectivityFailure) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, nil, backupConfig{}) + s3ObjProc := newS3ObjectProcessorFactory(nil, mockAPI, nil, backupConfig{}) states, err := newStates(nil, store) require.NoError(t, err, "states creation must succeed") poller := &s3PollerInput{ @@ -139,7 +139,7 @@ func TestS3Poller(t *testing.T) { RegionName: "region", }, s3: mockAPI, - client: mockPublisher, + pipeline: pipeline, s3ObjectHandler: s3ObjProc, states: states, provider: "provider", @@ -162,7 +162,7 @@ func TestS3Poller(t *testing.T) { mockS3 := NewMockS3API(ctrl) mockErrorPager := NewMockS3Pager(ctrl) mockSuccessPager := NewMockS3Pager(ctrl) - mockPublisher := NewMockBeatClient(ctrl) + pipeline := newFakePipeline() gomock.InOrder( // Initial ListObjectPaginator gets an error. @@ -264,7 +264,7 @@ func TestS3Poller(t *testing.T) { GetObject(gomock.Any(), gomock.Eq(""), gomock.Eq(bucket), gomock.Eq("key5")). Return(nil, errFakeConnectivityFailure) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3, nil, backupConfig{}) + s3ObjProc := newS3ObjectProcessorFactory(nil, mockS3, nil, backupConfig{}) states, err := newStates(nil, store) require.NoError(t, err, "states creation must succeed") poller := &s3PollerInput{ @@ -277,7 +277,7 @@ func TestS3Poller(t *testing.T) { RegionName: "region", }, s3: mockS3, - client: mockPublisher, + pipeline: pipeline, s3ObjectHandler: s3ObjProc, states: states, provider: "provider", diff --git a/x-pack/filebeat/input/awss3/sqs_input.go b/x-pack/filebeat/input/awss3/sqs_input.go index a92319cbe19..a4308af45a8 100644 --- a/x-pack/filebeat/input/awss3/sqs_input.go +++ b/x-pack/filebeat/input/awss3/sqs_input.go @@ -8,7 +8,6 @@ import ( "context" "fmt" "sync" - "time" awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -29,6 +28,10 @@ type sqsReaderInput struct { log *logp.Logger metrics *inputMetrics + // The Beats pipeline, used to create clients for event publication when + // creating the worker goroutines. + pipeline beat.Pipeline + // The expected region based on the queue URL detectedRegion string @@ -46,7 +49,7 @@ func newSQSReaderInput(config config, awsConfig awssdk.Config) *sqsReaderInput { return &sqsReaderInput{ config: config, awsConfig: awsConfig, - workRequestChan: make(chan struct{}, config.MaxNumberOfMessages), + workRequestChan: make(chan struct{}, config.NumberOfWorkers), workResponseChan: make(chan types.Message), } } @@ -83,6 +86,7 @@ func (in *sqsReaderInput) setup( pipeline beat.Pipeline, ) error { in.log = inputContext.Logger.With("queue_url", in.config.QueueURL) + in.pipeline = pipeline in.detectedRegion = getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint) if in.config.RegionName != "" { @@ -105,10 +109,10 @@ func (in *sqsReaderInput) setup( in.s3 = newAWSs3API(s3.NewFromConfig(in.awsConfig, in.config.s3ConfigModifier)) - in.metrics = newInputMetrics(inputContext.ID, nil, in.config.MaxNumberOfMessages) + in.metrics = newInputMetrics(inputContext.ID, nil, in.config.NumberOfWorkers) var err error - in.msgHandler, err = in.createEventProcessor(pipeline) + in.msgHandler, err = in.createEventProcessor() if err != nil { return fmt.Errorf("failed to initialize sqs reader: %w", err) } @@ -161,42 +165,87 @@ func (in *sqsReaderInput) readerLoop(ctx context.Context) { } } -func (in *sqsReaderInput) workerLoop(ctx context.Context) { +type sqsWorker struct { + input *sqsReaderInput + client beat.Client + ackHandler *awsACKHandler +} + +func (in *sqsReaderInput) newSQSWorker() (*sqsWorker, error) { + // Create a pipeline client scoped to this worker. + ackHandler := newAWSACKHandler() + client, err := in.pipeline.ConnectWith(beat.ClientConfig{ + EventListener: ackHandler.pipelineEventListener(), + Processing: beat.ProcessingConfig{ + // This input only produces events with basic types so normalization + // is not required. + EventNormalization: boolPtr(false), + }, + }) + if err != nil { + return nil, fmt.Errorf("connecting to pipeline: %w", err) + } + return &sqsWorker{ + input: in, + client: client, + ackHandler: ackHandler, + }, nil +} + +func (w *sqsWorker) run(ctx context.Context) { + defer w.client.Close() + defer w.ackHandler.Close() + for ctx.Err() == nil { // Send a work request select { case <-ctx.Done(): // Shutting down return - case in.workRequestChan <- struct{}{}: + case w.input.workRequestChan <- struct{}{}: } // The request is sent, wait for a response select { case <-ctx.Done(): return - case msg := <-in.workResponseChan: - start := time.Now() - - id := in.metrics.beginSQSWorker() - if err := in.msgHandler.ProcessSQS(ctx, &msg); err != nil { - in.log.Warnw("Failed processing SQS message.", - "error", err, - "message_id", *msg.MessageId, - "elapsed_time_ns", time.Since(start)) - } - in.metrics.endSQSWorker(id) + case msg := <-w.input.workResponseChan: + w.processMessage(ctx, msg) } } } +func (w *sqsWorker) processMessage(ctx context.Context, msg types.Message) { + publishCount := 0 + id := w.input.metrics.beginSQSWorker() + result := w.input.msgHandler.ProcessSQS(ctx, &msg, func(e beat.Event) { + w.client.Publish(e) + publishCount++ + }) + + if publishCount == 0 { + // No events made it through (probably an error state), wrap up immediately + result.Done() + } else { + // Add this result's Done callback to the pending ACKs list + w.ackHandler.Add(publishCount, result.Done) + } + + w.input.metrics.endSQSWorker(id) +} + func (in *sqsReaderInput) startWorkers(ctx context.Context) { // Start the worker goroutines that will fetch messages via workRequestChan // and workResponseChan until the input shuts down. - for i := 0; i < in.config.MaxNumberOfMessages; i++ { + for i := 0; i < in.config.NumberOfWorkers; i++ { in.workerWg.Add(1) go func() { defer in.workerWg.Done() - in.workerLoop(ctx) + worker, err := in.newSQSWorker() + if err != nil { + in.log.Error(err) + return + } + go worker.run(ctx) }() } } @@ -209,7 +258,7 @@ func (in *sqsReaderInput) logConfigSummary() { log.Warnf("configured region disagrees with queue_url region (%q != %q): using %q", in.awsConfig.Region, in.detectedRegion, in.awsConfig.Region) } log.Infof("AWS SQS visibility_timeout is set to %v.", in.config.VisibilityTimeout) - log.Infof("AWS SQS max_number_of_messages is set to %v.", in.config.MaxNumberOfMessages) + log.Infof("AWS SQS number_of_workers is set to %v.", in.config.NumberOfWorkers) if in.config.BackupConfig.GetBucketName() != "" { log.Warnf("You have the backup_to_bucket functionality activated with SQS. Please make sure to set appropriate destination buckets " + @@ -217,15 +266,15 @@ func (in *sqsReaderInput) logConfigSummary() { } } -func (in *sqsReaderInput) createEventProcessor(pipeline beat.Pipeline) (sqsProcessor, error) { +func (in *sqsReaderInput) createEventProcessor() (sqsProcessor, error) { fileSelectors := in.config.getFileSelectors() - s3EventHandlerFactory := newS3ObjectProcessorFactory(in.log.Named("s3"), in.metrics, in.s3, fileSelectors, in.config.BackupConfig) + s3EventHandlerFactory := newS3ObjectProcessorFactory(in.metrics, in.s3, fileSelectors, in.config.BackupConfig) script, err := newScriptFromConfig(in.log.Named("sqs_script"), in.config.SQSScript) if err != nil { return nil, err } - return newSQSS3EventProcessor(in.log.Named("sqs_s3_event"), in.metrics, in.sqs, script, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, pipeline, s3EventHandlerFactory), nil + return newSQSS3EventProcessor(in.log.Named("sqs_s3_event"), in.metrics, in.sqs, script, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, s3EventHandlerFactory), nil } // Read all pending requests and return their count. If block is true, diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event.go b/x-pack/filebeat/input/awss3/sqs_s3_event.go index a489f6a7f72..884cf7adbbc 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event.go @@ -20,7 +20,6 @@ import ( "go.uber.org/multierr" "github.com/elastic/beats/v7/libbeat/beat" - awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" "github.com/elastic/elastic-agent-libs/logp" ) @@ -117,11 +116,10 @@ type eventBridgeEvent struct { } type sqsS3EventProcessor struct { - s3ObjectHandler s3ObjectHandlerFactory + s3HandlerFactory s3ObjectHandlerFactory sqsVisibilityTimeout time.Duration maxReceiveCount int sqs sqsAPI - pipeline beat.Pipeline // Pipeline creates clients for publishing events. log *logp.Logger warnOnce sync.Once metrics *inputMetrics @@ -135,7 +133,6 @@ func newSQSS3EventProcessor( script *script, sqsVisibilityTimeout time.Duration, maxReceiveCount int, - pipeline beat.Pipeline, s3 s3ObjectHandlerFactory, ) *sqsS3EventProcessor { if metrics == nil { @@ -143,18 +140,32 @@ func newSQSS3EventProcessor( metrics = newInputMetrics("", nil, 0) } return &sqsS3EventProcessor{ - s3ObjectHandler: s3, + s3HandlerFactory: s3, sqsVisibilityTimeout: sqsVisibilityTimeout, maxReceiveCount: maxReceiveCount, sqs: sqs, - pipeline: pipeline, log: log, metrics: metrics, script: script, } } -func (p *sqsS3EventProcessor) ProcessSQS(ctx context.Context, msg *types.Message) error { +type sqsProcessingResult struct { + processor *sqsS3EventProcessor + msg *types.Message + receiveCount int // How many times this SQS object has been read + eventCount int // How many events were generated from this SQS object + keepaliveCancel context.CancelFunc + processingErr error + + // Finalizer callbacks for the returned S3 events, invoked via + // finalizeS3Objects after all events are acknowledged. + finalizers []finalizerFunc +} + +type finalizerFunc func() error + +func (p *sqsS3EventProcessor) ProcessSQS(ctx context.Context, msg *types.Message, eventCallback func(beat.Event)) sqsProcessingResult { log := p.log.With( "message_id", *msg.MessageId, "message_receipt_time", time.Now().UTC()) @@ -165,7 +176,10 @@ func (p *sqsS3EventProcessor) ProcessSQS(ctx context.Context, msg *types.Message // Start SQS keepalive worker. var keepaliveWg sync.WaitGroup keepaliveWg.Add(1) - go p.keepalive(keepaliveCtx, log, &keepaliveWg, msg) + go func() { + defer keepaliveWg.Done() + p.keepalive(keepaliveCtx, log, msg) + }() receiveCount := getSQSReceiveCount(msg.Attributes) if receiveCount == 1 { @@ -179,45 +193,69 @@ func (p *sqsS3EventProcessor) ProcessSQS(ctx context.Context, msg *types.Message } } - handles, processingErr := p.processS3Events(ctx, log, *msg.Body) + eventCount := 0 + finalizers, processingErr := p.processS3Events(ctx, log, *msg.Body, func(e beat.Event) { + eventCount++ + eventCallback(e) + }) + + return sqsProcessingResult{ + msg: msg, + processor: p, + receiveCount: receiveCount, + eventCount: eventCount, + keepaliveCancel: keepaliveCancel, + processingErr: processingErr, + finalizers: finalizers, + } +} + +// Call Done to indicate that all events from this SQS message have been +// acknowledged and it is safe to stop the keepalive routine and +// delete / finalize the message. +func (r sqsProcessingResult) Done() { + p := r.processor + processingErr := r.processingErr // Stop keepalive routine before changing visibility. - keepaliveCancel() - keepaliveWg.Wait() + r.keepaliveCancel() // No error. Delete SQS. if processingErr == nil { - if msgDelErr := p.sqs.DeleteMessage(context.Background(), msg); msgDelErr != nil { - return fmt.Errorf("failed deleting message from SQS queue (it may be reprocessed): %w", msgDelErr) + if msgDelErr := p.sqs.DeleteMessage(context.Background(), r.msg); msgDelErr != nil { + p.log.Errorf("failed deleting message from SQS queue (it may be reprocessed): %v", msgDelErr.Error()) + return + } + if p.metrics != nil { + // This nil check always passes in production, but it's nice when unit + // tests don't have to initialize irrelevant fields + p.metrics.sqsMessagesDeletedTotal.Inc() } - p.metrics.sqsMessagesDeletedTotal.Inc() // SQS message finished and deleted, finalize s3 objects - if finalizeErr := p.finalizeS3Objects(handles); finalizeErr != nil { - return fmt.Errorf("failed finalizing message from SQS queue (manual cleanup is required): %w", finalizeErr) + if finalizeErr := r.finalizeS3Objects(); finalizeErr != nil { + p.log.Errorf("failed finalizing message from SQS queue (manual cleanup is required): %v", finalizeErr.Error()) } - return nil + return } - if p.maxReceiveCount > 0 && !errors.Is(processingErr, &nonRetryableError{}) { + if p.maxReceiveCount > 0 && r.receiveCount >= p.maxReceiveCount { // Prevent poison pill messages from consuming all workers. Check how // many times this message has been received before making a disposition. - if receiveCount >= p.maxReceiveCount { - processingErr = nonRetryableErrorWrap(fmt.Errorf( - "sqs ApproximateReceiveCount <%v> exceeds threshold %v: %w", - receiveCount, p.maxReceiveCount, processingErr)) - } + processingErr = nonRetryableErrorWrap(fmt.Errorf( + "sqs ApproximateReceiveCount <%v> exceeds threshold %v: %w", + r.receiveCount, p.maxReceiveCount, processingErr)) } // An error that reprocessing cannot correct. Delete SQS. if errors.Is(processingErr, &nonRetryableError{}) { - if msgDelErr := p.sqs.DeleteMessage(context.Background(), msg); msgDelErr != nil { - return multierr.Combine( - fmt.Errorf("failed processing SQS message (attempted to delete message): %w", processingErr), - fmt.Errorf("failed deleting message from SQS queue (it may be reprocessed): %w", msgDelErr), - ) + if msgDelErr := p.sqs.DeleteMessage(context.Background(), r.msg); msgDelErr != nil { + p.log.Errorf("failed processing SQS message (attempted to delete message): %v", processingErr.Error()) + p.log.Errorf("failed deleting message from SQS queue (it may be reprocessed): %v", msgDelErr.Error()) + return } p.metrics.sqsMessagesDeletedTotal.Inc() - return fmt.Errorf("failed processing SQS message (message was deleted): %w", processingErr) + p.log.Errorf("failed processing SQS message (message was deleted): %w", processingErr) + return } // An error that may be resolved by letting the visibility timeout @@ -225,12 +263,10 @@ func (p *sqsS3EventProcessor) ProcessSQS(ctx context.Context, msg *types.Message // queue is enabled then the message will eventually placed on the DLQ // after maximum receives is reached. p.metrics.sqsMessagesReturnedTotal.Inc() - return fmt.Errorf("failed processing SQS message (it will return to queue after visibility timeout): %w", processingErr) + p.log.Errorf("failed processing SQS message (it will return to queue after visibility timeout): %w", processingErr) } -func (p *sqsS3EventProcessor) keepalive(ctx context.Context, log *logp.Logger, wg *sync.WaitGroup, msg *types.Message) { - defer wg.Done() - +func (p *sqsS3EventProcessor) keepalive(ctx context.Context, log *logp.Logger, msg *types.Message) { t := time.NewTicker(p.sqsVisibilityTimeout / 2) defer t.Stop() @@ -355,7 +391,12 @@ func (*sqsS3EventProcessor) isObjectCreatedEvents(event s3EventV2) bool { return event.EventSource == "aws:s3" && strings.HasPrefix(event.EventName, "ObjectCreated:") } -func (p *sqsS3EventProcessor) processS3Events(ctx context.Context, log *logp.Logger, body string) ([]s3ObjectHandler, error) { +func (p *sqsS3EventProcessor) processS3Events( + ctx context.Context, + log *logp.Logger, + body string, + eventCallback func(beat.Event), +) ([]finalizerFunc, error) { s3Events, err := p.getS3Notifications(body) if err != nil { if errors.Is(err, context.Canceled) { @@ -371,57 +412,36 @@ func (p *sqsS3EventProcessor) processS3Events(ctx context.Context, log *logp.Log return nil, nil } - // Create a pipeline client scoped to this goroutine. - client, err := p.pipeline.ConnectWith(beat.ClientConfig{ - EventListener: awscommon.NewEventACKHandler(), - Processing: beat.ProcessingConfig{ - // This input only produces events with basic types so normalization - // is not required. - EventNormalization: boolPtr(false), - }, - }) - if err != nil { - return nil, err - } - defer client.Close() - - // Wait for all events to be ACKed before proceeding. - acker := awscommon.NewEventACKTracker(ctx) - defer acker.Wait() - var errs []error - var handles []s3ObjectHandler + var finalizers []finalizerFunc for i, event := range s3Events { - s3Processor := p.s3ObjectHandler.Create(ctx, log, client, acker, event) + s3Processor := p.s3HandlerFactory.Create(ctx, event) if s3Processor == nil { + // A nil result generally means that this object key doesn't match the + // user-configured filters. continue } // Process S3 object (download, parse, create events). - if err := s3Processor.ProcessS3Object(); err != nil { + if err := s3Processor.ProcessS3Object(log, eventCallback); err != nil { errs = append(errs, fmt.Errorf( "failed processing S3 event for object key %q in bucket %q (object record %d of %d in SQS notification): %w", event.S3.Object.Key, event.S3.Bucket.Name, i+1, len(s3Events), err)) } else { - handles = append(handles, s3Processor) + finalizers = append(finalizers, s3Processor.FinalizeS3Object) } } - // Make sure all s3 events were processed successfully - if len(handles) == len(s3Events) { - return handles, multierr.Combine(errs...) - } - - return nil, multierr.Combine(errs...) + return finalizers, multierr.Combine(errs...) } -func (p *sqsS3EventProcessor) finalizeS3Objects(handles []s3ObjectHandler) error { +func (r sqsProcessingResult) finalizeS3Objects() error { var errs []error - for i, handle := range handles { - if err := handle.FinalizeS3Object(); err != nil { + for i, finalize := range r.finalizers { + if err := finalize(); err != nil { errs = append(errs, fmt.Errorf( "failed finalizing S3 event (object record %d of %d in SQS notification): %w", - i+1, len(handles), err)) + i+1, len(r.finalizers), err)) } } return multierr.Combine(errs...) diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go index 92401fe45ee..c7962bb2f0f 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go @@ -8,7 +8,6 @@ import ( "context" "errors" "fmt" - "sync" "testing" "time" @@ -22,7 +21,6 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/beat" - awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/go-concert/timed" ) @@ -41,18 +39,16 @@ func TestSQSS3EventProcessor(t *testing.T) { defer ctrl.Finish() mockAPI := NewMockSQSAPI(ctrl) mockS3HandlerFactory := NewMockS3ObjectHandlerFactory(ctrl) - mockClient := NewMockBeatClient(ctrl) - mockBeatPipeline := NewMockBeatPipeline(ctrl) gomock.InOrder( - mockBeatPipeline.EXPECT().ConnectWith(gomock.Any()).Return(mockClient, nil), - mockS3HandlerFactory.EXPECT().Create(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil), - mockClient.EXPECT().Close(), + mockS3HandlerFactory.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil), mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&msg)).Return(nil), ) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockBeatPipeline, mockS3HandlerFactory) - require.NoError(t, p.ProcessSQS(ctx, &msg)) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockS3HandlerFactory) + result := p.ProcessSQS(ctx, &msg, func(_ beat.Event) {}) + require.NoError(t, result.processingErr) + result.Done() }) t.Run("invalid SQS JSON body does not retry", func(t *testing.T) { @@ -63,7 +59,6 @@ func TestSQSS3EventProcessor(t *testing.T) { defer ctrl.Finish() mockAPI := NewMockSQSAPI(ctrl) mockS3HandlerFactory := NewMockS3ObjectHandlerFactory(ctrl) - mockBeatPipeline := NewMockBeatPipeline(ctrl) invalidBodyMsg, err := newSQSMessage(newS3Event("log.json")) require.NoError(t, err) @@ -72,14 +67,13 @@ func TestSQSS3EventProcessor(t *testing.T) { body = body[10:] invalidBodyMsg.Body = &body - gomock.InOrder( - mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&invalidBodyMsg)).Return(nil), - ) + mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&invalidBodyMsg)).Return(nil) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockBeatPipeline, mockS3HandlerFactory) - err = p.ProcessSQS(ctx, &invalidBodyMsg) - require.Error(t, err) - t.Log(err) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockS3HandlerFactory) + result := p.ProcessSQS(ctx, &invalidBodyMsg, func(_ beat.Event) {}) + require.Error(t, result.processingErr) + t.Log(result.processingErr) + result.Done() }) t.Run("zero S3 events in body", func(t *testing.T) { @@ -90,17 +84,16 @@ func TestSQSS3EventProcessor(t *testing.T) { defer ctrl.Finish() mockAPI := NewMockSQSAPI(ctrl) mockS3HandlerFactory := NewMockS3ObjectHandlerFactory(ctrl) - mockBeatPipeline := NewMockBeatPipeline(ctrl) emptyRecordsMsg, err := newSQSMessage([]s3EventV2{}...) require.NoError(t, err) - gomock.InOrder( - mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&emptyRecordsMsg)).Return(nil), - ) + mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&emptyRecordsMsg)).Return(nil) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockBeatPipeline, mockS3HandlerFactory) - require.NoError(t, p.ProcessSQS(ctx, &emptyRecordsMsg)) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockS3HandlerFactory) + result := p.ProcessSQS(ctx, &emptyRecordsMsg, func(_ beat.Event) {}) + require.NoError(t, result.processingErr) + result.Done() }) t.Run("visibility is extended after half expires", func(t *testing.T) { @@ -114,25 +107,23 @@ func TestSQSS3EventProcessor(t *testing.T) { mockAPI := NewMockSQSAPI(ctrl) mockS3HandlerFactory := NewMockS3ObjectHandlerFactory(ctrl) mockS3Handler := NewMockS3ObjectHandler(ctrl) - mockClient := NewMockBeatClient(ctrl) - mockBeatPipeline := NewMockBeatPipeline(ctrl) mockAPI.EXPECT().ChangeMessageVisibility(gomock.Any(), gomock.Eq(&msg), gomock.Eq(visibilityTimeout)).AnyTimes().Return(nil) gomock.InOrder( - mockBeatPipeline.EXPECT().ConnectWith(gomock.Any()).Return(mockClient, nil), - mockS3HandlerFactory.EXPECT().Create(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - Do(func(ctx context.Context, _ *logp.Logger, _ beat.Client, _ *awscommon.EventACKTracker, _ s3EventV2) { + mockS3HandlerFactory.EXPECT().Create(gomock.Any(), gomock.Any()). + Do(func(ctx context.Context, _ s3EventV2) { require.NoError(t, timed.Wait(ctx, 5*visibilityTimeout)) }).Return(mockS3Handler), - mockS3Handler.EXPECT().ProcessS3Object().Return(nil), - mockClient.EXPECT().Close(), + mockS3Handler.EXPECT().ProcessS3Object(gomock.Any(), gomock.Any()).Return(nil), mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&msg)).Return(nil), mockS3Handler.EXPECT().FinalizeS3Object().Return(nil), ) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, visibilityTimeout, 5, mockBeatPipeline, mockS3HandlerFactory) - require.NoError(t, p.ProcessSQS(ctx, &msg)) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, visibilityTimeout, 5, mockS3HandlerFactory) + result := p.ProcessSQS(ctx, &msg, func(_ beat.Event) {}) + require.NoError(t, result.processingErr) + result.Done() }) t.Run("message returns to queue on error", func(t *testing.T) { @@ -144,20 +135,17 @@ func TestSQSS3EventProcessor(t *testing.T) { mockAPI := NewMockSQSAPI(ctrl) mockS3HandlerFactory := NewMockS3ObjectHandlerFactory(ctrl) mockS3Handler := NewMockS3ObjectHandler(ctrl) - mockClient := NewMockBeatClient(ctrl) - mockBeatPipeline := NewMockBeatPipeline(ctrl) gomock.InOrder( - mockBeatPipeline.EXPECT().ConnectWith(gomock.Any()).Return(mockClient, nil), - mockS3HandlerFactory.EXPECT().Create(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(mockS3Handler), - mockS3Handler.EXPECT().ProcessS3Object().Return(errors.New("fake connectivity problem")), - mockClient.EXPECT().Close(), + mockS3HandlerFactory.EXPECT().Create(gomock.Any(), gomock.Any()).Return(mockS3Handler), + mockS3Handler.EXPECT().ProcessS3Object(gomock.Any(), gomock.Any()).Return(errors.New("fake connectivity problem")), ) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockBeatPipeline, mockS3HandlerFactory) - err := p.ProcessSQS(ctx, &msg) - t.Log(err) - require.Error(t, err) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockS3HandlerFactory) + result := p.ProcessSQS(ctx, &msg, func(_ beat.Event) {}) + t.Log(result.processingErr) + require.Error(t, result.processingErr) + result.Done() }) t.Run("message is deleted after multiple receives", func(t *testing.T) { @@ -169,8 +157,6 @@ func TestSQSS3EventProcessor(t *testing.T) { mockAPI := NewMockSQSAPI(ctrl) mockS3HandlerFactory := NewMockS3ObjectHandlerFactory(ctrl) mockS3Handler := NewMockS3ObjectHandler(ctrl) - mockClient := NewMockBeatClient(ctrl) - mockBeatPipeline := NewMockBeatPipeline(ctrl) msg := msg msg.Attributes = map[string]string{ @@ -178,17 +164,16 @@ func TestSQSS3EventProcessor(t *testing.T) { } gomock.InOrder( - mockBeatPipeline.EXPECT().ConnectWith(gomock.Any()).Return(mockClient, nil), - mockS3HandlerFactory.EXPECT().Create(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(mockS3Handler), - mockS3Handler.EXPECT().ProcessS3Object().Return(errors.New("fake connectivity problem")), - mockClient.EXPECT().Close(), + mockS3HandlerFactory.EXPECT().Create(gomock.Any(), gomock.Any()).Return(mockS3Handler), + mockS3Handler.EXPECT().ProcessS3Object(gomock.Any(), gomock.Any()).Return(errors.New("fake connectivity problem")), mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&msg)).Return(nil), ) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockBeatPipeline, mockS3HandlerFactory) - err := p.ProcessSQS(ctx, &msg) - t.Log(err) - require.Error(t, err) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockS3HandlerFactory) + result := p.ProcessSQS(ctx, &msg, func(_ beat.Event) {}) + t.Log(result.eventCount) + require.Error(t, result.processingErr) + result.Done() }) } @@ -227,16 +212,12 @@ func TestSqsProcessor_keepalive(t *testing.T) { defer ctrl.Finish() mockAPI := NewMockSQSAPI(ctrl) mockS3HandlerFactory := NewMockS3ObjectHandlerFactory(ctrl) - mockBeatPipeline := NewMockBeatPipeline(ctrl) mockAPI.EXPECT().ChangeMessageVisibility(gomock.Any(), gomock.Eq(&msg), gomock.Eq(visibilityTimeout)). Times(1).Return(tc.Err) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, visibilityTimeout, 5, mockBeatPipeline, mockS3HandlerFactory) - var wg sync.WaitGroup - wg.Add(1) - p.keepalive(ctx, p.log, &wg, &msg) - wg.Wait() + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, visibilityTimeout, 5, mockS3HandlerFactory) + p.keepalive(ctx, p.log, &msg) }) } } @@ -245,7 +226,7 @@ func TestSqsProcessor_getS3Notifications(t *testing.T) { err := logp.TestingSetup() require.NoError(t, err) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, nil, nil, time.Minute, 5, nil, nil) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, nil, nil, time.Minute, 5, nil) t.Run("s3 key is url unescaped", func(t *testing.T) { msg, err := newSQSMessage(newS3Event("Happy+Face.jpg")) diff --git a/x-pack/filebeat/input/awss3/sqs_test.go b/x-pack/filebeat/input/awss3/sqs_test.go index fff17ebc1a6..8bc25397eae 100644 --- a/x-pack/filebeat/input/awss3/sqs_test.go +++ b/x-pack/filebeat/input/awss3/sqs_test.go @@ -19,6 +19,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/elastic-agent-libs/logp" ) @@ -33,7 +34,7 @@ func TestSQSReceiver(t *testing.T) { err := logp.TestingSetup() require.NoError(t, err) - const maxMessages = 5 + const workerCount = 5 t.Run("ReceiveMessage success", func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), testTimeout) @@ -61,8 +62,6 @@ func TestSQSReceiver(t *testing.T) { ReceiveMessage(gomock.Any(), gomock.Any()). Times(1). DoAndReturn(func(_ context.Context, _ int) ([]types.Message, error) { - // Stop the test. - cancel() return nil, nil }) @@ -72,19 +71,43 @@ func TestSQSReceiver(t *testing.T) { return map[string]string{sqsApproximateNumberOfMessages: "10000"}, nil }).AnyTimes() + mockSQS.EXPECT(). + DeleteMessage(gomock.Any(), gomock.Any()).Times(1).Do( + func(_ context.Context, _ *types.Message) { + cancel() + }) + + logger := logp.NewLogger(inputName) + // Expect the one message returned to have been processed. mockMsgHandler.EXPECT(). - ProcessSQS(gomock.Any(), gomock.Eq(&msg)). + ProcessSQS(gomock.Any(), gomock.Eq(&msg), gomock.Any()). Times(1). - Return(nil) + DoAndReturn( + func(_ context.Context, _ *types.Message, _ func(e beat.Event)) sqsProcessingResult { + return sqsProcessingResult{ + keepaliveCancel: func() {}, + processor: &sqsS3EventProcessor{ + log: logger, + sqs: mockSQS, + }, + } + }) // Execute sqsReader and verify calls/state. - sqsReader := newSQSReaderInput(config{MaxNumberOfMessages: maxMessages}, aws.Config{}) - sqsReader.log = logp.NewLogger(inputName) + sqsReader := newSQSReaderInput(config{NumberOfWorkers: workerCount}, aws.Config{}) + sqsReader.log = logger sqsReader.sqs = mockSQS - sqsReader.msgHandler = mockMsgHandler sqsReader.metrics = newInputMetrics("", nil, 0) + sqsReader.pipeline = &fakePipeline{} + sqsReader.msgHandler = mockMsgHandler sqsReader.run(ctx) + + select { + case <-ctx.Done(): + case <-time.After(time.Second): + require.Fail(t, "Never observed SQS DeleteMessage call") + } }) t.Run("retry after ReceiveMessage error", func(t *testing.T) { @@ -120,11 +143,12 @@ func TestSQSReceiver(t *testing.T) { }).AnyTimes() // Execute SQSReader and verify calls/state. - sqsReader := newSQSReaderInput(config{MaxNumberOfMessages: maxMessages}, aws.Config{}) + sqsReader := newSQSReaderInput(config{NumberOfWorkers: workerCount}, aws.Config{}) sqsReader.log = logp.NewLogger(inputName) sqsReader.sqs = mockSQS sqsReader.msgHandler = mockMsgHandler sqsReader.metrics = newInputMetrics("", nil, 0) + sqsReader.pipeline = &fakePipeline{} sqsReader.run(ctx) }) } diff --git a/x-pack/filebeat/module/aws/_meta/config.yml b/x-pack/filebeat/module/aws/_meta/config.yml index e92cb36e7b5..da0377b6e46 100644 --- a/x-pack/filebeat/module/aws/_meta/config.yml +++ b/x-pack/filebeat/module/aws/_meta/config.yml @@ -14,7 +14,7 @@ # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s - # Number of workers on S3 bucket + # Number of workers on S3 bucket or SQS queue #var.number_of_workers: 5 # Process CloudTrail logs @@ -63,9 +63,6 @@ # Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. #var.fips_enabled: false - # The maximum number of messages to return from SQS. Valid values: 1 to 10. - #var.max_number_of_messages: 5 - # URL to proxy AWS API calls #var.proxy_url: http://proxy:3128 @@ -87,7 +84,7 @@ # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s - # Number of workers on S3 bucket + # Number of workers on S3 bucket or SQS queue #var.number_of_workers: 5 # Filename of AWS credential file @@ -124,9 +121,6 @@ # Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. #var.fips_enabled: false - # The maximum number of messages to return from SQS. Valid values: 1 to 10. - #var.max_number_of_messages: 5 - # URL to proxy AWS API calls #var.proxy_url: http://proxy:3128 @@ -148,7 +142,7 @@ # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s - # Number of workers on S3 bucket + # Number of workers on S3 bucket or SQS queue #var.number_of_workers: 5 # Filename of AWS credential file @@ -185,9 +179,6 @@ # Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. #var.fips_enabled: false - # The maximum number of messages to return from SQS. Valid values: 1 to 10. - #var.max_number_of_messages: 5 - # URL to proxy AWS API calls #var.proxy_url: http://proxy:3128 @@ -209,7 +200,7 @@ # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s - # Number of workers on S3 bucket + # Number of workers on S3 bucket or SQS queue #var.number_of_workers: 5 # Filename of AWS credential file @@ -246,9 +237,6 @@ # Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. #var.fips_enabled: false - # The maximum number of messages to return from SQS. Valid values: 1 to 10. - #var.max_number_of_messages: 5 - # URL to proxy AWS API calls #var.proxy_url: http://proxy:3128 @@ -270,7 +258,7 @@ # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s - # Number of workers on S3 bucket + # Number of workers on S3 bucket or SQS queue #var.number_of_workers: 5 # Filename of AWS credential file @@ -307,9 +295,6 @@ # Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. #var.fips_enabled: false - # The maximum number of messages to return from SQS. Valid values: 1 to 10. - #var.max_number_of_messages: 5 - # URL to proxy AWS API calls #var.proxy_url: http://proxy:3128 @@ -331,7 +316,7 @@ # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s - # Number of workers on S3 bucket + # Number of workers on S3 bucket or SQS queue #var.number_of_workers: 5 # Filename of AWS credential file @@ -368,9 +353,6 @@ # Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. #var.fips_enabled: false - # The maximum number of messages to return from SQS. Valid values: 1 to 10. - #var.max_number_of_messages: 5 - # URL to proxy AWS API calls #var.proxy_url: http://proxy:3128 diff --git a/x-pack/filebeat/module/aws/cloudtrail/config/aws-s3.yml b/x-pack/filebeat/module/aws/cloudtrail/config/aws-s3.yml index ada3a502fc2..0f395737a05 100644 --- a/x-pack/filebeat/module/aws/cloudtrail/config/aws-s3.yml +++ b/x-pack/filebeat/module/aws/cloudtrail/config/aws-s3.yml @@ -77,10 +77,6 @@ role_arn: {{ .role_arn }} fips_enabled: {{ .fips_enabled }} {{ end }} -{{ if .max_number_of_messages }} -max_number_of_messages: {{ .max_number_of_messages }} -{{ end }} - {{ if .proxy_url }} proxy_url: {{ .proxy_url }} {{ end }} diff --git a/x-pack/filebeat/module/aws/cloudtrail/manifest.yml b/x-pack/filebeat/module/aws/cloudtrail/manifest.yml index f19760eb637..84e6d906037 100644 --- a/x-pack/filebeat/module/aws/cloudtrail/manifest.yml +++ b/x-pack/filebeat/module/aws/cloudtrail/manifest.yml @@ -28,7 +28,6 @@ var: default: true - name: fips_enabled - name: proxy_url - - name: max_number_of_messages - name: ssl ingest_pipeline: ingest/pipeline.yml diff --git a/x-pack/filebeat/module/aws/s3access/config/aws-s3.yml b/x-pack/filebeat/module/aws/s3access/config/aws-s3.yml index 8ce1970290d..4c026080925 100644 --- a/x-pack/filebeat/module/aws/s3access/config/aws-s3.yml +++ b/x-pack/filebeat/module/aws/s3access/config/aws-s3.yml @@ -62,10 +62,6 @@ role_arn: {{ .role_arn }} fips_enabled: {{ .fips_enabled }} {{ end }} -{{ if .max_number_of_messages }} -max_number_of_messages: {{ .max_number_of_messages }} -{{ end }} - {{ if .proxy_url }} proxy_url: {{ .proxy_url }} {{ end }} diff --git a/x-pack/filebeat/module/aws/s3access/manifest.yml b/x-pack/filebeat/module/aws/s3access/manifest.yml index e52ba673757..dc17d116928 100644 --- a/x-pack/filebeat/module/aws/s3access/manifest.yml +++ b/x-pack/filebeat/module/aws/s3access/manifest.yml @@ -22,7 +22,6 @@ var: default: [forwarded] - name: fips_enabled - name: proxy_url - - name: max_number_of_messages - name: ssl ingest_pipeline: ingest/pipeline.yml diff --git a/x-pack/filebeat/module/aws/vpcflow/config/input.yml b/x-pack/filebeat/module/aws/vpcflow/config/input.yml index ecb1842be7a..34feb9880b6 100644 --- a/x-pack/filebeat/module/aws/vpcflow/config/input.yml +++ b/x-pack/filebeat/module/aws/vpcflow/config/input.yml @@ -64,10 +64,6 @@ role_arn: {{ .role_arn }} fips_enabled: {{ .fips_enabled }} {{ end }} -{{ if .max_number_of_messages }} -max_number_of_messages: {{ .max_number_of_messages }} -{{ end }} - {{ if .proxy_url }} proxy_url: {{ .proxy_url }} {{ end }} diff --git a/x-pack/filebeat/module/aws/vpcflow/manifest.yml b/x-pack/filebeat/module/aws/vpcflow/manifest.yml index de772408a86..0787eb019b7 100644 --- a/x-pack/filebeat/module/aws/vpcflow/manifest.yml +++ b/x-pack/filebeat/module/aws/vpcflow/manifest.yml @@ -22,7 +22,6 @@ var: default: [forwarded, preserve_original_event] - name: fips_enabled - name: proxy_url - - name: max_number_of_messages - name: ssl - name: format default: diff --git a/x-pack/filebeat/modules.d/aws.yml.disabled b/x-pack/filebeat/modules.d/aws.yml.disabled index c730b8aea07..44d5e768ddc 100644 --- a/x-pack/filebeat/modules.d/aws.yml.disabled +++ b/x-pack/filebeat/modules.d/aws.yml.disabled @@ -17,7 +17,7 @@ # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s - # Number of workers on S3 bucket + # Number of workers on S3 bucket or SQS queue #var.number_of_workers: 5 # Process CloudTrail logs @@ -66,9 +66,6 @@ # Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. #var.fips_enabled: false - # The maximum number of messages to return from SQS. Valid values: 1 to 10. - #var.max_number_of_messages: 5 - # URL to proxy AWS API calls #var.proxy_url: http://proxy:3128 @@ -90,7 +87,7 @@ # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s - # Number of workers on S3 bucket + # Number of workers on S3 bucket or SQS queue #var.number_of_workers: 5 # Filename of AWS credential file @@ -127,9 +124,6 @@ # Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. #var.fips_enabled: false - # The maximum number of messages to return from SQS. Valid values: 1 to 10. - #var.max_number_of_messages: 5 - # URL to proxy AWS API calls #var.proxy_url: http://proxy:3128 @@ -151,7 +145,7 @@ # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s - # Number of workers on S3 bucket + # Number of workers on S3 bucket or SQS queue #var.number_of_workers: 5 # Filename of AWS credential file @@ -188,9 +182,6 @@ # Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. #var.fips_enabled: false - # The maximum number of messages to return from SQS. Valid values: 1 to 10. - #var.max_number_of_messages: 5 - # URL to proxy AWS API calls #var.proxy_url: http://proxy:3128 @@ -212,7 +203,7 @@ # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s - # Number of workers on S3 bucket + # Number of workers on S3 bucket or SQS queue #var.number_of_workers: 5 # Filename of AWS credential file @@ -249,9 +240,6 @@ # Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. #var.fips_enabled: false - # The maximum number of messages to return from SQS. Valid values: 1 to 10. - #var.max_number_of_messages: 5 - # URL to proxy AWS API calls #var.proxy_url: http://proxy:3128 @@ -273,7 +261,7 @@ # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s - # Number of workers on S3 bucket + # Number of workers on S3 bucket or SQS queue #var.number_of_workers: 5 # Filename of AWS credential file @@ -310,9 +298,6 @@ # Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. #var.fips_enabled: false - # The maximum number of messages to return from SQS. Valid values: 1 to 10. - #var.max_number_of_messages: 5 - # URL to proxy AWS API calls #var.proxy_url: http://proxy:3128 @@ -334,7 +319,7 @@ # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s - # Number of workers on S3 bucket + # Number of workers on S3 bucket or SQS queue #var.number_of_workers: 5 # Filename of AWS credential file @@ -371,9 +356,6 @@ # Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. #var.fips_enabled: false - # The maximum number of messages to return from SQS. Valid values: 1 to 10. - #var.max_number_of_messages: 5 - # URL to proxy AWS API calls #var.proxy_url: http://proxy:3128