Skip to content

Commit

Permalink
Add -frontend.enabled-ruler-query-stats flag
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 committed Jan 13, 2025
1 parent 5dd1072 commit 3665b72
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* [FEATURE] Query Frontend: Support a metadata federated query when `-tenant-federation.enabled=true`. #6461
* [FEATURE] Query Frontend: Support an exemplar federated query when `-tenant-federation.enabled=true`. #6455
* [FEATURE] Ingester/StoreGateway: Add support for cache regex query matchers via `-ingester.matchers-cache-max-items` and `-blocks-storage.bucket-store.matchers-cache-max-items`. #6477 #6491
* [ENHANCEMENT] Query Frontend: Add a `-frontend.enabled-ruler-query-stats` flag to configure whether to report the query stats log for queries coming from the Ruler. #6501
* [ENHANCEMENT] Query Frontend: Add a `source` label to query stat metrics. #6470
* [ENHANCEMENT] Querier: Add a `-tenant-federation.max-concurrent` flag to configure the number of workers processing federated queries, and add a `cortex_querier_federated_tenants_per_query` histogram to track the number of tenants per query. #6449
* [ENHANCEMENT] Query Frontend: Add a number of series in the query response to the query stat log. #6423
Expand Down
6 changes: 6 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4094,6 +4094,12 @@ The `query_frontend_config` configures the Cortex query-frontend.
# CLI flag: -frontend.query-stats-enabled
[query_stats_enabled: <boolean> | default = false]
# If enabled, report the log of the query stats for queries coming from the
# ruler to evaluate rules. It only takes effect when '-ruler.frontend-address'
# is configured.
# CLI flag: -frontend.enabled-ruler-query-stats
[enabled_ruler_query_stats-log: <boolean> | default = false]
# If a querier disconnects without sending notification about graceful shutdown,
# the query-frontend will keep the querier in the tenant's shard until the
# forget delay has passed. This feature is useful to reduce the blast radius
Expand Down
54 changes: 40 additions & 14 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,17 @@ const (

// Config for a Handler.
type HandlerConfig struct {
LogQueriesLongerThan time.Duration `yaml:"log_queries_longer_than"`
MaxBodySize int64 `yaml:"max_body_size"`
QueryStatsEnabled bool `yaml:"query_stats_enabled"`
LogQueriesLongerThan time.Duration `yaml:"log_queries_longer_than"`
MaxBodySize int64 `yaml:"max_body_size"`
QueryStatsEnabled bool `yaml:"query_stats_enabled"`
EnabledRulerQueryStatsLog bool `yaml:"enabled_ruler_query_stats-log"`
}

// RegisterFlags registers the query-frontend handler flags on f, binding each
// flag to its HandlerConfig field together with the field's default value.
func (cfg *HandlerConfig) RegisterFlags(f *flag.FlagSet) {
	// Default cap on the request body size: 10 MiB.
	const defaultMaxBodySize = 10 * 1024 * 1024

	f.DurationVar(
		&cfg.LogQueriesLongerThan,
		"frontend.log-queries-longer-than",
		0,
		"Log queries that are slower than the specified duration. Set to 0 to disable. Set to < 0 to enable on all queries.",
	)
	f.Int64Var(
		&cfg.MaxBodySize,
		"frontend.max-body-size",
		defaultMaxBodySize,
		"Max body size for downstream prometheus.",
	)
	f.BoolVar(
		&cfg.QueryStatsEnabled,
		"frontend.query-stats-enabled",
		false,
		"True to enable query statistics tracking. When enabled, a message with some statistics is logged for every query.",
	)
	f.BoolVar(
		&cfg.EnabledRulerQueryStatsLog,
		"frontend.enabled-ruler-query-stats",
		false,
		"If enabled, report the log of the query stats for queries coming from the ruler to evaluate rules. It only takes effect when '-ruler.frontend-address' is configured.",
	)
}

// Handler accepts queries and forwards them to RoundTripper. It can log slow queries,
Expand Down Expand Up @@ -226,10 +228,11 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.Body = io.NopCloser(&buf)
}

source := tripperware.GetSource(r.Header.Get("User-Agent"))
// Log request
if f.cfg.QueryStatsEnabled {
queryString = f.parseRequestQueryString(r, buf)
f.logQueryRequest(r, queryString)
f.logQueryRequest(r, queryString, source)
}

startTime := time.Now()
Expand Down Expand Up @@ -262,7 +265,6 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

source := tripperware.GetSource(r.Header.Get("User-Agent"))
f.reportQueryStats(r, source, userID, queryString, queryResponseTime, stats, err, statusCode, resp)
}

Expand Down Expand Up @@ -303,7 +305,7 @@ func formatGrafanaStatsFields(r *http.Request) []interface{} {
}

