Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][exporter/elasticsearch] Test metrics in integration tests #36455

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 44 additions & 7 deletions exporter/elasticsearchexporter/integrationtest/datareceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receivertest"
Expand All @@ -32,12 +33,17 @@ import (
)

const (
// TestLogsIndex is used by the mock ES data receiver to indentify log events.
// TestLogsIndex is used by the mock ES data receiver to identify log events.
// Exporter LogsIndex configuration must be configured with TestLogsIndex for
// the data receiver to work properly
TestLogsIndex = "logs-test-idx"

// TestTracesIndex is used by the mock ES data receiver to indentify trace
// TestMetricsIndex is used by the mock ES data receiver to identify metric events.
// Exporter MetricsIndex configuration must be configured with TestMetricsIndex for
// the data receiver to work properly
TestMetricsIndex = "metrics-test-idx"

// TestTracesIndex is used by the mock ES data receiver to identify trace
// events. Exporter TracesIndex configuration must be configured with
// TestTracesIndex for the data receiver to work properly
TestTracesIndex = "traces-test-idx"
Expand Down Expand Up @@ -79,11 +85,12 @@ func withBatcherEnabled(enabled bool) dataReceiverOption {
}
}

func (es *esDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, lc consumer.Logs) error {
func (es *esDataReceiver) Start(tc consumer.Traces, mc consumer.Metrics, lc consumer.Logs) error {
factory := receiver.NewFactory(
component.MustNewType("mockelasticsearch"),
createDefaultConfig,
receiver.WithLogs(createLogsReceiver, component.StabilityLevelDevelopment),
receiver.WithMetrics(createMetricsReceiver, component.StabilityLevelDevelopment),
receiver.WithTraces(createTracesReceiver, component.StabilityLevelDevelopment),
)
esURL, err := url.Parse(es.endpoint)
Expand All @@ -101,13 +108,18 @@ func (es *esDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, lc consu
if err != nil {
return fmt.Errorf("failed to create logs receiver: %w", err)
}
metricsReceiver, err := factory.CreateMetrics(context.Background(), set, cfg, mc)
if err != nil {
return fmt.Errorf("failed to create metrics receiver: %w", err)
}
tracesReceiver, err := factory.CreateTraces(context.Background(), set, cfg, tc)
if err != nil {
return fmt.Errorf("failed to create traces receiver: %w", err)
}

// Since we use SharedComponent both receivers should be same
require.Same(es.t, logsReceiver, tracesReceiver)
require.Same(es.t, logsReceiver, metricsReceiver)
es.receiver = logsReceiver

return es.receiver.Start(context.Background(), componenttest.NewNopHost())
Expand All @@ -126,15 +138,22 @@ func (es *esDataReceiver) GenConfigYAMLStr() string {
elasticsearch:
endpoints: [%s]
logs_index: %s
logs_dynamic_index:
enabled: false
metrics_index: %s
metrics_dynamic_index:
enabled: false
traces_index: %s
traces_dynamic_index:
enabled: false
sending_queue:
enabled: true
retry:
enabled: true
initial_interval: 100ms
max_interval: 1s
max_requests: 10000`,
es.endpoint, TestLogsIndex, TestTracesIndex,
es.endpoint, TestLogsIndex, TestMetricsIndex, TestTracesIndex,
)

if es.batcherEnabled == nil {
Expand Down Expand Up @@ -189,6 +208,19 @@ func createLogsReceiver(
return receiver, nil
}

func createMetricsReceiver(
_ context.Context,
params receiver.Settings,
rawCfg component.Config,
next consumer.Metrics,
) (receiver.Metrics, error) {
receiver := receivers.GetOrAdd(rawCfg, func() component.Component {
return newMockESReceiver(params, rawCfg.(*config))
})
receiver.Unwrap().(*mockESReceiver).metricsConsumer = next
return receiver, nil
}

func createTracesReceiver(
_ context.Context,
params receiver.Settings,
Expand All @@ -206,8 +238,9 @@ type mockESReceiver struct {
params receiver.Settings
config *config

tracesConsumer consumer.Traces
logsConsumer consumer.Logs
tracesConsumer consumer.Traces
logsConsumer consumer.Logs
metricsConsumer consumer.Metrics

server *http.Server
}
Expand All @@ -231,10 +264,12 @@ func (es *mockESReceiver) Start(ctx context.Context, host component.Host) error

// Ideally bulk request items should be converted to the corresponding event record
// however, since we only assert count for now there is no need to do the actual
// translation. Instead we use a pre-initialized empty logs and traces model to
// translation. Instead we use a pre-initialized empty models to
// reduce allocation impact on tests and benchmarks.
emptyLogs := plog.NewLogs()
emptyLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
emptyMetrics := pmetric.NewMetrics()
emptyMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty()
emptyTrace := ptrace.NewTraces()
emptyTrace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()

Expand All @@ -260,6 +295,8 @@ func (es *mockESReceiver) Start(ctx context.Context, host component.Host) error
switch item.Index {
case TestLogsIndex:
consumeErr = es.logsConsumer.ConsumeLogs(context.Background(), emptyLogs)
case TestMetricsIndex:
consumeErr = es.metricsConsumer.ConsumeMetrics(context.Background(), emptyMetrics)
case TestTracesIndex:
consumeErr = es.tracesConsumer.ConsumeTraces(context.Background(), emptyTrace)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

func BenchmarkExporter(b *testing.B) {
for _, eventType := range []string{"logs", "traces"} {
for _, eventType := range []string{"logs", "metrics", "traces"} {
for _, mappingMode := range []string{"none", "ecs", "raw"} {
for _, tc := range []struct {
name string
Expand All @@ -41,6 +41,8 @@ func BenchmarkExporter(b *testing.B) {
switch eventType {
case "logs":
benchmarkLogs(b, tc.batchSize, mappingMode)
case "metrics":
benchmarkMetrics(b, tc.batchSize, mappingMode)
case "traces":
benchmarkTraces(b, tc.batchSize, mappingMode)
}
Expand Down Expand Up @@ -79,6 +81,35 @@ func benchmarkLogs(b *testing.B, batchSize int, mappingMode string) {
require.NoError(b, exporter.Shutdown(ctx))
}

func benchmarkMetrics(b *testing.B, batchSize int, mappingMode string) {
ChrsMark marked this conversation as resolved.
Show resolved Hide resolved
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

exporterSettings := exportertest.NewNopSettings()
exporterSettings.TelemetrySettings.Logger = zaptest.NewLogger(b, zaptest.Level(zap.WarnLevel))
runnerCfg := prepareBenchmark(b, batchSize, mappingMode)
exporter, err := runnerCfg.factory.CreateMetrics(
ctx, exporterSettings, runnerCfg.esCfg,
)
require.NoError(b, err)
require.NoError(b, exporter.Start(ctx, componenttest.NewNopHost()))

b.ReportAllocs()
b.ResetTimer()
b.StopTimer()
for i := 0; i < b.N; i++ {
metrics, _ := runnerCfg.provider.GenerateMetrics()
b.StartTimer()
require.NoError(b, exporter.ConsumeMetrics(ctx, metrics))
b.StopTimer()
}
b.ReportMetric(
float64(runnerCfg.generatedCount.Load())/b.Elapsed().Seconds(),
"events/s",
)
require.NoError(b, exporter.Shutdown(ctx))
}

func benchmarkTraces(b *testing.B, batchSize int, mappingMode string) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -134,7 +165,11 @@ func prepareBenchmark(
cfg.esCfg.Mapping.Mode = mappingMode
cfg.esCfg.Endpoints = []string{receiver.endpoint}
cfg.esCfg.LogsIndex = TestLogsIndex
cfg.esCfg.LogsDynamicIndex.Enabled = false
cfg.esCfg.MetricsIndex = TestMetricsIndex
cfg.esCfg.MetricsDynamicIndex.Enabled = false
cfg.esCfg.TracesIndex = TestTracesIndex
cfg.esCfg.TracesDynamicIndex.Enabled = false
cfg.esCfg.Flush.Interval = 10 * time.Millisecond
cfg.esCfg.NumWorkers = 1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

func TestExporter(t *testing.T) {
for _, eventType := range []string{"logs", "traces"} {
for _, eventType := range []string{"logs", "metrics", "traces"} {
for _, tc := range []struct {
name string

Expand Down Expand Up @@ -68,6 +68,8 @@ func runner(t *testing.T, eventType string, restartCollector, mockESFailure bool
switch eventType {
case "logs":
sender = testbed.NewOTLPLogsDataSender(host, port)
case "metrics":
sender = testbed.NewOTLPMetricDataSender(host, port)
case "traces":
sender = testbed.NewOTLPTraceDataSender(host, port)
default:
Expand Down