Skip to content

Commit

Permalink
[exporter/elasticsearch] Add retry.retry_on_status config (open-telem…
Browse files Browse the repository at this point in the history
…etry#32585)

Previously, the status codes that trigger retries were hardcoded to be
429, 500, 502, 503, 504.
It is now configurable using `retry.retry_on_status`, and defaults to
`[429, 500, 502, 503, 504]` to avoid a breaking change.
To avoid duplicates, it is recommended to configure
`retry.retry_on_status` to `[429]`, which would be the default in a
future version.

Part of open-telemetry#32584

---------

Co-authored-by: Vishal Raj <[email protected]>
Co-authored-by: Alex Boten <[email protected]>
  • Loading branch information
3 people authored May 1, 2024
1 parent e6fd2f9 commit 7a19fb3
Show file tree
Hide file tree
Showing 12 changed files with 73 additions and 13 deletions.
30 changes: 30 additions & 0 deletions .chloggen/elasticsearchexporter_retry-on-status.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add retry.retry_on_status config

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32584]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Previously, the status codes that trigger retries were hardcoded to be 429, 500, 502, 503, 504.
It is now configurable using `retry.retry_on_status`, and defaults to `[429, 500, 502, 503, 504]` to avoid a breaking change.
To avoid duplicates, it is recommended to configure `retry.retry_on_status` to `[429]`, which would be the default in a future version.
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ This exporter supports sending OpenTelemetry logs and traces to [Elasticsearch](
- `max_requests` (default=3): Number of HTTP request retries.
- `initial_interval` (default=100ms): Initial waiting time if a HTTP request failed.
- `max_interval` (default=1m): Max waiting time if a HTTP request failed.
- `retry_on_status` (default=[429, 500, 502, 503, 504]): Status codes that trigger request or document level retries. Request level retry and document level retry status codes are shared and cannot be configured separately. To avoid duplicates, it is recommended to set it to `[429]`. WARNING: The default will be changed to `[429]` in the future.
- `mapping`: Events are encoded to JSON. The `mapping` allows users to
configure additional mapping rules.
- `mode` (default=none): The fields naming mode. valid modes are:
Expand Down
3 changes: 3 additions & 0 deletions exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ type RetrySettings struct {

// MaxInterval configures the max waiting time if consecutive requests failed.
MaxInterval time.Duration `mapstructure:"max_interval"`

// RetryOnStatus configures the status codes that trigger request or document level retries.
RetryOnStatus []int `mapstructure:"retry_on_status"`
}

type MappingsSettings struct {
Expand Down
10 changes: 10 additions & 0 deletions exporter/elasticsearchexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package elasticsearchexporter

import (
"net/http"
"path/filepath"
"testing"
"time"
Expand Down Expand Up @@ -61,6 +62,13 @@ func TestLoad_DeprecatedIndexConfigOption(t *testing.T) {
MaxRequests: 5,
InitialInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Minute,
RetryOnStatus: []int{
http.StatusTooManyRequests,
http.StatusInternalServerError,
http.StatusBadGateway,
http.StatusServiceUnavailable,
http.StatusGatewayTimeout,
},
},
Mapping: MappingsSettings{
Mode: "none",
Expand Down Expand Up @@ -136,6 +144,7 @@ func TestLoadConfig(t *testing.T) {
MaxRequests: 5,
InitialInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Minute,
RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError},
},
Mapping: MappingsSettings{
Mode: "none",
Expand Down Expand Up @@ -186,6 +195,7 @@ func TestLoadConfig(t *testing.T) {
MaxRequests: 5,
InitialInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Minute,
RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError},
},
Mapping: MappingsSettings{
Mode: "none",
Expand Down
8 changes: 4 additions & 4 deletions exporter/elasticsearchexporter/elasticsearch_bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func newElasticsearchClient(logger *zap.Logger, config *Config) (*esClientCurren
Header: headers,

// configure retry behavior
RetryOnStatus: retryOnStatus,
RetryOnStatus: config.Retry.RetryOnStatus,
DisableRetry: retryDisabled,
EnableRetryOnTimeout: config.Retry.Enabled,
//RetryOnError: retryOnError, // should be used from esclient version 8 onwards
Expand Down Expand Up @@ -175,7 +175,7 @@ func createElasticsearchBackoffFunc(config *RetrySettings) func(int) time.Durati
}
}

func shouldRetryEvent(status int) bool {
func shouldRetryEvent(status int, retryOnStatus []int) bool {
for _, retryable := range retryOnStatus {
if status == retryable {
return true
Expand All @@ -184,15 +184,15 @@ func shouldRetryEvent(status int) bool {
return false
}

func pushDocuments(ctx context.Context, logger *zap.Logger, index string, document []byte, bulkIndexer esBulkIndexerCurrent, maxAttempts int) error {
func pushDocuments(ctx context.Context, logger *zap.Logger, index string, document []byte, bulkIndexer esBulkIndexerCurrent, maxAttempts int, retryOnStatus []int) error {
attempts := 1
body := bytes.NewReader(document)
item := esBulkIndexerItem{Action: createAction, Index: index, Body: body}
// Setup error handler. The handler handles the per item response status based on the
// selective ACKing in the bulk response.
item.OnFailure = func(ctx context.Context, item esBulkIndexerItem, resp esBulkIndexerResponseItem, err error) {
switch {
case attempts < maxAttempts && shouldRetryEvent(resp.Status):
case attempts < maxAttempts && shouldRetryEvent(resp.Status, retryOnStatus):
logger.Debug("Retrying to index",
zap.String("name", index),
zap.Int("attempt", attempts),
Expand Down
8 changes: 8 additions & 0 deletions exporter/elasticsearchexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry
import (
"context"
"fmt"
"net/http"
"runtime"
"time"

Expand Down Expand Up @@ -51,6 +52,13 @@ func createDefaultConfig() component.Config {
MaxRequests: 3,
InitialInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Minute,
RetryOnStatus: []int{
http.StatusTooManyRequests,
http.StatusInternalServerError,
http.StatusBadGateway,
http.StatusServiceUnavailable,
http.StatusGatewayTimeout,
},
},
Mapping: MappingsSettings{
Mode: "none",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func newMockESReceiver(params receiver.CreateSettings, cfg *config, next consume

if err := next.ConsumeLogs(context.Background(), logs); err != nil {
response.HasErrors = true
item.Status = http.StatusInternalServerError
item.Status = http.StatusTooManyRequests
item.Error.Type = "simulated_es_error"
item.Error.Reason = err.Error()
}
Expand Down
6 changes: 3 additions & 3 deletions exporter/elasticsearchexporter/logs_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,13 @@ type elasticsearchLogsExporter struct {
logstashFormat LogstashFormatSettings
dynamicIndex bool
maxAttempts int
retryOnStatus []int

client *esClientCurrent
bulkIndexer esBulkIndexerCurrent
model mappingModel
}

var retryOnStatus = []int{500, 502, 503, 504, 429}

const createAction = "create"

func newLogsExporter(logger *zap.Logger, cfg *Config) (*elasticsearchLogsExporter, error) {
Expand Down Expand Up @@ -71,6 +70,7 @@ func newLogsExporter(logger *zap.Logger, cfg *Config) (*elasticsearchLogsExporte
index: indexStr,
dynamicIndex: cfg.LogsDynamicIndex.Enabled,
maxAttempts: maxAttempts,
retryOnStatus: cfg.Retry.RetryOnStatus,
model: model,
logstashFormat: cfg.LogstashFormat,
}
Expand Down Expand Up @@ -129,5 +129,5 @@ func (e *elasticsearchLogsExporter) pushLogRecord(ctx context.Context, resource
if err != nil {
return fmt.Errorf("Failed to encode log event: %w", err)
}
return pushDocuments(ctx, e.logger, fIndex, document, e.bulkIndexer, e.maxAttempts)
return pushDocuments(ctx, e.logger, fIndex, document, e.bulkIndexer, e.maxAttempts, e.retryOnStatus)
}
4 changes: 2 additions & 2 deletions exporter/elasticsearchexporter/logs_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func TestExporter_PushEvent(t *testing.T) {
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
if failures == 0 {
failures++
return nil, &httpTestError{message: "oops"}
return nil, &httpTestError{status: http.StatusTooManyRequests, message: "oops"}
}

rec.Record(docs)
Expand Down Expand Up @@ -510,7 +510,7 @@ func withTestExporterConfig(fns ...func(*Config)) func(string) *Config {
}

func mustSend(t *testing.T, exporter *elasticsearchLogsExporter, contents string) {
err := pushDocuments(context.TODO(), zap.L(), exporter.index, []byte(contents), exporter.bulkIndexer, exporter.maxAttempts)
err := pushDocuments(context.TODO(), zap.L(), exporter.index, []byte(contents), exporter.bulkIndexer, exporter.maxAttempts, exporter.retryOnStatus)
require.NoError(t, err)
}

Expand Down
6 changes: 6 additions & 0 deletions exporter/elasticsearchexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ elasticsearch/trace:
bytes: 10485760
retry:
max_requests: 5
retry_on_status:
- 429
- 500
elasticsearch/log:
tls:
insecure: false
Expand All @@ -38,6 +41,9 @@ elasticsearch/log:
bytes: 10485760
retry:
max_requests: 5
retry_on_status:
- 429
- 500
sending_queue:
enabled: true
elasticsearch/logstash_format:
Expand Down
4 changes: 3 additions & 1 deletion exporter/elasticsearchexporter/trace_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type elasticsearchTracesExporter struct {
logstashFormat LogstashFormatSettings
dynamicIndex bool
maxAttempts int
retryOnStatus []int

client *esClientCurrent
bulkIndexer esBulkIndexerCurrent
Expand Down Expand Up @@ -63,6 +64,7 @@ func newTracesExporter(logger *zap.Logger, cfg *Config) (*elasticsearchTracesExp
index: cfg.TracesIndex,
dynamicIndex: cfg.TracesDynamicIndex.Enabled,
maxAttempts: maxAttempts,
retryOnStatus: cfg.Retry.RetryOnStatus,
model: model,
logstashFormat: cfg.LogstashFormat,
}, nil
Expand Down Expand Up @@ -122,5 +124,5 @@ func (e *elasticsearchTracesExporter) pushTraceRecord(ctx context.Context, resou
if err != nil {
return fmt.Errorf("Failed to encode trace record: %w", err)
}
return pushDocuments(ctx, e.logger, fIndex, document, e.bulkIndexer, e.maxAttempts)
return pushDocuments(ctx, e.logger, fIndex, document, e.bulkIndexer, e.maxAttempts, e.retryOnStatus)
}
4 changes: 2 additions & 2 deletions exporter/elasticsearchexporter/traces_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func TestExporter_PushTraceRecord(t *testing.T) {
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
if failures == 0 {
failures++
return nil, &httpTestError{message: "oops"}
return nil, &httpTestError{status: http.StatusTooManyRequests, message: "oops"}
}

rec.Record(docs)
Expand Down Expand Up @@ -463,7 +463,7 @@ func withTestTracesExporterConfig(fns ...func(*Config)) func(string) *Config {
}

func mustSendTraces(t *testing.T, exporter *elasticsearchTracesExporter, contents string) {
err := pushDocuments(context.TODO(), zap.L(), exporter.index, []byte(contents), exporter.bulkIndexer, exporter.maxAttempts)
err := pushDocuments(context.TODO(), zap.L(), exporter.index, []byte(contents), exporter.bulkIndexer, exporter.maxAttempts, exporter.retryOnStatus)
require.NoError(t, err)
}

Expand Down

0 comments on commit 7a19fb3

Please sign in to comment.