Skip to content

Commit

Permalink
chore: add filelog cases for memory limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
VihasMakwana committed Nov 29, 2024
1 parent 539042d commit 667606c
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 26 deletions.
2 changes: 2 additions & 0 deletions testbed/datasenders/stanza.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ func (f *FileLogWriter) GenConfigYAMLStr() string {
// We are testing stanza receiver here.
return fmt.Sprintf(`
filelog:
retry_on_failure:
enabled: true
include: [ %s ]
start_at: beginning
operators:
Expand Down
68 changes: 45 additions & 23 deletions testbed/tests/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,41 +370,63 @@ func TestLargeFileOnce(t *testing.T) {
}

func TestMemoryLimiterHit(t *testing.T) {
otlpreceiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t))
otlpreceiver.WithRetry(`
tests := []struct {
name string
sender func() testbed.DataSender
receiver func() testbed.DataReceiver
}{
{
name: "otlp",
sender: func() testbed.DataSender {
return testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t))
},
},
{
name: "filelog",
sender: func() testbed.DataSender {
return datasenders.NewFileLogWriter()
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
otlpreceiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t))
otlpreceiver.WithRetry(`
retry_on_failure:
enabled: true
max_interval: 5s
`)
otlpreceiver.WithQueue(`
otlpreceiver.WithQueue(`
sending_queue:
enabled: true
queue_size: 100000
num_consumers: 20
enabled: true
queue_size: 100000
num_consumers: 20
`)
otlpreceiver.WithTimeout(`
otlpreceiver.WithTimeout(`
timeout: 0s
`)
processors := []ProcessorNameAndConfigBody{
{
Name: "memory_limiter",
Body: `
processors := []ProcessorNameAndConfigBody{
{
Name: "memory_limiter",
Body: `
memory_limiter:
check_interval: 1s
limit_mib: 300
spike_limit_mib: 150
`,
},
},
}
ScenarioMemoryLimiterHit(
t,
test.sender(),
otlpreceiver,
testbed.LoadOptions{
DataItemsPerSecond: 100000,
ItemsPerBatch: 1000,
Parallel: 1,
MaxDelay: 20 * time.Second,
},
performanceResultsSummary, 100, processors)
})
}
ScenarioMemoryLimiterHit(
t,
testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t)),
otlpreceiver,
testbed.LoadOptions{
DataItemsPerSecond: 100000,
ItemsPerBatch: 1000,
Parallel: 1,
MaxDelay: 20 * time.Second,
},
performanceResultsSummary, 100, processors)
}
30 changes: 27 additions & 3 deletions testbed/tests/scenarios.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"math/rand"
"path"
"path/filepath"
"regexp"
"testing"
"time"

Expand All @@ -23,6 +24,8 @@ import (
)

var performanceResultsSummary testbed.TestResultsSummary = &testbed.PerformanceResults{}
var batchRegex = regexp.MustCompile(` batch_index=(\S+) `)
var itemRegex = regexp.MustCompile(` item_index=(\S+) `)

type ProcessorNameAndConfigBody struct {
Name string
Expand Down Expand Up @@ -654,9 +657,8 @@ func getLogsID(logToRetry []plog.Logs) []string {
logRecord := logElement.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords()
for index := 0; index < logRecord.Len(); index++ {
logObj := logRecord.At(index)
itemIndex, _ := logObj.Attributes().Get("item_index")
batchIndex, _ := logObj.Attributes().Get("batch_index")
result = append(result, fmt.Sprintf("%s%s", batchIndex.AsString(), itemIndex.AsString()))
itemIndex, batchIndex := extractIdFromLog(logObj)
result = append(result, fmt.Sprintf("%s%s", batchIndex, itemIndex))
}
}
return result
Expand All @@ -680,3 +682,25 @@ func allElementsExistInSlice(slice1, slice2 []string) bool {

return true
}

// in case of filelog receiver, the batch_index and item_index are a part of log body.
// we use regex to extract them
func extractIdFromLog(log plog.LogRecord) (string, string) {

Check failure on line 688 in testbed/tests/scenarios.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, other)

var-naming: func extractIdFromLog should be extractIDFromLog (revive)

Check failure on line 688 in testbed/tests/scenarios.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, other)

var-naming: func extractIdFromLog should be extractIDFromLog (revive)
var batch, item string
match := batchRegex.FindStringSubmatch(log.Body().AsString())
if len(match) == 2 {
batch = match[0]
}
match = itemRegex.FindStringSubmatch(log.Body().AsString())
if len(match) == 2 {
batch = match[0]
}
// in case of otlp recevier, batch_index and item_index are part of attributes.
if batchIndex, ok := log.Attributes().Get("batch_index"); ok {
batch = batchIndex.AsString()
}
if itemIndex, ok := log.Attributes().Get("item_index"); ok {
item = itemIndex.AsString()
}
return batch, item
}

0 comments on commit 667606c

Please sign in to comment.