From ace990a8b2d29dcdbf799cedfd2d50dce7315fa1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Fri, 13 Sep 2024 17:53:13 +0200 Subject: [PATCH] [testbed] Add batcher performance tests --- .chloggen/test_testbed-batcher.yaml | 27 +++ cmd/oteltestbedcol/builder-config.yaml | 3 +- testbed/stabilitytests/metric_test.go | 4 + testbed/stabilitytests/trace_test.go | 6 + testbed/testbed/receivers.go | 9 +- testbed/tests/batcher_test.go | 259 +++++++++++++++++++++++++ testbed/tests/log_test.go | 1 + testbed/tests/metric_test.go | 1 + testbed/tests/scenarios.go | 17 +- testbed/tests/trace_test.go | 1 + 10 files changed, 319 insertions(+), 9 deletions(-) create mode 100644 .chloggen/test_testbed-batcher.yaml create mode 100644 testbed/tests/batcher_test.go diff --git a/.chloggen/test_testbed-batcher.yaml b/.chloggen/test_testbed-batcher.yaml new file mode 100644 index 000000000000..7d3c989362cd --- /dev/null +++ b/.chloggen/test_testbed-batcher.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: testbed + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add batcher performance tests + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36206] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/cmd/oteltestbedcol/builder-config.yaml b/cmd/oteltestbedcol/builder-config.yaml index 14f880b25b51..ea92051d377c 100644 --- a/cmd/oteltestbedcol/builder-config.yaml +++ b/cmd/oteltestbedcol/builder-config.yaml @@ -33,7 +33,8 @@ processors: - gomod: go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.113.0 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.113.0 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor v0.113.0 - + - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor v0.113.0 + - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.113.0 receivers: - gomod: go.opentelemetry.io/collector/receiver/otlpreceiver v0.113.0 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver v0.113.0 diff --git a/testbed/stabilitytests/metric_test.go b/testbed/stabilitytests/metric_test.go index 6a68229d4c6d..05d310bfdc2c 100644 --- a/testbed/stabilitytests/metric_test.go +++ b/testbed/stabilitytests/metric_test.go @@ -26,6 +26,7 @@ func TestStabilityMetricsOTLP(t *testing.T) { contribPerfResultsSummary, nil, nil, + nil, ) } @@ -42,6 +43,7 @@ func TestStabilityMetricsOpenCensus(t *testing.T) { contribPerfResultsSummary, nil, nil, + nil, ) } @@ -58,6 +60,7 @@ func TestStabilityMetricsCarbon(t *testing.T) { contribPerfResultsSummary, nil, nil, + nil, ) } @@ -74,5 +77,6 @@ func TestStabilityMetricsSignalFx(t *testing.T) { contribPerfResultsSummary, nil, nil, + nil, ) } diff --git a/testbed/stabilitytests/trace_test.go b/testbed/stabilitytests/trace_test.go index 2006b664b855..85eeca8d366e 100644 --- a/testbed/stabilitytests/trace_test.go +++ b/testbed/stabilitytests/trace_test.go @@ -52,6 +52,7 @@ func TestStabilityTracesOpenCensus(t *testing.T) { contribPerfResultsSummary, processorsConfig, nil, + nil, ) } @@ -68,6 +69,7 @@ func TestStabilityTracesSAPM(t *testing.T) { contribPerfResultsSummary, processorsConfig, nil, + nil, ) } @@ -84,6 +86,7 @@ func TestStabilityTracesOTLP(t *testing.T) { contribPerfResultsSummary, processorsConfig, nil, + nil, ) } @@ -100,6 +103,7 @@ func TestStabilityTracesJaegerGRPC(t *testing.T) { contribPerfResultsSummary, processorsConfig, nil, + nil, ) } @@ -116,6 +120,7 @@ func TestStabilityTracesZipkin(t *testing.T) { contribPerfResultsSummary, processorsConfig, nil, + nil, ) } @@ -132,5 +137,6 @@ func TestStabilityTracesDatadog(t *testing.T) { contribPerfResultsSummary, processorsConfig, nil, + nil, ) } diff --git a/testbed/testbed/receivers.go b/testbed/testbed/receivers.go index 3c7b161e2547..4b6c7e994ef1 100644 --- a/testbed/testbed/receivers.go +++ b/testbed/testbed/receivers.go @@ -55,6 +55,7 @@ type BaseOTLPDataReceiver struct { compression string retry string sendingQueue string + batcher string } func (bor *BaseOTLPDataReceiver) Start(tc consumer.Traces, mc consumer.Metrics, lc consumer.Logs) error { @@ -98,6 +99,11 @@ func (bor *BaseOTLPDataReceiver) WithQueue(sendingQueue string) *BaseOTLPDataRec return bor } +func (bor *BaseOTLPDataReceiver) WithBatcher(batcher string) *BaseOTLPDataReceiver { + bor.batcher = batcher + return bor +} + func (bor *BaseOTLPDataReceiver) Stop() error { // we reuse the receiver across signals. Shutting down the log receiver shuts down the metrics and traces receiver. return bor.logReceiver.Shutdown(context.Background()) @@ -118,8 +124,9 @@ func (bor *BaseOTLPDataReceiver) GenConfigYAMLStr() string { endpoint: "%s" %s %s + %s tls: - insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue) + insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue, bor.batcher) comp := "none" if bor.compression != "" { comp = bor.compression diff --git a/testbed/tests/batcher_test.go b/testbed/tests/batcher_test.go new file mode 100644 index 000000000000..4952914230fa --- /dev/null +++ b/testbed/tests/batcher_test.go @@ -0,0 +1,259 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package tests contains test cases. To run the tests go to tests directory and run: +// RUN_TESTBED=1 go test -v + +//go:build batcher +// +build batcher + +package tests + +// The tests in this file measure the effect of batching on collector performance. +// Their primary intent is to measure the performance impact of https://github.com/open-telemetry/opentelemetry-collector/issues/8122. + +import ( + "fmt" + "slices" + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed" +) + +type batcherTestSpec struct { + name string + withQueue bool + withBatchProcessor bool + withExporterBatcher bool + batchSize int + processors []ProcessorNameAndConfigBody + resourceSpec testbed.ResourceSpec + extensions map[string]string +} + +func TestLog10kDPSNoProcessors(t *testing.T) { + tests := []batcherTestSpec{ + { + name: "No batching, no queue", + resourceSpec: testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 120, + }, + }, + { + name: "No batching, queue", + withQueue: true, + resourceSpec: testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 120, + }, + }, + { + name: "Batch size 1000 with batch processor, no queue", + batchSize: 1000, + withBatchProcessor: true, + resourceSpec: testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 120, + }, + }, + { + name: "Batch size 1000 with batch processor, queue", + batchSize: 1000, + withBatchProcessor: true, + withQueue: true, + resourceSpec: testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 120, + }, + }, + { + name: "Batch size 1000 with exporter batcher, no queue", + withExporterBatcher: true, + batchSize: 1000, + resourceSpec: testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 120, + }, + }, + { + name: "Batch size 1000 with exporter batcher, queue", + withExporterBatcher: true, + withQueue: true, + batchSize: 1000, + resourceSpec: testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 120, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + sender := testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t)) + receiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)) + receiver.WithRetry(` + retry_on_failure: + enabled: true +`) + if test.withQueue { + receiver.WithQueue(` + sending_queue: + enabled: true +`) + } + + if test.withExporterBatcher { + receiver.WithBatcher(fmt.Sprintf(` + batcher: + enabled: true + min_size_items: %d +`, test.batchSize)) + } + + processors := slices.Clone(test.processors) + if test.withBatchProcessor { + processors = slices.Insert(processors, 0, ProcessorNameAndConfigBody{ + Name: "batch", + Body: fmt.Sprintf(` + batch: + send_batch_size: %d +`, test.batchSize), + }) + } + loadOptions := &testbed.LoadOptions{ + Parallel: 10, + ItemsPerBatch: 10, + } + Scenario10kItemsPerSecond(t, sender, receiver, test.resourceSpec, performanceResultsSummary, processors, test.extensions, loadOptions) + }) + } +} + +func TestLog10kDPSWithProcessors(t *testing.T) { + processors := []ProcessorNameAndConfigBody{ + { + Name: "filter", + Body: ` + filter: + logs: + log_record: + - not IsMatch(attributes["batch_index"], "batch_.+") +`, + }, + { + Name: "transform", + Body: ` + transform: + log_statements: + - context: log + statements: + - set(resource.attributes["batch_index"], attributes["batch_index"]) + - set(attributes["counter"], ExtractPatterns(body, "Load Generator Counter (?P.+)")) +`, + }, + } + tests := []batcherTestSpec{ + { + name: "No batching, no queue", + processors: processors, + resourceSpec: testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 120, + }, + }, + { + name: "No batching, queue", + processors: processors, + withQueue: true, + resourceSpec: testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 120, + }, + }, + { + name: "Batch size 1000 with batch processor, no queue", + processors: processors, + batchSize: 1000, + withBatchProcessor: true, + resourceSpec: testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 120, + }, + }, + { + name: "Batch size 1000 with batch processor, queue", + batchSize: 1000, + withBatchProcessor: true, + withQueue: true, + resourceSpec: testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 120, + }, + }, + { + name: "Batch size 1000 with exporter batcher, no queue", + processors: processors, + withExporterBatcher: true, + batchSize: 1000, + resourceSpec: testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 120, + }, + }, + { + name: "Batch size 1000 with exporter batcher, queue", + processors: processors, + withExporterBatcher: true, + withQueue: true, + batchSize: 1000, + resourceSpec: testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 120, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + sender := testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t)) + receiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)) + receiver.WithRetry(` + retry_on_failure: + enabled: true +`) + if test.withQueue { + receiver.WithQueue(` + sending_queue: + enabled: true + queue_size: 10 +`) + } + + if test.withExporterBatcher { + receiver.WithBatcher(fmt.Sprintf(` + batcher: + enabled: true + min_size_items: %d +`, test.batchSize)) + } + + processors := slices.Clone(test.processors) + if test.withBatchProcessor { + processors = slices.Insert(processors, 0, ProcessorNameAndConfigBody{ + Name: "batch", + Body: fmt.Sprintf(` + batch: + send_batch_size: %d +`, test.batchSize), + }) + } + loadOptions := &testbed.LoadOptions{ + Parallel: 10, + ItemsPerBatch: 10, + } + Scenario10kItemsPerSecond(t, sender, receiver, test.resourceSpec, performanceResultsSummary, test.processors, test.extensions, loadOptions) + }) + } +} diff --git a/testbed/tests/log_test.go b/testbed/tests/log_test.go index 41cb83ed0797..b3f003f59b84 100644 --- a/testbed/tests/log_test.go +++ b/testbed/tests/log_test.go @@ -178,6 +178,7 @@ func TestLog10kDPS(t *testing.T) { performanceResultsSummary, processors, test.extensions, + nil, ) }) } diff --git a/testbed/tests/metric_test.go b/testbed/tests/metric_test.go index 6eb8b7fd9829..2de78612cceb 100644 --- a/testbed/tests/metric_test.go +++ b/testbed/tests/metric_test.go @@ -88,6 +88,7 @@ func TestMetric10kDPS(t *testing.T) { performanceResultsSummary, nil, nil, + nil, ) }) } diff --git a/testbed/tests/scenarios.go b/testbed/tests/scenarios.go index 6e8efe229a4e..f83dee3d8ca7 100644 --- a/testbed/tests/scenarios.go +++ b/testbed/tests/scenarios.go @@ -137,15 +137,18 @@ func Scenario10kItemsPerSecond( resultsSummary testbed.TestResultsSummary, processors []ProcessorNameAndConfigBody, extensions map[string]string, -) { + loadOptions *testbed.LoadOptions) { resultDir, err := filepath.Abs(path.Join("results", t.Name())) require.NoError(t, err) - options := testbed.LoadOptions{ - DataItemsPerSecond: 10_000, - ItemsPerBatch: 100, - Parallel: 1, + if loadOptions == nil { + loadOptions = &testbed.LoadOptions{ + ItemsPerBatch: 100, + Parallel: 1, + } } + loadOptions.DataItemsPerSecond = 10_000 + agentProc := testbed.NewChildProcessCollector(testbed.WithEnvVar("GOMAXPROCS", "2")) configStr := createConfigYaml(t, sender, receiver, resultDir, processors, extensions) @@ -153,7 +156,7 @@ func Scenario10kItemsPerSecond( require.NoError(t, err) defer configCleanup() - dataProvider := testbed.NewPerfTestDataProvider(options) + dataProvider := testbed.NewPerfTestDataProvider(*loadOptions) tc := testbed.NewTestCase( t, dataProvider, @@ -169,7 +172,7 @@ func Scenario10kItemsPerSecond( tc.StartBackend() tc.StartAgent() - tc.StartLoad(options) + tc.StartLoad(*loadOptions) tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() > 0 }, "load generator started") diff --git a/testbed/tests/trace_test.go b/testbed/tests/trace_test.go index 8f53311d5419..7b6007727e26 100644 --- a/testbed/tests/trace_test.go +++ b/testbed/tests/trace_test.go @@ -149,6 +149,7 @@ func TestTrace10kSPS(t *testing.T) { performanceResultsSummary, processors, nil, + nil, ) }) }