From fda9b1cb43427265fdd8d1b0b0d111243942180e Mon Sep 17 00:00:00 2001
From: Marc Lopez Rubio
Date: Thu, 4 Jul 2024 15:15:50 +0800
Subject: [PATCH] fix: Update `FlushBytes` parsing/defaults (#13576)

Updates the `FlushBytes` setting to default to 1 MiB, only raising it to
the 24 KiB minimum when the user has explicitly configured a smaller value
and compression is enabled.

Fixes #13024

---------

Signed-off-by: Marc Lopez Rubio
(cherry picked from commit a453a88506722882a32a174f2eeecef693ff6957)
Signed-off-by: Marc Lopez Rubio

# Conflicts:
#	changelogs/head.asciidoc
#	internal/beater/beater.go
---
 changelogs/head.asciidoc       | 32 +++++++++++++++
 internal/beater/beater.go      | 72 ++++++++++++++++++++++++++++++--
 internal/beater/beater_test.go | 75 ++++++++++++++++++++++++++++++++++
 3 files changed, 175 insertions(+), 4 deletions(-)
 create mode 100644 changelogs/head.asciidoc

diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc
new file mode 100644
index 00000000000..1671420cde9
--- /dev/null
+++ b/changelogs/head.asciidoc
@@ -0,0 +1,32 @@
+[[release-notes-head]]
+== APM version HEAD
+
+https://github.com/elastic/apm-server/compare/8.14\...main[View commits]
+
+[float]
+==== Bug fixes
+
+- Avoid data race due to reuse of `bytes.Buffer` in ES bulk requests {pull}13155[13155]
+- APM Server now relies on the Elasticsearch apm-data plugin's index templates, which reverts some unsafe uses of `flattened` field types {pull}12066[12066]
+- Add `error.id` to jaeger errors {pull}13196[13196]
+- Fix a performance regression for apm servers which don't specify `output.elasticsearch.flush_bytes` {pull}13576[13576]
+
+[float]
+==== Breaking Changes
+
+[float]
+==== Deprecations
+
+[float]
+==== Intake API Changes
+
+[float]
+==== Added
+
+- Add `elasticsearch.flushed.uncompressed.bytes` metric {pull}13155[13155]
+- APM Server now relies on the Elasticsearch apm-data plugin's index templates, removing the requirement to install the APM integration package {pull}12066[12066]
+- Upgraded bundled APM Java agent attacher CLI to version 1.50.0 {pull}13326[13326]
+- Enable Kibana curated UIs to work with hostmetrics from OpenTelemetry's https://pkg.go.dev/go.opentelemetry.io/collector/receiver/hostmetricsreceiver[hostmetricsreceiver] {pull}13196[13196]
+- Add require data stream to bulk index requests {pull}13398[13398]
+- Support self-instrumentation when in managed mode by getting tracing configs via reloader {pull}13514[13514]
+- Add mapping for OpenTelemetry attribute `messaging.destination.name` to derive `service.target` correctly {pull}13472[13472]
diff --git a/internal/beater/beater.go b/internal/beater/beater.go
index 93a3d67abc0..4f6a7f9f8d2 100644
--- a/internal/beater/beater.go
+++ b/internal/beater/beater.go
@@ -652,7 +652,6 @@ func (s *Runner) newFinalBatchProcessor(
     newElasticsearchClient func(cfg *elasticsearch.Config) (*elasticsearch.Client, error),
     memLimit float64,
 ) (modelpb.BatchProcessor, func(context.Context) error, error) {
-    monitoring.Default.Remove("libbeat")
     libbeatMonitoringRegistry := monitoring.Default.NewRegistry("libbeat")
     if s.elasticsearchOutputConfig == nil {
@@ -668,6 +667,7 @@ func (s *Runner) newFinalBatchProcessor(
     }
     monitoring.NewString(outputRegistry, "name").Set("elasticsearch")
 
+<<<<<<< HEAD
     var esConfig struct {
         *elasticsearch.Config `config:",inline"`
         FlushBytes            string        `config:"flush_bytes"`
@@ -702,13 +702,18 @@ func (s *Runner) newFinalBatchProcessor(
         flushBytes = minFlush
     }
     client, err := newElasticsearchClient(esConfig.Config)
+=======
+    // Create the docappender and Elasticsearch config
+    appenderCfg, esCfg, err := s.newDocappenderConfig(tracer, memLimit)
+>>>>>>> a453a8850 (fix: Update `FlushBytes` parsing/defaults (#13576))
     if err != nil {
         return nil, nil, err
     }
-    var scalingCfg docappender.ScalingConfig
-    if enabled := esConfig.Scaling.Enabled; enabled != nil {
-        scalingCfg.Disabled = !*enabled
+    client, err := newElasticsearchClient(esCfg)
+    if err != nil {
+        return nil, nil, err
     }
+<<<<<<< HEAD
     opts := docappender.Config{
         CompressionLevel: esConfig.CompressionLevel,
         FlushBytes:       flushBytes,
@@ -720,6 +725,9 @@ func (s *Runner) newFinalBatchProcessor(
     }
     opts = docappenderConfig(opts, memLimit, s.logger)
     appender, err := docappender.New(client, opts)
+=======
+    appender, err := docappender.New(client, appenderCfg)
+>>>>>>> a453a8850 (fix: Update `FlushBytes` parsing/defaults (#13576))
     if err != nil {
         return nil, nil, err
     }
@@ -782,6 +790,62 @@ func (s *Runner) newFinalBatchProcessor(
     return newDocappenderBatchProcessor(appender), appender.Close, nil
 }
 
+func (s *Runner) newDocappenderConfig(tracer *apm.Tracer, memLimit float64) (
+    docappender.Config, *elasticsearch.Config, error,
+) {
+    var esConfig struct {
+        *elasticsearch.Config `config:",inline"`
+        FlushBytes            string        `config:"flush_bytes"`
+        FlushInterval         time.Duration `config:"flush_interval"`
+        MaxRequests           int           `config:"max_requests"`
+        Scaling               struct {
+            Enabled *bool `config:"enabled"`
+        } `config:"autoscaling"`
+    }
+    // Default to 1mib flushes, which is the default for go-docappender.
+    esConfig.FlushBytes = "1 mib"
+    esConfig.FlushInterval = time.Second
+    esConfig.Config = elasticsearch.DefaultConfig()
+    esConfig.MaxIdleConnsPerHost = 10
+
+    if err := s.elasticsearchOutputConfig.Unpack(&esConfig); err != nil {
+        return docappender.Config{}, nil, err
+    }
+
+    var flushBytes int
+    if esConfig.FlushBytes != "" {
+        b, err := humanize.ParseBytes(esConfig.FlushBytes)
+        if err != nil {
+            return docappender.Config{}, nil, fmt.Errorf("failed to parse flush_bytes: %w", err)
+        }
+        flushBytes = int(b)
+    }
+    minFlush := 24 * 1024
+    if esConfig.CompressionLevel != 0 && flushBytes < minFlush {
+        s.logger.Warnf("flush_bytes config value is too small (%d) and might be ignored by the indexer, increasing value to %d", flushBytes, minFlush)
+        flushBytes = minFlush
+    }
+    var scalingCfg docappender.ScalingConfig
+    if enabled := esConfig.Scaling.Enabled; enabled != nil {
+        scalingCfg.Disabled = !*enabled
+    }
+    cfg := docappenderConfig(docappender.Config{
+        CompressionLevel:  esConfig.CompressionLevel,
+        FlushBytes:        flushBytes,
+        FlushInterval:     esConfig.FlushInterval,
+        Tracer:            tracer,
+        MaxRequests:       esConfig.MaxRequests,
+        Scaling:           scalingCfg,
+        Logger:            zap.New(s.logger.Core(), zap.WithCaller(true)),
+        RequireDataStream: true,
+    }, memLimit, s.logger)
+    if cfg.MaxRequests != 0 {
+        esConfig.MaxIdleConnsPerHost = cfg.MaxRequests
+    }
+
+    return cfg, esConfig.Config, nil
+}
+
 func docappenderConfig(
     opts docappender.Config, memLimit float64, logger *logp.Logger,
 ) docappender.Config {
diff --git a/internal/beater/beater_test.go b/internal/beater/beater_test.go
index ce968f02eba..3e911b35d8b 100644
--- a/internal/beater/beater_test.go
+++ b/internal/beater/beater_test.go
@@ -29,10 +29,14 @@ import (
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
     "go.elastic.co/apm/v2/apmtest"
+    "go.uber.org/zap"
 
     "github.com/elastic/apm-server/internal/beater/config"
     "github.com/elastic/apm-server/internal/elasticsearch"
+    agentconfig "github.com/elastic/elastic-agent-libs/config"
+    "github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring" + "github.com/elastic/go-docappender/v2" ) func TestStoreUsesRUMElasticsearchConfig(t *testing.T) { @@ -152,3 +156,74 @@ func newMockClusterUUIDClient(t testing.TB, clusterUUID string) *elasticsearch.C require.NoError(t, err) return client } + +func TestRunnerNewDocappenderConfig(t *testing.T) { + var tc = []struct { + memSize float64 + wantMaxRequests int + wantDocBufSize int + }{ + {memSize: 1, wantMaxRequests: 11, wantDocBufSize: 819}, + {memSize: 2, wantMaxRequests: 13, wantDocBufSize: 1638}, + {memSize: 4, wantMaxRequests: 16, wantDocBufSize: 3276}, + {memSize: 8, wantMaxRequests: 22, wantDocBufSize: 6553}, + } + for _, c := range tc { + t.Run(fmt.Sprintf("default/%vgb", c.memSize), func(t *testing.T) { + r := Runner{ + elasticsearchOutputConfig: agentconfig.NewConfig(), + logger: logp.NewLogger("test"), + } + docCfg, esCfg, err := r.newDocappenderConfig(nil, c.memSize) + require.NoError(t, err) + assert.Equal(t, docappender.Config{ + Logger: zap.New(r.logger.Core(), zap.WithCaller(true)), + CompressionLevel: 5, + RequireDataStream: true, + FlushInterval: time.Second, + FlushBytes: 1024 * 1024, + MaxRequests: c.wantMaxRequests, + DocumentBufferSize: c.wantDocBufSize, + }, docCfg) + assert.Equal(t, &elasticsearch.Config{ + Hosts: elasticsearch.Hosts{"localhost:9200"}, + Backoff: elasticsearch.DefaultBackoffConfig, + Protocol: "http", + CompressionLevel: 5, + Timeout: 5 * time.Second, + MaxRetries: 3, + MaxIdleConnsPerHost: c.wantMaxRequests, + }, esCfg) + }) + t.Run(fmt.Sprintf("override/%vgb", c.memSize), func(t *testing.T) { + r := Runner{ + elasticsearchOutputConfig: agentconfig.MustNewConfigFrom(map[string]interface{}{ + "flush_bytes": "500 kib", + "flush_interval": "2s", + "max_requests": 50, + }), + logger: logp.NewLogger("test"), + } + docCfg, esCfg, err := r.newDocappenderConfig(nil, c.memSize) + require.NoError(t, err) + assert.Equal(t, docappender.Config{ + Logger: zap.New(r.logger.Core(), zap.WithCaller(true)), + CompressionLevel: 5, + RequireDataStream: true, + FlushInterval: 2 * time.Second, + FlushBytes: 500 * 1024, + MaxRequests: 50, + DocumentBufferSize: c.wantDocBufSize, + }, docCfg) + assert.Equal(t, &elasticsearch.Config{ + Hosts: elasticsearch.Hosts{"localhost:9200"}, + Backoff: elasticsearch.DefaultBackoffConfig, + Protocol: "http", + CompressionLevel: 5, + Timeout: 5 * time.Second, + MaxRetries: 3, + MaxIdleConnsPerHost: 50, + }, esCfg) + }) + } +}