From cea43061a868da459f836081149199ba05fd68a0 Mon Sep 17 00:00:00 2001 From: rambohe Date: Tue, 7 Jan 2025 01:14:17 +1100 Subject: [PATCH] feat: improve filter chain and FilterFinder interface (#2242) Signed-off-by: rambohe-ch --- cmd/yurthub/app/config/config.go | 7 +- pkg/yurthub/filter/interfaces.go | 2 +- pkg/yurthub/filter/manager/manager.go | 17 ++- pkg/yurthub/filter/manager/manager_test.go | 137 +++++++++++++++++- .../filterchain.go => objectfilter/chain.go} | 16 +- pkg/yurthub/filter/responsefilter/filter.go | 37 +---- .../proxy/multiplexer/multiplexerlist.go | 8 +- .../proxy/multiplexer/multiplexerproxy.go | 6 +- .../proxy/multiplexer/multiplexerwatch.go | 9 +- .../multiplexer/testing/fake_filtermanager.go | 8 +- pkg/yurthub/proxy/pool/pool.go | 6 +- pkg/yurthub/proxy/proxy.go | 6 +- pkg/yurthub/proxy/remote/loadbalancer.go | 12 +- 13 files changed, 195 insertions(+), 76 deletions(-) rename pkg/yurthub/filter/{filterchain/filterchain.go => objectfilter/chain.go} (73%) diff --git a/cmd/yurthub/app/config/config.go b/cmd/yurthub/app/config/config.go index 8c605bfc63d..54373cbb096 100644 --- a/cmd/yurthub/app/config/config.go +++ b/cmd/yurthub/app/config/config.go @@ -51,6 +51,7 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" "github.com/openyurtio/openyurt/pkg/yurthub/certificate" certificatemgr "github.com/openyurtio/openyurt/pkg/yurthub/certificate/manager" + "github.com/openyurtio/openyurt/pkg/yurthub/filter" "github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer" "github.com/openyurtio/openyurt/pkg/yurthub/filter/manager" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" @@ -95,7 +96,7 @@ type YurtHubConfiguration struct { NodePoolInformerFactory dynamicinformer.DynamicSharedInformerFactory WorkingMode util.WorkingMode KubeletHealthGracePeriod time.Duration - FilterManager *manager.Manager + FilterFinder filter.FilterFinder CoordinatorServer *url.URL MinRequestTimeout time.Duration TenantNs string @@ -157,7 +158,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) { } tenantNs := util.ParseTenantNsFromOrgs(options.YurtHubCertOrganizations) registerInformers(options, sharedFactory, workingMode, tenantNs) - filterManager, err := manager.NewFilterManager(options, sharedFactory, dynamicSharedFactory, proxiedClient, serializerManager) + filterFinder, err := manager.NewFilterManager(options, sharedFactory, dynamicSharedFactory, proxiedClient, serializerManager) if err != nil { klog.Errorf("could not create filter manager, %v", err) return nil, err @@ -181,7 +182,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) { SharedFactory: sharedFactory, NodePoolInformerFactory: dynamicSharedFactory, KubeletHealthGracePeriod: options.KubeletHealthGracePeriod, - FilterManager: filterManager, + FilterFinder: filterFinder, MinRequestTimeout: options.MinRequestTimeout, TenantNs: tenantNs, YurtHubProxyServerAddr: fmt.Sprintf("%s:%d", options.YurtHubProxyHost, options.YurtHubProxyPort), diff --git a/pkg/yurthub/filter/interfaces.go b/pkg/yurthub/filter/interfaces.go index 9770e887398..6da552a6281 100644 --- a/pkg/yurthub/filter/interfaces.go +++ b/pkg/yurthub/filter/interfaces.go @@ -61,7 +61,7 @@ type ObjectFilter interface { type FilterFinder interface { FindResponseFilter(req *http.Request) (ResponseFilter, bool) - FindObjectFilters(req *http.Request) ObjectFilter + FindObjectFilter(req *http.Request) (ObjectFilter, bool) } type NodeGetter func(name string) (*v1.Node, error) diff --git a/pkg/yurthub/filter/manager/manager.go b/pkg/yurthub/filter/manager/manager.go index 10ab21eb2ea..1469d7346df 100644 --- a/pkg/yurthub/filter/manager/manager.go +++ b/pkg/yurthub/filter/manager/manager.go @@ -29,8 +29,8 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/filter" "github.com/openyurtio/openyurt/pkg/yurthub/filter/approver" "github.com/openyurtio/openyurt/pkg/yurthub/filter/base" - "github.com/openyurtio/openyurt/pkg/yurthub/filter/filterchain" "github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer" + "github.com/openyurtio/openyurt/pkg/yurthub/filter/objectfilter" "github.com/openyurtio/openyurt/pkg/yurthub/filter/responsefilter" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" "github.com/openyurtio/openyurt/pkg/yurthub/util" @@ -46,7 +46,7 @@ func NewFilterManager(options *yurtoptions.YurtHubOptions, sharedFactory informers.SharedInformerFactory, dynamicSharedFactory dynamicinformer.DynamicSharedInformerFactory, proxiedClient kubernetes.Interface, - serializerManager *serializer.SerializerManager) (*Manager, error) { + serializerManager *serializer.SerializerManager) (filter.FilterFinder, error) { if !options.EnableResourceFilter { return nil, nil } @@ -113,19 +113,22 @@ func (m *Manager) FindResponseFilter(req *http.Request) (filter.ResponseFilter, return nil, false } -func (m *Manager) FindObjectFilters(req *http.Request) filter.ObjectFilter { - objectFilters := make([]filter.ObjectFilter, 0) +func (m *Manager) FindObjectFilter(req *http.Request) (filter.ObjectFilter, bool) { approved, filterNames := m.Approver.Approve(req) if !approved { - return nil + return nil, false } + objectFilters := make([]filter.ObjectFilter, 0) for i := range filterNames { if objectFilter, ok := m.nameToObjectFilter[filterNames[i]]; ok { objectFilters = append(objectFilters, objectFilter) } } - filters := filterchain.FilterChain(objectFilters) - return filters + if len(objectFilters) == 0 { + return nil, false + } + + return objectfilter.CreateFilterChain(objectFilters), true } diff --git a/pkg/yurthub/filter/manager/manager_test.go b/pkg/yurthub/filter/manager/manager_test.go index af97ea60ae2..f67d7e73117 100644 --- a/pkg/yurthub/filter/manager/manager_test.go +++ b/pkg/yurthub/filter/manager/manager_test.go @@ -131,8 +131,8 @@ func TestFindResponseFilter(t *testing.T) { stopper := make(chan struct{}) defer close(stopper) - mgr, _ := NewFilterManager(options, sharedFactory, nodePoolFactory, fakeClient, serializerManager) - if tt.mgrIsNil && mgr == nil { + finder, _ := NewFilterManager(options, sharedFactory, nodePoolFactory, fakeClient, serializerManager) + if tt.mgrIsNil && finder == nil { return } @@ -152,7 +152,7 @@ func TestFindResponseFilter(t *testing.T) { var isFound bool var responseFilter filter.ResponseFilter var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - responseFilter, isFound = mgr.FindResponseFilter(req) + responseFilter, isFound = finder.FindResponseFilter(req) }) handler = util.WithRequestClientComponent(handler, util2.WorkingModeEdge) @@ -171,6 +171,137 @@ func TestFindResponseFilter(t *testing.T) { } } +func TestFindObjectFilter(t *testing.T) { + fakeClient := &fake.Clientset{} + scheme := runtime.NewScheme() + apis.AddToScheme(scheme) + fakeDynamicClient := dynamicfake.NewSimpleDynamicClient(scheme) + serializerManager := serializer.NewSerializerManager() + + testcases := map[string]struct { + enableResourceFilter bool + workingMode string + disabledResourceFilters []string + enableDummyIf bool + userAgent string + verb string + path string + mgrIsNil bool + isFound bool + names sets.Set[string] + }{ + "disable resource filter": { + enableResourceFilter: false, + mgrIsNil: true, + }, + "get master service filter": { + enableResourceFilter: true, + enableDummyIf: true, + userAgent: "kubelet", + verb: "GET", + path: "/api/v1/services", + isFound: true, + names: sets.New("masterservice"), + }, + "get discard cloud service and node port isolation filter": { + enableResourceFilter: true, + enableDummyIf: true, + userAgent: "kube-proxy", + verb: "GET", + path: "/api/v1/services", + isFound: true, + names: sets.New("discardcloudservice", "nodeportisolation"), + }, + "get service topology filter": { + enableResourceFilter: true, + enableDummyIf: false, + userAgent: "kube-proxy", + verb: "GET", + path: "/api/v1/endpoints", + isFound: true, + names: sets.New("servicetopology"), + }, + "disable service topology filter": { + enableResourceFilter: true, + disabledResourceFilters: []string{"servicetopology"}, + enableDummyIf: true, + userAgent: "kube-proxy", + verb: "GET", + path: "/api/v1/endpoints", + isFound: false, + }, + "can't get discard cloud service filter in cloud mode": { + enableResourceFilter: true, + workingMode: "cloud", + userAgent: "kube-proxy", + verb: "GET", + path: "/api/v1/services", + isFound: true, + names: sets.New("nodeportisolation"), + }, + } + + resolver := newTestRequestInfoResolver() + for k, tt := range testcases { + t.Run(k, func(t *testing.T) { + options := &options.YurtHubOptions{ + EnableResourceFilter: tt.enableResourceFilter, + WorkingMode: tt.workingMode, + DisabledResourceFilters: make([]string, 0), + EnableDummyIf: tt.enableDummyIf, + NodeName: "test", + YurtHubProxySecurePort: 10268, + HubAgentDummyIfIP: "127.0.0.1", + YurtHubProxyHost: "127.0.0.1", + } + options.DisabledResourceFilters = append(options.DisabledResourceFilters, tt.disabledResourceFilters...) + + sharedFactory, nodePoolFactory := informers.NewSharedInformerFactory(fakeClient, 24*time.Hour), + dynamicinformer.NewDynamicSharedInformerFactory(fakeDynamicClient, 24*time.Hour) + + stopper := make(chan struct{}) + defer close(stopper) + + finder, _ := NewFilterManager(options, sharedFactory, nodePoolFactory, fakeClient, serializerManager) + if tt.mgrIsNil && finder == nil { + return + } + + sharedFactory.Start(stopper) + nodePoolFactory.Start(stopper) + + req, err := http.NewRequest(tt.verb, tt.path, nil) + if err != nil { + t.Errorf("failed to create request, %v", err) + } + req.RemoteAddr = "127.0.0.1" + + if len(tt.userAgent) != 0 { + req.Header.Set("User-Agent", tt.userAgent) + } + + var isFound bool + var objectFilter filter.ObjectFilter + var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + objectFilter, isFound = finder.FindObjectFilter(req) + }) + + handler = util.WithRequestClientComponent(handler, util2.WorkingModeEdge) + handler = filters.WithRequestInfo(handler, resolver) + handler.ServeHTTP(httptest.NewRecorder(), req) + + if !tt.isFound && isFound == tt.isFound { + return + } + + names := strings.Split(objectFilter.Name(), ",") + if !tt.names.Equal(sets.New(names...)) { + t.Errorf("expect filter names %v, but got %v", sets.List(tt.names), names) + } + }) + } +} + func newTestRequestInfoResolver() *request.RequestInfoFactory { return &request.RequestInfoFactory{ APIPrefixes: sets.NewString("api", "apis"), diff --git a/pkg/yurthub/filter/filterchain/filterchain.go b/pkg/yurthub/filter/objectfilter/chain.go similarity index 73% rename from pkg/yurthub/filter/filterchain/filterchain.go rename to pkg/yurthub/filter/objectfilter/chain.go index 2c3270af7f9..4f070a45517 100644 --- a/pkg/yurthub/filter/filterchain/filterchain.go +++ b/pkg/yurthub/filter/objectfilter/chain.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package filterchain +package objectfilter import ( "strings" @@ -26,9 +26,15 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/filter" ) -type FilterChain []filter.ObjectFilter +type filterChain []filter.ObjectFilter -func (chain FilterChain) Name() string { +func CreateFilterChain(objFilters []filter.ObjectFilter) filter.ObjectFilter { + chain := make(filterChain, 0) + chain = append(chain, objFilters...) + return chain +} + +func (chain filterChain) Name() string { var names []string for i := range chain { names = append(names, chain[i].Name()) @@ -36,12 +42,12 @@ func (chain FilterChain) Name() string { return strings.Join(names, ",") } -func (chain FilterChain) SupportedResourceAndVerbs() map[string]sets.Set[string] { +func (chain filterChain) SupportedResourceAndVerbs() map[string]sets.Set[string] { // do nothing return map[string]sets.Set[string]{} } -func (chain FilterChain) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object { +func (chain filterChain) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object { for i := range chain { obj = chain[i].Filter(obj, stopCh) if yurtutil.IsNil(obj) { diff --git a/pkg/yurthub/filter/responsefilter/filter.go b/pkg/yurthub/filter/responsefilter/filter.go index eb6df24994c..87c24401c10 100644 --- a/pkg/yurthub/filter/responsefilter/filter.go +++ b/pkg/yurthub/filter/responsefilter/filter.go @@ -22,17 +22,16 @@ import ( "errors" "io" "net/http" - "strings" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/watch" apirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/klog/v2" yurtutil "github.com/openyurtio/openyurt/pkg/util" "github.com/openyurtio/openyurt/pkg/yurthub/filter" + "github.com/openyurtio/openyurt/pkg/yurthub/filter/objectfilter" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" "github.com/openyurtio/openyurt/pkg/yurthub/util" ) @@ -214,38 +213,6 @@ func createSerializer(respContentType string, info *apirequest.RequestInfo, sm * return sm.CreateSerializer(respContentType, info.APIGroup, info.APIVersion, info.Resource) } -type filterChain []filter.ObjectFilter - -func createFilterChain(objFilters []filter.ObjectFilter) filter.ObjectFilter { - chain := make(filterChain, 0) - chain = append(chain, objFilters...) - return chain -} - -func (chain filterChain) Name() string { - var names []string - for i := range chain { - names = append(names, chain[i].Name()) - } - return strings.Join(names, ",") -} - -func (chain filterChain) SupportedResourceAndVerbs() map[string]sets.Set[string] { - // do nothing - return map[string]sets.Set[string]{} -} - -func (chain filterChain) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object { - for i := range chain { - obj = chain[i].Filter(obj, stopCh) - if yurtutil.IsNil(obj) { - break - } - } - - return obj -} - type responseFilter struct { objectFilter filter.ObjectFilter serializerManager *serializer.SerializerManager @@ -253,7 +220,7 @@ type responseFilter struct { func CreateResponseFilter(objectFilters []filter.ObjectFilter, serializerManager *serializer.SerializerManager) filter.ResponseFilter { return &responseFilter{ - objectFilter: createFilterChain(objectFilters), + objectFilter: objectfilter.CreateFilterChain(objectFilters), serializerManager: serializerManager, } } diff --git a/pkg/yurthub/proxy/multiplexer/multiplexerlist.go b/pkg/yurthub/proxy/multiplexer/multiplexerlist.go index acdaaf889e8..cab7a38d16a 100644 --- a/pkg/yurthub/proxy/multiplexer/multiplexerlist.go +++ b/pkg/yurthub/proxy/multiplexer/multiplexerlist.go @@ -82,8 +82,12 @@ func (sp *multiplexerProxy) listObject(r *http.Request, gvr *schema.GroupVersion return nil, errors.Wrapf(err, "failed to get list from cache") } - if obj, err = sp.filterListObject(obj, sp.filterMgr.FindObjectFilters(r)); err != nil { - return nil, errors.Wrapf(err, "failed to filter list object") + if !yurtutil.IsNil(sp.filterFinder) { + if objectFilter, exists := sp.filterFinder.FindObjectFilter(r); exists { + if obj, err = sp.filterListObject(obj, objectFilter); err != nil { + return nil, errors.Wrapf(err, "failed to filter list object") + } + } } return obj, nil diff --git a/pkg/yurthub/proxy/multiplexer/multiplexerproxy.go b/pkg/yurthub/proxy/multiplexer/multiplexerproxy.go index 95061ae70b1..fa20768a24d 100644 --- a/pkg/yurthub/proxy/multiplexer/multiplexerproxy.go +++ b/pkg/yurthub/proxy/multiplexer/multiplexerproxy.go @@ -46,11 +46,11 @@ import ( type multiplexerProxy struct { requestsMultiplexerManager multiplexer.MultiplexerManager - filterMgr filter.FilterFinder + filterFinder filter.FilterFinder stop <-chan struct{} } -func NewMultiplexerProxy(filterMgr filter.FilterFinder, +func NewMultiplexerProxy(filterFinder filter.FilterFinder, cacheManager multiplexer.MultiplexerManager, multiplexerResources []schema.GroupVersionResource, stop <-chan struct{}) (*multiplexerProxy, error) { @@ -58,7 +58,7 @@ func NewMultiplexerProxy(filterMgr filter.FilterFinder, sp := &multiplexerProxy{ stop: stop, requestsMultiplexerManager: cacheManager, - filterMgr: filterMgr, + filterFinder: filterFinder, } for _, gvr := range multiplexerResources { diff --git a/pkg/yurthub/proxy/multiplexer/multiplexerwatch.go b/pkg/yurthub/proxy/multiplexer/multiplexerwatch.go index f90ba22b444..3263cb23fb6 100644 --- a/pkg/yurthub/proxy/multiplexer/multiplexerwatch.go +++ b/pkg/yurthub/proxy/multiplexer/multiplexerwatch.go @@ -38,6 +38,7 @@ import ( "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/klog/v2" + yurtutil "github.com/openyurtio/openyurt/pkg/util" "github.com/openyurtio/openyurt/pkg/yurthub/util" ) @@ -109,7 +110,13 @@ func (sp *multiplexerProxy) multiplexerWatch(w http.ResponseWriter, r *http.Requ } klog.V(3).InfoS("Starting watch", "path", r.URL.Path, "resourceVersion", listOpts.ResourceVersion, "labels", listOpts.LabelSelector, "fields", listOpts.FieldSelector, "timeout", timeout) - serveWatch(newFilterWatch(watcher, sp.filterMgr.FindObjectFilters(r)), reqScope, outputMediaType, r, w, timeout) + if !yurtutil.IsNil(sp.filterFinder) { + if objectFilter, exists := sp.filterFinder.FindObjectFilter(r); exists { + serveWatch(newFilterWatch(watcher, objectFilter), reqScope, outputMediaType, r, w, timeout) + return + } + } + serveWatch(watcher, reqScope, outputMediaType, r, w, timeout) } func getTimeout(opts *metainternalversion.ListOptions) time.Duration { diff --git a/pkg/yurthub/proxy/multiplexer/testing/fake_filtermanager.go b/pkg/yurthub/proxy/multiplexer/testing/fake_filtermanager.go index 26f7489ccc5..a7cf6cb0ba4 100644 --- a/pkg/yurthub/proxy/multiplexer/testing/fake_filtermanager.go +++ b/pkg/yurthub/proxy/multiplexer/testing/fake_filtermanager.go @@ -29,8 +29,8 @@ func (fm *EmptyFilterManager) FindResponseFilter(req *http.Request) (filter.Resp return nil, false } -func (fm *EmptyFilterManager) FindObjectFilters(req *http.Request) filter.ObjectFilter { - return nil +func (fm *EmptyFilterManager) FindObjectFilter(req *http.Request) (filter.ObjectFilter, bool) { + return nil, false } type FakeEndpointSliceFilter struct { @@ -41,8 +41,8 @@ func (fm *FakeEndpointSliceFilter) FindResponseFilter(req *http.Request) (filter return nil, false } -func (fm *FakeEndpointSliceFilter) FindObjectFilters(req *http.Request) filter.ObjectFilter { +func (fm *FakeEndpointSliceFilter) FindObjectFilter(req *http.Request) (filter.ObjectFilter, bool) { return &IgnoreEndpointslicesWithNodeName{ fm.NodeName, - } + }, true } diff --git a/pkg/yurthub/proxy/pool/pool.go b/pkg/yurthub/proxy/pool/pool.go index d8dffbdf443..652935445b1 100644 --- a/pkg/yurthub/proxy/pool/pool.go +++ b/pkg/yurthub/proxy/pool/pool.go @@ -30,7 +30,7 @@ import ( yurtutil "github.com/openyurtio/openyurt/pkg/util" "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" - "github.com/openyurtio/openyurt/pkg/yurthub/filter/manager" + "github.com/openyurtio/openyurt/pkg/yurthub/filter" "github.com/openyurtio/openyurt/pkg/yurthub/proxy/util" "github.com/openyurtio/openyurt/pkg/yurthub/transport" hubutil "github.com/openyurtio/openyurt/pkg/yurthub/util" @@ -44,7 +44,7 @@ const ( type YurtCoordinatorProxy struct { yurtCoordinatorProxy *util.RemoteProxy localCacheMgr cachemanager.CacheManager - filterMgr *manager.Manager + filterMgr filter.FilterFinder isCoordinatorReady func() bool stopCh <-chan struct{} } @@ -53,7 +53,7 @@ func NewYurtCoordinatorProxy( localCacheMgr cachemanager.CacheManager, transportMgrGetter func() transport.Interface, coordinatorServerURLGetter func() *url.URL, - filterMgr *manager.Manager, + filterMgr filter.FilterFinder, isCoordinatorReady func() bool, stopCh <-chan struct{}) (*YurtCoordinatorProxy, error) { diff --git a/pkg/yurthub/proxy/proxy.go b/pkg/yurthub/proxy/proxy.go index 2fae2998b63..249e3e55113 100644 --- a/pkg/yurthub/proxy/proxy.go +++ b/pkg/yurthub/proxy/proxy.go @@ -98,7 +98,7 @@ func NewYurtReverseProxyHandler( transportMgr, coordinatorGetter, cloudHealthChecker, - yurtHubCfg.FilterManager, + yurtHubCfg.FilterFinder, yurtHubCfg.WorkingMode, stopCh) if err != nil { @@ -143,7 +143,7 @@ func NewYurtReverseProxyHandler( localCacheMgr, coordinatorTransportMgrGetter, coordinatorServerURLGetter, - yurtHubCfg.FilterManager, + yurtHubCfg.FilterFinder, isCoordinatorReady, stopCh) if err != nil { @@ -173,7 +173,7 @@ func NewYurtReverseProxyHandler( yurtHubCfg.PostStartHooks = make(map[string]func() error) } yurtHubCfg.PostStartHooks[multiplexerProxyPostHookName] = func() error { - if yurtProxy.multiplexerProxy, err = multiplexer.NewMultiplexerProxy(yurtHubCfg.FilterManager, + if yurtProxy.multiplexerProxy, err = multiplexer.NewMultiplexerProxy(yurtHubCfg.FilterFinder, yurtHubCfg.RequestMultiplexerManager, yurtHubCfg.MultiplexerResources, stopCh); err != nil { diff --git a/pkg/yurthub/proxy/remote/loadbalancer.go b/pkg/yurthub/proxy/remote/loadbalancer.go index 1ed7dc0da02..97dda15a8fa 100644 --- a/pkg/yurthub/proxy/remote/loadbalancer.go +++ b/pkg/yurthub/proxy/remote/loadbalancer.go @@ -32,7 +32,7 @@ import ( yurtutil "github.com/openyurtio/openyurt/pkg/util" "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" - "github.com/openyurtio/openyurt/pkg/yurthub/filter/manager" + "github.com/openyurtio/openyurt/pkg/yurthub/filter" "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" "github.com/openyurtio/openyurt/pkg/yurthub/proxy/util" "github.com/openyurtio/openyurt/pkg/yurthub/transport" @@ -133,7 +133,7 @@ type loadBalancer struct { backends []*util.RemoteProxy algo loadBalancerAlgo localCacheMgr cachemanager.CacheManager - filterManager *manager.Manager + filterFinder filter.FilterFinder coordinatorGetter func() yurtcoordinator.Coordinator workingMode hubutil.WorkingMode stopCh <-chan struct{} @@ -147,12 +147,12 @@ func NewLoadBalancer( transportMgr transport.Interface, coordinatorGetter func() yurtcoordinator.Coordinator, healthChecker healthchecker.MultipleBackendsHealthChecker, - filterManager *manager.Manager, + filterFinder filter.FilterFinder, workingMode hubutil.WorkingMode, stopCh <-chan struct{}) (LoadBalancer, error) { lb := &loadBalancer{ localCacheMgr: localCacheMgr, - filterManager: filterManager, + filterFinder: filterFinder, coordinatorGetter: coordinatorGetter, workingMode: workingMode, stopCh: stopCh, @@ -291,8 +291,8 @@ func (lb *loadBalancer) modifyResponse(resp *http.Response) error { req = req.WithContext(ctx) // filter response data - if lb.filterManager != nil { - if responseFilter, ok := lb.filterManager.FindResponseFilter(req); ok { + if !yurtutil.IsNil(lb.filterFinder) { + if responseFilter, ok := lb.filterFinder.FindResponseFilter(req); ok { wrapBody, needUncompressed := hubutil.NewGZipReaderCloser(resp.Header, resp.Body, req, "filter") size, filterRc, err := responseFilter.Filter(req, wrapBody, lb.stopCh) if err != nil {