
Fix partition key computation for aggregation
Signed-off-by: James Elias Sigurdarson <[email protected]>
jamiees2 authored and zhonghui12 committed Aug 26, 2021
1 parent 08798bb commit 35f3cc9
Showing 6 changed files with 141 additions and 82 deletions.
51 changes: 40 additions & 11 deletions aggregate/aggregator.go
@@ -4,6 +4,7 @@ import (
"crypto/md5"
"fmt"

"github.com/aws/amazon-kinesis-streams-for-fluent-bit/util"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/sirupsen/logrus"
@@ -30,52 +31,80 @@ type Aggregator struct {
records []*Record
aggSize int // Size of both records, and partitionKeys in bytes
maxAggRecordSize int
stringGen *util.RandomStringGenerator
}

// NewAggregator create a new aggregator
func NewAggregator() *Aggregator {
func NewAggregator(stringGen *util.RandomStringGenerator) *Aggregator {

return &Aggregator{
partitionKeys: make(map[string]uint64, 0),
records: make([]*Record, 0),
maxAggRecordSize: defaultMaxAggRecordSize,
aggSize: initialAggRecordSize,
stringGen: stringGen,
}
}

// AddRecord to the aggregate buffer.
// Will return a kinesis PutRecordsRequest once buffer is full, or if the data exceeds the aggregate limit.
func (a *Aggregator) AddRecord(partitionKey string, data []byte) (entry *kinesis.PutRecordsRequestEntry, err error) {
func (a *Aggregator) AddRecord(partitionKey string, hasPartitionKey bool, data []byte) (entry *kinesis.PutRecordsRequestEntry, err error) {

partitionKeySize := len([]byte(partitionKey))
if partitionKeySize < 1 {
return nil, fmt.Errorf("Invalid partition key provided")
if hasPartitionKey {
partitionKeySize := len([]byte(partitionKey))
if partitionKeySize < 1 {
return nil, fmt.Errorf("Invalid partition key provided")
}
}

dataSize := len(data)

// If this is a very large record, then don't aggregate it.
if dataSize >= a.maxAggRecordSize {
if !hasPartitionKey {
partitionKey = a.stringGen.RandomString()
}
return &kinesis.PutRecordsRequestEntry{
Data: data,
PartitionKey: aws.String(partitionKey),
}, nil
}

if !hasPartitionKey {
if len(a.partitionKeys) > 0 {
// Take any partition key from the map, as long as one exists
for k, _ := range a.partitionKeys {
partitionKey = k
break
}
} else {
partitionKey = a.stringGen.RandomString()
}
}

// Check if we need to add a new partition key, and if we do how much space it will take
pKeyIdx, pKeyAddedSize := a.checkPartitionKey(partitionKey)

// data field size is proto size of data + data field number size
// partition key field size is varint of index size + field number size
recordSize := protowire.SizeBytes(dataSize) + fieldNumberSize + protowire.SizeVarint(pKeyIdx) + fieldNumberSize
// Total size is proto size of data + field number of parent proto
addedSize := protowire.SizeBytes(recordSize) + fieldNumberSize
dataFieldSize := protowire.SizeBytes(dataSize) + fieldNumberSize
pkeyFieldSize := protowire.SizeVarint(pKeyIdx) + fieldNumberSize
// Total size is byte size of data + pkey field + field number of parent proto

if a.getSize()+addedSize+pKeyAddedSize >= maximumRecordSize {
// Aggregate records, and return
if a.getSize()+protowire.SizeBytes(dataFieldSize+pkeyFieldSize)+fieldNumberSize+pKeyAddedSize >= maximumRecordSize {
// Aggregate records, and return if error
entry, err = a.AggregateRecords()
if err != nil {
return entry, err
}

if !hasPartitionKey {
// choose a new partition key if needed now that we've aggregated the previous records
partitionKey = a.stringGen.RandomString()
}
// Recompute field size, since it changed
pKeyIdx, _ = a.checkPartitionKey(partitionKey)
pkeyFieldSize = protowire.SizeVarint(pKeyIdx) + fieldNumberSize
}

// Add new record, and update aggSize
@@ -86,7 +115,7 @@ func (a *Aggregator) AddRecord(partitionKey string, data []byte) (entry *kinesis
PartitionKeyIndex: &partitionKeyIndex,
})

a.aggSize += addedSize
a.aggSize += protowire.SizeBytes(dataFieldSize+pkeyFieldSize) + fieldNumberSize

return entry, err
}
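
For reference, the size arithmetic above follows the protobuf wire format: each sub-record is a length-delimited field of the parent AggregatedRecord message, so its cost is the payload size plus varint length prefixes and tag bytes. The sketch below reproduces that arithmetic with google.golang.org/protobuf/encoding/protowire. The value of the unexported constant fieldNumberSize and the per-key cost inside checkPartitionKey are not shown in this diff, so both are assumptions here, chosen to be consistent with the totals asserted in the tests below.

```go
package main

import (
	"fmt"

	"google.golang.org/protobuf/encoding/protowire"
)

// Assumed value of the aggregator's unexported constant: one tag byte per
// proto field, which holds for field numbers 1 through 15.
const fieldNumberSize = 1

// recordSize mirrors AddRecord: the data field and the partition-key-index
// field are wrapped in a length-delimited sub-record of the parent proto.
func recordSize(dataLen int, pKeyIdx uint64) int {
	dataFieldSize := protowire.SizeBytes(dataLen) + fieldNumberSize  // payload + tag
	pkeyFieldSize := protowire.SizeVarint(pKeyIdx) + fieldNumberSize // key index + tag
	return protowire.SizeBytes(dataFieldSize+pkeyFieldSize) + fieldNumberSize
}

// keySize is the assumed cost checkPartitionKey reports for a new key:
// a length-delimited string field plus its tag byte.
func keySize(key string) int {
	return protowire.SizeBytes(len(key)) + fieldNumberSize
}

func main() {
	// First test case: 10-byte payload, fresh 18-character random key, index 0.
	fmt.Println(recordSize(len("test value"), 0) + keySize("aaaaaaaaaaaaaaaaaa")) // 36
	// Second test case: 12-byte payload, explicit 20-character key, index 1.
	fmt.Println(recordSize(len("test value 2"), 1) + keySize("test partition key 2")) // 40
}
```

Run together, 36 + 40 gives the running total of 76 asserted in TestAddRecordCalculatesCorrectSize below.
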
21 changes: 18 additions & 3 deletions aggregate/aggregator_test.go
@@ -3,19 +3,34 @@ package aggregate
import (
"testing"

"github.com/aws/amazon-kinesis-streams-for-fluent-bit/util"
"github.com/stretchr/testify/assert"
)

const concurrencyRetryLimit = 4

func TestAddRecordCalculatesCorrectSize(t *testing.T) {
aggregator := NewAggregator()
generator := util.NewRandomStringGenerator(18)
aggregator := NewAggregator(generator)

_, err := aggregator.AddRecord("test partition key", []byte("test value"))
_, err := aggregator.AddRecord("", false, []byte("test value"))
assert.Equal(t, nil, err, "Expected aggregator not to return error")
assert.Equal(t, 36, aggregator.aggSize, "Expected aggregator to compute correct size")

_, err = aggregator.AddRecord("test partition key 2", []byte("test value 2"))
_, err = aggregator.AddRecord("test partition key 2", true, []byte("test value 2"))
assert.Equal(t, nil, err, "Expected aggregator not to return error")
assert.Equal(t, 76, aggregator.aggSize, "Expected aggregator to compute correct size")
}

func TestAddRecordDoesNotAddNewRandomPartitionKey(t *testing.T) {
generator := util.NewRandomStringGenerator(18)
aggregator := NewAggregator(generator)

_, err := aggregator.AddRecord("", false, []byte("test value"))
assert.Equal(t, nil, err, "Expected aggregator not to return error")
assert.Equal(t, 36, aggregator.aggSize, "Expected aggregator to compute correct size")

_, err = aggregator.AddRecord("", false, []byte("test value 2"))
assert.Equal(t, nil, err, "Expected aggregator not to return error")
assert.Equal(t, 1, len(aggregator.partitionKeys), "Expected aggregator to reuse partitionKey value")
}
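
TestAddRecordDoesNotAddNewRandomPartitionKey passes because of the reuse branch in AddRecord: a record that arrives without a partition key adopts any key the aggregate already tracks, so only the first record of each aggregate mints a random key. A minimal distillation of that branch (an illustrative helper, not part of the commit):

```go
package aggregate

// pickKey distills the reuse branch of AddRecord: a key-less record joins
// whatever partition key the aggregate already tracks; only the first
// record of a fresh aggregate mints a new random key.
func pickKey(existing map[string]uint64, fresh func() string) string {
	for k := range existing {
		return k // any tracked key will do; iteration order doesn't matter here
	}
	return fresh()
}
```

Keeping the key count at one both bounds aggSize and means each aggregate carries a single partition key, which is exactly what the final assertion on len(aggregator.partitionKeys) checks.
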
2 changes: 1 addition & 1 deletion fluent-bit-kinesis.go
@@ -113,7 +113,7 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
}

if isAggregate && partitionKey != "" {
logrus.Errorf("[kinesis %d] WARNING: The options 'aggregation' and 'partition_key' should not be used simaltaniously", pluginID)
logrus.Errorf("[kinesis %d] WARNING: The options 'aggregation' and 'partition_key' should not be used simultaneously", pluginID)
}

var concurrencyInt, concurrencyRetriesInt int
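
The corrected warning fires when both options are set together. For context, a hypothetical Fluent Bit output stanza that would trigger it; the option names follow this repository's README, and the plugin is assumed to register under the name kinesis:

```
[OUTPUT]
    Name           kinesis
    Match          *
    region         us-east-1
    stream         my-kinesis-stream
    partition_key  requestId
    aggregation    true
```

The warning is advisory rather than a hard error: both paths keep working, but an aggregated record is routed to a shard by the aggregate's single top-level key, so a per-record partition_key loses much of its routing effect.
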
76 changes: 28 additions & 48 deletions kinesis/kinesis.go
@@ -20,14 +20,14 @@ import (
"bytes"
"compress/zlib"
"fmt"
"math/rand"
"os"
"strings"
"sync/atomic"
"time"

"github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins"
"github.com/aws/amazon-kinesis-streams-for-fluent-bit/aggregate"
"github.com/aws/amazon-kinesis-streams-for-fluent-bit/util"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
@@ -42,8 +42,7 @@ import (
)

const (
partitionKeyCharset = "abcdefghijklmnopqrstuvwxyz" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
truncatedSuffix = "[Truncated...]"
truncatedSuffix = "[Truncated...]"
)

const (
@@ -65,11 +64,6 @@ type PutRecordsClient interface {
PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error)
}

type random struct {
seededRandom *rand.Rand
buffer []byte
}

// CompressionType indicates the type of compression to apply to each record
type CompressionType string

Expand Down Expand Up @@ -98,16 +92,15 @@ type OutputPlugin struct {
client PutRecordsClient
timer *plugins.Timeout
PluginID int
random *random
stringGen *util.RandomStringGenerator
Concurrency int
concurrencyRetryLimit int
// Concurrency is the limit, goroutineCount represents the running goroutines
goroutineCount int32
goroutineCount int32
// Used to implement backoff for concurrent flushes
concurrentRetries uint32
isAggregate bool
aggregator *aggregate.Aggregator
aggregatePartitionKey string
compression CompressionType
// If specified, dots in key names should be replaced with other symbols
replaceDots string
@@ -130,11 +123,7 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEnd
return nil, err
}

seededRand := rand.New(rand.NewSource(time.Now().UnixNano()))
random := &random{
seededRandom: seededRand,
buffer: make([]byte, 8),
}
stringGen := util.NewRandomStringGenerator(8)

var timeFormatter *strftime.Strftime
if timeKey != "" {
Expand All @@ -150,7 +139,7 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEnd

var aggregator *aggregate.Aggregator
if isAggregate {
aggregator = aggregate.NewAggregator()
aggregator = aggregate.NewAggregator(stringGen)
}

return &OutputPlugin{
@@ -164,7 +153,7 @@
logKey: logKey,
timer: timer,
PluginID: pluginID,
random: random,
stringGen: stringGen,
Concurrency: concurrency,
concurrencyRetryLimit: retryLimit,
isAggregate: isAggregate,
@@ -249,23 +238,30 @@ func (outputPlugin *OutputPlugin) AddRecord(records *[]*kinesis.PutRecordsReques
record[outputPlugin.timeKey] = buf.String()
}

partitionKey := outputPlugin.getPartitionKey(record)
logrus.Debugf("[kinesis %d] Got value: %s for a given partition key.\n", outputPlugin.PluginID, partitionKey)
data, err := outputPlugin.processRecord(record, partitionKey)
partitionKey, hasPartitionKey := outputPlugin.getPartitionKey(record)
var partitionKeyLen = len(partitionKey)
if !hasPartitionKey {
partitionKeyLen = outputPlugin.stringGen.Size
}
data, err := outputPlugin.processRecord(record, partitionKeyLen)
if err != nil {
logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err)
// discard this single bad record instead and let the batch continue
return fluentbit.FLB_OK
}

if !outputPlugin.isAggregate {
if !hasPartitionKey {
partitionKey = outputPlugin.stringGen.RandomString()
}
logrus.Debugf("[kinesis %d] Got value: %s for a given partition key.\n", outputPlugin.PluginID, partitionKey)
*records = append(*records, &kinesis.PutRecordsRequestEntry{
Data: data,
PartitionKey: aws.String(partitionKey),
})
} else {
// Use the KPL aggregator to buffer records if isAggregate is true
aggRecord, err := outputPlugin.aggregator.AddRecord(partitionKey, data)
aggRecord, err := outputPlugin.aggregator.AddRecord(partitionKey, hasPartitionKey, data)
if err != nil {
logrus.Errorf("[kinesis %d] Failed to aggregate record %v\n", outputPlugin.PluginID, err)
// discard this single bad record instead and let the batch continue
@@ -275,7 +271,6 @@
// If aggRecord isn't nil, then a full kinesis record has been aggregated
if aggRecord != nil {
*records = append(*records, aggRecord)
outputPlugin.aggregatePartitionKey = outputPlugin.randomString()
}
}

@@ -294,7 +289,6 @@ func (outputPlugin *OutputPlugin) FlushAggregatedRecords(records *[]*kinesis.Put

if aggRecord != nil {
*records = append(*records, aggRecord)
outputPlugin.aggregatePartitionKey = outputPlugin.randomString()
}

return fluentbit.FLB_OK
@@ -426,7 +420,7 @@ func replaceDots(obj map[interface{}]interface{}, replacement string) map[interf
return obj
}

func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface{}, partitionKey string) ([]byte, error) {
func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface{}, partitionKeyLen int) ([]byte, error) {
if outputPlugin.dataKeys != "" {
record = plugins.DataKeys(outputPlugin.dataKeys, record)
}
@@ -473,9 +467,9 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface
}
}

if len(data)+len(partitionKey) > maximumRecordSize {
logrus.Warnf("[kinesis %d] Found record with %d bytes, truncating to 1MB, stream=%s\n", outputPlugin.PluginID, len(data)+len(partitionKey), outputPlugin.stream)
data = data[:maximumRecordSize-len(partitionKey)-len(truncatedSuffix)]
if len(data)+partitionKeyLen > maximumRecordSize {
logrus.Warnf("[kinesis %d] Found record with %d bytes, truncating to 1MB, stream=%s\n", outputPlugin.PluginID, len(data)+partitionKeyLen, outputPlugin.stream)
data = data[:maximumRecordSize-partitionKeyLen-len(truncatedSuffix)]
data = append(data, []byte(truncatedSuffix)...)
}

@@ -554,15 +548,6 @@ func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutReco
return retCode, nil
}

// randomString generates a random string of length 8
// it uses the math/rand library
func (outputPlugin *OutputPlugin) randomString() string {
for i := range outputPlugin.random.buffer {
outputPlugin.random.buffer[i] = partitionKeyCharset[outputPlugin.random.seededRandom.Intn(len(partitionKeyCharset))]
}
return string(outputPlugin.random.buffer)
}

func getFromMap(dataKey string, record map[interface{}]interface{}) interface{} {
for k, v := range record {
currentKey := stringOrByteArray(k)
@@ -575,8 +560,9 @@ func getFromMap(dataKey string, record map[interface{}]interface{}) interface{}
}

// getPartitionKey returns the value for a given valid key
// if the given key is empty or invalid, it returns a random string
func (outputPlugin *OutputPlugin) getPartitionKey(record map[interface{}]interface{}) string {
// if the given key is empty or invalid, it returns empty
// second return value indicates whether a partition key was found or not
func (outputPlugin *OutputPlugin) getPartitionKey(record map[interface{}]interface{}) (string, bool) {
partitionKey := outputPlugin.partitionKey
if partitionKey != "" {
partitionKeys := strings.Split(partitionKey, "->")
@@ -589,25 +575,19 @@ func (outputPlugin *OutputPlugin) getPartitionKey(record map[interface{}]interfa
if len(value) > partitionKeyMaxLength {
value = value[0:partitionKeyMaxLength]
}
return value
return value, true
}
}
_, ok := newRecord.(map[interface{}]interface{})
if ok {
record = newRecord.(map[interface{}]interface{})
} else {
logrus.Errorf("[kinesis %d] The partition key could not be found in the record, using a random string instead", outputPlugin.PluginID)
return outputPlugin.randomString()
return "", false
}
}
}
if outputPlugin.isAggregate {
if outputPlugin.aggregatePartitionKey == "" {
outputPlugin.aggregatePartitionKey = outputPlugin.randomString()
}
return outputPlugin.aggregatePartitionKey
}
return outputPlugin.randomString()
return "", false
}

func zlibCompress(data []byte) ([]byte, error) {
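
The random struct, partitionKeyCharset, and randomString removed above now live in the new util package; its files are among the six changed files but are not rendered in this view. Below is a plausible reconstruction inferred from the call sites in this diff (NewRandomStringGenerator(8) and NewRandomStringGenerator(18), the exported Size field, and RandomString()). Treat it as a sketch, not the committed source.

```go
package util

import (
	"math/rand"
	"time"
)

const partitionKeyCharset = "abcdefghijklmnopqrstuvwxyz" +
	"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

// RandomStringGenerator reuses one seeded source and one buffer, mirroring
// the private random struct this commit deletes from kinesis.go.
type RandomStringGenerator struct {
	seededRandom *rand.Rand
	buffer       []byte
	Size         int // exported so callers can budget for keys not yet generated
}

// NewRandomStringGenerator returns a generator producing strings of length size.
func NewRandomStringGenerator(size int) *RandomStringGenerator {
	return &RandomStringGenerator{
		seededRandom: rand.New(rand.NewSource(time.Now().UnixNano())),
		buffer:       make([]byte, size),
		Size:         size,
	}
}

// RandomString fills the buffer with random charset bytes and returns it as
// a string, as the removed randomString method did.
func (gen *RandomStringGenerator) RandomString() string {
	for i := range gen.buffer {
		gen.buffer[i] = partitionKeyCharset[gen.seededRandom.Intn(len(partitionKeyCharset))]
	}
	return string(gen.buffer)
}
```

The Size field is what processRecord now uses to reserve room for a key that will only be generated later, which is the heart of this fix: truncation decisions and aggregate sizing stay correct even when the partition key is random.
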
