diff --git a/CHANGELOG.md b/CHANGELOG.md index 35069e81ed..f7068d100c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ * [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249 * [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256 * [FEATURE] Ruler: Add support for per-user external labels #6340 +* [ENHANCEMENT] Query Frontend: Add a `source` label to query stat metrics. #6470 * [ENHANCEMENT] Querier: Add a `-tenant-federation.max-concurrent` flags to configure the number of worker processing federated query and add a `cortex_querier_federated_tenants_per_query` histogram to track the number of tenants per query. #6449 * [ENHANCEMENT] Query Frontend: Add a number of series in the query response to the query stat log. #6423 * [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388 diff --git a/integration/ruler_test.go b/integration/ruler_test.go index d9e7362b5f..71015a54b5 100644 --- a/integration/ruler_test.go +++ b/integration/ruler_test.go @@ -1673,7 +1673,8 @@ func TestRulerEvalWithQueryFrontend(t *testing.T) { for _, format := range []string{"protobuf", "json"} { t.Run(fmt.Sprintf("format:%s", format), func(t *testing.T) { queryFrontendFlag := mergeFlags(flags, map[string]string{ - "-ruler.query-response-format": format, + "-ruler.query-response-format": format, + "-frontend.query-stats-enabled": "true", }) queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", queryFrontendFlag, "") require.NoError(t, s.Start(queryFrontend)) @@ -1726,6 +1727,14 @@ func TestRulerEvalWithQueryFrontend(t *testing.T) { require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) // Check that cortex_query_frontend_queries_total went up require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_query_frontend_queries_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics)) + // check query stat metrics + require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_seconds_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics)) + require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_fetched_series_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics)) + require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_samples_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics)) + require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_samples_scanned_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics)) + require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_peak_samples"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics)) + require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_fetched_chunks_bytes_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics)) + require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_fetched_data_bytes_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics)) }) } } diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index bba995988d..68a0ae6144 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -112,23 +112,23 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge h.querySeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_query_seconds_total", Help: "Total amount of wall clock time spend processing queries.", - }, []string{"user"}) + }, []string{"source", "user"}) h.queryFetchedSeries = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_query_fetched_series_total", Help: "Number of series fetched to execute a query.", - }, []string{"user"}) + }, []string{"source", "user"}) h.queryFetchedSamples = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_query_samples_total", Help: "Number of samples fetched to execute a query.", - }, []string{"user"}) + }, []string{"source", "user"}) // It tracks TotalSamples in https://github.com/prometheus/prometheus/blob/main/util/stats/query_stats.go#L237 for each user. h.queryScannedSamples = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_query_samples_scanned_total", Help: "Number of samples scanned to execute a query.", - }, []string{"user"}) + }, []string{"source", "user"}) h.queryPeakSamples = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "cortex_query_peak_samples", @@ -136,24 +136,24 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge NativeHistogramBucketFactor: 1.1, NativeHistogramMaxBucketNumber: 100, NativeHistogramMinResetDuration: 1 * time.Hour, - }, []string{"user"}) + }, []string{"source", "user"}) h.queryChunkBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_query_fetched_chunks_bytes_total", Help: "Size of all chunks fetched to execute a query in bytes.", - }, []string{"user"}) + }, []string{"source", "user"}) h.queryDataBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_query_fetched_data_bytes_total", Help: "Size of all data fetched to execute a query in bytes.", - }, []string{"user"}) + }, []string{"source", "user"}) h.rejectedQueries = promauto.With(reg).NewCounterVec( prometheus.CounterOpts{ Name: "cortex_rejected_queries_total", Help: "The total number of queries that were rejected.", }, - []string{"reason", "user"}, + []string{"reason", "source", "user"}, ) h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) { @@ -218,7 +218,8 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } http.Error(w, err.Error(), statusCode) if f.cfg.QueryStatsEnabled && util.IsRequestBodyTooLarge(err) { - f.rejectedQueries.WithLabelValues(reasonRequestBodySizeExceeded, userID).Inc() + source := tripperware.GetSource(r.Header.Get("User-Agent")) + f.rejectedQueries.WithLabelValues(reasonRequestBodySizeExceeded, source, userID).Inc() } return } @@ -261,7 +262,8 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } - f.reportQueryStats(r, userID, queryString, queryResponseTime, stats, err, statusCode, resp) + source := tripperware.GetSource(r.Header.Get("User-Agent")) + f.reportQueryStats(r, source, userID, queryString, queryResponseTime, stats, err, statusCode, resp) } hs := w.Header() @@ -335,7 +337,7 @@ func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, query level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...) } -func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.QueryStats, error error, statusCode int, resp *http.Response) { +func (f *Handler) reportQueryStats(r *http.Request, source, userID string, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.QueryStats, error error, statusCode int, resp *http.Response) { wallTime := stats.LoadWallTime() queryStorageWallTime := stats.LoadQueryStorageWallTime() numResponseSeries := stats.LoadResponseSeries() @@ -353,13 +355,13 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u dataSelectMinTime := stats.LoadDataSelectMinTime() // Track stats. - f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds()) - f.queryFetchedSeries.WithLabelValues(userID).Add(float64(numFetchedSeries)) - f.queryFetchedSamples.WithLabelValues(userID).Add(float64(numFetchedSamples)) - f.queryScannedSamples.WithLabelValues(userID).Add(float64(numScannedSamples)) - f.queryPeakSamples.WithLabelValues(userID).Observe(float64(numPeakSamples)) - f.queryChunkBytes.WithLabelValues(userID).Add(float64(numChunkBytes)) - f.queryDataBytes.WithLabelValues(userID).Add(float64(numDataBytes)) + f.querySeconds.WithLabelValues(source, userID).Add(wallTime.Seconds()) + f.queryFetchedSeries.WithLabelValues(source, userID).Add(float64(numFetchedSeries)) + f.queryFetchedSamples.WithLabelValues(source, userID).Add(float64(numFetchedSamples)) + f.queryScannedSamples.WithLabelValues(source, userID).Add(float64(numScannedSamples)) + f.queryPeakSamples.WithLabelValues(source, userID).Observe(float64(numPeakSamples)) + f.queryChunkBytes.WithLabelValues(source, userID).Add(float64(numChunkBytes)) + f.queryDataBytes.WithLabelValues(source, userID).Add(float64(numDataBytes)) f.activeUsers.UpdateUserTimestamp(userID, time.Now()) var ( @@ -468,7 +470,7 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u } } if len(reason) > 0 { - f.rejectedQueries.WithLabelValues(reason, userID).Inc() + f.rejectedQueries.WithLabelValues(reason, source, userID).Inc() stats.LimitHit = reason } } diff --git a/pkg/frontend/transport/handler_test.go b/pkg/frontend/transport/handler_test.go index 2c773d42d6..c21e1bf18e 100644 --- a/pkg/frontend/transport/handler_test.go +++ b/pkg/frontend/transport/handler_test.go @@ -24,6 +24,7 @@ import ( "google.golang.org/grpc/codes" querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" + "github.com/cortexproject/cortex/pkg/querier/tripperware" util_api "github.com/cortexproject/cortex/pkg/util/api" util_log "github.com/cortexproject/cortex/pkg/util/log" ) @@ -210,7 +211,7 @@ func TestHandler_ServeHTTP(t *testing.T) { }, nil }), additionalMetricsCheckFunc: func(h *Handler) { - v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonResponseBodySizeExceeded, userID)) + v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonResponseBodySizeExceeded, tripperware.SourceAPI, userID)) assert.Equal(t, float64(1), v) }, expectedStatusCode: http.StatusRequestEntityTooLarge, @@ -226,7 +227,7 @@ func TestHandler_ServeHTTP(t *testing.T) { }, nil }), additionalMetricsCheckFunc: func(h *Handler) { - v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTooManyRequests, userID)) + v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTooManyRequests, tripperware.SourceAPI, userID)) assert.Equal(t, float64(1), v) }, expectedStatusCode: http.StatusTooManyRequests, @@ -242,7 +243,7 @@ func TestHandler_ServeHTTP(t *testing.T) { }, nil }), additionalMetricsCheckFunc: func(h *Handler) { - v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTooManySamples, userID)) + v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTooManySamples, tripperware.SourceAPI, userID)) assert.Equal(t, float64(1), v) }, expectedStatusCode: http.StatusUnprocessableEntity, @@ -258,7 +259,7 @@ func TestHandler_ServeHTTP(t *testing.T) { }, nil }), additionalMetricsCheckFunc: func(h *Handler) { - v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTimeRangeExceeded, userID)) + v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTimeRangeExceeded, tripperware.SourceAPI, userID)) assert.Equal(t, float64(1), v) }, expectedStatusCode: http.StatusUnprocessableEntity, @@ -274,7 +275,7 @@ func TestHandler_ServeHTTP(t *testing.T) { }, nil }), additionalMetricsCheckFunc: func(h *Handler) { - v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonSeriesFetched, userID)) + v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonSeriesFetched, tripperware.SourceAPI, userID)) assert.Equal(t, float64(1), v) }, expectedStatusCode: http.StatusUnprocessableEntity, @@ -290,7 +291,7 @@ func TestHandler_ServeHTTP(t *testing.T) { }, nil }), additionalMetricsCheckFunc: func(h *Handler) { - v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunksFetched, userID)) + v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunksFetched, tripperware.SourceAPI, userID)) assert.Equal(t, float64(1), v) }, expectedStatusCode: http.StatusUnprocessableEntity, @@ -306,7 +307,7 @@ func TestHandler_ServeHTTP(t *testing.T) { }, nil }), additionalMetricsCheckFunc: func(h *Handler) { - v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunkBytesFetched, userID)) + v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunkBytesFetched, tripperware.SourceAPI, userID)) assert.Equal(t, float64(1), v) }, expectedStatusCode: http.StatusUnprocessableEntity, @@ -322,7 +323,7 @@ func TestHandler_ServeHTTP(t *testing.T) { }, nil }), additionalMetricsCheckFunc: func(h *Handler) { - v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonDataBytesFetched, userID)) + v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonDataBytesFetched, tripperware.SourceAPI, userID)) assert.Equal(t, float64(1), v) }, expectedStatusCode: http.StatusUnprocessableEntity, @@ -338,7 +339,7 @@ func TestHandler_ServeHTTP(t *testing.T) { }, nil }), additionalMetricsCheckFunc: func(h *Handler) { - v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonSeriesLimitStoreGateway, userID)) + v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonSeriesLimitStoreGateway, tripperware.SourceAPI, userID)) assert.Equal(t, float64(1), v) }, expectedStatusCode: http.StatusUnprocessableEntity, @@ -354,7 +355,7 @@ func TestHandler_ServeHTTP(t *testing.T) { }, nil }), additionalMetricsCheckFunc: func(h *Handler) { - v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunksLimitStoreGateway, userID)) + v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunksLimitStoreGateway, tripperware.SourceAPI, userID)) assert.Equal(t, float64(1), v) }, expectedStatusCode: http.StatusUnprocessableEntity, @@ -370,7 +371,7 @@ func TestHandler_ServeHTTP(t *testing.T) { }, nil }), additionalMetricsCheckFunc: func(h *Handler) { - v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonBytesLimitStoreGateway, userID)) + v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonBytesLimitStoreGateway, tripperware.SourceAPI, userID)) assert.Equal(t, float64(1), v) }, expectedStatusCode: http.StatusUnprocessableEntity, @@ -498,7 +499,7 @@ func TestReportQueryStatsFormat(t *testing.T) { for testName, testData := range tests { t.Run(testName, func(t *testing.T) { req.Header = testData.header - handler.reportQueryStats(req, userID, testData.queryString, responseTime, testData.queryStats, testData.responseErr, statusCode, resp) + handler.reportQueryStats(req, tripperware.SourceAPI, userID, testData.queryString, responseTime, testData.queryStats, testData.responseErr, statusCode, resp) data, err := io.ReadAll(outputBuf) require.NoError(t, err) require.Equal(t, testData.expectedLog+"\n", string(data)) diff --git a/pkg/querier/tripperware/roundtrip.go b/pkg/querier/tripperware/roundtrip.go index c6fcb18ce2..38945b9669 100644 --- a/pkg/querier/tripperware/roundtrip.go +++ b/pkg/querier/tripperware/roundtrip.go @@ -17,7 +17,6 @@ package tripperware import ( "context" - "fmt" "io" "net/http" "strings" @@ -157,7 +156,7 @@ func NewQueryTripperware( now := time.Now() userStr := tenant.JoinTenantIDs(tenantIDs) activeUsers.UpdateUserTimestamp(userStr, now) - source := getSource(r.Header.Get("User-Agent")) + source := GetSource(r.Header.Get("User-Agent")) queriesPerTenant.WithLabelValues(op, source, userStr).Inc() if maxSubQuerySteps > 0 && (isQuery || isQueryRange) { @@ -243,11 +242,11 @@ func (q roundTripper) Do(ctx context.Context, r Request) (Response, error) { return q.codec.DecodeResponse(ctx, response, r) } -func getSource(userAgent string) string { +func GetSource(userAgent string) string { if strings.Contains(userAgent, RulerUserAgent) { // caller is ruler return SourceRuler } return SourceAPI -} \ No newline at end of file +}