// logQueryRequest logs query request before query execution.
func (f *Handler) logQueryRequest(r *http.Request, queryString url.Values) {
func (f *Handler) logQueryRequest(r *http.Request, queryString url.Values, source string) {
logMessage := []interface{}{
"msg", "query request",
"component", "query-frontend",
Expand All @@ -314,9 +316,17 @@ func (f *Handler) logQueryRequest(r *http.Request, queryString url.Values) {
if len(grafanaFields) > 0 {
logMessage = append(logMessage, grafanaFields...)
}
logMessage = append(logMessage, formatQueryString(queryString)...)

level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
switch source {
case tripperware.SourceAPI:
// Always report the log if the source is not the ruler.
logMessage = append(logMessage, formatQueryString(queryString)...)
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
case tripperware.SourceRuler:
if f.cfg.EnabledRulerQueryStatsLog {
logMessage = append(logMessage, formatQueryString(queryString)...)
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
}
}
}

// reportSlowQuery reports slow queries.
Expand Down Expand Up @@ -435,11 +445,27 @@ func (f *Handler) reportQueryStats(r *http.Request, source, userID string, query
logMessage = append(logMessage, "error", s.Message())
}
}
logMessage = append(logMessage, formatQueryString(queryString)...)
if error != nil {
level.Error(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
} else {
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)

switch source {
case tripperware.SourceAPI:
// Always report the log if the source is not the ruler.
logMessage = append(logMessage, formatQueryString(queryString)...)
if error != nil {
level.Error(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
} else {
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
}
case tripperware.SourceRuler:
if f.cfg.EnabledRulerQueryStatsLog {
// report a log only if `-frontend.enabled-ruler-query-stats`
// is enabled when queries come from the Ruler.
logMessage = append(logMessage, formatQueryString(queryString)...)
if error != nil {
level.Error(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
} else {
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
}
}
}

var reason string
Expand Down
40 changes: 32 additions & 8 deletions pkg/frontend/transport/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,28 +413,31 @@ func TestHandler_ServeHTTP(t *testing.T) {
func TestReportQueryStatsFormat(t *testing.T) {
outputBuf := bytes.NewBuffer(nil)
logger := log.NewSyncLogger(log.NewLogfmtLogger(outputBuf))
handler := NewHandler(HandlerConfig{QueryStatsEnabled: true}, http.DefaultTransport, logger, nil)
userID := "fake"
req, _ := http.NewRequest(http.MethodGet, "http://localhost:8080/prometheus/api/v1/query", nil)
resp := &http.Response{ContentLength: 1000}
responseTime := time.Second
statusCode := http.StatusOK

type testCase struct {
queryString url.Values
queryStats *querier_stats.QueryStats
header http.Header
responseErr error
expectedLog string
queryString url.Values
queryStats *querier_stats.QueryStats
header http.Header
responseErr error
expectedLog string
enabledRulerQueryStatsLog bool
source string
}

tests := map[string]testCase{
"should not include query and header details if empty": {
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000`,
source: tripperware.SourceAPI,
},
"should include query length and string at the end": {
queryString: url.Values(map[string][]string{"query": {"up"}}),
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 query_length=2 param_query=up`,
source: tripperware.SourceAPI,
},
"should include query stats": {
queryStats: &querier_stats.QueryStats{
Expand All @@ -451,14 +454,17 @@ func TestReportQueryStatsFormat(t *testing.T) {
},
},
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 response_series_count=100 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 split_queries=10 status_code=200 response_size=1000 query_storage_wall_time_seconds=6000`,
source: tripperware.SourceAPI,
},
"should include user agent": {
header: http.Header{"User-Agent": []string{"Grafana"}},
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 user_agent=Grafana`,
source: tripperware.SourceAPI,
},
"should include response error": {
responseErr: errors.New("foo_err"),
expectedLog: `level=error msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 error=foo_err`,
source: tripperware.SourceAPI,
},
"should include query priority": {
queryString: url.Values(map[string][]string{"query": {"up"}}),
Expand All @@ -467,6 +473,7 @@ func TestReportQueryStatsFormat(t *testing.T) {
PriorityAssigned: true,
},
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 query_length=2 priority=99 param_query=up`,
source: tripperware.SourceAPI,
},
"should include data fetch min and max time": {
queryString: url.Values(map[string][]string{"query": {"up"}}),
Expand All @@ -475,6 +482,7 @@ func TestReportQueryStatsFormat(t *testing.T) {
DataSelectMinTime: 1704067200000,
},
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 data_select_max_time=1704153600 data_select_min_time=1704067200 query_length=2 param_query=up`,
source: tripperware.SourceAPI,
},
"should include query stats with store gateway stats": {
queryStats: &querier_stats.QueryStats{
Expand All @@ -493,16 +501,32 @@ func TestReportQueryStatsFormat(t *testing.T) {
},
},
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 response_series_count=100 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 split_queries=10 status_code=200 response_size=1000 store_gateway_touched_postings_count=20 store_gateway_touched_posting_bytes=200 query_storage_wall_time_seconds=6000`,
source: tripperware.SourceAPI,
},
"should not report a log": {
expectedLog: ``,
source: tripperware.SourceRuler,
enabledRulerQueryStatsLog: false,
},
"should report a log": {
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000`,
source: tripperware.SourceRuler,
enabledRulerQueryStatsLog: true,
},
}

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
handler := NewHandler(HandlerConfig{QueryStatsEnabled: true, EnabledRulerQueryStatsLog: testData.enabledRulerQueryStatsLog}, http.DefaultTransport, logger, nil)
req.Header = testData.header
handler.reportQueryStats(req, tripperware.SourceAPI, userID, testData.queryString, responseTime, testData.queryStats, testData.responseErr, statusCode, resp)
handler.reportQueryStats(req, testData.source, userID, testData.queryString, responseTime, testData.queryStats, testData.responseErr, statusCode, resp)
data, err := io.ReadAll(outputBuf)
require.NoError(t, err)
require.Equal(t, testData.expectedLog+"\n", string(data))
if testData.expectedLog == "" {
require.Empty(t, string(data))
} else {
require.Equal(t, testData.expectedLog+"\n", string(data))
}
})
}
}

0 comments on commit 3665b72

Please sign in to comment.