From 1c2b59010cced049228a7fbc7fbfd2cc02402732 Mon Sep 17 00:00:00 2001 From: Martin Chodur Date: Tue, 7 Mar 2023 10:52:32 +0100 Subject: [PATCH] feat prometheusIngester: add offset setting for query (#101) * feat prometheusIngester: add offset setting for query Signed-off-by: Martin Chodur * chore: changelog update Signed-off-by: Martin Chodur * fix: comment * Update CHANGELOG.md --------- Signed-off-by: Martin Chodur --- CHANGELOG.md | 4 ++ docs/modules/prometheus_ingester.md | 12 +++-- examples/prometheus/slo_exporter.yaml | 23 +++++--- .../prometheus_ingester.go | 22 ++++---- .../prometheus_ingester_test.go | 52 +++++++++++++++++-- pkg/prometheus_ingester/query_executor.go | 21 ++++---- 6 files changed, 96 insertions(+), 38 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 968b1b1..4a61be0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +## [v6.13.0] 2023-03-07 +### Added +- [#101](https://github.com/seznam/slo-exporter/pull/101) prometheusIngester: add `offset` option to queries, allowing slightly older data to be queried to ensure consistency on Prometheus-compatible systems using remote write. + ## [v6.12.1] 2022-10-14 ### Added - [#95](https://github.com/seznam/slo-exporter/pull/95) prometheusIngester sends user-agent header diff --git a/docs/modules/prometheus_ingester.md b/docs/modules/prometheus_ingester.md index a5a8f96..2efdc44 100644 --- a/docs/modules/prometheus_ingester.md +++ b/docs/modules/prometheus_ingester.md @@ -1,10 +1,10 @@ # Prometheus ingester -| | | -|----------------|-------------------------| -| `moduleName` | `prometheusIngester` | -| Module type | `producer` | -| Output event | `raw` | +| | | +| ------------ | -------------------- | +| `moduleName` | `prometheusIngester` | +| Module type | `producer` | +| Output event | `raw` | Prometheus ingester generates events based on results of provided Prometheus queries. 
For its usage example see [the prometheus example](/examples/prometheus). @@ -56,6 +56,8 @@ type: '' resultAsQuantity: false # How often to execute the query. interval: +# Query data with given offset. Useful to ensure consistency when querying data coming from remote write. +offset: # Names of the labels that should be dropped from the result. dropLabels: - diff --git a/examples/prometheus/slo_exporter.yaml b/examples/prometheus/slo_exporter.yaml index af3e1df..82c873b 100644 --- a/examples/prometheus/slo_exporter.yaml +++ b/examples/prometheus/slo_exporter.yaml @@ -1,6 +1,14 @@ webServerListenAddress: "0.0.0.0:8080" -pipeline: ["prometheusIngester", "relabel", "eventKeyGenerator", "dynamicClassifier", "sloEventProducer", "prometheusExporter"] +pipeline: + [ + "prometheusIngester", + "relabel", + "eventKeyGenerator", + "dynamicClassifier", + "sloEventProducer", + "prometheusExporter", + ] modules: prometheusIngester: @@ -8,22 +16,23 @@ modules: httpHeaders: - name: X-Scope-OrgID value: "myOrganization" - - name: Authorization - valueFromEnv: - name: "SLO_EXPORTER_AUTH_TOKEN" - valuePrefix: "Bearer " + # - name: Authorization + # valueFromEnv: + # name: "SLO_EXPORTER_AUTH_TOKEN" + # valuePrefix: "Bearer " queryTimeout: 30s queries: # Generate events from counter for every HTTP request with status code for availability SLO. - type: counter_increase - query: 'prometheus_http_requests_total' + query: "prometheus_http_requests_total" interval: 30s + offset: 5m additionalLabels: event_type: http_request_result # Generate events from histogram for every HTTP request for latency SLO. 
- type: histogram_increase - query: 'prometheus_http_request_duration_seconds_bucket' + query: "prometheus_http_request_duration_seconds_bucket" interval: 30s additionalLabels: event_type: http_request_latency diff --git a/pkg/prometheus_ingester/prometheus_ingester.go b/pkg/prometheus_ingester/prometheus_ingester.go index 0538755..609ae1c 100644 --- a/pkg/prometheus_ingester/prometheus_ingester.go +++ b/pkg/prometheus_ingester/prometheus_ingester.go @@ -70,6 +70,7 @@ func validateQueryType(queryType queryType) error { type queryOptions struct { Query string Interval time.Duration + Offset time.Duration DropLabels []string AdditionalLabels stringmap.StringMap Type queryType @@ -135,7 +136,7 @@ type PrometheusIngesterConfig struct { } type PrometheusIngester struct { - queryExecutors *[]queryExecutor + queryExecutors []*queryExecutor queryTimeout time.Duration client api.Client api v1.API @@ -210,10 +211,7 @@ func newFalse() *bool { } func New(initConfig PrometheusIngesterConfig, logger logrus.FieldLogger) (*PrometheusIngester, error) { - var ( - queryExecutors = []queryExecutor{} - ingester = PrometheusIngester{} - ) + var ingester = PrometheusIngester{} headers, err := initConfig.HttpHeaders.toMap() if err != nil { @@ -232,7 +230,7 @@ func New(initConfig PrometheusIngesterConfig, logger logrus.FieldLogger) (*Prome } ingester = PrometheusIngester{ - queryExecutors: &queryExecutors, + queryExecutors: []*queryExecutor{}, queryTimeout: initConfig.QueryTimeout, client: client, api: v1.NewAPI(client), @@ -256,9 +254,9 @@ func New(initConfig PrometheusIngesterConfig, logger logrus.FieldLogger) (*Prome q.ResultAsQuantity = newFalse() } } - queryExecutors = append( - queryExecutors, - queryExecutor{ + ingester.queryExecutors = append( + ingester.queryExecutors, + &queryExecutor{ Query: q, queryTimeout: ingester.queryTimeout, eventsChan: ingester.outputChannel, @@ -285,11 +283,9 @@ func (i *PrometheusIngester) Run() { var wg sync.WaitGroup // Start all queries - for _, 
queryExecutor := range *i.queryExecutors { + for _, queryExecutor := range i.queryExecutors { wg.Add(1) - // declare local scope variable to prevent shadowing by the next iterations - qe := queryExecutor - go qe.run(queriesContext, &wg) + go queryExecutor.run(queriesContext, &wg) } <-i.shutdownChannel diff --git a/pkg/prometheus_ingester/prometheus_ingester_test.go b/pkg/prometheus_ingester/prometheus_ingester_test.go index 9a4f865..d8818e3 100644 --- a/pkg/prometheus_ingester/prometheus_ingester_test.go +++ b/pkg/prometheus_ingester/prometheus_ingester_test.go @@ -7,6 +7,7 @@ import ( "fmt" "io/ioutil" "net/http" + "net/url" "os" "strings" "testing" @@ -21,8 +22,9 @@ import ( ) type MockedRoundTripper struct { - t *testing.T - result model.Value + t *testing.T + result model.Value + expectedTimestamp string } func (m *MockedRoundTripper) resultFabricator() string { @@ -46,6 +48,12 @@ func (m *MockedRoundTripper) RoundTrip(req *http.Request) (*http.Response, error m.t.Error(err) return nil, err } + vals, err := url.ParseQuery(buf.String()) + assert.NoError(m.t, err) + + if m.expectedTimestamp != "" { + assert.Equal(m.t, m.expectedTimestamp, vals.Get("time")) + } response := m.resultFabricator() @@ -209,7 +217,7 @@ func Test_Ingests_Various_ModelTypes(t *testing.T) { } // Prepare the string interpretation of the actual results - assert.ElementsMatchf(t, tc.eventsProduced, actualEventResult, "Produced events doesnt match expected events", "actual", HttpRequestsToString(actualEventResult)) + assert.ElementsMatchf(t, tc.eventsProduced, actualEventResult, "Produced events doesn't match expected events", "actual", HttpRequestsToString(actualEventResult)) } } @@ -852,3 +860,41 @@ func Test_httpHeader_getValue(t *testing.T) { }) } } + +func Test_queryOffset(t *testing.T) { + type testCase struct { + name string + queryOpts queryOptions + expectError bool + } + + cases := []testCase{ + {name: "no offset expected", queryOpts: queryOptions{Query: "up", Interval: time.Second, 
Type: "simple"}}, + {name: "offset expected", queryOpts: queryOptions{Query: "up", Interval: time.Second, Type: "simple", Offset: time.Minute}}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + ts := model.Now() + roundTripper := &MockedRoundTripper{ + t: t, + expectedTimestamp: ts.Add(-tc.queryOpts.Offset).String(), + result: &model.Scalar{Value: 1, Timestamp: 0}, + } + + ingester, err := New(PrometheusIngesterConfig{ + RoundTripper: roundTripper, + QueryTimeout: 400 * time.Millisecond, + Queries: []queryOptions{ + tc.queryOpts, + }, + }, logrus.New()) + if err != nil { + t.Error(err) + return + } + _, _, err = ingester.queryExecutors[0].execute(ts.Time()) + assert.NoError(t, err) + }) + } +} diff --git a/pkg/prometheus_ingester/query_executor.go b/pkg/prometheus_ingester/query_executor.go index 8777270..2540967 100644 --- a/pkg/prometheus_ingester/query_executor.go +++ b/pkg/prometheus_ingester/query_executor.go @@ -3,8 +3,6 @@ package prometheus_ingester import ( "context" "fmt" - "github.com/hashicorp/go-multierror" - "go.uber.org/atomic" "math" "sort" "strconv" @@ -12,6 +10,9 @@ import ( "sync" "time" + "github.com/hashicorp/go-multierror" + "go.uber.org/atomic" + v1 "github.com/prometheus/client_golang/api/prometheus/v1" "github.com/prometheus/common/model" "github.com/seznam/slo-exporter/pkg/event" @@ -73,8 +74,9 @@ func (q *queryExecutor) withRangeSelector(ts time.Time) string { return q.Query.Query + fmt.Sprintf("[%ds]", int64(rangeSelector.Seconds())) } -// execute query at provided timestamp ts -func (q *queryExecutor) execute(ts time.Time) (model.Value, error) { +// execute query at provided timestamp ts taking the configured query offset into account. Returns the actual timestamp with offset applied. 
+func (q *queryExecutor) execute(ts time.Time) (model.Value, time.Time, error) { + ts = ts.Add(-q.Query.Offset) q.queryInProgress.Store(true) defer q.queryInProgress.Store(false) timeoutCtx, cancel := context.WithTimeout(context.Background(), q.queryTimeout) @@ -93,7 +95,7 @@ func (q *queryExecutor) execute(ts time.Time) (model.Value, error) { case simpleQueryType: query = q.Query.Query default: - return nil, fmt.Errorf("unknown query type: '%s'", q.Query.Type) + return nil, ts, fmt.Errorf("unknown query type: '%s'", q.Query.Type) } start := time.Now() result, warnings, err = q.api.Query(timeoutCtx, query, ts) @@ -104,7 +106,7 @@ func (q *queryExecutor) execute(ts time.Time) (model.Value, error) { q.logger.WithField("query", query).Warnf("warnings in query execution: %+v", warnings) } - return result, err + return result, ts, err } func (q *queryExecutor) run(ctx context.Context, wg *sync.WaitGroup) { @@ -123,14 +125,13 @@ func (q *queryExecutor) run(ctx context.Context, wg *sync.WaitGroup) { q.logger.Warn("skipping query execution, previous query still in progress...") continue } - ts := time.Now() - result, err := q.execute(ts) + result, queryTs, err := q.execute(time.Now()) if err != nil { prometheusQueryFail.WithLabelValues(string(q.Query.Type)).Inc() q.logger.WithField("query", q.Query.Query).Errorf("failed querying Prometheus: '%+v'", err) continue } - err = q.ProcessResult(result, ts) + err = q.ProcessResult(result, queryTs) if err != nil { q.logger.WithField("query", q.Query.Query).Errorf("failed processing the query result: '%+v'", err) } @@ -246,7 +247,7 @@ func (q *queryExecutor) processHistogramIncrease(matrix model.Matrix, ts time.Ti for _, metric := range metricBucketIncreases { metricBuckets := make([]float64, len(metric)) i := 0 - for bucket, _ := range metric { + for bucket := range metric { metricBuckets[i] = bucket i++ }