From d171ffb91c760f45d3930ddfc372c6a846ae4a8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BD=97=E6=B3=BD=E8=BD=A9?= Date: Fri, 10 Nov 2023 19:04:30 +0800 Subject: [PATCH] filter manager: part 2, support local reply (#19) Signed-off-by: spacewander --- .golangci.yml | 6 +- Makefile | 3 + pkg/filtermanager/api/api.go | 63 ++- pkg/filtermanager/api/result_action.go | 23 + pkg/filtermanager/filtermanager.go | 238 +++++++--- plugins/casbin/filter.go | 8 +- plugins/casbin/filter_test.go | 9 +- plugins/demo/filter.go | 3 +- plugins/opa/filter.go | 12 +- plugins/opa/filter_test.go | 9 +- .../integration/plugins/filtermanager_test.go | 418 ++++++++++++++++++ tests/integration/plugins/test_plugins.go | 144 +++++- 12 files changed, 825 insertions(+), 111 deletions(-) create mode 100644 pkg/filtermanager/api/result_action.go diff --git a/.golangci.yml b/.golangci.yml index 45243efc..cd7cc67a 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -30,8 +30,10 @@ linters-settings: package-average: 10 skip-tests: true gocritic: - enabled-tags: - - diagnostic + disabled-tags: + - style + - experimental + - opinionated unparam: check-exported: false diff --git a/Makefile b/Makefile index 2f1094e1..8f941b2e 100644 --- a/Makefile +++ b/Makefile @@ -131,3 +131,6 @@ lint-spell: build-dev-tools .PHONY: lint-spell-local lint-spell-local: codespell --skip '.git,.idea,test-envoy,go.mod,go.sum,*.svg' --check-filenames --check-hidden --ignore-words ./.ignore_words + +.PHONY: lint +lint: lint-go fmt-go lint-spell diff --git a/pkg/filtermanager/api/api.go b/pkg/filtermanager/api/api.go index 023190b4..890566b0 100644 --- a/pkg/filtermanager/api/api.go +++ b/pkg/filtermanager/api/api.go @@ -4,42 +4,67 @@ import ( "github.com/envoyproxy/envoy/contrib/golang/common/go/api" ) +type DecodeWholeRequestFilter interface { + NeedDecodeWholeRequest(headers api.RequestHeaderMap) bool + DecodeRequest(headers api.RequestHeaderMap, data api.BufferInstance, trailers api.RequestTrailerMap) ResultAction +} + +type EncodeWholeResponseFilter interface { + NeedEncodeWholeResponse(headers api.ResponseHeaderMap) bool + EncodeResponse(headers api.ResponseHeaderMap, data api.BufferInstance, trailers api.ResponseTrailerMap) ResultAction +} + type Filter interface { - DecodeHeaders(RequestHeaderMap, bool) - DecodeData(BufferInstance, bool) - DecodeTrailers(RequestTrailerMap) + DecodeHeaders(headers RequestHeaderMap, endStream bool) ResultAction + DecodeData(data BufferInstance, endStream bool) ResultAction + DecodeTrailers(trailers RequestTrailerMap) ResultAction - EncodeHeaders(ResponseHeaderMap, bool) - EncodeData(BufferInstance, bool) - EncodeTrailers(ResponseTrailerMap) + EncodeHeaders(headers ResponseHeaderMap, endStream bool) ResultAction + EncodeData(data BufferInstance, endStream bool) ResultAction + EncodeTrailers(trailers ResponseTrailerMap) ResultAction OnLog() + + DecodeWholeRequestFilter + EncodeWholeResponseFilter } type PassThroughFilter struct{} -func (f *PassThroughFilter) DecodeHeaders(headers RequestHeaderMap, endStream bool) {} +func (f *PassThroughFilter) DecodeHeaders(headers RequestHeaderMap, endStream bool) ResultAction { + return Continue +} -func (f *PassThroughFilter) DecodeData(data BufferInstance, endStream bool) {} +func (f *PassThroughFilter) DecodeData(data BufferInstance, endStream bool) ResultAction { + return Continue +} -func (f *PassThroughFilter) DecodeTrailers(trailers RequestTrailerMap) {} +func (f *PassThroughFilter) DecodeTrailers(trailers RequestTrailerMap) ResultAction { + return Continue +} -func (f *PassThroughFilter) EncodeHeaders(headers ResponseHeaderMap, endStream bool) {} +func (f *PassThroughFilter) EncodeHeaders(headers ResponseHeaderMap, endStream bool) ResultAction { + return Continue +} -func (f *PassThroughFilter) EncodeData(data BufferInstance, endStream bool) {} +func (f *PassThroughFilter) EncodeData(data BufferInstance, endStream bool) ResultAction { + return Continue +} -func (f *PassThroughFilter) EncodeTrailers(trailers ResponseTrailerMap) {} +func (f *PassThroughFilter) EncodeTrailers(trailers ResponseTrailerMap) ResultAction { + return Continue +} func (f *PassThroughFilter) OnLog() {} -type DecodeWholeRequestFilter interface { - NeedDecodeWholeRequest(headers api.RequestHeaderMap) bool - DecodeRequest(headers api.RequestHeaderMap, buf api.BufferInstance, trailers api.RequestTrailerMap) +func (f *PassThroughFilter) NeedDecodeWholeRequest(headers api.RequestHeaderMap) bool { return false } +func (f *PassThroughFilter) DecodeRequest(headers api.RequestHeaderMap, data api.BufferInstance, trailers api.RequestTrailerMap) ResultAction { + return Continue } -type EncodeWholeResponseFilter interface { - NeedEncodeWholeResponse(headers api.ResponseHeaderMap) bool - EncodeResponse(headers api.ResponseHeaderMap, buf api.BufferInstance, trailers api.ResponseTrailerMap) +func (f *PassThroughFilter) NeedEncodeWholeResponse(headers api.ResponseHeaderMap) bool { return false } +func (f *PassThroughFilter) EncodeResponse(headers api.ResponseHeaderMap, data api.BufferInstance, trailers api.ResponseTrailerMap) ResultAction { + return Continue } type RequestHeaderMap = api.RequestHeaderMap @@ -59,8 +84,6 @@ type FilterCallbackHandler interface { StreamInfo() StreamInfo RecoverPanic() GetProperty(key string) (string, error) - // TODO: remove it later - SendLocalReply(responseCode int, bodyText string, headers map[string]string, grpcStatus int64, details string) } type FilterFactory func(callbacks FilterCallbackHandler) Filter diff --git a/pkg/filtermanager/api/result_action.go b/pkg/filtermanager/api/result_action.go new file mode 100644 index 00000000..d50cedfb --- /dev/null +++ b/pkg/filtermanager/api/result_action.go @@ -0,0 +1,23 @@ +package api + +import "net/http" + +type ResultAction interface { + OK() +} +type isResultAction struct { +} + +var ( + Continue ResultAction = nil +) + +func (i *isResultAction) OK() {} + +type LocalResponse struct { + isResultAction + + Code int + Msg string + Header http.Header +} diff --git a/pkg/filtermanager/filtermanager.go b/pkg/filtermanager/filtermanager.go index 3483d4d5..78645431 100644 --- a/pkg/filtermanager/filtermanager.go +++ b/pkg/filtermanager/filtermanager.go @@ -102,7 +102,7 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC } func (p *FilterManagerConfigParser) Merge(parent interface{}, child interface{}) interface{} { - // TODO: We have considered to implemented a Merge Policy between the LDS's filter & RDS's per route + // We have considered to implemented a Merge Policy between the LDS's filter & RDS's per route // config. A thought is to reuse the current Merge method. For example, considering we have // LDS: // - name: A @@ -169,30 +169,62 @@ func FilterManagerConfigFactory(c interface{}) capi.StreamFilterFactory { } } +func (m *filterManager) handleAction(res api.ResultAction) (needReturn bool) { + if res == api.Continue { + return false + } + + switch v := res.(type) { + case *api.LocalResponse: + m.localReply(v) + return true + default: + api.LogErrorf("unknown result action: %+v", v) + return false + } +} + +func (m *filterManager) localReply(v *api.LocalResponse) { + // TODO: support multiple same name header in Envoy + var hdr map[string]string + if v.Header != nil { + hdr = map[string]string{} + for k, vv := range map[string][]string(v.Header) { + hdr[k] = vv[0] + } + } + // TODO: provide JSON / gRPC reply according to the request info + if v.Code == 0 { + v.Code = 200 + } + m.callbacks.SendLocalReply(v.Code, v.Msg, hdr, 0, "") +} + func (m *filterManager) DecodeHeaders(header api.RequestHeaderMap, endStream bool) capi.StatusType { go func() { defer m.callbacks.RecoverPanic() + var res api.ResultAction for i, f := range m.filters { - runDecodeRequest := false - if wr, ok := f.(api.DecodeWholeRequestFilter); ok { - needed := wr.NeedDecodeWholeRequest(header) - if needed { - if !endStream { - m.decodeIdx = i - m.reqHdr = header - m.callbacks.Continue(capi.StopAndBuffer) - return - } - - // no body - runDecodeRequest = true - wr.DecodeRequest(header, nil, nil) + needed := f.NeedDecodeWholeRequest(header) + if needed { + if !endStream { + m.decodeIdx = i + m.reqHdr = header + // some filters, like authorization with request body, need to + // have a whole body before passing to the next filter + m.callbacks.Continue(capi.StopAndBuffer) + return } + + // no body + res = f.DecodeRequest(header, nil, nil) + } else { + res = f.DecodeHeaders(header, endStream) } - if !runDecodeRequest { - f.DecodeHeaders(header, endStream) + if m.handleAction(res) { + return } } m.callbacks.Continue(capi.Continue) @@ -204,38 +236,89 @@ func (m *filterManager) DecodeHeaders(header api.RequestHeaderMap, endStream boo func (m *filterManager) DecodeData(buf api.BufferInstance, endStream bool) capi.StatusType { go func() { defer m.callbacks.RecoverPanic() + var res api.ResultAction + + // We have discussed a lot about how to support processing data both streamingly and + // as a whole body. Here are some solutions we have considered: + // 1. let Envoy process data streamingly, and do buffering in Go. This solution is costly + // and may be broken if the buffered data at Go side is rewritten by later C++ filter. + // 2. separate the filters which need a whole body in a separate C++ filter. It can't + // be done without a special control plane. + // 3. add multiple virtual C++ filters to Envoy when init the Envoy Golang filter. It + // is complex because we need to share and move the state between multiple Envoy C++ + // filter. + // 4. when a filter requires a whole body, all the filters will use a whole body. + // Otherwise, streaming processing is used. It's simple and already satisfies our + // most demand, so we choose this way for now. n := len(m.filters) if m.decodeIdx == -1 { // every filter doesn't need buffered body for i := 0; i < n; i++ { f := m.filters[i] - f.DecodeData(buf, endStream) + res = f.DecodeData(buf, endStream) + if m.handleAction(res) { + return + } } m.callbacks.Continue(capi.Continue) } else { for i := 0; i < m.decodeIdx; i++ { f := m.filters[i] - f.DecodeData(buf, endStream) + res = f.DecodeData(buf, endStream) + if m.handleAction(res) { + return + } } - wr := m.filters[m.decodeIdx].(api.DecodeWholeRequestFilter) - wr.DecodeRequest(m.reqHdr, buf, nil) - for i := m.decodeIdx + 1; i < n; i++ { - f := m.filters[i] - runDecodeRequest := false - if wr, ok := f.(api.DecodeWholeRequestFilter); ok { - needed := wr.NeedDecodeWholeRequest(m.reqHdr) + + f := m.filters[m.decodeIdx] + res = f.DecodeRequest(m.reqHdr, buf, nil) + if m.handleAction(res) { + return + } + + i := m.decodeIdx + 1 + for i < n { + var needed bool + for ; i < n; i++ { + f := m.filters[i] + needed = f.NeedDecodeWholeRequest(m.reqHdr) if needed { - runDecodeRequest = true - wr.DecodeRequest(m.reqHdr, buf, nil) + break + } + } + + for j := m.decodeIdx + 1; j < i; j++ { + f := m.filters[j] + // The endStream in DecodeHeaders indicates whether there is a body. + // The body always exists when we hit this path. + res = f.DecodeHeaders(m.reqHdr, false) + if m.handleAction(res) { + return + } + } + // When there are multiple filters want to decode the whole req, + // run part of the DecodeData which is before them + for j := m.decodeIdx + 1; j < i; j++ { + f := m.filters[j] + res = f.DecodeData(buf, endStream) + if m.handleAction(res) { + return } } - if !runDecodeRequest { - f.DecodeHeaders(m.reqHdr, endStream) - f.DecodeData(buf, endStream) + + if needed { + m.decodeIdx = i + f := m.filters[m.decodeIdx] + res = f.DecodeRequest(m.reqHdr, buf, nil) + if m.handleAction(res) { + return + } + i++ } } + m.callbacks.Continue(capi.Continue) } }() @@ -246,29 +329,28 @@ func (m *filterManager) DecodeData(buf api.BufferInstance, endStream bool) capi. func (m *filterManager) EncodeHeaders(header api.ResponseHeaderMap, endStream bool) capi.StatusType { go func() { defer m.callbacks.RecoverPanic() + var res api.ResultAction n := len(m.filters) for i := n - 1; i >= 0; i-- { f := m.filters[i] - runEncodeResponse := false - if wr, ok := f.(api.EncodeWholeResponseFilter); ok { - needed := wr.NeedEncodeWholeResponse(header) - if needed { - if !endStream { - m.encodeIdx = i - m.rspHdr = header - m.callbacks.Continue(capi.StopAndBuffer) - return - } - - // no body - runEncodeResponse = true - wr.EncodeResponse(header, nil, nil) + needed := f.NeedEncodeWholeResponse(header) + if needed { + if !endStream { + m.encodeIdx = i + m.rspHdr = header + m.callbacks.Continue(capi.StopAndBuffer) + return } + + // no body + res = f.EncodeResponse(header, nil, nil) + } else { + res = f.EncodeHeaders(header, endStream) } - if !runEncodeResponse { - f.EncodeHeaders(header, endStream) + if m.handleAction(res) { + return } } m.callbacks.Continue(capi.Continue) @@ -280,38 +362,72 @@ func (m *filterManager) EncodeHeaders(header api.ResponseHeaderMap, endStream bo func (m *filterManager) EncodeData(buf api.BufferInstance, endStream bool) capi.StatusType { go func() { defer m.callbacks.RecoverPanic() + var res api.ResultAction n := len(m.filters) if m.encodeIdx == -1 { // every filter doesn't need buffered body for i := n - 1; i >= 0; i-- { f := m.filters[i] - f.EncodeData(buf, endStream) + res = f.EncodeData(buf, endStream) + if m.handleAction(res) { + return + } } m.callbacks.Continue(capi.Continue) } else { for i := n - 1; i > m.encodeIdx; i-- { f := m.filters[i] - f.EncodeData(buf, endStream) + res = f.EncodeData(buf, endStream) + if m.handleAction(res) { + return + } } - wr := m.filters[m.encodeIdx].(api.EncodeWholeResponseFilter) - wr.EncodeResponse(m.rspHdr, buf, nil) - for i := m.encodeIdx - 1; i >= 0; i-- { - f := m.filters[i] - runEncodeResponse := false - if wr, ok := f.(api.EncodeWholeResponseFilter); ok { - needed := wr.NeedEncodeWholeResponse(m.rspHdr) + + f := m.filters[m.encodeIdx] + res = f.EncodeResponse(m.rspHdr, buf, nil) + if m.handleAction(res) { + return + } + + i := m.encodeIdx - 1 + for i >= 0 { + var needed bool + for ; i >= 0; i-- { + f := m.filters[i] + needed = f.NeedEncodeWholeResponse(m.rspHdr) if needed { - runEncodeResponse = true - wr.EncodeResponse(m.rspHdr, buf, nil) + break + } + } + + for j := m.encodeIdx - 1; j > i; j-- { + f := m.filters[j] + res = f.EncodeHeaders(m.rspHdr, false) + if m.handleAction(res) { + return + } + } + for j := m.encodeIdx - 1; j > i; j-- { + f := m.filters[j] + res = f.EncodeData(buf, endStream) + if m.handleAction(res) { + return } } - if !runEncodeResponse { - f.EncodeHeaders(m.rspHdr, endStream) - f.EncodeData(buf, endStream) + + if needed { + m.encodeIdx = i + f := m.filters[m.encodeIdx] + res = f.EncodeResponse(m.rspHdr, buf, nil) + if m.handleAction(res) { + return + } + i-- } } + m.callbacks.Continue(capi.Continue) } }() diff --git a/plugins/casbin/filter.go b/plugins/casbin/filter.go index c0cbaccc..5b0ce80e 100644 --- a/plugins/casbin/filter.go +++ b/plugins/casbin/filter.go @@ -25,7 +25,7 @@ type filter struct { config *config } -func (f *filter) DecodeHeaders(header api.RequestHeaderMap, endStream bool) { +func (f *filter) DecodeHeaders(header api.RequestHeaderMap, endStream bool) api.ResultAction { role, _ := header.Get(f.config.Token.Name) // role can be "" url := request.GetUrl(header) @@ -35,9 +35,11 @@ func (f *filter) DecodeHeaders(header api.RequestHeaderMap, endStream bool) { if !ok { api.LogInfof("reject forbidden user %s", role) - f.callbacks.SendLocalReply(403, "", nil, 0, "") - return + return &api.LocalResponse{ + Code: 403, + } } + return api.Continue } func (f *filter) OnLog() { diff --git a/plugins/casbin/filter_test.go b/plugins/casbin/filter_test.go index 9ff7f3dc..fc77b082 100644 --- a/plugins/casbin/filter_test.go +++ b/plugins/casbin/filter_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/assert" + "mosn.io/moe/pkg/filtermanager/api" "mosn.io/moe/tests/pkg/envoy" ) @@ -67,8 +68,12 @@ func TestCasbin(t *testing.T) { wg.Add(1) go func() { // ensure the lock takes effect - f.DecodeHeaders(hdr, true) - assert.Equal(t, tt.status, cb.LocalResponseCode()) + lr, ok := f.DecodeHeaders(hdr, true).(*api.LocalResponse) + if !ok { + assert.Equal(t, tt.status, 0) + } else { + assert.Equal(t, tt.status, lr.Code) + } wg.Done() }() } diff --git a/plugins/demo/filter.go b/plugins/demo/filter.go index 38709fbe..91bf8e7a 100644 --- a/plugins/demo/filter.go +++ b/plugins/demo/filter.go @@ -23,8 +23,9 @@ type filter struct { config *Config } -func (f *filter) DecodeHeaders(header api.RequestHeaderMap, endStream bool) { +func (f *filter) DecodeHeaders(header api.RequestHeaderMap, endStream bool) api.ResultAction { header.Set(f.config.HostName, f.hello()) + return api.Continue } func (f *filter) hello() string { diff --git a/plugins/opa/filter.go b/plugins/opa/filter.go index c6284e7f..ba24153b 100644 --- a/plugins/opa/filter.go +++ b/plugins/opa/filter.go @@ -72,23 +72,21 @@ func (f *filter) isAllowed(input map[string]interface{}) (bool, error) { return opaResponse.Result.Allow, nil } -func (f *filter) DecodeHeaders(header api.RequestHeaderMap, endStream bool) { +func (f *filter) DecodeHeaders(header api.RequestHeaderMap, endStream bool) api.ResultAction { input, err := f.buildInput(header) if err != nil { api.LogErrorf("failed to build input: %v", err) - f.callbacks.SendLocalReply(503, "", nil, 0, "") - return + return &api.LocalResponse{Code: 503} } allow, err := f.isAllowed(input) if err != nil { api.LogErrorf("failed to call OPA server: %v", err) - f.callbacks.SendLocalReply(503, "", nil, 0, "") - return + return &api.LocalResponse{Code: 503} } if !allow { - f.callbacks.SendLocalReply(403, "", nil, 0, "") - return + return &api.LocalResponse{Code: 403} } + return api.Continue } diff --git a/plugins/opa/filter_test.go b/plugins/opa/filter_test.go index 22ee7104..3122aa7a 100644 --- a/plugins/opa/filter_test.go +++ b/plugins/opa/filter_test.go @@ -10,6 +10,7 @@ import ( "github.com/agiledragon/gomonkey/v2" "github.com/stretchr/testify/assert" + "mosn.io/moe/pkg/filtermanager/api" "mosn.io/moe/tests/pkg/envoy" ) @@ -85,8 +86,12 @@ func TestOpaRemote(t *testing.T) { }) defer patches.Reset() - f.DecodeHeaders(hdr, true) - assert.Equal(t, tt.status, cb.LocalResponseCode()) + lr, ok := f.DecodeHeaders(hdr, true).(*api.LocalResponse) + if !ok { + assert.Equal(t, tt.status, 0) + } else { + assert.Equal(t, tt.status, lr.Code) + } }) } } diff --git a/tests/integration/plugins/filtermanager_test.go b/tests/integration/plugins/filtermanager_test.go index ec950e35..5a233d2b 100644 --- a/tests/integration/plugins/filtermanager_test.go +++ b/tests/integration/plugins/filtermanager_test.go @@ -1,6 +1,7 @@ package plugins import ( + "bytes" "io" "net/http" "strconv" @@ -243,6 +244,8 @@ func TestFilterManagerDecode(t *testing.T) { func assertBodyHas(t *testing.T, exp string, resp *http.Response) { d, _ := io.ReadAll(resp.Body) assert.Contains(t, string(d), exp) + // set the body back so the next assertion can read the body + resp.Body = io.NopCloser(bytes.NewBuffer(d)) } func TestFilterManagerEncode(t *testing.T) { @@ -457,3 +460,418 @@ func TestFilterManagerEncode(t *testing.T) { }) } } + +func TestFilterManagerDecodeLocalReply(t *testing.T) { + dp, err := data_plane.StartDataPlane(t, &data_plane.Option{}) + if err != nil { + t.Fatalf("failed to start data plane: %v", err) + return + } + defer dp.Stop() + + dh := &filtermanager.FilterManagerConfig{ + Plugins: []*filtermanager.FilterConfig{ + { + Name: "localReply", + Config: &Config{ + Decode: true, + Headers: true, + }, + }, + }, + } + dd := &filtermanager.FilterManagerConfig{ + Plugins: []*filtermanager.FilterConfig{ + { + Name: "localReply", + Config: &Config{ + Decode: true, + Data: true, + }, + }, + }, + } + ddThenB := &filtermanager.FilterManagerConfig{ + Plugins: []*filtermanager.FilterConfig{ + { + Name: "localReply", + Config: &Config{ + Decode: true, + Data: true, + }, + }, + { + Name: "buffer", + Config: &Config{ + Decode: true, + Need: true, + }, + }, + }, + } + dr := &filtermanager.FilterManagerConfig{ + Plugins: []*filtermanager.FilterConfig{ + { + Name: "localReply", + Config: &Config{ + Decode: true, + Need: true, + }, + }, + }, + } + bThenDh := &filtermanager.FilterManagerConfig{ + Plugins: []*filtermanager.FilterConfig{ + { + Name: "buffer", + Config: &Config{ + Decode: true, + Need: true, + }, + }, + { + Name: "localReply", + Config: &Config{ + Decode: true, + Headers: true, + }, + }, + }, + } + bThenDd := &filtermanager.FilterManagerConfig{ + Plugins: []*filtermanager.FilterConfig{ + { + Name: "buffer", + Config: &Config{ + Decode: true, + Need: true, + }, + }, + { + Name: "localReply", + Config: &Config{ + Decode: true, + Data: true, + }, + }, + }, + } + + lrThenE := &filtermanager.FilterManagerConfig{ + Plugins: []*filtermanager.FilterConfig{ + { + Name: "localReply", + Config: &Config{ + Decode: true, + Data: true, + }, + }, + { + Name: "stream", + Config: &Config{ + Encode: true, + }, + }, + }, + } + fOrder := &filtermanager.FilterManagerConfig{ + Plugins: []*filtermanager.FilterConfig{ + { + Name: "buffer", + Config: &Config{ + Decode: true, + Need: true, + }, + }, + { + Name: "localReply", + Config: &Config{ + Decode: true, + Data: true, + }, + }, + { + Name: "stream", + Config: &Config{ + Decode: true, + }, + }, + // should local reply in DecodeData after running all DecodeHeaders + }, + } + fOrderM := &filtermanager.FilterManagerConfig{ + Plugins: []*filtermanager.FilterConfig{ + { + Name: "buffer", + Config: &Config{ + Decode: true, + Need: true, + }, + }, + { + Name: "localReply", + Config: &Config{ + Decode: true, + Data: true, + }, + }, + // should local reply in DecodeData before DecodeRequest + { + Name: "buffer", + Config: &Config{ + Decode: true, + Need: true, + }, + }, + { + Name: "stream", + Config: &Config{ + Decode: true, + }, + }, + }, + } + + tests := []struct { + name string + config *filtermanager.FilterManagerConfig + expect func(t *testing.T, resp *http.Response) + }{ + { + name: "DecodeHeaders", + config: dh, + }, + { + name: "DecodeData", + config: dd, + }, + { + name: "DecodeData before DecodeRequest", + config: ddThenB, + }, + { + name: "DecodeRequest", + config: dr, + }, + { + name: "DecodeHeaders after DecodeRequest", + config: bThenDh, + }, + { + name: "DecodeData after DecodeRequest", + config: bThenDd, + }, + { + name: "LocalReply rewritten by Encode", + config: lrThenE, + expect: func(t *testing.T, resp *http.Response) { + assert.Equal(t, []string{"stream"}, resp.Header.Values("Run")) + assertBodyHas(t, "stream\n", resp) + }, + }, + { + name: "Ensure the header filters' order after DecodeRequest", + config: fOrder, + expect: func(t *testing.T, resp *http.Response) { + assert.Equal(t, "buffer|stream", resp.Header.Get("Order")) + }, + }, + { + name: "Ensure the header filters' order between multiple DecodeRequest", + config: fOrderM, + expect: func(t *testing.T, resp *http.Response) { + assert.Equal(t, "buffer", resp.Header.Get("Order")) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + controlPlane.UseGoPluginConfig(tt.config) + resp, err := dp.Post("/echo", nil, strings.NewReader("any")) + assert.Nil(t, err) + assert.Equal(t, 206, resp.StatusCode) + assert.Equal(t, []string{"reply"}, resp.Header.Values("local")) + assertBodyHas(t, "ok", resp) + + if tt.expect != nil { + tt.expect(t, resp) + } + }) + } +} + +func TestFilterManagerEncodeLocalReply(t *testing.T) { + dp, err := data_plane.StartDataPlane(t, &data_plane.Option{}) + if err != nil { + t.Fatalf("failed to start data plane: %v", err) + return + } + defer dp.Stop() + + eh := &filtermanager.FilterManagerConfig{ + Plugins: []*filtermanager.FilterConfig{ + { + Name: "localReply", + Config: &Config{ + Encode: true, + Headers: true, + }, + }, + }, + } + ed := &filtermanager.FilterManagerConfig{ + Plugins: []*filtermanager.FilterConfig{ + { + Name: "localReply", + Config: &Config{ + Encode: true, + Data: true, + }, + }, + }, + } + er := &filtermanager.FilterManagerConfig{ + Plugins: []*filtermanager.FilterConfig{ + { + Name: "localReply", + Config: &Config{ + Encode: true, + Need: true, + }, + }, + }, + } + edThenB := &filtermanager.FilterManagerConfig{ + Plugins: []*filtermanager.FilterConfig{ + { + Name: "buffer", + Config: &Config{ + Encode: true, + Need: true, + }, + }, + { + Name: "localReply", + Config: &Config{ + Encode: true, + Data: true, + }, + }, + }, + } + bThenEh := &filtermanager.FilterManagerConfig{ + Plugins: []*filtermanager.FilterConfig{ + { + Name: "localReply", + Config: &Config{ + Encode: true, + Headers: true, + }, + }, + { + Name: "buffer", + Config: &Config{ + Encode: true, + Need: true, + }, + }, + }, + } + bThenEd := &filtermanager.FilterManagerConfig{ + Plugins: []*filtermanager.FilterConfig{ + { + Name: "localReply", + Config: &Config{ + Encode: true, + Data: true, + }, + }, + { + Name: "buffer", + Config: &Config{ + Encode: true, + Need: true, + }, + }, + }, + } + bThenSThenEh := &filtermanager.FilterManagerConfig{ + Plugins: []*filtermanager.FilterConfig{ + { + Name: "localReply", + Config: &Config{ + Encode: true, + Headers: true, + }, + }, + { + Name: "stream", + Config: &Config{ + Encode: true, + }, + }, + { + Name: "buffer", + Config: &Config{ + Encode: true, + Need: true, + }, + }, + }, + } + + tests := []struct { + name string + config *filtermanager.FilterManagerConfig + expect func(t *testing.T, resp *http.Response) + }{ + { + name: "EncodeHeaders", + config: eh, + }, + { + name: "EncodeData", + config: ed, + }, + { + name: "EncodeResponse", + config: er, + }, + { + name: "EncodeData before EncodeResponse", + config: edThenB, + }, + { + name: "EncodeHeaders after EncodeResponse", + config: bThenEh, + }, + { + name: "EncodeData after EncodeResponse", + config: bThenEd, + }, + { + name: "Buffer all, then run header filters from stream and local reply", + config: bThenSThenEh, + expect: func(t *testing.T, resp *http.Response) { + // only EncodeData in localReply is run + assertBody(t, "ok", resp) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + controlPlane.UseGoPluginConfig(tt.config) + hdr := http.Header{} + hdr.Add("from", "reply") + resp, err := dp.Post("/echo", hdr, strings.NewReader("any")) + assert.Nil(t, err) + assert.Equal(t, 206, resp.StatusCode) + assert.Equal(t, "reply", resp.Header.Get("local")) + assertBodyHas(t, "ok", resp) + + if tt.expect != nil { + tt.expect(t, resp) + } + }) + } +} diff --git a/tests/integration/plugins/test_plugins.go b/tests/integration/plugins/test_plugins.go index b440752b..245e94c7 100644 --- a/tests/integration/plugins/test_plugins.go +++ b/tests/integration/plugins/test_plugins.go @@ -2,16 +2,20 @@ package plugins import ( "encoding/json" + "net/http" "runtime/debug" + "strings" "mosn.io/moe/pkg/filtermanager/api" "mosn.io/moe/pkg/plugins" ) type Config struct { - Need bool `json:"need"` - Decode bool `json:"decode"` - Encode bool `json:"encode"` + Need bool `json:"need"` + Decode bool `json:"decode"` + Encode bool `json:"encode"` + Headers bool `json:"headers"` + Data bool `json:"data"` } type streamPlugin struct { @@ -35,29 +39,33 @@ type streamFilter struct { config *Config } -func (f *streamFilter) DecodeHeaders(headers api.RequestHeaderMap, endStream bool) { +func (f *streamFilter) DecodeHeaders(headers api.RequestHeaderMap, endStream bool) api.ResultAction { api.LogInfof("traceback: %s", string(debug.Stack())) headers.Add("run", "stream") + return api.Continue } -func (f *streamFilter) DecodeData(data api.BufferInstance, endStream bool) { +func (f *streamFilter) DecodeData(data api.BufferInstance, endStream bool) api.ResultAction { api.LogInfof("traceback: %s", string(debug.Stack())) if f.config.Decode { data.AppendString("stream\n") } + return api.Continue } -func (f *streamFilter) EncodeHeaders(headers api.ResponseHeaderMap, endStream bool) { +func (f *streamFilter) EncodeHeaders(headers api.ResponseHeaderMap, endStream bool) api.ResultAction { api.LogInfof("traceback: %s", string(debug.Stack())) headers.Add("run", "stream") headers.Del("content-length") + return api.Continue } -func (f *streamFilter) EncodeData(data api.BufferInstance, endStream bool) { +func (f *streamFilter) EncodeData(data api.BufferInstance, endStream bool) api.ResultAction { api.LogInfof("traceback: %s", string(debug.Stack())) if f.config.Encode { data.AppendString("stream\n") } + return api.Continue } func (p *streamPlugin) ConfigFactory() api.FilterConfigFactory { @@ -95,12 +103,13 @@ func (f *bufferFilter) NeedDecodeWholeRequest(headers api.RequestHeaderMap) bool return !ok && f.config.Need } -func (f *bufferFilter) DecodeRequest(headers api.RequestHeaderMap, buf api.BufferInstance, trailer api.RequestTrailerMap) { +func (f *bufferFilter) DecodeRequest(headers api.RequestHeaderMap, buf api.BufferInstance, trailer api.RequestTrailerMap) api.ResultAction { api.LogInfof("traceback: %s", string(debug.Stack())) headers.Add("run", "buffer") if buf != nil && f.config.Decode { buf.AppendString("buffer\n") } + return api.Continue } func (f *bufferFilter) NeedEncodeWholeResponse(headers api.ResponseHeaderMap) bool { @@ -109,38 +118,43 @@ func (f *bufferFilter) NeedEncodeWholeResponse(headers api.ResponseHeaderMap) bo return !ok && f.config.Need } -func (f *bufferFilter) EncodeResponse(headers api.ResponseHeaderMap, buf api.BufferInstance, trailers api.ResponseTrailerMap) { +func (f *bufferFilter) EncodeResponse(headers api.ResponseHeaderMap, buf api.BufferInstance, trailers api.ResponseTrailerMap) api.ResultAction { api.LogInfof("traceback: %s", string(debug.Stack())) headers.Add("run", "buffer") headers.Del("content-length") if buf != nil && f.config.Encode { buf.AppendString("buffer\n") } + return api.Continue } -func (f *bufferFilter) DecodeHeaders(headers api.RequestHeaderMap, endStream bool) { +func (f *bufferFilter) DecodeHeaders(headers api.RequestHeaderMap, endStream bool) api.ResultAction { api.LogInfof("traceback: %s", string(debug.Stack())) headers.Add("run", "no buffer") + return api.Continue } -func (f *bufferFilter) DecodeData(data api.BufferInstance, endStream bool) { +func (f *bufferFilter) DecodeData(data api.BufferInstance, endStream bool) api.ResultAction { api.LogInfof("traceback: %s", string(debug.Stack())) if f.config.Decode { data.AppendString("no buffer\n") } + return api.Continue } -func (f *bufferFilter) EncodeHeaders(headers api.ResponseHeaderMap, endStream bool) { +func (f *bufferFilter) EncodeHeaders(headers api.ResponseHeaderMap, endStream bool) api.ResultAction { api.LogInfof("traceback: %s", string(debug.Stack())) headers.Del("content-length") headers.Add("run", "no buffer") + return api.Continue } -func (f *bufferFilter) EncodeData(data api.BufferInstance, endStream bool) { +func (f *bufferFilter) EncodeData(data api.BufferInstance, endStream bool) api.ResultAction { api.LogInfof("traceback: %s", string(debug.Stack())) if f.config.Encode { data.AppendString("no buffer\n") } + return api.Continue } func (p *bufferPlugin) ConfigFactory() api.FilterConfigFactory { @@ -151,6 +165,109 @@ func (p *bufferPlugin) ConfigParser() api.FilterConfigParser { return plugins.NewPluginConfigParser(&parser{}) } +type localReplyPlugin struct { + plugins.PluginMethodDefaultImpl +} + +func localReplyConfigFactory(c interface{}) api.FilterFactory { + conf := c.(*Config) + return func(callbacks api.FilterCallbackHandler) api.Filter { + return &localReplyFilter{ + callbacks: callbacks, + config: conf, + } + } +} + +type localReplyFilter struct { + api.PassThroughFilter + + callbacks api.FilterCallbackHandler + config *Config + reqHdr api.RequestHeaderMap +} + +func (f *localReplyFilter) NeedDecodeWholeRequest(headers api.RequestHeaderMap) bool { + api.LogInfof("traceback: %s", string(debug.Stack())) + return f.config.Need +} + +func (f *localReplyFilter) NewLocalResponse(reply string) *api.LocalResponse { + hdr := http.Header{} + hdr.Set("local", reply) + + runFilters := f.reqHdr.Values("run") + if len(runFilters) > 0 { + hdr.Set("order", strings.Join(runFilters, "|")) + } + return &api.LocalResponse{Code: 206, Msg: "ok", Header: hdr} +} + +func (f *localReplyFilter) DecodeRequest(headers api.RequestHeaderMap, buf api.BufferInstance, trailer api.RequestTrailerMap) api.ResultAction { + api.LogInfof("traceback: %s", string(debug.Stack())) + f.reqHdr = headers + if f.config.Decode { + return f.NewLocalResponse("reply") + } + return api.Continue +} + +func (f *localReplyFilter) NeedEncodeWholeResponse(headers api.ResponseHeaderMap) bool { + api.LogInfof("traceback: %s", string(debug.Stack())) + return f.config.Need +} + +func (f *localReplyFilter) EncodeResponse(headers api.ResponseHeaderMap, buf api.BufferInstance, trailers api.ResponseTrailerMap) api.ResultAction { + api.LogInfof("traceback: %s", string(debug.Stack())) + if f.config.Encode { + r, _ := headers.Get("echo-from") + return f.NewLocalResponse(r) + } + return api.Continue +} + +func (f *localReplyFilter) DecodeHeaders(headers api.RequestHeaderMap, endStream bool) api.ResultAction { + api.LogInfof("traceback: %s", string(debug.Stack())) + f.reqHdr = headers + if f.config.Decode && f.config.Headers { + return f.NewLocalResponse("reply") + } + return api.Continue +} + +func (f *localReplyFilter) DecodeData(data api.BufferInstance, endStream bool) api.ResultAction { + api.LogInfof("traceback: %s", string(debug.Stack())) + if f.config.Decode && f.config.Data { + return f.NewLocalResponse("reply") + } + return api.Continue +} + +func (f *localReplyFilter) EncodeHeaders(headers api.ResponseHeaderMap, endStream bool) api.ResultAction { + api.LogInfof("traceback: %s", string(debug.Stack())) + if f.config.Encode && f.config.Headers { + r, _ := headers.Get("echo-from") + return f.NewLocalResponse(r) + } + return api.Continue +} + +func (f *localReplyFilter) EncodeData(data api.BufferInstance, endStream bool) api.ResultAction { + api.LogInfof("traceback: %s", string(debug.Stack())) + if f.config.Encode && f.config.Data { + return f.NewLocalResponse("reply") + } + return api.Continue +} + +func (p *localReplyPlugin) ConfigFactory() api.FilterConfigFactory { + return localReplyConfigFactory +} + +func (p *localReplyPlugin) ConfigParser() api.FilterConfigParser { + return plugins.NewPluginConfigParser(&parser{}) +} + type parser struct { } @@ -175,4 +292,5 @@ func (p *parser) Merge(parent interface{}, child interface{}) interface{} { func init() { plugins.RegisterHttpPlugin("stream", &streamPlugin{}) plugins.RegisterHttpPlugin("buffer", &bufferPlugin{}) + plugins.RegisterHttpPlugin("localReply", &localReplyPlugin{}) }