Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][testbed] add filelog as a part of test cases #36603

Merged
merged 8 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 48 additions & 23 deletions testbed/tests/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,41 +371,66 @@ func TestLargeFileOnce(t *testing.T) {
}

func TestMemoryLimiterHit(t *testing.T) {
otlpreceiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t))
otlpreceiver.WithRetry(`
tests := []struct {
name string
sender func() testbed.DataSender
receiver func() testbed.DataReceiver
}{
{
name: "otlp",
sender: func() testbed.DataSender {
return testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t))
},
},
{
name: "filelog",
sender: func() testbed.DataSender {
return datasenders.NewFileLogWriter().WithRetry(`
retry_on_failure:
enabled: true
`)
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
otlpreceiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t))
otlpreceiver.WithRetry(`
retry_on_failure:
enabled: true
max_interval: 5s
`)
otlpreceiver.WithQueue(`
otlpreceiver.WithQueue(`
sending_queue:
enabled: true
queue_size: 100000
num_consumers: 20
enabled: true
queue_size: 100000
num_consumers: 20
`)
otlpreceiver.WithTimeout(`
otlpreceiver.WithTimeout(`
timeout: 0s
`)
processors := []ProcessorNameAndConfigBody{
{
Name: "memory_limiter",
Body: `
processors := []ProcessorNameAndConfigBody{
{
Name: "memory_limiter",
Body: `
memory_limiter:
check_interval: 1s
limit_mib: 300
spike_limit_mib: 150
`,
},
},
}
ScenarioMemoryLimiterHit(
t,
test.sender(),
otlpreceiver,
testbed.LoadOptions{
DataItemsPerSecond: 100000,
ItemsPerBatch: 1000,
Parallel: 1,
MaxDelay: 20 * time.Second,
},
performanceResultsSummary, 100, processors)
})
}
ScenarioMemoryLimiterHit(
t,
testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t)),
otlpreceiver,
testbed.LoadOptions{
DataItemsPerSecond: 100000,
ItemsPerBatch: 1000,
Parallel: 1,
MaxDelay: 20 * time.Second,
},
performanceResultsSummary, 100, processors)
}
34 changes: 30 additions & 4 deletions testbed/tests/scenarios.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"math/rand"
"path"
"path/filepath"
"regexp"
"testing"
"time"

Expand All @@ -22,7 +23,11 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
)

var performanceResultsSummary testbed.TestResultsSummary = &testbed.PerformanceResults{}
var (
batchRegex = regexp.MustCompile(` batch_index=(\S+) `)
itemRegex = regexp.MustCompile(` item_index=(\S+) `)
performanceResultsSummary testbed.TestResultsSummary = &testbed.PerformanceResults{}
)

type ProcessorNameAndConfigBody struct {
Name string
Expand Down Expand Up @@ -658,9 +663,8 @@ func getLogsID(logToRetry []plog.Logs) []string {
logRecord := logElement.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords()
for index := 0; index < logRecord.Len(); index++ {
logObj := logRecord.At(index)
itemIndex, _ := logObj.Attributes().Get("item_index")
ChrsMark marked this conversation as resolved.
Show resolved Hide resolved
batchIndex, _ := logObj.Attributes().Get("batch_index")
result = append(result, fmt.Sprintf("%s%s", batchIndex.AsString(), itemIndex.AsString()))
itemIndex, batchIndex := extractIDFromLog(logObj)
result = append(result, fmt.Sprintf("%s%s", batchIndex, itemIndex))
}
}
return result
Expand All @@ -684,3 +688,25 @@ func allElementsExistInSlice(slice1, slice2 []string) bool {

return true
}

// in case of filelog receiver, the batch_index and item_index are a part of log body.
// we use regex to extract them
func extractIDFromLog(log plog.LogRecord) (string, string) {
var batch, item string
match := batchRegex.FindStringSubmatch(log.Body().AsString())
if len(match) == 2 {
batch = match[0]
}
match = itemRegex.FindStringSubmatch(log.Body().AsString())
if len(match) == 2 {
batch = match[0]
}
// in case of otlp receiver, batch_index and item_index are part of attributes.
if batchIndex, ok := log.Attributes().Get("batch_index"); ok {
batch = batchIndex.AsString()
}
if itemIndex, ok := log.Attributes().Get("item_index"); ok {
item = itemIndex.AsString()
}
return batch, item
}
Loading