From e419b95c42d398811380cb24076310ed4e4e6214 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Mon, 8 Jul 2024 16:56:05 +0800 Subject: [PATCH 1/2] output: Retry document-level `429`s by default (#13620) Updates the APM Server to automatically retry document-level `429`s from Elasticsearch to avoid dropping data. It can be configured/overwritten by `output.elasticsearch.max_retries`, and defaults to `3`. It uses the default backoff configuration, which could wait up to 1m if enough retries are configured, but can be overwritten as well. --------- Signed-off-by: Marc Lopez Rubio Co-authored-by: Carson Ip (cherry picked from commit 28436dc03ae341010c24b09d4734d53ff2f3d806) # Conflicts: # changelogs/head.asciidoc --- changelogs/head.asciidoc | 4 ++++ internal/beater/beater.go | 15 ++++++++++----- internal/beater/beater_test.go | 32 ++++++++++++++++++-------------- 3 files changed, 32 insertions(+), 19 deletions(-) diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc index c7ba3087091..a13985e5e85 100644 --- a/changelogs/head.asciidoc +++ b/changelogs/head.asciidoc @@ -23,9 +23,13 @@ https://github.com/elastic/apm-server/compare/8.14\...main[View commits] [float] ==== Added +<<<<<<< HEAD - Add `elasticsearch.flushed.uncompressed.bytes` metric {pull}13155[13155] - APM Server now relies on the Elasticsearch apm-data plugin's index templates, removing the requirement to install the APM integration package {pull}12066[12066] - Upgraded bundled APM Java agent attacher CLI to version 1.50.0 {pull}13326[13326] - Enable Kibana curated UIs to work with hostmetrics from OpenTelemetry's https://pkg.go.dev/go.opentelemetry.io/collector/receiver/hostmetricsreceiver[hostmetricsreceiver] {pull}13196[13196] - Add require data stream to bulk index requests {pull}13398[13398] - Support self-instrumentation when in managed mode by getting tracing configs via reloader {pull}13514[13514] +======= +- APM Server now automatically retries document-level 429s from Elasticsearch to avoid dropping data. `output.elasticsearch.max_retries` now controls both request-level and document-level retries, and defaults to `3`. {pull}13620[13620] +>>>>>>> 28436dc03 (output: Retry document-level `429`s by default (#13620)) diff --git a/internal/beater/beater.go b/internal/beater/beater.go index 2071e11f84b..0c715af5223 100644 --- a/internal/beater/beater.go +++ b/internal/beater/beater.go @@ -723,7 +723,7 @@ func (s *Runner) newFinalBatchProcessor( func (s *Runner) newDocappenderConfig(tracer *apm.Tracer, memLimit float64) ( docappender.Config, *elasticsearch.Config, error, ) { - var esConfig struct { + esConfig := struct { *elasticsearch.Config `config:",inline"` FlushBytes string `config:"flush_bytes"` FlushInterval time.Duration `config:"flush_interval"` @@ -731,11 +731,12 @@ func (s *Runner) newDocappenderConfig(tracer *apm.Tracer, memLimit float64) ( Scaling struct { Enabled *bool `config:"enabled"` } `config:"autoscaling"` + }{ + // Default to 1mib flushes, which is the default for go-docappender. + FlushBytes: "1 mib", + FlushInterval: time.Second, + Config: elasticsearch.DefaultConfig(), } - // Default to 1mib flushes, which is the default for go-docappender. - esConfig.FlushBytes = "1 mib" - esConfig.FlushInterval = time.Second - esConfig.Config = elasticsearch.DefaultConfig() esConfig.MaxIdleConnsPerHost = 10 if err := s.elasticsearchOutputConfig.Unpack(&esConfig); err != nil { @@ -768,6 +769,10 @@ func (s *Runner) newDocappenderConfig(tracer *apm.Tracer, memLimit float64) ( Scaling: scalingCfg, Logger: zap.New(s.logger.Core(), zap.WithCaller(true)), RequireDataStream: true, + // Use the output's max_retries to configure the go-docappender's + // document level retries. + MaxDocumentRetries: esConfig.MaxRetries, + RetryOnDocumentStatus: []int{429}, // Only retry "safe" 429 responses. }, memLimit, s.logger) if cfg.MaxRequests != 0 { esConfig.MaxIdleConnsPerHost = cfg.MaxRequests diff --git a/internal/beater/beater_test.go b/internal/beater/beater_test.go index 3e911b35d8b..b035fe9c8cf 100644 --- a/internal/beater/beater_test.go +++ b/internal/beater/beater_test.go @@ -177,13 +177,15 @@ func TestRunnerNewDocappenderConfig(t *testing.T) { docCfg, esCfg, err := r.newDocappenderConfig(nil, c.memSize) require.NoError(t, err) assert.Equal(t, docappender.Config{ - Logger: zap.New(r.logger.Core(), zap.WithCaller(true)), - CompressionLevel: 5, - RequireDataStream: true, - FlushInterval: time.Second, - FlushBytes: 1024 * 1024, - MaxRequests: c.wantMaxRequests, - DocumentBufferSize: c.wantDocBufSize, + Logger: zap.New(r.logger.Core(), zap.WithCaller(true)), + CompressionLevel: 5, + RequireDataStream: true, + FlushInterval: time.Second, + FlushBytes: 1024 * 1024, + MaxRequests: c.wantMaxRequests, + DocumentBufferSize: c.wantDocBufSize, + MaxDocumentRetries: 3, + RetryOnDocumentStatus: []int{429}, }, docCfg) assert.Equal(t, &elasticsearch.Config{ Hosts: elasticsearch.Hosts{"localhost:9200"}, @@ -207,13 +209,15 @@ func TestRunnerNewDocappenderConfig(t *testing.T) { docCfg, esCfg, err := r.newDocappenderConfig(nil, c.memSize) require.NoError(t, err) assert.Equal(t, docappender.Config{ - Logger: zap.New(r.logger.Core(), zap.WithCaller(true)), - CompressionLevel: 5, - RequireDataStream: true, - FlushInterval: 2 * time.Second, - FlushBytes: 500 * 1024, - MaxRequests: 50, - DocumentBufferSize: c.wantDocBufSize, + Logger: zap.New(r.logger.Core(), zap.WithCaller(true)), + CompressionLevel: 5, + RequireDataStream: true, + FlushInterval: 2 * time.Second, + FlushBytes: 500 * 1024, + MaxRequests: 50, + DocumentBufferSize: c.wantDocBufSize, + MaxDocumentRetries: 3, + RetryOnDocumentStatus: []int{429}, }, docCfg) assert.Equal(t, &elasticsearch.Config{ Hosts: elasticsearch.Hosts{"localhost:9200"}, From 7c05d56690012cf1a29ac9be57a3607b573a8194 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Tue, 9 Jul 2024 16:03:31 +0800 Subject: [PATCH 2/2] Fix conflicts, update changelog Signed-off-by: Marc Lopez Rubio --- changelogs/8.15.asciidoc | 1 + changelogs/head.asciidoc | 4 ---- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/changelogs/8.15.asciidoc b/changelogs/8.15.asciidoc index 2c2eae9120b..f3f4e495947 100644 --- a/changelogs/8.15.asciidoc +++ b/changelogs/8.15.asciidoc @@ -35,3 +35,4 @@ https://github.com/elastic/apm-server/compare/v8.14.2\...v8.15.0[View commits] - Add require data stream to bulk index requests {pull}13398[13398] - Support self-instrumentation when in managed mode by getting tracing configs via reloader {pull}13514[13514] - Add mapping for OpenTelemetry attribute `messaging.destination.name` to derive `service.target` correctly {pull}13472[13472] +- APM Server now automatically retries document-level 429s from Elasticsearch to avoid dropping data. `output.elasticsearch.max_retries` now controls both request-level and document-level retries, and defaults to `3`. {pull}13620[13620] diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc index a13985e5e85..c7ba3087091 100644 --- a/changelogs/head.asciidoc +++ b/changelogs/head.asciidoc @@ -23,13 +23,9 @@ https://github.com/elastic/apm-server/compare/8.14\...main[View commits] [float] ==== Added -<<<<<<< HEAD - Add `elasticsearch.flushed.uncompressed.bytes` metric {pull}13155[13155] - APM Server now relies on the Elasticsearch apm-data plugin's index templates, removing the requirement to install the APM integration package {pull}12066[12066] - Upgraded bundled APM Java agent attacher CLI to version 1.50.0 {pull}13326[13326] - Enable Kibana curated UIs to work with hostmetrics from OpenTelemetry's https://pkg.go.dev/go.opentelemetry.io/collector/receiver/hostmetricsreceiver[hostmetricsreceiver] {pull}13196[13196] - Add require data stream to bulk index requests {pull}13398[13398] - Support self-instrumentation when in managed mode by getting tracing configs via reloader {pull}13514[13514] -======= -- APM Server now automatically retries document-level 429s from Elasticsearch to avoid dropping data. `output.elasticsearch.max_retries` now controls both request-level and document-level retries, and defaults to `3`. {pull}13620[13620] ->>>>>>> 28436dc03 (output: Retry document-level `429`s by default (#13620))