Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix handling of custom endpoints in AWS input #41504

Merged
merged 5 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Improve modification time handling for entities and entity deletion logic in the Active Directory entityanalytics input. {pull}41179[41179]
- Journald input now can read events from all boots {issue}41083[41083] {pull}41244[41244]
- Fix double encoding of client_secret in the Entity Analytics input's Azure Active Directory provider {pull}41393[41393]
- Fix errors in SQS host resolution in the `aws-s3` input when using custom (non-AWS) endpoints. {pull}41504[41504]

*Heartbeat*

Expand Down
25 changes: 20 additions & 5 deletions x-pack/filebeat/input/awss3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import (
"errors"
"fmt"
"net/url"
"time"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
Expand Down Expand Up @@ -106,6 +107,13 @@
if c.ProviderOverride != "" && c.NonAWSBucketName == "" {
return errors.New("provider can only be overridden when polling non-AWS S3 services")
}
if c.AWSConfig.Endpoint != "" {
// Make sure the given endpoint can be parsed
_, err := url.Parse(c.AWSConfig.Endpoint)
if err != nil {
return fmt.Errorf("failed to parse endpoint: %w", err)
}
}
if c.BackupConfig.NonAWSBackupToBucketName != "" && c.NonAWSBucketName == "" {
return errors.New("backup to non-AWS bucket can only be used for non-AWS sources")
}
Expand Down Expand Up @@ -245,14 +253,18 @@
// options struct.
// Should be provided as a parameter to s3.NewFromConfig.
func (c config) s3ConfigModifier(o *s3.Options) {
if c.NonAWSBucketName != "" {
//nolint:staticcheck // haven't migrated to the new interface yet
o.EndpointResolver = nonAWSBucketResolver{endpoint: c.AWSConfig.Endpoint}
}

if c.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
// Apply slightly different endpoint resolvers depending on whether we're in S3 or SQS mode.
if c.AWSConfig.Endpoint != "" {
//nolint:staticcheck // haven't migrated to the new interface yet
o.EndpointResolver = s3.EndpointResolverFromURL(c.AWSConfig.Endpoint,
func(e *awssdk.Endpoint) {
// The S3 hostname is immutable in bucket polling mode, mutable otherwise.
e.HostnameImmutable = (c.getBucketARN() != "")
})
}
o.UsePathStyle = c.PathStyle

o.Retryer = retry.NewStandard(func(so *retry.StandardOptions) {
Expand All @@ -269,6 +281,9 @@
if c.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
if c.AWSConfig.Endpoint != "" {
o.EndpointResolver = sqs.EndpointResolverFromURL(c.AWSConfig.Endpoint)

Check failure on line 285 in x-pack/filebeat/input/awss3/config.go

View workflow job for this annotation

GitHub Actions / lint (linux)

SA1019: o.EndpointResolver is deprecated: Deprecated: EndpointResolver and WithEndpointResolver. Providing a value for this field will likely prevent you from using any endpoint-related service features released after the introduction of EndpointResolverV2 and BaseEndpoint. (staticcheck)
}
}

func (c config) getFileSelectors() []fileSelectorConfig {
Expand Down
15 changes: 4 additions & 11 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ package awss3
import (
"fmt"

awssdk "github.com/aws/aws-sdk-go-v2/aws"

"github.com/elastic/beats/v7/filebeat/beater"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/feature"
Expand Down Expand Up @@ -48,15 +46,10 @@ func (im *s3InputManager) Create(cfg *conf.C) (v2.Input, error) {
return nil, fmt.Errorf("initializing AWS config: %w", err)
}

if config.AWSConfig.Endpoint != "" {
// Add a custom endpointResolver to the awsConfig so that all the requests are routed to this endpoint
awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) {
return awssdk.Endpoint{
PartitionID: "aws",
URL: config.AWSConfig.Endpoint,
SigningRegion: awsConfig.Region,
}, nil
})
// At this point awsConfig contains the region from the credential profile or the default region.
// If a region is explicitly set in the input config, it takes precedence.
if config.RegionName != "" {
awsConfig.Region = config.RegionName
}

if config.QueueURL != "" {
Expand Down
168 changes: 168 additions & 0 deletions x-pack/filebeat/input/awss3/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,174 @@ func TestInputRunSQSOnLocalstack(t *testing.T) {
assert.EqualValues(t, 0.0, s3Input.metrics.sqsWorkerUtilization.Get()) // Workers are reset after processing and hence utilization should be 0 at the end
}

// TestInputRunSQSWithConfig builds an input from a table of queue_url, region,
// default_region and endpoint combinations, runs it for at most five seconds,
// and then checks the region stored in the input's AWS config together with
// which endpoint resolvers are set on that config.
//
// Each case runs as its own subtest so that a failure is reported under the
// case name and does not prevent the remaining cases from running.
func TestInputRunSQSWithConfig(t *testing.T) {
	tests := []struct {
		name          string
		queueURL      string
		endpoint      string
		region        string
		defaultRegion string
		want          string
		wantErr       error
	}{
		{
			name:     "no region",
			queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			want:     "us-east-1",
		},
		{
			name:     "no region but with long endpoint",
			queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
			endpoint: "https://s3.us-east-1.abc.xyz",
			want:     "us-east-1",
		},
		{
			name:     "no region but with short endpoint",
			queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
			endpoint: "https://abc.xyz",
			want:     "us-east-1",
		},
		{
			// NOTE(review): the "sqs.X.*" hostname heuristic in
			// getRegionFromQueueURL looks like it would resolve us-east-1 for
			// this host even without an endpoint, in which case no error is
			// returned. Confirm whether this case should still expect
			// errBadQueueURL.
			name:     "no region custom queue domain",
			queueURL: "https://sqs.us-east-1.xyz.abc/627959692251/test-s3-logs",
			wantErr:  errBadQueueURL,
		},
		{
			name:     "region",
			queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			region:   "us-west-2",
			want:     "us-west-2",
		},
		{
			name:          "default_region",
			queueURL:      "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			defaultRegion: "us-west-2",
			want:          "us-west-2",
		},
		{
			name:          "region and default_region",
			queueURL:      "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			region:        "us-east-2",
			defaultRegion: "us-east-3",
			want:          "us-east-2",
		},
		{
			name:     "short_endpoint",
			queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			endpoint: "https://amazonaws.com",
			want:     "us-east-1",
		},
		{
			name:     "long_endpoint",
			queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			endpoint: "https://s3.us-east-1.amazonaws.com",
			want:     "us-east-1",
		},
		{
			name:     "region and custom short_endpoint",
			queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			region:   "us-west-2",
			endpoint: "https://.elastic.co",
			want:     "us-west-2",
		},
		{
			name:     "region and custom long_endpoint",
			queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			region:   "us-west-2",
			endpoint: "https://s3.us-east-1.elastic.co",
			want:     "us-west-2",
		},
		{
			name:     "region and short_endpoint",
			queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			region:   "us-west-2",
			endpoint: "https://amazonaws.com",
			want:     "us-west-2",
		},
		{
			name:     "region and long_endpoint",
			queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			region:   "us-west-2",
			endpoint: "https://s3.us-east-1.amazonaws.com",
			want:     "us-west-2",
		},
		{
			name:          "region and default region and short_endpoint",
			queueURL:      "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			region:        "us-west-2",
			defaultRegion: "us-east-1",
			endpoint:      "https://amazonaws.com",
			want:          "us-west-2",
		},
		{
			name:          "region and default region and long_endpoint",
			queueURL:      "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			region:        "us-west-2",
			defaultRegion: "us-east-1",
			endpoint:      "https://s3.us-east-1.amazonaws.com",
			want:          "us-west-2",
		},
	}

	for _, test := range tests {
		// t.Run executes the closure synchronously, so capturing the loop
		// variable here is safe on every Go version.
		t.Run(test.name, func(t *testing.T) {
			logp.TestingSetup()

			// Create a filebeat config using the provided test parameters
			config := ""
			if test.queueURL != "" {
				config += fmt.Sprintf("queue_url: %s \n", test.queueURL)
			}
			if test.region != "" {
				config += fmt.Sprintf("region: %s \n", test.region)
			}
			if test.defaultRegion != "" {
				config += fmt.Sprintf("default_region: %s \n", test.defaultRegion)
			}
			if test.endpoint != "" {
				config += fmt.Sprintf("endpoint: %s \n", test.endpoint)
			}

			s3Input := createInput(t, conf.MustNewConfigFrom(config))

			inputCtx, cancel := newV2Context()
			t.Cleanup(cancel)
			// Bound the run to five seconds, and stop the timer when the
			// subtest ends so it does not linger after an early return.
			timer := time.AfterFunc(5*time.Second, cancel)
			t.Cleanup(func() { timer.Stop() })

			var errGroup errgroup.Group
			errGroup.Go(func() error {
				return s3Input.Run(inputCtx, &fakePipeline{})
			})
			err := errGroup.Wait()

			if test.wantErr != nil {
				// The sentinel is wrapped with extra context before it is
				// returned, so match with ErrorIs rather than ==. This also
				// fails the case when no error was returned at all.
				assert.ErrorIs(t, err, test.wantErr)
				return
			}
			if err != nil {
				t.Fatal(err)
			}

			// The legacy EndpointResolver is never expected on the shared AWS config.
			assert.Nil(t, s3Input.awsConfig.EndpointResolver)
			switch {
			case test.endpoint == "", strings.HasPrefix(test.endpoint, "https://s3"):
				// No endpoint configured, or an S3-style endpoint: S3 resolvers
				// are added later in the code than this integration test covers,
				// so nothing should be set on the shared config yet.
				assert.Nil(t, s3Input.awsConfig.EndpointResolverWithOptions)
			default:
				// An endpoint is specified but is not an S3-style endpoint.
				assert.NotNil(t, s3Input.awsConfig.EndpointResolverWithOptions)
			}

			assert.EqualValues(t, test.want, s3Input.awsConfig.Region)
		})
	}
}

func TestInputRunSQS(t *testing.T) {
logp.TestingSetup()

Expand Down
16 changes: 14 additions & 2 deletions x-pack/filebeat/input/awss3/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,20 @@ func TestRegionSelection(t *testing.T) {
want: "us-west-3",
},
{
name: "abc.xyz_and_domain_with_blank_endpoint",
name: "abc.xyz_and_domain_with_matching_endpoint_and_scheme",
queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
endpoint: "https://abc.xyz",
want: "us-east-1",
},
{
name: "abc.xyz_and_domain_with_matching_url_endpoint",
queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
endpoint: "https://s3.us-east-1.abc.xyz",
want: "us-east-1",
},
{
name: "abc.xyz_and_no_region_term",
queueURL: "https://sqs.abc.xyz/627959692251/test-s3-logs",
wantErr: errBadQueueURL,
},
{
Expand Down Expand Up @@ -118,7 +130,7 @@ func TestRegionSelection(t *testing.T) {
{
name: "non_aws_vpce_without_endpoint",
queueURL: "https://vpce-test.sqs.us-east-1.vpce.abc.xyz/12345678912/sqs-queue",
wantErr: errBadQueueURL,
want: "us-east-1",
},
{
name: "non_aws_vpce_with_region_override",
Expand Down
9 changes: 0 additions & 9 deletions x-pack/filebeat/input/awss3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,3 @@ func getProviderFromDomain(endpoint string, ProviderOverride string) string {
}
return "unknown"
}

// nonAWSBucketResolver is an S3 endpoint resolver that always answers with
// the single endpoint URL it was constructed with.
type nonAWSBucketResolver struct {
	endpoint string
}

// ResolveEndpoint returns the stored endpoint URL with the caller's region as
// the signing region. The hostname is marked immutable and the endpoint source
// is reported as custom. The options argument is not consulted, and the
// returned error is always nil.
func (r nonAWSBucketResolver) ResolveEndpoint(region string, options s3.EndpointResolverOptions) (awssdk.Endpoint, error) {
	//nolint:staticcheck // haven't migrated to the new interface yet
	fixed := awssdk.Endpoint{
		URL:               r.endpoint,
		SigningRegion:     region,
		HostnameImmutable: true,
		Source:            awssdk.EndpointSourceCustom,
	}
	return fixed, nil
}
27 changes: 14 additions & 13 deletions x-pack/filebeat/input/awss3/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,30 @@ const (

var errBadQueueURL = errors.New("QueueURL is not in format: https://sqs.{REGION_ENDPOINT}.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME} or https://{VPC_ENDPOINT}.sqs.{REGION_ENDPOINT}.vpce.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME}")

func getRegionFromQueueURL(queueURL, endpoint string) string {
func getRegionFromQueueURL(queueURL string) string {
// get region from queueURL
// Example for custom domain queue: https://sqs.us-east-1.abc.xyz/12345678912/test-s3-logs
// Example for sqs queue: https://sqs.us-east-1.amazonaws.com/12345678912/test-s3-logs
// Example for vpce: https://vpce-test.sqs.us-east-1.vpce.amazonaws.com/12345678912/sqs-queue
// We use a simple heuristic that works for all essential cases:
// - If queue hostname is sqs.X.*, return region X
// - If queue hostname is X.sqs.Y.*, return region Y
// Hosts that don't follow this convention need the input config to
// specify a custom endpoint and an explicit region.
u, err := url.Parse(queueURL)
if err != nil {
return ""
}
hostSplit := strings.SplitN(u.Hostname(), ".", 5)

// check for sqs queue url
host := strings.SplitN(u.Host, ".", 3)
if len(host) == 3 && host[0] == "sqs" {
if host[2] == endpoint || (endpoint == "" && strings.HasPrefix(host[2], "amazonaws.")) {
return host[1]
}
// check for sqs-style queue url
if len(hostSplit) >= 4 && hostSplit[0] == "sqs" {
return hostSplit[1]
}

// check for vpce url
host = strings.SplitN(u.Host, ".", 5)
if len(host) == 5 && host[1] == "sqs" {
if host[4] == endpoint || (endpoint == "" && strings.HasPrefix(host[4], "amazonaws.")) {
return host[2]
}
// check for vpce-style url
if len(hostSplit) == 5 && hostSplit[1] == "sqs" {
return hostSplit[2]
}

return ""
Expand Down
11 changes: 8 additions & 3 deletions x-pack/filebeat/input/awss3/sqs_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,19 @@ func (in *sqsReaderInput) setup(
in.log = inputContext.Logger.With("queue_url", in.config.QueueURL)
in.pipeline = pipeline

in.detectedRegion = getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint)
in.detectedRegion = getRegionFromQueueURL(in.config.QueueURL)
if in.config.RegionName != "" {
// Configured region always takes precedence
in.awsConfig.Region = in.config.RegionName
} else if in.detectedRegion != "" {
// Only use detected region if there is no explicit region configured.
in.awsConfig.Region = in.detectedRegion
} else if in.config.AWSConfig.DefaultRegion != "" {
// If we can't find anything else, fall back on the default.
in.awsConfig.Region = in.config.AWSConfig.DefaultRegion
} else {
// If we can't get a region from the config or the URL, return an error.
return fmt.Errorf("failed to get AWS region from queue_url: %w", errBadQueueURL)
// If we can't find a usable region, return an error
return fmt.Errorf("region not specified and failed to get AWS region from queue_url: %w", errBadQueueURL)
}

in.sqs = &awsSQSAPI{
Expand Down
Loading