Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filtermanager: cache consumer filter #324

Merged
merged 1 commit into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ type Consumer struct {
generation int
ConsumerConfigs map[string]api.PluginConsumerConfig `json:"-"`
FilterConfigs map[string]*model.ParsedFilterConfig `json:"-"`

// fields that generated from the configuration
CanSkipMethod map[string]bool
FilterNames []string
FilterWrappers []*model.FilterWrapper
}

func (c *Consumer) Marshal() string {
Expand Down
95 changes: 48 additions & 47 deletions pkg/filtermanager/filtermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ type FilterManagerConfig struct {
type filterManagerConfig struct {
consumerFiltersEndAt int

current []*model.ParsedFilterConfig
pool *sync.Pool
parsed []*model.ParsedFilterConfig
pool *sync.Pool
}

func initFilterManagerConfig(namespace string) *filterManagerConfig {
Expand Down Expand Up @@ -83,7 +83,7 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC
// No configuration
if any.GetTypeUrl() == "" {
conf := &filterManagerConfig{
current: []*model.ParsedFilterConfig{},
parsed: []*model.ParsedFilterConfig{},
}
return conf, nil
}
Expand All @@ -108,7 +108,7 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC

plugins := fmConfig.Plugins
conf := initFilterManagerConfig(fmConfig.Namespace)
conf.current = make([]*model.ParsedFilterConfig, 0, len(plugins))
conf.parsed = make([]*model.ParsedFilterConfig, 0, len(plugins))

consumerFiltersEndAt := 0
i := 0
Expand All @@ -126,12 +126,12 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC
// As we can't control what is returned from a plugin, we need to
// avoid the failure by providing a special factory, which also
// indicates something is wrong.
conf.current = append(conf.current, &model.ParsedFilterConfig{
conf.parsed = append(conf.parsed, &model.ParsedFilterConfig{
Name: proto.Name,
Factory: InternalErrorFactory,
})
} else {
conf.current = append(conf.current, &model.ParsedFilterConfig{
conf.parsed = append(conf.parsed, &model.ParsedFilterConfig{
Name: proto.Name,
ParsedConfig: config,
Factory: plugin.Factory,
Expand Down Expand Up @@ -168,21 +168,9 @@ func (p *FilterManagerConfigParser) Merge(parent interface{}, child interface{})
return child
}

type filterWrapper struct {
api.Filter
name string
}

func newFilterWrapper(name string, f api.Filter) *filterWrapper {
return &filterWrapper{
Filter: f,
name: name,
}
}

type filterManager struct {
filters []*filterWrapper
consumerFilters []*filterWrapper
filters []*model.FilterWrapper
consumerFilters []*model.FilterWrapper

decodeRequestNeeded bool
decodeIdx int
Expand Down Expand Up @@ -345,7 +333,7 @@ func newSkipMethodsMap() map[string]bool {

func FilterManagerFactory(c interface{}, cb capi.FilterCallbackHandler) capi.StreamFilter {
conf := c.(*filterManagerConfig)
newConfig := conf.current
parsedConfig := conf.parsed

fm := conf.pool.Get().(*filterManager)
fm.callbacks.FilterCallbackHandler = cb
Expand All @@ -355,8 +343,8 @@ func FilterManagerFactory(c interface{}, cb capi.FilterCallbackHandler) capi.Str
canSkipMethod = newSkipMethodsMap()
}

filters := make([]*filterWrapper, len(newConfig))
for i, fc := range newConfig {
filters := make([]*model.FilterWrapper, len(parsedConfig))
for i, fc := range parsedConfig {
factory := fc.Factory
config := fc.ParsedConfig
f := factory(config, fm.callbacks)
Expand All @@ -371,12 +359,14 @@ func FilterManagerFactory(c interface{}, cb capi.FilterCallbackHandler) capi.Str
canSkipMethod[meth] = canSkipMethod[meth] && ok
}
}
filters[i] = newFilterWrapper(fc.Name, f)
filters[i] = model.NewFilterWrapper(fc.Name, f)
}

if fm.canSkipMethod == nil {
fm.canSkipMethod = canSkipMethod
}

// We can't cache the slice of filters as it may be changed by consumer
fm.filters = filters

if conf.consumerFiltersEndAt != 0 {
Expand All @@ -393,7 +383,6 @@ func FilterManagerFactory(c interface{}, cb capi.FilterCallbackHandler) capi.Str
fm.canSkipEncodeHeaders = fm.canSkipMethod["EncodeHeaders"]
fm.canSkipEncodeData = fm.canSkipMethod["EncodeData"] && fm.canSkipMethod["EncodeResponse"]
fm.canSkipOnLog = fm.canSkipMethod["OnLog"]
// TODO: add cache for consumer so that we can also cache the skip result too

return fm
}
Expand Down Expand Up @@ -502,29 +491,41 @@ func (m *filterManager) DecodeHeaders(headers api.RequestHeaderMap, endStream bo
}

if len(c.FilterConfigs) > 0 {
canSkipMethod := newSkipMethodsMap()
filters := make([]*filterWrapper, 0, len(c.FilterConfigs))
names := make([]string, 0, len(c.FilterConfigs))
for name, fc := range c.FilterConfigs {
names = append(names, name)

factory := fc.Factory
config := fc.ParsedConfig
f := factory(config, m.callbacks)
for meth := range canSkipMethod {
ok, err := isMethodFromPassThroughFilter(f, meth)
if err != nil {
api.LogErrorf("failed to check method %s in filter: %v", meth, err)
// canSkipMethod[meth] will be false
if c.FilterWrappers == nil {
c.FilterWrappers = make([]*model.FilterWrapper, 0, len(c.FilterConfigs))
canSkipMethod := newSkipMethodsMap()
names := make([]string, 0, len(c.FilterConfigs))
for name, fc := range c.FilterConfigs {
names = append(names, name)

factory := fc.Factory
config := fc.ParsedConfig
f := factory(config, m.callbacks)
for meth := range canSkipMethod {
ok, err := isMethodFromPassThroughFilter(f, meth)
if err != nil {
api.LogErrorf("failed to check method %s in filter: %v", meth, err)
// canSkipMethod[meth] will be false
}
canSkipMethod[meth] = canSkipMethod[meth] && ok
}
canSkipMethod[meth] = canSkipMethod[meth] && ok
nf := model.NewFilterWrapper(name, f)
c.FilterWrappers = append(c.FilterWrappers, nf)
}
nf := newFilterWrapper(name, f)
filters = append(filters, nf)
}

api.LogInfof("add filters %v from consumer %s", names, c.Name())
c.FilterNames = names
c.CanSkipMethod = canSkipMethod
} else {
for i, name := range c.FilterNames {
fc := c.FilterConfigs[name]
factory := fc.Factory
config := fc.ParsedConfig
f := factory(config, m.callbacks)
c.FilterWrappers[i].Filter = f
}
}

canSkipMethod := c.CanSkipMethod
m.canSkipDecodeData = m.canSkipDecodeData && canSkipMethod["DecodeData"] && canSkipMethod["DecodeRequest"]
m.canSkipEncodeHeaders = m.canSkipEncodeData && canSkipMethod["EncodeHeaders"]
m.canSkipEncodeData = m.canSkipEncodeData && canSkipMethod["EncodeData"] && canSkipMethod["EncodeResponse"]
Expand All @@ -533,14 +534,14 @@ func (m *filterManager) DecodeHeaders(headers api.RequestHeaderMap, endStream bo
// TODO: add field to control if merging is allowed
i := 0
for _, f := range m.filters {
if c.FilterConfigs[f.name] == nil {
if c.FilterConfigs[f.Name] == nil {
m.filters[i] = f
i++
}
}
m.filters = append(m.filters[:i], filters...)
m.filters = append(m.filters[:i], c.FilterWrappers...)
sort.Slice(m.filters, func(i, j int) bool {
return pkgPlugins.ComparePluginOrder(m.filters[i].name, m.filters[j].name)
return pkgPlugins.ComparePluginOrder(m.filters[i].Name, m.filters[j].Name)
})
}
}
Expand Down
47 changes: 45 additions & 2 deletions pkg/filtermanager/filtermanager_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ package filtermanager

import (
"net/http"
"strconv"
"testing"

pkgConsumer "mosn.io/htnn/pkg/consumer"
"mosn.io/htnn/pkg/filtermanager/api"
"mosn.io/htnn/pkg/filtermanager/model"
"mosn.io/htnn/plugins/tests/pkg/envoy"
Expand All @@ -29,7 +31,7 @@ import (
func BenchmarkFilterManagerAllPhase(b *testing.B) {
cb := envoy.NewCAPIFilterCallbackHandler()
config := initFilterManagerConfig("ns")
config.current = []*model.ParsedFilterConfig{
config.parsed = []*model.ParsedFilterConfig{
{
Name: "allPhase",
Factory: PassThroughFactory,
Expand Down Expand Up @@ -74,7 +76,7 @@ func (f *regularFilter) OnLog() {
func BenchmarkFilterManagerRegular(b *testing.B) {
cb := envoy.NewCAPIFilterCallbackHandler()
config := initFilterManagerConfig("ns")
config.current = []*model.ParsedFilterConfig{
config.parsed = []*model.ParsedFilterConfig{
{
Name: "regular",
Factory: regularFactory,
Expand All @@ -89,3 +91,44 @@ func BenchmarkFilterManagerRegular(b *testing.B) {
m.OnLog()
}
}

func BenchmarkFilterManagerConsumerWithFilter(b *testing.B) {
envoy.DisableLogInTest() // otherwise, there is too much output
cb := envoy.NewCAPIFilterCallbackHandler()
config := initFilterManagerConfig("ns")
config.consumerFiltersEndAt = 1

consumers := map[string]*pkgConsumer.Consumer{}
num := 10
reqHdrs := make([]api.RequestHeaderMap, num)
for i := 0; i < num; i++ {
c := pkgConsumer.Consumer{
FilterConfigs: map[string]*model.ParsedFilterConfig{
"regular": {
Name: "regular",
Factory: regularFactory,
},
},
}
consumers[strconv.Itoa(i)] = &c
h := http.Header{}
h.Add("Consumer", strconv.Itoa(i))
reqHdrs[i] = envoy.NewRequestHeaderMap(h)
}
config.parsed = []*model.ParsedFilterConfig{
{
Name: "set_consumer",
Factory: setConsumerFactory,
ParsedConfig: setConsumerConf{
Consumers: consumers,
},
},
}

for n := 0; n < b.N; n++ {
m := FilterManagerFactory(config, cb)
m.DecodeHeaders(reqHdrs[n%num], false)
cb.WaitContinued()
m.OnLog()
}
}
Loading