[exporter/prometheusremotewrite] feat: prom rw exporter add support for rw2 #35888

Open
wants to merge 96 commits into
base: main
Changes from 9 commits
Commits
8a0b38e
feat: prom rw exporter add support for rw2
jmichalek132 Oct 20, 2024
3d54298
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Oct 21, 2024
4c4ef36
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Oct 25, 2024
d5521bf
chore: try to reduce code duplication
jmichalek132 Oct 25, 2024
146bf31
chore: try to reduce code duplication 2
jmichalek132 Oct 25, 2024
07185a8
chore: removed batching for now to make PR smaller
jmichalek132 Oct 25, 2024
0cdd754
chore: addressed comments from PR
jmichalek132 Oct 25, 2024
b8052ce
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Oct 25, 2024
1b5d693
chore: implement enum for rw2 instead of bool
jmichalek132 Oct 25, 2024
c5518b0
chore: add feature flag for enabling support for rw2
jmichalek132 Oct 25, 2024
446e58a
chore: fix issues pointed out by linter
jmichalek132 Oct 25, 2024
a15354d
Update exporter/prometheusremotewriteexporter/config.go
jmichalek132 Oct 26, 2024
da15f91
chore: addressed feedback from review
jmichalek132 Oct 28, 2024
81805f3
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Oct 28, 2024
e4ddae3
chore: addressed feedback from review
jmichalek132 Oct 28, 2024
4340649
chore: addressed feedback from review
jmichalek132 Oct 28, 2024
56c09c1
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Oct 30, 2024
a44abba
refactor PushMetrics based on suggestion from review
jmichalek132 Oct 30, 2024
6e2a0d8
chore: undo unneeded changes
jmichalek132 Oct 30, 2024
8bd8bc7
chore: updated unit tests & added an rw2 case
jmichalek132 Oct 30, 2024
6b3bea6
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Oct 30, 2024
c8f832d
chore: ran make gci
jmichalek132 Oct 30, 2024
658ff26
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Oct 30, 2024
cf476d6
chore: move debug line to correct place
jmichalek132 Oct 31, 2024
eae3908
chore: fix typo in feature flag description
jmichalek132 Oct 31, 2024
7e18afb
chore: move debug line to correct place
jmichalek132 Oct 31, 2024
aca5734
chore: small refactor based on review
jmichalek132 Oct 31, 2024
3abd46e
chore: update failing unit test
jmichalek132 Oct 31, 2024
9719de0
chore: updated readme first iteration
jmichalek132 Oct 31, 2024
7f2b253
chore: added changelog draft
jmichalek132 Oct 31, 2024
1770b82
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Oct 31, 2024
047b4ba
chore: ran make generate
jmichalek132 Oct 31, 2024
48a3544
chore: moved v2 function to v2 file
jmichalek132 Oct 31, 2024
bc6c1dc
chore: delete empty file
jmichalek132 Oct 31, 2024
4f04e82
chore: import proto message from prometheus and validate it from config
jmichalek132 Oct 31, 2024
6be771d
chore: ran make generate
jmichalek132 Oct 31, 2024
cc1599c
chore: added check if flag enabled where missing
jmichalek132 Oct 31, 2024
a5bab73
chore: fix broken tests
jmichalek132 Oct 31, 2024
7917ee0
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 11, 2024
018fde1
chore: fixed go.sum after resolving conflicts
jmichalek132 Nov 11, 2024
7fd2376
Update .chloggen/jm-prom-rw-exporter-add-support-for-rw2.yaml
jmichalek132 Nov 11, 2024
a5cea35
chore: updated changelog entry based on feedback
jmichalek132 Nov 11, 2024
0e8d921
Update exporter/prometheusremotewriteexporter/exporter.go
jmichalek132 Nov 11, 2024
7f283c0
chore: ran make gotidy
jmichalek132 Nov 11, 2024
e78df16
Update exporter/prometheusremotewriteexporter/factory.go
jmichalek132 Nov 12, 2024
5b7a338
chore: avoid setting feature flag for all tests
jmichalek132 Nov 12, 2024
9415ebe
chore: ran make gci
jmichalek132 Nov 12, 2024
5747e4a
chore: added log line with proto message
jmichalek132 Nov 12, 2024
f6eb6ab
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 12, 2024
e730d21
Update .chloggen/jm-prom-rw-exporter-add-support-for-rw2.yaml
jmichalek132 Nov 13, 2024
8180fa4
Update exporter/prometheusremotewriteexporter/README.md
jmichalek132 Nov 13, 2024
18f933d
Update exporter/prometheusremotewriteexporter/exporter.go
jmichalek132 Nov 13, 2024
1b6c001
Update exporter/prometheusremotewriteexporter/exporter.go
jmichalek132 Nov 13, 2024
4ed165b
Update exporter/prometheusremotewriteexporter/exporter.go
jmichalek132 Nov 13, 2024
322d135
Update exporter/prometheusremotewriteexporter/README.md
jmichalek132 Nov 13, 2024
7db3003
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 13, 2024
e53e521
chore: ran go mod tidy
jmichalek132 Nov 13, 2024
8e85d8c
chore: remove unnecessary config option
jmichalek132 Nov 13, 2024
b2198a8
chore: moved validation of proto message into correct place
jmichalek132 Nov 13, 2024
10345a6
chore: updated comment on exportV2 func
jmichalek132 Nov 13, 2024
d770a2c
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 13, 2024
40be25d
chore: ran make generate
jmichalek132 Nov 13, 2024
4b8ca99
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 14, 2024
f6505d1
chore: fix tests and v2 after rebase
jmichalek132 Nov 14, 2024
fdf6b7a
chore: fix linter errors
jmichalek132 Nov 14, 2024
95476c8
chore: ran make generate
jmichalek132 Nov 14, 2024
bfd072f
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 17, 2024
01d77a6
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
dashpole Nov 18, 2024
3ddd34e
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 20, 2024
50caa19
chore: cleanup for pipeline to pass after rebase
jmichalek132 Nov 20, 2024
f950fb4
chore: ran go mod tidy in prom receiver to fix pipeline
jmichalek132 Nov 20, 2024
a65bd59
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 20, 2024
56ba3ba
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 20, 2024
bf37b71
chore: ran make go tidy
jmichalek132 Nov 20, 2024
1dd7935
chore: ran make go tidy
jmichalek132 Nov 20, 2024
fe755ad
Update exporter/prometheusremotewriteexporter/exporter_v2.go
jmichalek132 Nov 27, 2024
217bf6a
Update exporter/prometheusremotewriteexporter/exporter.go
jmichalek132 Nov 27, 2024
71327df
Update exporter/prometheusremotewriteexporter/exporter.go
jmichalek132 Nov 27, 2024
4e11b39
Update exporter/prometheusremotewriteexporter/exporter.go
jmichalek132 Nov 27, 2024
0e29bad
Update exporter/prometheusremotewriteexporter/exporter.go
jmichalek132 Nov 27, 2024
cdb12dc
Update exporter/prometheusremotewriteexporter/config.go
jmichalek132 Nov 27, 2024
dcad87a
Update exporter/prometheusremotewriteexporter/config.go
jmichalek132 Nov 27, 2024
64f0223
chore: addressed feedback from review
jmichalek132 Nov 27, 2024
53dcdce
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 27, 2024
23a5aae
chore: ran go mod tidy
jmichalek132 Nov 27, 2024
c031d95
chore: ran make gci
jmichalek132 Nov 27, 2024
1f9fac9
chore: explicitly put back buffer after usage
jmichalek132 Nov 27, 2024
deeb455
chore: fix linting issues
jmichalek132 Nov 27, 2024
b23d48d
chore: ran make generate
jmichalek132 Nov 27, 2024
fb4abec
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Dec 15, 2024
a037f25
chore: solving conflict
jmichalek132 Dec 15, 2024
e790e84
chore: solving conflict
jmichalek132 Dec 15, 2024
8a929c1
chore: solving conflict
jmichalek132 Dec 15, 2024
702ff70
chore: solving conflict
jmichalek132 Dec 15, 2024
6b8fafa
chore: solving conflict
jmichalek132 Dec 15, 2024
5312467
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
dashpole Dec 16, 2024
4 changes: 4 additions & 0 deletions exporter/prometheusremotewriteexporter/config.go
@@ -5,6 +5,7 @@ package prometheusremotewriteexporter // import "github.com/open-telemetry/opent

import (
"fmt"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
@@ -52,6 +53,9 @@ type Config struct {

// SendMetadata controls whether prometheus metadata will be generated and sent
SendMetadata bool `mapstructure:"send_metadata"`

// RemoteWriteProtoMsg controls whether Prometheus remote write v1 or v2 is sent
RemoteWriteProtoMsg prometheusremotewrite.RemoteWriteProtoMsg `mapstructure:"protobuf_message,omitempty"`
}

type CreatedMetric struct {
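
For illustration, an enum-style `protobuf_message` option could be validated roughly as follows. This is a minimal sketch, not the exporter's code: the type `ProtoMsg`, its constants, and `Validate` are hypothetical names invented for this example. The v2 message name is taken from the Content-Type header set later in this diff; the v1 message name and the choice to accept an empty value (treated as v1) are assumptions.

```go
package main

import "fmt"

// ProtoMsg is a hypothetical string enum naming the remote write protobuf message.
type ProtoMsg string

const (
	// ProtoMsgV1 names the remote write v1 message (assumed value).
	ProtoMsgV1 ProtoMsg = "prometheus.WriteRequest"
	// ProtoMsgV2 names the remote write v2 message, as used in the v2 Content-Type header.
	ProtoMsgV2 ProtoMsg = "io.prometheus.write.v2.Request"
)

// Validate returns an error for any value other than the known messages.
// Assumption: an empty value is accepted so that omitting the option keeps v1 behavior.
func (m ProtoMsg) Validate() error {
	switch m {
	case "", ProtoMsgV1, ProtoMsgV2:
		return nil
	default:
		return fmt.Errorf("unknown remote write protobuf message %q", string(m))
	}
}

func main() {
	for _, m := range []ProtoMsg{ProtoMsgV1, ProtoMsgV2, "bogus"} {
		fmt.Printf("%q valid: %v\n", string(m), m.Validate() == nil)
	}
}
```

Compared with a boolean such as `SendRW2`, a string enum leaves room for further message types and lets a typo in the configuration fail at validation time rather than silently falling back.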
71 changes: 50 additions & 21 deletions exporter/prometheusremotewriteexporter/exporter.go
@@ -19,6 +19,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configretry"
@@ -123,6 +124,7 @@ func newPRWExporter(cfg *Config, set exporter.Settings) (*prwExporter, error) {
ExportCreatedMetric: cfg.CreatedMetric.Enabled,
AddMetricSuffixes: cfg.AddMetricSuffixes,
SendMetadata: cfg.SendMetadata,
RemoteWriteProtoMsg: cfg.RemoteWriteProtoMsg,
},
telemetry: prwTelemetry,
batchTimeSeriesState: newBatchTimeSericesState(),
@@ -172,22 +174,43 @@ func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er
case <-prwe.closeChan:
return errors.New("shutdown has been called")
default:
var tsMap map[string]*prompb.TimeSeries
var tsMapv2 map[string]*writev2.TimeSeries
var symbolsTable writev2.SymbolsTable
var m []*prompb.MetricMetadata
var err error

tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)
if err != nil {
prwe.telemetry.recordTranslationFailure(ctx)
switch prwe.exporterSettings.RemoteWriteProtoMsg {
case prometheusremotewrite.RemoteWriteProtoMsgV1:
// RW1 case
tsMap, err = prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)

prwe.telemetry.recordTranslatedTimeSeries(ctx, len(tsMap))

if prwe.exporterSettings.SendMetadata {
m = prometheusremotewrite.OtelMetricsToMetadata(md, prwe.exporterSettings.AddMetricSuffixes)
}
if err != nil {
prwe.telemetry.recordTranslationFailure(ctx)
}
prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMap)))
}
// Call export even if a conversion error, since there may be points that were successfully converted.
return prwe.handleExport(ctx, tsMap, m)
case prometheusremotewrite.RemoteWriteProtoMsgV2:
// RW2 case

Suggested change
// RW2 case

tsMapv2, symbolsTable, err = prometheusremotewrite.FromMetricsV2(md, prwe.exporterSettings)

prwe.telemetry.recordTranslatedTimeSeries(ctx, len(tsMap))
prwe.telemetry.recordTranslatedTimeSeries(ctx, len(tsMapv2))

var m []*prompb.MetricMetadata
if prwe.exporterSettings.SendMetadata {
m = prometheusremotewrite.OtelMetricsToMetadata(md, prwe.exporterSettings.AddMetricSuffixes)
if err != nil {
prwe.telemetry.recordTranslationFailure(ctx)
}
prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMapv2)))
// Call export even if a conversion error, since there may be points that were successfully converted.
return prwe.handleExportV2(ctx, symbolsTable, tsMapv2)
default:
return errors.New("unexpected rw protobuf message")
}

// Call export even if a conversion error, since there may be points that were successfully converted.
return prwe.handleExport(ctx, tsMap, m)
}
}

@@ -208,7 +231,6 @@ func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*pro
if len(tsMap) == 0 {
return nil
}

// Calls the helper function to convert and batch the TsMap to the desired format
requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m, &prwe.batchTimeSeriesState)
if err != nil {
@@ -256,7 +278,12 @@ func (prwe *prwExporter) export(ctx context.Context, requests []*prompb.WriteReq
if !ok {
return
}
if errExecute := prwe.execute(ctx, request); errExecute != nil {
// Uses proto.Marshal to convert the WriteRequest into bytes array
data, errMarshal := proto.Marshal(request)
if errMarshal != nil {
errs = multierr.Append(errs, consumererror.NewPermanent(errMarshal))

Ditto: there is a risk of a data race here (errs is shared by multiple goroutines and is not locked).

}
if errExecute := prwe.execute(ctx, data); errExecute != nil {
mu.Lock()
errs = multierr.Append(errs, consumererror.NewPermanent(errExecute))
mu.Unlock()
@@ -270,12 +297,7 @@ func (prwe *prwExporter) export(ctx context.Context, requests []*prompb.WriteReq
return errs
}

func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequest) error {
// Uses proto.Marshal to convert the WriteRequest into bytes array
data, errMarshal := proto.Marshal(writeReq)
if errMarshal != nil {
return consumererror.NewPermanent(errMarshal)
}
func (prwe *prwExporter) execute(ctx context.Context, data []byte) error {
// If we don't pass a buffer large enough, Snappy Encode function will not use it and instead will allocate a new buffer.
// Therefore we always let Snappy decide the size of the buffer.
compressedData := snappy.Encode(nil, data)
@@ -300,10 +322,17 @@ func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequ
// Add necessary headers specified by:
// https://cortexmetrics.io/docs/apis/#remote-api
req.Header.Add("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
req.Header.Set("User-Agent", prwe.userAgentHeader)

switch prwe.exporterSettings.RemoteWriteProtoMsg {
case prometheusremotewrite.RemoteWriteProtoMsgV1:
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
case prometheusremotewrite.RemoteWriteProtoMsgV2:
req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
}

resp, err := prwe.client.Do(req)
if err != nil {
return err
4 changes: 2 additions & 2 deletions exporter/prometheusremotewriteexporter/exporter_test.go
@@ -1169,8 +1169,8 @@ func TestRetries(t *testing.T) {
Enabled: true,
},
}

err = exporter.execute(tt.ctx, &prompb.WriteRequest{})
data, _ := proto.Marshal(&prompb.WriteRequest{})
err = exporter.execute(tt.ctx, data)
tt.assertError(t, err)
tt.assertErrorType(t, err)
assert.Equal(t, tt.expectedAttempts, totalAttempts)
95 changes: 95 additions & 0 deletions exporter/prometheusremotewriteexporter/exporter_v2.go
@@ -0,0 +1,95 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package prometheusremotewriteexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter"

import (
"context"
"math"
"sync"

"github.com/gogo/protobuf/proto"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.uber.org/multierr"
)

// TODO update comment
// exportV2 sends Snappy-compressed writev2.Request messages containing TimeSeries to a remote write endpoint in order
func (prwe *prwExporter) exportV2(ctx context.Context, requests []*writev2.Request) error {
input := make(chan *writev2.Request, len(requests))
for _, request := range requests {
input <- request
}
close(input)

var wg sync.WaitGroup

concurrencyLimit := int(math.Min(float64(prwe.concurrency), float64(len(requests))))
wg.Add(concurrencyLimit) // used to wait for workers to be finished

var mu sync.Mutex
var errs error
// Run concurrencyLimit workers until there
// are no more requests to execute in the input channel.
for i := 0; i < concurrencyLimit; i++ {
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done(): // Check firstly to ensure that the context wasn't cancelled.
return

case request, ok := <-input:
if !ok {
return
}

// Uses proto.Marshal to convert the WriteRequest into bytes array
data, errMarshal := proto.Marshal(request)
if errMarshal != nil {
errs = multierr.Append(errs, consumererror.NewPermanent(errMarshal))
Contributor

Why isn't mu taken here, as on line 75? Is it an omission?

Contributor Author

I'm not sure the lock is even necessary. I will test it and either remove it or add it to both.

I believe it's a data race if you write to the shared errs from multiple goroutines. You need to either lock it or use a channel or some other concurrency structure such as errgroup. Locking like you did on line 75 is fine.

}

if errExecute := prwe.execute(ctx, data); errExecute != nil {
mu.Lock()
errs = multierr.Append(errs, consumererror.NewPermanent(errExecute))
mu.Unlock()
}
}
}
}()
}
wg.Wait()

return errs
}

func (prwe *prwExporter) handleExportV2(ctx context.Context, symbolsTable writev2.SymbolsTable, tsMap map[string]*writev2.TimeSeries) error {
// There are no metrics to export, so return.
if len(tsMap) == 0 {
return nil
}

// TODO implement batching
requests := make([]*writev2.Request, 0)
tsArray := make([]writev2.TimeSeries, 0, len(tsMap))
for _, v := range tsMap {
tsArray = append(tsArray, *v)
}

requests = append(requests, &writev2.Request{
// TODO sort
// Prometheus requires time series to be sorted by Timestamp to avoid out of order problems.
// See:
// * https://github.com/open-telemetry/wg-prometheus/issues/10
// * https://github.com/open-telemetry/opentelemetry-collector/issues/2315
//Timeseries: orderBySampleTimestamp(tsArray),
Timeseries: tsArray,
Symbols: symbolsTable.Symbols(),
})

// TODO implement WAL support, can be done after #15277 is fixed

return prwe.exportV2(ctx, requests)
}