Skip to content

Commit

Permalink
Add a source label to query stat metrics
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 committed Jan 2, 2025
1 parent 5020140 commit 9f10b72
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
* [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256
* [FEATURE] Ruler: Add support for per-user external labels #6340
* [ENHANCEMENT] Query Frontend: Add a `source` label to query stat metrics. #6470
* [ENHANCEMENT] Querier: Add a `-tenant-federation.max-concurrent` flags to configure the number of worker processing federated query and add a `cortex_querier_federated_tenants_per_query` histogram to track the number of tenants per query. #6449
* [ENHANCEMENT] Query Frontend: Add a number of series in the query response to the query stat log. #6423
* [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388
Expand Down
11 changes: 10 additions & 1 deletion integration/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1673,7 +1673,8 @@ func TestRulerEvalWithQueryFrontend(t *testing.T) {
for _, format := range []string{"protobuf", "json"} {
t.Run(fmt.Sprintf("format:%s", format), func(t *testing.T) {
queryFrontendFlag := mergeFlags(flags, map[string]string{
"-ruler.query-response-format": format,
"-ruler.query-response-format": format,
"-frontend.query-stats-enabled": "true",
})
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", queryFrontendFlag, "")
require.NoError(t, s.Start(queryFrontend))
Expand Down Expand Up @@ -1726,6 +1727,14 @@ func TestRulerEvalWithQueryFrontend(t *testing.T) {
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
// Check that cortex_query_frontend_queries_total went up
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_query_frontend_queries_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics))
// check query stat metrics
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_seconds_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics))
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_fetched_series_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics))
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_samples_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics))
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_samples_scanned_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics))
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_peak_samples"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics))
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_fetched_chunks_bytes_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics))
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(0), []string{"cortex_query_fetched_data_bytes_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics))
})
}
}
Expand Down
40 changes: 21 additions & 19 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,48 +112,48 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
h.querySeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_seconds_total",
Help: "Total amount of wall clock time spend processing queries.",
}, []string{"user"})
}, []string{"source", "user"})

h.queryFetchedSeries = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_fetched_series_total",
Help: "Number of series fetched to execute a query.",
}, []string{"user"})
}, []string{"source", "user"})

h.queryFetchedSamples = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_samples_total",
Help: "Number of samples fetched to execute a query.",
}, []string{"user"})
}, []string{"source", "user"})

// It tracks TotalSamples in https://github.com/prometheus/prometheus/blob/main/util/stats/query_stats.go#L237 for each user.
h.queryScannedSamples = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_samples_scanned_total",
Help: "Number of samples scanned to execute a query.",
}, []string{"user"})
}, []string{"source", "user"})

h.queryPeakSamples = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_query_peak_samples",
Help: "Highest count of samples considered to execute a query.",
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 1 * time.Hour,
}, []string{"user"})
}, []string{"source", "user"})

h.queryChunkBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_fetched_chunks_bytes_total",
Help: "Size of all chunks fetched to execute a query in bytes.",
}, []string{"user"})
}, []string{"source", "user"})

h.queryDataBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_fetched_data_bytes_total",
Help: "Size of all data fetched to execute a query in bytes.",
}, []string{"user"})
}, []string{"source", "user"})

h.rejectedQueries = promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Name: "cortex_rejected_queries_total",
Help: "The total number of queries that were rejected.",
},
[]string{"reason", "user"},
[]string{"reason", "source", "user"},
)

h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
Expand Down Expand Up @@ -218,7 +218,8 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
http.Error(w, err.Error(), statusCode)
if f.cfg.QueryStatsEnabled && util.IsRequestBodyTooLarge(err) {
f.rejectedQueries.WithLabelValues(reasonRequestBodySizeExceeded, userID).Inc()
source := tripperware.GetSource(r.Header.Get("User-Agent"))
f.rejectedQueries.WithLabelValues(reasonRequestBodySizeExceeded, source, userID).Inc()
}
return
}
Expand Down Expand Up @@ -261,7 +262,8 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

f.reportQueryStats(r, userID, queryString, queryResponseTime, stats, err, statusCode, resp)
source := tripperware.GetSource(r.Header.Get("User-Agent"))
f.reportQueryStats(r, source, userID, queryString, queryResponseTime, stats, err, statusCode, resp)
}

hs := w.Header()
Expand Down Expand Up @@ -335,7 +337,7 @@ func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, query
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
}

func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.QueryStats, error error, statusCode int, resp *http.Response) {
func (f *Handler) reportQueryStats(r *http.Request, source, userID string, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.QueryStats, error error, statusCode int, resp *http.Response) {
wallTime := stats.LoadWallTime()
queryStorageWallTime := stats.LoadQueryStorageWallTime()
numResponseSeries := stats.LoadResponseSeries()
Expand All @@ -353,13 +355,13 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
dataSelectMinTime := stats.LoadDataSelectMinTime()

// Track stats.
f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
f.queryFetchedSeries.WithLabelValues(userID).Add(float64(numFetchedSeries))
f.queryFetchedSamples.WithLabelValues(userID).Add(float64(numFetchedSamples))
f.queryScannedSamples.WithLabelValues(userID).Add(float64(numScannedSamples))
f.queryPeakSamples.WithLabelValues(userID).Observe(float64(numPeakSamples))
f.queryChunkBytes.WithLabelValues(userID).Add(float64(numChunkBytes))
f.queryDataBytes.WithLabelValues(userID).Add(float64(numDataBytes))
f.querySeconds.WithLabelValues(source, userID).Add(wallTime.Seconds())
f.queryFetchedSeries.WithLabelValues(source, userID).Add(float64(numFetchedSeries))
f.queryFetchedSamples.WithLabelValues(source, userID).Add(float64(numFetchedSamples))
f.queryScannedSamples.WithLabelValues(source, userID).Add(float64(numScannedSamples))
f.queryPeakSamples.WithLabelValues(source, userID).Observe(float64(numPeakSamples))
f.queryChunkBytes.WithLabelValues(source, userID).Add(float64(numChunkBytes))
f.queryDataBytes.WithLabelValues(source, userID).Add(float64(numDataBytes))
f.activeUsers.UpdateUserTimestamp(userID, time.Now())

var (
Expand Down Expand Up @@ -468,7 +470,7 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
}
}
if len(reason) > 0 {
f.rejectedQueries.WithLabelValues(reason, userID).Inc()
f.rejectedQueries.WithLabelValues(reason, source, userID).Inc()
stats.LimitHit = reason
}
}
Expand Down
25 changes: 13 additions & 12 deletions pkg/frontend/transport/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"google.golang.org/grpc/codes"

querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/querier/tripperware"
util_api "github.com/cortexproject/cortex/pkg/util/api"
util_log "github.com/cortexproject/cortex/pkg/util/log"
)
Expand Down Expand Up @@ -210,7 +211,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonResponseBodySizeExceeded, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonResponseBodySizeExceeded, tripperware.SourceAPI, userID))
assert.Equal(t, float64(1), v)
},
expectedStatusCode: http.StatusRequestEntityTooLarge,
Expand All @@ -226,7 +227,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTooManyRequests, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTooManyRequests, tripperware.SourceAPI, userID))
assert.Equal(t, float64(1), v)
},
expectedStatusCode: http.StatusTooManyRequests,
Expand All @@ -242,7 +243,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTooManySamples, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTooManySamples, tripperware.SourceAPI, userID))
assert.Equal(t, float64(1), v)
},
expectedStatusCode: http.StatusUnprocessableEntity,
Expand All @@ -258,7 +259,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTimeRangeExceeded, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTimeRangeExceeded, tripperware.SourceAPI, userID))
assert.Equal(t, float64(1), v)
},
expectedStatusCode: http.StatusUnprocessableEntity,
Expand All @@ -274,7 +275,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonSeriesFetched, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonSeriesFetched, tripperware.SourceAPI, userID))
assert.Equal(t, float64(1), v)
},
expectedStatusCode: http.StatusUnprocessableEntity,
Expand All @@ -290,7 +291,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunksFetched, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunksFetched, tripperware.SourceAPI, userID))
assert.Equal(t, float64(1), v)
},
expectedStatusCode: http.StatusUnprocessableEntity,
Expand All @@ -306,7 +307,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunkBytesFetched, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunkBytesFetched, tripperware.SourceAPI, userID))
assert.Equal(t, float64(1), v)
},
expectedStatusCode: http.StatusUnprocessableEntity,
Expand All @@ -322,7 +323,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonDataBytesFetched, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonDataBytesFetched, tripperware.SourceAPI, userID))
assert.Equal(t, float64(1), v)
},
expectedStatusCode: http.StatusUnprocessableEntity,
Expand All @@ -338,7 +339,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonSeriesLimitStoreGateway, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonSeriesLimitStoreGateway, tripperware.SourceAPI, userID))
assert.Equal(t, float64(1), v)
},
expectedStatusCode: http.StatusUnprocessableEntity,
Expand All @@ -354,7 +355,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunksLimitStoreGateway, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunksLimitStoreGateway, tripperware.SourceAPI, userID))
assert.Equal(t, float64(1), v)
},
expectedStatusCode: http.StatusUnprocessableEntity,
Expand All @@ -370,7 +371,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonBytesLimitStoreGateway, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonBytesLimitStoreGateway, tripperware.SourceAPI, userID))
assert.Equal(t, float64(1), v)
},
expectedStatusCode: http.StatusUnprocessableEntity,
Expand Down Expand Up @@ -498,7 +499,7 @@ func TestReportQueryStatsFormat(t *testing.T) {
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
req.Header = testData.header
handler.reportQueryStats(req, userID, testData.queryString, responseTime, testData.queryStats, testData.responseErr, statusCode, resp)
handler.reportQueryStats(req, tripperware.SourceAPI, userID, testData.queryString, responseTime, testData.queryStats, testData.responseErr, statusCode, resp)
data, err := io.ReadAll(outputBuf)
require.NoError(t, err)
require.Equal(t, testData.expectedLog+"\n", string(data))
Expand Down
7 changes: 3 additions & 4 deletions pkg/querier/tripperware/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package tripperware

import (
"context"
"fmt"
"io"
"net/http"
"strings"
Expand Down Expand Up @@ -157,7 +156,7 @@ func NewQueryTripperware(
now := time.Now()
userStr := tenant.JoinTenantIDs(tenantIDs)
activeUsers.UpdateUserTimestamp(userStr, now)
source := getSource(r.Header.Get("User-Agent"))
source := GetSource(r.Header.Get("User-Agent"))
queriesPerTenant.WithLabelValues(op, source, userStr).Inc()

if maxSubQuerySteps > 0 && (isQuery || isQueryRange) {
Expand Down Expand Up @@ -243,11 +242,11 @@ func (q roundTripper) Do(ctx context.Context, r Request) (Response, error) {
return q.codec.DecodeResponse(ctx, response, r)
}

func getSource(userAgent string) string {
func GetSource(userAgent string) string {
if strings.Contains(userAgent, RulerUserAgent) {
// caller is ruler
return SourceRuler
}

return SourceAPI
}
}

0 comments on commit 9f10b72

Please sign in to comment.