From 0233a3986cd5b89892ef0d17821b83fefc0f9528 Mon Sep 17 00:00:00 2001 From: spacewander Date: Mon, 18 Nov 2024 16:10:06 +0800 Subject: [PATCH] check it per phase Signed-off-by: spacewander --- api/internal/consumer/consumer.go | 8 +-- api/pkg/filtermanager/api/phase.go | 63 ++++++++++++++++ api/pkg/filtermanager/config.go | 8 +-- api/pkg/filtermanager/filtermanager.go | 71 ++++++++----------- .../filtermanager_benchmark_test.go | 6 +- api/pkg/filtermanager/filtermanager_test.go | 9 ++- api/pkg/filtermanager/model/model.go | 12 ++-- api/pkg/plugins/plugins.go | 6 +- api/pkg/plugins/type.go | 2 +- types/plugins/demo/config.go | 8 ++- 10 files changed, 122 insertions(+), 71 deletions(-) create mode 100644 api/pkg/filtermanager/api/phase.go diff --git a/api/internal/consumer/consumer.go b/api/internal/consumer/consumer.go index 7c358986..5736b26a 100644 --- a/api/internal/consumer/consumer.go +++ b/api/internal/consumer/consumer.go @@ -94,10 +94,10 @@ func (c *Consumer) InitConfigs() error { } c.FilterConfigs[name] = &fmModel.ParsedFilterConfig{ - Name: name, - ParsedConfig: conf, - Factory: p.Factory, - CanSyncRun: p.ConfigParser.IsNonBlocking(), + Name: name, + ParsedConfig: conf, + Factory: p.Factory, + SyncRunPhases: p.ConfigParser.NonBlockingPhases(), } } diff --git a/api/pkg/filtermanager/api/phase.go b/api/pkg/filtermanager/api/phase.go new file mode 100644 index 00000000..12017d51 --- /dev/null +++ b/api/pkg/filtermanager/api/phase.go @@ -0,0 +1,63 @@ +// Copyright The HTNN Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +type Phase int + +const ( + PhaseDecodeHeaders Phase = 0x01 + PhaseDecodeData = 0x02 + PhaseDecodeTrailers = 0x04 + PhaseDecodeRequest = 0x08 + PhaseEncodeHeaders = 0x10 + PhaseEncodeData = 0x20 + PhaseEncodeTrailers = 0x40 + PhaseEncodeResponse = 0x80 + PhaseOnLog = 0x100 +) + +var ( + AllPhases = PhaseDecodeHeaders | PhaseDecodeData | PhaseDecodeTrailers | PhaseDecodeRequest | + PhaseEncodeHeaders | PhaseEncodeData | PhaseEncodeTrailers | PhaseEncodeResponse | PhaseOnLog +) + +func (p Phase) Contains(phases Phase) bool { + return p&phases == phases +} + +func MethodToPhase(meth string) Phase { + switch meth { + case "DecodeHeaders": + return PhaseDecodeHeaders + case "DecodeData": + return PhaseDecodeData + case "DecodeTrailers": + return PhaseDecodeTrailers + case "DecodeRequest": + return PhaseDecodeRequest + case "EncodeHeaders": + return PhaseEncodeHeaders + case "EncodeData": + return PhaseEncodeData + case "EncodeTrailers": + return PhaseEncodeTrailers + case "EncodeResponse": + return PhaseEncodeResponse + case "OnLog": + return PhaseOnLog + default: + return 0 + } +} diff --git a/api/pkg/filtermanager/config.go b/api/pkg/filtermanager/config.go index 4d72b308..6cd411fe 100644 --- a/api/pkg/filtermanager/config.go +++ b/api/pkg/filtermanager/config.go @@ -232,10 +232,10 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC }) } else { conf.parsed = append(conf.parsed, &model.ParsedFilterConfig{ - Name: proto.Name, - ParsedConfig: config, - Factory: plugin.Factory, - CanSyncRun: plugin.ConfigParser.IsNonBlocking(), + Name: proto.Name, + ParsedConfig: config, + Factory: plugin.Factory, + SyncRunPhases: plugin.ConfigParser.NonBlockingPhases(), }) _, ok := pkgPlugins.LoadPlugin(name).(pkgPlugins.ConsumerPlugin) diff --git a/api/pkg/filtermanager/filtermanager.go b/api/pkg/filtermanager/filtermanager.go index c8ec98dc..f89abcab 100644 --- a/api/pkg/filtermanager/filtermanager.go +++ b/api/pkg/filtermanager/filtermanager.go @@ -122,19 +122,6 @@ func (m *filterManager) DebugModeEnabled() bool { return m.config.enableDebugMode } -type phase int - -const ( - phaseDecodeHeaders phase = iota - phaseDecodeData - phaseDecodeTrailers - phaseDecodeRequest - phaseEncodeHeaders - phaseEncodeData - phaseEncodeTrailers - phaseEncodeResponse -) - func newSkipMethodsMap() map[string]bool { return map[string]bool{ "DecodeHeaders": true, @@ -222,7 +209,7 @@ func FilterManagerFactory(c interface{}, cb capi.FilterCallbackHandler) (streamF if overridden { // canSkipMethod contains canSyncRunMethod so we can safely check it in the same loop - canSyncRunMethod[meth] = canSyncRunMethod[meth] && fc.CanSyncRun + canSyncRunMethod[meth] = canSyncRunMethod[meth] && fc.SyncRunPhases.Contains(api.MethodToPhase(meth)) } } @@ -295,14 +282,14 @@ func (m *filterManager) recordLocalReplyPluginName(name string) { // off a goroutine and the goroutine panics. } -func (m *filterManager) handleAction(res api.ResultAction, phase phase, filter *model.FilterWrapper) (needReturn bool) { +func (m *filterManager) handleAction(res api.ResultAction, phase api.Phase, filter *model.FilterWrapper) (needReturn bool) { if res == api.Continue { return false } if res == api.WaitAllData { - if phase == phaseDecodeHeaders { + if phase == api.PhaseDecodeHeaders { m.decodeRequestNeeded = true - } else if phase == phaseEncodeHeaders { + } else if phase == api.PhaseEncodeHeaders { m.encodeResponseNeeded = true } else { api.LogErrorf("WaitAllData only allowed when processing headers, phase: %v. "+ @@ -314,7 +301,7 @@ func (m *filterManager) handleAction(res api.ResultAction, phase phase, filter * switch v := res.(type) { case *api.LocalResponse: m.recordLocalReplyPluginName(filter.Name) - m.localReply(v, phase < phaseEncodeHeaders) + m.localReply(v, phase < api.PhaseEncodeHeaders) return true default: api.LogErrorf("unknown result action: %+v", v) @@ -440,7 +427,7 @@ func (m *filterManager) decodeHeaders(headers capi.RequestHeaderMap, endStream b f := m.filters[i] // We don't support DecodeRequest for now res = f.DecodeHeaders(m.reqHdr, endStream) - if m.handleAction(res, phaseDecodeHeaders, f) { + if m.handleAction(res, api.PhaseDecodeHeaders, f) { return capi.LocalReply } } @@ -493,7 +480,7 @@ func (m *filterManager) decodeHeaders(headers capi.RequestHeaderMap, endStream b canSkipMethod[meth] = canSkipMethod[meth] && !overridden if overridden { - canSyncRunMethod[meth] = canSyncRunMethod[meth] && fc.CanSyncRun + canSyncRunMethod[meth] = canSyncRunMethod[meth] && fc.SyncRunPhases.Contains(api.MethodToPhase(meth)) } } } @@ -565,7 +552,7 @@ func (m *filterManager) decodeHeaders(headers capi.RequestHeaderMap, endStream b for i := m.config.consumerFiltersEndAt; i < len(m.filters); i++ { f := m.filters[i] res = f.DecodeHeaders(m.reqHdr, endStream) - if m.handleAction(res, phaseDecodeHeaders, f) { + if m.handleAction(res, api.PhaseDecodeHeaders, f) { return capi.LocalReply } @@ -580,7 +567,7 @@ func (m *filterManager) decodeHeaders(headers capi.RequestHeaderMap, endStream b // no body and no trailers res = f.DecodeRequest(m.reqHdr, nil, nil) - if m.handleAction(res, phaseDecodeRequest, f) { + if m.handleAction(res, api.PhaseDecodeRequest, f) { return capi.LocalReply } } @@ -600,7 +587,7 @@ func (m *filterManager) DecodeRequest(headers api.RequestHeaderMap, buf capi.Buf for i := 0; i < m.decodeIdx; i++ { f := m.filters[i] res = f.DecodeData(buf, endStreamInBody) - if m.handleAction(res, phaseDecodeData, f) { + if m.handleAction(res, api.PhaseDecodeData, f) { return false } } @@ -611,7 +598,7 @@ func (m *filterManager) DecodeRequest(headers api.RequestHeaderMap, buf capi.Buf for i := 0; i < m.decodeIdx; i++ { f := m.filters[i] res = f.DecodeTrailers(trailers) - if m.handleAction(res, phaseDecodeTrailers, f) { + if m.handleAction(res, api.PhaseDecodeTrailers, f) { return false } } @@ -619,7 +606,7 @@ func (m *filterManager) DecodeRequest(headers api.RequestHeaderMap, buf capi.Buf f := m.filters[m.decodeIdx] res = f.DecodeRequest(headers, buf, trailers) - if m.handleAction(res, phaseDecodeRequest, f) { + if m.handleAction(res, api.PhaseDecodeRequest, f) { return false } @@ -631,7 +618,7 @@ func (m *filterManager) DecodeRequest(headers api.RequestHeaderMap, buf capi.Buf // The endStream in DecodeHeaders indicates whether there is a body. // The body always exists when we hit this path. res = f.DecodeHeaders(headers, false) - if m.handleAction(res, phaseDecodeHeaders, f) { + if m.handleAction(res, api.PhaseDecodeHeaders, f) { return false } if m.decodeRequestNeeded { @@ -646,7 +633,7 @@ func (m *filterManager) DecodeRequest(headers api.RequestHeaderMap, buf capi.Buf for j := m.decodeIdx + 1; j < i; j++ { f := m.filters[j] res = f.DecodeData(buf, endStreamInBody) - if m.handleAction(res, phaseDecodeData, f) { + if m.handleAction(res, api.PhaseDecodeData, f) { return false } } @@ -656,7 +643,7 @@ func (m *filterManager) DecodeRequest(headers api.RequestHeaderMap, buf capi.Buf for j := m.decodeIdx + 1; j < i; j++ { f := m.filters[j] res = f.DecodeTrailers(trailers) - if m.handleAction(res, phaseDecodeTrailers, f) { + if m.handleAction(res, api.PhaseDecodeTrailers, f) { return false } } @@ -667,7 +654,7 @@ func (m *filterManager) DecodeRequest(headers api.RequestHeaderMap, buf capi.Buf m.decodeIdx = i f := m.filters[m.decodeIdx] res = f.DecodeRequest(headers, buf, trailers) - if m.handleAction(res, phaseDecodeRequest, f) { + if m.handleAction(res, api.PhaseDecodeRequest, f) { return false } i++ @@ -724,7 +711,7 @@ func (m *filterManager) decodeData(buf capi.BufferInstance, endStream bool) capi for i := 0; i < n; i++ { f := m.filters[i] res = f.DecodeData(buf, endStream) - if m.handleAction(res, phaseDecodeData, f) { + if m.handleAction(res, api.PhaseDecodeData, f) { return capi.LocalReply } } @@ -771,7 +758,7 @@ func (m *filterManager) decodeTrailers(trailers capi.RequestTrailerMap) capi.Sta if m.decodeIdx == -1 { for _, f := range m.filters { res = f.DecodeTrailers(trailers) - if m.handleAction(res, phaseDecodeTrailers, f) { + if m.handleAction(res, api.PhaseDecodeTrailers, f) { return capi.LocalReply } } @@ -825,7 +812,7 @@ func (m *filterManager) encodeHeaders(headers capi.ResponseHeaderMap, endStream for i := n - 1; i >= 0; i-- { f := m.filters[i] res = f.EncodeHeaders(headers, endStream) - if m.handleAction(res, phaseEncodeHeaders, f) { + if m.handleAction(res, api.PhaseEncodeHeaders, f) { return capi.LocalReply } @@ -838,7 +825,7 @@ func (m *filterManager) encodeHeaders(headers capi.ResponseHeaderMap, endStream // no body res = f.EncodeResponse(headers, nil, nil) - if m.handleAction(res, phaseEncodeResponse, f) { + if m.handleAction(res, api.PhaseEncodeResponse, f) { return capi.LocalReply } } @@ -858,7 +845,7 @@ func (m *filterManager) EncodeResponse(headers api.ResponseHeaderMap, buf capi.B for i := n - 1; i > m.encodeIdx; i-- { f := m.filters[i] res = f.EncodeData(buf, endStreamInBody) - if m.handleAction(res, phaseEncodeData, f) { + if m.handleAction(res, api.PhaseEncodeData, f) { return false } } @@ -868,7 +855,7 @@ func (m *filterManager) EncodeResponse(headers api.ResponseHeaderMap, buf capi.B for i := n - 1; i > m.encodeIdx; i-- { f := m.filters[i] res = f.EncodeTrailers(trailers) - if m.handleAction(res, phaseEncodeTrailers, f) { + if m.handleAction(res, api.PhaseEncodeTrailers, f) { return false } } @@ -876,7 +863,7 @@ func (m *filterManager) EncodeResponse(headers api.ResponseHeaderMap, buf capi.B f := m.filters[m.encodeIdx] res = f.EncodeResponse(m.rspHdr, buf, nil) - if m.handleAction(res, phaseEncodeResponse, f) { + if m.handleAction(res, api.PhaseEncodeResponse, f) { return false } @@ -885,7 +872,7 @@ func (m *filterManager) EncodeResponse(headers api.ResponseHeaderMap, buf capi.B for ; i >= 0; i-- { f := m.filters[i] res = f.EncodeHeaders(m.rspHdr, false) - if m.handleAction(res, phaseEncodeHeaders, f) { + if m.handleAction(res, api.PhaseEncodeHeaders, f) { return false } if m.encodeResponseNeeded { @@ -898,7 +885,7 @@ func (m *filterManager) EncodeResponse(headers api.ResponseHeaderMap, buf capi.B for j := m.encodeIdx - 1; j > i; j-- { f := m.filters[j] res = f.EncodeData(buf, endStreamInBody) - if m.handleAction(res, phaseEncodeData, f) { + if m.handleAction(res, api.PhaseEncodeData, f) { return false } } @@ -908,7 +895,7 @@ func (m *filterManager) EncodeResponse(headers api.ResponseHeaderMap, buf capi.B for j := m.encodeIdx - 1; j > i; j-- { f := m.filters[j] res = f.EncodeTrailers(trailers) - if m.handleAction(res, phaseEncodeTrailers, f) { + if m.handleAction(res, api.PhaseEncodeTrailers, f) { return false } } @@ -919,7 +906,7 @@ func (m *filterManager) EncodeResponse(headers api.ResponseHeaderMap, buf capi.B m.encodeIdx = i f := m.filters[m.encodeIdx] res = f.EncodeResponse(m.rspHdr, buf, nil) - if m.handleAction(res, phaseEncodeResponse, f) { + if m.handleAction(res, api.PhaseEncodeResponse, f) { return false } i-- @@ -963,7 +950,7 @@ func (m *filterManager) encodeData(buf capi.BufferInstance, endStream bool) capi for i := n - 1; i >= 0; i-- { f := m.filters[i] res = f.EncodeData(buf, endStream) - if m.handleAction(res, phaseEncodeData, f) { + if m.handleAction(res, api.PhaseEncodeData, f) { return capi.LocalReply } } @@ -1009,7 +996,7 @@ func (m *filterManager) encodeTrailers(trailers capi.ResponseTrailerMap) capi.St if m.encodeIdx == -1 { for _, f := range m.filters { res = f.EncodeTrailers(trailers) - if m.handleAction(res, phaseEncodeTrailers, f) { + if m.handleAction(res, api.PhaseEncodeTrailers, f) { return capi.LocalReply } } diff --git a/api/pkg/filtermanager/filtermanager_benchmark_test.go b/api/pkg/filtermanager/filtermanager_benchmark_test.go index 099a9cc1..5c0c2057 100644 --- a/api/pkg/filtermanager/filtermanager_benchmark_test.go +++ b/api/pkg/filtermanager/filtermanager_benchmark_test.go @@ -64,9 +64,9 @@ func BenchmarkFilterManagerAllPhaseCanSyncRun(b *testing.B) { config := initFilterManagerConfig("ns") config.parsed = []*model.ParsedFilterConfig{ { - Name: "allPhase", - Factory: PassThroughFactory, - CanSyncRun: true, + Name: "allPhase", + Factory: PassThroughFactory, + SyncRunPhases: api.AllPhases, }, } reqHdr := envoy.NewRequestHeaderMap(http.Header{}) diff --git a/api/pkg/filtermanager/filtermanager_test.go b/api/pkg/filtermanager/filtermanager_test.go index f2f07652..ac76582d 100644 --- a/api/pkg/filtermanager/filtermanager_test.go +++ b/api/pkg/filtermanager/filtermanager_test.go @@ -459,7 +459,7 @@ func TestFiltersFromConsumer(t *testing.T) { ParsedConfig: addRespConf{ hdrName: fmt.Sprintf("x-htnn-resp-%d", i), }, - CanSyncRun: true, + SyncRunPhases: api.PhaseEncodeHeaders, }, }, } @@ -708,12 +708,11 @@ func TestSyncRunWhenThereAreMultiFilters(t *testing.T) { ParsedConfig: addReqConf{ hdrName: "x-htnn-route", }, - CanSyncRun: false, }, { - Name: "access_field_on_log", - Factory: accessFieldOnLogFactory, - CanSyncRun: true, + Name: "access_field_on_log", + Factory: accessFieldOnLogFactory, + SyncRunPhases: api.AllPhases, }, } diff --git a/api/pkg/filtermanager/model/model.go b/api/pkg/filtermanager/model/model.go index a6580134..9ca6d087 100644 --- a/api/pkg/filtermanager/model/model.go +++ b/api/pkg/filtermanager/model/model.go @@ -30,12 +30,12 @@ type FilterConfig struct { } type ParsedFilterConfig struct { - Name string - ParsedConfig interface{} - InitOnce sync.Once - InitFailure error - Factory api.FilterFactory - CanSyncRun bool + Name string + ParsedConfig interface{} + InitOnce sync.Once + InitFailure error + Factory api.FilterFactory + SyncRunPhases api.Phase } type FilterWrapper struct { diff --git a/api/pkg/plugins/plugins.go b/api/pkg/plugins/plugins.go index 12b974c9..26b8de71 100644 --- a/api/pkg/plugins/plugins.go +++ b/api/pkg/plugins/plugins.go @@ -37,7 +37,7 @@ var ( type FilterConfigParser interface { Parse(input interface{}) (interface{}, error) Merge(parentConfig interface{}, childConfig interface{}) interface{} - IsNonBlocking() bool + NonBlockingPhases() api.Phase } type FilterFactoryAndParser struct { @@ -198,8 +198,8 @@ func (p *PluginMethodDefaultImpl) Merge(parent interface{}, child interface{}) i return child } -func (p *PluginMethodDefaultImpl) IsNonBlocking() bool { - return false +func (p *PluginMethodDefaultImpl) NonBlockingPhases() api.Phase { + return 0 } func ComparePluginOrder(a, b string) bool { diff --git a/api/pkg/plugins/type.go b/api/pkg/plugins/type.go index baa074fc..1c258586 100644 --- a/api/pkg/plugins/type.go +++ b/api/pkg/plugins/type.go @@ -142,7 +142,7 @@ type Plugin interface { Type() PluginType Order() PluginOrder Merge(parent interface{}, child interface{}) interface{} - IsNonBlocking() bool + NonBlockingPhases() api.Phase } type Initer interface { diff --git a/types/plugins/demo/config.go b/types/plugins/demo/config.go index 01963758..4d179172 100644 --- a/types/plugins/demo/config.go +++ b/types/plugins/demo/config.go @@ -51,7 +51,7 @@ func (p *Plugin) Order() plugins.PluginOrder { } } -// IsNonBlocking returns whether the plugin is non-blocking, default to false. +// NonBlockingPhases returns the phases of the plugin which can be run non-blockingly, default to 0. // If the plugin's filter doesn't contain any blocking operation, it should return true. // A blocking operation can be: // 1. I/O operation @@ -62,8 +62,10 @@ func (p *Plugin) Order() plugins.PluginOrder { // // If a phase only contains non-blocking plugins, it will be executed synchorously, which is // more effective. -func (p *Plugin) IsNonBlocking() bool { - return true +// +// Phase OnLog is always be executed synchorously so we don't need to specify it here. +func (p *Plugin) NonBlockingPhases() api.Phase { + return api.PhaseDecodeHeaders | api.PhaseEncodeHeaders } // Config returns api.PluginConfig's implementation used during configuration processing