Skip to content

Commit

Permalink
[chore][exporter/elasticsearch] Test metrics in integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
carsonip committed Nov 19, 2024
1 parent b14f1d4 commit a8f6328
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 9 deletions.
51 changes: 44 additions & 7 deletions exporter/elasticsearchexporter/integrationtest/datareceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receivertest"
Expand All @@ -32,12 +33,17 @@ import (
)

const (
// TestLogsIndex is used by the mock ES data receiver to indentify log events.
// TestLogsIndex is used by the mock ES data receiver to identify log events.
// Exporter LogsIndex configuration must be configured with TestLogsIndex for
// the data receiver to work properly
TestLogsIndex = "logs-test-idx"

// TestTracesIndex is used by the mock ES data receiver to indentify trace
// TestMetricsIndex is used by the mock ES data receiver to identify metric events.
// Exporter MetricsIndex configuration must be configured with TestMetricsIndex for
// the data receiver to work properly
TestMetricsIndex = "metrics-test-idx"

// TestTracesIndex is used by the mock ES data receiver to identify trace
// events. Exporter TracesIndex configuration must be configured with
// TestTracesIndex for the data receiver to work properly
TestTracesIndex = "traces-test-idx"
Expand Down Expand Up @@ -79,11 +85,12 @@ func withBatcherEnabled(enabled bool) dataReceiverOption {
}
}

