From ca4915086b9117bc8804ec114c577969ce30d25d Mon Sep 17 00:00:00 2001 From: spacewander Date: Fri, 23 Feb 2024 11:11:59 +0800 Subject: [PATCH] filtermanager: cache consumer filter Signed-off-by: spacewander --- pkg/consumer/consumer.go | 5 + pkg/filtermanager/filtermanager.go | 95 ++++++++++--------- .../filtermanager_benchmark_test.go | 47 ++++++++- pkg/filtermanager/filtermanager_test.go | 81 +++++++++++----- pkg/filtermanager/model/model.go | 15 +++ plugins/tests/pkg/envoy/capi.go | 9 ++ 6 files changed, 177 insertions(+), 75 deletions(-) diff --git a/pkg/consumer/consumer.go b/pkg/consumer/consumer.go index fd15a759..44d3d179 100644 --- a/pkg/consumer/consumer.go +++ b/pkg/consumer/consumer.go @@ -44,6 +44,11 @@ type Consumer struct { generation int ConsumerConfigs map[string]api.PluginConsumerConfig `json:"-"` FilterConfigs map[string]*model.ParsedFilterConfig `json:"-"` + + // fields that generated from the configuration + CanSkipMethod map[string]bool + FilterNames []string + FilterWrappers []*model.FilterWrapper } func (c *Consumer) Marshal() string { diff --git a/pkg/filtermanager/filtermanager.go b/pkg/filtermanager/filtermanager.go index fcf42ca3..da9ebddb 100644 --- a/pkg/filtermanager/filtermanager.go +++ b/pkg/filtermanager/filtermanager.go @@ -53,8 +53,8 @@ type FilterManagerConfig struct { type filterManagerConfig struct { consumerFiltersEndAt int - current []*model.ParsedFilterConfig - pool *sync.Pool + parsed []*model.ParsedFilterConfig + pool *sync.Pool } func initFilterManagerConfig(namespace string) *filterManagerConfig { @@ -83,7 +83,7 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC // No configuration if any.GetTypeUrl() == "" { conf := &filterManagerConfig{ - current: []*model.ParsedFilterConfig{}, + parsed: []*model.ParsedFilterConfig{}, } return conf, nil } @@ -108,7 +108,7 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC plugins := fmConfig.Plugins conf := initFilterManagerConfig(fmConfig.Namespace) - conf.current = make([]*model.ParsedFilterConfig, 0, len(plugins)) + conf.parsed = make([]*model.ParsedFilterConfig, 0, len(plugins)) consumerFiltersEndAt := 0 i := 0 @@ -126,12 +126,12 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC // As we can't control what is returned from a plugin, we need to // avoid the failure by providing a special factory, which also // indicates something is wrong. - conf.current = append(conf.current, &model.ParsedFilterConfig{ + conf.parsed = append(conf.parsed, &model.ParsedFilterConfig{ Name: proto.Name, Factory: InternalErrorFactory, }) } else { - conf.current = append(conf.current, &model.ParsedFilterConfig{ + conf.parsed = append(conf.parsed, &model.ParsedFilterConfig{ Name: proto.Name, ParsedConfig: config, Factory: plugin.Factory, @@ -168,21 +168,9 @@ func (p *FilterManagerConfigParser) Merge(parent interface{}, child interface{}) return child } -type filterWrapper struct { - api.Filter - name string -} - -func newFilterWrapper(name string, f api.Filter) *filterWrapper { - return &filterWrapper{ - Filter: f, - name: name, - } -} - type filterManager struct { - filters []*filterWrapper - consumerFilters []*filterWrapper + filters []*model.FilterWrapper + consumerFilters []*model.FilterWrapper decodeRequestNeeded bool decodeIdx int @@ -345,7 +333,7 @@ func newSkipMethodsMap() map[string]bool { func FilterManagerFactory(c interface{}, cb capi.FilterCallbackHandler) capi.StreamFilter { conf := c.(*filterManagerConfig) - newConfig := conf.current + parsedConfig := conf.parsed fm := conf.pool.Get().(*filterManager) fm.callbacks.FilterCallbackHandler = cb @@ -355,8 +343,8 @@ func FilterManagerFactory(c interface{}, cb capi.FilterCallbackHandler) capi.Str canSkipMethod = newSkipMethodsMap() } - filters := make([]*filterWrapper, len(newConfig)) - for i, fc := range newConfig { + filters := make([]*model.FilterWrapper, len(parsedConfig)) + for i, fc := range parsedConfig { factory := fc.Factory config := fc.ParsedConfig f := factory(config, fm.callbacks) @@ -371,12 +359,14 @@ func FilterManagerFactory(c interface{}, cb capi.FilterCallbackHandler) capi.Str canSkipMethod[meth] = canSkipMethod[meth] && ok } } - filters[i] = newFilterWrapper(fc.Name, f) + filters[i] = model.NewFilterWrapper(fc.Name, f) } if fm.canSkipMethod == nil { fm.canSkipMethod = canSkipMethod } + + // We can't cache the slice of filters as it may be changed by consumer fm.filters = filters if conf.consumerFiltersEndAt != 0 { @@ -393,7 +383,6 @@ func FilterManagerFactory(c interface{}, cb capi.FilterCallbackHandler) capi.Str fm.canSkipEncodeHeaders = fm.canSkipMethod["EncodeHeaders"] fm.canSkipEncodeData = fm.canSkipMethod["EncodeData"] && fm.canSkipMethod["EncodeResponse"] fm.canSkipOnLog = fm.canSkipMethod["OnLog"] - // TODO: add cache for consumer so that we can also cache the skip result too return fm } @@ -502,29 +491,41 @@ func (m *filterManager) DecodeHeaders(headers api.RequestHeaderMap, endStream bo } if len(c.FilterConfigs) > 0 { - canSkipMethod := newSkipMethodsMap() - filters := make([]*filterWrapper, 0, len(c.FilterConfigs)) - names := make([]string, 0, len(c.FilterConfigs)) - for name, fc := range c.FilterConfigs { - names = append(names, name) - - factory := fc.Factory - config := fc.ParsedConfig - f := factory(config, m.callbacks) - for meth := range canSkipMethod { - ok, err := isMethodFromPassThroughFilter(f, meth) - if err != nil { - api.LogErrorf("failed to check method %s in filter: %v", meth, err) - // canSkipMethod[meth] will be false + if c.FilterWrappers == nil { + c.FilterWrappers = make([]*model.FilterWrapper, 0, len(c.FilterConfigs)) + canSkipMethod := newSkipMethodsMap() + names := make([]string, 0, len(c.FilterConfigs)) + for name, fc := range c.FilterConfigs { + names = append(names, name) + + factory := fc.Factory + config := fc.ParsedConfig + f := factory(config, m.callbacks) + for meth := range canSkipMethod { + ok, err := isMethodFromPassThroughFilter(f, meth) + if err != nil { + api.LogErrorf("failed to check method %s in filter: %v", meth, err) + // canSkipMethod[meth] will be false + } + canSkipMethod[meth] = canSkipMethod[meth] && ok } - canSkipMethod[meth] = canSkipMethod[meth] && ok + nf := model.NewFilterWrapper(name, f) + c.FilterWrappers = append(c.FilterWrappers, nf) } - nf := newFilterWrapper(name, f) - filters = append(filters, nf) - } - api.LogInfof("add filters %v from consumer %s", names, c.Name()) + c.FilterNames = names + c.CanSkipMethod = canSkipMethod + } else { + for i, name := range c.FilterNames { + fc := c.FilterConfigs[name] + factory := fc.Factory + config := fc.ParsedConfig + f := factory(config, m.callbacks) + c.FilterWrappers[i].Filter = f + } + } + canSkipMethod := c.CanSkipMethod m.canSkipDecodeData = m.canSkipDecodeData && canSkipMethod["DecodeData"] && canSkipMethod["DecodeRequest"] m.canSkipEncodeHeaders = m.canSkipEncodeData && canSkipMethod["EncodeHeaders"] m.canSkipEncodeData = m.canSkipEncodeData && canSkipMethod["EncodeData"] && canSkipMethod["EncodeResponse"] @@ -533,14 +534,14 @@ func (m *filterManager) DecodeHeaders(headers api.RequestHeaderMap, endStream bo // TODO: add field to control if merging is allowed i := 0 for _, f := range m.filters { - if c.FilterConfigs[f.name] == nil { + if c.FilterConfigs[f.Name] == nil { m.filters[i] = f i++ } } - m.filters = append(m.filters[:i], filters...) + m.filters = append(m.filters[:i], c.FilterWrappers...) sort.Slice(m.filters, func(i, j int) bool { - return pkgPlugins.ComparePluginOrder(m.filters[i].name, m.filters[j].name) + return pkgPlugins.ComparePluginOrder(m.filters[i].Name, m.filters[j].Name) }) } } diff --git a/pkg/filtermanager/filtermanager_benchmark_test.go b/pkg/filtermanager/filtermanager_benchmark_test.go index 74cc9bc5..aadfe9e9 100644 --- a/pkg/filtermanager/filtermanager_benchmark_test.go +++ b/pkg/filtermanager/filtermanager_benchmark_test.go @@ -16,8 +16,10 @@ package filtermanager import ( "net/http" + "strconv" "testing" + pkgConsumer "mosn.io/htnn/pkg/consumer" "mosn.io/htnn/pkg/filtermanager/api" "mosn.io/htnn/pkg/filtermanager/model" "mosn.io/htnn/plugins/tests/pkg/envoy" @@ -29,7 +31,7 @@ import ( func BenchmarkFilterManagerAllPhase(b *testing.B) { cb := envoy.NewCAPIFilterCallbackHandler() config := initFilterManagerConfig("ns") - config.current = []*model.ParsedFilterConfig{ + config.parsed = []*model.ParsedFilterConfig{ { Name: "allPhase", Factory: PassThroughFactory, @@ -74,7 +76,7 @@ func (f *regularFilter) OnLog() { func BenchmarkFilterManagerRegular(b *testing.B) { cb := envoy.NewCAPIFilterCallbackHandler() config := initFilterManagerConfig("ns") - config.current = []*model.ParsedFilterConfig{ + config.parsed = []*model.ParsedFilterConfig{ { Name: "regular", Factory: regularFactory, @@ -89,3 +91,44 @@ func BenchmarkFilterManagerRegular(b *testing.B) { m.OnLog() } } + +func BenchmarkFilterManagerConsumerWithFilter(b *testing.B) { + envoy.DisableLogInTest() // otherwise, there is too much output + cb := envoy.NewCAPIFilterCallbackHandler() + config := initFilterManagerConfig("ns") + config.consumerFiltersEndAt = 1 + + consumers := map[string]*pkgConsumer.Consumer{} + num := 10 + reqHdrs := make([]api.RequestHeaderMap, num) + for i := 0; i < num; i++ { + c := pkgConsumer.Consumer{ + FilterConfigs: map[string]*model.ParsedFilterConfig{ + "regular": { + Name: "regular", + Factory: regularFactory, + }, + }, + } + consumers[strconv.Itoa(i)] = &c + h := http.Header{} + h.Add("Consumer", strconv.Itoa(i)) + reqHdrs[i] = envoy.NewRequestHeaderMap(h) + } + config.parsed = []*model.ParsedFilterConfig{ + { + Name: "set_consumer", + Factory: setConsumerFactory, + ParsedConfig: setConsumerConf{ + Consumers: consumers, + }, + }, + } + + for n := 0; n < b.N; n++ { + m := FilterManagerFactory(config, cb) + m.DecodeHeaders(reqHdrs[n%num], false) + cb.WaitContinued() + m.OnLog() + } +} diff --git a/pkg/filtermanager/filtermanager_test.go b/pkg/filtermanager/filtermanager_test.go index 0bfc350c..d810fb3a 100644 --- a/pkg/filtermanager/filtermanager_test.go +++ b/pkg/filtermanager/filtermanager_test.go @@ -15,7 +15,9 @@ package filtermanager import ( + "fmt" "net/http" + "strconv" "testing" "github.com/agiledragon/gomonkey/v2" @@ -77,7 +79,7 @@ func TestParse(t *testing.T) { func TestPassThrough(t *testing.T) { cb := envoy.NewCAPIFilterCallbackHandler() config := initFilterManagerConfig("ns") - config.current = []*model.ParsedFilterConfig{ + config.parsed = []*model.ParsedFilterConfig{ { Name: "passthrough", Factory: PassThroughFactory, @@ -146,7 +148,7 @@ func TestLocalReplyJSON_UseReqHeader(t *testing.T) { t.Run(tt.name, func(t *testing.T) { cb := envoy.NewCAPIFilterCallbackHandler() config := initFilterManagerConfig("ns") - config.current = []*model.ParsedFilterConfig{ + config.parsed = []*model.ParsedFilterConfig{ { Name: "test", Factory: PassThroughFactory, @@ -219,7 +221,7 @@ func TestLocalReplyJSON_UseRespHeader(t *testing.T) { t.Run(tt.name, func(t *testing.T) { cb := envoy.NewCAPIFilterCallbackHandler() config := initFilterManagerConfig("ns") - config.current = []*model.ParsedFilterConfig{ + config.parsed = []*model.ParsedFilterConfig{ { Name: "test", Factory: PassThroughFactory, @@ -255,7 +257,7 @@ func TestLocalReplyJSON_UseRespHeader(t *testing.T) { func TestLocalReplyJSON_DoNotChangeMsgIfContentTypeIsGiven(t *testing.T) { cb := envoy.NewCAPIFilterCallbackHandler() config := initFilterManagerConfig("ns") - config.current = []*model.ParsedFilterConfig{ + config.parsed = []*model.ParsedFilterConfig{ { Name: "test", Factory: PassThroughFactory, @@ -281,33 +283,27 @@ func TestLocalReplyJSON_DoNotChangeMsgIfContentTypeIsGiven(t *testing.T) { }, lr) } +type setConsumerConf struct { + Consumers map[string]*pkgConsumer.Consumer +} + func setConsumerFactory(c interface{}, callbacks api.FilterCallbackHandler) api.Filter { return &setConsumerFilter{ callbacks: callbacks, + conf: c.(setConsumerConf), } } type setConsumerFilter struct { api.PassThroughFilter + conf setConsumerConf callbacks api.FilterCallbackHandler } func (f *setConsumerFilter) DecodeHeaders(headers api.RequestHeaderMap, endStream bool) api.ResultAction { - f.callbacks.SetConsumer(&pkgConsumer.Consumer{ - FilterConfigs: map[string]*model.ParsedFilterConfig{ - "on_log": { - Name: "on_log", - Factory: onLogFactory, - }, - "add_req": { - Name: "add_req", - Factory: addReqFactory, - ParsedConfig: addReqConf{ - hdrName: "x-htnn-consumer", - }, - }, - }, - }) + key, _ := headers.Get("Consumer") + c := f.conf.Consumers[key] + f.callbacks.SetConsumer(c) return api.Continue } @@ -346,7 +342,7 @@ func (f *addReqFilter) DecodeHeaders(headers api.RequestHeaderMap, endStream boo func TestSkipMethodWhenThereAreMultiFilters(t *testing.T) { cb := envoy.NewCAPIFilterCallbackHandler() config := initFilterManagerConfig("ns") - config.current = []*model.ParsedFilterConfig{ + config.parsed = []*model.ParsedFilterConfig{ { Name: "add_req", Factory: addReqFactory, @@ -372,10 +368,35 @@ func TestFiltersFromConsumer(t *testing.T) { cb := envoy.NewCAPIFilterCallbackHandler() config := initFilterManagerConfig("ns") config.consumerFiltersEndAt = 1 - config.current = []*model.ParsedFilterConfig{ + + consumers := map[string]*pkgConsumer.Consumer{} + for i := 0; i < 10; i++ { + c := pkgConsumer.Consumer{ + FilterConfigs: map[string]*model.ParsedFilterConfig{ + "add_req": { + Name: "add_req", + Factory: addReqFactory, + ParsedConfig: addReqConf{ + hdrName: fmt.Sprintf("x-htnn-consumer-%d", i), + }, + }, + }, + } + if i%2 == 0 { + c.FilterConfigs["on_log"] = &model.ParsedFilterConfig{ + Name: "on_log", + Factory: onLogFactory, + } + } + consumers[strconv.Itoa(i)] = &c + } + config.parsed = []*model.ParsedFilterConfig{ { Name: "set_consumer", Factory: setConsumerFactory, + ParsedConfig: setConsumerConf{ + Consumers: consumers, + }, }, { Name: "add_req", @@ -385,19 +406,27 @@ func TestFiltersFromConsumer(t *testing.T) { }, }, } - for i := 0; i < 2; i++ { + for i := 0; i < 20; i++ { m := FilterManagerFactory(config, cb).(*filterManager) assert.Equal(t, true, m.canSkipOnLog) assert.Equal(t, 1, len(m.filters)) - hdr := envoy.NewRequestHeaderMap(http.Header{}) + h := http.Header{} + idx := i % 10 + h.Add("consumer", strconv.Itoa(idx)) + hdr := envoy.NewRequestHeaderMap(h) m.DecodeHeaders(hdr, true) cb.WaitContinued() - assert.Equal(t, false, m.canSkipOnLog) - assert.Equal(t, 2, len(m.filters)) + if idx%2 == 0 { + assert.Equal(t, false, m.canSkipOnLog) + assert.Equal(t, 2, len(m.filters)) + } else { + assert.Equal(t, true, m.canSkipOnLog) + assert.Equal(t, 1, len(m.filters)) + } _, ok := hdr.Get("x-htnn-route") assert.False(t, ok) - _, ok = hdr.Get("x-htnn-consumer") + _, ok = hdr.Get(fmt.Sprintf("x-htnn-consumer-%d", idx)) assert.True(t, ok) } } diff --git a/pkg/filtermanager/model/model.go b/pkg/filtermanager/model/model.go index 7cf61a34..18266411 100644 --- a/pkg/filtermanager/model/model.go +++ b/pkg/filtermanager/model/model.go @@ -14,6 +14,9 @@ package model +// This package puts filtermanager relative definitions that use across the internal packages. +// It's not a part of the API, so it's not recommended to use it in plugin code. + import "mosn.io/htnn/pkg/filtermanager/api" type FilterConfig struct { @@ -26,3 +29,15 @@ type ParsedFilterConfig struct { ParsedConfig interface{} Factory api.FilterFactory } + +type FilterWrapper struct { + api.Filter + Name string +} + +func NewFilterWrapper(name string, f api.Filter) *FilterWrapper { + return &FilterWrapper{ + Filter: f, + Name: name, + } +} diff --git a/plugins/tests/pkg/envoy/capi.go b/plugins/tests/pkg/envoy/capi.go index 9e40144f..70ca8a7c 100644 --- a/plugins/tests/pkg/envoy/capi.go +++ b/plugins/tests/pkg/envoy/capi.go @@ -33,7 +33,16 @@ func init() { capi.SetCommonCAPI(&fakeCapi{}) } +var logDisabled bool + +func DisableLogInTest() { + logDisabled = true +} + func logInGo(level capi.LogType, message string) { + if logDisabled { + return + } log.Printf("[%s] %s\n", level, message) }