Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add source label to query stats metrics #6470

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* [FEATURE] Query Frontend: Support a metadata federated query when `-tenant-federation.enabled=true`. #6461
* [FEATURE] Query Frontend: Support an exemplar federated query when `-tenant-federation.enabled=true`. #6455
* [FEATURE] Ingester/StoreGateway: Add support for caching regex query matchers via `-ingester.matchers-cache-max-items` and `-blocks-storage.bucket-store.matchers-cache-max-items`. #6477 #6491
* [ENHANCEMENT] Query Frontend: Add a `source` label to query stat metrics. #6470
* [ENHANCEMENT] Querier: Add a `-tenant-federation.max-concurrent` flag to configure the number of workers processing federated queries, and add a `cortex_querier_federated_tenants_per_query` histogram to track the number of tenants per query. #6449
* [ENHANCEMENT] Query Frontend: Add the number of series in the query response to the query stat log. #6423
* [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388
Expand Down
11 changes: 10 additions & 1 deletion integration/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1673,7 +1673,8 @@ func TestRulerEvalWithQueryFrontend(t *testing.T) {
for _, format := range []string{"protobuf", "json"} {
t.Run(fmt.Sprintf("format:%s", format), func(t *testing.T) {
queryFrontendFlag := mergeFlags(flags, map[string]string{
"-ruler.query-response-format": format,
"-ruler.query-response-format": format,
"-frontend.query-stats-enabled": "true",
})
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", queryFrontendFlag, "")
require.NoError(t, s.Start(queryFrontend))
Expand Down Expand Up @@ -1726,6 +1727,14 @@ func TestRulerEvalWithQueryFrontend(t *testing.T) {
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
// Check that cortex_query_frontend_queries_total went up
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_query_frontend_queries_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics))
// check query stat metrics
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_seconds_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics))
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_fetched_series_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics))
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_samples_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics))
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_samples_scanned_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics))
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_peak_samples"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics))
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_fetched_chunks_bytes_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics))
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_fetched_data_bytes_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics))
})
}
}
Expand Down
40 changes: 21 additions & 19 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,48 +112,48 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
h.querySeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_seconds_total",
Help: "Total amount of wall clock time spend processing queries.",
}, []string{"user"})
}, []string{"source", "user"})

h.queryFetchedSeries = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_fetched_series_total",
Help: "Number of series fetched to execute a query.",
}, []string{"user"})
}, []string{"source", "user"})

h.queryFetchedSamples = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_samples_total",
Help: "Number of samples fetched to execute a query.",
}, []string{"user"})
}, []string{"source", "user"})

// It tracks TotalSamples in https://github.com/prometheus/prometheus/blob/main/util/stats/query_stats.go#L237 for each user.
h.queryScannedSamples = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_samples_scanned_total",
Help: "Number of samples scanned to execute a query.",
}, []string{"user"})
}, []string{"source", "user"})

h.queryPeakSamples = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_query_peak_samples",
Help: "Highest count of samples considered to execute a query.",
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 1 * time.Hour,
}, []string{"user"})
}, []string{"source", "user"})

h.queryChunkBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_fetched_chunks_bytes_total",
Help: "Size of all chunks fetched to execute a query in bytes.",
}, []string{"user"})
}, []string{"source", "user"})

h.queryDataBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_fetched_data_bytes_total",
Help: "Size of all data fetched to execute a query in bytes.",
}, []string{"user"})
}, []string{"source", "user"})

h.rejectedQueries = promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Name: "cortex_rejected_queries_total",
Help: "The total number of queries that were rejected.",
},
[]string{"reason", "user"},
[]string{"reason", "source", "user"},
)

