diff --git a/CHANGELOG.md b/CHANGELOG.md index 81ddc29d5d7..746bd9a6c47 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * [ENHANCEMENT] Prevent massive allocations in the frontend if there is not sufficient pressure from the query pipeline. [#3996](https://github.com/grafana/tempo/pull/3996) (@joe-elliott) **BREAKING CHANGE** Removed `querier_forget_delay` setting from the frontend. This configuration option did nothing. * [ENHANCEMENT] Update metrics-generator config in Tempo distributed docker compose example to serve TraceQL metrics [#4003](https://github.com/grafana/tempo/pull/4003) (@javiermolinar) +* [ENHANCEMENT] Reduce allocs related to marshalling dedicated columns repeatedly in the query frontend. [#4007](https://github.com/grafana/tempo/pull/4007) (@joe-elliott) # v2.6.0-rc.0 diff --git a/cmd/tempo-cli/cmd-query-metrics-query-range.go b/cmd/tempo-cli/cmd-query-metrics-query-range.go index 56d06aa5dad..7f15b766b81 100644 --- a/cmd/tempo-cli/cmd-query-metrics-query-range.go +++ b/cmd/tempo-cli/cmd-query-metrics-query-range.go @@ -114,7 +114,7 @@ func (cmd *metricsQueryCmd) queryRangeHTTP(req *tempopb.QueryRangeRequest) error return err } - httpReq = api.BuildQueryRangeRequest(httpReq, req) + httpReq = api.BuildQueryRangeRequest(httpReq, req, "") httpReq.Header = http.Header{} err = user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(context.Background(), cmd.OrgID), httpReq) if err != nil { diff --git a/modules/frontend/metrics_query_handler.go b/modules/frontend/metrics_query_handler.go index 2038d5887eb..46a83816039 100644 --- a/modules/frontend/metrics_query_handler.go +++ b/modules/frontend/metrics_query_handler.go @@ -45,7 +45,7 @@ func newQueryInstantStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTri URL: &url.URL{Path: downstreamPath}, Header: http.Header{}, Body: io.NopCloser(bytes.NewReader([]byte{})), - }, qr) + }, qr, "") // dedicated cols are never passed from the caller httpReq = httpReq.Clone(ctx) var finalResponse *tempopb.QueryInstantResponse @@ -110,7 +110,7 @@ func newMetricsQueryInstantHTTPHandler(cfg Config, next pipeline.AsyncRoundTripp // Clone existing to keep it unaltered. req = req.Clone(req.Context()) req.URL.Path = strings.ReplaceAll(req.URL.Path, api.PathMetricsQueryInstant, api.PathMetricsQueryRange) - req = api.BuildQueryRangeRequest(req, qr) + req = api.BuildQueryRangeRequest(req, qr, "") // dedicated cols are never passed from the caller combiner, err := combiner.NewTypedQueryRange(qr, false) if err != nil { diff --git a/modules/frontend/metrics_query_range_handler.go b/modules/frontend/metrics_query_range_handler.go index dd8e760a466..4c0ed89afe1 100644 --- a/modules/frontend/metrics_query_range_handler.go +++ b/modules/frontend/metrics_query_range_handler.go @@ -29,7 +29,7 @@ func newQueryRangeStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripp URL: &url.URL{Path: downstreamPath}, Header: http.Header{}, Body: io.NopCloser(bytes.NewReader([]byte{})), - }, req) + }, req, "") // dedicated cols are never passed from the caller ctx := srv.Context() httpReq = httpReq.WithContext(ctx) diff --git a/modules/frontend/metrics_query_range_handler_test.go b/modules/frontend/metrics_query_range_handler_test.go index 19c50dac201..43fcd5ae20a 100644 --- a/modules/frontend/metrics_query_range_handler_test.go +++ b/modules/frontend/metrics_query_range_handler_test.go @@ -56,7 +56,7 @@ func TestQueryRangeHandlerSucceeds(t *testing.T) { Start: uint64(1100 * time.Second), End: uint64(1200 * time.Second), Step: uint64(100 * time.Second), - }) + }, "") ctx := user.InjectOrgID(httpReq.Context(), tenant) httpReq = httpReq.WithContext(ctx) diff --git a/modules/frontend/metrics_query_range_sharder.go b/modules/frontend/metrics_query_range_sharder.go index 0357b480159..eaa57cab2e6 100644 --- a/modules/frontend/metrics_query_range_sharder.go +++ b/modules/frontend/metrics_query_range_sharder.go @@ -208,6 +208,7 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s defer close(reqCh) queryHash := hashForQueryRangeRequest(&searchReq) + colsToJSON := api.NewDedicatedColumnsToJSON() exemplarsPerBlock := s.exemplarsPerShard(uint32(len(metas))) for _, m := range metas { @@ -231,7 +232,7 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s for startPage := 0; startPage < int(m.TotalRecords); startPage += pages { subR := parent.Clone(ctx) - dc, err := m.DedicatedColumns.ToTempopb() + dedColsJSON, err := colsToJSON.JSONForDedicatedColumns(m.DedicatedColumns) if err != nil { // errFn(fmt.Errorf("failed to convert dedicated columns. block: %s tempopb: %w", blockID, err)) continue @@ -253,18 +254,18 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s Step: step, QueryMode: searchReq.QueryMode, // New RF1 fields - BlockID: m.BlockID.String(), - StartPage: uint32(startPage), - PagesToSearch: uint32(pages), - Version: m.Version, - Encoding: m.Encoding.String(), - Size_: m.Size, - FooterSize: m.FooterSize, - DedicatedColumns: dc, - Exemplars: exemplars, + BlockID: m.BlockID.String(), + StartPage: uint32(startPage), + PagesToSearch: uint32(pages), + Version: m.Version, + Encoding: m.Encoding.String(), + Size_: m.Size, + FooterSize: m.FooterSize, + // DedicatedColumns: dc, for perf reason we pass dedicated columns json in directly to not have to realloc object -> proto -> json + Exemplars: exemplars, } - subR = api.BuildQueryRangeRequest(subR, queryRangeReq) + subR = api.BuildQueryRangeRequest(subR, queryRangeReq, dedColsJSON) prepareRequestForQueriers(subR, tenantID) pipelineR := pipeline.NewHTTPRequest(subR) @@ -302,16 +303,11 @@ func (s *queryRangeSharder) generatorRequest(searchReq tempopb.QueryRangeRequest searchReq.QueryMode = querier.QueryModeRecent searchReq.Exemplars = uint32(s.cfg.MaxExemplars) // TODO: Review this - req := s.toUpstreamRequest(parent.Context(), searchReq, parent, tenantID) - - return req -} - -func (s *queryRangeSharder) toUpstreamRequest(ctx context.Context, req tempopb.QueryRangeRequest, parent *http.Request, tenantID string) *http.Request { - subR := parent.Clone(ctx) - subR = api.BuildQueryRangeRequest(subR, &req) + subR := parent.Clone(parent.Context()) + subR = api.BuildQueryRangeRequest(subR, &searchReq, "") // dedicated cols are never passed to the generators prepareRequestForQueriers(subR, tenantID) + return subR } diff --git a/modules/frontend/search_sharder.go b/modules/frontend/search_sharder.go index e87d0b0f96d..9278d7d74fc 100644 --- a/modules/frontend/search_sharder.go +++ b/modules/frontend/search_sharder.go @@ -303,6 +303,7 @@ func buildBackendRequests(ctx context.Context, tenantID string, parent *http.Req defer close(reqCh) queryHash := hashForSearchRequest(searchReq) + colsToJSON := api.NewDedicatedColumnsToJSON() for _, m := range metas { pages := pagesPerRequest(m, bytesPerRequest) @@ -314,25 +315,25 @@ func buildBackendRequests(ctx context.Context, tenantID string, parent *http.Req for startPage := 0; startPage < int(m.TotalRecords); startPage += pages { subR := parent.Clone(ctx) - dc, err := m.DedicatedColumns.ToTempopb() + dedColsJSON, err := colsToJSON.JSONForDedicatedColumns(m.DedicatedColumns) if err != nil { errFn(fmt.Errorf("failed to convert dedicated columns. block: %s tempopb: %w", blockID, err)) continue } subR, err = api.BuildSearchBlockRequest(subR, &tempopb.SearchBlockRequest{ - BlockID: blockID, - StartPage: uint32(startPage), - PagesToSearch: uint32(pages), - Encoding: m.Encoding.String(), - IndexPageSize: m.IndexPageSize, - TotalRecords: m.TotalRecords, - DataEncoding: m.DataEncoding, - Version: m.Version, - Size_: m.Size, - FooterSize: m.FooterSize, - DedicatedColumns: dc, - }) + BlockID: blockID, + StartPage: uint32(startPage), + PagesToSearch: uint32(pages), + Encoding: m.Encoding.String(), + IndexPageSize: m.IndexPageSize, + TotalRecords: m.TotalRecords, + DataEncoding: m.DataEncoding, + Version: m.Version, + Size_: m.Size, + FooterSize: m.FooterSize, + // DedicatedColumns: dc, for perf reason we pass dedicated columns json in directly to not have to realloc object -> proto -> json + }, dedColsJSON) if err != nil { errFn(fmt.Errorf("failed to build search block request. block: %s tempopb: %w", blockID, err)) continue diff --git a/modules/querier/external/client.go b/modules/querier/external/client.go index 3b77edad92c..1be7dfa1f61 100644 --- a/modules/querier/external/client.go +++ b/modules/querier/external/client.go @@ -3,6 +3,7 @@ package external import ( "bytes" "context" + "encoding/json" "fmt" "io" "math/rand" @@ -149,7 +150,11 @@ func (s *Client) Search(ctx context.Context, maxBytes int, searchReq *tempopb.Se if err != nil { return nil, fmt.Errorf("external endpoint failed to make new request: %w", err) } - req, err = api.BuildSearchBlockRequest(req, searchReq) + columnsJSON, err := json.Marshal(searchReq.DedicatedColumns) + if err != nil { + return nil, err + } + req, err = api.BuildSearchBlockRequest(req, searchReq, string(columnsJSON)) if err != nil { return nil, fmt.Errorf("external endpoint failed to build search block request: %w", err) } diff --git a/pkg/api/dedicated_columns_to_json.go b/pkg/api/dedicated_columns_to_json.go new file mode 100644 index 00000000000..457ab161566 --- /dev/null +++ b/pkg/api/dedicated_columns_to_json.go @@ -0,0 +1,44 @@ +package api + +import ( + "encoding/json" + "unsafe" + + "github.com/grafana/tempo/tempodb/backend" +) + +type DedicatedColumnsToJSON struct { + columnsToJSON map[uint64]string +} + +func NewDedicatedColumnsToJSON() *DedicatedColumnsToJSON { + return &DedicatedColumnsToJSON{ + columnsToJSON: make(map[uint64]string), + } +} + +func (d *DedicatedColumnsToJSON) JSONForDedicatedColumns(cols backend.DedicatedColumns) (string, error) { + if len(cols) == 0 { + return "", nil + } + + hash := cols.Hash() + if jsonString, ok := d.columnsToJSON[hash]; ok { + return jsonString, nil + } + + proto, err := cols.ToTempopb() + if err != nil { + return "", err + } + + jsonBytes, err := json.Marshal(proto) + if err != nil { + return "", err + } + + jsonString := unsafe.String(unsafe.SliceData(jsonBytes), len(jsonBytes)) + d.columnsToJSON[hash] = jsonString + + return jsonString, nil +} diff --git a/pkg/api/dedicated_columns_to_json_test.go b/pkg/api/dedicated_columns_to_json_test.go new file mode 100644 index 00000000000..b6fc9b70cad --- /dev/null +++ b/pkg/api/dedicated_columns_to_json_test.go @@ -0,0 +1,66 @@ +package api + +import ( + "encoding/json" + "math/rand/v2" + "testing" + + "github.com/grafana/tempo/pkg/util/test" + "github.com/grafana/tempo/tempodb/backend" + "github.com/stretchr/testify/require" +) + +func TestDedicatedColumnsToJson(t *testing.T) { + d := NewDedicatedColumnsToJSON() + + testCols := []backend.DedicatedColumns{} + for i := 0; i < 10; i++ { + testCols = append(testCols, randoDedicatedCols()) + } + + // do all test cols 2x to test caching + for i := 0; i < 2; i++ { + for _, cols := range testCols { + expectedJSON := dedicatedColsToJSON(t, cols) + actualJSON, err := d.JSONForDedicatedColumns(cols) + require.NoError(t, err) + + require.Equal(t, expectedJSON, actualJSON, "iteration %d, cols: %v", i, cols) + } + } +} + +func dedicatedColsToJSON(t *testing.T, cols backend.DedicatedColumns) string { + t.Helper() + + proto, err := cols.ToTempopb() + require.NoError(t, err) + + jsonBytes, err := json.Marshal(proto) + require.NoError(t, err) + + return string(jsonBytes) +} + +// randoDedicatedCols generates a random set of cols for testing +func randoDedicatedCols() backend.DedicatedColumns { + colCount := rand.IntN(5) + 1 + ret := make([]backend.DedicatedColumn, 0, colCount) + + for i := 0; i < colCount; i++ { + scope := backend.DedicatedColumnScopeSpan + if rand.IntN(2) == 0 { + scope = backend.DedicatedColumnScopeResource + } + + col := backend.DedicatedColumn{ + Scope: scope, + Name: test.RandomString(), + Type: backend.DedicatedColumnTypeString, + } + + ret = append(ret, col) + } + + return ret +} diff --git a/pkg/api/http.go b/pkg/api/http.go index f0753ebc07e..2a4ed06c0f2 100644 --- a/pkg/api/http.go +++ b/pkg/api/http.go @@ -442,7 +442,10 @@ func BuildQueryInstantRequest(req *http.Request, searchReq *tempopb.QueryInstant return req } -func BuildQueryRangeRequest(req *http.Request, searchReq *tempopb.QueryRangeRequest) *http.Request { +// BuildQueryRangeRequest takes a tempopb.QueryRangeRequest and populates the passed http.Request +// dedicatedColumnsJSON should be generated using the DedicatedColumnsToJSON struct which produces the expected string +// value and memoizes results to prevent redundant marshaling. +func BuildQueryRangeRequest(req *http.Request, searchReq *tempopb.QueryRangeRequest, dedicatedColumnsJSON string) *http.Request { if req == nil { req = &http.Request{ URL: &url.URL{}, @@ -466,9 +469,9 @@ func BuildQueryRangeRequest(req *http.Request, searchReq *tempopb.QueryRangeRequ qb.addParam(urlParamEncoding, searchReq.Encoding) qb.addParam(urlParamSize, strconv.Itoa(int(searchReq.Size_))) qb.addParam(urlParamFooterSize, strconv.Itoa(int(searchReq.FooterSize))) - if len(searchReq.DedicatedColumns) > 0 { - columnsJSON, _ := json.Marshal(searchReq.DedicatedColumns) - qb.addParam(urlParamDedicatedColumns, string(columnsJSON)) + + if len(dedicatedColumnsJSON) > 0 && dedicatedColumnsJSON != "null" { // if a caller marshals a nil dedicated cols we will receive the string "null" + qb.addParam(urlParamDedicatedColumns, dedicatedColumnsJSON) } if len(searchReq.Query) > 0 { @@ -642,7 +645,9 @@ func BuildSearchRequest(req *http.Request, searchReq *tempopb.SearchRequest) (*h // BuildSearchBlockRequest takes a tempopb.SearchBlockRequest and populates the passed http.Request // with the appropriate params. If no http.Request is provided a new one is created. -func BuildSearchBlockRequest(req *http.Request, searchReq *tempopb.SearchBlockRequest) (*http.Request, error) { +// dedicatedColumnsJSON should be generated using the DedicatedColumnsToJSON struct which produces the expected string +// value and memoizes results to prevent redundant marshaling. +func BuildSearchBlockRequest(req *http.Request, searchReq *tempopb.SearchBlockRequest, dedicatedColumnsJSON string) (*http.Request, error) { if req == nil { req = &http.Request{ URL: &url.URL{}, @@ -665,12 +670,8 @@ func BuildSearchBlockRequest(req *http.Request, searchReq *tempopb.SearchBlockRe qb.addParam(urlParamDataEncoding, searchReq.DataEncoding) qb.addParam(urlParamVersion, searchReq.Version) qb.addParam(urlParamFooterSize, strconv.FormatUint(uint64(searchReq.FooterSize), 10)) - if len(searchReq.DedicatedColumns) > 0 { - columnsJSON, err := json.Marshal(searchReq.DedicatedColumns) - if err != nil { - return nil, err - } - qb.addParam(urlParamDedicatedColumns, string(columnsJSON)) + if len(dedicatedColumnsJSON) > 0 && dedicatedColumnsJSON != "null" { // if a caller marshals a nil dedicated cols we will receive the string "null" + qb.addParam(urlParamDedicatedColumns, dedicatedColumnsJSON) } req.URL.RawQuery = qb.query() diff --git a/pkg/api/http_test.go b/pkg/api/http_test.go index 0204f03dd21..6f340ebd38e 100644 --- a/pkg/api/http_test.go +++ b/pkg/api/http_test.go @@ -1,6 +1,7 @@ package api import ( + "encoding/json" "fmt" "net/http" "net/http/httptest" @@ -477,7 +478,10 @@ func TestBuildSearchBlockRequest(t *testing.T) { } for _, tc := range tests { - actualURL, err := BuildSearchBlockRequest(tc.httpReq, tc.req) + jsonBytes, err := json.Marshal(tc.req.DedicatedColumns) + require.NoError(t, err) + + actualURL, err := BuildSearchBlockRequest(tc.httpReq, tc.req, string(jsonBytes)) assert.NoError(t, err) assert.Equal(t, tc.query, actualURL.URL.String()) } @@ -716,7 +720,10 @@ func TestQueryRangeRoundtrip(t *testing.T) { for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { - httpReq := BuildQueryRangeRequest(nil, tc.req) + jsonBytes, err := json.Marshal(tc.req.DedicatedColumns) + require.NoError(t, err) + + httpReq := BuildQueryRangeRequest(nil, tc.req, string(jsonBytes)) actualReq, err := ParseQueryRangeRequest(httpReq) require.NoError(t, err) assert.Equal(t, tc.req, actualReq) diff --git a/pkg/api/query_builder.go b/pkg/api/query_builder.go index 4addb11dea0..7a43191ffec 100644 --- a/pkg/api/query_builder.go +++ b/pkg/api/query_builder.go @@ -19,17 +19,21 @@ func newQueryBuilder(init string) *queryBuilder { return qb } -// jpe - test me - // addParam adds a new key/val pair to the query // like https://cs.opensource.google/go/go/+/refs/tags/go1.22.5:src/net/url/url.go;l=972 func (qb *queryBuilder) addParam(key, value string) { if qb.builder.Len() > 0 { qb.builder.WriteByte('&') } - qb.builder.WriteString(url.QueryEscape(key)) + + keyStr := url.QueryEscape(key) + valueStr := url.QueryEscape(value) + + qb.builder.Grow(len(keyStr) + len(valueStr) + 1) + + qb.builder.WriteString(keyStr) qb.builder.WriteByte('=') - qb.builder.WriteString(url.QueryEscape(value)) + qb.builder.WriteString(valueStr) } func (qb *queryBuilder) query() string { diff --git a/pkg/api/query_builder_test.go b/pkg/api/query_builder_test.go new file mode 100644 index 00000000000..159c6086002 --- /dev/null +++ b/pkg/api/query_builder_test.go @@ -0,0 +1,38 @@ +package api + +import ( + "math/rand/v2" + "net/url" + "testing" + + "github.com/grafana/tempo/pkg/util/test" + "github.com/stretchr/testify/require" +) + +func TestQueryBuilder(t *testing.T) { + numParams := rand.IntN(10) + 1 + + qb := newQueryBuilder("") + params := url.Values{} + + for i := 0; i < numParams; i++ { + key := test.RandomString() + value := test.RandomString() + + qb.addParam(key, value) + params.Add(key, value) + } + + // url sorts params but query builder does not. parse the query builder + // string with url to guarantee its valid and also to sort the params + actualQuery, err := url.ParseQuery(qb.query()) + require.NoError(t, err) + + actual := url.URL{} + actual.RawQuery = actualQuery.Encode() + + expected := url.URL{} + expected.RawQuery = params.Encode() + + require.Equal(t, expected.RawQuery, actual.RawQuery) +}