func (es *esDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, lc consumer.Logs) error {
func (es *esDataReceiver) Start(tc consumer.Traces, mc consumer.Metrics, lc consumer.Logs) error {
factory := receiver.NewFactory(
component.MustNewType("mockelasticsearch"),
createDefaultConfig,
receiver.WithLogs(createLogsReceiver, component.StabilityLevelDevelopment),
receiver.WithMetrics(createMetricsReceiver, component.StabilityLevelDevelopment),
receiver.WithTraces(createTracesReceiver, component.StabilityLevelDevelopment),
)
esURL, err := url.Parse(es.endpoint)
Expand All @@ -101,13 +108,18 @@ func (es *esDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, lc consu
if err != nil {
return fmt.Errorf("failed to create logs receiver: %w", err)
}
metricsReceiver, err := factory.CreateMetrics(context.Background(), set, cfg, mc)
if err != nil {
return fmt.Errorf("failed to create metrics receiver: %w", err)
}
tracesReceiver, err := factory.CreateTraces(context.Background(), set, cfg, tc)
if err != nil {
return fmt.Errorf("failed to create traces receiver: %w", err)
}

// Since we use SharedComponent both receivers should be same
require.Same(es.t, logsReceiver, tracesReceiver)
require.Same(es.t, logsReceiver, metricsReceiver)
es.receiver = logsReceiver

return es.receiver.Start(context.Background(), componenttest.NewNopHost())
Expand All @@ -126,15 +138,22 @@ func (es *esDataReceiver) GenConfigYAMLStr() string {
elasticsearch:
endpoints: [%s]
logs_index: %s
logs_dynamic_index:
enabled: false
metrics_index: %s
metrics_dynamic_index:
enabled: false
traces_index: %s
traces_dynamic_index:
enabled: false
sending_queue:
enabled: true
retry:
enabled: true
initial_interval: 100ms
max_interval: 1s
max_requests: 10000`,
es.endpoint, TestLogsIndex, TestTracesIndex,
es.endpoint, TestLogsIndex, TestMetricsIndex, TestTracesIndex,
)

if es.batcherEnabled == nil {
Expand Down Expand Up @@ -189,6 +208,19 @@ func createLogsReceiver(
return receiver, nil
}

func createMetricsReceiver(
_ context.Context,
params receiver.Settings,
rawCfg component.Config,
next consumer.Metrics,
) (receiver.Metrics, error) {
receiver := receivers.GetOrAdd(rawCfg, func() component.Component {
return newMockESReceiver(params, rawCfg.(*config))
})
receiver.Unwrap().(*mockESReceiver).metricsConsumer = next
return receiver, nil
}

func createTracesReceiver(
_ context.Context,
params receiver.Settings,
Expand All @@ -206,8 +238,9 @@ type mockESReceiver struct {
params receiver.Settings
config *config

tracesConsumer consumer.Traces
logsConsumer consumer.Logs
tracesConsumer consumer.Traces
logsConsumer consumer.Logs
metricsConsumer consumer.Metrics

server *http.Server
}
Expand All @@ -231,10 +264,12 @@ func (es *mockESReceiver) Start(ctx context.Context, host component.Host) error

// Ideally bulk request items should be converted to the corresponding event record
// however, since we only assert count for now there is no need to do the actual
// translation. Instead we use a pre-initialized empty logs and traces model to
// translation. Instead we use a pre-initialized empty models to
// reduce allocation impact on tests and benchmarks.
emptyLogs := plog.NewLogs()
emptyLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
emptyMetrics := pmetric.NewMetrics()
emptyMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty()
emptyTrace := ptrace.NewTraces()
emptyTrace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()

Expand All @@ -260,6 +295,8 @@ func (es *mockESReceiver) Start(ctx context.Context, host component.Host) error
switch item.Index {
case TestLogsIndex:
consumeErr = es.logsConsumer.ConsumeLogs(context.Background(), emptyLogs)
case TestMetricsIndex:
consumeErr = es.metricsConsumer.ConsumeMetrics(context.Background(), emptyMetrics)
case TestTracesIndex:
consumeErr = es.tracesConsumer.ConsumeTraces(context.Background(), emptyTrace)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

func BenchmarkExporter(b *testing.B) {
for _, eventType := range []string{"logs", "traces"} {
for _, eventType := range []string{"logs", "metrics", "traces"} {
for _, mappingMode := range []string{"none", "ecs", "raw"} {
for _, tc := range []struct {
name string
Expand All @@ -41,6 +41,8 @@ func BenchmarkExporter(b *testing.B) {
switch eventType {
case "logs":
benchmarkLogs(b, tc.batchSize, mappingMode)
case "metrics":
benchmarkMetrics(b, tc.batchSize, mappingMode)
case "traces":
benchmarkTraces(b, tc.batchSize, mappingMode)
}
Expand Down Expand Up @@ -79,6 +81,35 @@ func benchmarkLogs(b *testing.B, batchSize int, mappingMode string) {
require.NoError(b, exporter.Shutdown(ctx))
}

func benchmarkMetrics(b *testing.B, batchSize int, mappingMode string) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

exporterSettings := exportertest.NewNopSettings()
exporterSettings.TelemetrySettings.Logger = zaptest.NewLogger(b, zaptest.Level(zap.WarnLevel))
runnerCfg := prepareBenchmark(b, batchSize, mappingMode)
exporter, err := runnerCfg.factory.CreateMetrics(
ctx, exporterSettings, runnerCfg.esCfg,
)
require.NoError(b, err)
require.NoError(b, exporter.Start(ctx, componenttest.NewNopHost()))

b.ReportAllocs()
b.ResetTimer()
b.StopTimer()
for i := 0; i < b.N; i++ {
metrics, _ := runnerCfg.provider.GenerateMetrics()
b.StartTimer()
require.NoError(b, exporter.ConsumeMetrics(ctx, metrics))
b.StopTimer()
}
b.ReportMetric(
float64(runnerCfg.generatedCount.Load())/b.Elapsed().Seconds(),
"events/s",
)
require.NoError(b, exporter.Shutdown(ctx))
}

func benchmarkTraces(b *testing.B, batchSize int, mappingMode string) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -134,7 +165,11 @@ func prepareBenchmark(
cfg.esCfg.Mapping.Mode = mappingMode
cfg.esCfg.Endpoints = []string{receiver.endpoint}
cfg.esCfg.LogsIndex = TestLogsIndex
cfg.esCfg.LogsDynamicIndex.Enabled = false
cfg.esCfg.MetricsIndex = TestMetricsIndex
cfg.esCfg.MetricsDynamicIndex.Enabled = false
cfg.esCfg.TracesIndex = TestTracesIndex
cfg.esCfg.TracesDynamicIndex.Enabled = false
cfg.esCfg.Flush.Interval = 10 * time.Millisecond
cfg.esCfg.NumWorkers = 1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

func TestExporter(t *testing.T) {
for _, eventType := range []string{"logs", "traces"} {
for _, eventType := range []string{"logs", "metrics", "traces"} {
for _, tc := range []struct {
name string

Expand Down Expand Up @@ -68,6 +68,8 @@ func runner(t *testing.T, eventType string, restartCollector, mockESFailure bool
switch eventType {
case "logs":
sender = testbed.NewOTLPLogsDataSender(host, port)
case "metrics":
sender = testbed.NewOTLPMetricDataSender(host, port)
case "traces":
sender = testbed.NewOTLPTraceDataSender(host, port)
default:
Expand Down

0 comments on commit a8f6328

Please sign in to comment.