[exporter/prometheusremotewrite] feat: prom rw exporter add support for rw2 #35888

Open
wants to merge 96 commits into
base: main
Changes from 1 commit
Commits
8a0b38e
feat: prom rw exporter add support for rw2
jmichalek132 Oct 20, 2024
3d54298
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Oct 21, 2024
4c4ef36
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Oct 25, 2024
d5521bf
chore: try to reduce code duplication
jmichalek132 Oct 25, 2024
146bf31
chore: try to reduce code duplication 2
jmichalek132 Oct 25, 2024
07185a8
chore: removed batching for now to make PR smaller
jmichalek132 Oct 25, 2024
0cdd754
chore: addressed comments from PR
jmichalek132 Oct 25, 2024
b8052ce
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Oct 25, 2024
1b5d693
chore: implement enum for rw2 instead of bool
jmichalek132 Oct 25, 2024
c5518b0
chore: add feature flag for enabling support for rw2
jmichalek132 Oct 25, 2024
446e58a
chore: fix issues pointed out by linter
jmichalek132 Oct 25, 2024
a15354d
Update exporter/prometheusremotewriteexporter/config.go
jmichalek132 Oct 26, 2024
da15f91
chore: addressed feedback from review
jmichalek132 Oct 28, 2024
81805f3
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Oct 28, 2024
e4ddae3
chore: addressed feedback from review
jmichalek132 Oct 28, 2024
4340649
chore: addressed feedback from review
jmichalek132 Oct 28, 2024
56c09c1
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Oct 30, 2024
a44abba
refactor PushMetrics based on suggestion from review
jmichalek132 Oct 30, 2024
6e2a0d8
chore: undo unneeded changes
jmichalek132 Oct 30, 2024
8bd8bc7
chore: updated unit tests & added an rw2 case
jmichalek132 Oct 30, 2024
6b3bea6
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Oct 30, 2024
c8f832d
chore: ran make gci
jmichalek132 Oct 30, 2024
658ff26
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Oct 30, 2024
cf476d6
chore: move debug line to correct place
jmichalek132 Oct 31, 2024
eae3908
chore: fix typo in feature flag description
jmichalek132 Oct 31, 2024
7e18afb
chore: move debug line to correct place
jmichalek132 Oct 31, 2024
aca5734
chore: small refactor based on review
jmichalek132 Oct 31, 2024
3abd46e
chore: update failing unit test
jmichalek132 Oct 31, 2024
9719de0
chore: updated readme first iteration
jmichalek132 Oct 31, 2024
7f2b253
chore: added changelog draft
jmichalek132 Oct 31, 2024
1770b82
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Oct 31, 2024
047b4ba
chore: ran make generate
jmichalek132 Oct 31, 2024
48a3544
chore: moved v2 function to v2 file
jmichalek132 Oct 31, 2024
bc6c1dc
chore: delete empty file
jmichalek132 Oct 31, 2024
4f04e82
chore: import proto message from prometheus and validate it from config
jmichalek132 Oct 31, 2024
6be771d
chore: ran make generate
jmichalek132 Oct 31, 2024
cc1599c
chore: added check if flag enabled where missing
jmichalek132 Oct 31, 2024
a5bab73
chore: fix broken tests
jmichalek132 Oct 31, 2024
7917ee0
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 11, 2024
018fde1
chore: fixed go.sum after resolving conflicts
jmichalek132 Nov 11, 2024
7fd2376
Update .chloggen/jm-prom-rw-exporter-add-support-for-rw2.yaml
jmichalek132 Nov 11, 2024
a5cea35
chore: updated changelog entry based on feedback
jmichalek132 Nov 11, 2024
0e8d921
Update exporter/prometheusremotewriteexporter/exporter.go
jmichalek132 Nov 11, 2024
7f283c0
chore: ran make gotidy
jmichalek132 Nov 11, 2024
e78df16
Update exporter/prometheusremotewriteexporter/factory.go
jmichalek132 Nov 12, 2024
5b7a338
chore: avoid setting feature flag for all tests
jmichalek132 Nov 12, 2024
9415ebe
chore: ran make gci
jmichalek132 Nov 12, 2024
5747e4a
chore: added log line with proto message
jmichalek132 Nov 12, 2024
f6eb6ab
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 12, 2024
e730d21
Update .chloggen/jm-prom-rw-exporter-add-support-for-rw2.yaml
jmichalek132 Nov 13, 2024
8180fa4
Update exporter/prometheusremotewriteexporter/README.md
jmichalek132 Nov 13, 2024
18f933d
Update exporter/prometheusremotewriteexporter/exporter.go
jmichalek132 Nov 13, 2024
1b6c001
Update exporter/prometheusremotewriteexporter/exporter.go
jmichalek132 Nov 13, 2024
4ed165b
Update exporter/prometheusremotewriteexporter/exporter.go
jmichalek132 Nov 13, 2024
322d135
Update exporter/prometheusremotewriteexporter/README.md
jmichalek132 Nov 13, 2024
7db3003
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 13, 2024
e53e521
chore: ran go mod tidy
jmichalek132 Nov 13, 2024
8e85d8c
chore: remove unnecessary config option
jmichalek132 Nov 13, 2024
b2198a8
chore: moved validation of proto message into correct place
jmichalek132 Nov 13, 2024
10345a6
chore: updated comment on exportV2 func
jmichalek132 Nov 13, 2024
d770a2c
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 13, 2024
40be25d
chore: ran make generate
jmichalek132 Nov 13, 2024
4b8ca99
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 14, 2024
f6505d1
chore: fix tests and v2 after rebase
jmichalek132 Nov 14, 2024
fdf6b7a
chore: fix linter errors
jmichalek132 Nov 14, 2024
95476c8
chore: ran make generate
jmichalek132 Nov 14, 2024
bfd072f
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 17, 2024
01d77a6
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
dashpole Nov 18, 2024
3ddd34e
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 20, 2024
50caa19
chore: cleanup for pipeline to pass after rebase
jmichalek132 Nov 20, 2024
f950fb4
chore: ran go mod tidy in prom receiver to fix pipeline
jmichalek132 Nov 20, 2024
a65bd59
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 20, 2024
56ba3ba
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 20, 2024
bf37b71
chore: ran make go tidy
jmichalek132 Nov 20, 2024
1dd7935
chore: ran make go tidy
jmichalek132 Nov 20, 2024
fe755ad
Update exporter/prometheusremotewriteexporter/exporter_v2.go
jmichalek132 Nov 27, 2024
217bf6a
Update exporter/prometheusremotewriteexporter/exporter.go
jmichalek132 Nov 27, 2024
71327df
Update exporter/prometheusremotewriteexporter/exporter.go
jmichalek132 Nov 27, 2024
4e11b39
Update exporter/prometheusremotewriteexporter/exporter.go
jmichalek132 Nov 27, 2024
0e29bad
Update exporter/prometheusremotewriteexporter/exporter.go
jmichalek132 Nov 27, 2024
cdb12dc
Update exporter/prometheusremotewriteexporter/config.go
jmichalek132 Nov 27, 2024
dcad87a
Update exporter/prometheusremotewriteexporter/config.go
jmichalek132 Nov 27, 2024
64f0223
chore: addressed feedback from review
jmichalek132 Nov 27, 2024
53dcdce
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 27, 2024
23a5aae
chore: ran go mod tidy
jmichalek132 Nov 27, 2024
c031d95
chore: ran make gci
jmichalek132 Nov 27, 2024
1f9fac9
chore: explicitly put back buffer after usage
jmichalek132 Nov 27, 2024
deeb455
chore: fix linting issues
jmichalek132 Nov 27, 2024
b23d48d
chore: ran make generate
jmichalek132 Nov 27, 2024
fb4abec
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Dec 15, 2024
a037f25
chore: solving conflict
jmichalek132 Dec 15, 2024
e790e84
chore: solving conflict
jmichalek132 Dec 15, 2024
8a929c1
chore: solving conflict
jmichalek132 Dec 15, 2024
702ff70
chore: solving conflict
jmichalek132 Dec 15, 2024
6b8fafa
chore: solving conflict
jmichalek132 Dec 15, 2024
5312467
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
dashpole Dec 16, 2024
3 changes: 3 additions & 0 deletions exporter/prometheusremotewriteexporter/config.go
@@ -52,6 +52,9 @@ type Config struct {

// SendMetadata controls whether prometheus metadata will be generated and sent
SendMetadata bool `mapstructure:"send_metadata"`

// SendRW2 controls whether prometheus remote write v1 or v2 is sent
SendRW2 bool `mapstructure:"send_remote_write_v2"`
}

type CreatedMetric struct {
39 changes: 28 additions & 11 deletions exporter/prometheusremotewriteexporter/exporter.go
@@ -123,6 +123,7 @@ func newPRWExporter(cfg *Config, set exporter.Settings) (*prwExporter, error) {
ExportCreatedMetric: cfg.CreatedMetric.Enabled,
AddMetricSuffixes: cfg.AddMetricSuffixes,
SendMetadata: cfg.SendMetadata,
SendRW2: cfg.SendRW2,
},
telemetry: prwTelemetry,
batchTimeSeriesState: newBatchTimeSericesState(),
@@ -172,22 +173,38 @@ func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er
case <-prwe.closeChan:
return errors.New("shutdown has been called")
default:
+if !prwe.exporterSettings.SendRW2 {
+// RW1 case
+tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)
+if err != nil {
+prwe.telemetry.recordTranslationFailure(ctx)
+prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMap)))
+}

-tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)
-if err != nil {
-prwe.telemetry.recordTranslationFailure(ctx)
-prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMap)))
-}
+prwe.telemetry.recordTranslatedTimeSeries(ctx, len(tsMap))
+
+var m []*prompb.MetricMetadata
+if prwe.exporterSettings.SendMetadata {
+m = prometheusremotewrite.OtelMetricsToMetadata(md, prwe.exporterSettings.AddMetricSuffixes)
+}
+
+// Call export even if a conversion error, since there may be points that were successfully converted.
+return prwe.handleExport(ctx, tsMap, m)
+} else {
+// RW2 case
+tsMap, symbolsTable, err := prometheusremotewrite.FromMetricsV2(md, prwe.exporterSettings)
+if err != nil {
+prwe.telemetry.recordTranslationFailure(ctx)
+prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMap)))
+}
+
+prwe.telemetry.recordTranslatedTimeSeries(ctx, len(tsMap))

-prwe.telemetry.recordTranslatedTimeSeries(ctx, len(tsMap))
+// TODO handle metadata

-var m []*prompb.MetricMetadata
-if prwe.exporterSettings.SendMetadata {
-m = prometheusremotewrite.OtelMetricsToMetadata(md, prwe.exporterSettings.AddMetricSuffixes)
+return prwe.handleExportV2(ctx, tsMap, symbolsTable)
}

-// Call export even if a conversion error, since there may be points that were successfully converted.
-return prwe.handleExport(ctx, tsMap, m)
}
}
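
Both branches of PushMetrics log a translation failure and then export anyway, because some points may have converted successfully. The sketch below illustrates that partial-success pattern in isolation. The names `point`, `translate`, and `push` are hypothetical stand-ins and are not part of the exporter; `errors.Join` assumes Go 1.20 or later.

```go
package main

import (
	"errors"
	"fmt"
)

// point is a hypothetical stand-in for an input data point.
type point struct {
	Name  string
	Value float64
}

// translate converts every valid point and returns the converted set together
// with a joined error describing the points it had to skip.
func translate(in []point) (map[string]float64, error) {
	out := make(map[string]float64, len(in))
	var errs error
	for _, p := range in {
		if p.Name == "" {
			errs = errors.Join(errs, fmt.Errorf("point with value %v has no name", p.Value))
			continue
		}
		out[p.Name] = p.Value
	}
	return out, errs
}

// push logs a translation error but still exports whatever was converted.
func push(in []point, export func(map[string]float64) error) error {
	converted, err := translate(in)
	if err != nil {
		fmt.Printf("failed to translate some points, exporting the rest: translated=%d err=%v\n", len(converted), err)
	}
	return export(converted)
}

func main() {
	in := []point{{"cpu", 0.5}, {"", 1}, {"mem", 2}}
	_ = push(in, func(m map[string]float64) error {
		fmt.Println("exported", len(m), "series")
		return nil
	})
}
```

The design choice worth noting is that the translator returns a usable result alongside a non-nil error, so the caller must not treat the error as a reason to discard the result.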

168 changes: 168 additions & 0 deletions exporter/prometheusremotewriteexporter/exporter_v2.go
@@ -0,0 +1,168 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package prometheusremotewriteexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter"

import (
"bytes"
"context"
"fmt"
"io"
"math"
"net/http"
"sync"

"github.com/cenkalti/backoff/v4"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.uber.org/multierr"
)

func (prwe *prwExporter) handleExportV2(ctx context.Context, tsMap map[string]*writev2.TimeSeries, symbolsTable writev2.SymbolsTable) error {
// There are no metrics to export, so return.
if len(tsMap) == 0 {
return nil
}

// Call the helper function to convert tsMap and the symbols table into batched write requests.
requests, err := batchTimeSeriesV2(tsMap, symbolsTable, prwe.maxBatchSizeBytes, &prwe.batchTimeSeriesState)
if err != nil {
return err
}

// TODO implement WAL support, can be done after #15277 is fixed

return prwe.exportV2(ctx, requests)
}
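
handleExportV2 passes a symbols table alongside the time series. As I understand the Remote Write 2.0 format, strings are stored once in a shared table (with the empty string at index 0) and series refer to label names and values by index. The sketch below shows that interning idea only. `symbolTable`, `intern`, and `internLabels` are hypothetical names for illustration and are not the `writev2.SymbolsTable` API used in this PR.

```go
package main

import "fmt"

// symbolTable is a hypothetical, simplified interning table; it is not the
// writev2.SymbolsTable type used by the exporter.
type symbolTable struct {
	index   map[string]uint32
	symbols []string
}

// newSymbolTable reserves index 0 for the empty string.
func newSymbolTable() *symbolTable {
	return &symbolTable{index: map[string]uint32{"": 0}, symbols: []string{""}}
}

// intern returns the index of s, adding it to the table on first use.
func (st *symbolTable) intern(s string) uint32 {
	if ref, ok := st.index[s]; ok {
		return ref
	}
	ref := uint32(len(st.symbols))
	st.index[s] = ref
	st.symbols = append(st.symbols, s)
	return ref
}

// internLabels encodes name/value pairs as a flat list of symbol references.
func (st *symbolTable) internLabels(pairs ...string) []uint32 {
	refs := make([]uint32, 0, len(pairs))
	for _, p := range pairs {
		refs = append(refs, st.intern(p))
	}
	return refs
}

func main() {
	st := newSymbolTable()
	a := st.internLabels("__name__", "http_requests_total", "job", "api")
	b := st.internLabels("__name__", "http_errors_total", "job", "api")
	fmt.Println(a, b, len(st.symbols))
}
```

Repeated strings such as `__name__`, `job`, and `api` are stored once, which is where the size saving over a per-series label list comes from.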

// exportV2 sends Snappy-compressed writev2.Request messages to the remote write endpoint,
// spreading them across at most prwe.concurrency workers.
func (prwe *prwExporter) exportV2(ctx context.Context, requests []*writev2.Request) error {
input := make(chan *writev2.Request, len(requests))
for _, request := range requests {
input <- request
}
close(input)

var wg sync.WaitGroup

concurrencyLimit := int(math.Min(float64(prwe.concurrency), float64(len(requests))))
wg.Add(concurrencyLimit) // used to wait for workers to be finished

var mu sync.Mutex
var errs error
// Run concurrencyLimit workers until there
// are no more requests to execute in the input channel.
for i := 0; i < concurrencyLimit; i++ {
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done(): // Stop working once the context has been cancelled.
return

case request, ok := <-input:
if !ok {
return
}
if errExecute := prwe.executeV2(ctx, request); errExecute != nil {
mu.Lock()
errs = multierr.Append(errs, consumererror.NewPermanent(errExecute))
mu.Unlock()
}
}
}
}()
}
wg.Wait()

return errs
}

func (prwe *prwExporter) executeV2(ctx context.Context, writeReq *writev2.Request) error {
// Use proto.Marshal to serialize the request into a byte slice.
data, errMarshal := proto.Marshal(writeReq)
if errMarshal != nil {
return consumererror.NewPermanent(errMarshal)
}
// If we don't pass a buffer large enough, Snappy Encode function will not use it and instead will allocate a new buffer.
// Therefore we always let Snappy decide the size of the buffer.
compressedData := snappy.Encode(nil, data)

// executeFunc can be used for backoff and non-backoff scenarios.
executeFunc := func() error {
// Check that the component-level context has not timed out,
// so that retries do not keep running after a timeout.
select {
case <-ctx.Done():
return backoff.Permanent(ctx.Err())
default:
// continue
}

// Create the HTTP POST request to send to the endpoint
req, err := http.NewRequestWithContext(ctx, "POST", prwe.endpointURL.String(), bytes.NewReader(compressedData))
if err != nil {
return backoff.Permanent(consumererror.NewPermanent(err))
}

// Add necessary headers specified by:
// https://cortexmetrics.io/docs/apis/#remote-api
req.Header.Add("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
req.Header.Set("User-Agent", prwe.userAgentHeader)

resp, err := prwe.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

// 2xx status code is considered a success
// 5xx errors are recoverable and the exporter should retry
// Reference for different behavior according to status code:
// https://github.com/prometheus/prometheus/pull/2552/files#diff-ae8db9d16d8057358e49d694522e7186
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
return nil
}

body, err := io.ReadAll(io.LimitReader(resp.Body, 256))
rerr := fmt.Errorf("remote write returned HTTP status %v; err = %w: %s", resp.Status, err, body)
if resp.StatusCode >= 500 && resp.StatusCode < 600 {
return rerr
}

// 429 errors are recoverable and the exporter should retry if RetryOnHTTP429 enabled
// Reference: https://github.com/prometheus/prometheus/pull/12677
if prwe.retryOnHTTP429 && resp.StatusCode == 429 {
return rerr
}

return backoff.Permanent(consumererror.NewPermanent(rerr))
}

var err error
if prwe.retrySettings.Enabled {
// Use the BackOff instance to retry the func with exponential backoff.
err = backoff.Retry(executeFunc, &backoff.ExponentialBackOff{
InitialInterval: prwe.retrySettings.InitialInterval,
RandomizationFactor: prwe.retrySettings.RandomizationFactor,
Multiplier: prwe.retrySettings.Multiplier,
MaxInterval: prwe.retrySettings.MaxInterval,
MaxElapsedTime: prwe.retrySettings.MaxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
})
} else {
err = executeFunc()
}

if err != nil {
return consumererror.NewPermanent(err)
}

return err
}
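
The status handling inside executeV2 boils down to a three-way decision: 2xx succeeds, 5xx is retried, 429 is retried only when retryOnHTTP429 is set, and everything else is permanent. The sketch below restates that decision as a pure function so it can be tested without a server. `outcome` and `classify` are hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"net/http"
)

// outcome describes what a caller should do after a remote write response.
type outcome int

const (
	success   outcome = iota // 2xx: the request was accepted
	retryable                // 5xx, or 429 when retrying on 429 is enabled
	permanent                // anything else: retrying will not help
)

// classify restates the status checks from executeV2 as a pure function.
func classify(status int, retryOn429 bool) outcome {
	switch {
	case status >= 200 && status < 300:
		return success
	case status >= 500 && status < 600:
		return retryable
	case status == http.StatusTooManyRequests && retryOn429:
		return retryable
	default:
		return permanent
	}
}

func main() {
	statuses := []int{
		http.StatusNoContent,
		http.StatusBadRequest,
		http.StatusTooManyRequests,
		http.StatusServiceUnavailable,
	}
	for _, s := range statuses {
		fmt.Println(s, classify(s, true), classify(s, false))
	}
}
```

In the diff, a retryable outcome corresponds to returning a plain error from executeFunc so that backoff.Retry tries again, while a permanent outcome is wrapped with backoff.Permanent. Any error that survives the retry loop is then wrapped with consumererror.NewPermanent before it is returned.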