diff --git a/.chloggen/prometheusremotewrite-reallow-multiple-workers.yaml b/.chloggen/prometheusremotewrite-reallow-multiple-workers.yaml new file mode 100644 index 000000000000..a8faf9000abe --- /dev/null +++ b/.chloggen/prometheusremotewrite-reallow-multiple-workers.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: prometheusremotewriteexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Re allows the configuration of multiple workers + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36134] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/prometheusremotewriteexporter/README.md b/exporter/prometheusremotewriteexporter/README.md index 09a0f2feda7d..a073f79550b6 100644 --- a/exporter/prometheusremotewriteexporter/README.md +++ b/exporter/prometheusremotewriteexporter/README.md @@ -54,7 +54,7 @@ The following settings can be optionally configured: - `remote_write_queue`: fine tuning for queueing and sending of the outgoing remote writes. - `enabled`: enable the sending queue (default: `true`) - `queue_size`: number of OTLP metrics that can be queued. Ignored if `enabled` is `false` (default: `10000`) - - `num_consumers`: minimum number of workers to use to fan out the outgoing requests. (default: `5`) + - `num_consumers`: minimum number of workers to use to fan out the outgoing requests. (default: `5` or default: `1` if `EnableMultipleWorkersFeatureGate` is enabled). - `resource_to_telemetry_conversion` - `enabled` (default = false): If `enabled` is `true`, all the resource attributes will be converted to metric labels by default. - `target_info`: customize `target_info` metric @@ -66,6 +66,8 @@ The following settings can be optionally configured: - `max_batch_size_bytes` (default = `3000000` -> `~2.861 mb`): Maximum size of a batch of samples to be sent to the remote write endpoint. If the batch size is larger than this value, it will be split into multiple batches. +- `max_batch_request_parallelism` (default = `5`): Maximum parallelism allowed for a single request bigger than `max_batch_size_bytes`. + This configuration is used only when feature gate `exporter.prometheusremotewritexporter.EnableMultipleWorkers` is enabled. Example: @@ -101,12 +103,22 @@ Several helper files are leveraged to provide additional capabilities automatica - [Retry and timeout settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md), note that the exporter doesn't support `sending_queue` but provides `remote_write_queue`. ### Feature gates + +#### RetryOn429 + This exporter has feature gate: `exporter.prometheusremotewritexporter.RetryOn429`. When this feature gate is enable the prometheus remote write exporter will retry on 429 http status code with the provided retry configuration. It currently doesn't support respecting the http header `Retry-After` if provided since the retry library used doesn't support this feature. To enable it run collector with enabled feature gate `exporter.prometheusremotewritexporter.RetryOn429`. This can be done by executing it with one additional parameter - `--feature-gates=telemetry.useOtelForInternalMetrics`. +#### EnableMultipleWorkersFeatureGate + +This exporter has feature gate: `+exporter.prometheusremotewritexporter.EnableMultipleWorkers`. + +When this feature gate is enabled, `num_consumers` will be used as the worker counter for handling batches from the queue, and `max_batch_request_parallelism` will be used for parallelism on single batch bigger than `max_batch_size_bytes`. +Enabling this feature gate, with `num_consumers` higher than 1 requires the target destination to supports ingestion of OutOfOrder samples. See [Multiple Consumers and OutOfOrder](#multiple-consumers-and-outoforder) for more info + ## Metric names and labels normalization OpenTelemetry metric names and attributes are normalized to be compliant with Prometheus naming rules. [Details on this normalization process are described in the Prometheus translator module](../../pkg/translator/prometheus/). @@ -149,3 +161,20 @@ sum by (namespace) (app_ads_ad_requests_total) [beta]:https://github.com/open-telemetry/opentelemetry-collector#beta [contrib]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib [core]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol + +## Multiple Consumers and OutOfOrder + +**DISCLAIMER**: This snippet applies only to Prometheus, other remote write destinations using Prometheus Protocol (ex: Thanos/Grafana Mimir/VictoriaMetrics) may have different settings. + +By default, Prometheus expects samples to be ingested sequentially, in temporal order. + +When multiple consumers are enabled, the temporal ordering of the samples written to the target destination is not deterministic, and temporal ordering can no longer be guaranteed. For example, one worker may push a sample for `t+30s`, and a second worker may push an additional sample but for `t+15s`. + +Vanilla Prometheus configurations will reject these unordered samples and you'll receive "out of order" errors. + +Out-of-order support in Prometheus must be enabled for multiple consumers. +This can be done by using the `tsdb.out_of_order_time_window: 10m` settings. Please choose an appropriate time window to support pushing the worst-case scenarios of a "queue" build-up on the sender side. + +See for more info: +- https://prometheus.io/docs/prometheus/latest/configuration/configuration/#tsdb +- https://prometheus.io/docs/prometheus/latest/feature_flags/#remote-write-receiver diff --git a/exporter/prometheusremotewriteexporter/config.go b/exporter/prometheusremotewriteexporter/config.go index 23b8c5f1e4e3..fc093b43ce62 100644 --- a/exporter/prometheusremotewriteexporter/config.go +++ b/exporter/prometheusremotewriteexporter/config.go @@ -35,6 +35,9 @@ type Config struct { // maximum size in bytes of time series batch sent to remote storage MaxBatchSizeBytes int `mapstructure:"max_batch_size_bytes"` + // maximum amount of parallel requests to do when handling large batch request + MaxBatchRequestParallelism *int `mapstructure:"max_batch_request_parallelism"` + // ResourceToTelemetrySettings is the option for converting resource attributes to telemetry attributes. // "Enabled" - A boolean field to enable/disable this option. Default is `false`. // If enabled, all the resource attributes will be converted to metric labels by default. @@ -87,6 +90,13 @@ var _ component.Config = (*Config)(nil) // Validate checks if the exporter configuration is valid func (cfg *Config) Validate() error { + if cfg.MaxBatchRequestParallelism != nil && *cfg.MaxBatchRequestParallelism < 1 { + return fmt.Errorf("max_batch_request_parallelism can't be set to below 1") + } + if enableMultipleWorkersFeatureGate.IsEnabled() && cfg.MaxBatchRequestParallelism == nil { + return fmt.Errorf("enabling featuregate `+exporter.prometheusremotewritexporter.EnableMultipleWorkers` requires setting `max_batch_request_parallelism` in the configuration") + } + if cfg.RemoteWriteQueue.QueueSize < 0 { return fmt.Errorf("remote write queue size can't be negative") } diff --git a/exporter/prometheusremotewriteexporter/config_test.go b/exporter/prometheusremotewriteexporter/config_test.go index 1f37f7972a5e..35979a7e4c00 100644 --- a/exporter/prometheusremotewriteexporter/config_test.go +++ b/exporter/prometheusremotewriteexporter/config_test.go @@ -56,8 +56,9 @@ func TestLoadConfig(t *testing.T) { { id: component.NewIDWithName(metadata.Type, "2"), expected: &Config{ - MaxBatchSizeBytes: 3000000, - TimeoutSettings: exporterhelper.NewDefaultTimeoutConfig(), + MaxBatchSizeBytes: 3000000, + MaxBatchRequestParallelism: toPtr(10), + TimeoutSettings: exporterhelper.NewDefaultTimeoutConfig(), BackOffConfig: configretry.BackOffConfig{ Enabled: true, InitialInterval: 10 * time.Second, @@ -90,6 +91,10 @@ func TestLoadConfig(t *testing.T) { id: component.NewIDWithName(metadata.Type, "negative_num_consumers"), errorMessage: "remote write consumer number can't be negative", }, + { + id: component.NewIDWithName(metadata.Type, "less_than_1_max_batch_request_parallelism"), + errorMessage: "max_batch_request_parallelism can't be set to below 1", + }, } for _, tt := range tests { @@ -136,3 +141,8 @@ func TestDisabledTargetInfo(t *testing.T) { assert.False(t, cfg.(*Config).TargetInfo.Enabled) } + +func toPtr[T any](val T) *T { + ret := val + return &ret +} diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index 74e1cff42b79..b7014ec232d8 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -120,13 +120,18 @@ func newPRWExporter(cfg *Config, set exporter.Settings) (*prwExporter, error) { userAgentHeader := fmt.Sprintf("%s/%s", strings.ReplaceAll(strings.ToLower(set.BuildInfo.Description), " ", "-"), set.BuildInfo.Version) + concurrency := cfg.RemoteWriteQueue.NumConsumers + if enableMultipleWorkersFeatureGate.IsEnabled() || cfg.MaxBatchRequestParallelism != nil { + concurrency = *cfg.MaxBatchRequestParallelism + } + prwe := &prwExporter{ endpointURL: endpointURL, wg: new(sync.WaitGroup), closeChan: make(chan struct{}), userAgentHeader: userAgentHeader, maxBatchSizeBytes: cfg.MaxBatchSizeBytes, - concurrency: cfg.RemoteWriteQueue.NumConsumers, + concurrency: concurrency, clientSettings: &cfg.ClientConfig, settings: set.TelemetrySettings, retrySettings: cfg.BackOffConfig, diff --git a/exporter/prometheusremotewriteexporter/factory.go b/exporter/prometheusremotewriteexporter/factory.go index 46c52d233d91..cd11e796f46b 100644 --- a/exporter/prometheusremotewriteexporter/factory.go +++ b/exporter/prometheusremotewriteexporter/factory.go @@ -26,6 +26,13 @@ var retryOn429FeatureGate = featuregate.GlobalRegistry().MustRegister( featuregate.WithRegisterDescription("When enabled, the Prometheus remote write exporter will retry 429 http status code. Requires exporter.prometheusremotewritexporter.metrics.RetryOn429 to be enabled."), ) +var enableMultipleWorkersFeatureGate = featuregate.GlobalRegistry().MustRegister( + "exporter.prometheusremotewritexporter.EnableMultipleWorkers", + featuregate.StageAlpha, + featuregate.WithRegisterDescription("When enabled and settings configured, the Prometheus remote exporter will"+ + " spawn multiple workers/goroutines to handle incoming metrics batches concurrently"), +) + // NewFactory creates a new Prometheus Remote Write exporter. func NewFactory() exporter.Factory { return exporter.NewFactory( @@ -42,17 +49,19 @@ func createMetricsExporter(ctx context.Context, set exporter.Settings, return nil, errors.New("invalid configuration") } + if !enableMultipleWorkersFeatureGate.IsEnabled() && prwCfg.RemoteWriteQueue.NumConsumers != 5 { + set.Logger.Warn("`remote_write_queue.num_consumers` is deprecated for controlling the single request parallelism. Please configure `max_batch_request_parallelism: X` for the same behavior.") + } + prwe, err := newPRWExporter(prwCfg, set) if err != nil { return nil, err } - // Don't allow users to configure the queue. - // See https://github.com/open-telemetry/opentelemetry-collector/issues/2949. - // Prometheus remote write samples needs to be in chronological - // order for each timeseries. If we shard the incoming metrics - // without considering this limitation, we experience - // "out of order samples" errors. + numConsumers := 1 + if enableMultipleWorkersFeatureGate.IsEnabled() { + numConsumers = prwCfg.RemoteWriteQueue.NumConsumers + } exporter, err := exporterhelper.NewMetrics( ctx, set, @@ -61,7 +70,7 @@ func createMetricsExporter(ctx context.Context, set exporter.Settings, exporterhelper.WithTimeout(prwCfg.TimeoutSettings), exporterhelper.WithQueue(exporterhelper.QueueConfig{ Enabled: prwCfg.RemoteWriteQueue.Enabled, - NumConsumers: 1, + NumConsumers: numConsumers, QueueSize: prwCfg.RemoteWriteQueue.QueueSize, }), exporterhelper.WithStart(prwe.Start), @@ -83,10 +92,16 @@ func createDefaultConfig() component.Config { clientConfig.WriteBufferSize = 512 * 1024 clientConfig.Timeout = exporterhelper.NewDefaultTimeoutConfig().Timeout + numConsumers := 5 + if enableMultipleWorkersFeatureGate.IsEnabled() { + numConsumers = 1 + } return &Config{ Namespace: "", ExternalLabels: map[string]string{}, MaxBatchSizeBytes: 3000000, + // To set this as default once `exporter.prometheusremotewritexporter.EnableMultipleWorkers` is removed + //MaxBatchRequestParallelism: 5, TimeoutSettings: exporterhelper.NewDefaultTimeoutConfig(), BackOffConfig: retrySettings, AddMetricSuffixes: true, @@ -96,7 +111,7 @@ func createDefaultConfig() component.Config { RemoteWriteQueue: RemoteWriteQueue{ Enabled: true, QueueSize: 10000, - NumConsumers: 5, + NumConsumers: numConsumers, }, TargetInfo: &TargetInfo{ Enabled: true, diff --git a/exporter/prometheusremotewriteexporter/testdata/config.yaml b/exporter/prometheusremotewriteexporter/testdata/config.yaml index 7ea0bdf3e3b1..8c80bf21409d 100644 --- a/exporter/prometheusremotewriteexporter/testdata/config.yaml +++ b/exporter/prometheusremotewriteexporter/testdata/config.yaml @@ -2,6 +2,7 @@ prometheusremotewrite: prometheusremotewrite/2: namespace: "test-space" + max_batch_request_parallelism: 10 retry_on_failure: enabled: true initial_interval: 10s @@ -38,6 +39,10 @@ prometheusremotewrite/negative_num_consumers: queue_size: 5 num_consumers: -1 +prometheusremotewrite/less_than_1_max_batch_request_parallelism: + endpoint: "localhost:8888" + max_batch_request_parallelism: 0 + prometheusremotewrite/disabled_target_info: endpoint: "localhost:8888" target_info: