Skip to content

Commit

Permalink
[exporter/elasticsearch] Limit bulk request size to avoid 413 Entity …
Browse files Browse the repository at this point in the history
…Too Large (open-telemetry#36396)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

Limit the bulk request size to roughly `flush::bytes` for sync bulk
indexer.
Sync bulk indexer is used when `batcher::enabled` is either true or
false. In order words, sync bulk indexer is not used when batcher config
is undefined.
  Change `flush::bytes` to always measure in uncompressed bytes.
Change default `batcher::max_size_items` to `0` as bulk request size
limit is now more effectively enforced by `flush::bytes`.

<!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. -->
#### Link to tracking issue
Fixes open-telemetry#36163

<!--Describe what testing was performed and which tests were added.-->
#### Testing

Modified BenchmarkExporter to run with `{name: "xxlarge_batch",
batchSize: 1000000},` and removed `batcher::max_size_items` and added a
log line for compressed and uncompressed buffer size to reproduce the
error.
```
logger.go:146: 2024-11-19T17:16:40.060Z	ERROR	Flush	{"s.bi.Len": 10382932, "s.bi.UncompressedLen": 532777786}
    logger.go:146: 2024-11-19T17:16:40.312Z	ERROR	bulk indexer flush error	{"error": "flush failed (413): [413 Request Entity Too Large] "}
```

With this PR, every flush logs and there is no error.
```
   logger.go:146: 2024-11-19T17:23:52.574Z	ERROR	Flush	{"s.bi.Len": 99148, "s.bi.UncompressedLen": 5000007}
```

<!--Describe the documentation added.-->
#### Documentation

<!--Please delete paragraphs that you did not use before submitting.-->

---------

Co-authored-by: Christos Markou <[email protected]>
  • Loading branch information
2 people authored and AkhigbeEromo committed Jan 13, 2025
1 parent 922a394 commit 85763a1
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 31 deletions.
31 changes: 31 additions & 0 deletions .chloggen/elasticsearchexporter_sync-bulk-indexer-flush-bytes.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Respect `flush::bytes` in sync bulk indexer, `flush::bytes` measures uncompressed size, change default `batcher::max_size_items` to `0`

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36163]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
Limit the bulk request size to roughly `flush::bytes` for sync bulk indexer.
Sync bulk indexer is used when `batcher::enabled` is either true or false. In order words, sync bulk indexer is not used when batcher config is undefined.
Change `flush::bytes` to always measure in uncompressed bytes.
Change default `batcher::max_size_items` to `0` as bulk request size limit is now more effectively enforced by `flush::bytes`.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
17 changes: 9 additions & 8 deletions exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,16 @@ The Elasticsearch exporter supports the common [`sending_queue` settings][export
The Elasticsearch exporter supports the [common `batcher` settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterbatcher/config.go).

- `batcher`:
- `enabled` (default=unset): Enable batching of requests into a single bulk request.
- `min_size_items` (default=5000): Minimum number of log records / spans in the buffer to trigger a flush immediately.
- `max_size_items` (default=10000): Maximum number of log records / spans in a request.
- `flush_timeout` (default=30s): Maximum time of the oldest item spent inside the buffer, aka "max age of buffer". A flush will happen regardless of the size of content in buffer.
- `enabled` (default=unset): Enable batching of requests into 1 or more bulk requests. On a batcher flush, it is possible for a batched request to be translated to more than 1 bulk request due to `flush::bytes`.
- `min_size_items` (default=5000): Minimum number of log records / spans / data points in the batched request to immediately trigger a batcher flush.
- `max_size_items` (default=0): Maximum number of log records / spans / data points in a batched request. To limit bulk request size, configure `flush::bytes` instead. :warning: It is recommended to keep `max_size_items` as 0 as a non-zero value may lead to broken metrics grouping and indexing rejections.
- `flush_timeout` (default=30s): Maximum time of the oldest item spent inside the batcher buffer, aka "max age of batcher buffer". A batcher flush will happen regardless of the size of content in batcher buffer.

By default, the exporter will perform its own buffering and batching, as configured through the
`flush` config, and `batcher` will be unused. By setting `batcher::enabled` to either `true` or
`false`, the exporter will not perform any of its own buffering or batching, and the `flush` config
will be ignored. In a future release when the `batcher` config is stable, and has feature parity
`false`, the exporter will not perform any of its own buffering or batching, and the `flush::interval` config
will be ignored.
In a future release when the `batcher` config is stable, and has feature parity
with the exporter's existing `flush` config, it will be enabled by default.

Using the common `batcher` functionality provides several benefits over the default behavior:
Expand Down Expand Up @@ -198,7 +199,7 @@ The behaviour of this bulk indexing can be configured with the following setting

- `num_workers` (default=runtime.NumCPU()): Number of workers publishing bulk requests concurrently.
- `flush`: Event bulk indexer buffer flush settings
- `bytes` (default=5000000): Write buffer flush size limit.
- `bytes` (default=5000000): Write buffer flush size limit before compression. A bulk request will be sent immediately when its buffer exceeds this limit. This value should be much lower than [Elasticsearch's `http.max_content_length`](https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html#http-settings) config to avoid HTTP 413 Entity Too Large error. It is recommended to keep this value under 5MB.
- `interval` (default=30s): Write buffer flush time limit.
- `retry`: Elasticsearch bulk request retry settings
- `enabled` (default=true): Enable/Disable request retry on error. Failed requests are retried with exponential backoff.
Expand All @@ -209,7 +210,7 @@ The behaviour of this bulk indexing can be configured with the following setting
- `retry_on_status` (default=[429]): Status codes that trigger request or document level retries. Request level retry and document level retry status codes are shared and cannot be configured separately. To avoid duplicates, it defaults to `[429]`.

> [!NOTE]
> The `flush` config will be ignored when `batcher::enabled` config is explicitly set to `true` or `false`.
> The `flush::interval` config will be ignored when `batcher::enabled` config is explicitly set to `true` or `false`.

### Elasticsearch node discovery

Expand Down
34 changes: 18 additions & 16 deletions exporter/elasticsearchexporter/bulkindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func newSyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config
return &syncBulkIndexer{
config: bulkIndexerConfig(client, config),
flushTimeout: config.Timeout,
flushBytes: config.Flush.Bytes,
retryConfig: config.Retry,
logger: logger,
}
Expand All @@ -96,6 +97,7 @@ func newSyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config
type syncBulkIndexer struct {
config docappender.BulkIndexerConfig
flushTimeout time.Duration
flushBytes int
retryConfig RetrySettings
logger *zap.Logger
}
Expand Down Expand Up @@ -124,8 +126,17 @@ type syncBulkIndexerSession struct {
}

// Add adds an item to the sync bulk indexer session.
func (s *syncBulkIndexerSession) Add(_ context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error {
return s.bi.Add(docappender.BulkIndexerItem{Index: index, Body: document, DynamicTemplates: dynamicTemplates})
func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error {
err := s.bi.Add(docappender.BulkIndexerItem{Index: index, Body: document, DynamicTemplates: dynamicTemplates})
if err != nil {
return err
}
// flush bytes should operate on uncompressed length
// as Elasticsearch http.max_content_length measures uncompressed length.
if s.bi.UncompressedLen() >= s.s.flushBytes {
return s.Flush(ctx)
}
return nil
}

// End is a no-op.
Expand Down Expand Up @@ -170,16 +181,6 @@ func newAsyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, confi
numWorkers = runtime.NumCPU()
}

flushInterval := config.Flush.Interval
if flushInterval == 0 {
flushInterval = 30 * time.Second
}

flushBytes := config.Flush.Bytes
if flushBytes == 0 {
flushBytes = 5e+6
}

pool := &asyncBulkIndexer{
wg: sync.WaitGroup{},
items: make(chan docappender.BulkIndexerItem, config.NumWorkers),
Expand All @@ -195,9 +196,9 @@ func newAsyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, confi
w := asyncBulkIndexerWorker{
indexer: bi,
items: pool.items,
flushInterval: flushInterval,
flushInterval: config.Flush.Interval,
flushTimeout: config.Timeout,
flushBytes: flushBytes,
flushBytes: config.Flush.Bytes,
logger: logger,
stats: &pool.stats,
}
Expand Down Expand Up @@ -298,8 +299,9 @@ func (w *asyncBulkIndexerWorker) run() {
w.logger.Error("error adding item to bulk indexer", zap.Error(err))
}

// w.indexer.Len() can be either compressed or uncompressed bytes
if w.indexer.Len() >= w.flushBytes {
// flush bytes should operate on uncompressed length
// as Elasticsearch http.max_content_length measures uncompressed length.
if w.indexer.UncompressedLen() >= w.flushBytes {
w.flush()
flushTick.Reset(w.flushInterval)
}
Expand Down
30 changes: 30 additions & 0 deletions exporter/elasticsearchexporter/bulkindexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"net/http"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -122,6 +123,7 @@ func TestAsyncBulkIndexer_requireDataStream(t *testing.T) {
config: Config{
NumWorkers: 1,
Mapping: MappingsSettings{Mode: MappingECS.String()},
Flush: FlushSettings{Interval: time.Hour, Bytes: 1e+8},
},
wantRequireDataStream: false,
},
Expand All @@ -130,6 +132,7 @@ func TestAsyncBulkIndexer_requireDataStream(t *testing.T) {
config: Config{
NumWorkers: 1,
Mapping: MappingsSettings{Mode: MappingOTel.String()},
Flush: FlushSettings{Interval: time.Hour, Bytes: 1e+8},
},
wantRequireDataStream: true,
},
Expand Down Expand Up @@ -252,13 +255,15 @@ func TestAsyncBulkIndexer_logRoundTrip(t *testing.T) {
config: Config{
NumWorkers: 1,
ClientConfig: confighttp.ClientConfig{Compression: "none"},
Flush: FlushSettings{Interval: time.Hour, Bytes: 1e+8},
},
},
{
name: "compression gzip",
config: Config{
NumWorkers: 1,
ClientConfig: confighttp.ClientConfig{Compression: "gzip"},
Flush: FlushSettings{Interval: time.Hour, Bytes: 1e+8},
},
},
}
Expand Down Expand Up @@ -316,3 +321,28 @@ func runBulkIndexerOnce(t *testing.T, config *Config, client *elasticsearch.Clie

return bulkIndexer
}

func TestSyncBulkIndexer_flushBytes(t *testing.T) {
var reqCnt atomic.Int64
cfg := Config{NumWorkers: 1, Flush: FlushSettings{Interval: time.Hour, Bytes: 1}}
client, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{
RoundTripFunc: func(r *http.Request) (*http.Response, error) {
if r.URL.Path == "/_bulk" {
reqCnt.Add(1)
}
return &http.Response{
Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
Body: io.NopCloser(strings.NewReader(successResp)),
}, nil
},
}})
require.NoError(t, err)

bi := newSyncBulkIndexer(zap.NewNop(), client, &cfg)
session, err := bi.StartSession(context.Background())
require.NoError(t, err)

assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
assert.Equal(t, int64(1), reqCnt.Load()) // flush due to flush::bytes
assert.NoError(t, bi.Close(context.Background()))
}
15 changes: 9 additions & 6 deletions exporter/elasticsearchexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func TestConfig(t *testing.T) {
OnStart: true,
},
Flush: FlushSettings{
Bytes: 10485760,
Bytes: 10485760,
Interval: 30 * time.Second,
},
Retry: RetrySettings{
Enabled: true,
Expand All @@ -117,7 +118,7 @@ func TestConfig(t *testing.T) {
MinSizeItems: 5000,
},
MaxSizeConfig: exporterbatcher.MaxSizeConfig{
MaxSizeItems: 10000,
MaxSizeItems: 0,
},
},
},
Expand Down Expand Up @@ -164,7 +165,8 @@ func TestConfig(t *testing.T) {
OnStart: true,
},
Flush: FlushSettings{
Bytes: 10485760,
Bytes: 10485760,
Interval: 30 * time.Second,
},
Retry: RetrySettings{
Enabled: true,
Expand All @@ -188,7 +190,7 @@ func TestConfig(t *testing.T) {
MinSizeItems: 5000,
},
MaxSizeConfig: exporterbatcher.MaxSizeConfig{
MaxSizeItems: 10000,
MaxSizeItems: 0,
},
},
},
Expand Down Expand Up @@ -235,7 +237,8 @@ func TestConfig(t *testing.T) {
OnStart: true,
},
Flush: FlushSettings{
Bytes: 10485760,
Bytes: 10485760,
Interval: 30 * time.Second,
},
Retry: RetrySettings{
Enabled: true,
Expand All @@ -259,7 +262,7 @@ func TestConfig(t *testing.T) {
MinSizeItems: 5000,
},
MaxSizeConfig: exporterbatcher.MaxSizeConfig{
MaxSizeItems: 10000,
MaxSizeItems: 0,
},
},
},
Expand Down
6 changes: 5 additions & 1 deletion exporter/elasticsearchexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,13 @@ func createDefaultConfig() component.Config {
MinSizeItems: 5000,
},
MaxSizeConfig: exporterbatcher.MaxSizeConfig{
MaxSizeItems: 10000,
MaxSizeItems: 0,
},
},
Flush: FlushSettings{
Bytes: 5e+6,
Interval: 30 * time.Second,
},
}
}

Expand Down

0 comments on commit 85763a1

Please sign in to comment.