Skip to content

Commit

Permalink
feat: improve filter chain and FilterFinder interface (#2242)
Browse files Browse the repository at this point in the history
Signed-off-by: rambohe-ch <[email protected]>
  • Loading branch information
rambohe-ch authored Jan 6, 2025
1 parent 290d189 commit cea4306
Show file tree
Hide file tree
Showing 13 changed files with 195 additions and 76 deletions.
7 changes: 4 additions & 3 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
"github.com/openyurtio/openyurt/pkg/yurthub/certificate"
certificatemgr "github.com/openyurtio/openyurt/pkg/yurthub/certificate/manager"
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/manager"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta"
Expand Down Expand Up @@ -95,7 +96,7 @@ type YurtHubConfiguration struct {
NodePoolInformerFactory dynamicinformer.DynamicSharedInformerFactory
WorkingMode util.WorkingMode
KubeletHealthGracePeriod time.Duration
FilterManager *manager.Manager
FilterFinder filter.FilterFinder
CoordinatorServer *url.URL
MinRequestTimeout time.Duration
TenantNs string
Expand Down Expand Up @@ -157,7 +158,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
}
tenantNs := util.ParseTenantNsFromOrgs(options.YurtHubCertOrganizations)
registerInformers(options, sharedFactory, workingMode, tenantNs)
filterManager, err := manager.NewFilterManager(options, sharedFactory, dynamicSharedFactory, proxiedClient, serializerManager)
filterFinder, err := manager.NewFilterManager(options, sharedFactory, dynamicSharedFactory, proxiedClient, serializerManager)
if err != nil {
klog.Errorf("could not create filter manager, %v", err)
return nil, err
Expand All @@ -181,7 +182,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
SharedFactory: sharedFactory,
NodePoolInformerFactory: dynamicSharedFactory,
KubeletHealthGracePeriod: options.KubeletHealthGracePeriod,
FilterManager: filterManager,
FilterFinder: filterFinder,
MinRequestTimeout: options.MinRequestTimeout,
TenantNs: tenantNs,
YurtHubProxyServerAddr: fmt.Sprintf("%s:%d", options.YurtHubProxyHost, options.YurtHubProxyPort),
Expand Down
2 changes: 1 addition & 1 deletion pkg/yurthub/filter/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type ObjectFilter interface {

type FilterFinder interface {
FindResponseFilter(req *http.Request) (ResponseFilter, bool)
FindObjectFilters(req *http.Request) ObjectFilter
FindObjectFilter(req *http.Request) (ObjectFilter, bool)
}

type NodeGetter func(name string) (*v1.Node, error)
17 changes: 10 additions & 7 deletions pkg/yurthub/filter/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/approver"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/base"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/filterchain"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/objectfilter"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/responsefilter"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
Expand All @@ -46,7 +46,7 @@ func NewFilterManager(options *yurtoptions.YurtHubOptions,
sharedFactory informers.SharedInformerFactory,
dynamicSharedFactory dynamicinformer.DynamicSharedInformerFactory,
proxiedClient kubernetes.Interface,
serializerManager *serializer.SerializerManager) (*Manager, error) {
serializerManager *serializer.SerializerManager) (filter.FilterFinder, error) {
if !options.EnableResourceFilter {
return nil, nil
}
Expand Down Expand Up @@ -113,19 +113,22 @@ func (m *Manager) FindResponseFilter(req *http.Request) (filter.ResponseFilter,
return nil, false
}

func (m *Manager) FindObjectFilters(req *http.Request) filter.ObjectFilter {
objectFilters := make([]filter.ObjectFilter, 0)
func (m *Manager) FindObjectFilter(req *http.Request) (filter.ObjectFilter, bool) {
approved, filterNames := m.Approver.Approve(req)
if !approved {
return nil
return nil, false
}

objectFilters := make([]filter.ObjectFilter, 0)
for i := range filterNames {
if objectFilter, ok := m.nameToObjectFilter[filterNames[i]]; ok {
objectFilters = append(objectFilters, objectFilter)
}
}

filters := filterchain.FilterChain(objectFilters)
return filters
if len(objectFilters) == 0 {
return nil, false
}

return objectfilter.CreateFilterChain(objectFilters), true
}
137 changes: 134 additions & 3 deletions pkg/yurthub/filter/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ func TestFindResponseFilter(t *testing.T) {
stopper := make(chan struct{})
defer close(stopper)

mgr, _ := NewFilterManager(options, sharedFactory, nodePoolFactory, fakeClient, serializerManager)
if tt.mgrIsNil && mgr == nil {
finder, _ := NewFilterManager(options, sharedFactory, nodePoolFactory, fakeClient, serializerManager)
if tt.mgrIsNil && finder == nil {
return
}

Expand All @@ -152,7 +152,7 @@ func TestFindResponseFilter(t *testing.T) {
var isFound bool
var responseFilter filter.ResponseFilter
var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
responseFilter, isFound = mgr.FindResponseFilter(req)
responseFilter, isFound = finder.FindResponseFilter(req)
})

handler = util.WithRequestClientComponent(handler, util2.WorkingModeEdge)
Expand All @@ -171,6 +171,137 @@ func TestFindResponseFilter(t *testing.T) {
}
}

func TestFindObjectFilter(t *testing.T) {
fakeClient := &fake.Clientset{}
scheme := runtime.NewScheme()
apis.AddToScheme(scheme)
fakeDynamicClient := dynamicfake.NewSimpleDynamicClient(scheme)
serializerManager := serializer.NewSerializerManager()

testcases := map[string]struct {
enableResourceFilter bool
workingMode string
disabledResourceFilters []string
enableDummyIf bool
userAgent string
verb string
path string
mgrIsNil bool
isFound bool
names sets.Set[string]
}{
"disable resource filter": {
enableResourceFilter: false,
mgrIsNil: true,
},
"get master service filter": {
enableResourceFilter: true,
enableDummyIf: true,
userAgent: "kubelet",
verb: "GET",
path: "/api/v1/services",
isFound: true,
names: sets.New("masterservice"),
},
"get discard cloud service and node port isolation filter": {
enableResourceFilter: true,
enableDummyIf: true,
userAgent: "kube-proxy",
verb: "GET",
path: "/api/v1/services",
isFound: true,
names: sets.New("discardcloudservice", "nodeportisolation"),
},
"get service topology filter": {
enableResourceFilter: true,
enableDummyIf: false,
userAgent: "kube-proxy",
verb: "GET",
path: "/api/v1/endpoints",
isFound: true,
names: sets.New("servicetopology"),
},
"disable service topology filter": {
enableResourceFilter: true,
disabledResourceFilters: []string{"servicetopology"},
enableDummyIf: true,
userAgent: "kube-proxy",
verb: "GET",
path: "/api/v1/endpoints",
isFound: false,
},
"can't get discard cloud service filter in cloud mode": {
enableResourceFilter: true,
workingMode: "cloud",
userAgent: "kube-proxy",
verb: "GET",
path: "/api/v1/services",
isFound: true,
names: sets.New("nodeportisolation"),
},
}

resolver := newTestRequestInfoResolver()
for k, tt := range testcases {
t.Run(k, func(t *testing.T) {
options := &options.YurtHubOptions{
EnableResourceFilter: tt.enableResourceFilter,
WorkingMode: tt.workingMode,
DisabledResourceFilters: make([]string, 0),
EnableDummyIf: tt.enableDummyIf,
NodeName: "test",
YurtHubProxySecurePort: 10268,
HubAgentDummyIfIP: "127.0.0.1",
YurtHubProxyHost: "127.0.0.1",
}
options.DisabledResourceFilters = append(options.DisabledResourceFilters, tt.disabledResourceFilters...)

sharedFactory, nodePoolFactory := informers.NewSharedInformerFactory(fakeClient, 24*time.Hour),
dynamicinformer.NewDynamicSharedInformerFactory(fakeDynamicClient, 24*time.Hour)

stopper := make(chan struct{})
defer close(stopper)

finder, _ := NewFilterManager(options, sharedFactory, nodePoolFactory, fakeClient, serializerManager)
if tt.mgrIsNil && finder == nil {
return
}

sharedFactory.Start(stopper)
nodePoolFactory.Start(stopper)

req, err := http.NewRequest(tt.verb, tt.path, nil)
if err != nil {
t.Errorf("failed to create request, %v", err)
}
req.RemoteAddr = "127.0.0.1"

if len(tt.userAgent) != 0 {
req.Header.Set("User-Agent", tt.userAgent)
}

var isFound bool
var objectFilter filter.ObjectFilter
var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
objectFilter, isFound = finder.FindObjectFilter(req)
})

handler = util.WithRequestClientComponent(handler, util2.WorkingModeEdge)
handler = filters.WithRequestInfo(handler, resolver)
handler.ServeHTTP(httptest.NewRecorder(), req)

if !tt.isFound && isFound == tt.isFound {
return
}

names := strings.Split(objectFilter.Name(), ",")
if !tt.names.Equal(sets.New(names...)) {
t.Errorf("expect filter names %v, but got %v", sets.List(tt.names), names)
}
})
}
}

func newTestRequestInfoResolver() *request.RequestInfoFactory {
return &request.RequestInfoFactory{
APIPrefixes: sets.NewString("api", "apis"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package filterchain
package objectfilter

import (
"strings"
Expand All @@ -26,22 +26,28 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
)

type FilterChain []filter.ObjectFilter
type filterChain []filter.ObjectFilter

func (chain FilterChain) Name() string {
func CreateFilterChain(objFilters []filter.ObjectFilter) filter.ObjectFilter {
chain := make(filterChain, 0)
chain = append(chain, objFilters...)
return chain
}

func (chain filterChain) Name() string {
var names []string
for i := range chain {
names = append(names, chain[i].Name())
}
return strings.Join(names, ",")
}

func (chain FilterChain) SupportedResourceAndVerbs() map[string]sets.Set[string] {
func (chain filterChain) SupportedResourceAndVerbs() map[string]sets.Set[string] {
// do nothing
return map[string]sets.Set[string]{}
}

func (chain FilterChain) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object {
func (chain filterChain) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object {
for i := range chain {
obj = chain[i].Filter(obj, stopCh)
if yurtutil.IsNil(obj) {
Expand Down
37 changes: 2 additions & 35 deletions pkg/yurthub/filter/responsefilter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,16 @@ import (
"errors"
"io"
"net/http"
"strings"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/klog/v2"

yurtutil "github.com/openyurtio/openyurt/pkg/util"
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/objectfilter"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
)
Expand Down Expand Up @@ -214,46 +213,14 @@ func createSerializer(respContentType string, info *apirequest.RequestInfo, sm *
return sm.CreateSerializer(respContentType, info.APIGroup, info.APIVersion, info.Resource)
}

type filterChain []filter.ObjectFilter

func createFilterChain(objFilters []filter.ObjectFilter) filter.ObjectFilter {
chain := make(filterChain, 0)
chain = append(chain, objFilters...)
return chain
}

func (chain filterChain) Name() string {
var names []string
for i := range chain {
names = append(names, chain[i].Name())
}
return strings.Join(names, ",")
}

func (chain filterChain) SupportedResourceAndVerbs() map[string]sets.Set[string] {
// do nothing
return map[string]sets.Set[string]{}
}

func (chain filterChain) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object {
for i := range chain {
obj = chain[i].Filter(obj, stopCh)
if yurtutil.IsNil(obj) {
break
}
}

return obj
}

type responseFilter struct {
objectFilter filter.ObjectFilter
serializerManager *serializer.SerializerManager
}

func CreateResponseFilter(objectFilters []filter.ObjectFilter, serializerManager *serializer.SerializerManager) filter.ResponseFilter {
return &responseFilter{
objectFilter: createFilterChain(objectFilters),
objectFilter: objectfilter.CreateFilterChain(objectFilters),
serializerManager: serializerManager,
}
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/yurthub/proxy/multiplexer/multiplexerlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,12 @@ func (sp *multiplexerProxy) listObject(r *http.Request, gvr *schema.GroupVersion
return nil, errors.Wrapf(err, "failed to get list from cache")
}

if obj, err = sp.filterListObject(obj, sp.filterMgr.FindObjectFilters(r)); err != nil {
return nil, errors.Wrapf(err, "failed to filter list object")
if !yurtutil.IsNil(sp.filterFinder) {
if objectFilter, exists := sp.filterFinder.FindObjectFilter(r); exists {
if obj, err = sp.filterListObject(obj, objectFilter); err != nil {
return nil, errors.Wrapf(err, "failed to filter list object")
}
}
}

return obj, nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/yurthub/proxy/multiplexer/multiplexerproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,19 @@ import (

type multiplexerProxy struct {
requestsMultiplexerManager multiplexer.MultiplexerManager
filterMgr filter.FilterFinder
filterFinder filter.FilterFinder
stop <-chan struct{}
}

func NewMultiplexerProxy(filterMgr filter.FilterFinder,
func NewMultiplexerProxy(filterFinder filter.FilterFinder,
cacheManager multiplexer.MultiplexerManager,
multiplexerResources []schema.GroupVersionResource,
stop <-chan struct{}) (*multiplexerProxy, error) {

sp := &multiplexerProxy{
stop: stop,
requestsMultiplexerManager: cacheManager,
filterMgr: filterMgr,
filterFinder: filterFinder,
}

for _, gvr := range multiplexerResources {
Expand Down
Loading

0 comments on commit cea4306

Please sign in to comment.