Skip to content

Commit

Permalink
Fix handling of custom endpoints in AWS input (#41504)
Browse files Browse the repository at this point in the history
Fix custom endpoint selection in the S3/SQS input (#39718) by porting @strawgate's 8.14 fix (#39709) to main.

In addition to the previous fixes, this simplifies the logic for detecting queue region, since the 8.14 version still had some broken cases caused by requiring over-strict endpoint matching, and it was concluded (talking to @strawgate) that there's no advantage to rejecting standard region format from queue URLs just because the endpoint URL is different (if there is a genuine mismatch in the queue and endpoint we'll learn it from the connection attempt, not from `getRegionFromQueueURL`).
  • Loading branch information
faec authored Nov 6, 2024
1 parent 87687a5 commit cf13781
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Improve modification time handling for entities and entity deletion logic in the Active Directory entityanalytics input. {pull}41179[41179]
- Journald input now can read events from all boots {issue}41083[41083] {pull}41244[41244]
- Fix double encoding of client_secret in the Entity Analytics input's Azure Active Directory provider {pull}41393[41393]
- Fix errors in SQS host resolution in the `aws-s3` input when using custom (non-AWS) endpoints. {pull}41504[41504]

*Heartbeat*

Expand Down
25 changes: 20 additions & 5 deletions x-pack/filebeat/input/awss3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package awss3
import (
"errors"
"fmt"
"net/url"
"time"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
Expand Down Expand Up @@ -106,6 +107,13 @@ func (c *config) Validate() error {
if c.ProviderOverride != "" && c.NonAWSBucketName == "" {
return errors.New("provider can only be overridden when polling non-AWS S3 services")
}
if c.AWSConfig.Endpoint != "" {
// Make sure the given endpoint can be parsed
_, err := url.Parse(c.AWSConfig.Endpoint)
if err != nil {
return fmt.Errorf("failed to parse endpoint: %w", err)
}
}
if c.BackupConfig.NonAWSBackupToBucketName != "" && c.NonAWSBucketName == "" {
return errors.New("backup to non-AWS bucket can only be used for non-AWS sources")
}
Expand Down Expand Up @@ -245,14 +253,18 @@ func (c config) getBucketARN() string {
// options struct.
// Should be provided as a parameter to s3.NewFromConfig.
func (c config) s3ConfigModifier(o *s3.Options) {
if c.NonAWSBucketName != "" {
//nolint:staticcheck // haven't migrated to the new interface yet
o.EndpointResolver = nonAWSBucketResolver{endpoint: c.AWSConfig.Endpoint}
}

if c.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
// Apply slightly different endpoint resolvers depending on whether we're in S3 or SQS mode.
if c.AWSConfig.Endpoint != "" {
//nolint:staticcheck // haven't migrated to the new interface yet
o.EndpointResolver = s3.EndpointResolverFromURL(c.AWSConfig.Endpoint,
func(e *awssdk.Endpoint) {
// The S3 hostname is immutable in bucket polling mode, mutable otherwise.
e.HostnameImmutable = (c.getBucketARN() != "")
})
}
o.UsePathStyle = c.PathStyle

o.Retryer = retry.NewStandard(func(so *retry.StandardOptions) {
Expand All @@ -269,6 +281,9 @@ func (c config) sqsConfigModifier(o *sqs.Options) {
if c.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
if c.AWSConfig.Endpoint != "" {
o.EndpointResolver = sqs.EndpointResolverFromURL(c.AWSConfig.Endpoint)
}
}

func (c config) getFileSelectors() []fileSelectorConfig {
Expand Down
15 changes: 4 additions & 11 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ package awss3
import (
"fmt"

awssdk "github.com/aws/aws-sdk-go-v2/aws"

"github.com/elastic/beats/v7/filebeat/beater"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/feature"
Expand Down Expand Up @@ -48,15 +46,10 @@ func (im *s3InputManager) Create(cfg *conf.C) (v2.Input, error) {
return nil, fmt.Errorf("initializing AWS config: %w", err)
}

if config.AWSConfig.Endpoint != "" {
// Add a custom endpointResolver to the awsConfig so that all the requests are routed to this endpoint
awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) {
return awssdk.Endpoint{
PartitionID: "aws",
URL: config.AWSConfig.Endpoint,
SigningRegion: awsConfig.Region,
}, nil
})
// The awsConfig now contains the region from the credential profile or default region
// if the region is explicitly set in the config, then it wins
if config.RegionName != "" {
awsConfig.Region = config.RegionName
}

if config.QueueURL != "" {
Expand Down
168 changes: 168 additions & 0 deletions x-pack/filebeat/input/awss3/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,174 @@ func TestInputRunSQSOnLocalstack(t *testing.T) {
assert.EqualValues(t, 0.0, s3Input.metrics.sqsWorkerUtilization.Get()) // Workers are reset after processing and hence utilization should be 0 at the end
}

func TestInputRunSQSWithConfig(t *testing.T) {
tests := []struct {
name string
queue_url string
endpoint string
region string
default_region string
want string
wantErr error
}{
{
name: "no region",
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
want: "us-east-1",
},
{
name: "no region but with long endpoint",
queue_url: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
endpoint: "https://s3.us-east-1.abc.xyz",
want: "us-east-1",
},
{
name: "no region but with short endpoint",
queue_url: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
endpoint: "https://abc.xyz",
want: "us-east-1",
},
{
name: "no region custom queue domain",
queue_url: "https://sqs.us-east-1.xyz.abc/627959692251/test-s3-logs",
wantErr: errBadQueueURL,
},
{
name: "region",
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
region: "us-west-2",
want: "us-west-2",
},
{
name: "default_region",
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
default_region: "us-west-2",
want: "us-west-2",
},
{
name: "region and default_region",
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
region: "us-east-2",
default_region: "us-east-3",
want: "us-east-2",
},
{
name: "short_endpoint",
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
endpoint: "https://amazonaws.com",
want: "us-east-1",
},
{
name: "long_endpoint",
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
endpoint: "https://s3.us-east-1.amazonaws.com",
want: "us-east-1",
},
{
name: "region and custom short_endpoint",
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
region: "us-west-2",
endpoint: "https://.elastic.co",
want: "us-west-2",
},
{
name: "region and custom long_endpoint",
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
region: "us-west-2",
endpoint: "https://s3.us-east-1.elastic.co",
want: "us-west-2",
},
{
name: "region and short_endpoint",
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
region: "us-west-2",
endpoint: "https://amazonaws.com",
want: "us-west-2",
},
{
name: "region and long_endpoint",
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
region: "us-west-2",
endpoint: "https://s3.us-east-1.amazonaws.com",
want: "us-west-2",
},
{
name: "region and default region and short_endpoint",
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
region: "us-west-2",
default_region: "us-east-1",
endpoint: "https://amazonaws.com",
want: "us-west-2",
},
{
name: "region and default region and long_endpoint",
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
region: "us-west-2",
default_region: "us-east-1",
endpoint: "https://s3.us-east-1.amazonaws.com",
want: "us-west-2",
},
}

for _, test := range tests {
logp.TestingSetup()

// Create a filebeat config using the provided test parameters
config := ""
if test.queue_url != "" {
config += fmt.Sprintf("queue_url: %s \n", test.queue_url)
}
if test.region != "" {
config += fmt.Sprintf("region: %s \n", test.region)
}
if test.default_region != "" {
config += fmt.Sprintf("default_region: %s \n", test.default_region)
}
if test.endpoint != "" {
config += fmt.Sprintf("endpoint: %s \n", test.endpoint)
}

s3Input := createInput(t, conf.MustNewConfigFrom(config))

inputCtx, cancel := newV2Context()
t.Cleanup(cancel)
time.AfterFunc(5*time.Second, func() {
cancel()
})

var errGroup errgroup.Group
errGroup.Go(func() error {
return s3Input.Run(inputCtx, &fakePipeline{})
})

if err := errGroup.Wait(); err != nil {
// assert that err == test.wantErr
if test.wantErr != nil {
continue
}
// Print the test name to help identify the failing test
t.Fatal(test.name, err)
}

// If the endpoint starts with s3, the endpoint resolver should be null at this point
// If the endpoint does not start with s3, the endpointresolverwithoptions should be set
// If the endpoint is not set, the endpoint resolver should be null
if test.endpoint == "" {
assert.Nil(t, s3Input.awsConfig.EndpointResolver, test.name)
assert.Nil(t, s3Input.awsConfig.EndpointResolverWithOptions, test.name)
} else if strings.HasPrefix(test.endpoint, "https://s3") {
// S3 resolvers are added later in the code than this integration test covers
assert.Nil(t, s3Input.awsConfig.EndpointResolver, test.name)
assert.Nil(t, s3Input.awsConfig.EndpointResolverWithOptions, test.name)
} else { // If the endpoint is specified but is not s3
assert.Nil(t, s3Input.awsConfig.EndpointResolver, test.name)
assert.NotNil(t, s3Input.awsConfig.EndpointResolverWithOptions, test.name)
}

assert.EqualValues(t, test.want, s3Input.awsConfig.Region, test.name)
}
}

func TestInputRunSQS(t *testing.T) {
logp.TestingSetup()

Expand Down
16 changes: 14 additions & 2 deletions x-pack/filebeat/input/awss3/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,20 @@ func TestRegionSelection(t *testing.T) {
want: "us-west-3",
},
{
name: "abc.xyz_and_domain_with_blank_endpoint",
name: "abc.xyz_and_domain_with_matching_endpoint_and_scheme",
queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
endpoint: "https://abc.xyz",
want: "us-east-1",
},
{
name: "abc.xyz_and_domain_with_matching_url_endpoint",
queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
endpoint: "https://s3.us-east-1.abc.xyz",
want: "us-east-1",
},
{
name: "abc.xyz_and_no_region_term",
queueURL: "https://sqs.abc.xyz/627959692251/test-s3-logs",
wantErr: errBadQueueURL,
},
{
Expand Down Expand Up @@ -118,7 +130,7 @@ func TestRegionSelection(t *testing.T) {
{
name: "non_aws_vpce_without_endpoint",
queueURL: "https://vpce-test.sqs.us-east-1.vpce.abc.xyz/12345678912/sqs-queue",
wantErr: errBadQueueURL,
want: "us-east-1",
},
{
name: "non_aws_vpce_with_region_override",
Expand Down
9 changes: 0 additions & 9 deletions x-pack/filebeat/input/awss3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,3 @@ func getProviderFromDomain(endpoint string, ProviderOverride string) string {
}
return "unknown"
}

type nonAWSBucketResolver struct {
endpoint string
}

func (n nonAWSBucketResolver) ResolveEndpoint(region string, options s3.EndpointResolverOptions) (awssdk.Endpoint, error) {
//nolint:staticcheck // haven't migrated to the new interface yet
return awssdk.Endpoint{URL: n.endpoint, SigningRegion: region, HostnameImmutable: true, Source: awssdk.EndpointSourceCustom}, nil
}
27 changes: 14 additions & 13 deletions x-pack/filebeat/input/awss3/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,30 @@ const (

var errBadQueueURL = errors.New("QueueURL is not in format: https://sqs.{REGION_ENDPOINT}.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME} or https://{VPC_ENDPOINT}.sqs.{REGION_ENDPOINT}.vpce.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME}")

func getRegionFromQueueURL(queueURL, endpoint string) string {
func getRegionFromQueueURL(queueURL string) string {
// get region from queueURL
// Example for custom domain queue: https://sqs.us-east-1.abc.xyz/12345678912/test-s3-logs
// Example for sqs queue: https://sqs.us-east-1.amazonaws.com/12345678912/test-s3-logs
// Example for vpce: https://vpce-test.sqs.us-east-1.vpce.amazonaws.com/12345678912/sqs-queue
// We use a simple heuristic that works for all essential cases:
// - If queue hostname is sqs.X.*, return region X
// - If queue hostname is X.sqs.Y.*, return region Y
// Hosts that don't follow this convention need the input config to
// specify a custom endpoint and an explicit region.
u, err := url.Parse(queueURL)
if err != nil {
return ""
}
hostSplit := strings.SplitN(u.Hostname(), ".", 5)

// check for sqs queue url
host := strings.SplitN(u.Host, ".", 3)
if len(host) == 3 && host[0] == "sqs" {
if host[2] == endpoint || (endpoint == "" && strings.HasPrefix(host[2], "amazonaws.")) {
return host[1]
}
// check for sqs-style queue url
if len(hostSplit) >= 4 && hostSplit[0] == "sqs" {
return hostSplit[1]
}

// check for vpce url
host = strings.SplitN(u.Host, ".", 5)
if len(host) == 5 && host[1] == "sqs" {
if host[4] == endpoint || (endpoint == "" && strings.HasPrefix(host[4], "amazonaws.")) {
return host[2]
}
// check for vpce-style url
if len(hostSplit) == 5 && hostSplit[1] == "sqs" {
return hostSplit[2]
}

return ""
Expand Down
11 changes: 8 additions & 3 deletions x-pack/filebeat/input/awss3/sqs_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,19 @@ func (in *sqsReaderInput) setup(
in.log = inputContext.Logger.With("queue_url", in.config.QueueURL)
in.pipeline = pipeline

in.detectedRegion = getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint)
in.detectedRegion = getRegionFromQueueURL(in.config.QueueURL)
if in.config.RegionName != "" {
// Configured region always takes precedence
in.awsConfig.Region = in.config.RegionName
} else if in.detectedRegion != "" {
// Only use detected region if there is no explicit region configured.
in.awsConfig.Region = in.detectedRegion
} else if in.config.AWSConfig.DefaultRegion != "" {
// If we can't find anything else, fall back on the default.
in.awsConfig.Region = in.config.AWSConfig.DefaultRegion
} else {
// If we can't get a region from the config or the URL, return an error.
return fmt.Errorf("failed to get AWS region from queue_url: %w", errBadQueueURL)
// If we can't find a usable region, return an error
return fmt.Errorf("region not specified and failed to get AWS region from queue_url: %w", errBadQueueURL)
}

in.sqs = &awsSQSAPI{
Expand Down

0 comments on commit cf13781

Please sign in to comment.