Skip to content

Commit

Permalink
Add sts_endpoint param to kinesis plugin (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
PettitWesley authored Jul 17, 2020
1 parent b6f852c commit ae47001
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 20 deletions.
8 changes: 5 additions & 3 deletions fluent-bit-kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
logrus.Infof("[kinesis %d] plugin parameter partition_key = '%s'", pluginID, partitionKey)
roleARN := output.FLBPluginConfigKey(ctx, "role_arn")
logrus.Infof("[kinesis %d] plugin parameter role_arn = '%s'", pluginID, roleARN)
endpoint := output.FLBPluginConfigKey(ctx, "endpoint")
logrus.Infof("[kinesis %d] plugin parameter endpoint = '%s'", pluginID, endpoint)
kinesisEndpoint := output.FLBPluginConfigKey(ctx, "endpoint")
logrus.Infof("[kinesis %d] plugin parameter endpoint = '%s'", pluginID, kinesisEndpoint)
stsEndpoint := output.FLBPluginConfigKey(ctx, "sts_endpoint")
logrus.Infof("[kinesis %d] plugin parameter sts_endpoint = '%s'", pluginID, stsEndpoint)
appendNewline := output.FLBPluginConfigKey(ctx, "append_newline")
logrus.Infof("[kinesis %d] plugin parameter append_newline = %s", pluginID, appendNewline)
timeKey := output.FLBPluginConfigKey(ctx, "time_key")
Expand Down Expand Up @@ -130,7 +132,7 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
concurrencyRetriesInt = defaultConcurrentRetries
}

return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timeKey, timeKeyFmt, concurrencyInt, concurrencyRetriesInt, appendNL, pluginID)
return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEndpoint, stsEndpoint, timeKey, timeKeyFmt, concurrencyInt, concurrencyRetriesInt, appendNL, pluginID)
}

// The "export" comments have syntactic meaning
Expand Down
36 changes: 19 additions & 17 deletions kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ type OutputPlugin struct {
}

// NewOutputPlugin creates an OutputPlugin object
func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timeKey, timeFmt string, concurrency, retryLimit int, appendNewline bool, pluginID int) (*OutputPlugin, error) {
client, err := newPutRecordsClient(roleARN, region, endpoint)
func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEndpoint, stsEndpoint, timeKey, timeFmt string, concurrency, retryLimit int, appendNewline bool, pluginID int) (*OutputPlugin, error) {
client, err := newPutRecordsClient(roleARN, region, kinesisEndpoint, stsEndpoint)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -144,28 +144,30 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint,
}

// newPutRecordsClient creates the Kinesis client for calling the PutRecords method
func newPutRecordsClient(roleARN string, awsRegion string, endpoint string) (*kinesis.Kinesis, error) {
func newPutRecordsClient(roleARN string, awsRegion string, kinesisEndpoint string, stsEndpoint string) (*kinesis.Kinesis, error) {
customResolverFn := func(service, region string, optFns ...func(*endpoints.Options)) (endpoints.ResolvedEndpoint, error) {
if service == endpoints.KinesisServiceID && kinesisEndpoint != "" {
return endpoints.ResolvedEndpoint{
URL: kinesisEndpoint,
}, nil
} else if service == endpoints.StsServiceID && stsEndpoint != "" {
return endpoints.ResolvedEndpoint{
URL: stsEndpoint,
}, nil
}
return endpoints.DefaultResolver().EndpointFor(service, region, optFns...)
}

sess, err := session.NewSession(&aws.Config{
Region: aws.String(awsRegion),
Region: aws.String(awsRegion),
EndpointResolver: endpoints.ResolverFunc(customResolverFn),
CredentialsChainVerboseErrors: aws.Bool(true),
})
if err != nil {
return nil, err
}

svcConfig := &aws.Config{}
if endpoint != "" {
defaultResolver := endpoints.DefaultResolver()
cwCustomResolverFn := func(service, region string, optFns ...func(*endpoints.Options)) (endpoints.ResolvedEndpoint, error) {
if service == "kinesis" {
return endpoints.ResolvedEndpoint{
URL: endpoint,
}, nil
}
return defaultResolver.EndpointFor(service, region, optFns...)
}
svcConfig.EndpointResolver = endpoints.ResolverFunc(cwCustomResolverFn)
}

if roleARN != "" {
creds := stscreds.NewCredentials(sess, roleARN)
svcConfig.Credentials = creds
Expand Down

0 comments on commit ae47001

Please sign in to comment.