diff --git a/exporter/elasticsearchexporter/integrationtest/datareceiver.go b/exporter/elasticsearchexporter/integrationtest/datareceiver.go index 79a2d46dc3d4..0039b1fd893e 100644 --- a/exporter/elasticsearchexporter/integrationtest/datareceiver.go +++ b/exporter/elasticsearchexporter/integrationtest/datareceiver.go @@ -21,6 +21,7 @@ import ( "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receivertest" @@ -32,12 +33,17 @@ import ( ) const ( - // TestLogsIndex is used by the mock ES data receiver to indentify log events. + // TestLogsIndex is used by the mock ES data receiver to identify log events. // Exporter LogsIndex configuration must be configured with TestLogsIndex for // the data receiver to work properly TestLogsIndex = "logs-test-idx" - // TestTracesIndex is used by the mock ES data receiver to indentify trace + // TestMetricsIndex is used by the mock ES data receiver to identify metric events. + // Exporter MetricsIndex configuration must be configured with TestMetricsIndex for + // the data receiver to work properly + TestMetricsIndex = "metrics-test-idx" + + // TestTracesIndex is used by the mock ES data receiver to identify trace // events. Exporter TracesIndex configuration must be configured with // TestTracesIndex for the data receiver to work properly TestTracesIndex = "traces-test-idx" @@ -79,11 +85,12 @@ func withBatcherEnabled(enabled bool) dataReceiverOption { } } -func (es *esDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, lc consumer.Logs) error { +func (es *esDataReceiver) Start(tc consumer.Traces, mc consumer.Metrics, lc consumer.Logs) error { factory := receiver.NewFactory( component.MustNewType("mockelasticsearch"), createDefaultConfig, receiver.WithLogs(createLogsReceiver, component.StabilityLevelDevelopment), + receiver.WithMetrics(createMetricsReceiver, component.StabilityLevelDevelopment), receiver.WithTraces(createTracesReceiver, component.StabilityLevelDevelopment), ) esURL, err := url.Parse(es.endpoint) @@ -101,6 +108,10 @@ func (es *esDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, lc consu if err != nil { return fmt.Errorf("failed to create logs receiver: %w", err) } + metricsReceiver, err := factory.CreateMetrics(context.Background(), set, cfg, mc) + if err != nil { + return fmt.Errorf("failed to create metrics receiver: %w", err) + } tracesReceiver, err := factory.CreateTraces(context.Background(), set, cfg, tc) if err != nil { return fmt.Errorf("failed to create traces receiver: %w", err) @@ -108,6 +119,7 @@ func (es *esDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, lc consu // Since we use SharedComponent both receivers should be same require.Same(es.t, logsReceiver, tracesReceiver) + require.Same(es.t, logsReceiver, metricsReceiver) es.receiver = logsReceiver return es.receiver.Start(context.Background(), componenttest.NewNopHost()) @@ -126,7 +138,14 @@ func (es *esDataReceiver) GenConfigYAMLStr() string { elasticsearch: endpoints: [%s] logs_index: %s + logs_dynamic_index: + enabled: false + metrics_index: %s + metrics_dynamic_index: + enabled: false traces_index: %s + traces_dynamic_index: + enabled: false sending_queue: enabled: true retry: @@ -134,7 +153,7 @@ func (es *esDataReceiver) GenConfigYAMLStr() string { initial_interval: 100ms max_interval: 1s max_requests: 10000`, - es.endpoint, TestLogsIndex, TestTracesIndex, + es.endpoint, TestLogsIndex, TestMetricsIndex, TestTracesIndex, ) if es.batcherEnabled == nil { @@ -189,6 +208,19 @@ func createLogsReceiver( return receiver, nil } +func createMetricsReceiver( + _ context.Context, + params receiver.Settings, + rawCfg component.Config, + next consumer.Metrics, +) (receiver.Metrics, error) { + receiver := receivers.GetOrAdd(rawCfg, func() component.Component { + return newMockESReceiver(params, rawCfg.(*config)) + }) + receiver.Unwrap().(*mockESReceiver).metricsConsumer = next + return receiver, nil +} + func createTracesReceiver( _ context.Context, params receiver.Settings, @@ -206,8 +238,9 @@ type mockESReceiver struct { params receiver.Settings config *config - tracesConsumer consumer.Traces - logsConsumer consumer.Logs + tracesConsumer consumer.Traces + logsConsumer consumer.Logs + metricsConsumer consumer.Metrics server *http.Server } @@ -231,10 +264,12 @@ func (es *mockESReceiver) Start(ctx context.Context, host component.Host) error // Ideally bulk request items should be converted to the corresponding event record // however, since we only assert count for now there is no need to do the actual - // translation. Instead we use a pre-initialized empty logs and traces model to + // translation. Instead we use a pre-initialized empty models to // reduce allocation impact on tests and benchmarks. emptyLogs := plog.NewLogs() emptyLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + emptyMetrics := pmetric.NewMetrics() + emptyMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty() emptyTrace := ptrace.NewTraces() emptyTrace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() @@ -260,6 +295,8 @@ func (es *mockESReceiver) Start(ctx context.Context, host component.Host) error switch item.Index { case TestLogsIndex: consumeErr = es.logsConsumer.ConsumeLogs(context.Background(), emptyLogs) + case TestMetricsIndex: + consumeErr = es.metricsConsumer.ConsumeMetrics(context.Background(), emptyMetrics) case TestTracesIndex: consumeErr = es.tracesConsumer.ConsumeTraces(context.Background(), emptyTrace) } diff --git a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go index 50ed649dca0d..49106c7b084f 100644 --- a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go +++ b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go @@ -26,7 +26,7 @@ import ( ) func BenchmarkExporter(b *testing.B) { - for _, eventType := range []string{"logs", "traces"} { + for _, eventType := range []string{"logs", "metrics", "traces"} { for _, mappingMode := range []string{"none", "ecs", "raw"} { for _, tc := range []struct { name string @@ -41,6 +41,8 @@ func BenchmarkExporter(b *testing.B) { switch eventType { case "logs": benchmarkLogs(b, tc.batchSize, mappingMode) + case "metrics": + benchmarkMetrics(b, tc.batchSize, mappingMode) case "traces": benchmarkTraces(b, tc.batchSize, mappingMode) } @@ -79,6 +81,35 @@ func benchmarkLogs(b *testing.B, batchSize int, mappingMode string) { require.NoError(b, exporter.Shutdown(ctx)) } +func benchmarkMetrics(b *testing.B, batchSize int, mappingMode string) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + exporterSettings := exportertest.NewNopSettings() + exporterSettings.TelemetrySettings.Logger = zaptest.NewLogger(b, zaptest.Level(zap.WarnLevel)) + runnerCfg := prepareBenchmark(b, batchSize, mappingMode) + exporter, err := runnerCfg.factory.CreateMetrics( + ctx, exporterSettings, runnerCfg.esCfg, + ) + require.NoError(b, err) + require.NoError(b, exporter.Start(ctx, componenttest.NewNopHost())) + + b.ReportAllocs() + b.ResetTimer() + b.StopTimer() + for i := 0; i < b.N; i++ { + metrics, _ := runnerCfg.provider.GenerateMetrics() + b.StartTimer() + require.NoError(b, exporter.ConsumeMetrics(ctx, metrics)) + b.StopTimer() + } + b.ReportMetric( + float64(runnerCfg.generatedCount.Load())/b.Elapsed().Seconds(), + "events/s", + ) + require.NoError(b, exporter.Shutdown(ctx)) +} + func benchmarkTraces(b *testing.B, batchSize int, mappingMode string) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -134,7 +165,11 @@ func prepareBenchmark( cfg.esCfg.Mapping.Mode = mappingMode cfg.esCfg.Endpoints = []string{receiver.endpoint} cfg.esCfg.LogsIndex = TestLogsIndex + cfg.esCfg.LogsDynamicIndex.Enabled = false + cfg.esCfg.MetricsIndex = TestMetricsIndex + cfg.esCfg.MetricsDynamicIndex.Enabled = false cfg.esCfg.TracesIndex = TestTracesIndex + cfg.esCfg.TracesDynamicIndex.Enabled = false cfg.esCfg.Flush.Interval = 10 * time.Millisecond cfg.esCfg.NumWorkers = 1 diff --git a/exporter/elasticsearchexporter/integrationtest/exporter_test.go b/exporter/elasticsearchexporter/integrationtest/exporter_test.go index 013994898511..57b6f936713f 100644 --- a/exporter/elasticsearchexporter/integrationtest/exporter_test.go +++ b/exporter/elasticsearchexporter/integrationtest/exporter_test.go @@ -17,7 +17,7 @@ import ( ) func TestExporter(t *testing.T) { - for _, eventType := range []string{"logs", "traces"} { + for _, eventType := range []string{"logs", "metrics", "traces"} { for _, tc := range []struct { name string @@ -68,6 +68,8 @@ func runner(t *testing.T, eventType string, restartCollector, mockESFailure bool switch eventType { case "logs": sender = testbed.NewOTLPLogsDataSender(host, port) + case "metrics": + sender = testbed.NewOTLPMetricDataSender(host, port) case "traces": sender = testbed.NewOTLPTraceDataSender(host, port) default: