From a4f0692e2e6e4c1d8e5d6d70b661f435889471a4 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Fri, 15 May 2020 15:17:25 -0700 Subject: [PATCH 01/40] Remove exponential backoff code --- fluent-bit-kinesis.go | 12 ++++-------- kinesis/kinesis.go | 34 ++++++++++++++++------------------ kinesis/kinesis_test.go | 5 ++--- 3 files changed, 22 insertions(+), 29 deletions(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 5d2fba6..96dfd63 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -144,10 +144,9 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int } count++ } - err := kinesisOutput.Flush() - if err != nil { - logrus.Errorf("[kinesis %d] %v\n", kinesisOutput.PluginID, err) - return output.FLB_ERROR + retCode := kinesisOutput.Flush() + if retCode != output.FLB_OK { + return retCode } logrus.Debugf("[kinesis %d] Processed %d events with tag %s\n", kinesisOutput.PluginID, count, fluentTag) @@ -158,10 +157,7 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int func FLBPluginExit() int { // Before final exit, call Flush() for all the instances of the Output Plugin for i := range pluginInstances { - err := pluginInstances[i].Flush() - if err != nil { - logrus.Errorf("[kinesis %d] %v\n", pluginInstances[i].PluginID, err) - } + pluginInstances[i].Flush() } return output.FLB_OK diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index fd02b78..22005f7 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -82,7 +82,6 @@ type OutputPlugin struct { client PutRecordsClient records []*kinesis.PutRecordsRequestEntry dataLength int - backoff *plugins.Backoff timer *plugins.Timeout PluginID int random *random @@ -134,7 +133,6 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timeKey: timeKey, fmtStrftime: timeFormatter, lastInvalidPartitionKeyIndex: -1, - backoff: plugins.NewBackoff(), timer: timer, PluginID: pluginID, random: random, @@ -199,12 
+197,11 @@ func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{}, newRecordSize := len(data) + len(partitionKey) if len(outputPlugin.records) == maximumRecordsPerPut || (outputPlugin.dataLength+newRecordSize) > maximumPutRecordBatchSize { - err = outputPlugin.sendCurrentBatch() + retCode, err := outputPlugin.sendCurrentBatch() if err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) - // If FluentBit fails to send logs, it will retry rather than discarding the logs - return fluentbit.FLB_RETRY } + return retCode } outputPlugin.records = append(outputPlugin.records, &kinesis.PutRecordsRequestEntry{ @@ -216,8 +213,13 @@ func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{}, } // Flush sends the current buffer of log records -func (outputPlugin *OutputPlugin) Flush() error { - return outputPlugin.sendCurrentBatch() +// Returns FLB_OK, FLB_RETRY, FLB_ERROR +func (outputPlugin *OutputPlugin) Flush() int { + retCode, err := outputPlugin.sendCurrentBatch() + if err != nil { + logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) + } + return retCode } func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface{}, partitionKey string) ([]byte, error) { @@ -251,12 +253,11 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface return data, nil } -func (outputPlugin *OutputPlugin) sendCurrentBatch() error { +func (outputPlugin *OutputPlugin) sendCurrentBatch() (int, error) { if outputPlugin.lastInvalidPartitionKeyIndex >= 0 { logrus.Errorf("[kinesis %d] Invalid partition key. 
Failed to find partition_key %s in log record %s", outputPlugin.PluginID, outputPlugin.partitionKey, outputPlugin.records[outputPlugin.lastInvalidPartitionKeyIndex].Data) outputPlugin.lastInvalidPartitionKeyIndex = -1 } - outputPlugin.backoff.Wait() outputPlugin.timer.Check() response, err := outputPlugin.client.PutRecords(&kinesis.PutRecordsInput{ @@ -269,11 +270,9 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch() error { if aerr, ok := err.(awserr.Error); ok { if aerr.Code() == kinesis.ErrCodeProvisionedThroughputExceededException { logrus.Warnf("[kinesis %d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID) - // Backoff and Retry - outputPlugin.backoff.StartBackoff() } } - return err + return fluentbit.FLB_RETRY, err } logrus.Debugf("[kinesis %d] Sent %d events to Kinesis\n", outputPlugin.PluginID, len(outputPlugin.records)) @@ -282,12 +281,12 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch() error { // processAPIResponse processes the successful and failed records // it returns an error iff no records succeeded (i.e.) no progress has been made -func (outputPlugin *OutputPlugin) processAPIResponse(response *kinesis.PutRecordsOutput) error { +func (outputPlugin *OutputPlugin) processAPIResponse(response *kinesis.PutRecordsOutput) (int, error) { if aws.Int64Value(response.FailedRecordCount) > 0 { // start timer if all records failed (no progress has been made) if aws.Int64Value(response.FailedRecordCount) == int64(len(outputPlugin.records)) { outputPlugin.timer.Start() - return fmt.Errorf("PutRecords request returned with no records successfully recieved") + return fluentbit.FLB_RETRY, fmt.Errorf("PutRecords request returned with no records successfully recieved") } logrus.Warnf("[kinesis %d] %d records failed to be delivered. 
Will retry.\n", outputPlugin.PluginID, aws.Int64Value(response.FailedRecordCount)) @@ -299,8 +298,8 @@ func (outputPlugin *OutputPlugin) processAPIResponse(response *kinesis.PutRecord failedRecords = append(failedRecords, outputPlugin.records[i]) } if aws.StringValue(record.ErrorCode) == kinesis.ErrCodeProvisionedThroughputExceededException { - // Backoff and Retry - outputPlugin.backoff.StartBackoff() + logrus.Warnf("[kinesis %d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID) + return fluentbit.FLB_RETRY, nil } } @@ -313,11 +312,10 @@ func (outputPlugin *OutputPlugin) processAPIResponse(response *kinesis.PutRecord } else { // request fully succeeded outputPlugin.timer.Reset() - outputPlugin.backoff.Reset() outputPlugin.records = outputPlugin.records[:0] outputPlugin.dataLength = 0 } - return nil + return fluentbit.FLB_OK, nil } // randomString generates a random string of length 8 diff --git a/kinesis/kinesis_test.go b/kinesis/kinesis_test.go index 9769c28..1cff431 100644 --- a/kinesis/kinesis_test.go +++ b/kinesis/kinesis_test.go @@ -40,7 +40,6 @@ func newMockOutputPlugin(client *mock_kinesis.MockPutRecordsClient) (*OutputPlug dataKeys: "", partitionKey: "", lastInvalidPartitionKeyIndex: -1, - backoff: plugins.NewBackoff(), timer: timer, PluginID: 0, random: random, @@ -98,6 +97,6 @@ func TestAddRecordAndFlush(t *testing.T) { retCode := outputPlugin.AddRecord(record, &timeStamp) assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK") - err := outputPlugin.Flush() - assert.NoError(t, err, "Unexpected error calling flush") + retCode = outputPlugin.Flush() + assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK") } From b9e09755947a5edb82f50e65c7a82ebd8fe253de Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 17 May 2020 22:04:42 -0700 Subject: [PATCH 02/40] experimental: return immediately and send data in a goroutine --- fluent-bit-kinesis.go | 21 +++++++++++++++++--- 
kinesis/kinesis.go | 45 ++++++++++++++++++++----------------------- 2 files changed, 39 insertions(+), 27 deletions(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 96dfd63..35fdb55 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -22,10 +22,16 @@ import ( "github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins" "github.com/aws/amazon-kinesis-streams-for-fluent-bit/kinesis" + kinesisAPI "github.com/aws/aws-sdk-go/service/kinesis" "github.com/fluent/fluent-bit-go/output" "github.com/sirupsen/logrus" ) +const ( + // Kinesis API Limit https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#Kinesis.PutRecords + maximumRecordsPerPut = 500 +) + var ( pluginInstances []*kinesis.OutputPlugin ) @@ -107,6 +113,11 @@ func FLBPluginInit(ctx unsafe.Pointer) int { //export FLBPluginFlushCtx func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int { + go pluginConcurrentFlush(ctx, data, length, tag) + return output.FLB_OK +} + +func pluginConcurrentFlush(ctx, data unsafe.Pointer, length C.int, tag *C.char) int { var count int var ret int var ts interface{} @@ -120,6 +131,9 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int fluentTag := C.GoString(tag) logrus.Debugf("[kinesis %d] Found logs with tag: %s\n", kinesisOutput.PluginID, fluentTag) + // Each flush must have its own output buffer, since flushes can be concurrent + records := make([]*kinesisAPI.PutRecordsRequestEntry, 0, maximumRecordsPerPut) + for { //Extract Record ret, ts, record = output.GetRecord(dec) @@ -138,13 +152,13 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int timestamp = time.Now() } - retCode := kinesisOutput.AddRecord(record, ×tamp) + retCode := kinesisOutput.AddRecord(records, record, ×tamp) if retCode != output.FLB_OK { return retCode } count++ } - retCode := kinesisOutput.Flush() + retCode := kinesisOutput.Flush(records) if retCode != output.FLB_OK { return retCode } @@ 
-155,9 +169,10 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int //export FLBPluginExit func FLBPluginExit() int { + records := make([]*kinesisAPI.PutRecordsRequestEntry, 0, maximumRecordsPerPut) // Before final exit, call Flush() for all the instances of the Output Plugin for i := range pluginInstances { - pluginInstances[i].Flush() + pluginInstances[i].Flush(records) } return output.FLB_OK diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index 22005f7..c22467e 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -80,7 +80,6 @@ type OutputPlugin struct { fmtStrftime *strftime.Strftime lastInvalidPartitionKeyIndex int client PutRecordsClient - records []*kinesis.PutRecordsRequestEntry dataLength int timer *plugins.Timeout PluginID int @@ -94,7 +93,6 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, return nil, err } - records := make([]*kinesis.PutRecordsRequestEntry, 0, maximumRecordsPerPut) timer, err := plugins.NewTimeout(func(d time.Duration) { logrus.Errorf("[kinesis %d] timeout threshold reached: Failed to send logs for %s\n", pluginID, d.String()) logrus.Errorf("[kinesis %d] Quitting Fluent Bit", pluginID) @@ -126,7 +124,6 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, return &OutputPlugin{ stream: stream, client: client, - records: records, dataKeys: dataKeys, partitionKey: partitionKey, appendNewline: appendNewline, @@ -175,7 +172,7 @@ func newPutRecordsClient(roleARN string, awsRegion string, endpoint string) (*ki // AddRecord accepts a record and adds it to the buffer, flushing the buffer if it is full // the return value is one of: FLB_OK FLB_RETRY // API Errors lead to an FLB_RETRY, and data processing errors are logged, the record is discarded and FLB_OK is returned -func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{}, timeStamp *time.Time) int { +func (outputPlugin *OutputPlugin) AddRecord(records 
[]*kinesis.PutRecordsRequestEntry, record map[interface{}]interface{}, timeStamp *time.Time) int { if outputPlugin.timeKey != "" { buf := new(bytes.Buffer) err := outputPlugin.fmtStrftime.Format(buf, *timeStamp) @@ -186,7 +183,7 @@ func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{}, record[outputPlugin.timeKey] = buf.String() } - partitionKey := outputPlugin.getPartitionKey(record) + partitionKey := outputPlugin.getPartitionKey(records, record) data, err := outputPlugin.processRecord(record, partitionKey) if err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) @@ -196,15 +193,15 @@ func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{}, newRecordSize := len(data) + len(partitionKey) - if len(outputPlugin.records) == maximumRecordsPerPut || (outputPlugin.dataLength+newRecordSize) > maximumPutRecordBatchSize { - retCode, err := outputPlugin.sendCurrentBatch() + if len(records) == maximumRecordsPerPut || (outputPlugin.dataLength+newRecordSize) > maximumPutRecordBatchSize { + retCode, err := outputPlugin.sendCurrentBatch(records) if err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) } return retCode } - outputPlugin.records = append(outputPlugin.records, &kinesis.PutRecordsRequestEntry{ + records = append(records, &kinesis.PutRecordsRequestEntry{ Data: data, PartitionKey: aws.String(partitionKey), }) @@ -214,8 +211,8 @@ func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{}, // Flush sends the current buffer of log records // Returns FLB_OK, FLB_RETRY, FLB_ERROR -func (outputPlugin *OutputPlugin) Flush() int { - retCode, err := outputPlugin.sendCurrentBatch() +func (outputPlugin *OutputPlugin) Flush(records []*kinesis.PutRecordsRequestEntry) int { + retCode, err := outputPlugin.sendCurrentBatch(records) if err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) } @@ -253,15 +250,15 @@ func (outputPlugin *OutputPlugin) 
processRecord(record map[interface{}]interface return data, nil } -func (outputPlugin *OutputPlugin) sendCurrentBatch() (int, error) { +func (outputPlugin *OutputPlugin) sendCurrentBatch(records []*kinesis.PutRecordsRequestEntry) (int, error) { if outputPlugin.lastInvalidPartitionKeyIndex >= 0 { - logrus.Errorf("[kinesis %d] Invalid partition key. Failed to find partition_key %s in log record %s", outputPlugin.PluginID, outputPlugin.partitionKey, outputPlugin.records[outputPlugin.lastInvalidPartitionKeyIndex].Data) + logrus.Errorf("[kinesis %d] Invalid partition key. Failed to find partition_key %s in log record %s", outputPlugin.PluginID, outputPlugin.partitionKey, records[outputPlugin.lastInvalidPartitionKeyIndex].Data) outputPlugin.lastInvalidPartitionKeyIndex = -1 } outputPlugin.timer.Check() response, err := outputPlugin.client.PutRecords(&kinesis.PutRecordsInput{ - Records: outputPlugin.records, + Records: records, StreamName: aws.String(outputPlugin.stream), }) if err != nil { @@ -274,17 +271,17 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch() (int, error) { } return fluentbit.FLB_RETRY, err } - logrus.Debugf("[kinesis %d] Sent %d events to Kinesis\n", outputPlugin.PluginID, len(outputPlugin.records)) + logrus.Debugf("[kinesis %d] Sent %d events to Kinesis\n", outputPlugin.PluginID, len(records)) - return outputPlugin.processAPIResponse(response) + return outputPlugin.processAPIResponse(records, response) } // processAPIResponse processes the successful and failed records // it returns an error iff no records succeeded (i.e.) 
no progress has been made -func (outputPlugin *OutputPlugin) processAPIResponse(response *kinesis.PutRecordsOutput) (int, error) { +func (outputPlugin *OutputPlugin) processAPIResponse(records []*kinesis.PutRecordsRequestEntry, response *kinesis.PutRecordsOutput) (int, error) { if aws.Int64Value(response.FailedRecordCount) > 0 { // start timer if all records failed (no progress has been made) - if aws.Int64Value(response.FailedRecordCount) == int64(len(outputPlugin.records)) { + if aws.Int64Value(response.FailedRecordCount) == int64(len(records)) { outputPlugin.timer.Start() return fluentbit.FLB_RETRY, fmt.Errorf("PutRecords request returned with no records successfully recieved") } @@ -295,7 +292,7 @@ func (outputPlugin *OutputPlugin) processAPIResponse(response *kinesis.PutRecord for i, record := range response.Records { if record.ErrorMessage != nil { logrus.Debugf("[kinesis %d] Record failed to send with error: %s\n", outputPlugin.PluginID, aws.StringValue(record.ErrorMessage)) - failedRecords = append(failedRecords, outputPlugin.records[i]) + failedRecords = append(failedRecords, records[i]) } if aws.StringValue(record.ErrorCode) == kinesis.ErrCodeProvisionedThroughputExceededException { logrus.Warnf("[kinesis %d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID) @@ -303,16 +300,16 @@ func (outputPlugin *OutputPlugin) processAPIResponse(response *kinesis.PutRecord } } - outputPlugin.records = outputPlugin.records[:0] - outputPlugin.records = append(outputPlugin.records, failedRecords...) + records = records[:0] + records = append(records, failedRecords...) 
outputPlugin.dataLength = 0 - for _, record := range outputPlugin.records { + for _, record := range records { outputPlugin.dataLength += len(record.Data) } } else { // request fully succeeded outputPlugin.timer.Reset() - outputPlugin.records = outputPlugin.records[:0] + records = records[:0] outputPlugin.dataLength = 0 } return fluentbit.FLB_OK, nil @@ -329,7 +326,7 @@ func (outputPlugin *OutputPlugin) randomString() string { // getPartitionKey returns the value for a given valid key // if the given key is emapty or invalid, it returns a random string -func (outputPlugin *OutputPlugin) getPartitionKey(record map[interface{}]interface{}) string { +func (outputPlugin *OutputPlugin) getPartitionKey(records []*kinesis.PutRecordsRequestEntry, record map[interface{}]interface{}) string { partitionKey := outputPlugin.partitionKey if partitionKey != "" { for k, v := range record { @@ -344,7 +341,7 @@ func (outputPlugin *OutputPlugin) getPartitionKey(record map[interface{}]interfa } } } - outputPlugin.lastInvalidPartitionKeyIndex = len(outputPlugin.records) % maximumRecordsPerPut + outputPlugin.lastInvalidPartitionKeyIndex = len(records) % maximumRecordsPerPut } return outputPlugin.randomString() } From d3096cf27ee1eb912c1ff1a1c703eeac2797f19d Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 17 May 2020 22:13:22 -0700 Subject: [PATCH 03/40] Add retries to goroutine/concurrent mode --- fluent-bit-kinesis.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 35fdb55..b3d76d2 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -32,6 +32,10 @@ const ( maximumRecordsPerPut = 500 ) +const ( + retries = 2 +) + var ( pluginInstances []*kinesis.OutputPlugin ) @@ -113,10 +117,19 @@ func FLBPluginInit(ctx unsafe.Pointer) int { //export FLBPluginFlushCtx func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int { - go pluginConcurrentFlush(ctx, data, length, 
tag) + go flushWithRetries(ctx, data, length, tag, retries) return output.FLB_OK } +func flushWithRetries(ctx, data unsafe.Pointer, length C.int, tag *C.char, retries int) { + for i := 0; i < retries; i++ { + retCode := pluginConcurrentFlush(ctx, data, length, tag) + if retCode != output.FLB_RETRY { + break + } + } +} + func pluginConcurrentFlush(ctx, data unsafe.Pointer, length C.int, tag *C.char) int { var count int var ret int From a5c845238f872d60c7b4b61ded0e4d346f1fd433 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 17 May 2020 22:24:28 -0700 Subject: [PATCH 04/40] flush on exit does not make sense in goroutine/concurrent mode --- fluent-bit-kinesis.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index b3d76d2..9ba9ece 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -182,11 +182,10 @@ func pluginConcurrentFlush(ctx, data unsafe.Pointer, length C.int, tag *C.char) //export FLBPluginExit func FLBPluginExit() int { - records := make([]*kinesisAPI.PutRecordsRequestEntry, 0, maximumRecordsPerPut) // Before final exit, call Flush() for all the instances of the Output Plugin - for i := range pluginInstances { - pluginInstances[i].Flush(records) - } + // for i := range pluginInstances { + // pluginInstances[i].Flush(records) + // } return output.FLB_OK } From 195d299b2608aba718d3b987cdf5869256d2a51b Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 17 May 2020 22:41:55 -0700 Subject: [PATCH 05/40] tmp --- kinesis/kinesis.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index c22467e..d41701f 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -251,7 +251,12 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface } func (outputPlugin *OutputPlugin) sendCurrentBatch(records []*kinesis.PutRecordsRequestEntry) (int, error) { + if len(records) == 0 { + return fluentbit.FLB_OK, nil + } 
if outputPlugin.lastInvalidPartitionKeyIndex >= 0 { + logrus.Infof("lastInvalidPartitionKeyIndex: %d\n", outputPlugin.lastInvalidPartitionKeyIndex) + logrus.Infof("len(records): %d\n", len(records)) logrus.Errorf("[kinesis %d] Invalid partition key. Failed to find partition_key %s in log record %s", outputPlugin.PluginID, outputPlugin.partitionKey, records[outputPlugin.lastInvalidPartitionKeyIndex].Data) outputPlugin.lastInvalidPartitionKeyIndex = -1 } From 4282eb702952b58ec35a2ef8e406c0c1209889cc Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 17 May 2020 23:03:22 -0700 Subject: [PATCH 06/40] its not working... --- kinesis/kinesis.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index d41701f..5fa6b8f 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -254,18 +254,17 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch(records []*kinesis.PutRecords if len(records) == 0 { return fluentbit.FLB_OK, nil } - if outputPlugin.lastInvalidPartitionKeyIndex >= 0 { - logrus.Infof("lastInvalidPartitionKeyIndex: %d\n", outputPlugin.lastInvalidPartitionKeyIndex) - logrus.Infof("len(records): %d\n", len(records)) + if outputPlugin.lastInvalidPartitionKeyIndex >= 0 && outputPlugin.lastInvalidPartitionKeyIndex <= len(records) { logrus.Errorf("[kinesis %d] Invalid partition key. 
Failed to find partition_key %s in log record %s", outputPlugin.PluginID, outputPlugin.partitionKey, records[outputPlugin.lastInvalidPartitionKeyIndex].Data) outputPlugin.lastInvalidPartitionKeyIndex = -1 } outputPlugin.timer.Check() - + logrus.Infof("About to send %d records to %s", len(records), outputPlugin.stream) response, err := outputPlugin.client.PutRecords(&kinesis.PutRecordsInput{ Records: records, StreamName: aws.String(outputPlugin.stream), }) + logrus.Infof("Tried send %d records", len(records)) if err != nil { logrus.Errorf("[kinesis %d] PutRecords failed with %v\n", outputPlugin.PluginID, err) outputPlugin.timer.Start() From aee0813a149c0f42cc9eeb4c5faa8dd8fe72fc75 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 17 May 2020 23:10:24 -0700 Subject: [PATCH 07/40] Hmmm... --- kinesis/kinesis.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index 5fa6b8f..f3c58a4 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -252,6 +252,7 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface func (outputPlugin *OutputPlugin) sendCurrentBatch(records []*kinesis.PutRecordsRequestEntry) (int, error) { if len(records) == 0 { + logrus.Info("No records") return fluentbit.FLB_OK, nil } if outputPlugin.lastInvalidPartitionKeyIndex >= 0 && outputPlugin.lastInvalidPartitionKeyIndex <= len(records) { @@ -264,7 +265,7 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch(records []*kinesis.PutRecords Records: records, StreamName: aws.String(outputPlugin.stream), }) - logrus.Infof("Tried send %d records", len(records)) + logrus.Infof("Tried to send %d records", len(records)) if err != nil { logrus.Errorf("[kinesis %d] PutRecords failed with %v\n", outputPlugin.PluginID, err) outputPlugin.timer.Start() From d4fdcb7f99e9cc720cf97fbef674bb185e262e82 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 17 May 2020 23:35:09 -0700 Subject: [PATCH 08/40] Need to use a pointer 
to a slice --- fluent-bit-kinesis.go | 4 ++-- kinesis/kinesis.go | 40 ++++++++++++++++++++-------------------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 9ba9ece..1747edb 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -165,13 +165,13 @@ func pluginConcurrentFlush(ctx, data unsafe.Pointer, length C.int, tag *C.char) timestamp = time.Now() } - retCode := kinesisOutput.AddRecord(records, record, ×tamp) + retCode := kinesisOutput.AddRecord(&records, record, ×tamp) if retCode != output.FLB_OK { return retCode } count++ } - retCode := kinesisOutput.Flush(records) + retCode := kinesisOutput.Flush(&records) if retCode != output.FLB_OK { return retCode } diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index f3c58a4..9834d36 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -172,7 +172,7 @@ func newPutRecordsClient(roleARN string, awsRegion string, endpoint string) (*ki // AddRecord accepts a record and adds it to the buffer, flushing the buffer if it is full // the return value is one of: FLB_OK FLB_RETRY // API Errors lead to an FLB_RETRY, and data processing errors are logged, the record is discarded and FLB_OK is returned -func (outputPlugin *OutputPlugin) AddRecord(records []*kinesis.PutRecordsRequestEntry, record map[interface{}]interface{}, timeStamp *time.Time) int { +func (outputPlugin *OutputPlugin) AddRecord(records *[]*kinesis.PutRecordsRequestEntry, record map[interface{}]interface{}, timeStamp *time.Time) int { if outputPlugin.timeKey != "" { buf := new(bytes.Buffer) err := outputPlugin.fmtStrftime.Format(buf, *timeStamp) @@ -183,7 +183,7 @@ func (outputPlugin *OutputPlugin) AddRecord(records []*kinesis.PutRecordsRequest record[outputPlugin.timeKey] = buf.String() } - partitionKey := outputPlugin.getPartitionKey(records, record) + partitionKey := outputPlugin.getPartitionKey(*records, record) data, err := outputPlugin.processRecord(record, partitionKey) if 
err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) @@ -193,7 +193,7 @@ func (outputPlugin *OutputPlugin) AddRecord(records []*kinesis.PutRecordsRequest newRecordSize := len(data) + len(partitionKey) - if len(records) == maximumRecordsPerPut || (outputPlugin.dataLength+newRecordSize) > maximumPutRecordBatchSize { + if len(*records) == maximumRecordsPerPut || (outputPlugin.dataLength+newRecordSize) > maximumPutRecordBatchSize { retCode, err := outputPlugin.sendCurrentBatch(records) if err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) @@ -201,7 +201,7 @@ func (outputPlugin *OutputPlugin) AddRecord(records []*kinesis.PutRecordsRequest return retCode } - records = append(records, &kinesis.PutRecordsRequestEntry{ + *records = append(*records, &kinesis.PutRecordsRequestEntry{ Data: data, PartitionKey: aws.String(partitionKey), }) @@ -211,7 +211,7 @@ func (outputPlugin *OutputPlugin) AddRecord(records []*kinesis.PutRecordsRequest // Flush sends the current buffer of log records // Returns FLB_OK, FLB_RETRY, FLB_ERROR -func (outputPlugin *OutputPlugin) Flush(records []*kinesis.PutRecordsRequestEntry) int { +func (outputPlugin *OutputPlugin) Flush(records *[]*kinesis.PutRecordsRequestEntry) int { retCode, err := outputPlugin.sendCurrentBatch(records) if err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) @@ -250,22 +250,22 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface return data, nil } -func (outputPlugin *OutputPlugin) sendCurrentBatch(records []*kinesis.PutRecordsRequestEntry) (int, error) { - if len(records) == 0 { +func (outputPlugin *OutputPlugin) sendCurrentBatch(records *[]*kinesis.PutRecordsRequestEntry) (int, error) { + if len(*records) == 0 { logrus.Info("No records") return fluentbit.FLB_OK, nil } - if outputPlugin.lastInvalidPartitionKeyIndex >= 0 && outputPlugin.lastInvalidPartitionKeyIndex <= len(records) { - logrus.Errorf("[kinesis %d] Invalid 
partition key. Failed to find partition_key %s in log record %s", outputPlugin.PluginID, outputPlugin.partitionKey, records[outputPlugin.lastInvalidPartitionKeyIndex].Data) + if outputPlugin.lastInvalidPartitionKeyIndex >= 0 && outputPlugin.lastInvalidPartitionKeyIndex <= len(*records) { + logrus.Errorf("[kinesis %d] Invalid partition key. Failed to find partition_key %s in log record %s", outputPlugin.PluginID, outputPlugin.partitionKey, (*records)[outputPlugin.lastInvalidPartitionKeyIndex].Data) outputPlugin.lastInvalidPartitionKeyIndex = -1 } outputPlugin.timer.Check() - logrus.Infof("About to send %d records to %s", len(records), outputPlugin.stream) + logrus.Infof("About to send %d records to %s", len(*records), outputPlugin.stream) response, err := outputPlugin.client.PutRecords(&kinesis.PutRecordsInput{ - Records: records, + Records: *records, StreamName: aws.String(outputPlugin.stream), }) - logrus.Infof("Tried to send %d records", len(records)) + logrus.Infof("Tried to send %d records", len(*records)) if err != nil { logrus.Errorf("[kinesis %d] PutRecords failed with %v\n", outputPlugin.PluginID, err) outputPlugin.timer.Start() @@ -276,17 +276,17 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch(records []*kinesis.PutRecords } return fluentbit.FLB_RETRY, err } - logrus.Debugf("[kinesis %d] Sent %d events to Kinesis\n", outputPlugin.PluginID, len(records)) + logrus.Debugf("[kinesis %d] Sent %d events to Kinesis\n", outputPlugin.PluginID, len(*records)) return outputPlugin.processAPIResponse(records, response) } // processAPIResponse processes the successful and failed records // it returns an error iff no records succeeded (i.e.) 
no progress has been made -func (outputPlugin *OutputPlugin) processAPIResponse(records []*kinesis.PutRecordsRequestEntry, response *kinesis.PutRecordsOutput) (int, error) { +func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutRecordsRequestEntry, response *kinesis.PutRecordsOutput) (int, error) { if aws.Int64Value(response.FailedRecordCount) > 0 { // start timer if all records failed (no progress has been made) - if aws.Int64Value(response.FailedRecordCount) == int64(len(records)) { + if aws.Int64Value(response.FailedRecordCount) == int64(len(*records)) { outputPlugin.timer.Start() return fluentbit.FLB_RETRY, fmt.Errorf("PutRecords request returned with no records successfully recieved") } @@ -297,7 +297,7 @@ func (outputPlugin *OutputPlugin) processAPIResponse(records []*kinesis.PutRecor for i, record := range response.Records { if record.ErrorMessage != nil { logrus.Debugf("[kinesis %d] Record failed to send with error: %s\n", outputPlugin.PluginID, aws.StringValue(record.ErrorMessage)) - failedRecords = append(failedRecords, records[i]) + failedRecords = append(failedRecords, (*records)[i]) } if aws.StringValue(record.ErrorCode) == kinesis.ErrCodeProvisionedThroughputExceededException { logrus.Warnf("[kinesis %d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID) @@ -305,16 +305,16 @@ func (outputPlugin *OutputPlugin) processAPIResponse(records []*kinesis.PutRecor } } - records = records[:0] - records = append(records, failedRecords...) + *records = (*records)[:0] + *records = append(*records, failedRecords...) 
outputPlugin.dataLength = 0 - for _, record := range records { + for _, record := range *records { outputPlugin.dataLength += len(record.Data) } } else { // request fully succeeded outputPlugin.timer.Reset() - records = records[:0] + *records = (*records)[:0] outputPlugin.dataLength = 0 } return fluentbit.FLB_OK, nil From ab8243bcd813ee6c49d5c4738679acb64703d026 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 17 May 2020 23:44:53 -0700 Subject: [PATCH 09/40] Fix bug from backoff code change --- kinesis/kinesis.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index 9834d36..24638c2 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -197,8 +197,8 @@ func (outputPlugin *OutputPlugin) AddRecord(records *[]*kinesis.PutRecordsReques retCode, err := outputPlugin.sendCurrentBatch(records) if err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) + return retCode } - return retCode } *records = append(*records, &kinesis.PutRecordsRequestEntry{ From b6bee2dc400af5440487e8b0f298a69c8829b0b0 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Mon, 18 May 2020 23:41:11 -0700 Subject: [PATCH 10/40] Concurrency fix: process incoming C data structures before returning --- fluent-bit-kinesis.go | 52 +++++++++++++++++++++++++++++++------------ 1 file changed, 38 insertions(+), 14 deletions(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 1747edb..db3c71b 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -117,36 +117,33 @@ func FLBPluginInit(ctx unsafe.Pointer) int { //export FLBPluginFlushCtx func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int { - go flushWithRetries(ctx, data, length, tag, retries) + events, timestamps := unpackRecords(data, length) + go flushWithRetries(ctx, tag, events, timestamps, retries) return output.FLB_OK } -func flushWithRetries(ctx, data unsafe.Pointer, length C.int, tag *C.char, retries int) { 
+func flushWithRetries(ctx unsafe.Pointer, tag *C.char, events []map[interface{}]interface{}, timestamps []time.Time, retries int) { for i := 0; i < retries; i++ { - retCode := pluginConcurrentFlush(ctx, data, length, tag) + retCode := pluginConcurrentFlush(ctx, tag, events, timestamps) if retCode != output.FLB_RETRY { break } } } -func pluginConcurrentFlush(ctx, data unsafe.Pointer, length C.int, tag *C.char) int { +func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{}]interface{}, timestamps []time.Time) { var count int var ret int var ts interface{} var timestamp time.Time var record map[interface{}]interface{} + records = make([]map[interface{}]interface{}, int(length)) + timestamps = make([]time.Time, int(length)) + // Create Fluent Bit decoder dec := output.NewDecoder(data, int(length)) - kinesisOutput := getPluginInstance(ctx) - fluentTag := C.GoString(tag) - logrus.Debugf("[kinesis %d] Found logs with tag: %s\n", kinesisOutput.PluginID, fluentTag) - - // Each flush must have its own output buffer, since flushes can be concurrent - records := make([]*kinesisAPI.PutRecordsRequestEntry, 0, maximumRecordsPerPut) - for { //Extract Record ret, ts, record = output.GetRecord(dec) @@ -165,17 +162,44 @@ func pluginConcurrentFlush(ctx, data unsafe.Pointer, length C.int, tag *C.char) timestamp = time.Now() } - retCode := kinesisOutput.AddRecord(&records, record, ×tamp) + records = append(records, record) + timestamps = append(timestamps, timestamp) + + count++ + } + + return records, timestamps +} + +func pluginConcurrentFlush(ctx unsafe.Pointer, tag *C.char, events []map[interface{}]interface{}, timestamps []time.Time) int { + var i int = 0 + var timestamp time.Time + var event map[interface{}]interface{} + + kinesisOutput := getPluginInstance(ctx) + fluentTag := C.GoString(tag) + logrus.Debugf("[kinesis %d] Found logs with tag: %s\n", kinesisOutput.PluginID, fluentTag) + + // Each flush must have its own output buffe r, since flushes can be 
concurrent + records := make([]*kinesisAPI.PutRecordsRequestEntry, 0, maximumRecordsPerPut) + + for { + if i >= len(events) || i >= len(timestamps) { + break + } + event = events[i] + timestamp = timestamps[i] + retCode := kinesisOutput.AddRecord(&records, event, ×tamp) if retCode != output.FLB_OK { return retCode } - count++ + i++ } retCode := kinesisOutput.Flush(&records) if retCode != output.FLB_OK { return retCode } - logrus.Debugf("[kinesis %d] Processed %d events with tag %s\n", kinesisOutput.PluginID, count, fluentTag) + logrus.Debugf("[kinesis %d] Processed %d events with tag %s\n", kinesisOutput.PluginID, i, fluentTag) return output.FLB_OK } From dcdc6e059f99231c56090ec13e876fbe7306a84e Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 19 May 2020 00:55:21 -0700 Subject: [PATCH 11/40] Fix record length bug --- fluent-bit-kinesis.go | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index db3c71b..86626e2 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -117,29 +117,29 @@ func FLBPluginInit(ctx unsafe.Pointer) int { //export FLBPluginFlushCtx func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int { - events, timestamps := unpackRecords(data, length) - go flushWithRetries(ctx, tag, events, timestamps, retries) + events, timestamps, count := unpackRecords(data, length) + go flushWithRetries(ctx, tag, count, events, timestamps, retries) return output.FLB_OK } -func flushWithRetries(ctx unsafe.Pointer, tag *C.char, events []map[interface{}]interface{}, timestamps []time.Time, retries int) { +func flushWithRetries(ctx unsafe.Pointer, tag *C.char, count int, events []map[interface{}]interface{}, timestamps []time.Time, retries int) { for i := 0; i < retries; i++ { - retCode := pluginConcurrentFlush(ctx, tag, events, timestamps) + retCode := pluginConcurrentFlush(ctx, tag, count, events, timestamps) if retCode != 
output.FLB_RETRY { break } } } -func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{}]interface{}, timestamps []time.Time) { - var count int +func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{}]interface{}, timestamps []time.Time, count int) { var ret int var ts interface{} var timestamp time.Time var record map[interface{}]interface{} + count = 0 - records = make([]map[interface{}]interface{}, int(length)) - timestamps = make([]time.Time, int(length)) + records = make([]map[interface{}]interface{}, 100) + timestamps = make([]time.Time, 100) // Create Fluent Bit decoder dec := output.NewDecoder(data, int(length)) @@ -168,11 +168,10 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} count++ } - return records, timestamps + return records, timestamps, count } -func pluginConcurrentFlush(ctx unsafe.Pointer, tag *C.char, events []map[interface{}]interface{}, timestamps []time.Time) int { - var i int = 0 +func pluginConcurrentFlush(ctx unsafe.Pointer, tag *C.char, count int, events []map[interface{}]interface{}, timestamps []time.Time) int { var timestamp time.Time var event map[interface{}]interface{} @@ -183,10 +182,7 @@ func pluginConcurrentFlush(ctx unsafe.Pointer, tag *C.char, events []map[interfa // Each flush must have its own output buffe r, since flushes can be concurrent records := make([]*kinesisAPI.PutRecordsRequestEntry, 0, maximumRecordsPerPut) - for { - if i >= len(events) || i >= len(timestamps) { - break - } + for i := 0; i < count; i++ { event = events[i] timestamp = timestamps[i] retCode := kinesisOutput.AddRecord(&records, event, ×tamp) @@ -199,7 +195,7 @@ func pluginConcurrentFlush(ctx unsafe.Pointer, tag *C.char, events []map[interfa if retCode != output.FLB_OK { return retCode } - logrus.Debugf("[kinesis %d] Processed %d events with tag %s\n", kinesisOutput.PluginID, i, fluentTag) + logrus.Debugf("[kinesis %d] Processed %d events with tag %s\n", 
kinesisOutput.PluginID, count, fluentTag) return output.FLB_OK } From 62c7464d24fa8f004266e4ea84c874f16d579c6e Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 19 May 2020 01:47:52 -0700 Subject: [PATCH 12/40] Debugging --- kinesis/kinesis.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index 24638c2..ad220b3 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -191,6 +191,8 @@ func (outputPlugin *OutputPlugin) AddRecord(records *[]*kinesis.PutRecordsReques return fluentbit.FLB_OK } + logrus.Infof("Processing record %\ns", string(data)) + newRecordSize := len(data) + len(partitionKey) if len(*records) == maximumRecordsPerPut || (outputPlugin.dataLength+newRecordSize) > maximumPutRecordBatchSize { From 1dd656eca8ac1bbcf00121fa8e0e1083738ab8ad Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 19 May 2020 17:23:26 -0700 Subject: [PATCH 13/40] Debugging... --- fluent-bit-kinesis.go | 13 +++++++++++++ kinesis/kinesis.go | 2 +- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 86626e2..9320b8f 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -26,6 +26,7 @@ import ( "github.com/fluent/fluent-bit-go/output" "github.com/sirupsen/logrus" ) +import jsoniter "github.com/json-iterator/go" const ( // Kinesis API Limit https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#Kinesis.PutRecords @@ -162,6 +163,18 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} timestamp = time.Now() } + if record == nil { + logrus.Info("unpack: null record") + } else { + var json = jsoniter.ConfigCompatibleWithStandardLibrary + data, err := json.Marshal(record) + if err != nil { + logrus.Infof("unpack: %s\n", err) + } else { + logrus.Infof("unpack: %s\n", string(data)) + } + } + records = append(records, record) timestamps = append(timestamps, timestamp) diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index 
ad220b3..7a86ba2 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -191,7 +191,7 @@ func (outputPlugin *OutputPlugin) AddRecord(records *[]*kinesis.PutRecordsReques return fluentbit.FLB_OK } - logrus.Infof("Processing record %\ns", string(data)) + logrus.Infof("Processing record %s\n", string(data)) newRecordSize := len(data) + len(partitionKey) From f95a0df44acdaafa6c7132cd644f9f22616d0583 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 19 May 2020 17:32:43 -0700 Subject: [PATCH 14/40] Debugging... --- fluent-bit-kinesis.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 9320b8f..ebb55bf 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -138,6 +138,7 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} var timestamp time.Time var record map[interface{}]interface{} count = 0 + all_good := true records = make([]map[interface{}]interface{}, 100) timestamps = make([]time.Time, 100) @@ -165,11 +166,15 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} if record == nil { logrus.Info("unpack: null record") + all_good = false } else { var json = jsoniter.ConfigCompatibleWithStandardLibrary data, err := json.Marshal(record) if err != nil { - logrus.Infof("unpack: %s\n", err) + if len(data) == 0 { + logrus.Info("unpack: record has zero length") + all_good = false + } } else { logrus.Infof("unpack: %s\n", string(data)) } @@ -180,6 +185,12 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} count++ } + logrus.Infof("Processed %d records", count) + if all_good { + logrus.Info("All good") + } else { + logrus.Info("Not all good") + } return records, timestamps, count } From ce3e513987a239865b892abe04d68970dbef9d1b Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 19 May 2020 17:39:13 -0700 Subject: [PATCH 15/40] Oops --- fluent-bit-kinesis.go | 2 -- 1 file 
changed, 2 deletions(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index ebb55bf..3c0cfce 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -175,8 +175,6 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} logrus.Info("unpack: record has zero length") all_good = false } - } else { - logrus.Infof("unpack: %s\n", string(data)) } } From 20229de0c79375c6ed0a5a48b094af9fcc54fded Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 19 May 2020 17:51:14 -0700 Subject: [PATCH 16/40] hmmm --- fluent-bit-kinesis.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 3c0cfce..ae8f4a6 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -170,7 +170,7 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} } else { var json = jsoniter.ConfigCompatibleWithStandardLibrary data, err := json.Marshal(record) - if err != nil { + if err == nil { if len(data) == 0 { logrus.Info("unpack: record has zero length") all_good = false From cb5cff191f6fcd58586e7dcb207bbee19232ddf3 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 19 May 2020 20:10:43 -0700 Subject: [PATCH 17/40] debugging --- fluent-bit-kinesis.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index ae8f4a6..54099a7 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -175,6 +175,9 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} logrus.Info("unpack: record has zero length") all_good = false } + } else { + logrus.Info("unpack: unmarshal error") + all_good = false } } From 8f2af76b3b826af5bfc7b435260a868eb4f742f8 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 19 May 2020 22:56:12 -0700 Subject: [PATCH 18/40] What is going on --- fluent-bit-kinesis.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git 
a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 54099a7..2750638 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -193,6 +193,21 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} logrus.Info("Not all good") } + for i := 0; i < count; i++ { + record = records[i] + if record == nil { + logrus.Infof("unpack: %d is null\n", i) + continue + } + var json = jsoniter.ConfigCompatibleWithStandardLibrary + data, err := json.Marshal(record) + if err == nil { + logrus.Infof("unpack: %s\n", string(data)) + } else { + logrus.Info("unpack 2: unmarshal error") + } + } + return records, timestamps, count } @@ -207,6 +222,21 @@ func pluginConcurrentFlush(ctx unsafe.Pointer, tag *C.char, count int, events [] // Each flush must have its own output buffe r, since flushes can be concurrent records := make([]*kinesisAPI.PutRecordsRequestEntry, 0, maximumRecordsPerPut) + for i := 0; i < count; i++ { + event = events[i] + if event == nil { + logrus.Infof("flush: %d is null\n", i) + continue + } + var json = jsoniter.ConfigCompatibleWithStandardLibrary + data, err := json.Marshal(event) + if err == nil { + logrus.Infof("flush: %s\n", string(data)) + } else { + logrus.Info("flush: unmarshal error") + } + } + for i := 0; i < count; i++ { event = events[i] timestamp = timestamps[i] From c2b6133b3684ea06623cd239697fa5f853718ab3 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 19 May 2020 23:09:18 -0700 Subject: [PATCH 19/40] makes no sense at all --- fluent-bit-kinesis.go | 1 - 1 file changed, 1 deletion(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 2750638..7b4fd3c 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -197,7 +197,6 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} record = records[i] if record == nil { logrus.Infof("unpack: %d is null\n", i) - continue } var json = jsoniter.ConfigCompatibleWithStandardLibrary data, err := json.Marshal(record) From 
ab590103cab2e5a4afa9d02f56ba36f18d8dbd42 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 19 May 2020 23:17:28 -0700 Subject: [PATCH 20/40] WAT THE --- fluent-bit-kinesis.go | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 7b4fd3c..304a481 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -167,18 +167,13 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} if record == nil { logrus.Info("unpack: null record") all_good = false + } + var json = jsoniter.ConfigCompatibleWithStandardLibrary + data, err := json.Marshal(record) + if err == nil { + logrus.Infof("unpack 2: %s\n", string(data)) } else { - var json = jsoniter.ConfigCompatibleWithStandardLibrary - data, err := json.Marshal(record) - if err == nil { - if len(data) == 0 { - logrus.Info("unpack: record has zero length") - all_good = false - } - } else { - logrus.Info("unpack: unmarshal error") - all_good = false - } + logrus.Info("unpack 2: unmarshal error") } records = append(records, record) @@ -196,12 +191,12 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} for i := 0; i < count; i++ { record = records[i] if record == nil { - logrus.Infof("unpack: %d is null\n", i) + logrus.Infof("unpack 2: %d is null\n", i) } var json = jsoniter.ConfigCompatibleWithStandardLibrary data, err := json.Marshal(record) if err == nil { - logrus.Infof("unpack: %s\n", string(data)) + logrus.Infof("unpack 2: %s\n", string(data)) } else { logrus.Info("unpack 2: unmarshal error") } From 070ef3b57201c9d2bb7d2fc0b9ce5917942ee386 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 19 May 2020 23:26:29 -0700 Subject: [PATCH 21/40] cats --- fluent-bit-kinesis.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 304a481..7a0f8ce 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -168,6 +168,7 
@@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} logrus.Info("unpack: null record") all_good = false } + logrus.Info("unpack: %v", record) var json = jsoniter.ConfigCompatibleWithStandardLibrary data, err := json.Marshal(record) if err == nil { @@ -190,6 +191,7 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} for i := 0; i < count; i++ { record = records[i] + logrus.Info("unpack 2: %v", record) if record == nil { logrus.Infof("unpack 2: %d is null\n", i) } From 2f45335f6e666cef2d9dfb96da7c158381a24e3d Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Thu, 21 May 2020 23:00:40 -0700 Subject: [PATCH 22/40] Call AddRecord before spawning goroutine --- fluent-bit-kinesis.go | 94 ++++++++----------------------------------- kinesis/kinesis.go | 53 +++++++++++++----------- 2 files changed, 46 insertions(+), 101 deletions(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 7a0f8ce..c90221f 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -26,7 +26,6 @@ import ( "github.com/fluent/fluent-bit-go/output" "github.com/sirupsen/logrus" ) -import jsoniter "github.com/json-iterator/go" const ( // Kinesis API Limit https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#Kinesis.PutRecords @@ -118,30 +117,32 @@ func FLBPluginInit(ctx unsafe.Pointer) int { //export FLBPluginFlushCtx func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int { - events, timestamps, count := unpackRecords(data, length) - go flushWithRetries(ctx, tag, count, events, timestamps, retries) + kinesisOutput := getPluginInstance(ctx) + events, count, retCode := unpackRecords(kinesisOutput, data, length) + if retCode != output.FLB_OK { + return retCode + } + go flushWithRetries(kinesisOutput, tag, count, events, retries) return output.FLB_OK } -func flushWithRetries(ctx unsafe.Pointer, tag *C.char, count int, events []map[interface{}]interface{}, timestamps []time.Time, retries int) 
{ +func flushWithRetries(kinesisOutput *kinesis.OutputPlugin, tag *C.char, count int, records []*kinesisAPI.PutRecordsRequestEntry, retries int) { for i := 0; i < retries; i++ { - retCode := pluginConcurrentFlush(ctx, tag, count, events, timestamps) + retCode := pluginConcurrentFlush(kinesisOutput, tag, count, records) if retCode != output.FLB_RETRY { break } } } -func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{}]interface{}, timestamps []time.Time, count int) { +func unpackRecords(kinesisOutput *kinesis.OutputPlugin, data unsafe.Pointer, length C.int) ([]*kinesisAPI.PutRecordsRequestEntry, int, int) { var ret int var ts interface{} var timestamp time.Time var record map[interface{}]interface{} - count = 0 - all_good := true + count := 0 - records = make([]map[interface{}]interface{}, 100) - timestamps = make([]time.Time, 100) + records := make([]*kinesisAPI.PutRecordsRequestEntry, 0, maximumRecordsPerPut) // Create Fluent Bit decoder dec := output.NewDecoder(data, int(length)) @@ -164,84 +165,21 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} timestamp = time.Now() } - if record == nil { - logrus.Info("unpack: null record") - all_good = false - } - logrus.Info("unpack: %v", record) - var json = jsoniter.ConfigCompatibleWithStandardLibrary - data, err := json.Marshal(record) - if err == nil { - logrus.Infof("unpack 2: %s\n", string(data)) - } else { - logrus.Info("unpack 2: unmarshal error") + retCode := kinesisOutput.AddRecord(&records, record, ×tamp) + if retCode != output.FLB_OK { + return nil, 0, retCode } - records = append(records, record) - timestamps = append(timestamps, timestamp) - count++ } - logrus.Infof("Processed %d records", count) - if all_good { - logrus.Info("All good") - } else { - logrus.Info("Not all good") - } - - for i := 0; i < count; i++ { - record = records[i] - logrus.Info("unpack 2: %v", record) - if record == nil { - logrus.Infof("unpack 2: %d is null\n", i) - } - var 
json = jsoniter.ConfigCompatibleWithStandardLibrary - data, err := json.Marshal(record) - if err == nil { - logrus.Infof("unpack 2: %s\n", string(data)) - } else { - logrus.Info("unpack 2: unmarshal error") - } - } - return records, timestamps, count + return records, count, output.FLB_OK } -func pluginConcurrentFlush(ctx unsafe.Pointer, tag *C.char, count int, events []map[interface{}]interface{}, timestamps []time.Time) int { - var timestamp time.Time - var event map[interface{}]interface{} - - kinesisOutput := getPluginInstance(ctx) +func pluginConcurrentFlush(kinesisOutput *kinesis.OutputPlugin, tag *C.char, count int, records []*kinesisAPI.PutRecordsRequestEntry) int { fluentTag := C.GoString(tag) logrus.Debugf("[kinesis %d] Found logs with tag: %s\n", kinesisOutput.PluginID, fluentTag) - // Each flush must have its own output buffe r, since flushes can be concurrent - records := make([]*kinesisAPI.PutRecordsRequestEntry, 0, maximumRecordsPerPut) - - for i := 0; i < count; i++ { - event = events[i] - if event == nil { - logrus.Infof("flush: %d is null\n", i) - continue - } - var json = jsoniter.ConfigCompatibleWithStandardLibrary - data, err := json.Marshal(event) - if err == nil { - logrus.Infof("flush: %s\n", string(data)) - } else { - logrus.Info("flush: unmarshal error") - } - } - - for i := 0; i < count; i++ { - event = events[i] - timestamp = timestamps[i] - retCode := kinesisOutput.AddRecord(&records, event, ×tamp) - if retCode != output.FLB_OK { - return retCode - } - i++ - } retCode := kinesisOutput.Flush(&records) if retCode != output.FLB_OK { return retCode diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index 7a86ba2..0f7569e 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -80,7 +80,6 @@ type OutputPlugin struct { fmtStrftime *strftime.Strftime lastInvalidPartitionKeyIndex int client PutRecordsClient - dataLength int timer *plugins.Timeout PluginID int random *random @@ -169,9 +168,8 @@ func newPutRecordsClient(roleARN string, 
awsRegion string, endpoint string) (*ki return client, nil } -// AddRecord accepts a record and adds it to the buffer, flushing the buffer if it is full -// the return value is one of: FLB_OK FLB_RETRY -// API Errors lead to an FLB_RETRY, and data processing errors are logged, the record is discarded and FLB_OK is returned +// AddRecord accepts a record and adds it to the buffer +// the return value is one of: FLB_OK FLB_RETRY FLB_ERROR func (outputPlugin *OutputPlugin) AddRecord(records *[]*kinesis.PutRecordsRequestEntry, record map[interface{}]interface{}, timeStamp *time.Time) int { if outputPlugin.timeKey != "" { buf := new(bytes.Buffer) @@ -191,30 +189,39 @@ func (outputPlugin *OutputPlugin) AddRecord(records *[]*kinesis.PutRecordsReques return fluentbit.FLB_OK } - logrus.Infof("Processing record %s\n", string(data)) - - newRecordSize := len(data) + len(partitionKey) - - if len(*records) == maximumRecordsPerPut || (outputPlugin.dataLength+newRecordSize) > maximumPutRecordBatchSize { - retCode, err := outputPlugin.sendCurrentBatch(records) - if err != nil { - logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) - return retCode - } - } + logrus.Infof("Adding record %s\n", string(data)) *records = append(*records, &kinesis.PutRecordsRequestEntry{ Data: data, PartitionKey: aws.String(partitionKey), }) - outputPlugin.dataLength += newRecordSize return fluentbit.FLB_OK } // Flush sends the current buffer of log records // Returns FLB_OK, FLB_RETRY, FLB_ERROR func (outputPlugin *OutputPlugin) Flush(records *[]*kinesis.PutRecordsRequestEntry) int { - retCode, err := outputPlugin.sendCurrentBatch(records) + // Each flush must have its own output buffe r, since flushes can be concurrent + requestBuf := make([]*kinesis.PutRecordsRequestEntry, 0, maximumRecordsPerPut) + dataLength := 0 + + for _, record := range *records { + newRecordSize := len(record.Data) + len(aws.StringValue(record.PartitionKey)) + + if len(requestBuf) == maximumRecordsPerPut || 
(dataLength+newRecordSize) > maximumPutRecordBatchSize { + retCode, err := outputPlugin.sendCurrentBatch(&requestBuf, &dataLength) + if err != nil { + logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) + return retCode + } + } + + requestBuf = append(requestBuf, record) + dataLength += newRecordSize + } + + // send any remaining records + retCode, err := outputPlugin.sendCurrentBatch(&requestBuf, &dataLength) if err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) } @@ -252,7 +259,7 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface return data, nil } -func (outputPlugin *OutputPlugin) sendCurrentBatch(records *[]*kinesis.PutRecordsRequestEntry) (int, error) { +func (outputPlugin *OutputPlugin) sendCurrentBatch(records *[]*kinesis.PutRecordsRequestEntry, dataLength *int) (int, error) { if len(*records) == 0 { logrus.Info("No records") return fluentbit.FLB_OK, nil @@ -280,12 +287,12 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch(records *[]*kinesis.PutRecord } logrus.Debugf("[kinesis %d] Sent %d events to Kinesis\n", outputPlugin.PluginID, len(*records)) - return outputPlugin.processAPIResponse(records, response) + return outputPlugin.processAPIResponse(records, dataLength, response) } // processAPIResponse processes the successful and failed records // it returns an error iff no records succeeded (i.e.) 
no progress has been made -func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutRecordsRequestEntry, response *kinesis.PutRecordsOutput) (int, error) { +func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutRecordsRequestEntry, dataLength *int, response *kinesis.PutRecordsOutput) (int, error) { if aws.Int64Value(response.FailedRecordCount) > 0 { // start timer if all records failed (no progress has been made) if aws.Int64Value(response.FailedRecordCount) == int64(len(*records)) { @@ -309,15 +316,15 @@ func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutReco *records = (*records)[:0] *records = append(*records, failedRecords...) - outputPlugin.dataLength = 0 + *dataLength = 0 for _, record := range *records { - outputPlugin.dataLength += len(record.Data) + *dataLength += len(record.Data) } } else { // request fully succeeded outputPlugin.timer.Reset() *records = (*records)[:0] - outputPlugin.dataLength = 0 + *dataLength = 0 } return fluentbit.FLB_OK, nil } From 4b35c4d5f685c90300cac550bac5afdfbcc9204a Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Thu, 21 May 2020 23:15:40 -0700 Subject: [PATCH 23/40] Remove log message used for debug --- kinesis/kinesis.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index 0f7569e..d984e83 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -189,8 +189,6 @@ func (outputPlugin *OutputPlugin) AddRecord(records *[]*kinesis.PutRecordsReques return fluentbit.FLB_OK } - logrus.Infof("Adding record %s\n", string(data)) - *records = append(*records, &kinesis.PutRecordsRequestEntry{ Data: data, PartitionKey: aws.String(partitionKey), @@ -261,7 +259,6 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface func (outputPlugin *OutputPlugin) sendCurrentBatch(records *[]*kinesis.PutRecordsRequestEntry, dataLength *int) (int, error) { if len(*records) == 0 { - logrus.Info("No records") return 
fluentbit.FLB_OK, nil } if outputPlugin.lastInvalidPartitionKeyIndex >= 0 && outputPlugin.lastInvalidPartitionKeyIndex <= len(*records) { From f63d185f281c585e7d5ddb97dd6da1751390bd46 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Thu, 21 May 2020 23:26:52 -0700 Subject: [PATCH 24/40] Remove invalid partition key field from output plugin- todo: fix later --- kinesis/kinesis.go | 41 +++++++++++++++++------------------------ 1 file changed, 17 insertions(+), 24 deletions(-) diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index d984e83..9ff3452 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -75,14 +75,13 @@ type OutputPlugin struct { // Partition key decides in which shard of your stream the data belongs to partitionKey string // Decides whether to append a newline after each data record - appendNewline bool - timeKey string - fmtStrftime *strftime.Strftime - lastInvalidPartitionKeyIndex int - client PutRecordsClient - timer *plugins.Timeout - PluginID int - random *random + appendNewline bool + timeKey string + fmtStrftime *strftime.Strftime + client PutRecordsClient + timer *plugins.Timeout + PluginID int + random *random } // NewOutputPlugin creates an OutputPlugin object @@ -121,17 +120,16 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, } return &OutputPlugin{ - stream: stream, - client: client, - dataKeys: dataKeys, - partitionKey: partitionKey, - appendNewline: appendNewline, - timeKey: timeKey, - fmtStrftime: timeFormatter, - lastInvalidPartitionKeyIndex: -1, - timer: timer, - PluginID: pluginID, - random: random, + stream: stream, + client: client, + dataKeys: dataKeys, + partitionKey: partitionKey, + appendNewline: appendNewline, + timeKey: timeKey, + fmtStrftime: timeFormatter, + timer: timer, + PluginID: pluginID, + random: random, }, nil } @@ -261,10 +259,6 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch(records *[]*kinesis.PutRecord if len(*records) == 0 { return fluentbit.FLB_OK, nil } - if 
outputPlugin.lastInvalidPartitionKeyIndex >= 0 && outputPlugin.lastInvalidPartitionKeyIndex <= len(*records) { - logrus.Errorf("[kinesis %d] Invalid partition key. Failed to find partition_key %s in log record %s", outputPlugin.PluginID, outputPlugin.partitionKey, (*records)[outputPlugin.lastInvalidPartitionKeyIndex].Data) - outputPlugin.lastInvalidPartitionKeyIndex = -1 - } outputPlugin.timer.Check() logrus.Infof("About to send %d records to %s", len(*records), outputPlugin.stream) response, err := outputPlugin.client.PutRecords(&kinesis.PutRecordsInput{ @@ -352,7 +346,6 @@ func (outputPlugin *OutputPlugin) getPartitionKey(records []*kinesis.PutRecordsR } } } - outputPlugin.lastInvalidPartitionKeyIndex = len(records) % maximumRecordsPerPut } return outputPlugin.randomString() } From a7d796282a00d4e7c113794af1533ee61813ebab Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Fri, 22 May 2020 00:10:33 -0700 Subject: [PATCH 25/40] Fix bug in error handling logic --- fluent-bit-kinesis.go | 1 + kinesis/kinesis.go | 2 ++ 2 files changed, 3 insertions(+) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index c90221f..82c4508 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -128,6 +128,7 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int func flushWithRetries(kinesisOutput *kinesis.OutputPlugin, tag *C.char, count int, records []*kinesisAPI.PutRecordsRequestEntry, retries int) { for i := 0; i < retries; i++ { + // TODO: Would probably want to backoff before retrying? 
retCode := pluginConcurrentFlush(kinesisOutput, tag, count, records) if retCode != output.FLB_RETRY { break diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index 9ff3452..4530230 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -208,6 +208,8 @@ func (outputPlugin *OutputPlugin) Flush(records *[]*kinesis.PutRecordsRequestEnt retCode, err := outputPlugin.sendCurrentBatch(&requestBuf, &dataLength) if err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) + } + if retCode != fluentbit.FLB_OK { return retCode } } From 949dd4e4520250d023979113e52398f3832b4b46 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Fri, 22 May 2020 00:36:12 -0700 Subject: [PATCH 26/40] Remove unneeded info statements --- kinesis/kinesis.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index 4530230..b8fd64f 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -262,12 +262,10 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch(records *[]*kinesis.PutRecord return fluentbit.FLB_OK, nil } outputPlugin.timer.Check() - logrus.Infof("About to send %d records to %s", len(*records), outputPlugin.stream) response, err := outputPlugin.client.PutRecords(&kinesis.PutRecordsInput{ Records: *records, StreamName: aws.String(outputPlugin.stream), }) - logrus.Infof("Tried to send %d records", len(*records)) if err != nil { logrus.Errorf("[kinesis %d] PutRecords failed with %v\n", outputPlugin.PluginID, err) outputPlugin.timer.Start() From 7f3209b92d9f47454a604050931a684214e4b908 Mon Sep 17 00:00:00 2001 From: Zack Wine Date: Fri, 29 May 2020 13:09:55 -0400 Subject: [PATCH 27/40] Fix retry logic for concurrent flushes. Fixes duplicate logs when kinesis limits are exceeded. Reduces the likelihood of dropped logs when kinesis provides backpressure (improvements in retries). Adds log messages to indicate when and how many records are dropped due to retry timeouts. 
Signed-off-by: Zack Wine --- fluent-bit-kinesis.go | 42 +++++++++++++++++++++++++++++++++++------- kinesis/kinesis.go | 36 +++++++++++++++++++++++++++++++----- 2 files changed, 66 insertions(+), 12 deletions(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 82c4508..8b59d25 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -33,7 +33,8 @@ const ( ) const ( - retries = 2 + retries = 6 + concurrentRetryLimit = 4 ) var ( @@ -118,8 +119,16 @@ func FLBPluginInit(ctx unsafe.Pointer) int { //export FLBPluginFlushCtx func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int { kinesisOutput := getPluginInstance(ctx) + + curRetries := kinesisOutput.GetConcurrentRetries() + if curRetries > concurrentRetryLimit { + logrus.Infof("[kinesis] flush returning retry, too many concurrent retries (%d)\n", curRetries) + return output.FLB_RETRY + } + events, count, retCode := unpackRecords(kinesisOutput, data, length) if retCode != output.FLB_OK { + logrus.Errorf("[kinesis] failed to unpackRecords\n") return retCode } go flushWithRetries(kinesisOutput, tag, count, events, retries) @@ -127,12 +136,32 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int } func flushWithRetries(kinesisOutput *kinesis.OutputPlugin, tag *C.char, count int, records []*kinesisAPI.PutRecordsRequestEntry, retries int) { - for i := 0; i < retries; i++ { - // TODO: Would probably want to backoff before retrying? 
- retCode := pluginConcurrentFlush(kinesisOutput, tag, count, records) + var retCode, tries int + + backoff := kinesisOutput.GetConcurrentRetries() + + for tries = 0; tries < retries; tries++ { + if backoff > 0 { + // Wait if other goroutines are in backoff mode, as well as implement a progressive backoff + time.Sleep(time.Duration((2^backoff)*100) * time.Millisecond) + } + + logrus.Debugf("[kinesis] Sending (%p) (%d) records, backoff=(%d)", records, len(records), backoff) + retCode = pluginConcurrentFlush(kinesisOutput, tag, count, &records) if retCode != output.FLB_RETRY { break } + backoff = kinesisOutput.AddConcurrentRetries(1) + logrus.Infof("[kinesis] Going to retry with (%p) (%d) records, backoff=(%d)", records, len(records), backoff) + } + if tries > 0 { + kinesisOutput.AddConcurrentRetries(int64(-tries)) + } + if retCode == output.FLB_ERROR { + logrus.Errorf("[kinesis] Failed to flush (%d) records with error", len(records)) + } + if retCode == output.FLB_RETRY { + logrus.Errorf("[kinesis] Failed flush (%d) records after retries %d", len(records), retries) } } @@ -177,11 +206,10 @@ func unpackRecords(kinesisOutput *kinesis.OutputPlugin, data unsafe.Pointer, len return records, count, output.FLB_OK } -func pluginConcurrentFlush(kinesisOutput *kinesis.OutputPlugin, tag *C.char, count int, records []*kinesisAPI.PutRecordsRequestEntry) int { +func pluginConcurrentFlush(kinesisOutput *kinesis.OutputPlugin, tag *C.char, count int, records *[]*kinesisAPI.PutRecordsRequestEntry) int { fluentTag := C.GoString(tag) logrus.Debugf("[kinesis %d] Found logs with tag: %s\n", kinesisOutput.PluginID, fluentTag) - - retCode := kinesisOutput.Flush(&records) + retCode := kinesisOutput.Flush(records) if retCode != output.FLB_OK { return retCode } diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index b8fd64f..d11b7ae 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -21,6 +21,7 @@ import ( "fmt" "math/rand" "os" + "sync/atomic" "time" 
"github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins" @@ -82,6 +83,8 @@ type OutputPlugin struct { timer *plugins.Timeout PluginID int random *random + // Used to implement backoff for concurrent flushes + concurrentRetries int64 } // NewOutputPlugin creates an OutputPlugin object @@ -201,7 +204,7 @@ func (outputPlugin *OutputPlugin) Flush(records *[]*kinesis.PutRecordsRequestEnt requestBuf := make([]*kinesis.PutRecordsRequestEntry, 0, maximumRecordsPerPut) dataLength := 0 - for _, record := range *records { + for i, record := range *records { newRecordSize := len(record.Data) + len(aws.StringValue(record.PartitionKey)) if len(requestBuf) == maximumRecordsPerPut || (dataLength+newRecordSize) > maximumPutRecordBatchSize { @@ -210,6 +213,10 @@ func (outputPlugin *OutputPlugin) Flush(records *[]*kinesis.PutRecordsRequestEnt logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) } if retCode != fluentbit.FLB_OK { + unsent := (*records)[i:] + // requestBuf will contain records sendCurrentBatch failed to send, + // combine those with the records yet to be sent/batched + *records = append(requestBuf, unsent...) return retCode } } @@ -223,6 +230,8 @@ func (outputPlugin *OutputPlugin) Flush(records *[]*kinesis.PutRecordsRequestEnt if err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) } + // requestBuf will contain records sendCurrentBatch failed to send + *records = requestBuf return retCode } @@ -284,6 +293,9 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch(records *[]*kinesis.PutRecord // processAPIResponse processes the successful and failed records // it returns an error iff no records succeeded (i.e.) 
no progress has been made func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutRecordsRequestEntry, dataLength *int, response *kinesis.PutRecordsOutput) (int, error) { + + var retCode int = fluentbit.FLB_OK + if aws.Int64Value(response.FailedRecordCount) > 0 { // start timer if all records failed (no progress has been made) if aws.Int64Value(response.FailedRecordCount) == int64(len(*records)) { @@ -291,7 +303,7 @@ func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutReco return fluentbit.FLB_RETRY, fmt.Errorf("PutRecords request returned with no records successfully recieved") } - logrus.Warnf("[kinesis %d] %d records failed to be delivered. Will retry.\n", outputPlugin.PluginID, aws.Int64Value(response.FailedRecordCount)) + logrus.Warnf("[kinesis %d] %d/%d records failed to be delivered. Will retry.\n", outputPlugin.PluginID, aws.Int64Value(response.FailedRecordCount), len(*records)) failedRecords := make([]*kinesis.PutRecordsRequestEntry, 0, aws.Int64Value(response.FailedRecordCount)) // try to resend failed records for i, record := range response.Records { @@ -299,12 +311,16 @@ func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutReco logrus.Debugf("[kinesis %d] Record failed to send with error: %s\n", outputPlugin.PluginID, aws.StringValue(record.ErrorMessage)) failedRecords = append(failedRecords, (*records)[i]) } + if aws.StringValue(record.ErrorCode) == kinesis.ErrCodeProvisionedThroughputExceededException { - logrus.Warnf("[kinesis %d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID) - return fluentbit.FLB_RETRY, nil + retCode = fluentbit.FLB_RETRY } } + if retCode == fluentbit.FLB_RETRY { + logrus.Warnf("[kinesis %d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID) + } + *records = (*records)[:0] *records = append(*records, failedRecords...) 
*dataLength = 0 @@ -317,7 +333,7 @@ func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutReco *records = (*records)[:0] *dataLength = 0 } - return fluentbit.FLB_OK, nil + return retCode, nil } // randomString generates a random string of length 8 @@ -361,3 +377,13 @@ func stringOrByteArray(v interface{}) string { return "" } } + +// GetConcurrentRetries value (goroutine safe) +func (outputPlugin *OutputPlugin) GetConcurrentRetries() int64 { + return atomic.LoadInt64(&outputPlugin.concurrentRetries) +} + +// AddConcurrentRetries will update the value (goroutine safe) +func (outputPlugin *OutputPlugin) AddConcurrentRetries(val int64) int64 { + return atomic.AddInt64(&outputPlugin.concurrentRetries, int64(val)) +} From a25c093628aaa69c70646b4fdfc660ffc609ca75 Mon Sep 17 00:00:00 2001 From: Zack Wine Date: Mon, 1 Jun 2020 13:55:29 -0400 Subject: [PATCH 28/40] Address code review. Rename backoff to currentRetries for clarity. Signed-off-by: Zack Wine --- fluent-bit-kinesis.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 8b59d25..5516498 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -138,21 +138,21 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int func flushWithRetries(kinesisOutput *kinesis.OutputPlugin, tag *C.char, count int, records []*kinesisAPI.PutRecordsRequestEntry, retries int) { var retCode, tries int - backoff := kinesisOutput.GetConcurrentRetries() + currentRetries := kinesisOutput.GetConcurrentRetries() for tries = 0; tries < retries; tries++ { - if backoff > 0 { - // Wait if other goroutines are in backoff mode, as well as implement a progressive backoff - time.Sleep(time.Duration((2^backoff)*100) * time.Millisecond) + if currentRetries > 0 { + // Wait if other goroutines are retrying, as well as implement a progressive backoff + time.Sleep(time.Duration((2^currentRetries)*100) * 
time.Millisecond) } - logrus.Debugf("[kinesis] Sending (%p) (%d) records, backoff=(%d)", records, len(records), backoff) + logrus.Debugf("[kinesis] Sending (%p) (%d) records, currentRetries=(%d)", records, len(records), currentRetries) retCode = pluginConcurrentFlush(kinesisOutput, tag, count, &records) if retCode != output.FLB_RETRY { break } - backoff = kinesisOutput.AddConcurrentRetries(1) - logrus.Infof("[kinesis] Going to retry with (%p) (%d) records, backoff=(%d)", records, len(records), backoff) + currentRetries = kinesisOutput.AddConcurrentRetries(1) + logrus.Infof("[kinesis] Going to retry with (%p) (%d) records, currentRetries=(%d)", records, len(records), currentRetries) } if tries > 0 { kinesisOutput.AddConcurrentRetries(int64(-tries)) From ada60a398ada3ddb99da09a65cfe01bf689dff35 Mon Sep 17 00:00:00 2001 From: Zack Wine Date: Tue, 2 Jun 2020 14:14:48 -0400 Subject: [PATCH 29/40] Address code review. Make limitsExceeded var to ensure message is valid. Signed-off-by: Zack Wine --- kinesis/kinesis.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index d11b7ae..dd12574 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -295,6 +295,7 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch(records *[]*kinesis.PutRecord func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutRecordsRequestEntry, dataLength *int, response *kinesis.PutRecordsOutput) (int, error) { var retCode int = fluentbit.FLB_OK + var limitsExceeded bool if aws.Int64Value(response.FailedRecordCount) > 0 { // start timer if all records failed (no progress has been made) @@ -314,10 +315,11 @@ func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutReco if aws.StringValue(record.ErrorCode) == kinesis.ErrCodeProvisionedThroughputExceededException { retCode = fluentbit.FLB_RETRY + limitsExceeded = true } } - if retCode == fluentbit.FLB_RETRY { + if limitsExceeded { logrus.Warnf("[kinesis 
%d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID) } From 96dd60a90dbe56f5cde8d5a30b2aa20c565b41f6 Mon Sep 17 00:00:00 2001 From: Zack Wine Date: Fri, 29 May 2020 13:09:55 -0400 Subject: [PATCH 30/40] Fix retry logic for concurrent flushes. Fixes duplicate logs when kinesis limits are exceeded. Reduces the likelihood of dropped logs when kinesis provides backpressure (improvements in retries). Adds log messages to indicate when and how many records are dropped due to retry timeouts. Signed-off-by: Zack Wine --- fluent-bit-kinesis.go | 42 +++++++++++++++++++++++++++++++++++------- kinesis/kinesis.go | 36 +++++++++++++++++++++++++++++++----- 2 files changed, 66 insertions(+), 12 deletions(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 82c4508..8b59d25 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -33,7 +33,8 @@ const ( ) const ( - retries = 2 + retries = 6 + concurrentRetryLimit = 4 ) var ( @@ -118,8 +119,16 @@ func FLBPluginInit(ctx unsafe.Pointer) int { //export FLBPluginFlushCtx func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int { kinesisOutput := getPluginInstance(ctx) + + curRetries := kinesisOutput.GetConcurrentRetries() + if curRetries > concurrentRetryLimit { + logrus.Infof("[kinesis] flush returning retry, too many concurrent retries (%d)\n", curRetries) + return output.FLB_RETRY + } + events, count, retCode := unpackRecords(kinesisOutput, data, length) if retCode != output.FLB_OK { + logrus.Errorf("[kinesis] failed to unpackRecords\n") return retCode } go flushWithRetries(kinesisOutput, tag, count, events, retries) @@ -127,12 +136,32 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int } func flushWithRetries(kinesisOutput *kinesis.OutputPlugin, tag *C.char, count int, records []*kinesisAPI.PutRecordsRequestEntry, retries int) { - for i := 0; i < retries; i++ { - // TODO: Would probably want to backoff before retrying? 
- retCode := pluginConcurrentFlush(kinesisOutput, tag, count, records) + var retCode, tries int + + backoff := kinesisOutput.GetConcurrentRetries() + + for tries = 0; tries < retries; tries++ { + if backoff > 0 { + // Wait if other goroutines are in backoff mode, as well as implement a progressive backoff + time.Sleep(time.Duration((2^backoff)*100) * time.Millisecond) + } + + logrus.Debugf("[kinesis] Sending (%p) (%d) records, backoff=(%d)", records, len(records), backoff) + retCode = pluginConcurrentFlush(kinesisOutput, tag, count, &records) if retCode != output.FLB_RETRY { break } + backoff = kinesisOutput.AddConcurrentRetries(1) + logrus.Infof("[kinesis] Going to retry with (%p) (%d) records, backoff=(%d)", records, len(records), backoff) + } + if tries > 0 { + kinesisOutput.AddConcurrentRetries(int64(-tries)) + } + if retCode == output.FLB_ERROR { + logrus.Errorf("[kinesis] Failed to flush (%d) records with error", len(records)) + } + if retCode == output.FLB_RETRY { + logrus.Errorf("[kinesis] Failed flush (%d) records after retries %d", len(records), retries) } } @@ -177,11 +206,10 @@ func unpackRecords(kinesisOutput *kinesis.OutputPlugin, data unsafe.Pointer, len return records, count, output.FLB_OK } -func pluginConcurrentFlush(kinesisOutput *kinesis.OutputPlugin, tag *C.char, count int, records []*kinesisAPI.PutRecordsRequestEntry) int { +func pluginConcurrentFlush(kinesisOutput *kinesis.OutputPlugin, tag *C.char, count int, records *[]*kinesisAPI.PutRecordsRequestEntry) int { fluentTag := C.GoString(tag) logrus.Debugf("[kinesis %d] Found logs with tag: %s\n", kinesisOutput.PluginID, fluentTag) - - retCode := kinesisOutput.Flush(&records) + retCode := kinesisOutput.Flush(records) if retCode != output.FLB_OK { return retCode } diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index b8fd64f..d11b7ae 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -21,6 +21,7 @@ import ( "fmt" "math/rand" "os" + "sync/atomic" "time" 
"github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins" @@ -82,6 +83,8 @@ type OutputPlugin struct { timer *plugins.Timeout PluginID int random *random + // Used to implement backoff for concurrent flushes + concurrentRetries int64 } // NewOutputPlugin creates an OutputPlugin object @@ -201,7 +204,7 @@ func (outputPlugin *OutputPlugin) Flush(records *[]*kinesis.PutRecordsRequestEnt requestBuf := make([]*kinesis.PutRecordsRequestEntry, 0, maximumRecordsPerPut) dataLength := 0 - for _, record := range *records { + for i, record := range *records { newRecordSize := len(record.Data) + len(aws.StringValue(record.PartitionKey)) if len(requestBuf) == maximumRecordsPerPut || (dataLength+newRecordSize) > maximumPutRecordBatchSize { @@ -210,6 +213,10 @@ func (outputPlugin *OutputPlugin) Flush(records *[]*kinesis.PutRecordsRequestEnt logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) } if retCode != fluentbit.FLB_OK { + unsent := (*records)[i:] + // requestBuf will contain records sendCurrentBatch failed to send, + // combine those with the records yet to be sent/batched + *records = append(requestBuf, unsent...) return retCode } } @@ -223,6 +230,8 @@ func (outputPlugin *OutputPlugin) Flush(records *[]*kinesis.PutRecordsRequestEnt if err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) } + // requestBuf will contain records sendCurrentBatch failed to send + *records = requestBuf return retCode } @@ -284,6 +293,9 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch(records *[]*kinesis.PutRecord // processAPIResponse processes the successful and failed records // it returns an error iff no records succeeded (i.e.) 
no progress has been made func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutRecordsRequestEntry, dataLength *int, response *kinesis.PutRecordsOutput) (int, error) { + + var retCode int = fluentbit.FLB_OK + if aws.Int64Value(response.FailedRecordCount) > 0 { // start timer if all records failed (no progress has been made) if aws.Int64Value(response.FailedRecordCount) == int64(len(*records)) { @@ -291,7 +303,7 @@ func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutReco return fluentbit.FLB_RETRY, fmt.Errorf("PutRecords request returned with no records successfully recieved") } - logrus.Warnf("[kinesis %d] %d records failed to be delivered. Will retry.\n", outputPlugin.PluginID, aws.Int64Value(response.FailedRecordCount)) + logrus.Warnf("[kinesis %d] %d/%d records failed to be delivered. Will retry.\n", outputPlugin.PluginID, aws.Int64Value(response.FailedRecordCount), len(*records)) failedRecords := make([]*kinesis.PutRecordsRequestEntry, 0, aws.Int64Value(response.FailedRecordCount)) // try to resend failed records for i, record := range response.Records { @@ -299,12 +311,16 @@ func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutReco logrus.Debugf("[kinesis %d] Record failed to send with error: %s\n", outputPlugin.PluginID, aws.StringValue(record.ErrorMessage)) failedRecords = append(failedRecords, (*records)[i]) } + if aws.StringValue(record.ErrorCode) == kinesis.ErrCodeProvisionedThroughputExceededException { - logrus.Warnf("[kinesis %d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID) - return fluentbit.FLB_RETRY, nil + retCode = fluentbit.FLB_RETRY } } + if retCode == fluentbit.FLB_RETRY { + logrus.Warnf("[kinesis %d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID) + } + *records = (*records)[:0] *records = append(*records, failedRecords...) 
*dataLength = 0 @@ -317,7 +333,7 @@ func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutReco *records = (*records)[:0] *dataLength = 0 } - return fluentbit.FLB_OK, nil + return retCode, nil } // randomString generates a random string of length 8 @@ -361,3 +377,13 @@ func stringOrByteArray(v interface{}) string { return "" } } + +// GetConcurrentRetries value (goroutine safe) +func (outputPlugin *OutputPlugin) GetConcurrentRetries() int64 { + return atomic.LoadInt64(&outputPlugin.concurrentRetries) +} + +// AddConcurrentRetries will update the value (goroutine safe) +func (outputPlugin *OutputPlugin) AddConcurrentRetries(val int64) int64 { + return atomic.AddInt64(&outputPlugin.concurrentRetries, int64(val)) +} From 87c39b447cfc60e75726ca774792977a8a496beb Mon Sep 17 00:00:00 2001 From: Zack Wine Date: Mon, 1 Jun 2020 13:55:29 -0400 Subject: [PATCH 31/40] Address code review. Rename backoff to currentRetries for clarity. Signed-off-by: Zack Wine --- fluent-bit-kinesis.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 8b59d25..5516498 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -138,21 +138,21 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int func flushWithRetries(kinesisOutput *kinesis.OutputPlugin, tag *C.char, count int, records []*kinesisAPI.PutRecordsRequestEntry, retries int) { var retCode, tries int - backoff := kinesisOutput.GetConcurrentRetries() + currentRetries := kinesisOutput.GetConcurrentRetries() for tries = 0; tries < retries; tries++ { - if backoff > 0 { - // Wait if other goroutines are in backoff mode, as well as implement a progressive backoff - time.Sleep(time.Duration((2^backoff)*100) * time.Millisecond) + if currentRetries > 0 { + // Wait if other goroutines are retrying, as well as implement a progressive backoff + time.Sleep(time.Duration((2^currentRetries)*100) * 
time.Millisecond) } - logrus.Debugf("[kinesis] Sending (%p) (%d) records, backoff=(%d)", records, len(records), backoff) + logrus.Debugf("[kinesis] Sending (%p) (%d) records, currentRetries=(%d)", records, len(records), currentRetries) retCode = pluginConcurrentFlush(kinesisOutput, tag, count, &records) if retCode != output.FLB_RETRY { break } - backoff = kinesisOutput.AddConcurrentRetries(1) - logrus.Infof("[kinesis] Going to retry with (%p) (%d) records, backoff=(%d)", records, len(records), backoff) + currentRetries = kinesisOutput.AddConcurrentRetries(1) + logrus.Infof("[kinesis] Going to retry with (%p) (%d) records, currentRetries=(%d)", records, len(records), currentRetries) } if tries > 0 { kinesisOutput.AddConcurrentRetries(int64(-tries)) From 898263f7f6166ff711f207811cd9ca3bf0fb095e Mon Sep 17 00:00:00 2001 From: Zack Wine Date: Tue, 2 Jun 2020 14:14:48 -0400 Subject: [PATCH 32/40] Address code review. Make limitsExceeded var to ensure message is valid. Signed-off-by: Zack Wine --- kinesis/kinesis.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index d11b7ae..dd12574 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -295,6 +295,7 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch(records *[]*kinesis.PutRecord func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutRecordsRequestEntry, dataLength *int, response *kinesis.PutRecordsOutput) (int, error) { var retCode int = fluentbit.FLB_OK + var limitsExceeded bool if aws.Int64Value(response.FailedRecordCount) > 0 { // start timer if all records failed (no progress has been made) @@ -314,10 +315,11 @@ func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutReco if aws.StringValue(record.ErrorCode) == kinesis.ErrCodeProvisionedThroughputExceededException { retCode = fluentbit.FLB_RETRY + limitsExceeded = true } } - if retCode == fluentbit.FLB_RETRY { + if limitsExceeded { logrus.Warnf("[kinesis 
%d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID) } From 1ea9c1757ace116f3db6b0fd9fa15ca7f3f90f6c Mon Sep 17 00:00:00 2001 From: Zack Wine Date: Wed, 10 Jun 2020 15:24:11 -0400 Subject: [PATCH 33/40] Add concurrency option, and make concurrent goroutine flushes optional. --- README.md | 3 +- fluent-bit-kinesis.go | 62 ++++------------------- kinesis/kinesis.go | 115 ++++++++++++++++++++++++++++++++++++++---- 3 files changed, 119 insertions(+), 61 deletions(-) diff --git a/README.md b/README.md index 69d94d5..effddca 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,8 @@ If you think you’ve found a potential security issue, please do not post it in * `endpoint`: Specify a custom endpoint for the Kinesis Streams API. * `append_newline`: If you set append_newline as true, a newline will be addded after each log record. * `time_key`: Add the timestamp to the record under this key. By default the timestamp from Fluent Bit will not be added to records sent to Kinesis. -* `time_key_format`: [strftime](http://man7.org/linux/man-pages/man3/strftime.3.html) compliant format string for the timestamp; for example, `%Y-%m-%dT%H:%M:%S%z`. This option is used with `time_key`. +* `time_key_format`: [strftime](http://man7.org/linux/man-pages/man3/strftime.3.html) compliant format string for the timestamp; for example, `%Y-%m-%dT%H:%M:%S%z`. This option is used with `time_key`. +* `concurrency`: Specify a limit of concurrent go routines for flushing records to kinesis. By default `concurrency` is set to `0` and records are flushed in the context of the calling fluentbit coroutine. If this value is set to `4` for example then calls to Flush records from fluentbit will spawn concurrent go routines until the limit of `4` concurrent go routines are running. Once the `concurrency` limit is reached calls to Flush will return a retry code. 
### Permissions diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 5516498..3cfb54e 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -33,8 +33,7 @@ const ( ) const ( - retries = 6 - concurrentRetryLimit = 4 + retries = 6 ) var ( @@ -77,6 +76,8 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin, logrus.Infof("[firehose %d] plugin parameter time_key = '%s'\n", pluginID, timeKey) timeKeyFmt := output.FLBPluginConfigKey(ctx, "time_key_format") logrus.Infof("[firehose %d] plugin parameter time_key_format = '%s'\n", pluginID, timeKeyFmt) + concurrency := output.FLBPluginConfigKey(ctx, "concurrency") + logrus.Infof("[firehose %d] plugin parameter concurrency = '%s'\n", pluginID, concurrency) if stream == "" || region == "" { return nil, fmt.Errorf("[kinesis %d] stream and region are required configuration parameters", pluginID) @@ -94,7 +95,7 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin, if strings.ToLower(appendNewline) == "true" { appendNL = true } - return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timeKey, timeKeyFmt, appendNL, pluginID) + return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timeKey, concurrency, timeKeyFmt, appendNL, pluginID) } // The "export" comments have syntactic meaning @@ -120,49 +121,20 @@ func FLBPluginInit(ctx unsafe.Pointer) int { func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int { kinesisOutput := getPluginInstance(ctx) - curRetries := kinesisOutput.GetConcurrentRetries() - if curRetries > concurrentRetryLimit { - logrus.Infof("[kinesis] flush returning retry, too many concurrent retries (%d)\n", curRetries) - return output.FLB_RETRY - } + fluentTag := C.GoString(tag) events, count, retCode := unpackRecords(kinesisOutput, data, length) if retCode != output.FLB_OK { - logrus.Errorf("[kinesis] failed to unpackRecords\n") + 
logrus.Errorf("[kinesis %d] failed to unpackRecords with tag: %s\n", kinesisOutput.PluginID, fluentTag) return retCode } - go flushWithRetries(kinesisOutput, tag, count, events, retries) - return output.FLB_OK -} - -func flushWithRetries(kinesisOutput *kinesis.OutputPlugin, tag *C.char, count int, records []*kinesisAPI.PutRecordsRequestEntry, retries int) { - var retCode, tries int - - currentRetries := kinesisOutput.GetConcurrentRetries() - for tries = 0; tries < retries; tries++ { - if currentRetries > 0 { - // Wait if other goroutines are retrying, as well as implement a progressive backoff - time.Sleep(time.Duration((2^currentRetries)*100) * time.Millisecond) - } - - logrus.Debugf("[kinesis] Sending (%p) (%d) records, currentRetries=(%d)", records, len(records), currentRetries) - retCode = pluginConcurrentFlush(kinesisOutput, tag, count, &records) - if retCode != output.FLB_RETRY { - break - } - currentRetries = kinesisOutput.AddConcurrentRetries(1) - logrus.Infof("[kinesis] Going to retry with (%p) (%d) records, currentRetries=(%d)", records, len(records), currentRetries) - } - if tries > 0 { - kinesisOutput.AddConcurrentRetries(int64(-tries)) - } - if retCode == output.FLB_ERROR { - logrus.Errorf("[kinesis] Failed to flush (%d) records with error", len(records)) - } - if retCode == output.FLB_RETRY { - logrus.Errorf("[kinesis] Failed flush (%d) records after retries %d", len(records), retries) + logrus.Debugf("[kinesis %d] Flushing %d logs with tag: %s\n", kinesisOutput.PluginID, count, fluentTag) + if kinesisOutput.Concurrency > 0 { + return kinesisOutput.FlushConcurrent(count, events, retries) } + + return kinesisOutput.Flush(&events) } func unpackRecords(kinesisOutput *kinesis.OutputPlugin, data unsafe.Pointer, length C.int) ([]*kinesisAPI.PutRecordsRequestEntry, int, int) { @@ -206,18 +178,6 @@ func unpackRecords(kinesisOutput *kinesis.OutputPlugin, data unsafe.Pointer, len return records, count, output.FLB_OK } -func pluginConcurrentFlush(kinesisOutput 
*kinesis.OutputPlugin, tag *C.char, count int, records *[]*kinesisAPI.PutRecordsRequestEntry) int { - fluentTag := C.GoString(tag) - logrus.Debugf("[kinesis %d] Found logs with tag: %s\n", kinesisOutput.PluginID, fluentTag) - retCode := kinesisOutput.Flush(records) - if retCode != output.FLB_OK { - return retCode - } - logrus.Debugf("[kinesis %d] Processed %d events with tag %s\n", kinesisOutput.PluginID, count, fluentTag) - - return output.FLB_OK -} - //export FLBPluginExit func FLBPluginExit() int { // Before final exit, call Flush() for all the instances of the Output Plugin diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index dd12574..f0453f3 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -21,6 +21,7 @@ import ( "fmt" "math/rand" "os" + "strconv" "sync/atomic" "time" @@ -31,6 +32,7 @@ import ( "github.com/aws/aws-sdk-go/aws/endpoints" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/fluent/fluent-bit-go/output" fluentbit "github.com/fluent/fluent-bit-go/output" jsoniter "github.com/json-iterator/go" "github.com/lestrrat-go/strftime" @@ -83,12 +85,15 @@ type OutputPlugin struct { timer *plugins.Timeout PluginID int random *random + Concurrency int32 + // Concurrency is the limit, goroutineCount represents the running goroutines + goroutineCount int32 // Used to implement backoff for concurrent flushes - concurrentRetries int64 + concurrentRetries uint32 } // NewOutputPlugin creates an OutputPlugin object -func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timeKey, timeFmt string, appendNewline bool, pluginID int) (*OutputPlugin, error) { +func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timeKey, concurrency, timeFmt string, appendNewline bool, pluginID int) (*OutputPlugin, error) { client, err := newPutRecordsClient(roleARN, region, endpoint) if err != nil { return nil, err @@ -122,6 +127,19 @@ func NewOutputPlugin(region, stream, 
dataKeys, partitionKey, roleARN, endpoint, } } + var concurrencyInt int + if concurrency != "" { + concurrencyInt, err = strconv.Atoi(concurrency) + if err != nil { + logrus.Errorf("[kinesis %d] Invalid 'concurrency' value %s specified: %v", pluginID, concurrency, err) + return nil, err + } + if concurrencyInt < 0 { + logrus.Errorf("[kinesis %d] Invalid 'concurrency' value (%s) specified, must be a non-negative number", pluginID, concurrency) + return nil, err + } + } + return &OutputPlugin{ stream: stream, client: client, @@ -133,6 +151,7 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timer: timer, PluginID: pluginID, random: random, + Concurrency: int32(concurrencyInt), }, nil } @@ -200,7 +219,7 @@ func (outputPlugin *OutputPlugin) AddRecord(records *[]*kinesis.PutRecordsReques // Flush sends the current buffer of log records // Returns FLB_OK, FLB_RETRY, FLB_ERROR func (outputPlugin *OutputPlugin) Flush(records *[]*kinesis.PutRecordsRequestEntry) int { - // Each flush must have its own output buffe r, since flushes can be concurrent + // Use a different buffer to batch the logs requestBuf := make([]*kinesis.PutRecordsRequestEntry, 0, maximumRecordsPerPut) dataLength := 0 @@ -230,11 +249,79 @@ func (outputPlugin *OutputPlugin) Flush(records *[]*kinesis.PutRecordsRequestEnt if err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) } + + if retCode == output.FLB_OK { + logrus.Debugf("[kinesis %d] Flushed %d logs\n", outputPlugin.PluginID, len(*records)) + } + // requestBuf will contain records sendCurrentBatch failed to send *records = requestBuf return retCode } +// FlushWithRetries sends the current buffer of log records, with retries +func (outputPlugin *OutputPlugin) FlushWithRetries(count int, records []*kinesis.PutRecordsRequestEntry, retries int) { + var retCode, tries int + + currentRetries := outputPlugin.getConcurrentRetries() + outputPlugin.addGoroutineCount(1) + + for tries = 0; tries < retries; 
tries++ { + if currentRetries > 0 { + // Wait if other goroutines are retrying, as well as implement a progressive backoff + if currentRetries > uint32(retries) { + time.Sleep(time.Duration((1< 0 { + outputPlugin.addConcurrentRetries(-tries) + } + + switch retCode { + case output.FLB_ERROR: + logrus.Errorf("[kinesis %d] Failed to send (%d) records with error", outputPlugin.PluginID, len(records)) + case output.FLB_RETRY: + logrus.Errorf("[kinesis %d] Failed to send (%d) records after retries %d", outputPlugin.PluginID, len(records), retries) + case output.FLB_OK: + logrus.Debugf("[kinesis %d] Flushed %d records\n", outputPlugin.PluginID, count) + } +} + +// FlushConcurrent sends the current buffer of log records in a goroutine with retries +// Returns FLB_OK, FLB_RETRY +// Will return FLB_RETRY if the limit of concurrency has been reached +func (outputPlugin *OutputPlugin) FlushConcurrent(count int, records []*kinesis.PutRecordsRequestEntry, retries int) int { + + runningGoRoutines := outputPlugin.getGoroutineCount() + if runningGoRoutines+1 > outputPlugin.Concurrency { + logrus.Infof("[kinesis %d] flush returning retry, concurrency limit reached (%d)\n", outputPlugin.PluginID, runningGoRoutines) + return output.FLB_RETRY + } + + curRetries := outputPlugin.getConcurrentRetries() + if curRetries > 0 { + logrus.Infof("[kinesis %d] flush returning retry, kinesis retries in progress (%d)\n", outputPlugin.PluginID, curRetries) + return output.FLB_RETRY + } + + go outputPlugin.FlushWithRetries(count, records, retries) + + return output.FLB_OK +} + func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface{}, partitionKey string) ([]byte, error) { if outputPlugin.dataKeys != "" { record = plugins.DataKeys(outputPlugin.dataKeys, record) @@ -380,12 +467,22 @@ func stringOrByteArray(v interface{}) string { } } -// GetConcurrentRetries value (goroutine safe) -func (outputPlugin *OutputPlugin) GetConcurrentRetries() int64 { - return 
atomic.LoadInt64(&outputPlugin.concurrentRetries) +// getConcurrentRetries value (goroutine safe) +func (outputPlugin *OutputPlugin) getConcurrentRetries() uint32 { + return atomic.LoadUint32(&outputPlugin.concurrentRetries) +} + +// addConcurrentRetries will update the value (goroutine safe) +func (outputPlugin *OutputPlugin) addConcurrentRetries(val int) uint32 { + return atomic.AddUint32(&outputPlugin.concurrentRetries, uint32(val)) +} + +// getConcurrentRetries value (goroutine safe) +func (outputPlugin *OutputPlugin) getGoroutineCount() int32 { + return atomic.LoadInt32(&outputPlugin.goroutineCount) } -// AddConcurrentRetries will update the value (goroutine safe) -func (outputPlugin *OutputPlugin) AddConcurrentRetries(val int64) int64 { - return atomic.AddInt64(&outputPlugin.concurrentRetries, int64(val)) +// addConcurrentRetries will update the value (goroutine safe) +func (outputPlugin *OutputPlugin) addGoroutineCount(val int) int32 { + return atomic.AddInt32(&outputPlugin.goroutineCount, int32(val)) } From bf432b4aef3a44bf30e4f9d6a6ee0f2c6daa22bd Mon Sep 17 00:00:00 2001 From: Zack Wine Date: Mon, 15 Jun 2020 15:13:11 -0400 Subject: [PATCH 34/40] Address code review. Update docs, and make retries configurable. --- README.md | 3 ++- fluent-bit-kinesis.go | 44 +++++++++++++++++++++++++++++++++++-------- kinesis/kinesis.go | 38 +++++++++++++------------------------ 3 files changed, 51 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index effddca..ebc80ec 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,8 @@ If you think you’ve found a potential security issue, please do not post it in * `append_newline`: If you set append_newline as true, a newline will be addded after each log record. * `time_key`: Add the timestamp to the record under this key. By default the timestamp from Fluent Bit will not be added to records sent to Kinesis. 
* `time_key_format`: [strftime](http://man7.org/linux/man-pages/man3/strftime.3.html) compliant format string for the timestamp; for example, `%Y-%m-%dT%H:%M:%S%z`. This option is used with `time_key`. -* `concurrency`: Specify a limit of concurrent go routines for flushing records to kinesis. By default `concurrency` is set to `0` and records are flushed in the context of the calling fluentbit coroutine. If this value is set to `4` for example then calls to Flush records from fluentbit will spawn concurrent go routines until the limit of `4` concurrent go routines are running. Once the `concurrency` limit is reached calls to Flush will return a retry code. +* `concurrency`: Specify a limit of concurrent go routines for flushing records to kinesis. By default concurrency is set to 0 and records are flushed in Fluent Bit's single thread. This means that requests to Kinesis will block the execution of Fluent Bit as a whole. If this value is set to `4` for example then calls to Flush records from fluentbit will spawn concurrent go routines until the limit of `4` concurrent go routines are running. Once the `concurrency` limit is reached calls to Flush will return a retry code. +* `concurrency_retries`: Specify a limit to the number of retries concurrent goroutines will attempt. By default `4` retries will be attempted.
### Permissions diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index c0f608d..4b8f644 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -26,14 +26,12 @@ import ( "github.com/fluent/fluent-bit-go/output" "github.com/sirupsen/logrus" ) +import "strconv" const ( // Kinesis API Limit https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#Kinesis.PutRecords - maximumRecordsPerPut = 500 -) - -const ( - retries = 6 + maximumRecordsPerPut = 500 + defaultConcurrentRetries = 4 ) var ( @@ -78,6 +76,8 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin, logrus.Infof("[firehose %d] plugin parameter time_key_format = '%s'\n", pluginID, timeKeyFmt) concurrency := output.FLBPluginConfigKey(ctx, "concurrency") logrus.Infof("[firehose %d] plugin parameter concurrency = '%s'\n", pluginID, concurrency) + concurrencyRetries := output.FLBPluginConfigKey(ctx, "concurrency_retries") + logrus.Infof("[firehose %d] plugin parameter concurrency_retries = '%s'\n", pluginID, concurrencyRetries) if stream == "" || region == "" { return nil, fmt.Errorf("[kinesis %d] stream and region are required configuration parameters", pluginID) @@ -95,7 +95,36 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin, if strings.ToLower(appendNewline) == "true" { appendNL = true } - return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timeKey, concurrency, timeKeyFmt, appendNL, pluginID) + + var concurrencyInt, concurrencyRetriesInt int + var err error + if concurrency != "" { + concurrencyInt, err = strconv.Atoi(concurrency) + if err != nil { + logrus.Errorf("[kinesis %d] Invalid 'concurrency' value %s specified: %v", pluginID, concurrency, err) + return nil, err + } + if concurrencyInt < 0 { + logrus.Errorf("[kinesis %d] Invalid 'concurrency' value (%s) specified, must be a non-negative number", pluginID, concurrency) + return nil, err + } + } + + if concurrencyRetries != "" { + 
concurrencyRetriesInt, err = strconv.Atoi(concurrencyRetries) + if err != nil { + logrus.Errorf("[kinesis %d] Invalid 'concurrency_retries' value %s specified: %v", pluginID, concurrencyRetries, err) + return nil, err + } + if concurrencyRetriesInt < 0 { + logrus.Errorf("[kinesis %d] Invalid 'concurrency_retries' value (%s) specified, must be a non-negative number", pluginID, concurrencyRetries) + return nil, err + } + } else { + concurrencyRetriesInt = defaultConcurrentRetries + } + + return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timeKey, timeKeyFmt, concurrencyInt, concurrencyRetriesInt, appendNL, pluginID) } // The "export" comments have syntactic meaning @@ -132,7 +161,7 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int logrus.Debugf("[kinesis %d] Flushing %d logs with tag: %s\n", kinesisOutput.PluginID, count, fluentTag) if kinesisOutput.Concurrency > 0 { - return kinesisOutput.FlushConcurrent(count, events, retries) + return kinesisOutput.FlushConcurrent(count, events) } return kinesisOutput.Flush(&events) @@ -179,7 +208,6 @@ func unpackRecords(kinesisOutput *kinesis.OutputPlugin, data unsafe.Pointer, len return records, count, output.FLB_OK } - //export FLBPluginExit func FLBPluginExit() int { // Before final exit, call Flush() for all the instances of the Output Plugin diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index f0453f3..97ee51f 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -21,7 +21,6 @@ import ( "fmt" "math/rand" "os" - "strconv" "sync/atomic" "time" @@ -85,7 +84,8 @@ type OutputPlugin struct { timer *plugins.Timeout PluginID int random *random - Concurrency int32 + Concurrency int + retryLimit int // Concurrency is the limit, goroutineCount represents the running goroutines goroutineCount int32 // Used to implement backoff for concurrent flushes @@ -93,7 +93,7 @@ type OutputPlugin struct { } // NewOutputPlugin creates an OutputPlugin object -func 
NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timeKey, concurrency, timeFmt string, appendNewline bool, pluginID int) (*OutputPlugin, error) { +func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timeKey, timeFmt string, concurrency, retryLimit int, appendNewline bool, pluginID int) (*OutputPlugin, error) { client, err := newPutRecordsClient(roleARN, region, endpoint) if err != nil { return nil, err @@ -127,19 +127,6 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, } } - var concurrencyInt int - if concurrency != "" { - concurrencyInt, err = strconv.Atoi(concurrency) - if err != nil { - logrus.Errorf("[kinesis %d] Invalid 'concurrency' value %s specified: %v", pluginID, concurrency, err) - return nil, err - } - if concurrencyInt < 0 { - logrus.Errorf("[kinesis %d] Invalid 'concurrency' value (%s) specified, must be a non-negative number", pluginID, concurrency) - return nil, err - } - } - return &OutputPlugin{ stream: stream, client: client, @@ -151,7 +138,8 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timer: timer, PluginID: pluginID, random: random, - Concurrency: int32(concurrencyInt), + Concurrency: concurrency, + retryLimit: retryLimit, }, nil } @@ -260,17 +248,17 @@ func (outputPlugin *OutputPlugin) Flush(records *[]*kinesis.PutRecordsRequestEnt } // FlushWithRetries sends the current buffer of log records, with retries -func (outputPlugin *OutputPlugin) FlushWithRetries(count int, records []*kinesis.PutRecordsRequestEntry, retries int) { +func (outputPlugin *OutputPlugin) FlushWithRetries(count int, records []*kinesis.PutRecordsRequestEntry) { var retCode, tries int currentRetries := outputPlugin.getConcurrentRetries() outputPlugin.addGoroutineCount(1) - for tries = 0; tries < retries; tries++ { + for tries = 0; tries < outputPlugin.retryLimit; tries++ { if currentRetries > 0 { // Wait if other goroutines are retrying, as well as 
implement a progressive backoff - if currentRetries > uint32(retries) { - time.Sleep(time.Duration((1< uint32(outputPlugin.retryLimit) { + time.Sleep(time.Duration((1< outputPlugin.Concurrency { + if runningGoRoutines+1 > int32(outputPlugin.Concurrency) { logrus.Infof("[kinesis %d] flush returning retry, concurrency limit reached (%d)\n", outputPlugin.PluginID, runningGoRoutines) return output.FLB_RETRY } @@ -317,7 +305,7 @@ func (outputPlugin *OutputPlugin) FlushConcurrent(count int, records []*kinesis. return output.FLB_RETRY } - go outputPlugin.FlushWithRetries(count, records, retries) + go outputPlugin.FlushWithRetries(count, records) return output.FLB_OK } From a824c334f5f97d0db0a897f794209acdf8436089 Mon Sep 17 00:00:00 2001 From: Zack Wine Date: Tue, 23 Jun 2020 18:13:19 -0400 Subject: [PATCH 35/40] Address code review items. Added upper limit of 10 to concurrency. Removed the unused records parameter from getPartitionKey. Fixed improved error handling of configuration parameters. 
Signed-off-by: Zack Wine --- fluent-bit-kinesis.go | 14 ++++++++------ kinesis/kinesis.go | 6 +++--- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index b0fd350..1d4788b 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -31,6 +31,7 @@ import "strconv" const ( // Kinesis API Limit https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#Kinesis.PutRecords maximumRecordsPerPut = 500 + maximumConcurrency = 10 defaultConcurrentRetries = 4 ) @@ -105,20 +106,21 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin, return nil, err } if concurrencyInt < 0 { - logrus.Errorf("[kinesis %d] Invalid 'concurrency' value (%s) specified, must be a non-negative number", pluginID, concurrency) - return nil, err + return nil, fmt.Errorf("[kinesis %d] Invalid 'concurrency' value (%s) specified, must be a non-negative number", pluginID, concurrency) + } + + if concurrencyInt > maximumConcurrency { + return nil, fmt.Errorf("[kinesis %d] Invalid 'concurrency' value (%s) specified, must be less than or equal to %d", pluginID, concurrency, maximumConcurrency) } } if concurrencyRetries != "" { concurrencyRetriesInt, err = strconv.Atoi(concurrencyRetries) if err != nil { - logrus.Errorf("[kinesis %d] Invalid 'concurrency_retries' value %s specified: %v", pluginID, concurrencyRetries, err) - return nil, err + return nil, fmt.Errorf("[kinesis %d] Invalid 'concurrency_retries' value (%s) specified: %v", pluginID, concurrencyRetries, err) } if concurrencyRetriesInt < 0 { - logrus.Errorf("[kinesis %d] Invalid 'concurrency_retries' value (%s) specified, must be a non-negative number", pluginID, concurrencyRetries) - return nil, err + return nil, fmt.Errorf("[kinesis %d] Invalid 'concurrency_retries' value (%s) specified, must be a non-negative number", pluginID, concurrencyRetries) } } else { concurrencyRetriesInt = defaultConcurrentRetries diff --git a/kinesis/kinesis.go 
b/kinesis/kinesis.go index 29e34c1..242a873 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -189,7 +189,7 @@ func (outputPlugin *OutputPlugin) AddRecord(records *[]*kinesis.PutRecordsReques record[outputPlugin.timeKey] = buf.String() } - partitionKey := outputPlugin.getPartitionKey(*records, record) + partitionKey := outputPlugin.getPartitionKey(record) data, err := outputPlugin.processRecord(record, partitionKey) if err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) @@ -424,8 +424,8 @@ func (outputPlugin *OutputPlugin) randomString() string { } // getPartitionKey returns the value for a given valid key -// if the given key is emapty or invalid, it returns a random string -func (outputPlugin *OutputPlugin) getPartitionKey(records []*kinesis.PutRecordsRequestEntry, record map[interface{}]interface{}) string { +// if the given key is empty or invalid, it returns a random string +func (outputPlugin *OutputPlugin) getPartitionKey(record map[interface{}]interface{}) string { partitionKey := outputPlugin.partitionKey if partitionKey != "" { for k, v := range record { From e714a07303fc149b532ce8661e64e25eb41bc3ec Mon Sep 17 00:00:00 2001 From: Zack Wine Date: Wed, 24 Jun 2020 14:20:48 -0400 Subject: [PATCH 36/40] Address code review items (2). Rename concurrency retires variable to concurrencyRetryLimit to be more consistent. Update docs to mention concurrency option maximum value (10). Signed-off-by: Zack Wine --- README.md | 2 +- kinesis/kinesis.go | 50 +++++++++++++++++++++++----------------------- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index ebc80ec..4e9f34d 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ If you think you’ve found a potential security issue, please do not post it in * `append_newline`: If you set append_newline as true, a newline will be addded after each log record. * `time_key`: Add the timestamp to the record under this key. 
By default the timestamp from Fluent Bit will not be added to records sent to Kinesis. * `time_key_format`: [strftime](http://man7.org/linux/man-pages/man3/strftime.3.html) compliant format string for the timestamp; for example, `%Y-%m-%dT%H:%M:%S%z`. This option is used with `time_key`. -* `concurrency`: Specify a limit of concurrent go routines for flushing records to kinesis. By default concurrency is set to 0 and records are flushed in Fluent Bit's single thread. This means that requests to Kinesis will block the execution of Fluent Bit as whole. If this value is set to `4` for example then calls to Flush records from fluentbit will spawn concurrent go routines until the limit of `4` concurrent go routines are running. Once the `concurrency` limit is reached calls to Flush will return a retry code. +* `concurrency`: Specify a limit of concurrent go routines for flushing records to kinesis. By default concurrency is set to 0 and records are flushed in Fluent Bit's single thread. This means that requests to Kinesis will block the execution of Fluent Bit as whole. If this value is set to `4` for example then calls to Flush records from fluentbit will spawn concurrent go routines until the limit of `4` concurrent go routines are running. Once the `concurrency` limit is reached calls to Flush will return a retry code. The upper limit of the `concurrency` option is `10`. * `concurrency_retries`: Specify a limit to the number of retries concurrent goroutines will attempt. By default `4` retries will be attempted. 
### Permissions diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index 242a873..7be85a4 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -77,15 +77,15 @@ type OutputPlugin struct { // Partition key decides in which shard of your stream the data belongs to partitionKey string // Decides whether to append a newline after each data record - appendNewline bool - timeKey string - fmtStrftime *strftime.Strftime - client PutRecordsClient - timer *plugins.Timeout - PluginID int - random *random - Concurrency int - retryLimit int + appendNewline bool + timeKey string + fmtStrftime *strftime.Strftime + client PutRecordsClient + timer *plugins.Timeout + PluginID int + random *random + Concurrency int + concurrencyRetryLimit int // Concurrency is the limit, goroutineCount represents the running goroutines goroutineCount int32 // Used to implement backoff for concurrent flushes @@ -128,18 +128,18 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, } return &OutputPlugin{ - stream: stream, - client: client, - dataKeys: dataKeys, - partitionKey: partitionKey, - appendNewline: appendNewline, - timeKey: timeKey, - fmtStrftime: timeFormatter, - timer: timer, - PluginID: pluginID, - random: random, - Concurrency: concurrency, - retryLimit: retryLimit, + stream: stream, + client: client, + dataKeys: dataKeys, + partitionKey: partitionKey, + appendNewline: appendNewline, + timeKey: timeKey, + fmtStrftime: timeFormatter, + timer: timer, + PluginID: pluginID, + random: random, + Concurrency: concurrency, + concurrencyRetryLimit: retryLimit, }, nil } @@ -254,11 +254,11 @@ func (outputPlugin *OutputPlugin) FlushWithRetries(count int, records []*kinesis currentRetries := outputPlugin.getConcurrentRetries() outputPlugin.addGoroutineCount(1) - for tries = 0; tries < outputPlugin.retryLimit; tries++ { + for tries = 0; tries < outputPlugin.concurrencyRetryLimit; tries++ { if currentRetries > 0 { // Wait if other goroutines are retrying, as well as 
implement a progressive backoff - if currentRetries > uint32(outputPlugin.retryLimit) { - time.Sleep(time.Duration((1< uint32(outputPlugin.concurrencyRetryLimit) { + time.Sleep(time.Duration((1< Date: Thu, 9 Jul 2020 14:45:01 -0400 Subject: [PATCH 37/40] Address code review. Add data loss warning for concurrency. --- README.md | 2 +- fluent-bit-kinesis.go | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 4e9f34d..3f519ab 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ If you think you’ve found a potential security issue, please do not post it in * `append_newline`: If you set append_newline as true, a newline will be addded after each log record. * `time_key`: Add the timestamp to the record under this key. By default the timestamp from Fluent Bit will not be added to records sent to Kinesis. * `time_key_format`: [strftime](http://man7.org/linux/man-pages/man3/strftime.3.html) compliant format string for the timestamp; for example, `%Y-%m-%dT%H:%M:%S%z`. This option is used with `time_key`. -* `concurrency`: Specify a limit of concurrent go routines for flushing records to kinesis. By default concurrency is set to 0 and records are flushed in Fluent Bit's single thread. This means that requests to Kinesis will block the execution of Fluent Bit as whole. If this value is set to `4` for example then calls to Flush records from fluentbit will spawn concurrent go routines until the limit of `4` concurrent go routines are running. Once the `concurrency` limit is reached calls to Flush will return a retry code. The upper limit of the `concurrency` option is `10`. +* `concurrency`: Specify a limit of concurrent go routines for flushing records to kinesis. By default concurrency is set to 0 and records are flushed in Fluent Bit's single thread. This means that requests to Kinesis will block the execution of Fluent Bit. 
If this value is set to `4` for example then calls to Flush records from fluentbit will spawn concurrent go routines until the limit of `4` concurrent go routines are running. Once the `concurrency` limit is reached calls to Flush will return a retry code. The upper limit of the `concurrency` option is `10`. WARNING: Enabling `concurrency` can lead to data loss if the retry count is reached. * `concurrency_retries`: Specify a limit to the number of retries concurrent goroutines will attempt. By default `4` retries will be attempted. ### Permissions diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 1d4788b..b6b71ec 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -79,6 +79,8 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin, logrus.Infof("[firehose %d] plugin parameter concurrency = '%s'\n", pluginID, concurrency) concurrencyRetries := output.FLBPluginConfigKey(ctx, "concurrency_retries") logrus.Infof("[firehose %d] plugin parameter concurrency_retries = '%s'\n", pluginID, concurrencyRetries) + aggregated := output.FLBPluginConfigKey(ctx, "aggregated") + logrus.Infof("[firehose %d] plugin parameter aggregated = '%s'\n", pluginID, aggregated) if stream == "" || region == "" { return nil, fmt.Errorf("[kinesis %d] stream and region are required configuration parameters", pluginID) @@ -112,6 +114,10 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin, if concurrencyInt > maximumConcurrency { return nil, fmt.Errorf("[kinesis %d] Invalid 'concurrency' value (%s) specified, must be less than or equal to %d", pluginID, concurrency, maximumConcurrency) } + + if concurrencyInt > 0 { + logrus.Warnf("[kinesis %d] WARNING: Enabling concurrency can lead to data loss. 
If 'concurrency_retries' is reached data will be lost.", pluginID) + } } if concurrencyRetries != "" { From 53cd75776fcee8f2e1e0da949289be54c9850124 Mon Sep 17 00:00:00 2001 From: Zack Wine Date: Fri, 10 Jul 2020 15:18:38 -0400 Subject: [PATCH 38/40] Address code review. Rename the concurrency feature experimental. --- README.md | 4 ++-- fluent-bit-kinesis.go | 22 ++++++++++------------ 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 3f519ab..78fa1b7 100644 --- a/README.md +++ b/README.md @@ -17,8 +17,8 @@ If you think you’ve found a potential security issue, please do not post it in * `append_newline`: If you set append_newline as true, a newline will be addded after each log record. * `time_key`: Add the timestamp to the record under this key. By default the timestamp from Fluent Bit will not be added to records sent to Kinesis. * `time_key_format`: [strftime](http://man7.org/linux/man-pages/man3/strftime.3.html) compliant format string for the timestamp; for example, `%Y-%m-%dT%H:%M:%S%z`. This option is used with `time_key`. -* `concurrency`: Specify a limit of concurrent go routines for flushing records to kinesis. By default concurrency is set to 0 and records are flushed in Fluent Bit's single thread. This means that requests to Kinesis will block the execution of Fluent Bit. If this value is set to `4` for example then calls to Flush records from fluentbit will spawn concurrent go routines until the limit of `4` concurrent go routines are running. Once the `concurrency` limit is reached calls to Flush will return a retry code. The upper limit of the `concurrency` option is `10`. WARNING: Enabling `concurrency` can lead to data loss if the retry count is reached. -* `concurrency_retries`: Specify a limit to the number of retries concurrent goroutines will attempt. By default `4` retries will be attempted. +* `experimental_concurrency`: Specify a limit of concurrent go routines for flushing records to kinesis. 
By default `experimental_concurrency` is set to 0 and records are flushed in Fluent Bit's single thread. This means that requests to Kinesis will block the execution of Fluent Bit. If this value is set to `4` for example then calls to Flush records from fluentbit will spawn concurrent go routines until the limit of `4` concurrent go routines are running. Once the `experimental_concurrency` limit is reached calls to Flush will return a retry code. The upper limit of the `experimental_concurrency` option is `10`. WARNING: Enabling `experimental_concurrency` can lead to data loss if the retry count is reached. Enabling concurrency will increase resource usage (memory and CPU). +* `experimental_concurrency_retries`: Specify a limit to the number of retries concurrent goroutines will attempt. By default `4` retries will be attempted before records are dropped. ### Permissions diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index b6b71ec..cdc74e7 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -75,12 +75,10 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin, logrus.Infof("[firehose %d] plugin parameter time_key = '%s'\n", pluginID, timeKey) timeKeyFmt := output.FLBPluginConfigKey(ctx, "time_key_format") logrus.Infof("[firehose %d] plugin parameter time_key_format = '%s'\n", pluginID, timeKeyFmt) - concurrency := output.FLBPluginConfigKey(ctx, "concurrency") - logrus.Infof("[firehose %d] plugin parameter concurrency = '%s'\n", pluginID, concurrency) - concurrencyRetries := output.FLBPluginConfigKey(ctx, "concurrency_retries") - logrus.Infof("[firehose %d] plugin parameter concurrency_retries = '%s'\n", pluginID, concurrencyRetries) - aggregated := output.FLBPluginConfigKey(ctx, "aggregated") - logrus.Infof("[firehose %d] plugin parameter aggregated = '%s'\n", pluginID, aggregated) + concurrency := output.FLBPluginConfigKey(ctx, "experimental_concurrency") + logrus.Infof("[firehose %d] plugin parameter 
experimental_concurrency = '%s'\n", pluginID, concurrency) + concurrencyRetries := output.FLBPluginConfigKey(ctx, "experimental_concurrency_retries") + logrus.Infof("[firehose %d] plugin parameter experimental_concurrency_retries = '%s'\n", pluginID, concurrencyRetries) if stream == "" || region == "" { return nil, fmt.Errorf("[kinesis %d] stream and region are required configuration parameters", pluginID) @@ -104,29 +102,29 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin, if concurrency != "" { concurrencyInt, err = strconv.Atoi(concurrency) if err != nil { - logrus.Errorf("[kinesis %d] Invalid 'concurrency' value %s specified: %v", pluginID, concurrency, err) + logrus.Errorf("[kinesis %d] Invalid 'experimental_concurrency' value %s specified: %v", pluginID, concurrency, err) return nil, err } if concurrencyInt < 0 { - return nil, fmt.Errorf("[kinesis %d] Invalid 'concurrency' value (%s) specified, must be a non-negative number", pluginID, concurrency) + return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency' value (%s) specified, must be a non-negative number", pluginID, concurrency) } if concurrencyInt > maximumConcurrency { - return nil, fmt.Errorf("[kinesis %d] Invalid 'concurrency' value (%s) specified, must be less than or equal to %d", pluginID, concurrency, maximumConcurrency) + return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency' value (%s) specified, must be less than or equal to %d", pluginID, concurrency, maximumConcurrency) } if concurrencyInt > 0 { - logrus.Warnf("[kinesis %d] WARNING: Enabling concurrency can lead to data loss. If 'concurrency_retries' is reached data will be lost.", pluginID) + logrus.Warnf("[kinesis %d] WARNING: Enabling concurrency can lead to data loss. 
If 'experimental_concurrency_retries' is reached data will be lost.", pluginID) } } if concurrencyRetries != "" { concurrencyRetriesInt, err = strconv.Atoi(concurrencyRetries) if err != nil { - return nil, fmt.Errorf("[kinesis %d] Invalid 'concurrency_retries' value (%s) specified: %v", pluginID, concurrencyRetries, err) + return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency_retries' value (%s) specified: %v", pluginID, concurrencyRetries, err) } if concurrencyRetriesInt < 0 { - return nil, fmt.Errorf("[kinesis %d] Invalid 'concurrency_retries' value (%s) specified, must be a non-negative number", pluginID, concurrencyRetries) + return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency_retries' value (%s) specified, must be a non-negative number", pluginID, concurrencyRetries) } } else { concurrencyRetriesInt = defaultConcurrentRetries From 8d294d584492349ac54dfca5c2be06f4f0a60d1a Mon Sep 17 00:00:00 2001 From: Zack Wine Date: Fri, 10 Jul 2020 15:55:23 -0400 Subject: [PATCH 39/40] Clean up imports. --- fluent-bit-kinesis.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index cdc74e7..aa8f1f5 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -16,6 +16,7 @@ package main import ( "C" "fmt" + "strconv" "strings" "time" "unsafe" @@ -26,7 +27,6 @@ import ( "github.com/fluent/fluent-bit-go/output" "github.com/sirupsen/logrus" ) -import "strconv" const ( // Kinesis API Limit https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#Kinesis.PutRecords From 7d52cb244a1f4410b2e0527603f5bfbec0aedd95 Mon Sep 17 00:00:00 2001 From: Zack Wine Date: Fri, 10 Jul 2020 16:42:27 -0400 Subject: [PATCH 40/40] Fix unit tests. 
--- kinesis/kinesis_test.go | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/kinesis/kinesis_test.go b/kinesis/kinesis_test.go index 1cff431..20149a3 100644 --- a/kinesis/kinesis_test.go +++ b/kinesis/kinesis_test.go @@ -18,7 +18,6 @@ import ( // newMockOutputPlugin creates an mock OutputPlugin object func newMockOutputPlugin(client *mock_kinesis.MockPutRecordsClient) (*OutputPlugin, error) { - records := make([]*kinesis.PutRecordsRequestEntry, 0, 500) timer, _ := plugins.NewTimeout(func(d time.Duration) { logrus.Errorf("[kinesis] timeout threshold reached: Failed to send logs for %v", d) @@ -34,15 +33,13 @@ func newMockOutputPlugin(client *mock_kinesis.MockPutRecordsClient) (*OutputPlug } return &OutputPlugin{ - stream: "stream", - client: client, - records: records, - dataKeys: "", - partitionKey: "", - lastInvalidPartitionKeyIndex: -1, - timer: timer, - PluginID: 0, - random: random, + stream: "stream", + client: client, + dataKeys: "", + partitionKey: "", + timer: timer, + PluginID: 0, + random: random, }, nil } @@ -67,6 +64,8 @@ func TestStringOrByteArray(t *testing.T) { } func TestAddRecord(t *testing.T) { + records := make([]*kinesis.PutRecordsRequestEntry, 0, 500) + record := map[interface{}]interface{}{ "testkey": []byte("test value"), } @@ -74,12 +73,14 @@ func TestAddRecord(t *testing.T) { outputPlugin, _ := newMockOutputPlugin(nil) timeStamp := time.Now() - retCode := outputPlugin.AddRecord(record, &timeStamp) + retCode := outputPlugin.AddRecord(&records, record, &timeStamp) assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK") - assert.Len(t, outputPlugin.records, 1, "Expected output to contain 1 record") + assert.Len(t, records, 1, "Expected output to contain 1 record") } func TestAddRecordAndFlush(t *testing.T) { + records := make([]*kinesis.PutRecordsRequestEntry, 0, 500) + record := map[interface{}]interface{}{ "testkey": []byte("test value"), } @@ -94,9 +95,9 @@ func 
TestAddRecordAndFlush(t *testing.T) { outputPlugin, _ := newMockOutputPlugin(mockKinesis) timeStamp := time.Now() - retCode := outputPlugin.AddRecord(record, &timeStamp) + retCode := outputPlugin.AddRecord(&records, record, &timeStamp) assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK") - retCode = outputPlugin.Flush() + retCode = outputPlugin.Flush(&records) assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK") }