-
Notifications
You must be signed in to change notification settings - Fork 510
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Create a configurable list of TraceQL queries that are immediately 400'ed #3780
Changes from all commits
a564243
be0f7c8
27d2320
4f34840
6eb353d
84df15d
3b7ff49
a694532
ee974d8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -83,6 +83,8 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo | |
cacheWare := pipeline.NewCachingWare(cacheProvider, cache.RoleFrontendSearch, logger) | ||
statusCodeWare := pipeline.NewStatusCodeAdjustWare() | ||
traceIDStatusCodeWare := pipeline.NewStatusCodeAdjustWareWithAllowedCode(http.StatusNotFound) | ||
searchQueryFilterWare := pipeline.NewTraceQueryFilterWareWithDenyList(cfg.Search.BlockedQueries, pipeline.ParseSearchRequestQuery) | ||
metricQueryFilterWate := pipeline.NewTraceQueryFilterWareWithDenyList(cfg.Metrics.BlockedQueries, pipeline.ParseMetricRangeRequestQuery) | ||
|
||
tracePipeline := pipeline.Build( | ||
[]pipeline.AsyncMiddleware[combiner.PipelineResponse]{ | ||
|
@@ -97,7 +99,7 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo | |
multiTenantMiddleware(cfg, logger), | ||
newAsyncSearchSharder(reader, o, cfg.Search.Sharder, logger), | ||
}, | ||
[]pipeline.Middleware{cacheWare, statusCodeWare, retryWare}, | ||
[]pipeline.Middleware{cacheWare, statusCodeWare, retryWare, searchQueryFilterWare}, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. searchQueryFilterWare should be the first in the list of the AsyncMiddleware. this will allow it to refuse a request before we do any work. it does mean it needs to be shaped like: https://github.com/grafana/tempo/blob/main/modules/frontend/search_sharder.go#L51 |
||
next) | ||
|
||
searchTagsPipeline := pipeline.Build( | ||
|
@@ -121,7 +123,7 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo | |
[]pipeline.AsyncMiddleware[combiner.PipelineResponse]{ | ||
multiTenantUnsupportedMiddleware(cfg, logger), | ||
}, | ||
[]pipeline.Middleware{statusCodeWare, retryWare}, | ||
[]pipeline.Middleware{statusCodeWare, retryWare, metricQueryFilterWate}, | ||
next) | ||
|
||
// traceql metrics | ||
|
@@ -130,7 +132,7 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo | |
multiTenantMiddleware(cfg, logger), | ||
newAsyncQueryRangeSharder(reader, o, cfg.Metrics.Sharder, logger), | ||
}, | ||
[]pipeline.Middleware{cacheWare, statusCodeWare, retryWare}, | ||
[]pipeline.Middleware{cacheWare, statusCodeWare, retryWare, searchQueryFilterWare}, | ||
next) | ||
|
||
traces := newTraceIDHandler(cfg, o, tracePipeline, logger) | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,105 @@ | ||||||
package pipeline | ||||||
|
||||||
import ( | ||||||
"fmt" | ||||||
"github.com/grafana/tempo/pkg/api" | ||||||
"io" | ||||||
"net/http" | ||||||
"regexp" | ||||||
"strings" | ||||||
"sync" | ||||||
) | ||||||
|
||||||
type traceQueryFilterWare struct { | ||||||
next http.RoundTripper | ||||||
filters []*regexp.Regexp | ||||||
parseFn func(r *http.Request) string | ||||||
} | ||||||
|
||||||
func NewTraceQueryFilterWare(next http.RoundTripper) http.RoundTripper { | ||||||
return &traceQueryFilterWare{ | ||||||
next: next, | ||||||
} | ||||||
} | ||||||
|
||||||
func NewTraceQueryFilterWareWithDenyList(denyList []string, parseFunc func(r *http.Request) string) Middleware { | ||||||
var filter []*regexp.Regexp | ||||||
for i := range denyList { | ||||||
exp, err := regexp.Compile(denyList[i]) | ||||||
if err == nil { | ||||||
filter = append(filter, exp) | ||||||
} | ||||||
} | ||||||
|
||||||
return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper { | ||||||
return traceQueryFilterWare{ | ||||||
next: next, | ||||||
filters: filter, | ||||||
parseFn: parseFunc, | ||||||
} | ||||||
}) | ||||||
} | ||||||
|
||||||
func (c traceQueryFilterWare) RoundTrip(req *http.Request) (*http.Response, error) { | ||||||
if c.filters == nil || len(c.filters) == 0 { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in go
Suggested change
|
||||||
return c.next.RoundTrip(req) | ||||||
} | ||||||
|
||||||
query := c.parseFn(req) | ||||||
|
||||||
if len(query) == 0 || query == "" { | ||||||
return c.next.RoundTrip(req) | ||||||
} | ||||||
|
||||||
match := make(chan bool, len(c.filters)) | ||||||
wg := sync.WaitGroup{} | ||||||
for range c.filters { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no need for concurrency. let's keep this simple and just check them one at a time inline |
||||||
wg.Add(1) | ||||||
} | ||||||
|
||||||
go func(qry string) { | ||||||
defer wg.Done() | ||||||
for _, re := range c.filters { | ||||||
if re.MatchString(qry) { | ||||||
match <- true | ||||||
return | ||||||
} | ||||||
} | ||||||
match <- false | ||||||
}(query) | ||||||
|
||||||
go func() { | ||||||
wg.Wait() | ||||||
close(match) | ||||||
}() | ||||||
|
||||||
if <-match { | ||||||
|
||||||
return &http.Response{ | ||||||
StatusCode: http.StatusBadRequest, | ||||||
Status: http.StatusText(http.StatusBadRequest), | ||||||
Body: io.NopCloser(strings.NewReader("Query is temporarily blocked by your administrator.")), | ||||||
}, nil | ||||||
|
||||||
} | ||||||
return c.next.RoundTrip(req) | ||||||
} | ||||||
|
||||||
func ParseSearchRequestQuery(req *http.Request) string { | ||||||
|
||||||
//query, _ := api.ParseSearchTagValuesRequestV2(req) | ||||||
|
||||||
//query, _ := api.ParseSearchRequest(req) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it should be this one. |
||||||
|
||||||
query := req.URL.String() | ||||||
|
||||||
fmt.Printf("Parsed Query: %v\n", query) | ||||||
|
||||||
//return query.GetQuery() | ||||||
return query | ||||||
} | ||||||
|
||||||
func ParseMetricRangeRequestQuery(req *http.Request) string { | ||||||
query, _ := api.ParseSpanMetricsRequest(req) | ||||||
return query.GetQuery() | ||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
package pipeline | ||
joe-elliott marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
import ( | ||
"fmt" | ||
"github.com/stretchr/testify/require" | ||
"io" | ||
"net/http" | ||
"net/http/httptest" | ||
"net/url" | ||
"strings" | ||
"testing" | ||
) | ||
|
||
func TestTraceQueryFilterWareMetrics(t *testing.T) { | ||
|
||
} | ||
|
||
func TestTraceQueryFilterWareSearch(t *testing.T) { | ||
|
||
} | ||
|
||
func TestTraceQueryFilterWare(t *testing.T) { | ||
|
||
tests := []struct { | ||
name string | ||
query string | ||
denyList []string | ||
expectedResp *http.Response | ||
}{ | ||
{ | ||
name: "no query", | ||
query: "", | ||
denyList: []string{ | ||
"GET", | ||
"POST", | ||
}, | ||
expectedResp: &http.Response{ | ||
StatusCode: http.StatusOK, | ||
Status: http.StatusText(http.StatusOK), | ||
Body: io.NopCloser(strings.NewReader("foo")), | ||
}, | ||
}, | ||
{ | ||
name: "query matches regex", | ||
query: "span.http.method='GET'", | ||
denyList: []string{ | ||
"GET", | ||
}, | ||
expectedResp: &http.Response{ | ||
StatusCode: http.StatusBadRequest, | ||
Status: http.StatusText(http.StatusBadRequest), | ||
Body: io.NopCloser(strings.NewReader("Query is temporarily blocked by your administrator.")), | ||
}, | ||
}, | ||
{ | ||
name: "query does not match regex", | ||
query: "span.http.method='GET'", | ||
denyList: []string{ | ||
"status", | ||
"start", | ||
}, | ||
expectedResp: &http.Response{ | ||
StatusCode: http.StatusOK, | ||
Status: http.StatusText(http.StatusOK), | ||
Body: io.NopCloser(strings.NewReader("foo")), | ||
}, | ||
}, | ||
{ | ||
name: "empty deny list", | ||
query: "service.name=cart", | ||
denyList: []string{}, | ||
expectedResp: &http.Response{ | ||
StatusCode: http.StatusOK, | ||
Status: http.StatusText(http.StatusOK), | ||
Body: io.NopCloser(strings.NewReader("foo")), | ||
}, | ||
}, | ||
{ | ||
name: "query matches multiple patterns", | ||
query: "span.http.method='GET'&&span.http.status_code>=200", | ||
denyList: []string{ | ||
"GET", | ||
"span", | ||
"200", | ||
}, | ||
expectedResp: &http.Response{ | ||
StatusCode: http.StatusBadRequest, | ||
Status: http.StatusText(http.StatusBadRequest), | ||
Body: io.NopCloser(strings.NewReader("Query is temporarily blocked by your administrator.")), | ||
}, | ||
}, | ||
} | ||
|
||
for _, tc := range tests { | ||
t.Run(tc.name, func(t *testing.T) { | ||
u := buildSearchTagValuesQueryUrl("service.name", tc.query) | ||
req := httptest.NewRequest("GET", u, nil) | ||
test := NewTraceQueryFilterWareWithDenyList(tc.denyList, ParseSearchRequestQuery) | ||
next := RoundTripperFunc(func(req *http.Request) (*http.Response, error) { | ||
resp := &http.Response{ | ||
StatusCode: http.StatusOK, | ||
Status: http.StatusText(http.StatusOK), | ||
Body: io.NopCloser(strings.NewReader("foo")), | ||
} | ||
return resp, nil | ||
}) | ||
|
||
rt, err := test.Wrap(next).RoundTrip(req) | ||
|
||
require.NoError(t, err) | ||
|
||
require.Equal(t, tc.expectedResp, rt) | ||
|
||
}) | ||
|
||
} | ||
} | ||
|
||
func TestTraceQueryFilterWareNoDenyList(t *testing.T) { | ||
|
||
} | ||
|
||
func buildSearchTagsQueryUrl(query string) string { | ||
joinURL, _ := url.Parse("http://localhost:3200/api/v2/search/tags") | ||
q := joinURL.Query() | ||
q.Set("q", query) | ||
joinURL.RawQuery = q.Encode() | ||
return fmt.Sprint(joinURL) | ||
} | ||
|
||
func buildSearchTagValuesQueryUrl(key string, query string) string { | ||
urlPath := fmt.Sprintf("/api/v2/search/tag/%s/values", key) | ||
joinURL, _ := url.Parse("http://localhost:3200" + urlPath + "?") | ||
q := joinURL.Query() | ||
q.Set("q", query) | ||
joinURL.RawQuery = q.Encode() | ||
return fmt.Sprint(joinURL) | ||
} | ||
|
||
func buildMetricQueryUrl(query string) string { | ||
urlPath := "/api/metrics/query_range" | ||
joinURL, _ := url.Parse("http://localhost:3200" + urlPath + "?") | ||
q := joinURL.Query() | ||
q.Set("q", query) | ||
joinURL.RawQuery = q.Encode() | ||
return fmt.Sprint(joinURL) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.