Skip to content

Commit

Permalink
filtermanager: cache consumer filter (#324)
Browse files Browse the repository at this point in the history
Signed-off-by: spacewander <[email protected]>
  • Loading branch information
spacewander authored Feb 26, 2024
1 parent 66eb69e commit d8ce0ee
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 75 deletions.
5 changes: 5 additions & 0 deletions pkg/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ type Consumer struct {
generation int
ConsumerConfigs map[string]api.PluginConsumerConfig `json:"-"`
FilterConfigs map[string]*model.ParsedFilterConfig `json:"-"`

// fields that generated from the configuration
CanSkipMethod map[string]bool
FilterNames []string
FilterWrappers []*model.FilterWrapper
}

func (c *Consumer) Marshal() string {
Expand Down
95 changes: 48 additions & 47 deletions pkg/filtermanager/filtermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ type FilterManagerConfig struct {
type filterManagerConfig struct {
consumerFiltersEndAt int

current []*model.ParsedFilterConfig
pool *sync.Pool
parsed []*model.ParsedFilterConfig
pool *sync.Pool
}

func initFilterManagerConfig(namespace string) *filterManagerConfig {
Expand Down Expand Up @@ -83,7 +83,7 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC
// No configuration
if any.GetTypeUrl() == "" {
conf := &filterManagerConfig{
current: []*model.ParsedFilterConfig{},
parsed: []*model.ParsedFilterConfig{},
}
return conf, nil
}
Expand All @@ -108,7 +108,7 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC

plugins := fmConfig.Plugins
conf := initFilterManagerConfig(fmConfig.Namespace)
conf.current = make([]*model.ParsedFilterConfig, 0, len(plugins))
conf.parsed = make([]*model.ParsedFilterConfig, 0, len(plugins))

consumerFiltersEndAt := 0
i := 0
Expand All @@ -126,12 +126,12 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC
// As we can't control what is returned from a plugin, we need to
// avoid the failure by providing a special factory, which also
// indicates something is wrong.
conf.current = append(conf.current, &model.ParsedFilterConfig{
conf.parsed = append(conf.parsed, &model.ParsedFilterConfig{
Name: proto.Name,
Factory: InternalErrorFactory,
})
} else {
conf.current = append(conf.current, &model.ParsedFilterConfig{
conf.parsed = append(conf.parsed, &model.ParsedFilterConfig{
Name: proto.Name,
ParsedConfig: config,
Factory: plugin.Factory,
Expand Down Expand Up @@ -168,21 +168,9 @@ func (p *FilterManagerConfigParser) Merge(parent interface{}, child interface{})
return child
}

type filterWrapper struct {
api.Filter
name string
}

func newFilterWrapper(name string, f api.Filter) *filterWrapper {
return &filterWrapper{
Filter: f,
name: name,
}
}

type filterManager struct {
filters []*filterWrapper
consumerFilters []*filterWrapper
filters []*model.FilterWrapper
consumerFilters []*model.FilterWrapper

decodeRequestNeeded bool
decodeIdx int
Expand Down Expand Up @@ -345,7 +333,7 @@ func newSkipMethodsMap() map[string]bool {

func FilterManagerFactory(c interface{}, cb capi.FilterCallbackHandler) capi.StreamFilter {
conf := c.(*filterManagerConfig)
newConfig := conf.current
parsedConfig := conf.parsed

fm := conf.pool.Get().(*filterManager)
fm.callbacks.FilterCallbackHandler = cb
Expand All @@ -355,8 +343,8 @@ func FilterManagerFactory(c interface{}, cb capi.FilterCallbackHandler) capi.Str
canSkipMethod = newSkipMethodsMap()
}

filters := make([]*filterWrapper, len(newConfig))
for i, fc := range newConfig {
filters := make([]*model.FilterWrapper, len(parsedConfig))
for i, fc := range parsedConfig {
factory := fc.Factory
config := fc.ParsedConfig
f := factory(config, fm.callbacks)
Expand All @@ -371,12 +359,14 @@ func FilterManagerFactory(c interface{}, cb capi.FilterCallbackHandler) capi.Str
canSkipMethod[meth] = canSkipMethod[meth] && ok
}
}
filters[i] = newFilterWrapper(fc.Name, f)
filters[i] = model.NewFilterWrapper(fc.Name, f)
}

if fm.canSkipMethod == nil {
fm.canSkipMethod = canSkipMethod
}

// We can't cache the slice of filters as it may be changed by consumer
fm.filters = filters

if conf.consumerFiltersEndAt != 0 {
Expand All @@ -393,7 +383,6 @@ func FilterManagerFactory(c interface{}, cb capi.FilterCallbackHandler) capi.Str
fm.canSkipEncodeHeaders = fm.canSkipMethod["EncodeHeaders"]
fm.canSkipEncodeData = fm.canSkipMethod["EncodeData"] && fm.canSkipMethod["EncodeResponse"]
fm.canSkipOnLog = fm.canSkipMethod["OnLog"]
// TODO: add cache for consumer so that we can also cache the skip result too

return fm
}
Expand Down Expand Up @@ -502,29 +491,41 @@ func (m *filterManager) DecodeHeaders(headers api.RequestHeaderMap, endStream bo
}

if len(c.FilterConfigs) > 0 {
canSkipMethod := newSkipMethodsMap()
filters := make([]*filterWrapper, 0, len(c.FilterConfigs))
names := make([]string, 0, len(c.FilterConfigs))
for name, fc := range c.FilterConfigs {
names = append(names, name)

factory := fc.Factory
config := fc.ParsedConfig
f := factory(config, m.callbacks)
for meth := range canSkipMethod {
ok, err := isMethodFromPassThroughFilter(f, meth)
if err != nil {
api.LogErrorf("failed to check method %s in filter: %v", meth, err)
// canSkipMethod[meth] will be false
if c.FilterWrappers == nil {
c.FilterWrappers = make([]*model.FilterWrapper, 0, len(c.FilterConfigs))
canSkipMethod := newSkipMethodsMap()
names := make([]string, 0, len(c.FilterConfigs))
for name, fc := range c.FilterConfigs {
names = append(names, name)

factory := fc.Factory
config := fc.ParsedConfig
f := factory(config, m.callbacks)
for meth := range canSkipMethod {
ok, err := isMethodFromPassThroughFilter(f, meth)
if err != nil {
api.LogErrorf("failed to check method %s in filter: %v", meth, err)
// canSkipMethod[meth] will be false
}
canSkipMethod[meth] = canSkipMethod[meth] && ok
}
canSkipMethod[meth] = canSkipMethod[meth] && ok
nf := model.NewFilterWrapper(name, f)
c.FilterWrappers = append(c.FilterWrappers, nf)
}
nf := newFilterWrapper(name, f)
filters = append(filters, nf)
}

api.LogInfof("add filters %v from consumer %s", names, c.Name())
c.FilterNames = names
c.CanSkipMethod = canSkipMethod
} else {
for i, name := range c.FilterNames {
fc := c.FilterConfigs[name]
factory := fc.Factory
config := fc.ParsedConfig
f := factory(config, m.callbacks)
c.FilterWrappers[i].Filter = f
}
}

canSkipMethod := c.CanSkipMethod
m.canSkipDecodeData = m.canSkipDecodeData && canSkipMethod["DecodeData"] && canSkipMethod["DecodeRequest"]
m.canSkipEncodeHeaders = m.canSkipEncodeData && canSkipMethod["EncodeHeaders"]
m.canSkipEncodeData = m.canSkipEncodeData && canSkipMethod["EncodeData"] && canSkipMethod["EncodeResponse"]
Expand All @@ -533,14 +534,14 @@ func (m *filterManager) DecodeHeaders(headers api.RequestHeaderMap, endStream bo
// TODO: add field to control if merging is allowed
i := 0
for _, f := range m.filters {
if c.FilterConfigs[f.name] == nil {
if c.FilterConfigs[f.Name] == nil {
m.filters[i] = f
i++
}
}
m.filters = append(m.filters[:i], filters...)
m.filters = append(m.filters[:i], c.FilterWrappers...)
sort.Slice(m.filters, func(i, j int) bool {
return pkgPlugins.ComparePluginOrder(m.filters[i].name, m.filters[j].name)
return pkgPlugins.ComparePluginOrder(m.filters[i].Name, m.filters[j].Name)
})
}
}
Expand Down
47 changes: 45 additions & 2 deletions pkg/filtermanager/filtermanager_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ package filtermanager

import (
"net/http"
"strconv"
"testing"

pkgConsumer "mosn.io/htnn/pkg/consumer"
"mosn.io/htnn/pkg/filtermanager/api"
"mosn.io/htnn/pkg/filtermanager/model"
"mosn.io/htnn/plugins/tests/pkg/envoy"
Expand All @@ -29,7 +31,7 @@ import (
func BenchmarkFilterManagerAllPhase(b *testing.B) {
cb := envoy.NewCAPIFilterCallbackHandler()
config := initFilterManagerConfig("ns")
config.current = []*model.ParsedFilterConfig{
config.parsed = []*model.ParsedFilterConfig{
{
Name: "allPhase",
Factory: PassThroughFactory,
Expand Down Expand Up @@ -74,7 +76,7 @@ func (f *regularFilter) OnLog() {
func BenchmarkFilterManagerRegular(b *testing.B) {
cb := envoy.NewCAPIFilterCallbackHandler()
config := initFilterManagerConfig("ns")
config.current = []*model.ParsedFilterConfig{
config.parsed = []*model.ParsedFilterConfig{
{
Name: "regular",
Factory: regularFactory,
Expand All @@ -89,3 +91,44 @@ func BenchmarkFilterManagerRegular(b *testing.B) {
m.OnLog()
}
}

func BenchmarkFilterManagerConsumerWithFilter(b *testing.B) {
envoy.DisableLogInTest() // otherwise, there is too much output
cb := envoy.NewCAPIFilterCallbackHandler()
config := initFilterManagerConfig("ns")
config.consumerFiltersEndAt = 1

consumers := map[string]*pkgConsumer.Consumer{}
num := 10
reqHdrs := make([]api.RequestHeaderMap, num)
for i := 0; i < num; i++ {
c := pkgConsumer.Consumer{
FilterConfigs: map[string]*model.ParsedFilterConfig{
"regular": {
Name: "regular",
Factory: regularFactory,
},
},
}
consumers[strconv.Itoa(i)] = &c
h := http.Header{}
h.Add("Consumer", strconv.Itoa(i))
reqHdrs[i] = envoy.NewRequestHeaderMap(h)
}
config.parsed = []*model.ParsedFilterConfig{
{
Name: "set_consumer",
Factory: setConsumerFactory,
ParsedConfig: setConsumerConf{
Consumers: consumers,
},
},
}

for n := 0; n < b.N; n++ {
m := FilterManagerFactory(config, cb)
m.DecodeHeaders(reqHdrs[n%num], false)
cb.WaitContinued()
m.OnLog()
}
}
Loading

0 comments on commit d8ce0ee

Please sign in to comment.