h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
Expand Down Expand Up @@ -218,7 +218,8 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
http.Error(w, err.Error(), statusCode)
if f.cfg.QueryStatsEnabled && util.IsRequestBodyTooLarge(err) {
f.rejectedQueries.WithLabelValues(reasonRequestBodySizeExceeded, userID).Inc()
source := tripperware.GetSource(r.Header.Get("User-Agent"))
f.rejectedQueries.WithLabelValues(reasonRequestBodySizeExceeded, source, userID).Inc()
Comment on lines +221 to +222
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we allow arbitrary "User-Agent" values as the source, we may run into a cardinality problem that could cause a metrics explosion.

Maybe we could just split it into two categories: the ruler UA and everything else?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

source := tripperware.GetSource(r.Header.Get("User-Agent"))

GetSource already categorizes it as either API or Ruler.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh nice! LGTM

}
return
}
Expand Down Expand Up @@ -261,7 +262,8 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

f.reportQueryStats(r, userID, queryString, queryResponseTime, stats, err, statusCode, resp)
source := tripperware.GetSource(r.Header.Get("User-Agent"))
f.reportQueryStats(r, source, userID, queryString, queryResponseTime, stats, err, statusCode, resp)
}

hs := w.Header()
Expand Down Expand Up @@ -335,7 +337,7 @@ func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, query
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
}

func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.QueryStats, error error, statusCode int, resp *http.Response) {
func (f *Handler) reportQueryStats(r *http.Request, source, userID string, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.QueryStats, error error, statusCode int, resp *http.Response) {
wallTime := stats.LoadWallTime()
queryStorageWallTime := stats.LoadQueryStorageWallTime()
numResponseSeries := stats.LoadResponseSeries()
Expand All @@ -353,13 +355,13 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
dataSelectMinTime := stats.LoadDataSelectMinTime()

// Track stats.
f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
f.queryFetchedSeries.WithLabelValues(userID).Add(float64(numFetchedSeries))
f.queryFetchedSamples.WithLabelValues(userID).Add(float64(numFetchedSamples))
f.queryScannedSamples.WithLabelValues(userID).Add(float64(numScannedSamples))
f.queryPeakSamples.WithLabelValues(userID).Observe(float64(numPeakSamples))
f.queryChunkBytes.WithLabelValues(userID).Add(float64(numChunkBytes))
f.queryDataBytes.WithLabelValues(userID).Add(float64(numDataBytes))
f.querySeconds.WithLabelValues(source, userID).Add(wallTime.Seconds())
f.queryFetchedSeries.WithLabelValues(source, userID).Add(float64(numFetchedSeries))
f.queryFetchedSamples.WithLabelValues(source, userID).Add(float64(numFetchedSamples))
f.queryScannedSamples.WithLabelValues(source, userID).Add(float64(numScannedSamples))
f.queryPeakSamples.WithLabelValues(source, userID).Observe(float64(numPeakSamples))
f.queryChunkBytes.WithLabelValues(source, userID).Add(float64(numChunkBytes))
f.queryDataBytes.WithLabelValues(source, userID).Add(float64(numDataBytes))
f.activeUsers.UpdateUserTimestamp(userID, time.Now())

var (
Expand Down Expand Up @@ -468,7 +470,7 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
}
}
if len(reason) > 0 {
f.rejectedQueries.WithLabelValues(reason, userID).Inc()
f.rejectedQueries.WithLabelValues(reason, source, userID).Inc()
stats.LimitHit = reason
}
}
Expand Down
25 changes: 13 additions & 12 deletions pkg/frontend/transport/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"google.golang.org/grpc/codes"

querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/querier/tripperware"
util_api "github.com/cortexproject/cortex/pkg/util/api"
util_log "github.com/cortexproject/cortex/pkg/util/log"
)
Expand Down Expand Up @@ -210,7 +211,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonResponseBodySizeExceeded, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonResponseBodySizeExceeded, tripperware.SourceAPI, userID))
assert.Equal(t, float64(1), v)
},
expectedStatusCode: http.StatusRequestEntityTooLarge,
Expand All @@ -226,7 +227,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTooManyRequests, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTooManyRequests, tripperware.SourceAPI, userID))
assert.Equal(t, float64(1), v)
},
expectedStatusCode: http.StatusTooManyRequests,
Expand All @@ -242,7 +243,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTooManySamples, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTooManySamples, tripperware.SourceAPI, userID))
assert.Equal(t, float64(1), v)
},
expectedStatusCode: http.StatusUnprocessableEntity,
Expand All @@ -258,7 +259,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTimeRangeExceeded, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTimeRangeExceeded, tripperware.SourceAPI, userID))
assert.Equal(t, float64(1), v)
},
expectedStatusCode: http.StatusUnprocessableEntity,
Expand All @@ -274,7 +275,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonSeriesFetched, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonSeriesFetched, tripperware.SourceAPI, userID))
assert.Equal(t, float64(1), v)
},
expectedStatusCode: http.StatusUnprocessableEntity,
Expand All @@ -290,7 +291,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunksFetched, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunksFetched, tripperware.SourceAPI, userID))
assert.Equal(t, float64(1), v)
},
expectedStatusCode: http.StatusUnprocessableEntity,
Expand All @@ -306,7 +307,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunkBytesFetched, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunkBytesFetched, tripperware.SourceAPI, userID))
assert.Equal(t, float64(1), v)
},
expectedStatusCode: http.StatusUnprocessableEntity,
Expand All @@ -322,7 +323,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonDataBytesFetched, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonDataBytesFetched, tripperware.SourceAPI, userID))
assert.Equal(t, float64(1), v)
},
expectedStatusCode: http.StatusUnprocessableEntity,
Expand All @@ -338,7 +339,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonSeriesLimitStoreGateway, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonSeriesLimitStoreGateway, tripperware.SourceAPI, userID))
assert.Equal(t, float64(1), v)
},
expectedStatusCode: http.StatusUnprocessableEntity,
Expand All @@ -354,7 +355,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunksLimitStoreGateway, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunksLimitStoreGateway, tripperware.SourceAPI, userID))
assert.Equal(t, float64(1), v)
},
expectedStatusCode: http.StatusUnprocessableEntity,
Expand All @@ -370,7 +371,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonBytesLimitStoreGateway, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonBytesLimitStoreGateway, tripperware.SourceAPI, userID))
assert.Equal(t, float64(1), v)
},
expectedStatusCode: http.StatusUnprocessableEntity,
Expand Down Expand Up @@ -498,7 +499,7 @@ func TestReportQueryStatsFormat(t *testing.T) {
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
req.Header = testData.header
handler.reportQueryStats(req, userID, testData.queryString, responseTime, testData.queryStats, testData.responseErr, statusCode, resp)
handler.reportQueryStats(req, tripperware.SourceAPI, userID, testData.queryString, responseTime, testData.queryStats, testData.responseErr, statusCode, resp)
data, err := io.ReadAll(outputBuf)
require.NoError(t, err)
require.Equal(t, testData.expectedLog+"\n", string(data))
Expand Down
6 changes: 2 additions & 4 deletions pkg/querier/tripperware/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package tripperware

import (
"context"
"fmt"
"io"
"net/http"
"strings"
Expand Down Expand Up @@ -157,7 +156,7 @@ func NewQueryTripperware(
now := time.Now()
userStr := tenant.JoinTenantIDs(tenantIDs)
activeUsers.UpdateUserTimestamp(userStr, now)
source := getSource(r.Header.Get("User-Agent"))
source := GetSource(r.Header.Get("User-Agent"))
queriesPerTenant.WithLabelValues(op, source, userStr).Inc()

if maxSubQuerySteps > 0 && (isQuery || isQueryRange) {
Expand Down Expand Up @@ -243,8 +242,7 @@ func (q roundTripper) Do(ctx context.Context, r Request) (Response, error) {
return q.codec.DecodeResponse(ctx, response, r)
}

func getSource(userAgent string) string {
fmt.Println("userAgent", userAgent)
func GetSource(userAgent string) string {
if strings.Contains(userAgent, RulerUserAgent) {
// caller is ruler
return SourceRuler
Expand Down
Loading